Skip to main content

hashiverse_server_lib/server/
hashiverse_server.rs

1//! # Top-level server orchestrator
2//!
3//! [`HashiverseServer`] is the server-binary analogue of
4//! [`hashiverse_lib::client::hashiverse_client::HashiverseClient`]: the single struct
5//! that wires together every subsystem the node needs to operate and hands them to
6//! the inbound-request handler.
7//!
8//! What it owns:
9//!
10//! - **Identity** — a persisted [`hashiverse_lib::tools::server_id::ServerId`] is
11//!   loaded from the [`crate::environment::environment::Environment`] or minted
12//!   fresh with proof-of-work on first start.
13//! - **Transport** — built via a `TransportFactory` (TLS in production, plain TCP
14//!   or in-memory in tests) and bound to the port from
15//!   [`crate::server::args::Args`].
16//! - **DHT** — a [`crate::server::kademlia::kademlia::Kademlia`] populated from the
17//!   persisted peer buckets at startup and kept up to date by the handler dispatch.
18//! - **Caches** — [`crate::server::post_bundle_caching`] and
19//!   [`crate::server::post_bundle_feedback_caching`] plus per-connection reply-salt
20//!   and heal caches, all backed by `moka` with TTL/TTI eviction.
21//! - **Replay protection** — a short-window set of observed request salts so an
22//!   attacker can't replay a valid signed request back at us.
23
24use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, CONFIG_KADEMLIA_PEER_BUCKETS, CONFIG_SERVER_ID};
25use crate::server::kademlia::kademlia;
26use crate::server::kademlia::kademlia::Kademlia;
27use crate::server::post_bundle_caching::PostBundleCache;
28use crate::server::post_bundle_feedback_caching::PostBundleFeedbackCache;
29use crate::server::stats::RequestRateWindows;
30use hashiverse_lib::anyhow_assert_eq;
31use hashiverse_lib::protocol::payload::payload::{AnnounceResponseV1, AnnounceResponseV2, AnnounceV1, AnnounceV2, BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind, PeerStatsResponseV1, PAYLOAD_REQUEST_KIND_COUNT};
32use hashiverse_lib::protocol::peer::Peer;
33use hashiverse_lib::protocol::rpc;
34use hashiverse_lib::tools::runtime_services::RuntimeServices;
35use hashiverse_lib::tools::server_id::ServerId;
36use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE, MILLIS_IN_SECOND};
37use hashiverse_lib::tools::time_provider::moka_clock::TimeProviderMokaClock;
38use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
39use hashiverse_lib::tools::types::{Id, Salt};
40use hashiverse_lib::tools::{config, tools};
41use hashiverse_lib::tools::json;
42use hashiverse_lib::transport::transport::{IncomingRequest, TransportServer};
43use log::{error, info, trace, warn};
44use moka::sync::Cache;
45use parking_lot::{Mutex, RwLock};
46use std::sync::Arc;
47use std::sync::atomic::AtomicU64;
48use std::time::Duration;
49use bytes::Bytes;
50use tokio::sync::mpsc;
51use tokio_util::sync::CancellationToken;
52use hashiverse_lib::protocol::rpc::rpc_response::RpcResponsePacketRx;
53use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
54use hashiverse_lib::protocol::payload::payload::TrendingHashtagsFetchResponseV1;
55use crate::server::args::Args;
56
57pub struct HashiverseServer {
58    pub runtime_services: Arc<RuntimeServices>,
59    pub environment: Arc<Environment>,
60    pub server_id: ServerId,
61    pub kademlia: Arc<RwLock<Kademlia<Id, Peer>>>,
62    pub transport_server: Arc<dyn TransportServer>,
63    pub peer_self: Arc<RwLock<Peer>>,
64    pub heal_in_progress: Cache<Id, ()>,
65    pub seen_salts: Cache<Salt, ()>,
66    pub post_bundle_cache: PostBundleCache,
67    pub post_bundle_feedback_cache: PostBundleFeedbackCache,
68    pub trending_hashtags: Cache<String, HyperLogLog>,
69    pub trending_hashtags_response_cache: Mutex<Option<(TimeMillis, TrendingHashtagsFetchResponseV1)>>,
70    /// Per-`PayloadRequestKind` running counter of inbound dispatches. Incremented
71    /// after packet decode + replay-guard but before per-handler PoW gates, so
72    /// adversarial load shows up too.
73    pub request_counters: Arc<[AtomicU64; PAYLOAD_REQUEST_KIND_COUNT]>,
74    /// Per-`PayloadRequestKind` decaying call-rate estimates (hour/day/month).
75    /// Recorded alongside `request_counters` on the same hot path; rendered next
76    /// to the all-time total so idle servers no longer look busy. Each kind has
77    /// its own `Mutex` so a single-kind flood contends only with itself.
78    pub request_rate_windows: Arc<[Mutex<RequestRateWindows>; PAYLOAD_REQUEST_KIND_COUNT]>,
79    /// Cached signed stats blob. The `TimeMillis` is the timestamp recorded in
80    /// the cached `PeerStatsResponseV1` itself — the cache hands the response
81    /// back verbatim so clients re-sharing it get a single canonical byte sequence.
82    pub peer_stats_response_cache: Mutex<Option<(TimeMillis, PeerStatsResponseV1)>>,
83}
84
85impl HashiverseServer {
86    pub async fn new(runtime_services: Arc<RuntimeServices>, environment_factory: Arc<dyn EnvironmentFactory>, args: Args) -> anyhow::Result<Arc<Self>> {
87        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(args.max_post_database_size_megabytes * 1024 * 1024); 
88        let environment = environment_factory.open_next_available(environment_dimensions).await?;
89
90        // let passphrase = passphrase::get_passphrase(args.passphrase_path);
91        let config_server_id = environment.config_get_bytes(CONFIG_SERVER_ID)?;
92        let server_id = match config_server_id {
93            None => {
94                let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, args.skip_pq_commitment_bytes, runtime_services.pow_generator.as_ref()).await?;
95                environment.config_put_bytes(CONFIG_SERVER_ID, server_id.encode()?)?;
96                info!("starting new server with server_id={}", server_id);
97                server_id
98            }
99            Some(config_server_id) => {
100                let server_id = ServerId::decode(config_server_id.as_ref())?;
101                server_id.verify()?;
102                info!("restarting existing server with server_id={}", server_id);
103                server_id
104            }
105        };
106
107        let transport_server = runtime_services.transport_factory.create_server(&args.base_path, args.port, args.force_local_network).await?;
108
109        // Update the address in our Peer record
110        let mut peer_self = server_id.to_peer(runtime_services.time_provider.as_ref())?;
111        peer_self.address = transport_server.get_address().to_string();
112        peer_self.sign(runtime_services.time_provider.as_ref(), &server_id.keys.signature_key)?;
113
114        // Boto our kademlia
115        let mut kademlia = Kademlia::<Id, Peer>::new(server_id.id, config::SERVER_KADEMLIA_MAX_PEERS_PER_BUCKET);
116        {
117            let now = runtime_services.time_provider.current_time_millis();
118
119            // We always belong in our own kademlia
120            kademlia.add_peer(peer_self.clone(), now)?;
121
122            // Have we previously persisted our peer buckets?
123            {
124                let try_result = try {
125                    let peer_buckets = environment.config_get_struct::<Vec<Vec<Peer>>>(CONFIG_KADEMLIA_PEER_BUCKETS)?;
126                    if let Some(peer_buckets) = peer_buckets {
127                        for peer_bucket in peer_buckets {
128                            for peer in peer_bucket {
129                                kademlia.add_peer(peer, now)?;
130                            }
131                        }
132                    }
133                };
134
135                if let Err(e) = try_result {
136                    warn!("problem depersisting peer_buckets: {}", e);
137                }
138            }
139        }
140
141        info!("server_id={}", server_id);
142        info!("peer_self={}", peer_self);
143
144        // All time-based moka caches below run on our TimeProvider (scaled in tests), not wall time.
145        let time_provider = runtime_services.time_provider.clone();
146
147        let hashiverse_server = HashiverseServer {
148            runtime_services,
149            environment: Arc::new(environment),
150            server_id,
151            kademlia: Arc::new(RwLock::new(kademlia)),
152            transport_server,
153            peer_self: Arc::new(RwLock::new(peer_self)),
154            heal_in_progress: Cache::builder().time_to_live(Duration::from_secs(60)).external_clock(Arc::new(TimeProviderMokaClock::new(time_provider.clone()))).build(),
155            seen_salts: Cache::builder().time_to_live(Duration::from_mins(5)).max_capacity(100_000).external_clock(Arc::new(TimeProviderMokaClock::new(time_provider.clone()))).build(),
156            post_bundle_cache: PostBundleCache::new(config::SERVER_POST_BUNDLE_CACHE_MAX_ORIGINATORS_PER_LOCATION, config::SERVER_POST_BUNDLE_CACHE_MAX_BYTES, time_provider.clone()),
157            post_bundle_feedback_cache: PostBundleFeedbackCache::new(config::SERVER_POST_BUNDLE_FEEDBACK_CACHE_MAX_BYTES, time_provider.clone()),
158            trending_hashtags: Cache::builder().max_capacity(256).build(),
159            trending_hashtags_response_cache: Mutex::new(None),
160            request_counters: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
161            request_rate_windows: Arc::new(std::array::from_fn(|_| Mutex::new(RequestRateWindows::new()))),
162            peer_stats_response_cache: Mutex::new(None),
163        };
164
165        Ok(Arc::new(hashiverse_server))
166    }
167
168    pub async fn run(&self, cancellation_token: CancellationToken) {
169        info!("server started");
170
171        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);
172
173        let res = tokio::try_join!(
174            self.wrap_and_dispatch_network_envelopes(cancellation_token.clone(), rx),
175            self.maintain_environment(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
176            self.maintain_kademlia(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
177            self.transport_server.listen(cancellation_token.clone(), tx),
178        );
179
180        match res {
181            Ok(_) => info!("server stopped"),
182            Err(e) => error!("server stopped with error: {}", e),
183        }
184    }
185
186    pub async fn add_potential_peer_to_kademlia(&self, peer: Peer, time_millis: TimeMillis) {
187        // Verify this peer
188        let result = peer.verify();
189        if let Err(e) = result {
190            warn!("peer {} failed verification: {}", peer, e);
191            return;
192        }
193
194        // Has this peer done enough work?
195        if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
196            warn!("peer {} failed pow so not adding to our kademlia", peer);
197            return;
198        }
199
200        let result = self.kademlia.write().add_peer(peer, time_millis);
201        if let Err(e) = result {
202            warn!("problem adding peer: {}", e);
203        }
204    }
205
206    async fn rpc_server_unknown(&self, address: &str, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
207        rpc::rpc::rpc_server_unknown(&self.runtime_services, &self.server_id.id, address, payload_request_kind, payload).await
208    }
209
210    async fn rpc_server_known(&self, destination_peer: &Peer, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
211        rpc::rpc::rpc_server_known(&self.runtime_services, &self.server_id.id, destination_peer, payload_request_kind, payload).await
212    }
213
214    async fn maintain_environment(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
215        loop {
216            if cancellation_token.is_cancelled() {
217                break;
218            }
219
220            let time_millis = time_provider.current_time_millis();
221
222            self.environment.do_maintenance(&cancellation_token, time_millis).await?;
223
224            tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_MINUTE, &cancellation_token).await;
225        }
226
227        Ok(())
228    }
229
230    async fn maintain_kademlia(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
231        let mut last_bootstrap = TimeMillis::zero();
232        let mut last_announce = TimeMillis::zero();
233        let mut last_peers_dump_to_storage = self.runtime_services.time_provider.current_time_millis();
234
235        loop {
236            if cancellation_token.is_cancelled() {
237                break;
238            }
239
240            let now = time_provider.current_time_millis();
241
242            // Do we need to bootstrap?
243            if now - last_bootstrap > config::MILLIS_TO_WAIT_BETWEEN_BOOTSTRAPS {
244                last_bootstrap = now;
245
246                let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };
247
248                if needs_bootstrapping {
249
250                    // Lets randomize these addresses so that the first one is not snowed
251                    let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
252                    tools::shuffle(&mut bootstrap_addresses);
253                    trace!("bootstrap addresses: {:?}", bootstrap_addresses);
254
255                    for bootstrap_address in bootstrap_addresses {
256                        if cancellation_token.is_cancelled() {
257                            break;
258                        }
259
260                        let try_result: anyhow::Result<()> = try {
261                            {
262                                trace!("bootstrapping {}", bootstrap_address);
263                                let rpc_response_packet_rx = self.rpc_server_unknown(&bootstrap_address, PayloadRequestKind::BootstrapV1, json::struct_to_bytes(&BootstrapV1 {})?).await?;
264                                anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &rpc_response_packet_rx.response_request_kind);
265                                let response = json::bytes_to_struct::<BootstrapResponseV1>(&rpc_response_packet_rx.bytes)?;
266                                for peer in response.peers_random {
267                                    self.add_potential_peer_to_kademlia(peer, now).await;
268                                }
269                            }
270                        };
271
272                        if let Err(e) = try_result {
273                            warn!("problem bootstrapping {}: {}", bootstrap_address, e);
274                        }
275
276                        let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };
277                        if !needs_bootstrapping {
278                            break;
279                        }
280                        
281                    }
282
283                    trace!("We now have {} peers", self.kademlia.read().len());
284                }
285            }
286
287            // Do we need to announce?
288            if now - last_announce > config::MILLIS_TO_WAIT_BETWEEN_ANNOUNCES {
289                last_announce = now;
290
291                let peer_self = self.peer_self.read().clone();
292
293
294                // Pick some candidates who we will poke
295                let mut announce_peers = Vec::<Peer>::new();
296                {
297                    let kademlia = self.kademlia.read();
298
299                    // One that is potentially dead
300                    let peer_with_lowest_score = kademlia.get_peer_with_lowest_score();
301                    if let Some(peer_with_lowest_score) = peer_with_lowest_score {
302                        announce_peers.push(peer_with_lowest_score.clone());
303                    }
304
305                    // Someone in our vicinity so our vicinity becomes more tightly knit
306                    let (peers_nearest, _) = kademlia.get_peers_for_key(&peer_self.id, 8);
307                    if !peers_nearest.is_empty() {
308                        announce_peers.push(tools::random_element(&peers_nearest).clone());
309                    }
310                }
311
312                // Pick the peer that we havent heard from for the longest
313                for announce_peer in announce_peers {
314                    if cancellation_token.is_cancelled() {
315                        break;
316                    }
317
318                    // If we have to announce to ourself, then simply push our own freshest peer
319                    if announce_peer == peer_self {
320                        // trace!("We are own own oldest peer!");
321                        self.add_potential_peer_to_kademlia(peer_self.clone(), now).await;
322                        continue;
323                    }
324
325                    // V2-first outbound. The transport ownership proof carries our current
326                    // ACME chain (HTTPS) or an empty marker (mem / TCP). If we don't have a
327                    // proof to send yet — e.g. ACME hasn't completed first issuance — we skip
328                    // this announce tick; the next tick will pick up the chain as soon as the
329                    // cert refresher loads it (see HttpsTransportOwnershipProof's cache + the
330                    // RwLock pull pattern).
331                    let proof_payload = match self.transport_server.get_transport_ownership_proof().make_ownership_proof_payload() {
332                        Some(p) => p,
333                        None => {
334                            trace!("skipping announce to {}: no transport ownership proof available yet", announce_peer);
335                            continue;
336                        }
337                    };
338
339                    let try_result: anyhow::Result<()> = try {
340                        // Try V2 first. If V2 fails for any reason — peer is V1-only and doesn't
341                        // know the variant, our chain didn't validate at their end, transient
342                        // network error — fall back to V1. Only if V1 *also* fails do we prune.
343                        // This keeps the network functional during the V1 coexistence window:
344                        // we never drop a peer just because they haven't upgraded yet.
345                        let v2_result = self.rpc_server_known(
346                            &announce_peer,
347                            PayloadRequestKind::AnnounceV2,
348                            json::struct_to_bytes(&AnnounceV2 { peer_self: peer_self.clone(), proof_payload: proof_payload.to_vec() })?,
349                        ).await;
350
351                        match v2_result {
352                            Ok(rpc_response_packet_rx) => {
353                                anyhow_assert_eq!(&PayloadResponseKind::AnnounceResponseV2, &rpc_response_packet_rx.response_request_kind);
354                                let response = json::bytes_to_struct::<AnnounceResponseV2>(&rpc_response_packet_rx.bytes)?;
355                                self.add_potential_peer_to_kademlia(response.peer_self, now).await;
356                                for peer in response.peers_nearest {
357                                    self.add_potential_peer_to_kademlia(peer, now).await;
358                                }
359                            }
360                            Err(v2_err) => {
361                                trace!("V2 announce to {} failed: {}; falling back to V1", announce_peer, v2_err);
362                                let rpc_response_packet_rx = self.rpc_server_known(&announce_peer, PayloadRequestKind::AnnounceV1, json::struct_to_bytes(&AnnounceV1 { peer_self: peer_self.clone() })?).await?;
363                                anyhow_assert_eq!(&PayloadResponseKind::AnnounceResponseV1, &rpc_response_packet_rx.response_request_kind);
364                                let response = json::bytes_to_struct::<AnnounceResponseV1>(&rpc_response_packet_rx.bytes)?;
365                                self.add_potential_peer_to_kademlia(response.peer_self, now).await;
366                                for peer in response.peers_nearest {
367                                    self.add_potential_peer_to_kademlia(peer, now).await;
368                                }
369                            }
370                        }
371                    };
372
373                    if let Err(e) = try_result {
374                        warn!("problem announcing {}: {}", announce_peer, e);
375                        self.kademlia.write().remove_peer(&announce_peer.id, now);
376                    }
377                }
378            }
379
380            // Do we need to persist our peer buckets?
381            {
382                let try_result = try {
383                    let kademlia = self.kademlia.read();
384                    if last_peers_dump_to_storage < kademlia.peers_last_changed() && now - last_peers_dump_to_storage > config::MILLIS_TO_WAIT_BETWEEN_PEER_DUMPS {
385                        last_peers_dump_to_storage = kademlia.peers_last_changed();
386                        let peer_buckets = kademlia.get_peer_buckets();
387                        //let total_peers: usize = peer_buckets.iter().map(|peers| peers.len()).sum();
388                        // trace!("persisting peer_buckets of length={}", total_peers);
389                        self.environment.config_put_struct(CONFIG_KADEMLIA_PEER_BUCKETS, &peer_buckets)?;
390                    }
391                };
392
393                if let Err(e) = try_result {
394                    warn!("problem persisting peer_buckets: {}", e);
395                }
396            }
397
398            tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_SECOND.const_mul(30), &cancellation_token).await;
399        }
400
401        Ok(())
402    }
403}
404
405impl kademlia::Peer<Id> for Peer {
406    fn id(&self) -> &Id {
407        &self.id
408    }
409    fn score(&self, time_millis: TimeMillis) -> f64 {
410        // This score makes sure that peers are currently active, but also benefits peers who have been active for a while
411        self.pow_current_day.pow_decayed_day(time_millis) + self.pow_current_month.pow_decayed_month(time_millis)
412    }
413}