hashiverse_lib/transport/transport.rs
1//! # Transport abstraction — the seam between protocol and network
2//!
3//! Three traits define everything the protocol code needs from a transport, with
4//! concrete implementations (HTTPS in production, in-memory for tests) supplied by
5//! sibling modules:
6//!
7//! - [`TransportFactory`] — builds a [`TransportServer`] and issues outbound client
8//! requests. Injected via [`crate::tools::runtime_services::RuntimeServices`] so the
9//! same protocol code runs over HTTPS on native, over HTTP via `gloo-net` in WASM,
10//! and over a deterministic in-memory queue in tests.
11//! - [`TransportServer`] — binds an address, delivers inbound requests over an
12//! `mpsc::Receiver<IncomingRequest>`, and shuts down on a `CancellationToken`.
13//! - [`TransportServerHandler`] — the application-level processor of incoming requests,
14//! implemented by the server binary.
15//!
16//! [`IncomingRequest`] wraps one inbound request: the raw bytes, the caller's address,
17//! a oneshot response channel, and a
18//! [`crate::transport::ddos::ddos::DdosConnectionGuard`] whose drop releases the
19//! caller's per-IP connection slot — so request-level DDoS accounting is automatic and
20//! can't be leaked.
21
22use crate::tools::BytesGatherer;
23use crate::transport::ddos::ddos::DdosConnectionGuard;
24use crate::transport::transport_ownership_proof::{RejectAllTransportOwnershipProof, TransportOwnershipProof};
25use bytes::Bytes;
26use log::{info, trace, warn};
27use std::sync::Arc;
28use tokio::sync::{mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30
31/// A single request delivered from a [`TransportServer`] to the
32/// application-level [`TransportServerHandler`] for processing.
33///
34/// The transport layer owns inbound sockets, parses wire-level framing, and forwards the
35/// decoded payload to the handler as an `IncomingRequest`. It carries the raw request bytes
36/// plus the metadata the handler may need to reason about: the caller's address (for logging
37/// and DDoS accounting), a oneshot channel on which to send the response, and an
38/// [`DdosConnectionGuard`] that ties the request's lifetime to its underlying connection slot
39/// — dropping the guard releases the per-IP connection count.
40///
41/// Handlers do not construct `IncomingRequest` directly; they `.await` them off the
42/// `mpsc::Receiver` passed to [`TransportServerHandler::run`].
43pub struct IncomingRequest {
44 pub caller_address: String,
45 pub bytes: Bytes,
46 pub reply: oneshot::Sender<BytesGatherer>,
47 ddos_connection_guard: Arc<DdosConnectionGuard>,
48}
49
50impl IncomingRequest {
51 pub fn new(caller_address: String, bytes: Bytes, reply: oneshot::Sender<BytesGatherer>, ddos_connection_guard: Arc<DdosConnectionGuard>) -> Self {
52 Self { caller_address, bytes, reply, ddos_connection_guard }
53 }
54
55 pub fn report_bad_request(&self) {
56 self.ddos_connection_guard.report_bad_request();
57 }
58}
59
/// The lifecycle state of a [`TransportServer`].
///
/// A server starts in `Created`. It moves to `Listening` the first time
/// [`TransportServer::listen`] is called, and to `Shutdown` once its cancellation token fires.
/// Implementations use this to reject double-start (`Listening` → `Listening`) and
/// use-after-shutdown (`Shutdown` → anything) cleanly rather than silently racing.
///
/// The enum is a plain fieldless `Copy` value. It derives `Eq` and `Hash` alongside `PartialEq`
/// so it can be used as a map/set key and in `Eq`-bounded code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServerState {
    /// Constructed; [`TransportServer::listen`] has not been called yet.
    Created,
    /// `listen` has been called and the accept loop is running.
    Listening,
    /// The cancellation token has fired; the server must not be reused.
    Shutdown,
}
72
73/// The application-level request handler that sits behind a [`TransportServer`].
74///
75/// Implementors receive decoded request bytes and produce response bytes without caring
76/// about sockets, framing, or connection lifecycle — all of that stays in the transport
77/// layer. The default [`TransportServerHandler::run`] implementation is the canonical event
78/// loop: it reads [`IncomingRequest`]s off a channel, dispatches each one to
79/// [`TransportServerHandler::handle`], and ships the response back to the originating client.
80/// Shutdown is cooperative via a [`CancellationToken`].
81///
82/// The server binary's RPC dispatcher and the client's own loopback handler both implement
83/// this trait, which lets them share the same event-loop plumbing.
84pub trait TransportServerHandler {
85 async fn handle(&self, bytes: Bytes) -> BytesGatherer;
86
87 async fn run(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> anyhow::Result<()> {
88 loop {
89 tokio::select! {
90 _ = cancellation_token.cancelled() => {
91 break;
92 },
93
94 receipt = rx.recv() => {
95 match receipt {
96 Some(incoming) => {
97 info!("received packet from {}: {:?}", incoming.caller_address, incoming.bytes);
98 let result = self.handle(incoming.bytes.clone()).await;
99 let result = incoming.reply.send(result);
100 match result {
101 Ok(_) => { trace!("sent reply"); },
102 Err(_) => { warn!("failed to send reply"); },
103 }
104 },
105 None => {
106 warn!("channel closed");
107 break;
108 }
109 }
110 },
111 }
112 }
113
114 Ok(())
115 }
116}
117
118/// A server-side endpoint that accepts inbound connections and forwards each request to a
119/// handler via an mpsc channel.
120///
121/// `TransportServer` abstracts the concrete listening strategy — `MemTransportServer` for
122/// in-memory tests, the HTTPS implementation in the server crate for production, a browser
123/// stub in the wasm client that panics on `listen`. All of them expose the same two-operation
124/// surface: report where they can be reached (`get_address`) and run the accept loop until
125/// the supplied `CancellationToken` fires (`listen`).
126///
127/// A `TransportServer` instance is single-shot — once `listen` has completed (cancelled or
128/// errored) the server transitions to [`ServerState::Shutdown`] and must not be re-used.
129#[async_trait::async_trait]
130pub trait TransportServer: Send + Sync {
131 fn get_address(&self) -> &String;
132
133 async fn listen(&self, cancellation_token: CancellationToken, handler: mpsc::Sender<IncomingRequest>) -> anyhow::Result<()>;
134
135 /// Returns the per-transport ownership-proof object used to (a) produce the proof bytes
136 /// embedded in this server's outbound `AnnounceV2` requests and (b) verify proof bytes
137 /// received from peers in inbound `AnnounceV2` requests. The default reject-all
138 /// implementation is overridden by transports that have a real notion of address
139 /// ownership (HTTPS via ACME cert, mem-transport via an empty marker, …).
140 fn get_transport_ownership_proof(&self) -> Arc<dyn TransportOwnershipProof> {
141 Arc::new(RejectAllTransportOwnershipProof)
142 }
143}
144
145/// The pluggable network layer of the protocol — the single point where the crate touches
146/// "how do we move bytes around the world".
147///
148/// A `TransportFactory` knows three things: (1) where to find the network's bootstrap peers
149/// for initial peer discovery, (2) how to create a [`TransportServer`] that listens on a given
150/// port, and (3) how to perform an outbound unary RPC against a peer address. Everything
151/// above this layer — the RPC packet framing, PoW, peer tracking, Kademlia — is network-
152/// agnostic and simply calls [`TransportFactory::rpc`].
153///
154/// Concrete implementations include the in-memory `MemTransportFactory` used by integration
155/// tests, `FullHttpsTransportFactory` in the server crate for production TLS+HTTPS, and
156/// `WasmTransportFactory` in the browser client that speaks HTTP via `gloo-net`. Swapping
157/// the factory on [`crate::tools::runtime_services::RuntimeServices`] changes the wire protocol
158/// without any other code having to care.
159#[async_trait::async_trait]
160pub trait TransportFactory: Send + Sync {
161 async fn get_bootstrap_addresses(&self) -> Vec<String>;
162 async fn create_server(&self, base_path: &str, port: u16, force_local_network: bool) -> anyhow::Result<Arc<dyn TransportServer>>;
163 async fn rpc(&self, address: &str, bytes: Bytes) -> anyhow::Result<Bytes>;
164}
165
#[cfg(any(test, feature = "generic-tests"))]
pub mod tests {
    use crate::tools::time::{MILLIS_IN_MILLISECOND, MILLIS_IN_SECOND};
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::tools::get_temp_dir;
    use crate::tools::BytesGatherer;
    use crate::transport::transport::{IncomingRequest, TransportFactory, TransportServerHandler};
    use bytes::Bytes;
    use log::{info, trace};
    use std::sync::Arc;
    use tokio::join;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    /// Payload every test server answers with, whatever it receives.
    const REPLY: &str = "here is the reply";

    /// Test handler that ignores the request and always replies with [`REPLY`].
    struct ReplyHandler;

    impl TransportServerHandler for ReplyHandler {
        async fn handle(&self, _: Bytes) -> BytesGatherer {
            BytesGatherer::from_bytes(Bytes::from(REPLY))
        }
    }

    /// Panics, including the underlying error, if a concurrently-run participant failed.
    ///
    /// A bare `assert!(result.is_ok())` would report only the expression text and discard the
    /// error that explains the failure.
    fn assert_exited_cleanly(participant: &str, result: &anyhow::Result<()>) {
        if let Err(e) = result {
            panic!("{} exited with an error: {:?}", participant, e);
        }
    }

    /// Generic transport conformance test for request/response delivery.
    ///
    /// Creates one server from `transport_factory` and runs its handler loop and accept loop.
    /// It then makes 20 RPCs against the server, requiring each to return [`REPLY`], and
    /// finally cancels everything and checks that all participants exited cleanly.
    pub async fn rpc_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider);
        // configure_logging_with_time_provider("trace", time_provider.clone());

        let cancellation_token = CancellationToken::new();
        // Bind the first element instead of `_`: `_` would drop it at the end of this statement.
        // NOTE(review): if it is a temp-dir guard, that would delete the directory before the
        // server uses it — confirm `get_temp_dir`'s return type.
        let (_temp_dir, temp_dir_str) = get_temp_dir()?;
        let transport_server = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let address = transport_server.get_address().clone();
        trace!("server address is {}", address);

        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);
        let handler = ReplyHandler;

        info!("running server and clients in parallel");
        let (handler_result, listen_result, driver_result) = join!(
            handler.run(cancellation_token.clone(), rx),
            transport_server.listen(cancellation_token.clone(), tx),
            // The driver process
            async {
                let calls: anyhow::Result<()> = async {
                    info!("waiting for server to start");
                    time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                    for _ in 0..20 {
                        info!("calling server");
                        let response = transport_factory.rpc(&address, Bytes::from("hello")).await?;
                        anyhow::ensure!(response == Bytes::from(REPLY), "unexpected reply: {:?}", response);
                        time_provider.sleep_millis(MILLIS_IN_MILLISECOND.const_mul(100)).await;
                    }

                    Ok(())
                }
                .await;

                // Cancel even when a call failed, so the handler and accept loops finish and
                // `join!` returns instead of hanging.
                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                calls
            }
        );

        // Check all the processes exited happily
        assert_exited_cleanly("request handler", &handler_result);
        assert_exited_cleanly("transport server", &listen_result);
        assert_exited_cleanly("driver", &driver_result);

        Ok(())
    }

    /// Generic transport conformance test for port-0 binding.
    ///
    /// Creates two servers on port `0` from the same factory and runs both concurrently for
    /// about a second. It then cancels them and checks that every handler loop, accept loop and
    /// the driver exited cleanly.
    pub async fn bind_port_zero_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider);
        // configure_logging_with_time_provider("trace", time_provider.clone());

        info!("starting test");

        let cancellation_token = CancellationToken::new();
        // Bind the first element instead of `_` so it lives for the whole test (see `rpc_test`).
        let (_temp_dir, temp_dir_str) = get_temp_dir()?;
        let transport_server_1 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let transport_server_2 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;

        let (tx_1, rx_1) = mpsc::channel::<IncomingRequest>(32);
        let (tx_2, rx_2) = mpsc::channel::<IncomingRequest>(32);

        let handler = ReplyHandler;

        info!("running server and clients in parallel");
        let (handler_1_result, handler_2_result, listen_1_result, listen_2_result, driver_result) = join!(
            handler.run(cancellation_token.clone(), rx_1),
            handler.run(cancellation_token.clone(), rx_2),
            transport_server_1.listen(cancellation_token.clone(), tx_1),
            transport_server_2.listen(cancellation_token.clone(), tx_2),
            // The driver process
            async {
                info!("waiting for server to start");
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                Ok::<(), anyhow::Error>(())
            }
        );

        // Check all the processes exited happily
        assert_exited_cleanly("request handler 1", &handler_1_result);
        assert_exited_cleanly("request handler 2", &handler_2_result);
        assert_exited_cleanly("transport server 1", &listen_1_result);
        assert_exited_cleanly("transport server 2", &listen_2_result);
        assert_exited_cleanly("driver", &driver_result);

        Ok(())
    }
}