Skip to main content

hashiverse_server_lib/server/handlers/
dispatch.rs

1//! # Inbound RPC dispatch loop
2//!
3//! The hot path of the server: a single async loop that drains `IncomingRequest`s
4//! from the transport's `mpsc::Receiver`, decodes each packet via
5//! [`hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx`], and routes
6//! by [`hashiverse_lib::protocol::payload::payload::PayloadRequestKind`] to the
7//! correct per-op handler (bootstrap, announce, get/submit/heal/cache post
8//! bundles, get/submit/heal/cache feedback, fetch URL preview, trending
9//! hashtags, ping).
10//!
11//! Per-request safety checks happen before any real work:
12//!
13//! - **PoW verification** — the packet's PoW must be sufficient for *this* server's
14//!   identity. Anything under-powered or stale is dropped immediately.
15//! - **Replay protection** — a short-lived salt cache rejects salts we've already
16//!   seen, so a valid signed request can't be replayed from another network vantage.
17//! - **Peer upgrade** — if the caller's embedded [`hashiverse_lib::protocol::peer::Peer`] carries a stronger PoW
18//!   than what we have in the tracker, the tracker is upgraded in place.
19//!
20//! The loop respects a `CancellationToken` so graceful shutdown drains in-flight
21//! work and stops accepting new requests cleanly.
22
23use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29    AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30    GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31    HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
32    SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
44use hashiverse_lib::tools::types::{Id, Signature};
45use hashiverse_lib::tools::{hashing, url_preview};
46use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
47use hashiverse_lib::transport::transport::IncomingRequest;
48use log::{info, trace, warn};
49use std::collections::HashSet;
50use std::sync::atomic::Ordering;
51
52use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
53use tokio::sync::mpsc;
54use tokio_util::sync::CancellationToken;
55
/// Fallback hashtags used to top up a trending-hashtags response when the server
/// does not yet know enough real trending hashtags to satisfy the requested limit.
///
/// Entries are applied in the order listed here, skipping any entry whose
/// normalised form is already present in the real trending list. Filler entries
/// are returned with `count = 0` so clients can distinguish seeded fillers from
/// genuine trending data.
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
62
/// Canonicalise a hashtag so that two spellings can be compared for equality.
///
/// A single leading `#` (if present) is removed and the remainder is lowercased,
/// so `"#Rust"` and `"rust"` both become `"rust"`. Only one `#` is stripped:
/// `"##a"` becomes `"#a"`. Intended to mirror the canonicalisation performed by
/// `Id::from_hashtag_str`.
fn normalise_hashtag(hashtag: &str) -> String {
    // `#` has no case mapping, so stripping it before lowercasing gives the same
    // result as lowercasing first and stripping afterwards.
    hashtag.strip_prefix('#').unwrap_or(hashtag).to_lowercase()
}
72
73/// Top up `trending_hashtags` from `fallback_hashtags` (in order) until it reaches
74/// `limit`, skipping any fallback whose normalised form already appears in the list.
75/// Filler entries are inserted with `count = 0`. No-op if the list is already at
76/// or above the limit.
77fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
78    let target_length = limit as usize;
79    if trending_hashtags.len() >= target_length {
80        return;
81    }
82
83    let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
84        .map(|entry| normalise_hashtag(&entry.hashtag))
85        .collect();
86
87    for fallback_hashtag in fallback_hashtags {
88        if trending_hashtags.len() >= target_length {
89            break;
90        }
91        let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
92        if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
93            continue;
94        }
95        trending_hashtags.push(TrendingHashtagV1 {
96            hashtag: (*fallback_hashtag).to_string(),
97            count: 0,
98        });
99        existing_normalised_hashtags.insert(normalised_fallback_hashtag);
100    }
101}
102
103impl HashiverseServer {
104    pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
105        loop {
106            tokio::select! {
107                _ = cancellation_token.cancelled() => { break },
108
109                receipt = rx.recv() => {
110                    match receipt {
111                        Some(incoming) => {
112                            // trace!("dispatch_network_envelopes received bytes={:?}", incoming.bytes);
113                            let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
114                            match result {
115                                Ok(bytes) => {
116                                    let result = incoming.reply.send(bytes);
117                                    if result.is_err() { warn!("failed to send reply"); }
118                                },
119                                Err(e) => {
120                                    warn!("failed to process packet from {}: {}", incoming.caller_address, e);
121                                    incoming.report_bad_request();
122                                    drop(incoming.reply);
123                                },
124                            }
125                        },
126                        None => {
127                            warn!("channel closed");
128                            break;
129                        }
130                    }
131                }
132            }
133        }
134
135        Ok(())
136    }
137
138    async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
139        let caller_address = incoming.caller_address.as_str();
140        let current_time_millis = self.runtime_services.time_provider.current_time_millis();
141
142        // Decode the envelope
143        let rpc_request_packet_rx = RpcRequestPacketRx::decode(&current_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
144        // trace!("payload_request_kind={}", rpc_request_packet_rx.payload_request_kind);
145
146        // Count this inbound request. After decoding (so malformed traffic doesn't
147        // pollute the totals) but before the per-handler PoW check (so failed-PoW
148        // attempts still show up — adversarial load is the load we'd want to see).
149        self.request_counters[rpc_request_packet_rx.payload_request_kind.clone() as usize].fetch_add(1, Ordering::Relaxed);
150
151        // Check that we have not seen this salt recently (stops replay attacks)
152        {
153            if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
154                anyhow::bail!("replay detected: salt already seen");
155            }
156            self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
157        }
158
159        // Keep this for our response
160        let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
161
162        let dispatch_result: anyhow::Result<BytesGatherer> = try {
163            // Check that the pow is meaningful
164            let pow = match rpc_request_packet_rx.pow_server_known {
165                true => {
166                    let (pow, improved_pow_current_day, improved_pow_current_month) = {
167                        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
168                        let pow = PeerPow::new(
169                            rpc_request_packet_rx.pow_sponsor_id,
170                            &peer_self.verification_key_bytes,
171                            &peer_self.pq_commitment_bytes,
172                            rpc_request_packet_rx.pow_timestamp,
173                            rpc_request_packet_rx.pow_content_hash,
174                            rpc_request_packet_rx.pow_salt,
175                        )?;
176
177                        let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
178                        let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
179
180                        (pow, improved_pow_current_day, improved_pow_current_month)
181                    };
182
183                    // Check if we need to modify peer_self
184                    if improved_pow_current_day || improved_pow_current_month {
185                        let mut peer_self = self.peer_self.write(); // Remember alphabetical locking order!
186                        if improved_pow_current_day {
187                            trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
188                            peer_self.pow_current_day = pow.clone();
189                        }
190                        if improved_pow_current_month {
191                            trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
192                            peer_self.pow_current_month = pow.clone();
193                        }
194
195                        peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
196                    }
197
198                    Some(pow)
199                }
200
201                false => {
202                    // Only some request types are allowed anonymous pow
203                    match rpc_request_packet_rx.payload_request_kind {
204                        PayloadRequestKind::BootstrapV1 => {}
205                        _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
206                    }
207
208                    None
209                }
210            };
211
212            // Dispatch appropriately
213            let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
214            let response_flags = match compress_response {
215                true => RpcResponsePacketTxFlags::COMPRESSED,
216                false => RpcResponsePacketTxFlags::empty(),
217            };
218
219            // Encode response
220            RpcResponsePacketTx::encode(
221                &self.server_id.keys.signature_key,
222                &self.server_id.keys.verification_key_bytes,
223                &self.server_id.keys.pq_commitment_bytes,
224                &self.server_id.sponsor_id,
225                &self.server_id.timestamp,
226                &self.server_id.hash,
227                &self.server_id.salt,
228                &pow_content_hash,
229                response_flags,
230                payload_response_kind,
231                payload,
232            )?
233        };
234
235        match dispatch_result {
236            Ok(results) => Ok(results),
237            Err(e) => {
238                warn!("failed to dispatch packet from {}: {}", caller_address, e);
239                incoming.report_bad_request();
240
241                let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
242                let response = ErrorResponseV1 { code: 0, message: e.to_string() };
243                let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
244
245                // Encode response
246                RpcResponsePacketTx::encode(
247                    &self.server_id.keys.signature_key,
248                    &self.server_id.keys.verification_key_bytes,
249                    &self.server_id.keys.pq_commitment_bytes,
250                    &self.server_id.sponsor_id,
251                    &self.server_id.timestamp,
252                    &self.server_id.hash,
253                    &self.server_id.salt,
254                    &pow_content_hash,
255                    RpcResponsePacketTxFlags::COMPRESSED,
256                    payload_response_kind,
257                    payload,
258                )
259            }
260        }
261    }
262
263    async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
264        // Where do we want to decide if we should compress?  Here in one block, or at the end of each individual dispatch_xxx?
265        let compress_response = match rpc_request_packet_rx.payload_request_kind {
266            PayloadRequestKind::GetPostBundleV1 => false,   // We don't compress these again as they are already predominantly compressed
267            PayloadRequestKind::CachePostBundleV1 => false, // We don't compress these again as they are already predominantly compressed
268            _ => true,
269        };
270
271        let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
272            PayloadRequestKind::ErrorV1 => {
273                anyhow::bail!("Received ErrorV1");
274            }
275            PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
276            PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
277            PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
278            PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279            PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
280            PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
281            PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
282            PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
283            PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
284            PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
285            PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
286            PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
287            PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
288            PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
289            PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
290            PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
291        };
292
293        Ok((compress_response, payload_response_kind, payload))
294    }
295
296    #[allow(non_snake_case)]
297    async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
298        anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
299        let peer = self.peer_self.read().clone();
300        let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
301        Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
302    }
303
304    #[allow(non_snake_case)]
305    async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
306        anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
307        let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
308        let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
309        Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
310    }
311
312    #[allow(non_snake_case)]
313    async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
314        anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
315
316        let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
317        // trace!("received AnnounceV1 from peer={}", request.peer_self);
318
319        let peer = request.peer_self;
320        let peer_id = peer.id;
321
322        // Check that this Peer checks out and add it to our kademlia
323        self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
324
325        let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
326
327        let json = json::struct_to_bytes(&AnnounceResponseV1 {
328            peer_self: self.peer_self.read().clone(),
329            peers_nearest,
330        })?;
331        Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
332    }
333
    /// Handle `GetPostBundleV1`: return the post bundle held for a bucket location,
    /// together with routing and cache hints.
    ///
    /// Steps, in order:
    ///
    /// 1. Decode the JSON request and validate its `bucket_location`.
    /// 2. Offer every peer in `peers_visited` to `add_potential_peer_to_kademlia`.
    /// 3. Ask the Kademlia table for the peers for this location id (requested with
    ///    `config::REDUNDANT_SERVERS_PER_POST`); the lookup also reports whether this
    ///    server is among them.
    /// 4. If it is, load the bundle from the environment. An unsealed bundle is
    ///    re-stamped with the current time and re-signed; once the bucket window plus
    ///    `config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS` has elapsed it is
    ///    also marked sealed and written back. When no bytes are stored, an empty,
    ///    signed bundle is synthesised. If this server is not among the peers,
    ///    `post_bundle` is `None`.
    /// 5. Consult `post_bundle_cache.on_get` for cached bundles and a cache request token.
    ///
    /// Errors from decoding, validation, storage, signing and encoding are propagated with `?`.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);

        // One timestamp is taken up front and reused for every storage call and header below.
        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
        trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);

        // Check that the location_id makes sense
        request.bucket_location.validate()?;

        // Store the provided Peers
        {
            for peer in request.peers_visited {
                self.add_potential_peer_to_kademlia(peer, time_millis).await;
            }
        }

        let peer_self = self.peer_self.read().clone();

        // Check our own records to see that we are close enough to store this post
        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
        if !among_peers_nearer {
            warn!("I am not in peers_nearer {}", peer_self);
        }

        let post_bundle = match among_peers_nearer {
            true => {
                // At some point, for parallelism's sake, we may wish to replace this with a read lock followed by a release and write lock and recheck
                // But then again - if our server is getting load, the federated cache mechanism should alleviate it, so this may be overkill...
                // NOTE(review): presumably the returned value is a guard that holds the lock until it goes out of scope — confirm.
                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);

                let mut encoded_post_bundle_bytes: Option<Bytes> = None;

                // Try to load bytes from the disk (if we even have them)
                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
                if let Some(mut post_bundle_metadata) = post_bundle_metadata {
                    encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;

                    // Has the PostBundle become sealed since the last time it was written to disk?  Perhaps enough time has passed
                    if !post_bundle_metadata.sealed {
                        let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
                        if sealed {
                            // We can rewrite the postbundle on disk for the final time now that it is sealed
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.sealed = true;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }

                            // And the updated metadata (written even when there were no bundle bytes to rewrite)
                            post_bundle_metadata.sealed = true;
                            self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
                        }
                        else {
                            // We must simply update the timestamp - we need this for client caching.  It's expensive, but caching should help us out here if it is a truly busy bucket
                            // The re-stamped bytes are only returned to the caller; nothing is written back to disk in this branch.
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }
                        }
                    }
                };

                // If we don't have the metadata, or the bytes on disk, return a fresh one
                // Generally if we have the metadata, we should always have bytes on disk, except if a request to post was granted but they then never came back with the data...
                if encoded_post_bundle_bytes.is_none() {
                    let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;

                    // Empty bundle: no posts, signed by us. The zero signature is a placeholder
                    // until `signature_generate` is called just below.
                    let mut header = EncodedPostBundleHeaderV1 {
                        time_millis,
                        location_id: request.bucket_location.location_id,
                        overflowed: false,
                        sealed,
                        num_posts: 0,
                        encoded_post_ids: vec![],
                        encoded_post_lengths: vec![],
                        encoded_post_healed: HashSet::new(),
                        peer: peer_self.clone(),
                        signature: Signature::zero(),
                    };
                    header.signature_generate(&self.server_id.keys.signature_key)?;

                    let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
                    encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
                }

                encoded_post_bundle_bytes
            }
            // Not among the returned peers for this location: we do not serve a bundle ourselves.
            false => None,
        };

        // The cache is consulted whether or not we serve the bundle ourselves.
        let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);

        let get_post_bundle_response = GetPostBundleResponseV1 {
            peers_nearer,
            cache_request_token: cache_result.cache_request_token,
            post_bundles_cached: cache_result.cached_items,
            post_bundle,
        };
        Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
    }
444
445    #[allow(non_snake_case)]
446    async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
447        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
448
449        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
450
451        let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
452        trace!("received GetPostBundleFeedbackV1");
453
454        // Store the provided Peers
455        {
456            for peer in request.peers_visited {
457                self.add_potential_peer_to_kademlia(peer, time_millis).await;
458            }
459        }
460
461        // Check our own records to see that we are close enough to store this post
462        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
463
464        let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
465
466        if among_peers_nearer {
467            // We only support feedbacks if we know about this post_bundle
468            let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
469            if post_bundle_metadata.is_some() {
470                post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
471            }
472        }
473
474        // Wrap the feedbacks with a header (if we have any)
475        let peer_self = self.peer_self.read().clone();
476        let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
477            Some(feedbacks_bytes) => {
478
479                let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
480
481                let mut header = EncodedPostBundleFeedbackHeaderV1 {
482                    time_millis,
483                    location_id: request.bucket_location.location_id,
484                    feedbacks_bytes_hash,
485                    peer: peer_self.clone(),
486                    signature: Signature::zero(),
487                };
488                header.signature_generate(&self.server_id.keys.signature_key);
489
490                let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
491                    header,
492                    feedbacks_bytes,
493                };
494                Some(encoded_post_bundle_feedback.to_bytes()?)
495            }
496            None => None,
497        };
498
499        let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
500
501        let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
502            peers_nearer,
503            cache_request_token: cache_result.cache_request_token,
504            post_bundle_feedbacks_cached: cache_result.cached_items,
505            encoded_post_bundle_feedback,
506        };
507        Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
508    }
509
510    #[allow(non_snake_case)]
511    async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
512        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
513
514        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
515
516        let pow = match pow {
517            Some(pow) => pow,
518            None => anyhow::bail!("We need pow for a submit post claim"),
519        };
520
521        let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
522        trace!("received SubmitPostClaimV1");
523
524        // Check that the location_id makes sense
525        request.bucket_location.validate()?;
526
527        // Check that we support the bucket duration
528        let bucket_duration = {
529            let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
530            match bucket_duration {
531                Some(bucket_duration) => *bucket_duration,
532                None => anyhow::bail!("Unrecognised bucket duration provided"),
533            }
534        };
535
536        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
537
538        // Check that enough pow has been done for this post
539        {
540            let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
541            if pow.pow < pow_minimum {
542                anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
543            }
544        }
545
546        // Check that the post matches the bucket
547        {
548            // Ensure that the bucket timestamp fits the post
549            let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
550            if timestamp != request.bucket_location.bucket_time_millis {
551                anyhow::bail!("The post timestamp does not match the bucket");
552            }
553        }
554
555        let client_id = decoded_post.header.client_id()?;
556
557        // Ensure that the base_id is related to the post in the linked_base_ids.
558        if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
559            anyhow::bail!("The base_id is not related to the post");
560        }
561
562        // Check that only the posting user is allowed to post to a bucket of type USER
563        if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
564            anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
565        }
566
567        // For ReplyToPost and Sequel buckets, verify the referenced post is real (valid signature).
568        // For Sequel buckets, additionally verify same-author.
569        if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
570            let original_header_bytes = request.referenced_post_header_bytes
571                .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
572
573            // Decode the original post header using the submitter's client_id as the decryption password.
574            // decode_from_bytes verifies the signature — a forged header will fail here.
575            let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
576
577            // Verify the original post's post_id matches the bucket's base_id
578            if original_post.post_id != request.bucket_location.base_id {
579                anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
580            }
581
582            // For Sequel buckets, additionally verify the sequel author matches the original post author
583            if request.bucket_location.bucket_type == BucketType::Sequel {
584                let original_client_id = original_post.header.client_id()?;
585                if original_client_id != client_id {
586                    anyhow::bail!("Sequel post author does not match original post author");
587                }
588            }
589        }
590
591        // Check that the post timestamp is reasonable
592        {
593            let delta = (time_millis - decoded_post.header.time_millis).abs();
594            if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
595                anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
596            }
597        }
598
599        // Check our own records to see that we are close enough to store this post
600        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
601
602        let submit_post_claim_token = match among_peers_nearer {
603            true => {
604                // Are we still willing to accept this post?
605                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
606                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
607                let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
608
609                // If it is not yet sealed, update our metadata
610                if !post_bundle_metadata.sealed {
611                    post_bundle_metadata.num_posts_granted += 1;
612                    post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
613                    post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
614
615                    self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
616                }
617
618                // We grant the token if we are not yet sealed
619                match post_bundle_metadata.sealed {
620                    false => {
621                        info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
622                        Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
623                    }
624                    true => {
625                        info!(
626                            "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
627                            request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
628                        );
629                        None
630                    }
631                }
632            }
633
634            false => None,
635        };
636
637        // If we are willing to accept this post, then we are willing to track the trendiness of its hashtags
638        if submit_post_claim_token.is_some() {
639            // Track referenced hashtags for trending (only for User bucket posts, where the hashtags originate)
640            if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
641                let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
642                for referenced_hashtag in &request.referenced_hashtags {
643                    let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
644                        Ok(id) => id,
645                        Err(_) => continue, // Skip invalid hashtags silently
646                    };
647                    if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
648                        continue; // Hashtag not backed by a linked_base_id (no PoW), skip it
649                    }
650                    let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_else(HyperLogLog::new);
651                    hll.insert(author_verification_key_bytes.as_ref());
652                    self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
653                }
654            }
655        }
656
657        let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
658        Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
659    }
660
661    #[allow(non_snake_case)]
662    async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
663        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
664
665        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
666
667        let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
668        trace!("received SubmitPostCommitV1");
669
670        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
671
672        // Is the submit_post_claim_token from us?
673        if request.submit_post_claim_token.peer.id != peer_self.id {
674            anyhow::bail!("The submit_post_claim_token is not from us");
675        }
676
677        // Check that the location_id still makes sense
678        request.bucket_location.validate()?;
679        if request.bucket_location != request.submit_post_claim_token.bucket_location {
680            anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
681        }
682
683        // Check that we can decode the post with the bucket's base_id as password
684        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
685
686        // Check that the committed post matches what was claimed
687        if decoded_post.post_id != request.submit_post_claim_token.post_id {
688            anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
689        }
690
691        // Update the postbundle and metadata
692        let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
693        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
694        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
695
696        // We should always have the metadata, but just in case!
697        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
698
699        // Make a PostBundle if we dont have one on disk
700        let mut post_bundle = match post_bundle_bytes {
701            Some(bytes) => {
702                let bytes = Bytes::from_owner(bytes);
703                let bundle = EncodedPostBundleV1::from_bytes(bytes, true)?;
704                bundle
705            }
706            None => {
707                let header = EncodedPostBundleHeaderV1 {
708                    time_millis: TimeMillis::zero(),
709                    location_id: request.bucket_location.location_id,
710                    overflowed: false,
711                    sealed: false,
712                    num_posts: 0,
713                    encoded_post_ids: vec![],
714                    encoded_post_lengths: vec![],
715                    encoded_post_healed: HashSet::new(),
716                    peer: self.peer_self.read().clone(),
717                    signature: Signature::zero(),
718                };
719
720                EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
721            }
722        };
723
724        // Sanity check that we don't already have this post
725        if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
726            anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
727        }
728
729        // The post bundle
730        post_bundle.header.time_millis = time_millis;
731        post_bundle.header.num_posts += 1;
732        post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
733        post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
734        post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
735        post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
736        post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
737        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
738        posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
739        post_bundle.encoded_posts_bytes = posts_mut.freeze();
740        let post_bundle_bytes_new = post_bundle.to_bytes()?;
741
742        // The metadata
743        post_bundle_metadata.num_posts = post_bundle.header.num_posts;
744        post_bundle_metadata.overflowed = post_bundle.header.overflowed;
745        post_bundle_metadata.sealed = post_bundle.header.sealed;
746        post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
747
748        {
749            self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
750            self.environment
751                .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
752        }
753
754        info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
755
756        let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
757
758        let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
759        Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
760    }
761
762    #[allow(non_snake_case)]
763    async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
764        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
765
766        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
767
768        let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
769        trace!("received SubmitPostFeedbackV1");
770
771        // Enforce minimum PoW for feedback submission
772        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
773        if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
774            anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
775        }
776
777        // Check the feedback makes sense
778        request.encoded_post_feedback.pow_verify()?;
779
780        let location_id = request.bucket_location.location_id;
781
782        // Check our own records to see that we are close enough to store this post feedback
783        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
784
785        let accepted = (|| -> anyhow::Result<bool> {
786            if !among_peers_nearer {
787                return Ok(false);
788            }
789
790            // Do we recognise the post associated with this feedback
791            let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
792            let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
793            else {
794                return Ok(false);
795            };
796
797            let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
798            if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
799                return Ok(false);
800            }
801
802            Ok(true)
803        })()?;
804
805        // Update our environment with the feedback
806        if accepted {
807            trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
808            self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
809        }
810
811        let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
812        Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
813    }
814
815    #[allow(non_snake_case)]
816    async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
817        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
818
819        fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
820            let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
821            Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
822        }
823
824        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
825        let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
826        trace!("received HealPostBundleClaimV1");
827
828        // Verify the bucket_location is internally consistent and maps to the donor_header's location_id
829        request.bucket_location.validate()?;
830        if request.bucket_location.location_id != request.donor_header.location_id {
831            anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
832        }
833
834        // Verify the donor_header provided by the client is self-consistent and properly signed
835        request.donor_header.verify()?;
836
837        // Only heal if we are among the nearest peers for this location
838        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
839        if !among_peers_nearer {
840            return generate_negatory_response();
841        }
842
843        // Reject if a heal for this location is already in progress (another client beat us to it)
844        if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
845            return generate_negatory_response();
846        }
847
848        // Load our current bundle (header only) to see what post_ids we already have
849        let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
850        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
851
852        let our_post_ids: HashSet<Id> = match post_bundle_bytes {
853            Some(bytes) => {
854                let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
855                bundle.header.encoded_post_ids.into_iter().collect()
856            }
857            None => HashSet::new(),
858        };
859
860        // Posts in the donor_header that we do not yet have, in canonical order
861        let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
862
863        if needed_post_ids.is_empty() {
864            return generate_negatory_response();
865        }
866
867        self.heal_in_progress.insert(request.donor_header.location_id, ());
868
869        let token = Some(HealPostBundleClaimTokenV1::new(
870            self.peer_self.read().clone(),
871            request.bucket_location,
872            needed_post_ids.clone(),
873            request.donor_header.signature,
874            &self.server_id.keys.signature_key,
875        ));
876        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
877        Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
878    }
879
880    #[allow(non_snake_case)]
881    async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
882        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
883
884        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
885        let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
886        trace!("received HealPostBundleCommitV1");
887
888        // Verify the token was issued by this server
889        let peer_self = self.peer_self.read().clone();
890        if request.token.peer.id != peer_self.id {
891            anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
892        }
893        request.token.verify()?;
894
895        // Verify the donor_header matches what the token was issued for
896        if request.donor_header.signature != request.token.donor_header_signature {
897            anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
898        }
899        request.donor_header.verify()?;
900
901        if request.token.bucket_location.location_id != request.donor_header.location_id {
902            anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
903        }
904
905        let location_id = request.donor_header.location_id;
906
907        // Parse the post bytes supplied by the client (one entry per token.needed_post_id, in order)
908        let mut remaining_bytes = request.encoded_posts_bytes.clone();
909        let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
910        for post_id in &request.token.needed_post_ids {
911            let len = request
912                .donor_header
913                .encoded_post_ids
914                .iter()
915                .zip(request.donor_header.encoded_post_lengths.iter())
916                .find(|(id, _)| *id == post_id)
917                .map(|(_, len)| *len)
918                .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
919            if remaining_bytes.len() < len {
920                anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
921            }
922            let post_bytes = remaining_bytes.split_to(len);
923            posts_to_add.push((*post_id, post_bytes));
924        }
925        if !remaining_bytes.is_empty() {
926            anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
927        }
928
929        // Validate each post decrypts successfully with the provided base_id
930        for (post_id, post_bytes) in &posts_to_add {
931            EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
932        }
933
934        // Load our current bundle (or start empty)
935        let _lock = self.environment.get_write_lock_for_location_id(&location_id);
936        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
937        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
938        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
939
940        let mut post_bundle = match post_bundle_bytes {
941            Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
942            None => EncodedPostBundleV1 {
943                header: EncodedPostBundleHeaderV1 {
944                    time_millis: TimeMillis::zero(),
945                    location_id,
946                    overflowed: request.donor_header.overflowed,
947                    sealed: request.donor_header.sealed,
948                    num_posts: 0,
949                    encoded_post_ids: vec![],
950                    encoded_post_lengths: vec![],
951                    encoded_post_healed: HashSet::new(),
952                    peer: peer_self.clone(),
953                    signature: Signature::zero(),
954                },
955                encoded_posts_bytes: Bytes::new(),
956            },
957        };
958
959        let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
960
961        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
962        let mut added_any = false;
963        for (post_id, post_bytes) in posts_to_add {
964            if !our_post_ids.contains(&post_id) {
965                let len = post_bytes.len();
966                posts_mut.extend_from_slice(&post_bytes);
967                post_bundle.header.encoded_post_ids.push(post_id);
968                post_bundle.header.encoded_post_lengths.push(len);
969                post_bundle.header.encoded_post_healed.insert(post_id);
970                added_any = true;
971            }
972        }
973        post_bundle.encoded_posts_bytes = posts_mut.freeze();
974
975        if added_any {
976            post_bundle.header.time_millis = time_millis;
977            post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
978            post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
979
980            let new_bytes = post_bundle.to_bytes()?;
981            post_bundle_metadata.num_posts = post_bundle.header.num_posts;
982            post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
983
984            self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
985            self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
986
987            info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
988        }
989
990        self.heal_in_progress.invalidate(&location_id);
991
992        let response = HealPostBundleCommitResponseV1 { accepted: added_any };
993        Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
994    }
995
996    #[allow(non_snake_case)]
997    async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
998        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
999
1000        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1001        let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
1002        trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
1003
1004        // Only accept if we are among the nearest peers for this location
1005        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
1006        if !among_peers_nearer {
1007            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1008            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1009        }
1010
1011        // Check we actually have a post bundle for this location (same guard as SubmitPostFeedbackV1)
1012        let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1013        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1014        let Some(post_bundle_bytes) = post_bundle_bytes
1015        else {
1016            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1017            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1018        };
1019        let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1020
1021        let mut accepted_count: u32 = 0;
1022        for feedback in &request.encoded_post_feedbacks {
1023            // Only store feedback for posts we actually hold
1024            if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1025                continue;
1026            }
1027            self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1028            accepted_count += 1;
1029        }
1030
1031        if accepted_count > 0 {
1032            trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1033        }
1034
1035        let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1036        Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1037    }
1038
1039    #[allow(non_snake_case)]
1040    async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1041        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1042
1043        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1044        let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1045        trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1046
1047        // Verify token was issued by this server and has not expired
1048        let peer_self = self.peer_self.read().clone();
1049        if request.token.peer.id != peer_self.id {
1050            anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1051        }
1052        request.token.verify()?;
1053        if request.token.is_expired(time_millis) {
1054            anyhow::bail!("CachePostBundleV1: token has expired");
1055        }
1056
1057        let mut any_accepted = false;
1058        for bundle_bytes in request.encoded_post_bundles {
1059            let parse_result: anyhow::Result<()> = try {
1060                let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1061
1062                // Check that this proposed cache content is legitimate
1063                encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1064
1065                let originator_peer_id = encoded_post_bundle.header.peer.id;
1066                let is_sealed = encoded_post_bundle.header.sealed;
1067                if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1068                    any_accepted = true;
1069                }
1070            };
1071            if let Err(e) = &parse_result {
1072                warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1073            }
1074        }
1075        let response = CachePostBundleResponseV1 { accepted: any_accepted };
1076        Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1077    }
1078
1079    #[allow(non_snake_case)]
1080    async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1081        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1082
1083        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1084        let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1085        trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1086
1087        // Verify token was issued by this server and has not expired
1088        let peer_self = self.peer_self.read().clone();
1089        if request.token.peer.id != peer_self.id {
1090            anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1091        }
1092        request.token.verify()?;
1093        if request.token.is_expired(time_millis) {
1094            anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1095        }
1096
1097        let result: anyhow::Result<bool> = try {
1098            let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1099
1100            encoded_post_bundle_feedback.verify()?;
1101
1102            let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1103            // Feedback bundles have no sealed flag — treat as live (5-min TTL)
1104            self.post_bundle_feedback_cache
1105                .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1106        };
1107        let accepted = result.unwrap_or_else(|e| {
1108            warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1109            false
1110        });
1111
1112        let response = CachePostBundleFeedbackResponseV1 { accepted };
1113        Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1114    }
1115
1116    #[allow(non_snake_case)]
1117    async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1118        anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
1119
1120        let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1121        trace!("received FetchUrlPreviewV1 for url={}", request.url);
1122
1123        // SSRF protection:
1124        // https:// only — blocks plaintext metadata endpoints and forces TLS cert validation.
1125        if !request.url.starts_with("https://") {
1126            anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
1127        }
1128
1129        // Extract host; reject bare IP literals (IPv4 and IPv6 bracket form).
1130        let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
1131        let host = if host_and_port.starts_with('[') {
1132            // IPv6 literal [addr] or [addr]:port
1133            host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
1134        } else {
1135            // hostname or IPv4 — strip optional port
1136            host_and_port.split(':').next().unwrap_or(host_and_port)
1137        };
1138        if host.is_empty() {
1139            anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
1140        }
1141        if host.parse::<std::net::IpAddr>().is_ok() {
1142            anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
1143        }
1144
1145        // Resolve once and validate every returned address.  Collecting into a Vec lets us
1146        // re-use the same addresses to pin the reqwest client, closing the DNS-rebinding window
1147        // (nip.io, metadata.google.internal, TTL=0 rebind, etc.).
1148        let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
1149            .await
1150            .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
1151            .collect();
1152        if resolved_socket_addrs.is_empty() {
1153            anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
1154        }
1155        for socket_addr in &resolved_socket_addrs {
1156            let ip = socket_addr.ip();
1157            if is_ssrf_protected_ip(ip) {
1158                anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
1159            }
1160        }
1161
1162        // Build a client that:
1163        //   - resolve_to_addrs: skips re-resolution entirely — the validated IPs are used directly
1164        //   - redirect::none: prevents a server-side redirect to an unvalidated internal URL
1165        //   - no_proxy: ignores HTTP_PROXY / NO_PROXY env vars that could route to internal hosts
1166        //   - times out quickly in unreasonable scenarios
1167        //   - limits download size
1168        let http_client = reqwest::Client::builder()
1169            .connect_timeout(std::time::Duration::from_secs(1))
1170            .timeout(std::time::Duration::from_secs(3))
1171            .user_agent("hashiverse-preview/1.0")
1172            .resolve_to_addrs(host, &resolved_socket_addrs)
1173            .redirect(reqwest::redirect::Policy::none())
1174            .no_proxy()
1175            .build()?;
1176
1177        const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
1178        let mut http_response = http_client.get(&request.url).send().await?;
1179
1180        // Reject early if Content-Length already exceeds the limit — before reading any body bytes.
1181        if let Some(content_length) = http_response.content_length() {
1182            if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
1183                anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
1184            }
1185        }
1186        let mut body_bytes = BytesMut::new();
1187        while let Some(chunk) = http_response.chunk().await? {
1188            // body_bytes.len() is already within the limit; only the new chunk can overflow.
1189            // We copy only as many bytes as fit rather than bailing, so a single large chunk
1190            // (which reqwest has already buffered) doesn't cause an error — we just truncate.
1191            // This is sufficient for HTML preview extraction which only needs the <head>.
1192            let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
1193            body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
1194            if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
1195                break;
1196            }
1197        }
1198        let html = String::from_utf8_lossy(&body_bytes).into_owned();
1199
1200        let preview_data = url_preview::extract_url_preview(&html);
1201
1202        let response = FetchUrlPreviewResponseV1 {
1203            url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
1204            title: preview_data.title,
1205            description: preview_data.description,
1206            image_url: preview_data.image_url,
1207        };
1208
1209        Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1210    }
1211
1212    #[allow(non_snake_case)]
1213    async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1214        anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1215
1216        let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1217        trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1218
1219        let time_millis = self.runtime_services.time_provider.current_time_millis();
1220
1221        // Check if we have a cached response that is less than a few minutes old
1222        let cached_response = {
1223            let cache = self.trending_hashtags_response_cache.lock();
1224            match cache.as_ref() {
1225                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1226                    Some(cached_response.clone())
1227                }
1228                _ => None,
1229            }
1230        };
1231
1232        let mut response = match cached_response {
1233            Some(mut cached) => {
1234                cached.trending_hashtags.truncate(request.limit as usize);
1235                cached
1236            }
1237            None => {
1238                // Recalculate: iterate the trending_hashtags cache, sort by unique author count
1239                let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1240                    .map(|(hashtag, hll)| TrendingHashtagV1 {
1241                        hashtag: hashtag.as_ref().clone(),
1242                        count: hll.count(),
1243                    })
1244                    .filter(|entry| entry.count > 0)
1245                    .collect();
1246
1247                trending_hashtags.sort_by(|a, b| b.count.cmp(&a.count));
1248
1249                let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1250
1251                // Cache the full response (real trending only — fallbacks are applied per-response after truncation)
1252                {
1253                    let mut cache = self.trending_hashtags_response_cache.lock();
1254                    *cache = Some((time_millis, full_response.clone()));
1255                }
1256
1257                let mut truncated_response = full_response;
1258                truncated_response.trending_hashtags.truncate(request.limit as usize);
1259                truncated_response
1260            }
1261        };
1262
1263        top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1264
1265        Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1266    }
1267
1268    #[allow(non_snake_case)]
1269    async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1270        anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);
1271
1272        let _request = PeerStatsRequestV1::from_bytes(&bytes)?;
1273
1274        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
1275        if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
1276            anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
1277        }
1278
1279        let time_millis = self.runtime_services.time_provider.current_time_millis();
1280
1281        // Cache check — hand back the same signed blob within the TTL so callers
1282        // re-sharing it operate on a single canonical byte sequence per minute.
1283        let cached_response = {
1284            let cache = self.peer_stats_response_cache.lock();
1285            match cache.as_ref() {
1286                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
1287                _ => None,
1288            }
1289        };
1290
1291        let response = match cached_response {
1292            Some(cached) => cached,
1293            None => {
1294                let doc = serde_json::json!({
1295                    "version":     env!("CARGO_PKG_VERSION"),
1296                    "requests":    request_counts_subtree(&self.request_counters),
1297                    "system":      system_stats_subtree(),
1298                    "kademlia":    kademlia_stats_subtree(&self.kademlia.read()),
1299                    "environment": environment_stats_subtree(&self.environment),
1300                });
1301
1302                let json_bytes = serde_json::to_vec(&doc)?;
1303                let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1304
1305                let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
1306                let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);
1307
1308                let response = PeerStatsResponseV1 {
1309                    peer: self.peer_self.read().clone(),
1310                    timestamp: time_millis,
1311                    json_compressed,
1312                    signature,
1313                };
1314
1315                *self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
1316                response
1317            }
1318        };
1319
1320        Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1321    }
1322}
1323
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a trending entry with the given hashtag text and count.
    fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
        TrendingHashtagV1 { hashtag: hashtag.to_owned(), count }
    }

    /// Projects an entry onto `(hashtag, count)` so both fields can be asserted together.
    fn entry_pair(entry: &TrendingHashtagV1) -> (&str, u64) {
        (entry.hashtag.as_str(), entry.count)
    }

    #[test]
    fn top_up_adds_fallback_when_empty() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 5, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2);
        assert_eq!(entry_pair(&entries[0]), ("#hashiverse", 0));
        assert_eq!(entry_pair(&entries[1]), ("#news", 0));
    }

    #[test]
    fn top_up_respects_limit_smaller_than_fallback_list() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 1, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].hashtag, "#hashiverse");
    }

    #[test]
    fn top_up_preserves_fallback_order() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#first", "#second", "#third"]);
        for (index, expected) in ["#first", "#second", "#third"].iter().enumerate() {
            assert_eq!(entries[index].hashtag, *expected);
        }
    }

    #[test]
    fn top_up_is_noop_when_already_at_limit() {
        let mut entries = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
        top_up_trending_hashtags_with_fallback(&mut entries, 2, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].hashtag, "#rust");
        assert_eq!(entries[1].hashtag, "#golang");
    }

    #[test]
    fn top_up_partially_fills_when_real_trending_exists() {
        let mut entries = vec![make_trending_hashtag("#rust", 10)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 3);
        assert_eq!(entry_pair(&entries[0]), ("#rust", 10));
        assert_eq!(entry_pair(&entries[1]), ("#hashiverse", 0));
        assert_eq!(entry_pair(&entries[2]), ("#news", 0));
    }

    #[test]
    fn top_up_skips_fallback_already_present_exact_match() {
        let mut entries = vec![make_trending_hashtag("#hashiverse", 42)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].hashtag, "#hashiverse");
        assert_eq!(entries[0].count, 42, "real trending entry must not be overwritten by filler");
        assert_eq!(entry_pair(&entries[1]), ("#news", 0));
    }

    #[test]
    fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
        // A pre-existing "HashiVerse" (mixed case, no leading `#`) must count as the
        // fallback "#hashiverse", so only "#news" gets appended.
        let mut entries = vec![make_trending_hashtag("HashiVerse", 7)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].hashtag, "HashiVerse");
        assert_eq!(entries[1].hashtag, "#news");
    }

    #[test]
    fn top_up_with_empty_fallback_is_noop() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 5, &[]);
        assert!(entries.is_empty());
    }

    #[test]
    fn top_up_handles_zero_limit() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 0, &["#hashiverse", "#news"]);
        assert!(entries.is_empty());
    }

    #[test]
    fn top_up_exhausts_fallback_without_reaching_limit() {
        // The limit exceeds what real trending plus the fallback list can supply.
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 10, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2, "should stop at the end of the fallback list, not pad further");
    }

    mod peer_stats {
        use super::*;
        use crate::environment::environment::EnvironmentFactory;
        use crate::environment::mem_environment_store::MemEnvironmentFactory;
        use crate::server::args::Args;
        use crate::server::hashiverse_server::HashiverseServer;
        use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
        use hashiverse_lib::protocol::peer::PeerPow;
        use hashiverse_lib::tools::compression;
        use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
        use hashiverse_lib::tools::runtime_services::RuntimeServices;
        use hashiverse_lib::tools::time::TimeMillis;
        use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
        use hashiverse_lib::tools::types::{Pow, VerificationKey};
        use hashiverse_lib::transport::mem_transport::MemTransportFactory;
        use std::sync::Arc;
        use std::sync::atomic::Ordering;

        /// Starts a server backed by in-memory transport and environment, a stub PoW
        /// generator and the real clock.
        async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
            let runtime_services = Arc::new(RuntimeServices {
                time_provider: Arc::new(RealTimeProvider::default()),
                transport_factory: MemTransportFactory::default(),
                pow_generator: Arc::new(StubParallelPowGenerator::new()),
            });
            let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
            HashiverseServer::new(runtime_services, environment_factory, Args::default_for_testing()).await
        }

        /// Produces a `PeerPow` whose only meaningful field is `pow`. That suffices because
        /// the handler under test compares the value against
        /// `config::POW_MINIMUM_PER_PEER_STATS` and does not re-verify the PoW computation.
        fn synthetic_pow(pow: Pow) -> PeerPow {
            let mut synthetic = PeerPow::zero();
            synthetic.pow = pow;
            synthetic
        }

        /// Serialised form of the (field-less) stats request.
        fn empty_request_bytes() -> Bytes {
            PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
        }

        /// Decompresses and parses the JSON document carried by a stats response.
        fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
            let decompressed = compression::decompress(&response.json_compressed).expect("decompress doc");
            let json_bytes = decompressed.to_bytes();
            serde_json::from_slice(&json_bytes).expect("doc must be valid JSON")
        }

        /// Invokes the stats handler directly with the given PoW and an empty request.
        async fn request_stats(server: &HashiverseServer, pow: PeerPow) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
            server
                .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
                .await
        }

        /// Decodes the wire bytes produced by the handler back into a response struct.
        fn decode_response(gatherer: BytesGatherer) -> PeerStatsResponseV1 {
            PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode")
        }

        #[tokio::test]
        async fn rejects_insufficient_pow() {
            let server = make_server().await.expect("server must start");
            let just_below_minimum = Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1));
            let outcome = request_stats(&server, synthetic_pow(just_below_minimum)).await;
            assert!(outcome.is_err(), "expected insufficient PoW to be rejected");
        }

        #[tokio::test]
        async fn returns_response_with_expected_top_level_keys() {
            let server = make_server().await.expect("server must start");
            let (response_kind, gatherer) = request_stats(&server, synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS))
                .await
                .expect("handler must succeed at threshold pow");
            assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);

            let doc = decode_doc(&decode_response(gatherer));

            let version_is_non_empty = doc.get("version").and_then(|v| v.as_str()).is_some_and(|s| !s.is_empty());
            assert!(version_is_non_empty, "version must be a non-empty string");
            assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
            for subtree in ["requests", "system", "kademlia", "environment"] {
                assert!(doc.get(subtree).is_some(), "{} subtree missing", subtree);
            }

            for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
                assert!(doc["system"].get(key).is_some_and(|v| v.is_number()), "system.{key} must be a number");
            }
            for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
                assert!(doc["environment"].get(key).is_some_and(|v| v.is_number()), "environment.{key} must be a number");
            }
        }

        #[tokio::test]
        async fn counters_reflect_recorded_dispatches() {
            let server = make_server().await.expect("server must start");

            // Bump the PingV1 counter directly instead of sending real PingV1 packets: this
            // skips the RPC envelope while still exercising the read path the stats handler uses.
            let ping_counter = &server.request_counters[PayloadRequestKind::PingV1 as usize];
            (0..7).for_each(|_| {
                ping_counter.fetch_add(1, Ordering::Relaxed);
            });

            let (_, gatherer) = request_stats(&server, synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)).await.expect("handler must succeed");
            let doc = decode_doc(&decode_response(gatherer));
            assert_eq!(doc["requests"]["PingV1"].as_u64(), Some(7));
        }

        #[tokio::test]
        async fn cache_returns_byte_identical_response_within_ttl() {
            let server = make_server().await.expect("server must start");
            let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);

            let (_, first_gatherer) = request_stats(&server, pow.clone()).await.expect("first call must succeed");
            let first_bytes = first_gatherer.to_bytes();

            // Change a counter between the two calls: a freshly built response would include
            // the new count, whereas the cached blob was produced before the change.
            server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);

            let (_, second_gatherer) = request_stats(&server, pow).await.expect("second call must succeed");
            let second_bytes = second_gatherer.to_bytes();

            assert_eq!(first_bytes, second_bytes, "cached response should be byte-identical across the TTL");
        }

        #[tokio::test]
        async fn signature_verifies_and_fails_on_tamper() {
            let server = make_server().await.expect("server must start");
            let (_, gatherer) = request_stats(&server, synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)).await.expect("handler must succeed");
            let response = decode_response(gatherer);

            let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
            let genuine_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
            signing::verify(&verification_key, &response.signature, &genuine_input).expect("signature must verify against transmitted bytes");

            // Flip every bit of the middle byte of the compressed JSON: verification must fail.
            let mut tampered = response.json_compressed.to_vec();
            let middle = tampered.len() / 2;
            tampered[middle] ^= 0xff;
            let tampered_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
            let tampered_check = signing::verify(&verification_key, &response.signature, &tampered_input);
            assert!(tampered_check.is_err(), "verification must fail when json_compressed is tampered");

            // Shift the timestamp by one: verification must fail as well.
            let shifted_timestamp = TimeMillis(response.timestamp.0 + 1);
            let shifted_input = PeerStatsResponseV1::signing_input(shifted_timestamp, &response.json_compressed);
            let shifted_check = signing::verify(&verification_key, &response.signature, &shifted_input);
            assert!(shifted_check.is_err(), "verification must fail when timestamp is mutated");
        }

        #[test]
        fn request_counts_subtree_covers_every_variant() {
            use std::sync::atomic::AtomicU64;

            // Extra guard that PAYLOAD_REQUEST_KIND_COUNT stays in step with the enum as
            // seen by the stats layer: one JSON key per counter slot.
            let counters: [AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| AtomicU64::new(0));
            let subtree = request_counts_subtree(&counters);
            let key_count = subtree.as_object().expect("request_counts subtree must be an object").len();
            assert_eq!(key_count, PAYLOAD_REQUEST_KIND_COUNT);
        }
    }
}
1608