hashiverse_server_lib/transport/
full_https_transport.rs1use crate::transport::https_transport_cert_refresher::HttpsTransportCertRefresher;
24use crate::transport::https_transport_ownership_proof::HttpsTransportOwnershipProof;
25use crate::tools::tools::get_public_ipv4;
26use anyhow::anyhow;
27use axum::body::Body;
28use axum::extract::{DefaultBodyLimit, Extension};
29use axum::http::{header, StatusCode, Uri};
30use axum::response::{IntoResponse, Response};
31use axum::{routing::get, Router};
32use bytes::Bytes;
33use futures::stream;
34use hashiverse_lib::tools::config;
35use hashiverse_lib::transport::ddos::ddos::{DdosConnectionGuard, DdosProtection};
36use hashiverse_lib::transport::transport::{IncomingRequest, ServerState, TransportFactory, TransportServer};
37use hashiverse_lib::transport::transport_ownership_proof::TransportOwnershipProof;
38use hyper::body::Incoming;
39use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
40use hyper_util::server::conn::auto::Builder as AutoBuilder;
41use log::{error, info, trace, warn};
42use parking_lot::RwLock;
43use rustls::ServerConfig;
44use std::convert::Infallible;
45use std::path::PathBuf;
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::net::TcpListener;
49use tokio::sync::{mpsc, oneshot, Mutex, Semaphore};
50use tokio::task::JoinSet;
51use tokio_rustls::TlsAcceptor;
52use tokio_util::sync::CancellationToken;
53use tower::{Service, ServiceExt};
54use tower_http::cors::CorsLayer;
55use tower_http::timeout::RequestBodyTimeoutLayer;
56use hashiverse_lib::transport::bootstrap_provider::bootstrap_provider::BootstrapProvider;
57
58#[derive(Clone)]
65pub struct FullHttpsTransportFactory {
66 ddos_protection: Arc<dyn DdosProtection>,
67 https_transport_factory: hashiverse_lib::transport::partial_https_transport::PartialHttpsTransportFactory,
68}
69
70pub struct FullHttpsTransportServer {
71 address: String,
72 listener: Arc<Mutex<Option<TcpListener>>>, state: Arc<RwLock<ServerState>>,
74 ddos_protection: Arc<dyn DdosProtection>,
75 cert_refresher: Arc<HttpsTransportCertRefresher>,
79 ownership_proof: Arc<HttpsTransportOwnershipProof>,
85}
86
87impl FullHttpsTransportServer {
88 async fn new(base_path: &str, address: String, ip: String, port: u16, force_local_network: bool, listener: TcpListener, ddos_protection: Arc<dyn DdosProtection>) -> anyhow::Result<Self> {
89 let path_certs: PathBuf = PathBuf::from(base_path.to_string()).join("certs");
90 let cert_refresher: Arc<HttpsTransportCertRefresher> = Arc::new(HttpsTransportCertRefresher::new(path_certs, ip, port, force_local_network)?);
91 let ownership_proof: Arc<HttpsTransportOwnershipProof> = Arc::new(HttpsTransportOwnershipProof::new(cert_refresher.base_cert.clone()));
92
93 Ok(FullHttpsTransportServer {
94 address,
95 listener: Arc::new(Mutex::new(Some(listener))),
96 state: Arc::new(RwLock::new(ServerState::Created)),
97 ddos_protection,
98 cert_refresher,
99 ownership_proof,
100 })
101 }
102}
103
104#[async_trait::async_trait]
105impl TransportServer for FullHttpsTransportServer {
106 fn get_address(&self) -> &String {
107 &self.address
108 }
109
110 fn get_transport_ownership_proof(&self) -> Arc<dyn TransportOwnershipProof> {
111 self.ownership_proof.clone()
112 }
113
114 async fn listen(&self, cancellation_token: CancellationToken, handler: mpsc::Sender<IncomingRequest>) -> anyhow::Result<()> {
115 {
117 let mut state = self.state.write();
118 match *state {
119 ServerState::Listening => {
120 anyhow::bail!("server is already listening");
121 }
122 ServerState::Shutdown => {
123 anyhow::bail!("server has been shut down");
124 }
125 ServerState::Created => {
126 *state = ServerState::Listening;
127 }
128 }
129 }
130
131 info!("listening on address {}", self.address);
132
133 let mut listener = self.listener.lock().await;
134 let listener = match listener.take() {
135 Some(listener) => listener,
136 None => {
137 return Err(anyhow!("listener had already been taken"));
138 }
139 };
140
141 let handler_clone = handler.clone();
144 let handle_blob = move |Extension(ddos_connection_guard): Extension<Arc<DdosConnectionGuard>>, bytes: Bytes| async move {
145 let handler = handler_clone.clone();
146
147 if !ddos_connection_guard.allow_request() {
148 trace!("DDoS: request from {} blocked", ddos_connection_guard.ip());
149 return Err(StatusCode::TOO_MANY_REQUESTS);
150 }
151
152 let caller_address = ddos_connection_guard.ip().to_string();
153
154 let result: anyhow::Result<Response<axum::body::Body>> = try {
155 let (reply_tx, reply_rx) = oneshot::channel();
156 handler.send(IncomingRequest::new(caller_address, bytes, reply_tx, ddos_connection_guard.clone())).await.map_err(|e| anyhow::anyhow!("Failed to send message: {}", e))?;
157 let response = reply_rx.await.map_err(|e| anyhow::anyhow!("Failed to receive message: {}", e))?;
158
159 let content_length = response.len();
161 let segments = response.compact(config::TRANSPORT_BYTES_GATHERER_COMPACT_THRESHOLD).finish();
162 let body = axum::body::Body::from_stream(stream::iter(segments.into_iter().map(Ok::<Bytes, Infallible>)));
163
164 let response = axum::http::Response::builder()
165 .status(StatusCode::OK)
166 .header(header::CONTENT_TYPE, "application/octet-stream")
167 .header(header::CONTENT_LENGTH, content_length)
168 .body(body)
169 .map_err(|e| anyhow::anyhow!("Failed to build response: {}", e))?;
170
171 response
172 };
173
174 match result {
175 Ok(response) => Ok(response.into_response()),
176
177 Err(e) => {
178 warn!("error processing blob: {}", e);
179 ddos_connection_guard.report_bad_request();
180 Err(StatusCode::BAD_REQUEST)
181 }
182 }
183 };
184
185 let fallback_handler = move |Extension(ddos_connection_guard): Extension<Arc<DdosConnectionGuard>>, uri: Uri| {
186 async move {
187 trace!("unhandled route for path: {} from {}", uri, ddos_connection_guard.ip());
188 ddos_connection_guard.report_bad_request();
189 StatusCode::NOT_FOUND
190 }
191 };
192
193 let axum_app = Router::new()
194 .route("/", get(|| async { "Hashiverse!" }).post(handle_blob))
195 .layer(DefaultBodyLimit::max(config::PROTOCOL_MAX_BLOB_SIZE_REQUEST))
196 .layer(RequestBodyTimeoutLayer::new(Duration::from_secs(config::HTTPS_SERVER_TRANSPORT_BODY_READ_TIMEOUT_SECS)))
197 .layer(CorsLayer::permissive())
198 .fallback(fallback_handler);
199
200 let cert_refresher: Arc<HttpsTransportCertRefresher> = self.cert_refresher.clone();
201 cert_refresher.reload_certs()?;
202
203 let tls_acceptor = {
204 let mut server_config = ServerConfig::builder().with_no_client_auth().with_cert_resolver(cert_refresher.clone());
205 server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"acme-tls/1".to_vec()];
206 TlsAcceptor::from(Arc::new(server_config))
207 };
208
209 let mut make_service = axum_app.into_make_service_with_connect_info::<std::net::SocketAddr>();
210 let connection_semaphore = Arc::new(Semaphore::new(config::HTTPS_SERVER_TRANSPORT_MAX_CONNECTIONS));
211 let mut join_set: JoinSet<()> = JoinSet::new();
212
213 let ddos = self.ddos_protection.clone();
214
215 let accept_loop = async {
216 loop {
217 while join_set.try_join_next().is_some() {}
219
220 tokio::select! {
221 accept_result = listener.accept() => {
222 let (tcp_stream, peer_addr) = match accept_result {
223 Ok(v) => v,
224 Err(e) => { warn!("accept error: {}", e); continue; }
225 };
226 let ip = peer_addr.ip().to_string();
227
228 let ddos_connection_guard = match DdosConnectionGuard::try_new(ddos.clone(), ip.clone()) {
232 Some(guard) => Arc::new(guard),
233 None => {
234 trace!("DDoS: dropping connection from {} (blocked or per-IP cap reached)", ip);
235 continue;
236 }
237 };
238
239 let permit = match Arc::clone(&connection_semaphore).try_acquire_owned() {
241 Ok(p) => p,
242 Err(_) => {
243 warn!("connection cap ({}) reached, dropping {}", config::HTTPS_SERVER_TRANSPORT_MAX_CONNECTIONS, ip);
244 continue;
245 }
246 };
247
248 let tower_service = match make_service.call(peer_addr).await {
252 Ok(s) => s,
253 Err(e) => { warn!("make_service error for {}: {:?}", ip, e); continue; }
254 };
255
256 let tls_acceptor = tls_acceptor.clone();
257
258 join_set.spawn(async move {
259 let _permit = permit; let tls_stream = match tokio::time::timeout(
264 Duration::from_secs(config::HTTPS_SERVER_TRANSPORT_TLS_HANDSHAKE_TIMEOUT_SECS),
265 tls_acceptor.accept(tcp_stream),
266 ).await {
267 Ok(Ok(s)) => s,
268 Ok(Err(e)) => { trace!("TLS error from {}: {}", ip, e); ddos_connection_guard.report_bad_request(); return; }
269 Err(_) => { trace!("TLS handshake timeout from {}", ip); ddos_connection_guard.report_bad_request(); return; }
270 };
271
272 let io = TokioIo::new(tls_stream);
273
274 let hyper_service = hyper::service::service_fn(move |mut req: hyper::Request<Incoming>| {
279 req.extensions_mut().insert(ddos_connection_guard.clone());
280 tower_service.clone().oneshot(req.map(Body::new))
281 });
282
283 let mut auto_builder = AutoBuilder::new(TokioExecutor::new());
287 auto_builder.http1()
288 .timer(TokioTimer::new())
289 .header_read_timeout(Duration::from_secs(config::HTTPS_SERVER_TRANSPORT_HEADER_READ_TIMEOUT_SECS));
290
291 if let Err(e) = auto_builder.serve_connection(io, hyper_service).await {
292 trace!("connection error from {}: {}", ip, e);
293 }
294 });
295 }
296 _ = cancellation_token.cancelled() => break,
297 }
298 }
299
300 let shutdown_deadline = tokio::time::sleep(Duration::from_secs(config::HTTPS_SERVER_TRANSPORT_SHUTDOWN_TIMEOUT_SECS));
302 tokio::pin!(shutdown_deadline);
303 loop {
304 tokio::select! {
305 result = join_set.join_next() => {
306 match result {
307 None => break,
308 Some(Err(e)) => warn!("connection task error during shutdown: {}", e),
309 Some(Ok(())) => {}
310 }
311 }
312 _ = &mut shutdown_deadline => {
313 join_set.abort_all();
314 break;
315 }
316 }
317 }
318
319 anyhow::Ok(())
320 };
321
322 let results = tokio::join!(
324 accept_loop,
325 cert_refresher.process(cancellation_token.clone()),
326 );
327
328 if let Err(e) = results.0 {
329 error!("error in accept loop: {}", e)
330 }
331 if let Err(e) = results.1 {
332 error!("error in cert refresher: {}", e)
333 }
334
335 info!("stopped listening on address {}", self.address);
336 info!("all open connections complete");
337 *self.state.write() = ServerState::Shutdown;
338
339 Ok(())
340 }
341}
342
343impl FullHttpsTransportFactory {
344 pub fn new(ddos_protection: Arc<dyn DdosProtection>, bootstrap_provider: Arc<dyn BootstrapProvider>) -> Self {
345 let https_transport_factory = hashiverse_lib::transport::partial_https_transport::PartialHttpsTransportFactory::new(bootstrap_provider);
346 Self { ddos_protection, https_transport_factory }
347 }
348}
349
350#[async_trait::async_trait]
351impl TransportFactory for FullHttpsTransportFactory {
352 async fn get_bootstrap_addresses(&self) -> Vec<String> {
353 self.https_transport_factory.get_bootstrap_addresses().await
354 }
355
356 async fn create_server(&self, base_path: &str, port: u16, force_local_network: bool) -> anyhow::Result<Arc<dyn TransportServer>> {
357 let address_to_bind = format!("0.0.0.0:{}", port);
362 info!("bind on: {}", address_to_bind);
363 let listener = TcpListener::bind(address_to_bind).await?;
364
365 let address_bound_ip = get_public_ipv4(force_local_network).await?;
366 let address_bound_port = listener.local_addr()?.port();
367 let address = format!("{}:{}", address_bound_ip, address_bound_port);
368
369 let http_transport_server: Arc<dyn TransportServer> = Arc::new(FullHttpsTransportServer::new(base_path, address, address_bound_ip, address_bound_port, force_local_network, listener, self.ddos_protection.clone()).await?);
370 Ok(http_transport_server)
371 }
372
373 async fn rpc(&self, address: &str, bytes: Bytes) -> anyhow::Result<Bytes> {
374 self.https_transport_factory.rpc(address, bytes).await
375 }
376}
377
378
#[cfg(test)]
mod tests {
    use crate::transport::full_https_transport::FullHttpsTransportFactory;
    use hashiverse_lib::transport::bootstrap_provider::manual_bootstrap_provider::ManualBootstrapProvider;
    use hashiverse_lib::transport::ddos::noop_ddos::NoopDdosProtection;
    use hashiverse_lib::transport::transport::TransportFactory;
    use std::sync::Arc;

    /// Installs the `ring` crypto provider as the rustls default.
    ///
    /// The result is deliberately discarded, so calling this from more than one test is harmless.
    fn install_crypto_provider() {
        let _ = rustls::crypto::ring::default_provider().install_default();
    }

    /// Installs the crypto provider, then builds the factory under test with no-op DDoS
    /// protection and a default manual bootstrap provider.
    fn prepared_factory() -> Arc<dyn TransportFactory> {
        install_crypto_provider();
        Arc::new(FullHttpsTransportFactory::new(NoopDdosProtection::default(), ManualBootstrapProvider::default()))
    }

    /// Runs the shared transport RPC test against the full HTTPS transport.
    #[tokio::test]
    async fn rpc_test() -> anyhow::Result<()> {
        let factory = prepared_factory();
        hashiverse_lib::transport::transport::tests::rpc_test(factory).await
    }

    /// Runs the shared port-zero bind test against the full HTTPS transport.
    #[tokio::test]
    async fn bind_port_zero_test() -> anyhow::Result<()> {
        let factory = prepared_factory();
        hashiverse_lib::transport::transport::tests::bind_port_zero_test(factory).await
    }
}