Skip to main content

hashiverse_lib/transport/
transport.rs

1//! # Transport abstraction — the seam between protocol and network
2//!
3//! Three traits define everything the protocol code needs from a transport, with
4//! concrete implementations (HTTPS in production, in-memory for tests) supplied by
5//! sibling modules:
6//!
7//! - [`TransportFactory`] — builds a [`TransportServer`] and issues outbound client
8//!   requests. Injected via [`crate::tools::runtime_services::RuntimeServices`] so the
9//!   same protocol code runs over HTTPS on native, over HTTP via `gloo-net` in WASM,
10//!   and over a deterministic in-memory queue in tests.
11//! - [`TransportServer`] — binds an address, delivers inbound requests over an
12//!   `mpsc::Receiver<IncomingRequest>`, and shuts down on a `CancellationToken`.
13//! - [`TransportServerHandler`] — the application-level processor of incoming requests,
14//!   implemented by the server binary.
15//!
16//! [`IncomingRequest`] wraps one inbound request: the raw bytes, the caller's address,
17//! a oneshot response channel, and a
18//! [`crate::transport::ddos::ddos::DdosConnectionGuard`] whose drop releases the
19//! caller's per-IP connection slot — so request-level DDoS accounting is automatic and
20//! can't be leaked.
21
22use crate::tools::BytesGatherer;
23use crate::transport::ddos::ddos::DdosConnectionGuard;
24use crate::transport::transport_ownership_proof::{RejectAllTransportOwnershipProof, TransportOwnershipProof};
25use bytes::Bytes;
26use log::{info, trace, warn};
27use std::sync::Arc;
28use tokio::sync::{mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30
31/// A single request delivered from a [`TransportServer`] to the
32/// application-level [`TransportServerHandler`] for processing.
33///
34/// The transport layer owns inbound sockets, parses wire-level framing, and forwards the
35/// decoded payload to the handler as an `IncomingRequest`. It carries the raw request bytes
36/// plus the metadata the handler may need to reason about: the caller's address (for logging
37/// and DDoS accounting), a oneshot channel on which to send the response, and an
38/// [`DdosConnectionGuard`] that ties the request's lifetime to its underlying connection slot
39/// — dropping the guard releases the per-IP connection count.
40///
41/// Handlers do not construct `IncomingRequest` directly; they `.await` them off the
42/// `mpsc::Receiver` passed to [`TransportServerHandler::run`].
43pub struct IncomingRequest {
44    pub caller_address: String,
45    pub bytes: Bytes,
46    pub reply: oneshot::Sender<BytesGatherer>,
47    ddos_connection_guard: Arc<DdosConnectionGuard>,
48}
49
50impl IncomingRequest {
51    pub fn new(caller_address: String, bytes: Bytes, reply: oneshot::Sender<BytesGatherer>, ddos_connection_guard: Arc<DdosConnectionGuard>) -> Self {
52        Self { caller_address, bytes, reply, ddos_connection_guard }
53    }
54
55    pub fn report_bad_request(&self) {
56        self.ddos_connection_guard.report_bad_request();
57    }
58}
59
60/// The lifecycle state of a [`TransportServer`].
61///
62/// A server starts in `Created`, transitions to `Listening` when [`TransportServer::listen`]
63/// is called for the first time, and moves to `Shutdown` when its cancellation token fires.
64/// Implementations use this to reject double-start (`Listening` → `Listening`) and
65/// use-after-shutdown (`Shutdown` → anything) errors cleanly rather than silently racing.
66#[derive(Debug, Clone, Copy, PartialEq)]
67pub enum ServerState {
68    Created,
69    Listening,
70    Shutdown,
71}
72
73/// The application-level request handler that sits behind a [`TransportServer`].
74///
75/// Implementors receive decoded request bytes and produce response bytes without caring
76/// about sockets, framing, or connection lifecycle — all of that stays in the transport
77/// layer. The default [`TransportServerHandler::run`] implementation is the canonical event
78/// loop: it reads [`IncomingRequest`]s off a channel, dispatches each one to
79/// [`TransportServerHandler::handle`], and ships the response back to the originating client.
80/// Shutdown is cooperative via a [`CancellationToken`].
81///
82/// The server binary's RPC dispatcher and the client's own loopback handler both implement
83/// this trait, which lets them share the same event-loop plumbing.
84pub trait TransportServerHandler {
85    async fn handle(&self, bytes: Bytes) -> BytesGatherer;
86
87    async fn run(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> anyhow::Result<()> {
88        loop {
89            tokio::select! {
90                _ = cancellation_token.cancelled() => {
91                    break;
92                },
93
94                receipt = rx.recv() => {
95                    match receipt {
96                            Some(incoming) => {
97                                info!("received packet from {}: {:?}", incoming.caller_address, incoming.bytes);
98                                let result = self.handle(incoming.bytes.clone()).await;
99                                let result = incoming.reply.send(result);
100                                match result {
101                                    Ok(_) => { trace!("sent reply"); },
102                                    Err(_) => { warn!("failed to send reply"); },
103                                }
104                            },
105                            None => {
106                                warn!("channel closed");
107                                break;
108                            }
109                        }
110                },
111            }
112        }
113
114        Ok(())
115    }
116}
117
118/// A server-side endpoint that accepts inbound connections and forwards each request to a
119/// handler via an mpsc channel.
120///
121/// `TransportServer` abstracts the concrete listening strategy — `MemTransportServer` for
122/// in-memory tests, the HTTPS implementation in the server crate for production, a browser
123/// stub in the wasm client that panics on `listen`. All of them expose the same two-operation
124/// surface: report where they can be reached (`get_address`) and run the accept loop until
125/// the supplied `CancellationToken` fires (`listen`).
126///
127/// A `TransportServer` instance is single-shot — once `listen` has completed (cancelled or
128/// errored) the server transitions to [`ServerState::Shutdown`] and must not be re-used.
129#[async_trait::async_trait]
130pub trait TransportServer: Send + Sync {
131    fn get_address(&self) -> &String;
132
133    async fn listen(&self, cancellation_token: CancellationToken, handler: mpsc::Sender<IncomingRequest>) -> anyhow::Result<()>;
134
135    /// Returns the per-transport ownership-proof object used to (a) produce the proof bytes
136    /// embedded in this server's outbound `AnnounceV2` requests and (b) verify proof bytes
137    /// received from peers in inbound `AnnounceV2` requests. The default reject-all
138    /// implementation is overridden by transports that have a real notion of address
139    /// ownership (HTTPS via ACME cert, mem-transport via an empty marker, …).
140    fn get_transport_ownership_proof(&self) -> Arc<dyn TransportOwnershipProof> {
141        Arc::new(RejectAllTransportOwnershipProof)
142    }
143}
144
145/// The pluggable network layer of the protocol — the single point where the crate touches
146/// "how do we move bytes around the world".
147///
148/// A `TransportFactory` knows three things: (1) where to find the network's bootstrap peers
149/// for initial peer discovery, (2) how to create a [`TransportServer`] that listens on a given
150/// port, and (3) how to perform an outbound unary RPC against a peer address. Everything
151/// above this layer — the RPC packet framing, PoW, peer tracking, Kademlia — is network-
152/// agnostic and simply calls [`TransportFactory::rpc`].
153///
154/// Concrete implementations include the in-memory `MemTransportFactory` used by integration
155/// tests, `FullHttpsTransportFactory` in the server crate for production TLS+HTTPS, and
156/// `WasmTransportFactory` in the browser client that speaks HTTP via `gloo-net`. Swapping
157/// the factory on [`crate::tools::runtime_services::RuntimeServices`] changes the wire protocol
158/// without any other code having to care.
159#[async_trait::async_trait]
160pub trait TransportFactory: Send + Sync {
161    async fn get_bootstrap_addresses(&self) -> Vec<String>;
162    async fn create_server(&self, base_path: &str, port: u16, force_local_network: bool) -> anyhow::Result<Arc<dyn TransportServer>>;
163    async fn rpc(&self, address: &str, bytes: Bytes) -> anyhow::Result<Bytes>;
164}
165
166#[cfg(any(test, feature = "generic-tests"))]
167pub mod tests {
168    use crate::tools::time::{MILLIS_IN_MILLISECOND, MILLIS_IN_SECOND};
169    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
170    use crate::tools::tools::get_temp_dir;
171    use crate::tools::BytesGatherer;
172    use crate::transport::transport::{IncomingRequest, TransportFactory, TransportServerHandler};
173    use bytes::Bytes;
174    use log::{info, trace};
175    use std::sync::Arc;
176    use tokio::join;
177    use tokio::sync::mpsc;
178    use tokio_util::sync::CancellationToken;
179
180    pub async fn rpc_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
181        let time_provider = Arc::new(RealTimeProvider);
182        // configure_logging_with_time_provider("trace", time_provider.clone());
183
184        let cancellation_token = CancellationToken::new();
185        let (_, temp_dir_str) = get_temp_dir()?;
186        let transport_server = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
187        let address = transport_server.get_address().clone();
188        trace!("server address is {}", address);
189
190        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);
191
192        struct MyHandler {}
193        impl TransportServerHandler for MyHandler {
194            async fn handle(&self, _: Bytes) -> BytesGatherer {
195                BytesGatherer::from_bytes(Bytes::from("here is the reply"))
196            }
197        }
198
199        let my_handler = MyHandler {};
200
201        info!("running server and clients in parallel");
202        let results = join!(
203            my_handler.run(cancellation_token.clone(), rx),
204            transport_server.listen(cancellation_token.clone(), tx),
205            // The driver process
206            async {
207                info!("waiting for server to start");
208                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
209
210                for _ in 0..20 {
211                    info!("calling server");
212                    let bytes = Bytes::from("hello");
213                    let response = transport_factory.rpc(&address, bytes).await.unwrap();
214                    assert_eq!(response, Bytes::from("here is the reply"));
215                    time_provider.sleep_millis(MILLIS_IN_MILLISECOND.const_mul(100)).await;
216                }
217
218                info!("shutting down servers");
219                cancellation_token.cancel();
220                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
221
222                Ok::<(), anyhow::Error>(())
223            }
224        );
225
226        // Check all the processes exited happily
227        assert!(results.0.is_ok());
228        assert!(results.1.is_ok());
229        assert!(results.2.is_ok());
230
231        Ok(())
232    }
233
234    pub async fn bind_port_zero_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
235        let time_provider = Arc::new(RealTimeProvider);
236        // configure_logging_with_time_provider("trace", time_provider.clone());
237
238        info!("starting test");
239
240        let cancellation_token = CancellationToken::new();
241        let (_, temp_dir_str) = get_temp_dir()?;
242        let transport_server_1 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
243        let transport_server_2 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
244
245        let (tx_1, rx_1) = mpsc::channel::<IncomingRequest>(32);
246        let (tx_2, rx_2) = mpsc::channel::<IncomingRequest>(32);
247
248        struct MyHandler {}
249        impl TransportServerHandler for MyHandler {
250            async fn handle(&self, _: Bytes) -> BytesGatherer {
251                BytesGatherer::from_bytes(Bytes::from("here is the reply"))
252            }
253        }
254
255        let my_handler = MyHandler {};
256
257        info!("running server and clients in parallel");
258        let results = join!(
259            my_handler.run(cancellation_token.clone(), rx_1),
260            my_handler.run(cancellation_token.clone(), rx_2),
261            transport_server_1.listen(cancellation_token.clone(), tx_1),
262            transport_server_2.listen(cancellation_token.clone(), tx_2),
263            // The driver process
264            async {
265                info!("waiting for server to start");
266                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
267
268                info!("shutting down servers");
269                cancellation_token.cancel();
270                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
271
272                Ok::<(), anyhow::Error>(())
273            }
274        );
275
276        // Check all the processes exited happily
277        assert!(results.0.is_ok());
278        assert!(results.1.is_ok());
279        assert!(results.2.is_ok());
280        assert!(results.3.is_ok());
281        assert!(results.4.is_ok());
282
283        Ok(())
284    }
285}