1use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29 AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30 GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31 HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
32 SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
44use hashiverse_lib::tools::types::{Id, Signature};
45use hashiverse_lib::tools::{hashing, url_preview};
46use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
47use hashiverse_lib::transport::transport::IncomingRequest;
48use log::{info, trace, warn};
49use std::collections::HashSet;
50use std::sync::atomic::Ordering;
51
52use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
53use tokio::sync::mpsc;
54use tokio_util::sync::CancellationToken;
55
/// Hashtags used to pad out a trending-hashtags response when too little
/// organic data is available (see `top_up_trending_hashtags_with_fallback`).
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
62
/// Canonical form of a hashtag for de-duplication: lowercased, with a single
/// leading '#' (if any) removed.
fn normalise_hashtag(hashtag: &str) -> String {
    let lowered = hashtag.to_lowercase();
    if let Some(rest) = lowered.strip_prefix('#') {
        rest.to_string()
    } else {
        lowered
    }
}
72
73fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
78 let target_length = limit as usize;
79 if trending_hashtags.len() >= target_length {
80 return;
81 }
82
83 let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
84 .map(|entry| normalise_hashtag(&entry.hashtag))
85 .collect();
86
87 for fallback_hashtag in fallback_hashtags {
88 if trending_hashtags.len() >= target_length {
89 break;
90 }
91 let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
92 if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
93 continue;
94 }
95 trending_hashtags.push(TrendingHashtagV1 {
96 hashtag: (*fallback_hashtag).to_string(),
97 count: 0,
98 });
99 existing_normalised_hashtags.insert(normalised_fallback_hashtag);
100 }
101}
102
103impl HashiverseServer {
104 pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
105 loop {
106 tokio::select! {
107 _ = cancellation_token.cancelled() => { break },
108
109 receipt = rx.recv() => {
110 match receipt {
111 Some(incoming) => {
112 let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
114 match result {
115 Ok(bytes) => {
116 let result = incoming.reply.send(bytes);
117 if result.is_err() { warn!("failed to send reply"); }
118 },
119 Err(e) => {
120 warn!("failed to process packet from {}: {}", incoming.caller_address, e);
121 incoming.report_bad_request();
122 drop(incoming.reply);
123 },
124 }
125 },
126 None => {
127 warn!("channel closed");
128 break;
129 }
130 }
131 }
132 }
133 }
134
135 Ok(())
136 }
137
138 async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
139 let caller_address = incoming.caller_address.as_str();
140 let current_time_millis = self.runtime_services.time_provider.current_time_millis();
141
142 let rpc_request_packet_rx = RpcRequestPacketRx::decode(¤t_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
144 self.request_counters[rpc_request_packet_rx.payload_request_kind.clone() as usize].fetch_add(1, Ordering::Relaxed);
150
151 {
153 if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
154 anyhow::bail!("replay detected: salt already seen");
155 }
156 self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
157 }
158
159 let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
161
162 let dispatch_result: anyhow::Result<BytesGatherer> = try {
163 let pow = match rpc_request_packet_rx.pow_server_known {
165 true => {
166 let (pow, improved_pow_current_day, improved_pow_current_month) = {
167 let peer_self = self.peer_self.read(); let pow = PeerPow::new(
169 rpc_request_packet_rx.pow_sponsor_id,
170 &peer_self.verification_key_bytes,
171 &peer_self.pq_commitment_bytes,
172 rpc_request_packet_rx.pow_timestamp,
173 rpc_request_packet_rx.pow_content_hash,
174 rpc_request_packet_rx.pow_salt,
175 )?;
176
177 let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
178 let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
179
180 (pow, improved_pow_current_day, improved_pow_current_month)
181 };
182
183 if improved_pow_current_day || improved_pow_current_month {
185 let mut peer_self = self.peer_self.write(); if improved_pow_current_day {
187 trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
188 peer_self.pow_current_day = pow.clone();
189 }
190 if improved_pow_current_month {
191 trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
192 peer_self.pow_current_month = pow.clone();
193 }
194
195 peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
196 }
197
198 Some(pow)
199 }
200
201 false => {
202 match rpc_request_packet_rx.payload_request_kind {
204 PayloadRequestKind::BootstrapV1 => {}
205 _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
206 }
207
208 None
209 }
210 };
211
212 let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
214 let response_flags = match compress_response {
215 true => RpcResponsePacketTxFlags::COMPRESSED,
216 false => RpcResponsePacketTxFlags::empty(),
217 };
218
219 RpcResponsePacketTx::encode(
221 &self.server_id.keys.signature_key,
222 &self.server_id.keys.verification_key_bytes,
223 &self.server_id.keys.pq_commitment_bytes,
224 &self.server_id.sponsor_id,
225 &self.server_id.timestamp,
226 &self.server_id.hash,
227 &self.server_id.salt,
228 &pow_content_hash,
229 response_flags,
230 payload_response_kind,
231 payload,
232 )?
233 };
234
235 match dispatch_result {
236 Ok(results) => Ok(results),
237 Err(e) => {
238 warn!("failed to dispatch packet from {}: {}", caller_address, e);
239 incoming.report_bad_request();
240
241 let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
242 let response = ErrorResponseV1 { code: 0, message: e.to_string() };
243 let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
244
245 RpcResponsePacketTx::encode(
247 &self.server_id.keys.signature_key,
248 &self.server_id.keys.verification_key_bytes,
249 &self.server_id.keys.pq_commitment_bytes,
250 &self.server_id.sponsor_id,
251 &self.server_id.timestamp,
252 &self.server_id.hash,
253 &self.server_id.salt,
254 &pow_content_hash,
255 RpcResponsePacketTxFlags::COMPRESSED,
256 payload_response_kind,
257 payload,
258 )
259 }
260 }
261 }
262
263 async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
264 let compress_response = match rpc_request_packet_rx.payload_request_kind {
266 PayloadRequestKind::GetPostBundleV1 => false, PayloadRequestKind::CachePostBundleV1 => false, _ => true,
269 };
270
271 let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
272 PayloadRequestKind::ErrorV1 => {
273 anyhow::bail!("Received ErrorV1");
274 }
275 PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
276 PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
277 PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
278 PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279 PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
280 PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
281 PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
282 PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
283 PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
284 PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
285 PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
286 PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
287 PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
288 PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
289 PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
290 PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
291 };
292
293 Ok((compress_response, payload_response_kind, payload))
294 }
295
296 #[allow(non_snake_case)]
297 async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
298 anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
299 let peer = self.peer_self.read().clone();
300 let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
301 Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
302 }
303
304 #[allow(non_snake_case)]
305 async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
306 anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
307 let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
308 let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
309 Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
310 }
311
312 #[allow(non_snake_case)]
313 async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
314 anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
315
316 let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
317 let peer = request.peer_self;
320 let peer_id = peer.id;
321
322 self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
324
325 let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
326
327 let json = json::struct_to_bytes(&AnnounceResponseV1 {
328 peer_self: self.peer_self.read().clone(),
329 peers_nearest,
330 })?;
331 Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
332 }
333
    /// Handles GetPostBundleV1: returns the post bundle for the requested
    /// bucket location when we are one of the redundant servers responsible
    /// for it, plus nearer peers and any third-party cached bundles.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
        trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);

        request.bucket_location.validate()?;

        // Opportunistically learn about the peers the requester has already visited.
        {
            for peer in request.peers_visited {
                self.add_potential_peer_to_kademlia(peer, time_millis).await;
            }
        }

        let peer_self = self.peer_self.read().clone();

        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
        if !among_peers_nearer {
            warn!("I am not in peers_nearer {}", peer_self);
        }

        let post_bundle = match among_peers_nearer {
            true => {
                // Serialize against concurrent writers for this location.
                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);

                let mut encoded_post_bundle_bytes: Option<Bytes> = None;

                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
                if let Some(mut post_bundle_metadata) = post_bundle_metadata {
                    encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;

                    if !post_bundle_metadata.sealed {
                        // A bundle becomes sealed once its bucket window plus the
                        // grace threshold has elapsed.
                        let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
                        if sealed {
                            // Mark the stored bundle sealed, re-sign it, and
                            // persist both bundle and metadata.
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.sealed = true;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }

                            post_bundle_metadata.sealed = true;
                            self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
                        }
                        else {
                            // Not sealed yet: refresh the header timestamp and
                            // signature for the response only — nothing is
                            // persisted in this branch.
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }
                        }
                    }
                };

                // No stored bundle: synthesize an empty, signed bundle so the
                // caller still receives an authenticated answer for this location.
                if encoded_post_bundle_bytes.is_none() {
                    let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;

                    let mut header = EncodedPostBundleHeaderV1 {
                        time_millis,
                        location_id: request.bucket_location.location_id,
                        overflowed: false,
                        sealed,
                        num_posts: 0,
                        encoded_post_ids: vec![],
                        encoded_post_lengths: vec![],
                        encoded_post_healed: HashSet::new(),
                        peer: peer_self.clone(),
                        signature: Signature::zero(),
                    };
                    header.signature_generate(&self.server_id.keys.signature_key)?;

                    let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
                    encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
                }

                encoded_post_bundle_bytes
            }
            false => None,
        };

        // Record the request in the cache layer; it may hand back bundles cached
        // from peers the requester has not yet retrieved from.
        let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);

        let get_post_bundle_response = GetPostBundleResponseV1 {
            peers_nearer,
            cache_request_token: cache_result.cache_request_token,
            post_bundles_cached: cache_result.cached_items,
            post_bundle,
        };
        Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
    }
444
445 #[allow(non_snake_case)]
446 async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
447 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
448
449 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
450
451 let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
452 trace!("received GetPostBundleFeedbackV1");
453
454 {
456 for peer in request.peers_visited {
457 self.add_potential_peer_to_kademlia(peer, time_millis).await;
458 }
459 }
460
461 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
463
464 let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
465
466 if among_peers_nearer {
467 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
469 if post_bundle_metadata.is_some() {
470 post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
471 }
472 }
473
474 let peer_self = self.peer_self.read().clone();
476 let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
477 Some(feedbacks_bytes) => {
478
479 let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
480
481 let mut header = EncodedPostBundleFeedbackHeaderV1 {
482 time_millis,
483 location_id: request.bucket_location.location_id,
484 feedbacks_bytes_hash,
485 peer: peer_self.clone(),
486 signature: Signature::zero(),
487 };
488 header.signature_generate(&self.server_id.keys.signature_key);
489
490 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
491 header,
492 feedbacks_bytes,
493 };
494 Some(encoded_post_bundle_feedback.to_bytes()?)
495 }
496 None => None,
497 };
498
499 let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
500
501 let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
502 peers_nearer,
503 cache_request_token: cache_result.cache_request_token,
504 post_bundle_feedbacks_cached: cache_result.cached_items,
505 encoded_post_bundle_feedback,
506 };
507 Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
508 }
509
    /// Handles SubmitPostClaimV1: validates a prospective post (proof of work,
    /// bucket/timestamp consistency, authorship rules) and, when we are
    /// responsible for the bucket and it is not sealed, grants a signed claim
    /// token that the client must later redeem with SubmitPostCommitV1.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let pow = match pow {
            Some(pow) => pow,
            None => anyhow::bail!("We need pow for a submit post claim"),
        };

        let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
        trace!("received SubmitPostClaimV1");

        request.bucket_location.validate()?;

        // The requested duration must be one of the protocol's fixed bucket durations.
        let bucket_duration = {
            let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
            match bucket_duration {
                Some(bucket_duration) => *bucket_duration,
                None => anyhow::bail!("Unrecognised bucket duration provided"),
            }
        };

        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;

        // The attached pow must cover the minimum required for a post of this
        // size, link count, and bucket duration.
        {
            let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
            if pow.pow < pow_minimum {
                anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
            }
        }

        // The post's own timestamp must fall inside the claimed bucket window.
        {
            let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
            if timestamp != request.bucket_location.bucket_time_millis {
                anyhow::bail!("The post timestamp does not match the bucket");
            }
        }

        let client_id = decoded_post.header.client_id()?;

        if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
            anyhow::bail!("The base_id is not related to the post");
        }

        if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
            anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
        }

        // Replies and sequels must supply the referenced original post's header
        // and prove it matches the bucket; sequels must additionally share the
        // original post's author.
        if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
            let original_header_bytes = request.referenced_post_header_bytes
                .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;

            let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;

            if original_post.post_id != request.bucket_location.base_id {
                anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
            }

            if request.bucket_location.bucket_type == BucketType::Sequel {
                let original_client_id = original_post.header.client_id()?;
                if original_client_id != client_id {
                    anyhow::bail!("Sequel post author does not match original post author");
                }
            }
        }

        // Reject posts whose claimed timestamp is too far from our own clock.
        {
            let delta = (time_millis - decoded_post.header.time_millis).abs();
            if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
                anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
            }
        }

        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);

        let submit_post_claim_token = match among_peers_nearer {
            true => {
                // Serialize against concurrent writers for this location while
                // the grant counters are bumped and persisted.
                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
                let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

                if !post_bundle_metadata.sealed {
                    post_bundle_metadata.num_posts_granted += 1;
                    post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
                    post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;

                    self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
                }

                // A claim is only granted while the bundle remains unsealed
                // (including the update just made above).
                match post_bundle_metadata.sealed {
                    false => {
                        info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
                        Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
                    }
                    true => {
                        info!(
                            "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
                            request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
                        );
                        None
                    }
                }
            }

            false => None,
        };

        // Trending-hashtag tracking: fold the author's verification key into a
        // per-hashtag HyperLogLog, but only for granted claims on User buckets
        // and only for hashtags the post actually links to.
        if submit_post_claim_token.is_some() {
            if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
                let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
                for referenced_hashtag in &request.referenced_hashtags {
                    let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
                        Ok(id) => id,
                        Err(_) => continue, // unparseable hashtag: ignore it
                    };
                    if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
                        continue; // the post does not actually reference this hashtag
                    }
                    let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_else(HyperLogLog::new);
                    hll.insert(author_verification_key_bytes.as_ref());
                    self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
                }
            }
        }

        let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
        Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
660
    /// Handles SubmitPostCommitV1: redeems a claim token we previously issued,
    /// appends the committed post to the bucket's bundle, persists the bundle
    /// and its metadata, and returns a signed commit token as a receipt.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
        trace!("received SubmitPostCommitV1");

        // Only claim tokens issued by this server are redeemable here.
        // NOTE(review): the read lock taken here is held until the end of the
        // function (the guard is reused to build the commit token below).
        let peer_self = self.peer_self.read(); if request.submit_post_claim_token.peer.id != peer_self.id {
            anyhow::bail!("The submit_post_claim_token is not from us");
        }

        request.bucket_location.validate()?;
        if request.bucket_location != request.submit_post_claim_token.bucket_location {
            anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
        }

        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;

        if decoded_post.post_id != request.submit_post_claim_token.post_id {
            anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
        }

        // Serialize against concurrent writers for this location.
        let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;

        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

        // Load the existing bundle, or start an empty one for this location.
        let mut post_bundle = match post_bundle_bytes {
            Some(bytes) => {
                let bytes = Bytes::from_owner(bytes);
                let bundle = EncodedPostBundleV1::from_bytes(bytes, true)?;
                bundle
            }
            None => {
                let header = EncodedPostBundleHeaderV1 {
                    time_millis: TimeMillis::zero(),
                    location_id: request.bucket_location.location_id,
                    overflowed: false,
                    sealed: false,
                    num_posts: 0,
                    encoded_post_ids: vec![],
                    encoded_post_lengths: vec![],
                    encoded_post_healed: HashSet::new(),
                    peer: self.peer_self.read().clone(),
                    signature: Signature::zero(),
                };

                EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
            }
        };

        // Idempotence guard: a post may only be committed once per bundle.
        if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
            anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
        }

        // Append the post, recompute the overflow/seal flags, and re-sign the header.
        post_bundle.header.time_millis = time_millis;
        post_bundle.header.num_posts += 1;
        post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
        post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
        post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
        post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
        post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
        posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
        post_bundle.encoded_posts_bytes = posts_mut.freeze();
        let post_bundle_bytes_new = post_bundle.to_bytes()?;

        // Keep the metadata row in sync with the persisted bundle header.
        post_bundle_metadata.num_posts = post_bundle.header.num_posts;
        post_bundle_metadata.overflowed = post_bundle.header.overflowed;
        post_bundle_metadata.sealed = post_bundle.header.sealed;
        post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();

        {
            self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
            self.environment
                .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
        }

        info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);

        let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);

        let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
        Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
761
762 #[allow(non_snake_case)]
763 async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
764 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
765
766 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
767
768 let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
769 trace!("received SubmitPostFeedbackV1");
770
771 let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
773 if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
774 anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
775 }
776
777 request.encoded_post_feedback.pow_verify()?;
779
780 let location_id = request.bucket_location.location_id;
781
782 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
784
785 let accepted = (|| -> anyhow::Result<bool> {
786 if !among_peers_nearer {
787 return Ok(false);
788 }
789
790 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
792 let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
793 else {
794 return Ok(false);
795 };
796
797 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
798 if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
799 return Ok(false);
800 }
801
802 Ok(true)
803 })()?;
804
805 if accepted {
807 trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
808 self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
809 }
810
811 let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
812 Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
813 }
814
    /// Handles `HealPostBundleClaimV1`: a donor peer offers a post bundle it holds; this
    /// server answers with the post ids it is missing plus a signed claim token that
    /// authorises the follow-up `HealPostBundleCommitV1`.
    ///
    /// Returns a "negatory" response (empty `needed_post_ids`, no token) when this server
    /// is not among the nearer peers for the location, when a heal is already in flight
    /// for the location, or when nothing is missing.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);

        // Shared shape for every "not interested" exit below.
        fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
            let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
            Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
        }

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
        let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
        trace!("received HealPostBundleClaimV1");

        // The claimed bucket location must be well-formed and agree with the donor's
        // (signed) bundle header before anything else in the request is trusted.
        request.bucket_location.validate()?;
        if request.bucket_location.location_id != request.donor_header.location_id {
            anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
        }

        request.donor_header.verify()?;

        // Only peers responsible for this key accept heals; others answer negatory.
        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
        if !among_peers_nearer {
            return generate_negatory_response();
        }

        // At most one heal per location at a time.
        if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
            return generate_negatory_response();
        }

        // Read lock is enough here: we only inspect our current bundle; the commit
        // handler takes the write lock when it mutates.
        let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;

        // Ids we already hold (empty set when no bundle exists yet for this location).
        let our_post_ids: HashSet<Id> = match post_bundle_bytes {
            Some(bytes) => {
                let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
                bundle.header.encoded_post_ids.into_iter().collect()
            }
            None => HashSet::new(),
        };

        // Whatever the donor has that we do not.
        let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();

        if needed_post_ids.is_empty() {
            return generate_negatory_response();
        }

        // Mark the heal as in flight; the commit handler clears this entry.
        self.heal_in_progress.insert(request.donor_header.location_id, ());

        // The token binds our identity, the location, the exact needed ids, and the
        // donor's header signature, so the later commit can be checked against this claim.
        let token = Some(HealPostBundleClaimTokenV1::new(
            self.peer_self.read().clone(),
            request.bucket_location,
            needed_post_ids.clone(),
            request.donor_header.signature,
            &self.server_id.keys.signature_key,
        ));
        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
        Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
879
    /// Handles `HealPostBundleCommitV1`: the follow-up to a successful claim. The donor
    /// sends the raw bytes of the posts we reported missing; after validating the token,
    /// the donor header and every post payload, the missing posts are appended to our
    /// local bundle under the location's write lock.
    ///
    /// Responds with `accepted = true` iff at least one post was actually added.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
        let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
        trace!("received HealPostBundleCommitV1");

        // The claim token must be one we minted ourselves and must still verify.
        let peer_self = self.peer_self.read().clone();
        if request.token.peer.id != peer_self.id {
            anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
        }
        request.token.verify()?;

        // The donor header must be the exact one the claim was made against (pinned by
        // its signature inside the token), must verify, and must match the token's
        // location.
        if request.donor_header.signature != request.token.donor_header_signature {
            anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
        }
        request.donor_header.verify()?;

        if request.token.bucket_location.location_id != request.donor_header.location_id {
            anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
        }

        let location_id = request.donor_header.location_id;

        // Slice the concatenated payload into one Bytes per needed post, using the donor
        // header's per-post lengths; the payload must match the needed ids exactly, with
        // no bytes left over.
        let mut remaining_bytes = request.encoded_posts_bytes.clone();
        let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
        for post_id in &request.token.needed_post_ids {
            let len = request
                .donor_header
                .encoded_post_ids
                .iter()
                .zip(request.donor_header.encoded_post_lengths.iter())
                .find(|(id, _)| *id == post_id)
                .map(|(_, len)| *len)
                .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
            if remaining_bytes.len() < len {
                anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
            }
            let post_bytes = remaining_bytes.split_to(len);
            posts_to_add.push((*post_id, post_bytes));
        }
        if !remaining_bytes.is_empty() {
            anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
        }

        // Every post must decode against the bucket's base id before anything is
        // committed.
        for (post_id, post_bytes) in &posts_to_add {
            EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
        }

        // NOTE(review): all `bail!` exits above return without clearing the
        // `heal_in_progress` entry set by the claim handler; presumably that entry
        // expires on its own (cache TTL) — confirm.

        // Write lock for the read-modify-write of the stored bundle.
        let _lock = self.environment.get_write_lock_for_location_id(&location_id);
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

        // Start from our stored bundle, or from a fresh one that inherits the donor's
        // overflowed/sealed flags when we have nothing yet for this location.
        let mut post_bundle = match post_bundle_bytes {
            Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
            None => EncodedPostBundleV1 {
                header: EncodedPostBundleHeaderV1 {
                    time_millis: TimeMillis::zero(),
                    location_id,
                    overflowed: request.donor_header.overflowed,
                    sealed: request.donor_header.sealed,
                    num_posts: 0,
                    encoded_post_ids: vec![],
                    encoded_post_lengths: vec![],
                    encoded_post_healed: HashSet::new(),
                    peer: peer_self.clone(),
                    signature: Signature::zero(),
                },
                encoded_posts_bytes: Bytes::new(),
            },
        };

        let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();

        // Append only posts we genuinely do not have yet, marking each as healed.
        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
        let mut added_any = false;
        for (post_id, post_bytes) in posts_to_add {
            if !our_post_ids.contains(&post_id) {
                let len = post_bytes.len();
                posts_mut.extend_from_slice(&post_bytes);
                post_bundle.header.encoded_post_ids.push(post_id);
                post_bundle.header.encoded_post_lengths.push(len);
                post_bundle.header.encoded_post_healed.insert(post_id);
                added_any = true;
            }
        }
        post_bundle.encoded_posts_bytes = posts_mut.freeze();

        if added_any {
            // Re-stamp, re-count and re-sign the bundle, then persist bundle + metadata.
            post_bundle.header.time_millis = time_millis;
            post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
            post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;

            let new_bytes = post_bundle.to_bytes()?;
            post_bundle_metadata.num_posts = post_bundle.header.num_posts;
            // NOTE(review): unlike the submit path, metadata.overflowed/sealed are not
            // refreshed here — confirm that is intentional.
            post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();

            self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
            self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;

            info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
        }

        // The heal for this location is finished either way; allow new claims.
        self.heal_in_progress.invalidate(&location_id);

        let response = HealPostBundleCommitResponseV1 { accepted: added_any };
        Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
995
996 #[allow(non_snake_case)]
997 async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
998 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
999
1000 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1001 let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
1002 trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
1003
1004 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
1006 if !among_peers_nearer {
1007 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1008 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1009 }
1010
1011 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1013 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1014 let Some(post_bundle_bytes) = post_bundle_bytes
1015 else {
1016 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1017 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1018 };
1019 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1020
1021 let mut accepted_count: u32 = 0;
1022 for feedback in &request.encoded_post_feedbacks {
1023 if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1025 continue;
1026 }
1027 self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1028 accepted_count += 1;
1029 }
1030
1031 if accepted_count > 0 {
1032 trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1033 }
1034
1035 let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1036 Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1037 }
1038
1039 #[allow(non_snake_case)]
1040 async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1041 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1042
1043 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1044 let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1045 trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1046
1047 let peer_self = self.peer_self.read().clone();
1049 if request.token.peer.id != peer_self.id {
1050 anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1051 }
1052 request.token.verify()?;
1053 if request.token.is_expired(time_millis) {
1054 anyhow::bail!("CachePostBundleV1: token has expired");
1055 }
1056
1057 let mut any_accepted = false;
1058 for bundle_bytes in request.encoded_post_bundles {
1059 let parse_result: anyhow::Result<()> = try {
1060 let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1061
1062 encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1064
1065 let originator_peer_id = encoded_post_bundle.header.peer.id;
1066 let is_sealed = encoded_post_bundle.header.sealed;
1067 if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1068 any_accepted = true;
1069 }
1070 };
1071 if let Err(e) = &parse_result {
1072 warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1073 }
1074 }
1075 let response = CachePostBundleResponseV1 { accepted: any_accepted };
1076 Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1077 }
1078
1079 #[allow(non_snake_case)]
1080 async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1081 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1082
1083 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1084 let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1085 trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1086
1087 let peer_self = self.peer_self.read().clone();
1089 if request.token.peer.id != peer_self.id {
1090 anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1091 }
1092 request.token.verify()?;
1093 if request.token.is_expired(time_millis) {
1094 anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1095 }
1096
1097 let result: anyhow::Result<bool> = try {
1098 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1099
1100 encoded_post_bundle_feedback.verify()?;
1101
1102 let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1103 self.post_bundle_feedback_cache
1105 .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1106 };
1107 let accepted = result.unwrap_or_else(|e| {
1108 warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1109 false
1110 });
1111
1112 let response = CachePostBundleFeedbackResponseV1 { accepted };
1113 Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1114 }
1115
    /// Handles `FetchUrlPreviewV1`: fetches an https URL on behalf of a client and returns
    /// extracted preview metadata (title, description, image).
    ///
    /// SSRF hardening, in order: https-only, no bare-IP hosts, resolve the host ourselves
    /// and reject protected ranges, then pin the HTTP client to exactly those resolved
    /// addresses (so the request cannot be re-resolved elsewhere), with redirects and
    /// proxies disabled and the response body capped.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);

        let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
        trace!("received FetchUrlPreviewV1 for url={}", request.url);

        if !request.url.starts_with("https://") {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
        }

        // Extract the host component: everything after the scheme up to the first
        // path/query/fragment delimiter, handling `[...]` IPv6 literals and `:port`.
        let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
        let host = if host_and_port.starts_with('[') {
            host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
        } else {
            host_and_port.split(':').next().unwrap_or(host_and_port)
        };
        if host.is_empty() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
        }
        // Bare IPs are rejected outright; hostnames go through our own DNS check below.
        if host.parse::<std::net::IpAddr>().is_ok() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
        }

        // Resolve once, here, and reject protected addresses. The 443 port passed to
        // lookup_host is only a resolver formality; the addresses are what matter.
        let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
            .await
            .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
            .collect();
        if resolved_socket_addrs.is_empty() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
        }
        for socket_addr in &resolved_socket_addrs {
            let ip = socket_addr.ip();
            if is_ssrf_protected_ip(ip) {
                anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
            }
        }

        // Pin the client to the vetted addresses (defeats DNS rebinding between the check
        // above and the actual request), forbid redirects (they could point anywhere)
        // and any proxy.
        let http_client = reqwest::Client::builder()
            .connect_timeout(std::time::Duration::from_secs(1))
            .timeout(std::time::Duration::from_secs(3))
            .user_agent("hashiverse-preview/1.0")
            .resolve_to_addrs(host, &resolved_socket_addrs)
            .redirect(reqwest::redirect::Policy::none())
            .no_proxy()
            .build()?;

        // Body cap: reject oversized Content-Length up front, and stop reading chunks at
        // the limit for responses that do not declare one.
        const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
        let mut http_response = http_client.get(&request.url).send().await?;

        if let Some(content_length) = http_response.content_length() {
            if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
                anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
            }
        }
        let mut body_bytes = BytesMut::new();
        while let Some(chunk) = http_response.chunk().await? {
            // Take at most what still fits under the cap, then stop.
            let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
            body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
            if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
                break;
            }
        }
        let html = String::from_utf8_lossy(&body_bytes).into_owned();

        let preview_data = url_preview::extract_url_preview(&html);

        // Prefer the page's canonical URL when the extractor found one.
        let response = FetchUrlPreviewResponseV1 {
            url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
            title: preview_data.title,
            description: preview_data.description,
            image_url: preview_data.image_url,
        };

        Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
    }
1211
1212 #[allow(non_snake_case)]
1213 async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1214 anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1215
1216 let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1217 trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1218
1219 let time_millis = self.runtime_services.time_provider.current_time_millis();
1220
1221 let cached_response = {
1223 let cache = self.trending_hashtags_response_cache.lock();
1224 match cache.as_ref() {
1225 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1226 Some(cached_response.clone())
1227 }
1228 _ => None,
1229 }
1230 };
1231
1232 let mut response = match cached_response {
1233 Some(mut cached) => {
1234 cached.trending_hashtags.truncate(request.limit as usize);
1235 cached
1236 }
1237 None => {
1238 let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1240 .map(|(hashtag, hll)| TrendingHashtagV1 {
1241 hashtag: hashtag.as_ref().clone(),
1242 count: hll.count(),
1243 })
1244 .filter(|entry| entry.count > 0)
1245 .collect();
1246
1247 trending_hashtags.sort_by(|a, b| b.count.cmp(&a.count));
1248
1249 let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1250
1251 {
1253 let mut cache = self.trending_hashtags_response_cache.lock();
1254 *cache = Some((time_millis, full_response.clone()));
1255 }
1256
1257 let mut truncated_response = full_response;
1258 truncated_response.trending_hashtags.truncate(request.limit as usize);
1259 truncated_response
1260 }
1261 };
1262
1263 top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1264
1265 Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1266 }
1267
    /// Handles `PeerStatsRequestV1`: returns a signed, compressed JSON snapshot of server
    /// statistics (version, request counters, system, kademlia and environment stats).
    ///
    /// Gated on a minimum proof-of-work; the snapshot is cached for 60 seconds so
    /// repeated requests do not re-collect stats or re-sign.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);

        // Parsed only to validate the request shape; it carries no parameters we use.
        let _request = PeerStatsRequestV1::from_bytes(&bytes)?;

        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
        if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
            anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
        }

        let time_millis = self.runtime_services.time_provider.current_time_millis();

        // Serve the cached response while it is younger than 60 seconds.
        let cached_response = {
            let cache = self.peer_stats_response_cache.lock();
            match cache.as_ref() {
                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
                _ => None,
            }
        };

        let response = match cached_response {
            Some(cached) => cached,
            None => {
                // Collect the stats document fresh, compress it, sign it, and cache it.
                let doc = serde_json::json!({
                    "version": env!("CARGO_PKG_VERSION"),
                    "requests": request_counts_subtree(&self.request_counters),
                    "system": system_stats_subtree(),
                    "kademlia": kademlia_stats_subtree(&self.kademlia.read()),
                    "environment": environment_stats_subtree(&self.environment),
                });

                let json_bytes = serde_json::to_vec(&doc)?;
                let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();

                // The signature covers the timestamp and the compressed payload.
                let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
                let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);

                let response = PeerStatsResponseV1 {
                    peer: self.peer_self.read().clone(),
                    timestamp: time_millis,
                    json_compressed,
                    signature,
                };

                *self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
                response
            }
        };

        Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
    }
1322}
1323
1324#[cfg(test)]
1325mod tests {
1326 use super::*;
1327
1328 fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
1329 TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
1330 }
1331
1332 #[test]
1333 fn top_up_adds_fallback_when_empty() {
1334 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1335 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &["#hashiverse", "#news"]);
1336 assert_eq!(trending_hashtags.len(), 2);
1337 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1338 assert_eq!(trending_hashtags[0].count, 0);
1339 assert_eq!(trending_hashtags[1].hashtag, "#news");
1340 assert_eq!(trending_hashtags[1].count, 0);
1341 }
1342
1343 #[test]
1344 fn top_up_respects_limit_smaller_than_fallback_list() {
1345 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1346 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 1, &["#hashiverse", "#news"]);
1347 assert_eq!(trending_hashtags.len(), 1);
1348 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1349 }
1350
1351 #[test]
1352 fn top_up_preserves_fallback_order() {
1353 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1354 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#first", "#second", "#third"]);
1355 assert_eq!(trending_hashtags[0].hashtag, "#first");
1356 assert_eq!(trending_hashtags[1].hashtag, "#second");
1357 assert_eq!(trending_hashtags[2].hashtag, "#third");
1358 }
1359
1360 #[test]
1361 fn top_up_is_noop_when_already_at_limit() {
1362 let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
1363 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 2, &["#hashiverse", "#news"]);
1364 assert_eq!(trending_hashtags.len(), 2);
1365 assert_eq!(trending_hashtags[0].hashtag, "#rust");
1366 assert_eq!(trending_hashtags[1].hashtag, "#golang");
1367 }
1368
1369 #[test]
1370 fn top_up_partially_fills_when_real_trending_exists() {
1371 let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10)];
1372 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1373 assert_eq!(trending_hashtags.len(), 3);
1374 assert_eq!(trending_hashtags[0].hashtag, "#rust");
1375 assert_eq!(trending_hashtags[0].count, 10);
1376 assert_eq!(trending_hashtags[1].hashtag, "#hashiverse");
1377 assert_eq!(trending_hashtags[1].count, 0);
1378 assert_eq!(trending_hashtags[2].hashtag, "#news");
1379 assert_eq!(trending_hashtags[2].count, 0);
1380 }
1381
1382 #[test]
1383 fn top_up_skips_fallback_already_present_exact_match() {
1384 let mut trending_hashtags = vec![make_trending_hashtag("#hashiverse", 42)];
1385 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1386 assert_eq!(trending_hashtags.len(), 2);
1387 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1388 assert_eq!(trending_hashtags[0].count, 42, "real trending entry must not be overwritten by filler");
1389 assert_eq!(trending_hashtags[1].hashtag, "#news");
1390 assert_eq!(trending_hashtags[1].count, 0);
1391 }
1392
1393 #[test]
1394 fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
1395 let mut trending_hashtags = vec![make_trending_hashtag("HashiVerse", 7)];
1397 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1398 assert_eq!(trending_hashtags.len(), 2);
1399 assert_eq!(trending_hashtags[0].hashtag, "HashiVerse");
1400 assert_eq!(trending_hashtags[1].hashtag, "#news");
1401 }
1402
1403 #[test]
1404 fn top_up_with_empty_fallback_is_noop() {
1405 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1406 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &[]);
1407 assert_eq!(trending_hashtags.len(), 0);
1408 }
1409
1410 #[test]
1411 fn top_up_handles_zero_limit() {
1412 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1413 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 0, &["#hashiverse", "#news"]);
1414 assert_eq!(trending_hashtags.len(), 0);
1415 }
1416
1417 #[test]
1418 fn top_up_exhausts_fallback_without_reaching_limit() {
1419 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1421 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 10, &["#hashiverse", "#news"]);
1422 assert_eq!(trending_hashtags.len(), 2, "should stop at the end of the fallback list, not pad further");
1423 }
1424
1425 mod peer_stats {
1426 use super::*;
1427 use crate::environment::mem_environment_store::MemEnvironmentFactory;
1428 use crate::environment::environment::EnvironmentFactory;
1429 use crate::server::args::Args;
1430 use crate::server::hashiverse_server::HashiverseServer;
1431 use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
1432 use hashiverse_lib::protocol::peer::PeerPow;
1433 use hashiverse_lib::tools::compression;
1434 use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
1435 use hashiverse_lib::tools::runtime_services::RuntimeServices;
1436 use hashiverse_lib::tools::time::TimeMillis;
1437 use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
1438 use hashiverse_lib::tools::types::{Pow, VerificationKey};
1439 use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1440 use std::sync::Arc;
1441 use std::sync::atomic::Ordering;
1442
1443 async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
1444 let time_provider = Arc::new(RealTimeProvider::default());
1445 let transport_factory = MemTransportFactory::default();
1446 let pow_generator = Arc::new(StubParallelPowGenerator::new());
1447 let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1448 let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1449 let args = Args::default_for_testing();
1450 HashiverseServer::new(runtime_services, environment_factory, args).await
1451 }
1452
1453 fn synthetic_pow(pow: Pow) -> PeerPow {
1457 let mut peer_pow = PeerPow::zero();
1458 peer_pow.pow = pow;
1459 peer_pow
1460 }
1461
1462 fn empty_request_bytes() -> Bytes {
1463 PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
1464 }
1465
1466 fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
1467 let bytes = compression::decompress(&response.json_compressed).expect("decompress doc").to_bytes();
1468 serde_json::from_slice(&bytes).expect("doc must be valid JSON")
1469 }
1470
1471 #[tokio::test]
1472 async fn rejects_insufficient_pow() {
1473 let server = make_server().await.expect("server must start");
1474 let result = server
1475 .dispatch_network_payload_x_PeerStatsRequestV1(
1476 CancellationToken::new(),
1477 Some(synthetic_pow(Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1)))),
1478 PayloadRequestKind::PeerStatsRequestV1,
1479 empty_request_bytes(),
1480 )
1481 .await;
1482 assert!(result.is_err(), "expected insufficient PoW to be rejected");
1483 }
1484
        /// End-to-end check of the stats document: correct response kind, decodable
        /// payload, and the expected top-level subtrees with numeric leaf fields.
        #[tokio::test]
        async fn returns_response_with_expected_top_level_keys() {
            let server = make_server().await.expect("server must start");
            // Exactly the minimum PoW must be accepted.
            let (response_kind, gatherer) = server
                .dispatch_network_payload_x_PeerStatsRequestV1(
                    CancellationToken::new(),
                    Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
                    PayloadRequestKind::PeerStatsRequestV1,
                    empty_request_bytes(),
                )
                .await
                .expect("handler must succeed at threshold pow");
            assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);

            let response_bytes = gatherer.to_bytes();
            let response = PeerStatsResponseV1::from_bytes(&response_bytes).expect("response must decode");
            let doc = decode_doc(&response);

            assert!(doc.get("version").and_then(|v| v.as_str()).map(|s| !s.is_empty()).unwrap_or(false), "version must be a non-empty string");
            assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
            assert!(doc.get("requests").is_some(), "requests subtree missing");
            assert!(doc.get("system").is_some(), "system subtree missing");
            assert!(doc.get("kademlia").is_some(), "kademlia subtree missing");
            assert!(doc.get("environment").is_some(), "environment subtree missing");

            // Spot-check that the numeric leaves exist and carry numeric JSON types.
            for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
                assert!(doc["system"].get(key).map(|v| v.is_number()).unwrap_or(false), "system.{key} must be a number");
            }
            for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
                assert!(doc["environment"].get(key).map(|v| v.is_number()).unwrap_or(false), "environment.{key} must be a number");
            }
        }
1517
1518 #[tokio::test]
1519 async fn counters_reflect_recorded_dispatches() {
1520 let server = make_server().await.expect("server must start");
1521
1522 for _ in 0..7 {
1526 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(1, Ordering::Relaxed);
1527 }
1528
1529 let (_, gatherer) = server
1530 .dispatch_network_payload_x_PeerStatsRequestV1(
1531 CancellationToken::new(),
1532 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1533 PayloadRequestKind::PeerStatsRequestV1,
1534 empty_request_bytes(),
1535 )
1536 .await
1537 .expect("handler must succeed");
1538 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1539 let doc = decode_doc(&response);
1540 assert_eq!(doc["requests"]["PingV1"].as_u64(), Some(7));
1541 }
1542
1543 #[tokio::test]
1544 async fn cache_returns_byte_identical_response_within_ttl() {
1545 let server = make_server().await.expect("server must start");
1546 let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);
1547
1548 let (_, gatherer_a) = server
1549 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow.clone()), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1550 .await
1551 .expect("first call must succeed");
1552 let bytes_a = gatherer_a.to_bytes();
1553
1554 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);
1557
1558 let (_, gatherer_b) = server
1559 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1560 .await
1561 .expect("second call must succeed");
1562 let bytes_b = gatherer_b.to_bytes();
1563
1564 assert_eq!(bytes_a, bytes_b, "cached response should be byte-identical across the TTL");
1565 }
1566
1567 #[tokio::test]
1568 async fn signature_verifies_and_fails_on_tamper() {
1569 let server = make_server().await.expect("server must start");
1570 let (_, gatherer) = server
1571 .dispatch_network_payload_x_PeerStatsRequestV1(
1572 CancellationToken::new(),
1573 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1574 PayloadRequestKind::PeerStatsRequestV1,
1575 empty_request_bytes(),
1576 )
1577 .await
1578 .expect("handler must succeed");
1579 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1580
1581 let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
1582 let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
1583 signing::verify(&verification_key, &response.signature, &signing_input).expect("signature must verify against transmitted bytes");
1584
1585 let mut tampered = response.json_compressed.to_vec();
1587 let tamper_index = tampered.len() / 2;
1588 tampered[tamper_index] ^= 0xff;
1589 let tampered_signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
1590 assert!(signing::verify(&verification_key, &response.signature, &tampered_signing_input).is_err(), "verification must fail when json_compressed is tampered");
1591
1592 let bumped_signing_input = PeerStatsResponseV1::signing_input(TimeMillis(response.timestamp.0 + 1), &response.json_compressed);
1594 assert!(signing::verify(&verification_key, &response.signature, &bumped_signing_input).is_err(), "verification must fail when timestamp is mutated");
1595 }
1596
1597 #[test]
1598 fn request_counts_subtree_covers_every_variant() {
1599 let counters: [std::sync::atomic::AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| std::sync::atomic::AtomicU64::new(0));
1602 let subtree = request_counts_subtree(&counters);
1603 let map = subtree.as_object().expect("request_counts subtree must be an object");
1604 assert_eq!(map.len(), PAYLOAD_REQUEST_KIND_COUNT);
1605 }
1606 }
1607}
1608