Skip to main content

hashiverse_lib/protocol/payload/
payload.rs

1//! # RPC request and response payloads
2//!
3//! This is the catalogue of *what* peers say to each other. Every application-level
4//! operation — ping, bootstrap, peer announce, submit post (claim + commit), submit
5//! feedback, get post bundle, get post bundle feedback, heal, cache, fetch URL preview,
6//! trending hashtags — has a versioned request and response type defined here, plus a
7//! `u16` discriminator in [`PayloadRequestKind`] / [`PayloadResponseKind`] that routes
8//! packets to the right handler on the receiving side.
9//!
10//! ## Design notes
11//!
12//! - **Tokens**: Two-phase operations (submit post, heal, cache) exchange server-signed
13//!   token types (`SubmitPostClaimTokenV1`, `CacheRequestTokenV1`,
14//!   `HealPostBundleClaimTokenV1`) so the second phase can prove admission was granted
15//!   without the server needing to keep per-client state. Each token embeds the peer's
16//!   own [`Peer`] record so verification is self-contained.
17//! - **Bulk payloads** (`GetPostBundleResponseV1`, `GetPostBundleFeedbackResponseV1`)
18//!   use a custom length-prefixed-JSON + raw-bytes format rather than pure JSON, so
19//!   large opaque blobs stream through the
20//!   [`crate::tools::bytes_gatherer::BytesGatherer`] without being re-serialised.
21//! - **PoW** on any request that carries weight (posting, feedback, fetching URLs) is
22//!   carried inline via [`crate::protocol::peer::ClientPow`] credentials so the server
23//!   can validate amplification before spending CPU on the payload.
24
25use crate::protocol::peer::Peer;
26use crate::tools::buckets::BucketLocation;
27use crate::tools::time::TimeMillis;
28use crate::tools::types::{Hash, Id, Signature, SignatureKey, VerificationKey, ID_BYTES};
29use crate::tools::{hashing, json, signing, tools, BytesGatherer};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use strum_macros::{Display, FromRepr};
33use crate::anyhow_assert_ge;
34use crate::protocol::posting::encoded_post_bundle::EncodedPostBundleHeaderV1;
35use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
36
37/// The wire-level discriminator for every RPC request the protocol supports.
38///
39/// This `u16` value sits in the RPC request header so the server-side dispatcher can route
40/// each incoming [`crate::protocol::rpc::RpcRequestPacketRx`] to the correct handler
41/// without having to partially decode the payload first. The variants cover the three
42/// main subsystems: peer exchange (`PingV1`, `BootstrapV1`, `AnnounceV1`), posting and
43/// retrieval (`GetPostBundleV1`, `SubmitPostClaimV1`, `SubmitPostCommitV1`, feedback, heal,
44/// cache), and secondary services (`FetchUrlPreviewV1`, `TrendingHashtagsFetchV1`).
45///
46/// Every variant has a paired [`PayloadResponseKind`]; backwards-compatible additions go at
47/// the end of the enum so existing `u16` values do not shift.
48#[derive(Debug, Display, PartialEq, Clone, FromRepr)]
49#[repr(u16)]
50pub enum PayloadRequestKind {
51    ErrorV1, // Used solely for testing error handling
52    PingV1,
53    BootstrapV1,
54    AnnounceV1,
55    GetPostBundleV1,
56    GetPostBundleFeedbackV1,
57    SubmitPostClaimV1,
58    SubmitPostCommitV1,
59    SubmitPostFeedbackV1,
60    HealPostBundleClaimV1,
61    HealPostBundleCommitV1,
62    HealPostBundleFeedbackV1,
63    CachePostBundleV1,
64    CachePostBundleFeedbackV1,
65    FetchUrlPreviewV1,
66    TrendingHashtagsFetchV1,
67    PeerStatsRequestV1,
68}
69
70/// Number of variants in [`PayloadRequestKind`]. Manually maintained; the
71/// test `test_PAYLOAD_REQUEST_KIND_COUNT_matches_variants` keeps it honest.
72/// Used to size fixed-length per-kind counter arrays (e.g. on the server).
73pub const PAYLOAD_REQUEST_KIND_COUNT: usize = 17;
74
75impl PayloadRequestKind {
76    pub fn from_u16(value: u16) -> anyhow::Result<Self> {
77        Self::from_repr(value).ok_or_else(|| anyhow::anyhow!("unknown PayloadRequestKind: {}", value))
78    }
79}
80
81/// The wire-level discriminator for every RPC response in the protocol.
82///
83/// Each variant pairs with a [`PayloadRequestKind`] of the same name minus the `Response`
84/// suffix (plus the unpaired `ErrorResponseV1`, which any handler may return on failure).
85/// The discriminator rides in the response header so the client knows which payload
86/// deserializer to use when it parses an [`crate::protocol::rpc::RpcResponsePacketRx`].
87///
88/// Additions go at the end of the enum to preserve backwards compatibility.
89#[derive(Debug, Display, PartialEq, Clone, FromRepr)]
90#[repr(u16)]
91pub enum PayloadResponseKind {
92    ErrorResponseV1, // Can be returned by any RPC call if an error happens on the server
93    PingResponseV1,
94    BootstrapResponseV1,
95    AnnounceResponseV1,
96    GetPostBundleResponseV1,
97    GetPostBundleFeedbackResponseV1,
98    SubmitPostClaimResponseV1,
99    SubmitPostCommitResponseV1,
100    SubmitPostFeedbackResponseV1,
101    HealPostBundleClaimResponseV1,
102    HealPostBundleCommitResponseV1,
103    HealPostBundleFeedbackResponseV1,
104    CachePostBundleResponseV1,
105    CachePostBundleFeedbackResponseV1,
106    FetchUrlPreviewResponseV1,
107    TrendingHashtagsFetchResponseV1,
108    PeerStatsResponseV1,
109}
110
111impl PayloadResponseKind {
112    pub fn from_u16(value: u16) -> anyhow::Result<Self> {
113        Self::from_repr(value).ok_or_else(|| anyhow::anyhow!("unknown PayloadResponseKind: {}", value))
114    }
115}
116
117/// The universal error response body returned when a handler rejects a request.
118///
119/// Any RPC handler may return an `ErrorResponseV1` (packaged in a response with
120/// [`PayloadResponseKind::ErrorResponseV1`]) instead of its usual success shape. The `code`
121/// is a numeric discriminator to let callers branch on error class without string parsing;
122/// the `message` is a human-readable explanation for logs and diagnostics.
123#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
124pub struct ErrorResponseV1 {
125    pub code: u16,
126    pub message: String,
127}
128
129#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
130pub struct PingV1 {}
131
132#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
133pub struct PingResponseV1 {
134    pub peer: Peer,
135}
136
137#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
138pub struct BootstrapV1 {}
139
140#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
141pub struct BootstrapResponseV1 {
142    pub peers_random: Vec<Peer>,
143}
144
145#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
146pub struct AnnounceV1 {
147    pub peer_self: Peer,
148}
149
150#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
151pub struct AnnounceResponseV1 {
152    pub peer_self: Peer,
153    pub peers_nearest: Vec<Peer>,
154}
155
156#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
157pub struct GetPostBundleV1 {
158    pub bucket_location: BucketLocation,
159    pub peers_visited: Vec<Peer>,
160    pub already_retrieved_peer_ids: Vec<Id>,
161}
162
163#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
164pub struct GetPostBundleFeedbackV1 {
165    pub bucket_location: BucketLocation,
166    pub peers_visited: Vec<Peer>,
167    pub already_retrieved_peer_ids: Vec<Id>,
168}
169
170#[derive(Debug, PartialEq, Clone)]
171pub struct GetPostBundleResponseV1 {
172    pub peers_nearer: Vec<Peer>,
173    pub cache_request_token: Option<CacheRequestTokenV1>,
174    pub post_bundles_cached: Vec<Bytes>,  // Cached EncodedPostBundleV1 from intermediate servers
175    pub post_bundle: Option<Bytes>, // Primary EncodedPostBundleV1 from the responsible server
176}
177
178impl GetPostBundleResponseV1 {
179    pub fn to_bytes_gatherer(&self) -> anyhow::Result<BytesGatherer> {
180        let mut bytes_gatherer = BytesGatherer::default();
181
182        tools::write_length_prefixed_json::<Vec<Peer>>(&mut bytes_gatherer, &self.peers_nearer)?;
183
184        match &self.cache_request_token {
185            Some(token) => {
186                bytes_gatherer.put_u8(1);
187                tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
188            }
189            None => bytes_gatherer.put_u8(0),
190        }
191
192        bytes_gatherer.put_u16(self.post_bundles_cached.len() as u16);
193        for bundle in &self.post_bundles_cached {
194            bytes_gatherer.put_u32(bundle.len() as u32);
195            bytes_gatherer.put_bytes(bundle.clone());
196        }
197
198        match &self.post_bundle {
199            Some(post_bundle) => { bytes_gatherer.put_u8(1); bytes_gatherer.put_bytes(post_bundle.clone()); }
200            None => { bytes_gatherer.put_u8(0); }
201        }
202
203        Ok(bytes_gatherer)
204    }
205
206    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
207        use bytes::Buf;
208
209        let peers_nearer = tools::read_length_prefixed_json::<Vec<Peer>>(&mut bytes)?;
210
211        if bytes.remaining() < 1 {
212            anyhow::bail!("Invalid buffer: missing cache_request_token presence flag");
213        }
214        let cache_request_token = match bytes.get_u8() {
215            0 => None,
216            1 => Some(tools::read_length_prefixed_json::<CacheRequestTokenV1>(&mut bytes)?),
217            _ => anyhow::bail!("Invalid buffer: unknown cache_request_token presence flag"),
218        };
219
220        anyhow_assert_ge!(bytes.remaining(), 2, "Missing post_bundles_cached count");
221        let cached_count = bytes.get_u16() as usize;
222        let mut post_bundles_cached = Vec::with_capacity(cached_count);
223        for _ in 0..cached_count {
224            anyhow_assert_ge!(bytes.remaining(), 4, "Missing post_bundles_cached entry length");
225            let len = bytes.get_u32() as usize;
226            anyhow_assert_ge!(bytes.remaining(), len, "Truncated post_bundles_cached entry");
227            post_bundles_cached.push(bytes.split_to(len));
228        }
229
230        if bytes.remaining() < 1 {
231            anyhow::bail!("Invalid buffer: missing post_bundle presence flag");
232        }
233        let post_bundle = match bytes.get_u8() {
234            0 => None,
235            1 => Some(bytes.copy_to_bytes(bytes.remaining())),
236            _ => anyhow::bail!("Invalid buffer: unknown post_bundle presence flag"),
237        };
238
239        Ok(Self { peers_nearer, cache_request_token, post_bundles_cached, post_bundle })
240    }
241}
242#[derive(Debug, PartialEq, Clone)]
243pub struct GetPostBundleFeedbackResponseV1 {
244    pub peers_nearer: Vec<Peer>,
245    pub cache_request_token: Option<CacheRequestTokenV1>,
246    pub post_bundle_feedbacks_cached: Vec<Bytes>, // Cached feedback from originators the client hasn't seen yet (filtered by already_retrieved_peer_ids).
247    pub encoded_post_bundle_feedback: Option<Bytes>, // Primary feedback from the responsible server
248}
249
250impl GetPostBundleFeedbackResponseV1 {
251    pub fn to_bytes_gatherer(&self) -> anyhow::Result<BytesGatherer> {
252        let mut bytes_gatherer = BytesGatherer::default();
253
254        tools::write_length_prefixed_json::<Vec<Peer>>(&mut bytes_gatherer, &self.peers_nearer)?;
255
256        match &self.cache_request_token {
257            Some(token) => {
258                bytes_gatherer.put_u8(1u8);
259                tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
260            }
261            None => bytes_gatherer.put_u8(0u8),
262        }
263
264        bytes_gatherer.put_u16(self.post_bundle_feedbacks_cached.len() as u16);
265        for feedback in &self.post_bundle_feedbacks_cached {
266            bytes_gatherer.put_u32(feedback.len() as u32);
267            bytes_gatherer.put_bytes(feedback.clone());
268        }
269
270        match &self.encoded_post_bundle_feedback {
271            Some(post_bundle_feedback) => {
272                bytes_gatherer.put_u8(1u8);
273                bytes_gatherer.put_bytes(post_bundle_feedback.clone());
274            }
275            None => bytes_gatherer.put_u8(0u8),
276        }
277
278        Ok(bytes_gatherer)
279    }
280
281    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
282        use bytes::Buf;
283
284        let peers_nearer = tools::read_length_prefixed_json::<Vec<Peer>>(&mut bytes)?;
285
286        if bytes.remaining() < 1 {
287            anyhow::bail!("Invalid buffer: missing cache_request_token presence flag");
288        }
289        let cache_request_token = match bytes.get_u8() {
290            0 => None,
291            1 => Some(tools::read_length_prefixed_json::<CacheRequestTokenV1>(&mut bytes)?),
292            _ => anyhow::bail!("Invalid buffer: unknown cache_request_token presence flag"),
293        };
294
295        anyhow_assert_ge!(bytes.remaining(), 2, "Missing post_bundle_feedbacks_cached count");
296        let cached_count = bytes.get_u16() as usize;
297        let mut post_bundle_feedbacks_cached = Vec::with_capacity(cached_count);
298        for _ in 0..cached_count {
299            anyhow_assert_ge!(bytes.remaining(), 4, "Missing post_bundle_feedbacks_cached entry length");
300            let len = bytes.get_u32() as usize;
301            anyhow_assert_ge!(bytes.remaining(), len, "Truncated post_bundle_feedbacks_cached entry");
302            post_bundle_feedbacks_cached.push(bytes.split_to(len));
303        }
304
305        if bytes.remaining() < 1 {
306            anyhow::bail!("Invalid buffer: missing post_bundle_feedback presence flag");
307        }
308        let post_bundle_feedback = match bytes.get_u8() {
309            0 => None,
310            1 => Some(bytes.copy_to_bytes(bytes.remaining())),
311            _ => anyhow::bail!("Invalid buffer: unknown post_bundle_feedback presence flag"),
312        };
313
314        Ok(Self { peers_nearer, cache_request_token, post_bundle_feedbacks_cached, encoded_post_bundle_feedback: post_bundle_feedback })
315    }
316}
317
318#[derive(Debug, PartialEq, Clone)]
319pub struct SubmitPostClaimV1 {
320    pub bucket_location: BucketLocation,
321    pub referenced_post_header_bytes: Option<Bytes>, // For Sequel posts: the original post's bytes_without_body, used to verify same-author
322    pub referenced_hashtags: Vec<String>,             // Hashtag strings referenced by this post, for trending tracking
323    pub encoded_post_bytes: Bytes,                    // Note that it has just the header of the EncodedPost...
324}
325
326impl SubmitPostClaimV1 {
327    pub fn new_to_bytes(bucket_location: &BucketLocation, referenced_post_header_bytes: Option<&[u8]>, referenced_hashtags: &[String], encoded_post_bytes_without_body: &[u8]) -> anyhow::Result<Bytes> {
328        let mut bytes_gatherer = BytesGatherer::default();
329        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
330        match referenced_post_header_bytes {
331            Some(header_bytes) => {
332                bytes_gatherer.put_u32(header_bytes.len() as u32);
333                bytes_gatherer.put_slice(header_bytes);
334            }
335            None => {
336                bytes_gatherer.put_u32(0);
337            }
338        }
339        tools::write_length_prefixed_json(&mut bytes_gatherer, &referenced_hashtags)?;
340        bytes_gatherer.put_slice(encoded_post_bytes_without_body);
341        Ok(bytes_gatherer.to_bytes())
342    }
343
344    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
345        use bytes::Buf;
346        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
347
348        anyhow::ensure!(bytes.remaining() >= 4, "SubmitPostClaimV1: missing referenced_post_header_bytes length");
349        let referenced_post_header_bytes_len = bytes.get_u32() as usize;
350        let referenced_post_header_bytes = if referenced_post_header_bytes_len > 0 {
351            anyhow::ensure!(bytes.remaining() >= referenced_post_header_bytes_len, "SubmitPostClaimV1: referenced_post_header_bytes length {} exceeds remaining {}", referenced_post_header_bytes_len, bytes.remaining());
352            Some(bytes.split_to(referenced_post_header_bytes_len))
353        } else {
354            None
355        };
356
357        let referenced_hashtags = tools::read_length_prefixed_json::<Vec<String>>(bytes)?;
358
359        anyhow::ensure!(bytes.has_remaining(), "SubmitPostClaimV1: missing encoded_post_bytes");
360        let encoded_post_bytes = bytes.split_to(bytes.len());
361
362        Ok(Self { bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes })
363    }
364}
365
366#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
367pub struct SubmitPostClaimResponseV1 {
368    pub peers_nearer: Vec<Peer>,
369    pub submit_post_claim_token: Option<SubmitPostClaimTokenV1>,
370}
371
372#[derive(Debug, PartialEq, Clone)]
373pub struct SubmitPostCommitV1 {
374    pub bucket_location: BucketLocation,
375    pub submit_post_claim_token: SubmitPostClaimTokenV1,
376    pub encoded_post_bytes: Bytes,
377}
378
379impl SubmitPostCommitV1 {
380    pub fn new_to_bytes(bucket_location: &BucketLocation, submit_post_claim_token: &SubmitPostClaimTokenV1, encoded_post_bytes: &[u8]) -> anyhow::Result<Bytes> {
381        let mut bytes_gatherer = BytesGatherer::default();
382        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
383        tools::write_length_prefixed_json(&mut bytes_gatherer, submit_post_claim_token)?;
384        bytes_gatherer.put_slice(encoded_post_bytes);
385        Ok(bytes_gatherer.to_bytes())
386    }
387
388    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
389        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
390        let submit_post_claim_token = tools::read_length_prefixed_json::<SubmitPostClaimTokenV1>(bytes)?;
391        let encoded_post_bytes = bytes.split_to(bytes.len());
392
393        Ok(Self {
394            bucket_location,
395            submit_post_claim_token,
396            encoded_post_bytes,
397        })
398    }
399}
400
401#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
402pub struct SubmitPostCommitResponseV1 {
403    pub submit_post_commit_token: SubmitPostCommitTokenV1,
404}
405
406#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
407pub struct SubmitPostClaimTokenV1 {
408    pub peer: Peer,
409    pub bucket_location: BucketLocation,
410    pub post_id: Id,
411    pub token_signature: Signature,
412}
413
414impl SubmitPostClaimTokenV1 {
415    fn get_hash_for_signing(peer: &Peer, bucket_location: &BucketLocation, post_id: &Id) -> Hash {
416        hashing::hash_multiple(&[peer.id.as_ref(), bucket_location.get_hash_for_signing().as_ref(), post_id.as_ref(), "SubmitPostClaimTokenV1".as_bytes()])
417    }
418
419    pub fn new(peer: Peer, bucket_location: BucketLocation, post_id: Id, signature_key: &SignatureKey) -> Self {
420        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer, &bucket_location, &post_id).as_ref());
421        Self { peer, bucket_location, post_id, token_signature }
422    }
423
424    pub fn verify(&self) -> anyhow::Result<()> {
425        self.peer.verify()?;
426        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
427        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer, &self.bucket_location, &self.post_id).as_ref())
428    }
429}
430
431#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
432pub struct SubmitPostCommitTokenV1 {
433    pub peer: Peer,
434    pub bucket_location: BucketLocation,
435    pub post_id: Id,
436    pub token_signature: Signature,
437}
438
439impl SubmitPostCommitTokenV1 {
440    fn get_hash_for_signing(peer: &Peer, bucket_location: &BucketLocation, post_id: &Id) -> Hash {
441        hashing::hash_multiple(&[peer.id.as_ref(), bucket_location.get_hash_for_signing().as_ref(), post_id.as_ref(), "SubmitPostCommitTokenV1".as_bytes()])
442    }
443
444    pub fn new(peer: Peer, bucket_location: BucketLocation, post_id: Id, signature_key: &SignatureKey) -> Self {
445        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer, &bucket_location, &post_id).as_ref());
446        Self { peer, bucket_location, post_id, token_signature }
447    }
448
449    pub fn verify(&self) -> anyhow::Result<()> {
450        self.peer.verify()?;
451        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
452        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer, &self.bucket_location, &self.post_id).as_ref())
453    }
454}
455
456
457#[derive(Debug, PartialEq, Clone)]
458pub struct SubmitPostFeedbackV1 {
459    pub bucket_location: BucketLocation,
460    pub encoded_post_feedback: EncodedPostFeedbackV1,
461}
462
463impl SubmitPostFeedbackV1 {
464    pub fn new_to_bytes(bucket_location: &BucketLocation, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<Bytes> {
465        let mut bytes_gatherer = BytesGatherer::default();
466        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
467        let mut feedback_bytes = BytesMut::new();
468        encoded_post_feedback.append_encode_to_bytes(&mut feedback_bytes)?;
469        bytes_gatherer.put_bytes(feedback_bytes.freeze());
470        Ok(bytes_gatherer.to_bytes())
471    }
472
473    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
474        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
475        let encoded_post_feedback = EncodedPostFeedbackV1::decode_from_bytes(bytes)?;
476        Ok(Self { bucket_location, encoded_post_feedback })
477    }
478
479}
480
481#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
482pub struct SubmitPostFeedbackResponseV1 {
483    pub peers_nearer: Vec<Peer>,
484    pub accepted: bool,
485}
486
487
488
489// ---- Heal Post Bundle (two-phase: claim then commit) ----
490
491#[derive(Debug, PartialEq, Clone)]
492pub struct HealPostBundleClaimV1 {
493    pub bucket_location: BucketLocation,
494    pub donor_header: EncodedPostBundleHeaderV1,
495}
496
497impl HealPostBundleClaimV1 {
498    pub fn new_to_bytes(donor_header: &EncodedPostBundleHeaderV1, bucket_location: &BucketLocation) -> anyhow::Result<Bytes> {
499        let mut g = BytesGatherer::default();
500        tools::write_length_prefixed_json(&mut g, bucket_location)?;
501        tools::write_length_prefixed_json(&mut g, donor_header)?;
502        Ok(g.to_bytes())
503    }
504
505    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
506        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
507        let donor_header = tools::read_length_prefixed_json::<EncodedPostBundleHeaderV1>(bytes)?;
508        Ok(Self { donor_header, bucket_location })
509    }
510}
511
512#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
513pub struct HealPostBundleClaimResponseV1 {
514    pub needed_post_ids: Vec<Id>,
515    pub token: Option<HealPostBundleClaimTokenV1>,
516}
517
518#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
519pub struct HealPostBundleClaimTokenV1 {
520    pub peer: Peer,
521    pub bucket_location: BucketLocation,
522    pub donor_header_signature: Signature,
523    pub needed_post_ids: Vec<Id>,
524    pub token_signature: Signature,
525}
526
527impl HealPostBundleClaimTokenV1 {
528    fn get_hash_for_signing(peer_id: &Id, bucket_location: &BucketLocation, donor_header_signature: &Signature, needed_post_ids: &[Id]) -> Hash {
529        let bucket_location_hash = bucket_location.get_hash_for_signing();
530        let mut hash_input: Vec<&[u8]> = vec![
531            peer_id.as_ref(),
532            bucket_location_hash.as_ref(),
533            donor_header_signature.as_ref(),
534            "HealPostBundleClaimTokenV1".as_bytes(),
535        ];
536        for id in needed_post_ids {
537            hash_input.push(id.as_ref());
538        }
539        hashing::hash_multiple(&hash_input)
540    }
541
542    pub fn new(peer: Peer, bucket_location: BucketLocation, needed_post_ids: Vec<Id>, donor_header_signature: Signature, signature_key: &SignatureKey) -> Self {
543        let hash = Self::get_hash_for_signing(&peer.id, &bucket_location, &donor_header_signature, &needed_post_ids);
544        let token_signature = signing::sign(signature_key, hash.as_ref());
545        Self { peer, bucket_location, donor_header_signature, needed_post_ids, token_signature }
546    }
547
548    pub fn verify(&self) -> anyhow::Result<()> {
549        self.peer.verify()?;
550        self.bucket_location.validate()?;
551        let hash = Self::get_hash_for_signing(&self.peer.id, &self.bucket_location, &self.donor_header_signature, &self.needed_post_ids);
552        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
553        signing::verify(&verification_key, &self.token_signature, hash.as_ref())
554    }
555}
556
557#[derive(Debug, PartialEq, Clone)]
558pub struct HealPostBundleCommitV1 {
559    pub token: HealPostBundleClaimTokenV1,
560    pub donor_header: EncodedPostBundleHeaderV1,
561    pub encoded_posts_bytes: Bytes,
562}
563
564impl HealPostBundleCommitV1 {
565    pub fn new_to_bytes(token: &HealPostBundleClaimTokenV1, donor_header: &EncodedPostBundleHeaderV1, encoded_posts_bytes: &[u8]) -> anyhow::Result<Bytes> {
566        let mut bytes_gatherer = BytesGatherer::default();
567        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
568        tools::write_length_prefixed_json(&mut bytes_gatherer, donor_header)?;
569        bytes_gatherer.put_slice(encoded_posts_bytes);
570        Ok(bytes_gatherer.to_bytes())
571    }
572
573    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
574        let token = tools::read_length_prefixed_json::<HealPostBundleClaimTokenV1>(bytes)?;
575        let donor_header = tools::read_length_prefixed_json::<EncodedPostBundleHeaderV1>(bytes)?;
576        let encoded_posts_bytes = bytes.split_to(bytes.len());
577        Ok(Self { token, donor_header, encoded_posts_bytes })
578    }
579}
580
581#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
582pub struct HealPostBundleCommitResponseV1 {
583    pub accepted: bool,
584}
585
586// ---- Heal Post Bundle Feedback (single-phase) ----
587
588#[derive(Debug, PartialEq, Clone)]
589pub struct HealPostBundleFeedbackV1 {
590    pub location_id: Id,
591    pub encoded_post_feedbacks: Vec<EncodedPostFeedbackV1>,
592}
593
594impl HealPostBundleFeedbackV1 {
595    pub fn new_to_bytes(location_id: &Id, feedbacks: &[EncodedPostFeedbackV1]) -> anyhow::Result<Bytes> {
596        let mut bytes = BytesMut::new();
597        bytes.put_slice(location_id.as_ref());
598        for f in feedbacks {
599            f.append_encode_to_bytes(&mut bytes)?;
600        }
601        Ok(bytes.freeze())
602    }
603
604    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
605        use bytes::Buf;
606        anyhow_assert_ge!(bytes.remaining(), ID_BYTES, "Missing location_id in HealPostBundleFeedbackV1");
607        let location_id = Id::from_slice(&bytes.split_to(ID_BYTES))?;
608        let mut encoded_post_feedbacks = Vec::new();
609        while bytes.has_remaining() {
610            encoded_post_feedbacks.push(EncodedPostFeedbackV1::decode_from_bytes(&mut *bytes)?);
611        }
612        Ok(Self { location_id, encoded_post_feedbacks })
613    }
614}
615
616#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
617pub struct HealPostBundleFeedbackResponseV1 {
618    pub accepted_count: u32,
619}
620
621
622/// Issued by an intermediate server during a GetPostBundle/GetPostBundleFeedback miss or stale hit.
623/// The client uploads the bundle to this server after completing its walk.
624#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
625pub struct CacheRequestTokenV1 {
626    pub peer: Peer,
627    pub bucket_location: BucketLocation,
628    pub expires_at: TimeMillis,
629    /// Originator peer IDs already held in this server's cache for this location_id.
630    /// The client should only upload a bundle whose originator peer.id is NOT in this list.
631    pub already_cached_peer_ids: Vec<Id>,
632    pub token_signature: Signature,
633}
634
635impl CacheRequestTokenV1 {
636    fn get_hash_for_signing(peer_id: &Id, bucket_location: &BucketLocation, expires_at: &TimeMillis, already_cached_peer_ids: &[Id]) -> Hash {
637        let expires_at_be = expires_at.encode_be();
638        let bucket_location_hash = bucket_location.get_hash_for_signing();
639        let mut inputs: Vec<&[u8]> = vec![peer_id.as_ref(), bucket_location_hash.as_ref(), expires_at_be.as_ref(), "CacheRequestToken".as_bytes()];
640        for id in already_cached_peer_ids {
641            inputs.push(id.as_ref());
642        }
643        hashing::hash_multiple(&inputs)
644    }
645
646    pub fn new(peer: Peer, bucket_location: BucketLocation, expires_at: TimeMillis, already_cached_peer_ids: Vec<Id>, signature_key: &SignatureKey) -> Self {
647        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer.id, &bucket_location, &expires_at, &already_cached_peer_ids).as_ref());
648        Self { peer, bucket_location, expires_at, already_cached_peer_ids, token_signature }
649    }
650
651    pub fn verify(&self) -> anyhow::Result<()> {
652        self.peer.verify()?;
653        self.bucket_location.validate()?;
654        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
655        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer.id, &self.bucket_location, &self.expires_at, &self.already_cached_peer_ids).as_ref())
656    }
657
658    pub fn is_expired(&self, current_time: TimeMillis) -> bool {
659        current_time > self.expires_at
660    }
661}
662
663#[derive(Debug, PartialEq, Clone)]
664pub struct CachePostBundleV1 {
665    pub token: CacheRequestTokenV1,
666    /// One entry per originator — the client uploads all bundles it collected during its walk.
667    pub encoded_post_bundles: Vec<Bytes>,
668}
669
670impl CachePostBundleV1 {
671    pub fn new_to_bytes(token: &CacheRequestTokenV1, encoded_post_bundles: &[&[u8]]) -> anyhow::Result<Bytes> {
672        let mut bytes_gatherer = BytesGatherer::default();
673        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
674        bytes_gatherer.put_u16(encoded_post_bundles.len() as u16);
675        for bundle in encoded_post_bundles {
676            bytes_gatherer.put_u32(bundle.len() as u32);
677            bytes_gatherer.put_slice(bundle);
678        }
679        Ok(bytes_gatherer.to_bytes())
680    }
681
682    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
683        let token = tools::read_length_prefixed_json::<CacheRequestTokenV1>(bytes)?;
684        anyhow_assert_ge!(bytes.remaining(), 2, "Missing encoded_post_bundles count");
685        let count = bytes.get_u16() as usize;
686        let mut encoded_post_bundles = Vec::with_capacity(count);
687        for _ in 0..count {
688            anyhow_assert_ge!(bytes.remaining(), 4, "Missing encoded_post_bundle entry length");
689            let len = bytes.get_u32() as usize;
690            anyhow_assert_ge!(bytes.remaining(), len, "Truncated encoded_post_bundle entry");
691            encoded_post_bundles.push(bytes.split_to(len));
692        }
693        Ok(Self { token, encoded_post_bundles })
694    }
695}
696
697#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
698pub struct CachePostBundleResponseV1 {
699    pub accepted: bool,
700}
701
702#[derive(Debug, PartialEq, Clone)]
703pub struct CachePostBundleFeedbackV1 {
704    pub token: CacheRequestTokenV1,
705    pub encoded_post_bundle_feedback_bytes: Bytes,
706}
707
708impl CachePostBundleFeedbackV1 {
709    pub fn new_to_bytes(token: &CacheRequestTokenV1, encoded_post_bundle_feedback_bytes: &[u8]) -> anyhow::Result<Bytes> {
710        let mut bytes_gatherer = BytesGatherer::default();
711        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
712        bytes_gatherer.put_slice(encoded_post_bundle_feedback_bytes);
713        Ok(bytes_gatherer.to_bytes())
714    }
715
716    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
717        let token = tools::read_length_prefixed_json::<CacheRequestTokenV1>(bytes)?;
718        let encoded_post_bundle_feedback_bytes = bytes.split_to(bytes.len());
719        Ok(Self { token, encoded_post_bundle_feedback_bytes })
720    }
721}
722
723#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
724pub struct CachePostBundleFeedbackResponseV1 {
725    pub accepted: bool,
726}
727
728/// Client → Server: fetch Open Graph metadata for `url`
729#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
730pub struct FetchUrlPreviewV1 {
731    pub url: String,
732}
733
734impl FetchUrlPreviewV1 {
735    pub fn new_to_bytes(url: &str) -> anyhow::Result<Bytes> {
736        let mut bytes_gatherer = BytesGatherer::default();
737        tools::write_length_prefixed_json(&mut bytes_gatherer, &FetchUrlPreviewV1 { url: url.to_string() })?;
738        Ok(bytes_gatherer.to_bytes())
739    }
740
741    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
742        tools::read_length_prefixed_json(bytes)
743    }
744}
745
746/// Server → Client: extracted preview metadata
747#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
748pub struct FetchUrlPreviewResponseV1 {
749    pub url: String,
750    pub title: String,
751    pub description: String,
752    pub image_url: String,
753}
754
755impl FetchUrlPreviewResponseV1 {
756    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
757        json::struct_to_bytes(self)
758    }
759
760    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
761        json::bytes_to_struct(bytes)
762    }
763}
764
765/// Client → Server: fetch trending hashtags
766#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
767pub struct TrendingHashtagsFetchV1 {
768    pub limit: u16,
769}
770
771impl TrendingHashtagsFetchV1 {
772    pub fn new_to_bytes(limit: u16) -> anyhow::Result<Bytes> {
773        let mut bytes_gatherer = BytesGatherer::default();
774        tools::write_length_prefixed_json(&mut bytes_gatherer, &TrendingHashtagsFetchV1 { limit })?;
775        Ok(bytes_gatherer.to_bytes())
776    }
777
778    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
779        tools::read_length_prefixed_json(bytes)
780    }
781}
782
783#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
784pub struct TrendingHashtagV1 {
785    pub hashtag: String,
786    pub count: u64,
787}
788
789/// Server → Client: trending hashtags with unique author counts
790#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
791pub struct TrendingHashtagsFetchResponseV1 {
792    pub trending_hashtags: Vec<TrendingHashtagV1>,
793}
794
795impl TrendingHashtagsFetchResponseV1 {
796    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
797        json::struct_to_bytes(self)
798    }
799
800    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
801        json::bytes_to_struct(bytes)
802    }
803}
804
805/// Client → Server: opt-in peer-introspection request.
806///
807/// Empty payload. The corresponding response carries a signed, compressed JSON
808/// document; per-request PoW is gated by `POW_MINIMUM_PER_PEER_STATS` and the
809/// server caches the response for a short window to absorb bursts of well-PoW'd
810/// callers without re-measuring every time.
811#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
812pub struct PeerStatsRequestV1 {}
813
814impl PeerStatsRequestV1 {
815    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
816        json::struct_to_bytes(self)
817    }
818
819    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
820        json::bytes_to_struct(bytes)
821    }
822}
823
824/// Server → Client: signed peer-introspection response.
825///
826/// `json_compressed` is an lz4-compressed JSON object (the stats document).
827/// `signature` covers `timestamp.encode_be() || json_compressed` using the
828/// server's Ed25519 signing key — clients that re-share the response amongst
829/// themselves verify against the bytes as transmitted, so no canonical-JSON
830/// dance is required.
831#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
832pub struct PeerStatsResponseV1 {
833    pub peer: Peer,
834    pub timestamp: TimeMillis,
835    pub json_compressed: Bytes,
836    pub signature: Signature,
837}
838
839impl PeerStatsResponseV1 {
840    /// The exact byte sequence that `signature` covers.
841    pub fn signing_input(timestamp: TimeMillis, json_compressed: &[u8]) -> Vec<u8> {
842        let mut buf = Vec::with_capacity(crate::tools::time::TIME_MILLIS_BYTES + json_compressed.len());
843        buf.extend_from_slice(timestamp.encode_be().as_ref());
844        buf.extend_from_slice(json_compressed);
845        buf
846    }
847
848    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
849        json::struct_to_bytes(self)
850    }
851
852    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
853        json::bytes_to_struct(bytes)
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use crate::protocol::payload::payload::{GetPostBundleResponseV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1, SubmitPostClaimTokenV1, SubmitPostCommitTokenV1};
860    use crate::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
861    use crate::tools::server_id::ServerId;
862    use crate::tools::buckets::{BucketLocation, BucketType};
863    use crate::tools::time::{TimeMillis, MILLIS_IN_HOUR};
864    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
865    use crate::tools::{config, json, tools};
866    use crate::tools::types::{Id, Pow, Signature, ID_BYTES};
867    use std::collections::HashSet;
868    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
869    use bytes::Bytes;
870
871    #[tokio::test]
872    #[allow(non_snake_case)]
873    async fn test_to_from_GetPostBundleResponseV1() -> anyhow::Result<()> {
874        let time_provider = RealTimeProvider::default();
875        let pow_generator = StubParallelPowGenerator::new();
876        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
877        let peer = server_id.to_peer(&time_provider)?;
878
879        let mut encoded_posts_ids = Vec::new();
880        let mut encoded_posts_lengths = Vec::new();
881        let mut encoded_posts_bytes = Vec::new();
882
883        for i in 0..16 {
884            let len = (i + 1) * 128;
885            let mut encoded_post_bytes = tools::random_bytes(len);
886            let post_id = Id::from_slice(&encoded_post_bytes[0..ID_BYTES])?;
887            encoded_posts_ids.push(post_id);
888            encoded_posts_lengths.push(encoded_post_bytes.len());
889            encoded_posts_bytes.append(&mut encoded_post_bytes);
890        }
891
892        let header = EncodedPostBundleHeaderV1 {
893            time_millis: time_provider.current_time_millis(),
894            location_id: Id::random(),
895            overflowed: false,
896            sealed: false,
897            num_posts: 0,
898            encoded_post_ids: encoded_posts_ids,
899            encoded_post_lengths: encoded_posts_lengths,
900            encoded_post_healed: HashSet::new(),
901            peer,
902            signature: Signature::zero(),
903        };
904
905        let peers_nearer = {
906            vec![
907                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
908                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
909                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
910            ]
911        };
912
913        let response = GetPostBundleResponseV1 {
914            peers_nearer,
915            cache_request_token: None,
916            post_bundles_cached: vec![],
917            post_bundle: Some((EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::from(encoded_posts_bytes) }).to_bytes()?),
918        };
919
920        let encoded = response.to_bytes_gatherer()?.to_bytes();
921        let response_decoded = GetPostBundleResponseV1::from_bytes(encoded)?;
922
923        assert_eq!(response_decoded, response);
924
925        Ok(())
926    }
927
928    async fn make_signed_header(time_provider: &RealTimeProvider) -> anyhow::Result<(EncodedPostBundleHeaderV1, ServerId)> {
929        let pow_generator = StubParallelPowGenerator::new();
930        let server_id = ServerId::new("own_pow", time_provider, config::SERVER_KEY_POW_MIN, true, &pow_generator).await?;
931        let peer = server_id.to_peer(time_provider)?;
932        let num_posts: u8 = 4;
933        let mut header = EncodedPostBundleHeaderV1 {
934            time_millis: time_provider.current_time_millis(),
935            location_id: Id::random(),
936            overflowed: false,
937            sealed: false,
938            num_posts,
939            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
940            encoded_post_lengths: (0..num_posts).map(|i| (i as usize + 1) * 64).collect(),
941            encoded_post_healed: HashSet::new(),
942            peer,
943            signature: Signature::zero(),
944        };
945        header.signature_generate(&server_id.keys.signature_key)?;
946        Ok((header, server_id))
947    }
948
949    fn make_bucket_location() -> anyhow::Result<BucketLocation> {
950        BucketLocation::new(BucketType::User, Id::random(), MILLIS_IN_HOUR, TimeMillis(1_700_000_000_000))
951    }
952
953    #[tokio::test]
954    #[allow(non_snake_case)]
955    async fn test_SubmitPostClaimTokenV1_verify() -> anyhow::Result<()> {
956        let time_provider = RealTimeProvider::default();
957        let pow_generator = StubParallelPowGenerator::new();
958        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
959        let peer = server_id.to_peer(&time_provider)?;
960
961        let token = SubmitPostClaimTokenV1::new(peer, make_bucket_location()?, Id::random(), &server_id.keys.signature_key);
962        token.verify()?;
963        Ok(())
964    }
965
966    #[tokio::test]
967    #[allow(non_snake_case)]
968    async fn test_SubmitPostCommitTokenV1_verify() -> anyhow::Result<()> {
969        let time_provider = RealTimeProvider::default();
970        let pow_generator = StubParallelPowGenerator::new();
971        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
972        let peer = server_id.to_peer(&time_provider)?;
973
974        let token = SubmitPostCommitTokenV1::new(peer, make_bucket_location()?, Id::random(), &server_id.keys.signature_key);
975        token.verify()?;
976        Ok(())
977    }
978
979    #[tokio::test]
980    #[allow(non_snake_case)]
981    async fn test_to_from_HealPostBundleClaimV1() -> anyhow::Result<()> {
982        let time_provider = RealTimeProvider::default();
983        let (header, _) = make_signed_header(&time_provider).await?;
984
985        let bucket_location = make_bucket_location()?;
986        let encoded = HealPostBundleClaimV1::new_to_bytes(&header, &bucket_location)?;
987        let mut bytes = encoded;
988        let decoded = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
989
990        assert_eq!(decoded.donor_header, header);
991        assert_eq!(decoded.bucket_location, bucket_location);
992        Ok(())
993    }
994
995    #[tokio::test]
996    #[allow(non_snake_case)]
997    async fn test_to_from_HealPostBundleClaimResponseV1() -> anyhow::Result<()> {
998        let time_provider = RealTimeProvider::default();
999        let (header, server_id) = make_signed_header(&time_provider).await?;
1000        let peer = server_id.to_peer(&time_provider)?;
1001
1002        let needed_post_ids = header.encoded_post_ids[0..2].to_vec();
1003        let token = HealPostBundleClaimTokenV1::new(
1004            peer,
1005            make_bucket_location()?,
1006            needed_post_ids.clone(),
1007            header.signature,
1008            &server_id.keys.signature_key,
1009        );
1010
1011        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token: Some(token) };
1012
1013        let encoded = json::struct_to_bytes(&response)?;
1014        let decoded = json::bytes_to_struct::<HealPostBundleClaimResponseV1>(&encoded)?;
1015
1016        assert_eq!(decoded, response);
1017        Ok(())
1018    }
1019
1020    #[tokio::test]
1021    #[allow(non_snake_case)]
1022    async fn test_HealPostBundleClaimTokenV1_verify() -> anyhow::Result<()> {
1023        let time_provider = RealTimeProvider::default();
1024        let (header, server_id) = make_signed_header(&time_provider).await?;
1025        let peer = server_id.to_peer(&time_provider)?;
1026
1027        let token = HealPostBundleClaimTokenV1::new(
1028            peer,
1029            make_bucket_location()?,
1030            header.encoded_post_ids.clone(),
1031            header.signature,
1032            &server_id.keys.signature_key,
1033        );
1034
1035        token.verify()?;
1036        Ok(())
1037    }
1038
1039    #[tokio::test]
1040    #[allow(non_snake_case)]
1041    async fn test_to_from_HealPostBundleCommitV1() -> anyhow::Result<()> {
1042        let time_provider = RealTimeProvider::default();
1043        let (header, server_id) = make_signed_header(&time_provider).await?;
1044        let peer = server_id.to_peer(&time_provider)?;
1045
1046        let token = HealPostBundleClaimTokenV1::new(
1047            peer,
1048            make_bucket_location()?,
1049            header.encoded_post_ids.clone(),
1050            header.signature,
1051            &server_id.keys.signature_key,
1052        );
1053
1054        let post_bytes = tools::random_bytes(512);
1055        let encoded = HealPostBundleCommitV1::new_to_bytes(&token, &header, &post_bytes)?;
1056        let mut bytes = encoded;
1057        let decoded = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
1058
1059        assert_eq!(decoded.token, token);
1060        assert_eq!(decoded.donor_header, header);
1061        assert_eq!(decoded.encoded_posts_bytes.as_ref(), post_bytes.as_slice());
1062        Ok(())
1063    }
1064
1065    #[tokio::test]
1066    #[allow(non_snake_case)]
1067    async fn test_to_from_HealPostBundleCommitResponseV1() -> anyhow::Result<()> {
1068        for accepted in [true, false] {
1069            let response = HealPostBundleCommitResponseV1 { accepted };
1070            let encoded = json::struct_to_bytes(&response)?;
1071            let decoded = json::bytes_to_struct::<HealPostBundleCommitResponseV1>(&encoded)?;
1072            assert_eq!(decoded, response);
1073        }
1074        Ok(())
1075    }
1076
1077    #[tokio::test]
1078    #[allow(non_snake_case)]
1079    async fn test_CacheRequestTokenV1_verify() -> anyhow::Result<()> {
1080        use crate::protocol::payload::payload::CacheRequestTokenV1;
1081        use crate::tools::time::MILLIS_IN_MINUTE;
1082
1083        let time_provider = RealTimeProvider::default();
1084        let pow_generator = StubParallelPowGenerator::new();
1085        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1086        let peer = server_id.to_peer(&time_provider)?;
1087        let bucket_location = make_bucket_location()?;
1088        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1089        let already_cached_peer_ids = vec![Id::random(), Id::random()];
1090
1091        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, already_cached_peer_ids, &server_id.keys.signature_key);
1092        token.verify()?;
1093        Ok(())
1094    }
1095
1096    #[tokio::test]
1097    #[allow(non_snake_case)]
1098    async fn test_to_from_CachePostBundleV1() -> anyhow::Result<()> {
1099        use crate::protocol::payload::payload::{CachePostBundleV1, CacheRequestTokenV1};
1100        use crate::tools::time::MILLIS_IN_MINUTE;
1101
1102        let time_provider = RealTimeProvider::default();
1103        let pow_generator = StubParallelPowGenerator::new();
1104        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1105        let peer = server_id.to_peer(&time_provider)?;
1106        let bucket_location = make_bucket_location()?;
1107        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1108        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, vec![], &server_id.keys.signature_key);
1109
1110        // Multiple bundles — one per originator
1111        let bundle_a = tools::random_bytes(512);
1112        let bundle_b = tools::random_bytes(256);
1113        let bundle_c = tools::random_bytes(1024);
1114        let bundles: &[&[u8]] = &[&bundle_a, &bundle_b, &bundle_c];
1115
1116        let encoded = CachePostBundleV1::new_to_bytes(&token, bundles)?;
1117        let mut bytes = encoded;
1118        let decoded = CachePostBundleV1::from_bytes(&mut bytes)?;
1119
1120        assert_eq!(decoded.token, token);
1121        assert_eq!(decoded.encoded_post_bundles.len(), 3);
1122        assert_eq!(decoded.encoded_post_bundles[0].as_ref(), bundle_a.as_slice());
1123        assert_eq!(decoded.encoded_post_bundles[1].as_ref(), bundle_b.as_slice());
1124        assert_eq!(decoded.encoded_post_bundles[2].as_ref(), bundle_c.as_slice());
1125        Ok(())
1126    }
1127
1128    #[tokio::test]
1129    #[allow(non_snake_case)]
1130    async fn test_to_from_CachePostBundleV1_empty() -> anyhow::Result<()> {
1131        use crate::protocol::payload::payload::{CachePostBundleV1, CacheRequestTokenV1};
1132        use crate::tools::time::MILLIS_IN_MINUTE;
1133
1134        let time_provider = RealTimeProvider::default();
1135        let pow_generator = StubParallelPowGenerator::new();
1136        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1137        let peer = server_id.to_peer(&time_provider)?;
1138        let bucket_location = make_bucket_location()?;
1139        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1140        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, vec![], &server_id.keys.signature_key);
1141
1142        let encoded = CachePostBundleV1::new_to_bytes(&token, &[])?;
1143        let mut bytes = encoded;
1144        let decoded = CachePostBundleV1::from_bytes(&mut bytes)?;
1145
1146        assert_eq!(decoded.token, token);
1147        assert!(decoded.encoded_post_bundles.is_empty());
1148        Ok(())
1149    }
1150
1151    #[tokio::test]
1152    #[allow(non_snake_case)]
1153    async fn test_to_from_CachePostBundleFeedbackV1() -> anyhow::Result<()> {
1154        use crate::protocol::payload::payload::{CachePostBundleFeedbackV1, CacheRequestTokenV1};
1155        use crate::tools::time::MILLIS_IN_MINUTE;
1156
1157        let time_provider = RealTimeProvider::default();
1158        let pow_generator = StubParallelPowGenerator::new();
1159        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1160        let peer = server_id.to_peer(&time_provider)?;
1161        let bucket_location = make_bucket_location()?;
1162        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1163        let already_cached_peer_ids = vec![Id::random()];
1164        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, already_cached_peer_ids, &server_id.keys.signature_key);
1165
1166        let feedback_bytes = tools::random_bytes(256);
1167        let encoded = CachePostBundleFeedbackV1::new_to_bytes(&token, &feedback_bytes)?;
1168        let mut bytes = encoded;
1169        let decoded = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1170
1171        assert_eq!(decoded.token, token);
1172        assert_eq!(decoded.encoded_post_bundle_feedback_bytes.as_ref(), feedback_bytes.as_slice());
1173        Ok(())
1174    }
1175
1176    #[tokio::test]
1177    #[allow(non_snake_case)]
1178    async fn test_to_from_FetchUrlPreviewV1() -> anyhow::Result<()> {
1179        use crate::protocol::payload::payload::FetchUrlPreviewV1;
1180
1181        let url = "https://example.com/article?q=1#section";
1182        let encoded = FetchUrlPreviewV1::new_to_bytes(url)?;
1183        let mut bytes = encoded;
1184        let decoded = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1185
1186        assert_eq!(decoded.url, url);
1187        Ok(())
1188    }
1189
1190    #[tokio::test]
1191    #[allow(non_snake_case)]
1192    async fn test_to_from_FetchUrlPreviewResponseV1() -> anyhow::Result<()> {
1193        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1194
1195        let response = FetchUrlPreviewResponseV1 {
1196            url: "https://example.com/canonical".to_string(),
1197            title: "Example Title".to_string(),
1198            description: "A short description.".to_string(),
1199            image_url: "https://example.com/image.png".to_string(),
1200        };
1201
1202        let encoded = response.to_bytes()?;
1203        let decoded = FetchUrlPreviewResponseV1::from_bytes(&encoded)?;
1204
1205        assert_eq!(decoded, response);
1206        Ok(())
1207    }
1208
1209    // ── Robustness tests: CachePostBundleV1::from_bytes ──
1210
1211    #[test]
1212    #[allow(non_snake_case)]
1213    fn test_CachePostBundleV1_from_bytes_empty_input() {
1214        use crate::protocol::payload::payload::CachePostBundleV1;
1215        let mut bytes = Bytes::new();
1216        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1217    }
1218
1219    #[test]
1220    #[allow(non_snake_case)]
1221    fn test_CachePostBundleV1_from_bytes_garbage_input() {
1222        use crate::protocol::payload::payload::CachePostBundleV1;
1223        let mut bytes = Bytes::from_static(&[0xff; 64]);
1224        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1225    }
1226
1227    #[test]
1228    #[allow(non_snake_case)]
1229    fn test_CachePostBundleV1_from_bytes_truncated_count() {
1230        use crate::protocol::payload::payload::CachePostBundleV1;
1231        // Valid length-prefixed JSON for an empty-ish struct won't work here since CacheRequestTokenV1
1232        // has required fields. But a single byte after a valid-looking length prefix will fail at JSON parse.
1233        // The simplest test: just one byte (not enough for the length prefix u64).
1234        let mut bytes = Bytes::from_static(&[0x01]);
1235        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1236    }
1237
1238    // ── Robustness tests: GetPostBundleResponseV1::from_bytes ──
1239
1240    #[test]
1241    #[allow(non_snake_case)]
1242    fn test_GetPostBundleResponseV1_from_bytes_empty_input() {
1243        let bytes = Bytes::new();
1244        assert!(GetPostBundleResponseV1::from_bytes(bytes).is_err());
1245    }
1246
1247    #[test]
1248    #[allow(non_snake_case)]
1249    fn test_GetPostBundleResponseV1_from_bytes_garbage_input() {
1250        let bytes = Bytes::from_static(&[0xff; 128]);
1251        assert!(GetPostBundleResponseV1::from_bytes(bytes).is_err());
1252    }
1253
1254    // ── Robustness tests: HealPostBundleClaimV1::from_bytes ──
1255
1256    #[test]
1257    #[allow(non_snake_case)]
1258    fn test_HealPostBundleClaimV1_from_bytes_empty_input() {
1259        let mut bytes = Bytes::new();
1260        assert!(HealPostBundleClaimV1::from_bytes(&mut bytes).is_err());
1261    }
1262
1263    #[test]
1264    #[allow(non_snake_case)]
1265    fn test_HealPostBundleClaimV1_from_bytes_garbage_input() {
1266        let mut bytes = Bytes::from_static(&[0xff; 64]);
1267        assert!(HealPostBundleClaimV1::from_bytes(&mut bytes).is_err());
1268    }
1269
1270    // ── Robustness tests: HealPostBundleCommitV1::from_bytes ──
1271
1272    #[test]
1273    #[allow(non_snake_case)]
1274    fn test_HealPostBundleCommitV1_from_bytes_empty_input() {
1275        let mut bytes = Bytes::new();
1276        assert!(HealPostBundleCommitV1::from_bytes(&mut bytes).is_err());
1277    }
1278
1279    #[test]
1280    #[allow(non_snake_case)]
1281    fn test_HealPostBundleCommitV1_from_bytes_garbage_input() {
1282        let mut bytes = Bytes::from_static(&[0xff; 64]);
1283        assert!(HealPostBundleCommitV1::from_bytes(&mut bytes).is_err());
1284    }
1285
1286    // ── Robustness tests: FetchUrlPreviewV1::from_bytes ──
1287
1288    #[test]
1289    #[allow(non_snake_case)]
1290    fn test_FetchUrlPreviewV1_from_bytes_empty_input() {
1291        use crate::protocol::payload::payload::FetchUrlPreviewV1;
1292        let mut bytes = Bytes::new();
1293        assert!(FetchUrlPreviewV1::from_bytes(&mut bytes).is_err());
1294    }
1295
1296    // ── Robustness tests: FetchUrlPreviewResponseV1::from_bytes ──
1297
1298    #[test]
1299    #[allow(non_snake_case)]
1300    fn test_FetchUrlPreviewResponseV1_from_bytes_empty_input() {
1301        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1302        let bytes = Bytes::new();
1303        assert!(FetchUrlPreviewResponseV1::from_bytes(&bytes).is_err());
1304    }
1305
1306    #[test]
1307    #[allow(non_snake_case)]
1308    fn test_FetchUrlPreviewResponseV1_from_bytes_garbage_input() {
1309        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1310        let bytes = Bytes::from_static(&[0xff; 64]);
1311        assert!(FetchUrlPreviewResponseV1::from_bytes(&bytes).is_err());
1312    }
1313
1314    // ── Robustness tests: SubmitPostClaimV1::from_bytes ──
1315
1316    #[test]
1317    #[allow(non_snake_case)]
1318    fn test_SubmitPostClaimV1_from_bytes_empty_input() {
1319        use crate::protocol::payload::payload::SubmitPostClaimV1;
1320        let mut bytes = Bytes::new();
1321        assert!(SubmitPostClaimV1::from_bytes(&mut bytes).is_err());
1322    }
1323
1324    #[test]
1325    #[allow(non_snake_case)]
1326    fn test_SubmitPostClaimV1_from_bytes_garbage_input() {
1327        use crate::protocol::payload::payload::SubmitPostClaimV1;
1328        let mut bytes = Bytes::from_static(&[0xff; 64]);
1329        assert!(SubmitPostClaimV1::from_bytes(&mut bytes).is_err());
1330    }
1331
1332    // ── Robustness tests: SubmitPostCommitV1::from_bytes ──
1333
1334    #[test]
1335    #[allow(non_snake_case)]
1336    fn test_SubmitPostCommitV1_from_bytes_empty_input() {
1337        use crate::protocol::payload::payload::SubmitPostCommitV1;
1338        let mut bytes = Bytes::new();
1339        assert!(SubmitPostCommitV1::from_bytes(&mut bytes).is_err());
1340    }
1341
1342    // ── Robustness tests: SubmitPostFeedbackV1::from_bytes ──
1343
1344    #[test]
1345    #[allow(non_snake_case)]
1346    fn test_SubmitPostFeedbackV1_from_bytes_empty_input() {
1347        use crate::protocol::payload::payload::SubmitPostFeedbackV1;
1348        let mut bytes = Bytes::new();
1349        assert!(SubmitPostFeedbackV1::from_bytes(&mut bytes).is_err());
1350    }
1351
1352    // ── Robustness tests: GetPostBundleFeedbackResponseV1::from_bytes ──
1353
1354    #[test]
1355    #[allow(non_snake_case)]
1356    fn test_GetPostBundleFeedbackResponseV1_from_bytes_empty_input() {
1357        use crate::protocol::payload::payload::GetPostBundleFeedbackResponseV1;
1358        let bytes = Bytes::new();
1359        assert!(GetPostBundleFeedbackResponseV1::from_bytes(bytes).is_err());
1360    }
1361
1362    // ── Robustness tests: CachePostBundleFeedbackV1::from_bytes ──
1363
1364    #[test]
1365    #[allow(non_snake_case)]
1366    fn test_CachePostBundleFeedbackV1_from_bytes_empty_input() {
1367        use crate::protocol::payload::payload::CachePostBundleFeedbackV1;
1368        let mut bytes = Bytes::new();
1369        assert!(CachePostBundleFeedbackV1::from_bytes(&mut bytes).is_err());
1370    }
1371
1372    #[cfg(not(target_arch = "wasm32"))]
1373    mod bolero_fuzz {
1374        use bytes::Bytes;
1375
1376        #[test]
1377        #[allow(non_snake_case)]
1378        fn fuzz_CachePostBundleV1_from_bytes() {
1379            use crate::protocol::payload::payload::CachePostBundleV1;
1380            bolero::check!().for_each(|data: &[u8]| {
1381                let mut bytes = Bytes::copy_from_slice(data);
1382                let _ = CachePostBundleV1::from_bytes(&mut bytes);
1383            });
1384        }
1385
1386        #[test]
1387        #[allow(non_snake_case)]
1388        fn fuzz_GetPostBundleResponseV1_from_bytes() {
1389            use crate::protocol::payload::payload::GetPostBundleResponseV1;
1390            bolero::check!().for_each(|data: &[u8]| {
1391                let _ = GetPostBundleResponseV1::from_bytes(Bytes::copy_from_slice(data));
1392            });
1393        }
1394
1395        #[test]
1396        #[allow(non_snake_case)]
1397        fn fuzz_GetPostBundleFeedbackResponseV1_from_bytes() {
1398            use crate::protocol::payload::payload::GetPostBundleFeedbackResponseV1;
1399            bolero::check!().for_each(|data: &[u8]| {
1400                let _ = GetPostBundleFeedbackResponseV1::from_bytes(Bytes::copy_from_slice(data));
1401            });
1402        }
1403
1404        #[test]
1405        #[allow(non_snake_case)]
1406        fn fuzz_SubmitPostClaimV1_from_bytes() {
1407            use crate::protocol::payload::payload::SubmitPostClaimV1;
1408            bolero::check!().for_each(|data: &[u8]| {
1409                let mut bytes = Bytes::copy_from_slice(data);
1410                let _ = SubmitPostClaimV1::from_bytes(&mut bytes);
1411            });
1412        }
1413
1414        #[test]
1415        #[allow(non_snake_case)]
1416        fn fuzz_HealPostBundleClaimV1_from_bytes() {
1417            use crate::protocol::payload::payload::HealPostBundleClaimV1;
1418            bolero::check!().for_each(|data: &[u8]| {
1419                let mut bytes = Bytes::copy_from_slice(data);
1420                let _ = HealPostBundleClaimV1::from_bytes(&mut bytes);
1421            });
1422        }
1423
1424        #[test]
1425        #[allow(non_snake_case)]
1426        fn fuzz_HealPostBundleCommitV1_from_bytes() {
1427            use crate::protocol::payload::payload::HealPostBundleCommitV1;
1428            bolero::check!().for_each(|data: &[u8]| {
1429                let mut bytes = Bytes::copy_from_slice(data);
1430                let _ = HealPostBundleCommitV1::from_bytes(&mut bytes);
1431            });
1432        }
1433    }
1434
1435    #[test]
1436    #[allow(non_snake_case)]
1437    fn test_PAYLOAD_REQUEST_KIND_COUNT_matches_variants() {
1438        use crate::protocol::payload::payload::{PayloadRequestKind, PAYLOAD_REQUEST_KIND_COUNT};
1439        let last_variant = PayloadRequestKind::from_u16((PAYLOAD_REQUEST_KIND_COUNT - 1) as u16)
1440            .expect("PAYLOAD_REQUEST_KIND_COUNT - 1 must decode to a valid variant");
1441        assert_eq!(last_variant, PayloadRequestKind::PeerStatsRequestV1, "last variant changed; bump PAYLOAD_REQUEST_KIND_COUNT");
1442        assert!(PayloadRequestKind::from_u16(PAYLOAD_REQUEST_KIND_COUNT as u16).is_err(), "PAYLOAD_REQUEST_KIND_COUNT must equal the variant count");
1443    }
1444
1445    #[test]
1446    #[allow(non_snake_case)]
1447    fn test_to_from_PeerStatsRequestV1() -> anyhow::Result<()> {
1448        use crate::protocol::payload::payload::PeerStatsRequestV1;
1449        let request = PeerStatsRequestV1 {};
1450        let encoded = request.to_bytes()?;
1451        let decoded = PeerStatsRequestV1::from_bytes(&encoded)?;
1452        assert_eq!(decoded, request);
1453        Ok(())
1454    }
1455
1456    #[tokio::test]
1457    #[allow(non_snake_case)]
1458    async fn test_to_from_PeerStatsResponseV1() -> anyhow::Result<()> {
1459        use crate::protocol::payload::payload::PeerStatsResponseV1;
1460        use crate::tools::compression;
1461
1462        let time_provider = RealTimeProvider::default();
1463        let pow_generator = StubParallelPowGenerator::new();
1464        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1465        let peer = server_id.to_peer(&time_provider)?;
1466
1467        let doc = serde_json::json!({ "requests": { "PingV1": 7 }, "system": { "memory_total_bytes": 1234 } });
1468        let json_bytes = serde_json::to_vec(&doc)?;
1469        let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1470
1471        let timestamp = time_provider.current_time_millis();
1472        let signing_input = PeerStatsResponseV1::signing_input(timestamp, &json_compressed);
1473        let signature = crate::tools::signing::sign(&server_id.keys.signature_key, &signing_input);
1474
1475        let response = PeerStatsResponseV1 { peer, timestamp, json_compressed, signature };
1476        let encoded = response.to_bytes()?;
1477        let decoded = PeerStatsResponseV1::from_bytes(&encoded)?;
1478        assert_eq!(decoded, response);
1479        Ok(())
1480    }
1481}