1use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29 AnnounceResponseV1, AnnounceResponseV2, AnnounceV1, AnnounceV2, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30 GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31 HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
32 SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::types::{Id, Signature};
44use hashiverse_lib::tools::{hashing, url_preview};
45use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
46use hashiverse_lib::transport::transport::IncomingRequest;
47use log::{info, trace, warn};
48use std::collections::HashSet;
49use std::sync::atomic::Ordering;
50
51use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
52use tokio::sync::mpsc;
53use tokio_util::sync::CancellationToken;
54
55const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
61
62fn normalise_hashtag(hashtag: &str) -> String {
65 let lowercased = hashtag.to_lowercase();
66 match lowercased.strip_prefix('#') {
67 Some(stripped) => stripped.to_string(),
68 None => lowercased,
69 }
70}
71
72fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
77 let target_length = limit as usize;
78 if trending_hashtags.len() >= target_length {
79 return;
80 }
81
82 let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
83 .map(|entry| normalise_hashtag(&entry.hashtag))
84 .collect();
85
86 for fallback_hashtag in fallback_hashtags {
87 if trending_hashtags.len() >= target_length {
88 break;
89 }
90 let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
91 if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
92 continue;
93 }
94 trending_hashtags.push(TrendingHashtagV1 {
95 hashtag: (*fallback_hashtag).to_string(),
96 count: 0,
97 });
98 existing_normalised_hashtags.insert(normalised_fallback_hashtag);
99 }
100}
101
102impl HashiverseServer {
103 pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
104 loop {
105 tokio::select! {
106 _ = cancellation_token.cancelled() => { break },
107
108 receipt = rx.recv() => {
109 match receipt {
110 Some(incoming) => {
111 let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
113 match result {
114 Ok(bytes) => {
115 let result = incoming.reply.send(bytes);
116 if result.is_err() { warn!("failed to send reply"); }
117 },
118 Err(e) => {
119 warn!("failed to process packet from {}: {}", incoming.caller_address, e);
120 incoming.report_bad_request();
121 drop(incoming.reply);
122 },
123 }
124 },
125 None => {
126 warn!("channel closed");
127 break;
128 }
129 }
130 }
131 }
132 }
133
134 Ok(())
135 }
136
137 async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
138 let caller_address = incoming.caller_address.as_str();
139 let current_time_millis = self.runtime_services.time_provider.current_time_millis();
140
141 let rpc_request_packet_rx = RpcRequestPacketRx::decode(¤t_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
143 let payload_request_kind_index = rpc_request_packet_rx.payload_request_kind.clone() as usize;
149 self.request_counters[payload_request_kind_index].fetch_add(1, Ordering::Relaxed);
150 self.request_rate_windows[payload_request_kind_index].lock().record(current_time_millis);
153
154 {
156 if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
157 anyhow::bail!("replay detected: salt already seen");
158 }
159 self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
160 }
161
162 let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
164
165 let dispatch_result: anyhow::Result<BytesGatherer> = try {
166 let pow = match rpc_request_packet_rx.pow_server_known {
168 true => {
169 let (pow, improved_pow_current_day, improved_pow_current_month) = {
170 let peer_self = self.peer_self.read(); let pow = PeerPow::new(
172 rpc_request_packet_rx.pow_sponsor_id,
173 &peer_self.verification_key_bytes,
174 &peer_self.pq_commitment_bytes,
175 rpc_request_packet_rx.pow_timestamp,
176 rpc_request_packet_rx.pow_content_hash,
177 rpc_request_packet_rx.pow_salt,
178 )?;
179
180 let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
181 let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
182
183 (pow, improved_pow_current_day, improved_pow_current_month)
184 };
185
186 if improved_pow_current_day || improved_pow_current_month {
188 let mut peer_self = self.peer_self.write(); if improved_pow_current_day {
190 trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
191 peer_self.pow_current_day = pow.clone();
192 }
193 if improved_pow_current_month {
194 trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
195 peer_self.pow_current_month = pow.clone();
196 }
197
198 peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
199 }
200
201 Some(pow)
202 }
203
204 false => {
205 match rpc_request_packet_rx.payload_request_kind {
207 PayloadRequestKind::BootstrapV1 => {}
208 _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
209 }
210
211 None
212 }
213 };
214
215 let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
217 let response_flags = match compress_response {
218 true => RpcResponsePacketTxFlags::COMPRESSED,
219 false => RpcResponsePacketTxFlags::empty(),
220 };
221
222 RpcResponsePacketTx::encode(
224 &self.server_id.keys.signature_key,
225 &self.server_id.keys.verification_key_bytes,
226 &self.server_id.keys.pq_commitment_bytes,
227 &self.server_id.sponsor_id,
228 &self.server_id.timestamp,
229 &self.server_id.hash,
230 &self.server_id.salt,
231 &pow_content_hash,
232 response_flags,
233 payload_response_kind,
234 payload,
235 )?
236 };
237
238 match dispatch_result {
239 Ok(results) => Ok(results),
240 Err(e) => {
241 warn!("failed to dispatch packet from {}: {}", caller_address, e);
242 incoming.report_bad_request();
243
244 let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
245 let response = ErrorResponseV1 { code: 0, message: e.to_string() };
246 let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
247
248 RpcResponsePacketTx::encode(
250 &self.server_id.keys.signature_key,
251 &self.server_id.keys.verification_key_bytes,
252 &self.server_id.keys.pq_commitment_bytes,
253 &self.server_id.sponsor_id,
254 &self.server_id.timestamp,
255 &self.server_id.hash,
256 &self.server_id.salt,
257 &pow_content_hash,
258 RpcResponsePacketTxFlags::COMPRESSED,
259 payload_response_kind,
260 payload,
261 )
262 }
263 }
264 }
265
266 async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
267 let compress_response = match rpc_request_packet_rx.payload_request_kind {
269 PayloadRequestKind::GetPostBundleV1 => false, PayloadRequestKind::CachePostBundleV1 => false, _ => true,
272 };
273
274 let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
275 PayloadRequestKind::ErrorV1 => {
276 anyhow::bail!("Received ErrorV1");
277 }
278 PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279 PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
280 PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
281 PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
282 PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
283 PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
284 PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
285 PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
286 PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
287 PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
288 PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
289 PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
290 PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
291 PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
292 PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
293 PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
294 PayloadRequestKind::AnnounceV2 => self.dispatch_network_payload_x_AnnounceV2(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
295 };
296
297 Ok((compress_response, payload_response_kind, payload))
298 }
299
300 #[allow(non_snake_case)]
301 async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
302 anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
303 let peer = self.peer_self.read().clone();
304 let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
305 Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
306 }
307
308 #[allow(non_snake_case)]
309 async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
310 anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
311 let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
312 let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
313 Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
314 }
315
316 #[allow(non_snake_case)]
317 async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
318 anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
319
320 let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
321
322 let peer = request.peer_self;
323 let peer_id = peer.id;
324
325 warn!("received deprecated AnnounceV1 from peer {} at {}; V1 will be removed in a future release", peer_id, peer.address);
330
331 self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
333
334 let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
335
336 let json = json::struct_to_bytes(&AnnounceResponseV1 {
337 peer_self: self.peer_self.read().clone(),
338 peers_nearest,
339 })?;
340 Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
341 }
342
343 #[allow(non_snake_case)]
344 async fn dispatch_network_payload_x_AnnounceV2(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
345 anyhow_assert_eq!(&PayloadRequestKind::AnnounceV2, &payload_request_kind);
346
347 let request = json::bytes_to_struct::<AnnounceV2>(&bytes)?;
348
349 let peer = request.peer_self;
350 let peer_id = peer.id;
351 let now = self.runtime_services.time_provider.as_ref().current_time_millis();
352
353 let ownership_proof = self.transport_server.get_transport_ownership_proof();
360 if !ownership_proof.prove(&peer, &request.proof_payload, now) {
361 anyhow::bail!("AnnounceV2 from peer {} at {} failed ownership proof", peer_id, peer.address);
362 }
363 self.add_potential_peer_to_kademlia(peer, now).await;
364
365 let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
366
367 let json = json::struct_to_bytes(&AnnounceResponseV2 {
368 peer_self: self.peer_self.read().clone(),
369 peers_nearest,
370 })?;
371 Ok((PayloadResponseKind::AnnounceResponseV2, BytesGatherer::from_bytes(json)))
372 }
373
374 #[allow(non_snake_case)]
375 async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
376 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);
377
378 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
379
380 let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
381 trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);
382
383 request.bucket_location.validate()?;
385
386 {
388 for peer in request.peers_visited {
389 self.add_potential_peer_to_kademlia(peer, time_millis).await;
390 }
391 }
392
393 let peer_self = self.peer_self.read().clone();
394
395 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
397 if !among_peers_nearer {
398 warn!("I am not in peers_nearer {}", peer_self);
399 }
400
401 let post_bundle = match among_peers_nearer {
402 true => {
403 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
406
407 let mut encoded_post_bundle_bytes: Option<Bytes> = None;
408
409 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
411 if let Some(mut post_bundle_metadata) = post_bundle_metadata {
412 encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
413
414 if !post_bundle_metadata.sealed {
416 let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
417 if sealed {
418 if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
420 let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
421 encoded_post_bundle.header.time_millis = time_millis;
422 encoded_post_bundle.header.sealed = true;
423 encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
424 let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
425 self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
426 encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
427 }
428
429 post_bundle_metadata.sealed = true;
431 self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
432 }
433 else {
434 if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
436 let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
437 encoded_post_bundle.header.time_millis = time_millis;
438 encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
439 let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
440 encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
441 }
442 }
443 }
444 };
445
446 if encoded_post_bundle_bytes.is_none() {
449 let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
450
451 let mut header = EncodedPostBundleHeaderV1 {
452 time_millis,
453 location_id: request.bucket_location.location_id,
454 overflowed: false,
455 sealed,
456 num_posts: 0,
457 encoded_post_ids: vec![],
458 encoded_post_lengths: vec![],
459 encoded_post_healed: HashSet::new(),
460 peer: peer_self.clone(),
461 signature: Signature::zero(),
462 };
463 header.signature_generate(&self.server_id.keys.signature_key)?;
464
465 let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
466 encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
467 }
468
469 encoded_post_bundle_bytes
470 }
471 false => None,
472 };
473
474 let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
475
476 let get_post_bundle_response = GetPostBundleResponseV1 {
477 peers_nearer,
478 cache_request_token: cache_result.cache_request_token,
479 post_bundles_cached: cache_result.cached_items,
480 post_bundle,
481 };
482 Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
483 }
484
485 #[allow(non_snake_case)]
486 async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
487 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
488
489 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
490
491 let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
492 trace!("received GetPostBundleFeedbackV1");
493
494 {
496 for peer in request.peers_visited {
497 self.add_potential_peer_to_kademlia(peer, time_millis).await;
498 }
499 }
500
501 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
503
504 let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
505
506 if among_peers_nearer {
507 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
509 if post_bundle_metadata.is_some() {
510 post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
511 }
512 }
513
514 let peer_self = self.peer_self.read().clone();
516 let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
517 Some(feedbacks_bytes) => {
518
519 let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
520
521 let mut header = EncodedPostBundleFeedbackHeaderV1 {
522 time_millis,
523 location_id: request.bucket_location.location_id,
524 feedbacks_bytes_hash,
525 peer: peer_self.clone(),
526 signature: Signature::zero(),
527 };
528 header.signature_generate(&self.server_id.keys.signature_key);
529
530 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
531 header,
532 feedbacks_bytes,
533 };
534 Some(encoded_post_bundle_feedback.to_bytes()?)
535 }
536 None => None,
537 };
538
539 let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
540
541 let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
542 peers_nearer,
543 cache_request_token: cache_result.cache_request_token,
544 post_bundle_feedbacks_cached: cache_result.cached_items,
545 encoded_post_bundle_feedback,
546 };
547 Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
548 }
549
550 #[allow(non_snake_case)]
551 async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
552 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
553
554 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
555
556 let pow = match pow {
557 Some(pow) => pow,
558 None => anyhow::bail!("We need pow for a submit post claim"),
559 };
560
561 let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
562 trace!("received SubmitPostClaimV1");
563
564 request.bucket_location.validate()?;
566
567 let bucket_duration = {
569 let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
570 match bucket_duration {
571 Some(bucket_duration) => *bucket_duration,
572 None => anyhow::bail!("Unrecognised bucket duration provided"),
573 }
574 };
575
576 let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
577
578 {
580 let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
581 if pow.pow < pow_minimum {
582 anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
583 }
584 }
585
586 {
588 let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
590 if timestamp != request.bucket_location.bucket_time_millis {
591 anyhow::bail!("The post timestamp does not match the bucket");
592 }
593 }
594
595 let client_id = decoded_post.header.client_id()?;
596
597 if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
599 anyhow::bail!("The base_id is not related to the post");
600 }
601
602 if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
604 anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
605 }
606
607 if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
610 let original_header_bytes = request.referenced_post_header_bytes
611 .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
612
613 let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
616
617 if original_post.post_id != request.bucket_location.base_id {
619 anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
620 }
621
622 if request.bucket_location.bucket_type == BucketType::Sequel {
624 let original_client_id = original_post.header.client_id()?;
625 if original_client_id != client_id {
626 anyhow::bail!("Sequel post author does not match original post author");
627 }
628 }
629 }
630
631 {
633 let delta = (time_millis - decoded_post.header.time_millis).abs();
634 if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
635 anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
636 }
637 }
638
639 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
641
642 let submit_post_claim_token = match among_peers_nearer {
643 true => {
644 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
646 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
647 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
648
649 if !post_bundle_metadata.sealed {
651 post_bundle_metadata.num_posts_granted += 1;
652 post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
653 post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
654
655 self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
656 }
657
658 match post_bundle_metadata.sealed {
660 false => {
661 info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
662 Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
663 }
664 true => {
665 info!(
666 "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
667 request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
668 );
669 None
670 }
671 }
672 }
673
674 false => None,
675 };
676
677 if submit_post_claim_token.is_some() {
679 if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
681 let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
682 for referenced_hashtag in &request.referenced_hashtags {
683 let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
684 Ok(id) => id,
685 Err(_) => continue, };
687 if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
688 continue; }
690 let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_default();
691 hll.insert(author_verification_key_bytes.as_ref());
692 self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
693 }
694 }
695 }
696
697 let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
698 Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
699 }
700
701 #[allow(non_snake_case)]
702 async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
703 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
704
705 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
706
707 let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
708 trace!("received SubmitPostCommitV1");
709
710 let peer_self = self.peer_self.read(); if request.submit_post_claim_token.peer.id != peer_self.id {
714 anyhow::bail!("The submit_post_claim_token is not from us");
715 }
716
717 request.bucket_location.validate()?;
719 if request.bucket_location != request.submit_post_claim_token.bucket_location {
720 anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
721 }
722
723 let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
725
726 if decoded_post.post_id != request.submit_post_claim_token.post_id {
728 anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
729 }
730
731 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
733 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
734 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
735
736 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
738
739 let mut post_bundle = match post_bundle_bytes {
741 Some(bytes) => {
742 let bytes = Bytes::from_owner(bytes);
743
744 EncodedPostBundleV1::from_bytes(bytes, true)?
745 }
746 None => {
747 let header = EncodedPostBundleHeaderV1 {
748 time_millis: TimeMillis::zero(),
749 location_id: request.bucket_location.location_id,
750 overflowed: false,
751 sealed: false,
752 num_posts: 0,
753 encoded_post_ids: vec![],
754 encoded_post_lengths: vec![],
755 encoded_post_healed: HashSet::new(),
756 peer: self.peer_self.read().clone(),
757 signature: Signature::zero(),
758 };
759
760 EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
761 }
762 };
763
764 if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
766 anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
767 }
768
769 post_bundle.header.time_millis = time_millis;
771 post_bundle.header.num_posts += 1;
772 post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
773 post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
774 post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
775 post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
776 post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
777 let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
778 posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
779 post_bundle.encoded_posts_bytes = posts_mut.freeze();
780 let post_bundle_bytes_new = post_bundle.to_bytes()?;
781
782 post_bundle_metadata.num_posts = post_bundle.header.num_posts;
784 post_bundle_metadata.overflowed = post_bundle.header.overflowed;
785 post_bundle_metadata.sealed = post_bundle.header.sealed;
786 post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
787
788 {
789 self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
790 self.environment
791 .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
792 }
793
794 info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
795
796 let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
797
798 let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
799 Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
800 }
801
802 #[allow(non_snake_case)]
803 async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
804 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
805
806 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
807
808 let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
809 trace!("received SubmitPostFeedbackV1");
810
811 let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
813 if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
814 anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
815 }
816
817 request.encoded_post_feedback.pow_verify()?;
819
820 let location_id = request.bucket_location.location_id;
821
822 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
824
825 let accepted = (|| -> anyhow::Result<bool> {
826 if !among_peers_nearer {
827 return Ok(false);
828 }
829
830 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
832 let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
833 else {
834 return Ok(false);
835 };
836
837 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
838 if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
839 return Ok(false);
840 }
841
842 Ok(true)
843 })()?;
844
845 if accepted {
847 trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
848 self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
849 }
850
851 let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
852 Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
853 }
854
855 #[allow(non_snake_case)]
856 async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
857 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
858
859 fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
860 let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
861 Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
862 }
863
864 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
865 let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
866 trace!("received HealPostBundleClaimV1");
867
868 request.bucket_location.validate()?;
870 if request.bucket_location.location_id != request.donor_header.location_id {
871 anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
872 }
873
874 request.donor_header.verify()?;
876
877 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
879 if !among_peers_nearer {
880 return generate_negatory_response();
881 }
882
883 if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
885 return generate_negatory_response();
886 }
887
888 let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
890 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
891
892 let our_post_ids: HashSet<Id> = match post_bundle_bytes {
893 Some(bytes) => {
894 let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
895 bundle.header.encoded_post_ids.into_iter().collect()
896 }
897 None => HashSet::new(),
898 };
899
900 let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
902
903 if needed_post_ids.is_empty() {
904 return generate_negatory_response();
905 }
906
907 self.heal_in_progress.insert(request.donor_header.location_id, ());
908
909 let token = Some(HealPostBundleClaimTokenV1::new(
910 self.peer_self.read().clone(),
911 request.bucket_location,
912 needed_post_ids.clone(),
913 request.donor_header.signature,
914 &self.server_id.keys.signature_key,
915 ));
916 let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
917 Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
918 }
919
920 #[allow(non_snake_case)]
921 async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
922 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
923
924 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
925 let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
926 trace!("received HealPostBundleCommitV1");
927
928 let peer_self = self.peer_self.read().clone();
930 if request.token.peer.id != peer_self.id {
931 anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
932 }
933 request.token.verify()?;
934
935 if request.donor_header.signature != request.token.donor_header_signature {
937 anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
938 }
939 request.donor_header.verify()?;
940
941 if request.token.bucket_location.location_id != request.donor_header.location_id {
942 anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
943 }
944
945 let location_id = request.donor_header.location_id;
946
947 let mut remaining_bytes = request.encoded_posts_bytes.clone();
949 let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
950 for post_id in &request.token.needed_post_ids {
951 let len = request
952 .donor_header
953 .encoded_post_ids
954 .iter()
955 .zip(request.donor_header.encoded_post_lengths.iter())
956 .find(|(id, _)| *id == post_id)
957 .map(|(_, len)| *len)
958 .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
959 if remaining_bytes.len() < len {
960 anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
961 }
962 let post_bytes = remaining_bytes.split_to(len);
963 posts_to_add.push((*post_id, post_bytes));
964 }
965 if !remaining_bytes.is_empty() {
966 anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
967 }
968
969 for (post_id, post_bytes) in &posts_to_add {
971 EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
972 }
973
974 let _lock = self.environment.get_write_lock_for_location_id(&location_id);
976 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
977 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
978 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
979
980 let mut post_bundle = match post_bundle_bytes {
981 Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
982 None => EncodedPostBundleV1 {
983 header: EncodedPostBundleHeaderV1 {
984 time_millis: TimeMillis::zero(),
985 location_id,
986 overflowed: request.donor_header.overflowed,
987 sealed: request.donor_header.sealed,
988 num_posts: 0,
989 encoded_post_ids: vec![],
990 encoded_post_lengths: vec![],
991 encoded_post_healed: HashSet::new(),
992 peer: peer_self.clone(),
993 signature: Signature::zero(),
994 },
995 encoded_posts_bytes: Bytes::new(),
996 },
997 };
998
999 let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
1000
1001 let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
1002 let mut added_any = false;
1003 for (post_id, post_bytes) in posts_to_add {
1004 if !our_post_ids.contains(&post_id) {
1005 let len = post_bytes.len();
1006 posts_mut.extend_from_slice(&post_bytes);
1007 post_bundle.header.encoded_post_ids.push(post_id);
1008 post_bundle.header.encoded_post_lengths.push(len);
1009 post_bundle.header.encoded_post_healed.insert(post_id);
1010 added_any = true;
1011 }
1012 }
1013 post_bundle.encoded_posts_bytes = posts_mut.freeze();
1014
1015 if added_any {
1016 post_bundle.header.time_millis = time_millis;
1017 post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
1018 post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
1019
1020 let new_bytes = post_bundle.to_bytes()?;
1021 post_bundle_metadata.num_posts = post_bundle.header.num_posts;
1022 post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
1023
1024 self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
1025 self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
1026
1027 info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
1028 }
1029
1030 self.heal_in_progress.invalidate(&location_id);
1031
1032 let response = HealPostBundleCommitResponseV1 { accepted: added_any };
1033 Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1034 }
1035
1036 #[allow(non_snake_case)]
1037 async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1038 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
1039
1040 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1041 let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
1042 trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
1043
1044 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
1046 if !among_peers_nearer {
1047 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1048 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1049 }
1050
1051 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1053 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1054 let Some(post_bundle_bytes) = post_bundle_bytes
1055 else {
1056 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1057 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1058 };
1059 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1060
1061 let mut accepted_count: u32 = 0;
1062 for feedback in &request.encoded_post_feedbacks {
1063 if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1065 continue;
1066 }
1067 self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1068 accepted_count += 1;
1069 }
1070
1071 if accepted_count > 0 {
1072 trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1073 }
1074
1075 let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1076 Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1077 }
1078
1079 #[allow(non_snake_case)]
1080 async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1081 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1082
1083 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1084 let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1085 trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1086
1087 let peer_self = self.peer_self.read().clone();
1089 if request.token.peer.id != peer_self.id {
1090 anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1091 }
1092 request.token.verify()?;
1093 if request.token.is_expired(time_millis) {
1094 anyhow::bail!("CachePostBundleV1: token has expired");
1095 }
1096
1097 let mut any_accepted = false;
1098 for bundle_bytes in request.encoded_post_bundles {
1099 let parse_result: anyhow::Result<()> = try {
1100 let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1101
1102 encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1104
1105 let originator_peer_id = encoded_post_bundle.header.peer.id;
1106 let is_sealed = encoded_post_bundle.header.sealed;
1107 if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1108 any_accepted = true;
1109 }
1110 };
1111 if let Err(e) = &parse_result {
1112 warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1113 }
1114 }
1115 let response = CachePostBundleResponseV1 { accepted: any_accepted };
1116 Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1117 }
1118
1119 #[allow(non_snake_case)]
1120 async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1121 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1122
1123 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1124 let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1125 trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1126
1127 let peer_self = self.peer_self.read().clone();
1129 if request.token.peer.id != peer_self.id {
1130 anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1131 }
1132 request.token.verify()?;
1133 if request.token.is_expired(time_millis) {
1134 anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1135 }
1136
1137 let result: anyhow::Result<bool> = try {
1138 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1139
1140 encoded_post_bundle_feedback.verify()?;
1141
1142 let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1143 self.post_bundle_feedback_cache
1145 .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1146 };
1147 let accepted = result.unwrap_or_else(|e| {
1148 warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1149 false
1150 });
1151
1152 let response = CachePostBundleFeedbackResponseV1 { accepted };
1153 Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1154 }
1155
1156 #[allow(non_snake_case)]
1157 async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1158 anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
1159
1160 let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1161 trace!("received FetchUrlPreviewV1 for url={}", request.url);
1162
1163 if !request.url.starts_with("https://") {
1166 anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
1167 }
1168
1169 let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
1171 let host = if host_and_port.starts_with('[') {
1172 host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
1174 } else {
1175 host_and_port.split(':').next().unwrap_or(host_and_port)
1177 };
1178 if host.is_empty() {
1179 anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
1180 }
1181 if host.parse::<std::net::IpAddr>().is_ok() {
1182 anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
1183 }
1184
1185 let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
1189 .await
1190 .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
1191 .collect();
1192 if resolved_socket_addrs.is_empty() {
1193 anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
1194 }
1195 for socket_addr in &resolved_socket_addrs {
1196 let ip = socket_addr.ip();
1197 if is_ssrf_protected_ip(ip) {
1198 anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
1199 }
1200 }
1201
1202 let http_client = reqwest::Client::builder()
1209 .connect_timeout(std::time::Duration::from_secs(1))
1210 .timeout(std::time::Duration::from_secs(3))
1211 .user_agent("hashiverse-preview/1.0")
1212 .resolve_to_addrs(host, &resolved_socket_addrs)
1213 .redirect(reqwest::redirect::Policy::none())
1214 .no_proxy()
1215 .build()?;
1216
1217 const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
1218 let mut http_response = http_client.get(&request.url).send().await?;
1219
1220 if let Some(content_length) = http_response.content_length() {
1222 if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
1223 anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
1224 }
1225 }
1226 let mut body_bytes = BytesMut::new();
1227 while let Some(chunk) = http_response.chunk().await? {
1228 let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
1233 body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
1234 if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
1235 break;
1236 }
1237 }
1238 let html = String::from_utf8_lossy(&body_bytes).into_owned();
1239
1240 let preview_data = url_preview::extract_url_preview(&html);
1241
1242 let response = FetchUrlPreviewResponseV1 {
1243 url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
1244 title: preview_data.title,
1245 description: preview_data.description,
1246 image_url: preview_data.image_url,
1247 };
1248
1249 Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1250 }
1251
1252 #[allow(non_snake_case)]
1253 async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1254 anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1255
1256 let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1257 trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1258
1259 let time_millis = self.runtime_services.time_provider.current_time_millis();
1260
1261 let cached_response = {
1263 let cache = self.trending_hashtags_response_cache.lock();
1264 match cache.as_ref() {
1265 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1266 Some(cached_response.clone())
1267 }
1268 _ => None,
1269 }
1270 };
1271
1272 let mut response = match cached_response {
1273 Some(mut cached) => {
1274 cached.trending_hashtags.truncate(request.limit as usize);
1275 cached
1276 }
1277 None => {
1278 let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1280 .map(|(hashtag, hll)| TrendingHashtagV1 {
1281 hashtag: hashtag.as_ref().clone(),
1282 count: hll.count(),
1283 })
1284 .filter(|entry| entry.count > 0)
1285 .collect();
1286
1287 trending_hashtags.sort_by_key(|entry| std::cmp::Reverse(entry.count));
1288
1289 let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1290
1291 {
1293 let mut cache = self.trending_hashtags_response_cache.lock();
1294 *cache = Some((time_millis, full_response.clone()));
1295 }
1296
1297 let mut truncated_response = full_response;
1298 truncated_response.trending_hashtags.truncate(request.limit as usize);
1299 truncated_response
1300 }
1301 };
1302
1303 top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1304
1305 Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1306 }
1307
1308 #[allow(non_snake_case)]
1309 async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1310 anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);
1311
1312 let _request = PeerStatsRequestV1::from_bytes(&bytes)?;
1313
1314 let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
1315 if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
1316 anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
1317 }
1318
1319 let time_millis = self.runtime_services.time_provider.current_time_millis();
1320
1321 let cached_response = {
1324 let cache = self.peer_stats_response_cache.lock();
1325 match cache.as_ref() {
1326 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
1327 _ => None,
1328 }
1329 };
1330
1331 let response = match cached_response {
1332 Some(cached) => cached,
1333 None => {
1334 let doc = serde_json::json!({
1335 "version": env!("CARGO_PKG_VERSION"),
1336 "requests": request_counts_subtree(&self.request_counters, &self.request_rate_windows, time_millis),
1337 "system": system_stats_subtree(),
1338 "kademlia": kademlia_stats_subtree(&self.kademlia.read()),
1339 "environment": environment_stats_subtree(&self.environment),
1340 });
1341
1342 let json_bytes = serde_json::to_vec(&doc)?;
1343 let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1344
1345 let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
1346 let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);
1347
1348 let response = PeerStatsResponseV1 {
1349 peer: self.peer_self.read().clone(),
1350 timestamp: time_millis,
1351 json_compressed,
1352 signature,
1353 };
1354
1355 *self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
1356 response
1357 }
1358 };
1359
1360 Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1361 }
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366 use super::*;
1367
1368 fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
1369 TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
1370 }
1371
1372 #[test]
1373 fn top_up_adds_fallback_when_empty() {
1374 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1375 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &["#hashiverse", "#news"]);
1376 assert_eq!(trending_hashtags.len(), 2);
1377 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1378 assert_eq!(trending_hashtags[0].count, 0);
1379 assert_eq!(trending_hashtags[1].hashtag, "#news");
1380 assert_eq!(trending_hashtags[1].count, 0);
1381 }
1382
1383 #[test]
1384 fn top_up_respects_limit_smaller_than_fallback_list() {
1385 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1386 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 1, &["#hashiverse", "#news"]);
1387 assert_eq!(trending_hashtags.len(), 1);
1388 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1389 }
1390
1391 #[test]
1392 fn top_up_preserves_fallback_order() {
1393 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1394 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#first", "#second", "#third"]);
1395 assert_eq!(trending_hashtags[0].hashtag, "#first");
1396 assert_eq!(trending_hashtags[1].hashtag, "#second");
1397 assert_eq!(trending_hashtags[2].hashtag, "#third");
1398 }
1399
1400 #[test]
1401 fn top_up_is_noop_when_already_at_limit() {
1402 let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
1403 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 2, &["#hashiverse", "#news"]);
1404 assert_eq!(trending_hashtags.len(), 2);
1405 assert_eq!(trending_hashtags[0].hashtag, "#rust");
1406 assert_eq!(trending_hashtags[1].hashtag, "#golang");
1407 }
1408
1409 #[test]
1410 fn top_up_partially_fills_when_real_trending_exists() {
1411 let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10)];
1412 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1413 assert_eq!(trending_hashtags.len(), 3);
1414 assert_eq!(trending_hashtags[0].hashtag, "#rust");
1415 assert_eq!(trending_hashtags[0].count, 10);
1416 assert_eq!(trending_hashtags[1].hashtag, "#hashiverse");
1417 assert_eq!(trending_hashtags[1].count, 0);
1418 assert_eq!(trending_hashtags[2].hashtag, "#news");
1419 assert_eq!(trending_hashtags[2].count, 0);
1420 }
1421
1422 #[test]
1423 fn top_up_skips_fallback_already_present_exact_match() {
1424 let mut trending_hashtags = vec![make_trending_hashtag("#hashiverse", 42)];
1425 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1426 assert_eq!(trending_hashtags.len(), 2);
1427 assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
1428 assert_eq!(trending_hashtags[0].count, 42, "real trending entry must not be overwritten by filler");
1429 assert_eq!(trending_hashtags[1].hashtag, "#news");
1430 assert_eq!(trending_hashtags[1].count, 0);
1431 }
1432
1433 #[test]
1434 fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
1435 let mut trending_hashtags = vec![make_trending_hashtag("HashiVerse", 7)];
1437 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
1438 assert_eq!(trending_hashtags.len(), 2);
1439 assert_eq!(trending_hashtags[0].hashtag, "HashiVerse");
1440 assert_eq!(trending_hashtags[1].hashtag, "#news");
1441 }
1442
1443 #[test]
1444 fn top_up_with_empty_fallback_is_noop() {
1445 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1446 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &[]);
1447 assert_eq!(trending_hashtags.len(), 0);
1448 }
1449
1450 #[test]
1451 fn top_up_handles_zero_limit() {
1452 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1453 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 0, &["#hashiverse", "#news"]);
1454 assert_eq!(trending_hashtags.len(), 0);
1455 }
1456
1457 #[test]
1458 fn top_up_exhausts_fallback_without_reaching_limit() {
1459 let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
1461 top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 10, &["#hashiverse", "#news"]);
1462 assert_eq!(trending_hashtags.len(), 2, "should stop at the end of the fallback list, not pad further");
1463 }
1464
1465 mod peer_stats {
1466 use super::*;
1467 use crate::environment::mem_environment_store::MemEnvironmentFactory;
1468 use crate::environment::environment::EnvironmentFactory;
1469 use crate::server::args::Args;
1470 use crate::server::hashiverse_server::HashiverseServer;
1471 use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
1472 use hashiverse_lib::protocol::peer::PeerPow;
1473 use hashiverse_lib::tools::compression;
1474 use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
1475 use hashiverse_lib::tools::runtime_services::RuntimeServices;
1476 use hashiverse_lib::tools::time::TimeMillis;
1477 use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
1478 use hashiverse_lib::tools::types::{Pow, VerificationKey};
1479 use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1480 use std::sync::Arc;
1481 use std::sync::atomic::Ordering;
1482
1483 async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
1484 let time_provider = Arc::new(RealTimeProvider);
1485 let transport_factory = MemTransportFactory::default();
1486 let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
1487 let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1488 let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1489 let args = Args::default_for_testing();
1490 HashiverseServer::new(runtime_services, environment_factory, args).await
1491 }
1492
1493 fn synthetic_pow(pow: Pow) -> PeerPow {
1497 let mut peer_pow = PeerPow::zero();
1498 peer_pow.pow = pow;
1499 peer_pow
1500 }
1501
1502 fn empty_request_bytes() -> Bytes {
1503 PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
1504 }
1505
1506 fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
1507 let bytes = compression::decompress(&response.json_compressed).expect("decompress doc").to_bytes();
1508 serde_json::from_slice(&bytes).expect("doc must be valid JSON")
1509 }
1510
1511 #[tokio::test]
1512 async fn rejects_insufficient_pow() {
1513 let server = make_server().await.expect("server must start");
1514 let result = server
1515 .dispatch_network_payload_x_PeerStatsRequestV1(
1516 CancellationToken::new(),
1517 Some(synthetic_pow(Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1)))),
1518 PayloadRequestKind::PeerStatsRequestV1,
1519 empty_request_bytes(),
1520 )
1521 .await;
1522 assert!(result.is_err(), "expected insufficient PoW to be rejected");
1523 }
1524
1525 #[tokio::test]
1526 async fn returns_response_with_expected_top_level_keys() {
1527 let server = make_server().await.expect("server must start");
1528 let (response_kind, gatherer) = server
1529 .dispatch_network_payload_x_PeerStatsRequestV1(
1530 CancellationToken::new(),
1531 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1532 PayloadRequestKind::PeerStatsRequestV1,
1533 empty_request_bytes(),
1534 )
1535 .await
1536 .expect("handler must succeed at threshold pow");
1537 assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);
1538
1539 let response_bytes = gatherer.to_bytes();
1540 let response = PeerStatsResponseV1::from_bytes(&response_bytes).expect("response must decode");
1541 let doc = decode_doc(&response);
1542
1543 assert!(doc.get("version").and_then(|v| v.as_str()).map(|s| !s.is_empty()).unwrap_or(false), "version must be a non-empty string");
1544 assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
1545 assert!(doc.get("requests").is_some(), "requests subtree missing");
1546 assert!(doc.get("system").is_some(), "system subtree missing");
1547 assert!(doc.get("kademlia").is_some(), "kademlia subtree missing");
1548 assert!(doc.get("environment").is_some(), "environment subtree missing");
1549
1550 for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
1551 assert!(doc["system"].get(key).map(|v| v.is_number()).unwrap_or(false), "system.{key} must be a number");
1552 }
1553 for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
1554 assert!(doc["environment"].get(key).map(|v| v.is_number()).unwrap_or(false), "environment.{key} must be a number");
1555 }
1556 }
1557
1558 #[tokio::test]
1559 async fn counters_reflect_recorded_dispatches() {
1560 let server = make_server().await.expect("server must start");
1561
1562 for _ in 0..7 {
1566 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(1, Ordering::Relaxed);
1567 }
1568
1569 let (_, gatherer) = server
1570 .dispatch_network_payload_x_PeerStatsRequestV1(
1571 CancellationToken::new(),
1572 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1573 PayloadRequestKind::PeerStatsRequestV1,
1574 empty_request_bytes(),
1575 )
1576 .await
1577 .expect("handler must succeed");
1578 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1579 let doc = decode_doc(&response);
1580 assert_eq!(doc["requests"]["PingV1"]["total"].as_u64(), Some(7));
1581 }
1582
1583 #[tokio::test]
1584 async fn cache_returns_byte_identical_response_within_ttl() {
1585 let server = make_server().await.expect("server must start");
1586 let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);
1587
1588 let (_, gatherer_a) = server
1589 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow.clone()), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1590 .await
1591 .expect("first call must succeed");
1592 let bytes_a = gatherer_a.to_bytes();
1593
1594 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);
1597
1598 let (_, gatherer_b) = server
1599 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1600 .await
1601 .expect("second call must succeed");
1602 let bytes_b = gatherer_b.to_bytes();
1603
1604 assert_eq!(bytes_a, bytes_b, "cached response should be byte-identical across the TTL");
1605 }
1606
1607 #[tokio::test]
1608 async fn signature_verifies_and_fails_on_tamper() {
1609 let server = make_server().await.expect("server must start");
1610 let (_, gatherer) = server
1611 .dispatch_network_payload_x_PeerStatsRequestV1(
1612 CancellationToken::new(),
1613 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1614 PayloadRequestKind::PeerStatsRequestV1,
1615 empty_request_bytes(),
1616 )
1617 .await
1618 .expect("handler must succeed");
1619 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1620
1621 let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
1622 let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
1623 signing::verify(&verification_key, &response.signature, &signing_input).expect("signature must verify against transmitted bytes");
1624
1625 let mut tampered = response.json_compressed.to_vec();
1627 let tamper_index = tampered.len() / 2;
1628 tampered[tamper_index] ^= 0xff;
1629 let tampered_signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
1630 assert!(signing::verify(&verification_key, &response.signature, &tampered_signing_input).is_err(), "verification must fail when json_compressed is tampered");
1631
1632 let bumped_signing_input = PeerStatsResponseV1::signing_input(TimeMillis(response.timestamp.0 + 1), &response.json_compressed);
1634 assert!(signing::verify(&verification_key, &response.signature, &bumped_signing_input).is_err(), "verification must fail when timestamp is mutated");
1635 }
1636
1637 #[test]
1638 fn request_counts_subtree_covers_every_variant() {
1639 let counters: [std::sync::atomic::AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| std::sync::atomic::AtomicU64::new(0));
1642 let windows: [parking_lot::Mutex<crate::server::stats::RequestRateWindows>; PAYLOAD_REQUEST_KIND_COUNT] =
1643 std::array::from_fn(|_| parking_lot::Mutex::new(crate::server::stats::RequestRateWindows::new()));
1644 let subtree = request_counts_subtree(&counters, &windows, TimeMillis::zero());
1645 let map = subtree.as_object().expect("request_counts subtree must be an object");
1646 assert_eq!(map.len(), PAYLOAD_REQUEST_KIND_COUNT);
1647 }
1648 }
1649
1650 mod announce_v2 {
1651 use super::*;
1652 use crate::environment::mem_environment_store::MemEnvironmentFactory;
1653 use crate::environment::environment::EnvironmentFactory;
1654 use crate::server::args::Args;
1655 use crate::server::hashiverse_server::HashiverseServer;
1656 use hashiverse_lib::protocol::payload::payload::AnnounceV2;
1657 use hashiverse_lib::protocol::peer::Peer;
1658 use hashiverse_lib::tools::pow_generator::pow_generator::PowGenerator;
1659 use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
1660 use hashiverse_lib::tools::runtime_services::RuntimeServices;
1661 use hashiverse_lib::tools::server_id::ServerId;
1662 use hashiverse_lib::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
1663 use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1664 use std::sync::Arc;
1665
1666 async fn make_test_server() -> Arc<HashiverseServer> {
1667 let time_provider = Arc::new(RealTimeProvider);
1668 let transport_factory = MemTransportFactory::default();
1669 let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
1670 let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1671 let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1672 let args = Args::default_for_testing();
1673 HashiverseServer::new(runtime_services, environment_factory, args).await.expect("server must start")
1674 }
1675
1676 async fn make_announcing_peer(time_provider: &dyn TimeProvider, pow_generator: &dyn PowGenerator, address: &str) -> Peer {
1680 let server_id = ServerId::new("own_pow", time_provider, config::SERVER_KEY_POW_MIN, true, pow_generator).await.expect("ServerId::new must succeed in tests (PoW reduced under cfg(test))");
1681 let mut peer = server_id.to_peer(time_provider).expect("to_peer must succeed");
1682 peer.address = address.to_string();
1683 peer.sign(time_provider, &server_id.keys.signature_key).expect("sign must succeed");
1684 peer
1685 }
1686
1687 #[tokio::test]
1688 async fn v2_admits_peer_when_proof_is_valid_for_mem_transport() {
1689 let server = make_test_server().await;
1690 let pow_generator = SingleThreadedPowGenerator::new();
1691 let announcing_peer = make_announcing_peer(server.runtime_services.time_provider.as_ref(), &pow_generator, "9999").await;
1692 let announcing_peer_id = announcing_peer.id;
1693
1694 let baseline_len = server.kademlia.read().len();
1695
1696 let request_bytes = json::struct_to_bytes(&AnnounceV2 { peer_self: announcing_peer, proof_payload: vec![] }).expect("encode V2");
1698
1699 let (response_kind, _gatherer) = server
1700 .dispatch_network_payload_x_AnnounceV2(CancellationToken::new(), PayloadRequestKind::AnnounceV2, request_bytes)
1701 .await
1702 .expect("valid V2 must succeed on mem-transport");
1703
1704 assert_eq!(response_kind, PayloadResponseKind::AnnounceResponseV2);
1705 assert_eq!(server.kademlia.read().len(), baseline_len + 1, "announcer should now be in Kademlia");
1706
1707 let (peers, _) = server.kademlia.read().get_peers_for_key(&announcing_peer_id, 8);
1709 assert!(peers.iter().any(|p| p.id == announcing_peer_id), "announcing peer must be reachable through get_peers_for_key");
1710 }
1711
1712 #[tokio::test]
1713 async fn v2_bails_and_does_not_admit_when_proof_fails() {
1714 let server = make_test_server().await;
1715 let pow_generator = SingleThreadedPowGenerator::new();
1716 let announcing_peer = make_announcing_peer(server.runtime_services.time_provider.as_ref(), &pow_generator, "8888").await;
1717 let announcing_peer_id = announcing_peer.id;
1718
1719 let baseline_len = server.kademlia.read().len();
1720
1721 let request_bytes = json::struct_to_bytes(&AnnounceV2 { peer_self: announcing_peer, proof_payload: vec![1, 2, 3, 4] }).expect("encode V2");
1725
1726 let result = server
1727 .dispatch_network_payload_x_AnnounceV2(CancellationToken::new(), PayloadRequestKind::AnnounceV2, request_bytes)
1728 .await;
1729
1730 assert!(result.is_err(), "failed proof must bail so the dispatcher wraps it as ErrorResponseV1");
1731 assert_eq!(server.kademlia.read().len(), baseline_len, "no Kademlia mutation on failed proof");
1732
1733 let (peers, _) = server.kademlia.read().get_peers_for_key(&announcing_peer_id, 8);
1735 assert!(!peers.iter().any(|p| p.id == announcing_peer_id), "rejected peer must not be reachable through get_peers_for_key");
1736 }
1737 }
1738}
1739