Skip to main content

hashiverse_server_lib/server/handlers/
dispatch.rs

1//! # Inbound RPC dispatch loop
2//!
3//! The hot path of the server: a single async loop that drains `IncomingRequest`s
4//! from the transport's `mpsc::Receiver`, decodes each packet via
5//! [`hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx`], and routes
6//! by [`hashiverse_lib::protocol::payload::payload::PayloadRequestKind`] to the
7//! correct per-op handler (bootstrap, announce, get/submit/heal/cache post
8//! bundles, get/submit/heal/cache feedback, fetch URL preview, trending
9//! hashtags, ping).
10//!
11//! Per-request safety checks happen before any real work:
12//!
13//! - **PoW verification** — the packet's PoW must be sufficient for *this* server's
14//!   identity. Anything under-powered or stale is dropped immediately.
15//! - **Replay protection** — a short-lived salt cache rejects salts we've already
16//!   seen, so a valid signed request can't be replayed from another network vantage.
17//! - **Peer upgrade** — if the caller's embedded [`hashiverse_lib::protocol::peer::Peer`] carries a stronger PoW
18//!   than what we have in the tracker, the tracker is upgraded in place.
19//!
20//! The loop respects a `CancellationToken` so graceful shutdown drains in-flight
21//! work and stops accepting new requests cleanly.
22
23use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29    AnnounceResponseV1, AnnounceResponseV2, AnnounceV1, AnnounceV2, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30    GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31    HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
32    SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::types::{Id, Signature};
44use hashiverse_lib::tools::{hashing, url_preview};
45use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
46use hashiverse_lib::transport::transport::IncomingRequest;
47use log::{info, trace, warn};
48use std::collections::HashSet;
49use std::sync::atomic::Ordering;
50
51use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
52use tokio::sync::mpsc;
53use tokio_util::sync::CancellationToken;
54
/// Fallback hashtags used to top up a trending-hashtags response when the server
/// does not yet know enough real trending hashtags to satisfy the requested limit.
/// Applied in order, skipping any entry whose normalised form is already present
/// in the real trending list. Filler entries are returned with `count = 0` so
/// clients can distinguish seeded fillers from genuine trending data.
///
/// Entries are spelled lowercase and without a leading `#`; they are pushed into
/// the response verbatim, so this spelling is what clients will see.
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
61
/// Canonicalise a hashtag so that two spellings can be compared for equality.
///
/// The input is lowercased, then at most one leading `#` is removed. Nothing else
/// is trimmed or rewritten (a second `#` is kept). Intended to mirror the
/// canonicalisation performed by `Id::from_hashtag_str`.
fn normalise_hashtag(hashtag: &str) -> String {
    let lowered = hashtag.to_lowercase();
    if let Some(without_hash) = lowered.strip_prefix('#') {
        return without_hash.to_string();
    }
    lowered
}
71
72/// Top up `trending_hashtags` from `fallback_hashtags` (in order) until it reaches
73/// `limit`, skipping any fallback whose normalised form already appears in the list.
74/// Filler entries are inserted with `count = 0`. No-op if the list is already at
75/// or above the limit.
76fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
77    let target_length = limit as usize;
78    if trending_hashtags.len() >= target_length {
79        return;
80    }
81
82    let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
83        .map(|entry| normalise_hashtag(&entry.hashtag))
84        .collect();
85
86    for fallback_hashtag in fallback_hashtags {
87        if trending_hashtags.len() >= target_length {
88            break;
89        }
90        let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
91        if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
92            continue;
93        }
94        trending_hashtags.push(TrendingHashtagV1 {
95            hashtag: (*fallback_hashtag).to_string(),
96            count: 0,
97        });
98        existing_normalised_hashtags.insert(normalised_fallback_hashtag);
99    }
100}
101
102impl HashiverseServer {
103    pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
104        loop {
105            tokio::select! {
106                _ = cancellation_token.cancelled() => { break },
107
108                receipt = rx.recv() => {
109                    match receipt {
110                        Some(incoming) => {
111                            // trace!("dispatch_network_envelopes received bytes={:?}", incoming.bytes);
112                            let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
113                            match result {
114                                Ok(bytes) => {
115                                    let result = incoming.reply.send(bytes);
116                                    if result.is_err() { warn!("failed to send reply"); }
117                                },
118                                Err(e) => {
119                                    warn!("failed to process packet from {}: {}", incoming.caller_address, e);
120                                    incoming.report_bad_request();
121                                    drop(incoming.reply);
122                                },
123                            }
124                        },
125                        None => {
126                            warn!("channel closed");
127                            break;
128                        }
129                    }
130                }
131            }
132        }
133
134        Ok(())
135    }
136
137    async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
138        let caller_address = incoming.caller_address.as_str();
139        let current_time_millis = self.runtime_services.time_provider.current_time_millis();
140
141        // Decode the envelope
142        let rpc_request_packet_rx = RpcRequestPacketRx::decode(&current_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
143        // trace!("payload_request_kind={}", rpc_request_packet_rx.payload_request_kind);
144
145        // Count this inbound request. After decoding (so malformed traffic doesn't
146        // pollute the totals) but before the per-handler PoW check (so failed-PoW
147        // attempts still show up — adversarial load is the load we'd want to see).
148        let payload_request_kind_index = rpc_request_packet_rx.payload_request_kind.clone() as usize;
149        self.request_counters[payload_request_kind_index].fetch_add(1, Ordering::Relaxed);
150        // Also feed the decaying per-window rate estimates so the stats reflect
151        // recent load, not just the monotonic all-time total.
152        self.request_rate_windows[payload_request_kind_index].lock().record(current_time_millis);
153
154        // Check that we have not seen this salt recently (stops replay attacks)
155        {
156            if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
157                anyhow::bail!("replay detected: salt already seen");
158            }
159            self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
160        }
161
162        // Keep this for our response
163        let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
164
165        let dispatch_result: anyhow::Result<BytesGatherer> = try {
166            // Check that the pow is meaningful
167            let pow = match rpc_request_packet_rx.pow_server_known {
168                true => {
169                    let (pow, improved_pow_current_day, improved_pow_current_month) = {
170                        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
171                        let pow = PeerPow::new(
172                            rpc_request_packet_rx.pow_sponsor_id,
173                            &peer_self.verification_key_bytes,
174                            &peer_self.pq_commitment_bytes,
175                            rpc_request_packet_rx.pow_timestamp,
176                            rpc_request_packet_rx.pow_content_hash,
177                            rpc_request_packet_rx.pow_salt,
178                        )?;
179
180                        let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
181                        let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
182
183                        (pow, improved_pow_current_day, improved_pow_current_month)
184                    };
185
186                    // Check if we need to modify peer_self
187                    if improved_pow_current_day || improved_pow_current_month {
188                        let mut peer_self = self.peer_self.write(); // Remember alphabetical locking order!
189                        if improved_pow_current_day {
190                            trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
191                            peer_self.pow_current_day = pow.clone();
192                        }
193                        if improved_pow_current_month {
194                            trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
195                            peer_self.pow_current_month = pow.clone();
196                        }
197
198                        peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
199                    }
200
201                    Some(pow)
202                }
203
204                false => {
205                    // Only some request types are allowed anonymous pow
206                    match rpc_request_packet_rx.payload_request_kind {
207                        PayloadRequestKind::BootstrapV1 => {}
208                        _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
209                    }
210
211                    None
212                }
213            };
214
215            // Dispatch appropriately
216            let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
217            let response_flags = match compress_response {
218                true => RpcResponsePacketTxFlags::COMPRESSED,
219                false => RpcResponsePacketTxFlags::empty(),
220            };
221
222            // Encode response
223            RpcResponsePacketTx::encode(
224                &self.server_id.keys.signature_key,
225                &self.server_id.keys.verification_key_bytes,
226                &self.server_id.keys.pq_commitment_bytes,
227                &self.server_id.sponsor_id,
228                &self.server_id.timestamp,
229                &self.server_id.hash,
230                &self.server_id.salt,
231                &pow_content_hash,
232                response_flags,
233                payload_response_kind,
234                payload,
235            )?
236        };
237
238        match dispatch_result {
239            Ok(results) => Ok(results),
240            Err(e) => {
241                warn!("failed to dispatch packet from {}: {}", caller_address, e);
242                incoming.report_bad_request();
243
244                let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
245                let response = ErrorResponseV1 { code: 0, message: e.to_string() };
246                let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
247
248                // Encode response
249                RpcResponsePacketTx::encode(
250                    &self.server_id.keys.signature_key,
251                    &self.server_id.keys.verification_key_bytes,
252                    &self.server_id.keys.pq_commitment_bytes,
253                    &self.server_id.sponsor_id,
254                    &self.server_id.timestamp,
255                    &self.server_id.hash,
256                    &self.server_id.salt,
257                    &pow_content_hash,
258                    RpcResponsePacketTxFlags::COMPRESSED,
259                    payload_response_kind,
260                    payload,
261                )
262            }
263        }
264    }
265
266    async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
267        // Where do we want to decide if we should compress?  Here in one block, or at the end of each individual dispatch_xxx?
268        let compress_response = match rpc_request_packet_rx.payload_request_kind {
269            PayloadRequestKind::GetPostBundleV1 => false,   // We don't compress these again as they are already predominantly compressed
270            PayloadRequestKind::CachePostBundleV1 => false, // We don't compress these again as they are already predominantly compressed
271            _ => true,
272        };
273
274        let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
275            PayloadRequestKind::ErrorV1 => {
276                anyhow::bail!("Received ErrorV1");
277            }
278            PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279            PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
280            PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
281            PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
282            PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
283            PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
284            PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
285            PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
286            PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
287            PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
288            PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
289            PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
290            PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
291            PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
292            PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
293            PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
294            PayloadRequestKind::AnnounceV2 => self.dispatch_network_payload_x_AnnounceV2(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
295        };
296
297        Ok((compress_response, payload_response_kind, payload))
298    }
299
300    #[allow(non_snake_case)]
301    async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
302        anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
303        let peer = self.peer_self.read().clone();
304        let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
305        Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
306    }
307
308    #[allow(non_snake_case)]
309    async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
310        anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
311        let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
312        let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
313        Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
314    }
315
316    #[allow(non_snake_case)]
317    async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
318        anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
319
320        let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
321
322        let peer = request.peer_self;
323        let peer_id = peer.id;
324
325        // V1 is deprecated in favour of V2 (cert-gated). Log every inbound V1 so production
326        // log volume drops as senders upgrade; this lets us pick a V1 cut-off date from log
327        // grep rather than guessing. No rate-limiting needed: per-RPC PoW (see
328        // [[pow-is-rate-limit]]) already gates inbound frequency per source.
329        warn!("received deprecated AnnounceV1 from peer {} at {}; V1 will be removed in a future release", peer_id, peer.address);
330
331        // Check that this Peer checks out and add it to our kademlia
332        self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
333
334        let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
335
336        let json = json::struct_to_bytes(&AnnounceResponseV1 {
337            peer_self: self.peer_self.read().clone(),
338            peers_nearest,
339        })?;
340        Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
341    }
342
343    #[allow(non_snake_case)]
344    async fn dispatch_network_payload_x_AnnounceV2(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
345        anyhow_assert_eq!(&PayloadRequestKind::AnnounceV2, &payload_request_kind);
346
347        let request = json::bytes_to_struct::<AnnounceV2>(&bytes)?;
348
349        let peer = request.peer_self;
350        let peer_id = peer.id;
351        let now = self.runtime_services.time_provider.as_ref().current_time_millis();
352
353        // Gate Kademlia admission on the transport-specific ownership proof. For HTTPS that's
354        // an offline X.509 chain validation against bundled webpki-roots with an IP-SAN match;
355        // for mem / TCP it's the empty-marker check. A failed proof is a hard error: no
356        // Kademlia add, no nearest-peers list returned. The dispatcher's Err -> ErrorResponseV1
357        // wrapper also reports the request as bad to the DDoS layer, so repeat offenders get
358        // throttled by per-IP scoring.
359        let ownership_proof = self.transport_server.get_transport_ownership_proof();
360        if !ownership_proof.prove(&peer, &request.proof_payload, now) {
361            anyhow::bail!("AnnounceV2 from peer {} at {} failed ownership proof", peer_id, peer.address);
362        }
363        self.add_potential_peer_to_kademlia(peer, now).await;
364
365        let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
366
367        let json = json::struct_to_bytes(&AnnounceResponseV2 {
368            peer_self: self.peer_self.read().clone(),
369            peers_nearest,
370        })?;
371        Ok((PayloadResponseKind::AnnounceResponseV2, BytesGatherer::from_bytes(json)))
372    }
373
374    #[allow(non_snake_case)]
375    async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
376        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);
377
378        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
379
380        let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
381        trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);
382
383        // Check that the location_id makes sense
384        request.bucket_location.validate()?;
385
386        // Store the provided Peers
387        {
388            for peer in request.peers_visited {
389                self.add_potential_peer_to_kademlia(peer, time_millis).await;
390            }
391        }
392
393        let peer_self = self.peer_self.read().clone();
394
395        // Check our own records to see that we are close enough to store this post
396        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
397        if !among_peers_nearer {
398            warn!("I am not in peers_nearer {}", peer_self);
399        }
400
401        let post_bundle = match among_peers_nearer {
402            true => {
403                // At some point, for parallelisms sake, we may wish to replace this with a read lock followed by a release and write lock and recheck
404                // But then again - if our server is getting load, the federated cache mechanism should alleviate it, so this may be overkill...
405                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
406
407                let mut encoded_post_bundle_bytes: Option<Bytes> = None;
408
409                // Try to load bytes from the disk (if we even have them)
410                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
411                if let Some(mut post_bundle_metadata) = post_bundle_metadata {
412                    encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
413
414                    // Has the PostBundle become sealed since the last time it was written to disk?  Perhaps enough time has passed
415                    if !post_bundle_metadata.sealed {
416                        let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
417                        if sealed {
418                            // We can rewrite the postbundle on disk for the final time now that it is sealed
419                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
420                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
421                                encoded_post_bundle.header.time_millis = time_millis;
422                                encoded_post_bundle.header.sealed = true;
423                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
424                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
425                                self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
426                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
427                            }
428
429                            // And the updated metadata
430                            post_bundle_metadata.sealed = true;
431                            self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
432                        }
433                        else {
434                            // We must simply update the timestamp - we need this for client caching.  It's expensive, but caching should help us out here if it is a truly busy bucket
435                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
436                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
437                                encoded_post_bundle.header.time_millis = time_millis;
438                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
439                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
440                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
441                            }
442                        }
443                    }
444                };
445
446                // If we dont have the metadata, or the bytes on disk, return a fresh one
447                // Generally if we have the metadata, we should always have bytes on disk, except if a request ot post was granted but they then never came back with the data...
448                if encoded_post_bundle_bytes.is_none() {
449                    let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
450
451                    let mut header = EncodedPostBundleHeaderV1 {
452                        time_millis,
453                        location_id: request.bucket_location.location_id,
454                        overflowed: false,
455                        sealed,
456                        num_posts: 0,
457                        encoded_post_ids: vec![],
458                        encoded_post_lengths: vec![],
459                        encoded_post_healed: HashSet::new(),
460                        peer: peer_self.clone(),
461                        signature: Signature::zero(),
462                    };
463                    header.signature_generate(&self.server_id.keys.signature_key)?;
464
465                    let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
466                    encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
467                }
468
469                encoded_post_bundle_bytes
470            }
471            false => None,
472        };
473
474        let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
475
476        let get_post_bundle_response = GetPostBundleResponseV1 {
477            peers_nearer,
478            cache_request_token: cache_result.cache_request_token,
479            post_bundles_cached: cache_result.cached_items,
480            post_bundle,
481        };
482        Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
483    }
484
485    #[allow(non_snake_case)]
486    async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
487        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
488
489        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
490
491        let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
492        trace!("received GetPostBundleFeedbackV1");
493
494        // Store the provided Peers
495        {
496            for peer in request.peers_visited {
497                self.add_potential_peer_to_kademlia(peer, time_millis).await;
498            }
499        }
500
501        // Check our own records to see that we are close enough to store this post
502        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
503
504        let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
505
506        if among_peers_nearer {
507            // We only support feedbacks if we know about this post_bundle
508            let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
509            if post_bundle_metadata.is_some() {
510                post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
511            }
512        }
513
514        // Wrap the feedbacks with a header (if we have any)
515        let peer_self = self.peer_self.read().clone();
516        let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
517            Some(feedbacks_bytes) => {
518
519                let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
520
521                let mut header = EncodedPostBundleFeedbackHeaderV1 {
522                    time_millis,
523                    location_id: request.bucket_location.location_id,
524                    feedbacks_bytes_hash,
525                    peer: peer_self.clone(),
526                    signature: Signature::zero(),
527                };
528                header.signature_generate(&self.server_id.keys.signature_key);
529
530                let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
531                    header,
532                    feedbacks_bytes,
533                };
534                Some(encoded_post_bundle_feedback.to_bytes()?)
535            }
536            None => None,
537        };
538
539        let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
540
541        let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
542            peers_nearer,
543            cache_request_token: cache_result.cache_request_token,
544            post_bundle_feedbacks_cached: cache_result.cached_items,
545            encoded_post_bundle_feedback,
546        };
547        Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
548    }
549
550    #[allow(non_snake_case)]
551    async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
552        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
553
554        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
555
556        let pow = match pow {
557            Some(pow) => pow,
558            None => anyhow::bail!("We need pow for a submit post claim"),
559        };
560
561        let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
562        trace!("received SubmitPostClaimV1");
563
564        // Check that the location_id makes sense
565        request.bucket_location.validate()?;
566
567        // Check that we support the bucket duration
568        let bucket_duration = {
569            let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
570            match bucket_duration {
571                Some(bucket_duration) => *bucket_duration,
572                None => anyhow::bail!("Unrecognised bucket duration provided"),
573            }
574        };
575
576        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
577
578        // Check that enough pow has been done for this post
579        {
580            let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
581            if pow.pow < pow_minimum {
582                anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
583            }
584        }
585
586        // Check that the post matches the bucket
587        {
588            // Ensure that the bucket timestamp fits the post
589            let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
590            if timestamp != request.bucket_location.bucket_time_millis {
591                anyhow::bail!("The post timestamp does not match the bucket");
592            }
593        }
594
595        let client_id = decoded_post.header.client_id()?;
596
597        // Ensure that the base_id is related to the post in the linked_base_ids.
598        if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
599            anyhow::bail!("The base_id is not related to the post");
600        }
601
602        // Check that only the posting user is allowed to post to a bucket of type USER
603        if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
604            anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
605        }
606
607        // For ReplyToPost and Sequel buckets, verify the referenced post is real (valid signature).
608        // For Sequel buckets, additionally verify same-author.
609        if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
610            let original_header_bytes = request.referenced_post_header_bytes
611                .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
612
613            // Decode the original post header using the submitter's client_id as the decryption password.
614            // decode_from_bytes verifies the signature — a forged header will fail here.
615            let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
616
617            // Verify the original post's post_id matches the bucket's base_id
618            if original_post.post_id != request.bucket_location.base_id {
619                anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
620            }
621
622            // For Sequel buckets, additionally verify the sequel author matches the original post author
623            if request.bucket_location.bucket_type == BucketType::Sequel {
624                let original_client_id = original_post.header.client_id()?;
625                if original_client_id != client_id {
626                    anyhow::bail!("Sequel post author does not match original post author");
627                }
628            }
629        }
630
631        // Check that the post timestamp is reasonable
632        {
633            let delta = (time_millis - decoded_post.header.time_millis).abs();
634            if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
635                anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
636            }
637        }
638
639        // Check our own records to see that we are close enough to store this post
640        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
641
642        let submit_post_claim_token = match among_peers_nearer {
643            true => {
644                // Are we still willing to accept this post?
645                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
646                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
647                let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
648
649                // If it is not yet sealed, update our metadata
650                if !post_bundle_metadata.sealed {
651                    post_bundle_metadata.num_posts_granted += 1;
652                    post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
653                    post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
654
655                    self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
656                }
657
658                // We grant the token if we are not yet sealed
659                match post_bundle_metadata.sealed {
660                    false => {
661                        info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
662                        Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
663                    }
664                    true => {
665                        info!(
666                            "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
667                            request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
668                        );
669                        None
670                    }
671                }
672            }
673
674            false => None,
675        };
676
677        // If we are willing to accept this post, then we are willing to track the trendiness of its hashtags
678        if submit_post_claim_token.is_some() {
679            // Track referenced hashtags for trending (only for User bucket posts, where the hashtags originate)
680            if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
681                let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
682                for referenced_hashtag in &request.referenced_hashtags {
683                    let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
684                        Ok(id) => id,
685                        Err(_) => continue, // Skip invalid hashtags silently
686                    };
687                    if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
688                        continue; // Hashtag not backed by a linked_base_id (no PoW), skip it
689                    }
690                    let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_default();
691                    hll.insert(author_verification_key_bytes.as_ref());
692                    self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
693                }
694            }
695        }
696
697        let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
698        Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
699    }
700
701    #[allow(non_snake_case)]
702    async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
703        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
704
705        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
706
707        let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
708        trace!("received SubmitPostCommitV1");
709
710        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
711
712        // Is the submit_post_claim_token from us?
713        if request.submit_post_claim_token.peer.id != peer_self.id {
714            anyhow::bail!("The submit_post_claim_token is not from us");
715        }
716
717        // Check that the location_id still makes sense
718        request.bucket_location.validate()?;
719        if request.bucket_location != request.submit_post_claim_token.bucket_location {
720            anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
721        }
722
723        // Check that we can decode the post with the bucket's base_id as password
724        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
725
726        // Check that the committed post matches what was claimed
727        if decoded_post.post_id != request.submit_post_claim_token.post_id {
728            anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
729        }
730
731        // Update the postbundle and metadata
732        let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
733        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
734        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
735
736        // We should always have the metadata, but just in case!
737        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
738
739        // Make a PostBundle if we dont have one on disk
740        let mut post_bundle = match post_bundle_bytes {
741            Some(bytes) => {
742                let bytes = Bytes::from_owner(bytes);
743                
744                EncodedPostBundleV1::from_bytes(bytes, true)?
745            }
746            None => {
747                let header = EncodedPostBundleHeaderV1 {
748                    time_millis: TimeMillis::zero(),
749                    location_id: request.bucket_location.location_id,
750                    overflowed: false,
751                    sealed: false,
752                    num_posts: 0,
753                    encoded_post_ids: vec![],
754                    encoded_post_lengths: vec![],
755                    encoded_post_healed: HashSet::new(),
756                    peer: self.peer_self.read().clone(),
757                    signature: Signature::zero(),
758                };
759
760                EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
761            }
762        };
763
764        // Sanity check that we don't already have this post
765        if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
766            anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
767        }
768
769        // The post bundle
770        post_bundle.header.time_millis = time_millis;
771        post_bundle.header.num_posts += 1;
772        post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
773        post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
774        post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
775        post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
776        post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
777        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
778        posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
779        post_bundle.encoded_posts_bytes = posts_mut.freeze();
780        let post_bundle_bytes_new = post_bundle.to_bytes()?;
781
782        // The metadata
783        post_bundle_metadata.num_posts = post_bundle.header.num_posts;
784        post_bundle_metadata.overflowed = post_bundle.header.overflowed;
785        post_bundle_metadata.sealed = post_bundle.header.sealed;
786        post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
787
788        {
789            self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
790            self.environment
791                .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
792        }
793
794        info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
795
796        let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
797
798        let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
799        Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
800    }
801
802    #[allow(non_snake_case)]
803    async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
804        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
805
806        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
807
808        let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
809        trace!("received SubmitPostFeedbackV1");
810
811        // Enforce minimum PoW for feedback submission
812        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
813        if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
814            anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
815        }
816
817        // Check the feedback makes sense
818        request.encoded_post_feedback.pow_verify()?;
819
820        let location_id = request.bucket_location.location_id;
821
822        // Check our own records to see that we are close enough to store this post feedback
823        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
824
825        let accepted = (|| -> anyhow::Result<bool> {
826            if !among_peers_nearer {
827                return Ok(false);
828            }
829
830            // Do we recognise the post associated with this feedback
831            let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
832            let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
833            else {
834                return Ok(false);
835            };
836
837            let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
838            if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
839                return Ok(false);
840            }
841
842            Ok(true)
843        })()?;
844
845        // Update our environment with the feedback
846        if accepted {
847            trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
848            self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
849        }
850
851        let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
852        Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
853    }
854
855    #[allow(non_snake_case)]
856    async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
857        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
858
859        fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
860            let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
861            Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
862        }
863
864        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
865        let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
866        trace!("received HealPostBundleClaimV1");
867
868        // Verify the bucket_location is internally consistent and maps to the donor_header's location_id
869        request.bucket_location.validate()?;
870        if request.bucket_location.location_id != request.donor_header.location_id {
871            anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
872        }
873
874        // Verify the donor_header provided by the client is self-consistent and properly signed
875        request.donor_header.verify()?;
876
877        // Only heal if we are among the nearest peers for this location
878        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
879        if !among_peers_nearer {
880            return generate_negatory_response();
881        }
882
883        // Reject if a heal for this location is already in progress (another client beat us to it)
884        if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
885            return generate_negatory_response();
886        }
887
888        // Load our current bundle (header only) to see what post_ids we already have
889        let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
890        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
891
892        let our_post_ids: HashSet<Id> = match post_bundle_bytes {
893            Some(bytes) => {
894                let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
895                bundle.header.encoded_post_ids.into_iter().collect()
896            }
897            None => HashSet::new(),
898        };
899
900        // Posts in the donor_header that we do not yet have, in canonical order
901        let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
902
903        if needed_post_ids.is_empty() {
904            return generate_negatory_response();
905        }
906
907        self.heal_in_progress.insert(request.donor_header.location_id, ());
908
909        let token = Some(HealPostBundleClaimTokenV1::new(
910            self.peer_self.read().clone(),
911            request.bucket_location,
912            needed_post_ids.clone(),
913            request.donor_header.signature,
914            &self.server_id.keys.signature_key,
915        ));
916        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
917        Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
918    }
919
920    #[allow(non_snake_case)]
921    async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
922        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
923
924        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
925        let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
926        trace!("received HealPostBundleCommitV1");
927
928        // Verify the token was issued by this server
929        let peer_self = self.peer_self.read().clone();
930        if request.token.peer.id != peer_self.id {
931            anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
932        }
933        request.token.verify()?;
934
935        // Verify the donor_header matches what the token was issued for
936        if request.donor_header.signature != request.token.donor_header_signature {
937            anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
938        }
939        request.donor_header.verify()?;
940
941        if request.token.bucket_location.location_id != request.donor_header.location_id {
942            anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
943        }
944
945        let location_id = request.donor_header.location_id;
946
947        // Parse the post bytes supplied by the client (one entry per token.needed_post_id, in order)
948        let mut remaining_bytes = request.encoded_posts_bytes.clone();
949        let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
950        for post_id in &request.token.needed_post_ids {
951            let len = request
952                .donor_header
953                .encoded_post_ids
954                .iter()
955                .zip(request.donor_header.encoded_post_lengths.iter())
956                .find(|(id, _)| *id == post_id)
957                .map(|(_, len)| *len)
958                .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
959            if remaining_bytes.len() < len {
960                anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
961            }
962            let post_bytes = remaining_bytes.split_to(len);
963            posts_to_add.push((*post_id, post_bytes));
964        }
965        if !remaining_bytes.is_empty() {
966            anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
967        }
968
969        // Validate each post decrypts successfully with the provided base_id
970        for (post_id, post_bytes) in &posts_to_add {
971            EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
972        }
973
974        // Load our current bundle (or start empty)
975        let _lock = self.environment.get_write_lock_for_location_id(&location_id);
976        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
977        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
978        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
979
980        let mut post_bundle = match post_bundle_bytes {
981            Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
982            None => EncodedPostBundleV1 {
983                header: EncodedPostBundleHeaderV1 {
984                    time_millis: TimeMillis::zero(),
985                    location_id,
986                    overflowed: request.donor_header.overflowed,
987                    sealed: request.donor_header.sealed,
988                    num_posts: 0,
989                    encoded_post_ids: vec![],
990                    encoded_post_lengths: vec![],
991                    encoded_post_healed: HashSet::new(),
992                    peer: peer_self.clone(),
993                    signature: Signature::zero(),
994                },
995                encoded_posts_bytes: Bytes::new(),
996            },
997        };
998
999        let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
1000
1001        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
1002        let mut added_any = false;
1003        for (post_id, post_bytes) in posts_to_add {
1004            if !our_post_ids.contains(&post_id) {
1005                let len = post_bytes.len();
1006                posts_mut.extend_from_slice(&post_bytes);
1007                post_bundle.header.encoded_post_ids.push(post_id);
1008                post_bundle.header.encoded_post_lengths.push(len);
1009                post_bundle.header.encoded_post_healed.insert(post_id);
1010                added_any = true;
1011            }
1012        }
1013        post_bundle.encoded_posts_bytes = posts_mut.freeze();
1014
1015        if added_any {
1016            post_bundle.header.time_millis = time_millis;
1017            post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
1018            post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
1019
1020            let new_bytes = post_bundle.to_bytes()?;
1021            post_bundle_metadata.num_posts = post_bundle.header.num_posts;
1022            post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
1023
1024            self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
1025            self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
1026
1027            info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
1028        }
1029
1030        self.heal_in_progress.invalidate(&location_id);
1031
1032        let response = HealPostBundleCommitResponseV1 { accepted: added_any };
1033        Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1034    }
1035
1036    #[allow(non_snake_case)]
1037    async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1038        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
1039
1040        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1041        let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
1042        trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
1043
1044        // Only accept if we are among the nearest peers for this location
1045        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
1046        if !among_peers_nearer {
1047            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1048            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1049        }
1050
1051        // Check we actually have a post bundle for this location (same guard as SubmitPostFeedbackV1)
1052        let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1053        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1054        let Some(post_bundle_bytes) = post_bundle_bytes
1055        else {
1056            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1057            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1058        };
1059        let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1060
1061        let mut accepted_count: u32 = 0;
1062        for feedback in &request.encoded_post_feedbacks {
1063            // Only store feedback for posts we actually hold
1064            if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1065                continue;
1066            }
1067            self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1068            accepted_count += 1;
1069        }
1070
1071        if accepted_count > 0 {
1072            trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1073        }
1074
1075        let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1076        Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1077    }
1078
1079    #[allow(non_snake_case)]
1080    async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1081        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1082
1083        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1084        let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1085        trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1086
1087        // Verify token was issued by this server and has not expired
1088        let peer_self = self.peer_self.read().clone();
1089        if request.token.peer.id != peer_self.id {
1090            anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1091        }
1092        request.token.verify()?;
1093        if request.token.is_expired(time_millis) {
1094            anyhow::bail!("CachePostBundleV1: token has expired");
1095        }
1096
1097        let mut any_accepted = false;
1098        for bundle_bytes in request.encoded_post_bundles {
1099            let parse_result: anyhow::Result<()> = try {
1100                let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1101
1102                // Check that this proposed cache content is legitimate
1103                encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1104
1105                let originator_peer_id = encoded_post_bundle.header.peer.id;
1106                let is_sealed = encoded_post_bundle.header.sealed;
1107                if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1108                    any_accepted = true;
1109                }
1110            };
1111            if let Err(e) = &parse_result {
1112                warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1113            }
1114        }
1115        let response = CachePostBundleResponseV1 { accepted: any_accepted };
1116        Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1117    }
1118
1119    #[allow(non_snake_case)]
1120    async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1121        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1122
1123        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1124        let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1125        trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1126
1127        // Verify token was issued by this server and has not expired
1128        let peer_self = self.peer_self.read().clone();
1129        if request.token.peer.id != peer_self.id {
1130            anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1131        }
1132        request.token.verify()?;
1133        if request.token.is_expired(time_millis) {
1134            anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1135        }
1136
1137        let result: anyhow::Result<bool> = try {
1138            let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1139
1140            encoded_post_bundle_feedback.verify()?;
1141
1142            let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1143            // Feedback bundles have no sealed flag — treat as live (5-min TTL)
1144            self.post_bundle_feedback_cache
1145                .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1146        };
1147        let accepted = result.unwrap_or_else(|e| {
1148            warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1149            false
1150        });
1151
1152        let response = CachePostBundleFeedbackResponseV1 { accepted };
1153        Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1154    }
1155
1156    #[allow(non_snake_case)]
1157    async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1158        anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
1159
1160        let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1161        trace!("received FetchUrlPreviewV1 for url={}", request.url);
1162
1163        // SSRF protection:
1164        // https:// only — blocks plaintext metadata endpoints and forces TLS cert validation.
1165        if !request.url.starts_with("https://") {
1166            anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
1167        }
1168
1169        // Extract host; reject bare IP literals (IPv4 and IPv6 bracket form).
1170        let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
1171        let host = if host_and_port.starts_with('[') {
1172            // IPv6 literal [addr] or [addr]:port
1173            host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
1174        } else {
1175            // hostname or IPv4 — strip optional port
1176            host_and_port.split(':').next().unwrap_or(host_and_port)
1177        };
1178        if host.is_empty() {
1179            anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
1180        }
1181        if host.parse::<std::net::IpAddr>().is_ok() {
1182            anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
1183        }
1184
1185        // Resolve once and validate every returned address.  Collecting into a Vec lets us
1186        // re-use the same addresses to pin the reqwest client, closing the DNS-rebinding window
1187        // (nip.io, metadata.google.internal, TTL=0 rebind, etc.).
1188        let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
1189            .await
1190            .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
1191            .collect();
1192        if resolved_socket_addrs.is_empty() {
1193            anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
1194        }
1195        for socket_addr in &resolved_socket_addrs {
1196            let ip = socket_addr.ip();
1197            if is_ssrf_protected_ip(ip) {
1198                anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
1199            }
1200        }
1201
1202        // Build a client that:
1203        //   - resolve_to_addrs: skips re-resolution entirely — the validated IPs are used directly
1204        //   - redirect::none: prevents a server-side redirect to an unvalidated internal URL
1205        //   - no_proxy: ignores HTTP_PROXY / NO_PROXY env vars that could route to internal hosts
1206        //   - times out quickly in unreasonable scenarios
1207        //   - limits download size
1208        let http_client = reqwest::Client::builder()
1209            .connect_timeout(std::time::Duration::from_secs(1))
1210            .timeout(std::time::Duration::from_secs(3))
1211            .user_agent("hashiverse-preview/1.0")
1212            .resolve_to_addrs(host, &resolved_socket_addrs)
1213            .redirect(reqwest::redirect::Policy::none())
1214            .no_proxy()
1215            .build()?;
1216
1217        const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
1218        let mut http_response = http_client.get(&request.url).send().await?;
1219
1220        // Reject early if Content-Length already exceeds the limit — before reading any body bytes.
1221        if let Some(content_length) = http_response.content_length() {
1222            if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
1223                anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
1224            }
1225        }
1226        let mut body_bytes = BytesMut::new();
1227        while let Some(chunk) = http_response.chunk().await? {
1228            // body_bytes.len() is already within the limit; only the new chunk can overflow.
1229            // We copy only as many bytes as fit rather than bailing, so a single large chunk
1230            // (which reqwest has already buffered) doesn't cause an error — we just truncate.
1231            // This is sufficient for HTML preview extraction which only needs the <head>.
1232            let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
1233            body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
1234            if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
1235                break;
1236            }
1237        }
1238        let html = String::from_utf8_lossy(&body_bytes).into_owned();
1239
1240        let preview_data = url_preview::extract_url_preview(&html);
1241
1242        let response = FetchUrlPreviewResponseV1 {
1243            url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
1244            title: preview_data.title,
1245            description: preview_data.description,
1246            image_url: preview_data.image_url,
1247        };
1248
1249        Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1250    }
1251
1252    #[allow(non_snake_case)]
1253    async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1254        anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1255
1256        let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1257        trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1258
1259        let time_millis = self.runtime_services.time_provider.current_time_millis();
1260
1261        // Check if we have a cached response that is less than a few minutes old
1262        let cached_response = {
1263            let cache = self.trending_hashtags_response_cache.lock();
1264            match cache.as_ref() {
1265                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1266                    Some(cached_response.clone())
1267                }
1268                _ => None,
1269            }
1270        };
1271
1272        let mut response = match cached_response {
1273            Some(mut cached) => {
1274                cached.trending_hashtags.truncate(request.limit as usize);
1275                cached
1276            }
1277            None => {
1278                // Recalculate: iterate the trending_hashtags cache, sort by unique author count
1279                let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1280                    .map(|(hashtag, hll)| TrendingHashtagV1 {
1281                        hashtag: hashtag.as_ref().clone(),
1282                        count: hll.count(),
1283                    })
1284                    .filter(|entry| entry.count > 0)
1285                    .collect();
1286
1287                trending_hashtags.sort_by_key(|entry| std::cmp::Reverse(entry.count));
1288
1289                let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1290
1291                // Cache the full response (real trending only — fallbacks are applied per-response after truncation)
1292                {
1293                    let mut cache = self.trending_hashtags_response_cache.lock();
1294                    *cache = Some((time_millis, full_response.clone()));
1295                }
1296
1297                let mut truncated_response = full_response;
1298                truncated_response.trending_hashtags.truncate(request.limit as usize);
1299                truncated_response
1300            }
1301        };
1302
1303        top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1304
1305        Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1306    }
1307
1308    #[allow(non_snake_case)]
1309    async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1310        anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);
1311
1312        let _request = PeerStatsRequestV1::from_bytes(&bytes)?;
1313
1314        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
1315        if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
1316            anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
1317        }
1318
1319        let time_millis = self.runtime_services.time_provider.current_time_millis();
1320
1321        // Cache check — hand back the same signed blob within the TTL so callers
1322        // re-sharing it operate on a single canonical byte sequence per minute.
1323        let cached_response = {
1324            let cache = self.peer_stats_response_cache.lock();
1325            match cache.as_ref() {
1326                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
1327                _ => None,
1328            }
1329        };
1330
1331        let response = match cached_response {
1332            Some(cached) => cached,
1333            None => {
1334                let doc = serde_json::json!({
1335                    "version":     env!("CARGO_PKG_VERSION"),
1336                    "requests":    request_counts_subtree(&self.request_counters, &self.request_rate_windows, time_millis),
1337                    "system":      system_stats_subtree(),
1338                    "kademlia":    kademlia_stats_subtree(&self.kademlia.read()),
1339                    "environment": environment_stats_subtree(&self.environment),
1340                });
1341
1342                let json_bytes = serde_json::to_vec(&doc)?;
1343                let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1344
1345                let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
1346                let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);
1347
1348                let response = PeerStatsResponseV1 {
1349                    peer: self.peer_self.read().clone(),
1350                    timestamp: time_millis,
1351                    json_compressed,
1352                    signature,
1353                };
1354
1355                *self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
1356                response
1357            }
1358        };
1359
1360        Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1361    }
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366    use super::*;
1367
1368    fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
1369        TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
1370    }
1371
1372    #[test]
1373    fn top_up_adds_fallback_when_empty() {
1374        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1375        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &["#hashiverse", "#news"]);
1376        assert_eq!(trending_hashtags.len(), 2);
1377        assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1378        assert_eq!(trending_hashtags[0].count, 0);
1379        assert_eq!(trending_hashtags[1].hashtag, "#news");
1380        assert_eq!(trending_hashtags[1].count, 0);
1381    }
1382
1383    #[test]
1384    fn top_up_respects_limit_smaller_than_fallback_list() {
1385        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1386        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 1, &["#hashiverse", "#news"]);
1387        assert_eq!(trending_hashtags.len(), 1);
1388        assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1389    }
1390
1391    #[test]
1392    fn top_up_preserves_fallback_order() {
1393        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1394        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#first", "#second", "#third"]);
1395        assert_eq!(trending_hashtags[0].hashtag, "#first");
1396        assert_eq!(trending_hashtags[1].hashtag, "#second");
1397        assert_eq!(trending_hashtags[2].hashtag, "#third");
1398    }
1399
1400    #[test]
1401    fn top_up_is_noop_when_already_at_limit() {
1402        let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
1403        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 2, &["#hashiverse", "#news"]);
1404        assert_eq!(trending_hashtags.len(), 2);
1405        assert_eq!(trending_hashtags[0].hashtag, "#rust");
1406        assert_eq!(trending_hashtags[1].hashtag, "#golang");
1407    }
1408
1409    #[test]
1410    fn top_up_partially_fills_when_real_trending_exists() {
1411        let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10)];
1412        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1413        assert_eq!(trending_hashtags.len(), 3);
1414        assert_eq!(trending_hashtags[0].hashtag, "#rust");
1415        assert_eq!(trending_hashtags[0].count, 10);
1416        assert_eq!(trending_hashtags[1].hashtag, "#hashiverse");
1417        assert_eq!(trending_hashtags[1].count, 0);
1418        assert_eq!(trending_hashtags[2].hashtag, "#news");
1419        assert_eq!(trending_hashtags[2].count, 0);
1420    }
1421
1422    #[test]
1423    fn top_up_skips_fallback_already_present_exact_match() {
1424        let mut trending_hashtags = vec![make_trending_hashtag("#hashiverse", 42)];
1425        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1426        assert_eq!(trending_hashtags.len(), 2);
1427        assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1428        assert_eq!(trending_hashtags[0].count, 42, "real trending entry must not be overwritten by filler");
1429        assert_eq!(trending_hashtags[1].hashtag, "#news");
1430        assert_eq!(trending_hashtags[1].count, 0);
1431    }
1432
1433    #[test]
1434    fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
1435        // Existing entry "HashiVerse" (no `#`, mixed case) should match fallback "#hashiverse"
1436        let mut trending_hashtags = vec![make_trending_hashtag("HashiVerse", 7)];
1437        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1438        assert_eq!(trending_hashtags.len(), 2);
1439        assert_eq!(trending_hashtags[0].hashtag, "HashiVerse");
1440        assert_eq!(trending_hashtags[1].hashtag, "#news");
1441    }
1442
1443    #[test]
1444    fn top_up_with_empty_fallback_is_noop() {
1445        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1446        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &[]);
1447        assert_eq!(trending_hashtags.len(), 0);
1448    }
1449
1450    #[test]
1451    fn top_up_handles_zero_limit() {
1452        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1453        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 0, &["#hashiverse", "#news"]);
1454        assert_eq!(trending_hashtags.len(), 0);
1455    }
1456
1457    #[test]
1458    fn top_up_exhausts_fallback_without_reaching_limit() {
1459        // Limit is larger than what real trending + fallback together can satisfy
1460        let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1461        top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 10, &["#hashiverse", "#news"]);
1462        assert_eq!(trending_hashtags.len(), 2, "should stop at the end of the fallback list, not pad further");
1463    }
1464
1465    mod peer_stats {
1466        use super::*;
1467        use crate::environment::mem_environment_store::MemEnvironmentFactory;
1468        use crate::environment::environment::EnvironmentFactory;
1469        use crate::server::args::Args;
1470        use crate::server::hashiverse_server::HashiverseServer;
1471        use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
1472        use hashiverse_lib::protocol::peer::PeerPow;
1473        use hashiverse_lib::tools::compression;
1474        use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
1475        use hashiverse_lib::tools::runtime_services::RuntimeServices;
1476        use hashiverse_lib::tools::time::TimeMillis;
1477        use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
1478        use hashiverse_lib::tools::types::{Pow, VerificationKey};
1479        use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1480        use std::sync::Arc;
1481        use std::sync::atomic::Ordering;
1482
1483        async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
1484            let time_provider = Arc::new(RealTimeProvider);
1485            let transport_factory = MemTransportFactory::default();
1486            let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
1487            let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1488            let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1489            let args = Args::default_for_testing();
1490            HashiverseServer::new(runtime_services, environment_factory, args).await
1491        }
1492
1493        /// Build a synthetic PeerPow with the requested pow value. The handler under
1494        /// test only checks the threshold against `config::POW_MINIMUM_PER_PEER_STATS`
1495        /// and does not re-verify the underlying PoW computation, so this is enough.
1496        fn synthetic_pow(pow: Pow) -> PeerPow {
1497            let mut peer_pow = PeerPow::zero();
1498            peer_pow.pow = pow;
1499            peer_pow
1500        }
1501
1502        fn empty_request_bytes() -> Bytes {
1503            PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
1504        }
1505
1506        fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
1507            let bytes = compression::decompress(&response.json_compressed).expect("decompress doc").to_bytes();
1508            serde_json::from_slice(&bytes).expect("doc must be valid JSON")
1509        }
1510
1511        #[tokio::test]
1512        async fn rejects_insufficient_pow() {
1513            let server = make_server().await.expect("server must start");
1514            let result = server
1515                .dispatch_network_payload_x_PeerStatsRequestV1(
1516                    CancellationToken::new(),
1517                    Some(synthetic_pow(Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1)))),
1518                    PayloadRequestKind::PeerStatsRequestV1,
1519                    empty_request_bytes(),
1520                )
1521                .await;
1522            assert!(result.is_err(), "expected insufficient PoW to be rejected");
1523        }
1524
1525        #[tokio::test]
1526        async fn returns_response_with_expected_top_level_keys() {
1527            let server = make_server().await.expect("server must start");
1528            let (response_kind, gatherer) = server
1529                .dispatch_network_payload_x_PeerStatsRequestV1(
1530                    CancellationToken::new(),
1531                    Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1532                    PayloadRequestKind::PeerStatsRequestV1,
1533                    empty_request_bytes(),
1534                )
1535                .await
1536                .expect("handler must succeed at threshold pow");
1537            assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);
1538
1539            let response_bytes = gatherer.to_bytes();
1540            let response = PeerStatsResponseV1::from_bytes(&response_bytes).expect("response must decode");
1541            let doc = decode_doc(&response);
1542
1543            assert!(doc.get("version").and_then(|v| v.as_str()).map(|s| !s.is_empty()).unwrap_or(false), "version must be a non-empty string");
1544            assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
1545            assert!(doc.get("requests").is_some(), "requests subtree missing");
1546            assert!(doc.get("system").is_some(), "system subtree missing");
1547            assert!(doc.get("kademlia").is_some(), "kademlia subtree missing");
1548            assert!(doc.get("environment").is_some(), "environment subtree missing");
1549
1550            for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
1551                assert!(doc["system"].get(key).map(|v| v.is_number()).unwrap_or(false), "system.{key} must be a number");
1552            }
1553            for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
1554                assert!(doc["environment"].get(key).map(|v| v.is_number()).unwrap_or(false), "environment.{key} must be a number");
1555            }
1556        }
1557
1558        #[tokio::test]
1559        async fn counters_reflect_recorded_dispatches() {
1560            let server = make_server().await.expect("server must start");
1561
1562            // Simulate inbound PingV1 dispatches by bumping the same counter the
1563            // wrap_and_dispatch path bumps. This avoids the full RPC envelope dance
1564            // while exercising the read path used by the stats handler.
1565            for _ in 0..7 {
1566                server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(1, Ordering::Relaxed);
1567            }
1568
1569            let (_, gatherer) = server
1570                .dispatch_network_payload_x_PeerStatsRequestV1(
1571                    CancellationToken::new(),
1572                    Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1573                    PayloadRequestKind::PeerStatsRequestV1,
1574                    empty_request_bytes(),
1575                )
1576                .await
1577                .expect("handler must succeed");
1578            let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1579            let doc = decode_doc(&response);
1580            assert_eq!(doc["requests"]["PingV1"]["total"].as_u64(), Some(7));
1581        }
1582
1583        #[tokio::test]
1584        async fn cache_returns_byte_identical_response_within_ttl() {
1585            let server = make_server().await.expect("server must start");
1586            let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);
1587
1588            let (_, gatherer_a) = server
1589                .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow.clone()), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1590                .await
1591                .expect("first call must succeed");
1592            let bytes_a = gatherer_a.to_bytes();
1593
1594            // Mutate a counter between calls; if the cache hands back a freshly
1595            // built response we'd see the new count, but the cached blob predates it.
1596            server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);
1597
1598            let (_, gatherer_b) = server
1599                .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1600                .await
1601                .expect("second call must succeed");
1602            let bytes_b = gatherer_b.to_bytes();
1603
1604            assert_eq!(bytes_a, bytes_b, "cached response should be byte-identical across the TTL");
1605        }
1606
1607        #[tokio::test]
1608        async fn signature_verifies_and_fails_on_tamper() {
1609            let server = make_server().await.expect("server must start");
1610            let (_, gatherer) = server
1611                .dispatch_network_payload_x_PeerStatsRequestV1(
1612                    CancellationToken::new(),
1613                    Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1614                    PayloadRequestKind::PeerStatsRequestV1,
1615                    empty_request_bytes(),
1616                )
1617                .await
1618                .expect("handler must succeed");
1619            let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1620
1621            let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
1622            let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
1623            signing::verify(&verification_key, &response.signature, &signing_input).expect("signature must verify against transmitted bytes");
1624
1625            // Mutate one byte of the compressed JSON: verification must fail.
1626            let mut tampered = response.json_compressed.to_vec();
1627            let tamper_index = tampered.len() / 2;
1628            tampered[tamper_index] ^= 0xff;
1629            let tampered_signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
1630            assert!(signing::verify(&verification_key, &response.signature, &tampered_signing_input).is_err(), "verification must fail when json_compressed is tampered");
1631
1632            // Mutate the timestamp: verification must also fail.
1633            let bumped_signing_input = PeerStatsResponseV1::signing_input(TimeMillis(response.timestamp.0 + 1), &response.json_compressed);
1634            assert!(signing::verify(&verification_key, &response.signature, &bumped_signing_input).is_err(), "verification must fail when timestamp is mutated");
1635        }
1636
1637        #[test]
1638        fn request_counts_subtree_covers_every_variant() {
1639            // Belt-and-braces guard for PAYLOAD_REQUEST_KIND_COUNT staying in lockstep
1640            // with the enum at the server-stats layer.
1641            let counters: [std::sync::atomic::AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| std::sync::atomic::AtomicU64::new(0));
1642            let windows: [parking_lot::Mutex<crate::server::stats::RequestRateWindows>; PAYLOAD_REQUEST_KIND_COUNT] =
1643                std::array::from_fn(|_| parking_lot::Mutex::new(crate::server::stats::RequestRateWindows::new()));
1644            let subtree = request_counts_subtree(&counters, &windows, TimeMillis::zero());
1645            let map = subtree.as_object().expect("request_counts subtree must be an object");
1646            assert_eq!(map.len(), PAYLOAD_REQUEST_KIND_COUNT);
1647        }
1648    }
1649
1650    mod announce_v2 {
1651        use super::*;
1652        use crate::environment::mem_environment_store::MemEnvironmentFactory;
1653        use crate::environment::environment::EnvironmentFactory;
1654        use crate::server::args::Args;
1655        use crate::server::hashiverse_server::HashiverseServer;
1656        use hashiverse_lib::protocol::payload::payload::AnnounceV2;
1657        use hashiverse_lib::protocol::peer::Peer;
1658        use hashiverse_lib::tools::pow_generator::pow_generator::PowGenerator;
1659        use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
1660        use hashiverse_lib::tools::runtime_services::RuntimeServices;
1661        use hashiverse_lib::tools::server_id::ServerId;
1662        use hashiverse_lib::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
1663        use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1664        use std::sync::Arc;
1665
1666        async fn make_test_server() -> Arc<HashiverseServer> {
1667            let time_provider = Arc::new(RealTimeProvider);
1668            let transport_factory = MemTransportFactory::default();
1669            let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
1670            let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1671            let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1672            let args = Args::default_for_testing();
1673            HashiverseServer::new(runtime_services, environment_factory, args).await.expect("server must start")
1674        }
1675
1676        /// Build a freshly-signed `Peer` to play the role of an inbound announcer. Uses a
1677        /// brand-new `ServerId` (separate identity from the receiving server) so the announce
1678        /// looks like it comes from a different node.
1679        async fn make_announcing_peer(time_provider: &dyn TimeProvider, pow_generator: &dyn PowGenerator, address: &str) -> Peer {
1680            let server_id = ServerId::new("own_pow", time_provider, config::SERVER_KEY_POW_MIN, true, pow_generator).await.expect("ServerId::new must succeed in tests (PoW reduced under cfg(test))");
1681            let mut peer = server_id.to_peer(time_provider).expect("to_peer must succeed");
1682            peer.address = address.to_string();
1683            peer.sign(time_provider, &server_id.keys.signature_key).expect("sign must succeed");
1684            peer
1685        }
1686
1687        #[tokio::test]
1688        async fn v2_admits_peer_when_proof_is_valid_for_mem_transport() {
1689            let server = make_test_server().await;
1690            let pow_generator = SingleThreadedPowGenerator::new();
1691            let announcing_peer = make_announcing_peer(server.runtime_services.time_provider.as_ref(), &pow_generator, "9999").await;
1692            let announcing_peer_id = announcing_peer.id;
1693
1694            let baseline_len = server.kademlia.read().len();
1695
1696            // Mem-transport's EmptyMarkerOwnershipProof accepts the empty proof — the announce should be admitted.
1697            let request_bytes = json::struct_to_bytes(&AnnounceV2 { peer_self: announcing_peer, proof_payload: vec![] }).expect("encode V2");
1698
1699            let (response_kind, _gatherer) = server
1700                .dispatch_network_payload_x_AnnounceV2(CancellationToken::new(), PayloadRequestKind::AnnounceV2, request_bytes)
1701                .await
1702                .expect("valid V2 must succeed on mem-transport");
1703
1704            assert_eq!(response_kind, PayloadResponseKind::AnnounceResponseV2);
1705            assert_eq!(server.kademlia.read().len(), baseline_len + 1, "announcer should now be in Kademlia");
1706
1707            // The peer is actually findable by id, not just that the count went up.
1708            let (peers, _) = server.kademlia.read().get_peers_for_key(&announcing_peer_id, 8);
1709            assert!(peers.iter().any(|p| p.id == announcing_peer_id), "announcing peer must be reachable through get_peers_for_key");
1710        }
1711
1712        #[tokio::test]
1713        async fn v2_bails_and_does_not_admit_when_proof_fails() {
1714            let server = make_test_server().await;
1715            let pow_generator = SingleThreadedPowGenerator::new();
1716            let announcing_peer = make_announcing_peer(server.runtime_services.time_provider.as_ref(), &pow_generator, "8888").await;
1717            let announcing_peer_id = announcing_peer.id;
1718
1719            let baseline_len = server.kademlia.read().len();
1720
1721            // Non-empty proof bytes on a mem-transport receiver — EmptyMarkerOwnershipProof::prove
1722            // returns false, so the handler should bail. This is the cross-transport-mismatch case
1723            // (an HTTPS-shaped postcard blob arriving at a mem-transport server).
1724            let request_bytes = json::struct_to_bytes(&AnnounceV2 { peer_self: announcing_peer, proof_payload: vec![1, 2, 3, 4] }).expect("encode V2");
1725
1726            let result = server
1727                .dispatch_network_payload_x_AnnounceV2(CancellationToken::new(), PayloadRequestKind::AnnounceV2, request_bytes)
1728                .await;
1729
1730            assert!(result.is_err(), "failed proof must bail so the dispatcher wraps it as ErrorResponseV1");
1731            assert_eq!(server.kademlia.read().len(), baseline_len, "no Kademlia mutation on failed proof");
1732
1733            // The peer is not findable by id.
1734            let (peers, _) = server.kademlia.read().get_peers_for_key(&announcing_peer_id, 8);
1735            assert!(!peers.iter().any(|p| p.id == announcing_peer_id), "rejected peer must not be reachable through get_peers_for_key");
1736        }
1737    }
1738}
1739