Skip to main content

hashiverse_lib/protocol/payload/
payload.rs

1//! # RPC request and response payloads
2//!
3//! This is the catalogue of *what* peers say to each other. Every application-level
4//! operation — ping, bootstrap, peer announce, submit post (claim + commit), submit
5//! feedback, get post bundle, get post bundle feedback, heal, cache, fetch URL preview,
6//! trending hashtags — has a versioned request and response type defined here, plus a
7//! `u16` discriminator in [`PayloadRequestKind`] / [`PayloadResponseKind`] that routes
8//! packets to the right handler on the receiving side.
9//!
10//! ## Design notes
11//!
12//! - **Tokens**: Two-phase operations (submit post, heal, cache) exchange server-signed
13//!   token types (`SubmitPostClaimTokenV1`, `CacheRequestTokenV1`,
14//!   `HealPostBundleClaimTokenV1`) so the second phase can prove admission was granted
15//!   without the server needing to keep per-client state. Each token embeds the peer's
16//!   own [`Peer`] record so verification is self-contained.
17//! - **Bulk payloads** (`GetPostBundleResponseV1`, `GetPostBundleFeedbackResponseV1`)
18//!   use a custom length-prefixed-JSON + raw-bytes format rather than pure JSON, so
19//!   large opaque blobs stream through the
20//!   [`crate::tools::bytes_gatherer::BytesGatherer`] without being re-serialised.
21//! - **PoW** on any request that carries weight (posting, feedback, fetching URLs) is
22//!   carried inline via [`crate::protocol::peer::ClientPow`] credentials so the server
23//!   can validate amplification before spending CPU on the payload.
24
25use crate::protocol::peer::Peer;
26use crate::tools::buckets::BucketLocation;
27use crate::tools::time::TimeMillis;
28use crate::tools::types::{Hash, Id, Signature, SignatureKey, VerificationKey, ID_BYTES};
29use crate::tools::{hashing, json, signing, tools, BytesGatherer};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use strum_macros::{Display, FromRepr};
33use crate::anyhow_assert_ge;
34use crate::protocol::posting::encoded_post_bundle::EncodedPostBundleHeaderV1;
35use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
36
37/// The wire-level discriminator for every RPC request the protocol supports.
38///
39/// This `u16` value sits in the RPC request header so the server-side dispatcher can route
40/// each incoming [`crate::protocol::rpc::RpcRequestPacketRx`] to the correct handler
41/// without having to partially decode the payload first. The variants cover the three
42/// main subsystems: peer exchange (`PingV1`, `BootstrapV1`, `AnnounceV1`), posting and
43/// retrieval (`GetPostBundleV1`, `SubmitPostClaimV1`, `SubmitPostCommitV1`, feedback, heal,
44/// cache), and secondary services (`FetchUrlPreviewV1`, `TrendingHashtagsFetchV1`).
45///
46/// Every variant has a paired [`PayloadResponseKind`]; backwards-compatible additions go at
47/// the end of the enum so existing `u16` values do not shift.
48#[derive(Debug, Display, PartialEq, Clone, FromRepr)]
49#[repr(u16)]
50pub enum PayloadRequestKind {
51    ErrorV1, // Used solely for testing error handling
52    PingV1,
53    BootstrapV1,
54    AnnounceV1,
55    GetPostBundleV1,
56    GetPostBundleFeedbackV1,
57    SubmitPostClaimV1,
58    SubmitPostCommitV1,
59    SubmitPostFeedbackV1,
60    HealPostBundleClaimV1,
61    HealPostBundleCommitV1,
62    HealPostBundleFeedbackV1,
63    CachePostBundleV1,
64    CachePostBundleFeedbackV1,
65    FetchUrlPreviewV1,
66    TrendingHashtagsFetchV1,
67    PeerStatsRequestV1,
68    AnnounceV2,
69}
70
71/// Number of variants in [`PayloadRequestKind`]. Manually maintained; the
72/// test `test_PAYLOAD_REQUEST_KIND_COUNT_matches_variants` keeps it honest.
73/// Used to size fixed-length per-kind counter arrays (e.g. on the server).
74pub const PAYLOAD_REQUEST_KIND_COUNT: usize = 18;
75
76impl PayloadRequestKind {
77    pub fn from_u16(value: u16) -> anyhow::Result<Self> {
78        Self::from_repr(value).ok_or_else(|| anyhow::anyhow!("unknown PayloadRequestKind: {}", value))
79    }
80}
81
82/// The wire-level discriminator for every RPC response in the protocol.
83///
84/// Each variant pairs with a [`PayloadRequestKind`] of the same name minus the `Response`
85/// suffix (plus the unpaired `ErrorResponseV1`, which any handler may return on failure).
86/// The discriminator rides in the response header so the client knows which payload
87/// deserializer to use when it parses an [`crate::protocol::rpc::RpcResponsePacketRx`].
88///
89/// Additions go at the end of the enum to preserve backwards compatibility.
90#[derive(Debug, Display, PartialEq, Clone, FromRepr)]
91#[repr(u16)]
92pub enum PayloadResponseKind {
93    ErrorResponseV1, // Can be returned by any RPC call if an error happens on the server
94    PingResponseV1,
95    BootstrapResponseV1,
96    AnnounceResponseV1,
97    GetPostBundleResponseV1,
98    GetPostBundleFeedbackResponseV1,
99    SubmitPostClaimResponseV1,
100    SubmitPostCommitResponseV1,
101    SubmitPostFeedbackResponseV1,
102    HealPostBundleClaimResponseV1,
103    HealPostBundleCommitResponseV1,
104    HealPostBundleFeedbackResponseV1,
105    CachePostBundleResponseV1,
106    CachePostBundleFeedbackResponseV1,
107    FetchUrlPreviewResponseV1,
108    TrendingHashtagsFetchResponseV1,
109    PeerStatsResponseV1,
110    AnnounceResponseV2,
111}
112
113impl PayloadResponseKind {
114    pub fn from_u16(value: u16) -> anyhow::Result<Self> {
115        Self::from_repr(value).ok_or_else(|| anyhow::anyhow!("unknown PayloadResponseKind: {}", value))
116    }
117}
118
119/// The universal error response body returned when a handler rejects a request.
120///
121/// Any RPC handler may return an `ErrorResponseV1` (packaged in a response with
122/// [`PayloadResponseKind::ErrorResponseV1`]) instead of its usual success shape. The `code`
123/// is a numeric discriminator to let callers branch on error class without string parsing;
124/// the `message` is a human-readable explanation for logs and diagnostics.
125#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
126pub struct ErrorResponseV1 {
127    pub code: u16,
128    pub message: String,
129}
130
131#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
132pub struct PingV1 {}
133
134#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
135pub struct PingResponseV1 {
136    pub peer: Peer,
137}
138
139#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
140pub struct BootstrapV1 {}
141
142#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
143pub struct BootstrapResponseV1 {
144    pub peers_random: Vec<Peer>,
145}
146
147#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
148pub struct AnnounceV1 {
149    pub peer_self: Peer,
150}
151
152#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
153pub struct AnnounceResponseV1 {
154    pub peer_self: Peer,
155    pub peers_nearest: Vec<Peer>,
156}
157
158/// V2 of the peer announce. Adds an opaque `proof_payload` produced and verified by the
159/// announcer's transport-specific
160/// [`crate::transport::transport_ownership_proof::TransportOwnershipProof`] — for the HTTPS
161/// transport that's the announcer's current ACME-issued TLS chain, for mem / TCP it's the
162/// empty marker. The receiver gates Kademlia admission on the proof verifying; an announcer
163/// that can't produce a valid proof (e.g. an HTTPS server that hasn't completed ACME
164/// because it doesn't control port 443) is dropped without ever being added.
165///
166/// `proof_payload` is `Vec<u8>` rather than `bytes::Bytes` because this struct is wire-
167/// encoded as JSON, and `serde_json` doesn't round-trip `Bytes` (the bytes crate's
168/// `Serialize` calls `serialize_bytes`, but `serde_json`'s `Deserialize` reaches Bytes via
169/// `visit_seq` on the integer-array form — they don't meet). `Vec<u8>` round-trips cleanly.
170#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
171pub struct AnnounceV2 {
172    pub peer_self: Peer,
173    pub proof_payload: Vec<u8>,
174}
175
176/// V2 announce response. Same shape as [`AnnounceResponseV1`] — proof bytes never flow
177/// back in the response, since receivers only validate proofs at admission time and
178/// thereafter store and gossip plain [`Peer`] records.
179#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
180pub struct AnnounceResponseV2 {
181    pub peer_self: Peer,
182    pub peers_nearest: Vec<Peer>,
183}
184
185#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
186pub struct GetPostBundleV1 {
187    pub bucket_location: BucketLocation,
188    pub peers_visited: Vec<Peer>,
189    pub already_retrieved_peer_ids: Vec<Id>,
190}
191
192#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
193pub struct GetPostBundleFeedbackV1 {
194    pub bucket_location: BucketLocation,
195    pub peers_visited: Vec<Peer>,
196    pub already_retrieved_peer_ids: Vec<Id>,
197}
198
199#[derive(Debug, PartialEq, Clone)]
200pub struct GetPostBundleResponseV1 {
201    pub peers_nearer: Vec<Peer>,
202    pub cache_request_token: Option<CacheRequestTokenV1>,
203    pub post_bundles_cached: Vec<Bytes>,  // Cached EncodedPostBundleV1 from intermediate servers
204    pub post_bundle: Option<Bytes>, // Primary EncodedPostBundleV1 from the responsible server
205}
206
207impl GetPostBundleResponseV1 {
208    pub fn to_bytes_gatherer(&self) -> anyhow::Result<BytesGatherer> {
209        let mut bytes_gatherer = BytesGatherer::default();
210
211        tools::write_length_prefixed_json::<Vec<Peer>>(&mut bytes_gatherer, &self.peers_nearer)?;
212
213        match &self.cache_request_token {
214            Some(token) => {
215                bytes_gatherer.put_u8(1);
216                tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
217            }
218            None => bytes_gatherer.put_u8(0),
219        }
220
221        bytes_gatherer.put_u16(self.post_bundles_cached.len() as u16);
222        for bundle in &self.post_bundles_cached {
223            bytes_gatherer.put_u32(bundle.len() as u32);
224            bytes_gatherer.put_bytes(bundle.clone());
225        }
226
227        match &self.post_bundle {
228            Some(post_bundle) => { bytes_gatherer.put_u8(1); bytes_gatherer.put_bytes(post_bundle.clone()); }
229            None => { bytes_gatherer.put_u8(0); }
230        }
231
232        Ok(bytes_gatherer)
233    }
234
235    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
236        use bytes::Buf;
237
238        let peers_nearer = tools::read_length_prefixed_json::<Vec<Peer>>(&mut bytes)?;
239
240        if bytes.remaining() < 1 {
241            anyhow::bail!("Invalid buffer: missing cache_request_token presence flag");
242        }
243        let cache_request_token = match bytes.get_u8() {
244            0 => None,
245            1 => Some(tools::read_length_prefixed_json::<CacheRequestTokenV1>(&mut bytes)?),
246            _ => anyhow::bail!("Invalid buffer: unknown cache_request_token presence flag"),
247        };
248
249        anyhow_assert_ge!(bytes.remaining(), 2, "Missing post_bundles_cached count");
250        let cached_count = bytes.get_u16() as usize;
251        let mut post_bundles_cached = Vec::with_capacity(cached_count);
252        for _ in 0..cached_count {
253            anyhow_assert_ge!(bytes.remaining(), 4, "Missing post_bundles_cached entry length");
254            let len = bytes.get_u32() as usize;
255            anyhow_assert_ge!(bytes.remaining(), len, "Truncated post_bundles_cached entry");
256            post_bundles_cached.push(bytes.split_to(len));
257        }
258
259        if bytes.remaining() < 1 {
260            anyhow::bail!("Invalid buffer: missing post_bundle presence flag");
261        }
262        let post_bundle = match bytes.get_u8() {
263            0 => None,
264            1 => Some(bytes.copy_to_bytes(bytes.remaining())),
265            _ => anyhow::bail!("Invalid buffer: unknown post_bundle presence flag"),
266        };
267
268        Ok(Self { peers_nearer, cache_request_token, post_bundles_cached, post_bundle })
269    }
270}
271#[derive(Debug, PartialEq, Clone)]
272pub struct GetPostBundleFeedbackResponseV1 {
273    pub peers_nearer: Vec<Peer>,
274    pub cache_request_token: Option<CacheRequestTokenV1>,
275    pub post_bundle_feedbacks_cached: Vec<Bytes>, // Cached feedback from originators the client hasn't seen yet (filtered by already_retrieved_peer_ids).
276    pub encoded_post_bundle_feedback: Option<Bytes>, // Primary feedback from the responsible server
277}
278
279impl GetPostBundleFeedbackResponseV1 {
280    pub fn to_bytes_gatherer(&self) -> anyhow::Result<BytesGatherer> {
281        let mut bytes_gatherer = BytesGatherer::default();
282
283        tools::write_length_prefixed_json::<Vec<Peer>>(&mut bytes_gatherer, &self.peers_nearer)?;
284
285        match &self.cache_request_token {
286            Some(token) => {
287                bytes_gatherer.put_u8(1u8);
288                tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
289            }
290            None => bytes_gatherer.put_u8(0u8),
291        }
292
293        bytes_gatherer.put_u16(self.post_bundle_feedbacks_cached.len() as u16);
294        for feedback in &self.post_bundle_feedbacks_cached {
295            bytes_gatherer.put_u32(feedback.len() as u32);
296            bytes_gatherer.put_bytes(feedback.clone());
297        }
298
299        match &self.encoded_post_bundle_feedback {
300            Some(post_bundle_feedback) => {
301                bytes_gatherer.put_u8(1u8);
302                bytes_gatherer.put_bytes(post_bundle_feedback.clone());
303            }
304            None => bytes_gatherer.put_u8(0u8),
305        }
306
307        Ok(bytes_gatherer)
308    }
309
310    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
311        use bytes::Buf;
312
313        let peers_nearer = tools::read_length_prefixed_json::<Vec<Peer>>(&mut bytes)?;
314
315        if bytes.remaining() < 1 {
316            anyhow::bail!("Invalid buffer: missing cache_request_token presence flag");
317        }
318        let cache_request_token = match bytes.get_u8() {
319            0 => None,
320            1 => Some(tools::read_length_prefixed_json::<CacheRequestTokenV1>(&mut bytes)?),
321            _ => anyhow::bail!("Invalid buffer: unknown cache_request_token presence flag"),
322        };
323
324        anyhow_assert_ge!(bytes.remaining(), 2, "Missing post_bundle_feedbacks_cached count");
325        let cached_count = bytes.get_u16() as usize;
326        let mut post_bundle_feedbacks_cached = Vec::with_capacity(cached_count);
327        for _ in 0..cached_count {
328            anyhow_assert_ge!(bytes.remaining(), 4, "Missing post_bundle_feedbacks_cached entry length");
329            let len = bytes.get_u32() as usize;
330            anyhow_assert_ge!(bytes.remaining(), len, "Truncated post_bundle_feedbacks_cached entry");
331            post_bundle_feedbacks_cached.push(bytes.split_to(len));
332        }
333
334        if bytes.remaining() < 1 {
335            anyhow::bail!("Invalid buffer: missing post_bundle_feedback presence flag");
336        }
337        let post_bundle_feedback = match bytes.get_u8() {
338            0 => None,
339            1 => Some(bytes.copy_to_bytes(bytes.remaining())),
340            _ => anyhow::bail!("Invalid buffer: unknown post_bundle_feedback presence flag"),
341        };
342
343        Ok(Self { peers_nearer, cache_request_token, post_bundle_feedbacks_cached, encoded_post_bundle_feedback: post_bundle_feedback })
344    }
345}
346
347#[derive(Debug, PartialEq, Clone)]
348pub struct SubmitPostClaimV1 {
349    pub bucket_location: BucketLocation,
350    pub referenced_post_header_bytes: Option<Bytes>, // For Sequel posts: the original post's bytes_without_body, used to verify same-author
351    pub referenced_hashtags: Vec<String>,             // Hashtag strings referenced by this post, for trending tracking
352    pub encoded_post_bytes: Bytes,                    // Note that it has just the header of the EncodedPost...
353}
354
355impl SubmitPostClaimV1 {
356    pub fn new_to_bytes(bucket_location: &BucketLocation, referenced_post_header_bytes: Option<&[u8]>, referenced_hashtags: &[String], encoded_post_bytes_without_body: &[u8]) -> anyhow::Result<Bytes> {
357        let mut bytes_gatherer = BytesGatherer::default();
358        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
359        match referenced_post_header_bytes {
360            Some(header_bytes) => {
361                bytes_gatherer.put_u32(header_bytes.len() as u32);
362                bytes_gatherer.put_slice(header_bytes);
363            }
364            None => {
365                bytes_gatherer.put_u32(0);
366            }
367        }
368        tools::write_length_prefixed_json(&mut bytes_gatherer, &referenced_hashtags)?;
369        bytes_gatherer.put_slice(encoded_post_bytes_without_body);
370        Ok(bytes_gatherer.to_bytes())
371    }
372
373    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
374        use bytes::Buf;
375        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
376
377        anyhow::ensure!(bytes.remaining() >= 4, "SubmitPostClaimV1: missing referenced_post_header_bytes length");
378        let referenced_post_header_bytes_len = bytes.get_u32() as usize;
379        let referenced_post_header_bytes = if referenced_post_header_bytes_len > 0 {
380            anyhow::ensure!(bytes.remaining() >= referenced_post_header_bytes_len, "SubmitPostClaimV1: referenced_post_header_bytes length {} exceeds remaining {}", referenced_post_header_bytes_len, bytes.remaining());
381            Some(bytes.split_to(referenced_post_header_bytes_len))
382        } else {
383            None
384        };
385
386        let referenced_hashtags = tools::read_length_prefixed_json::<Vec<String>>(bytes)?;
387
388        anyhow::ensure!(bytes.has_remaining(), "SubmitPostClaimV1: missing encoded_post_bytes");
389        let encoded_post_bytes = bytes.split_to(bytes.len());
390
391        Ok(Self { bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes })
392    }
393}
394
395#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
396pub struct SubmitPostClaimResponseV1 {
397    pub peers_nearer: Vec<Peer>,
398    pub submit_post_claim_token: Option<SubmitPostClaimTokenV1>,
399}
400
401#[derive(Debug, PartialEq, Clone)]
402pub struct SubmitPostCommitV1 {
403    pub bucket_location: BucketLocation,
404    pub submit_post_claim_token: SubmitPostClaimTokenV1,
405    pub encoded_post_bytes: Bytes,
406}
407
408impl SubmitPostCommitV1 {
409    pub fn new_to_bytes(bucket_location: &BucketLocation, submit_post_claim_token: &SubmitPostClaimTokenV1, encoded_post_bytes: &[u8]) -> anyhow::Result<Bytes> {
410        let mut bytes_gatherer = BytesGatherer::default();
411        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
412        tools::write_length_prefixed_json(&mut bytes_gatherer, submit_post_claim_token)?;
413        bytes_gatherer.put_slice(encoded_post_bytes);
414        Ok(bytes_gatherer.to_bytes())
415    }
416
417    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
418        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
419        let submit_post_claim_token = tools::read_length_prefixed_json::<SubmitPostClaimTokenV1>(bytes)?;
420        let encoded_post_bytes = bytes.split_to(bytes.len());
421
422        Ok(Self {
423            bucket_location,
424            submit_post_claim_token,
425            encoded_post_bytes,
426        })
427    }
428}
429
430#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
431pub struct SubmitPostCommitResponseV1 {
432    pub submit_post_commit_token: SubmitPostCommitTokenV1,
433}
434
435#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
436pub struct SubmitPostClaimTokenV1 {
437    pub peer: Peer,
438    pub bucket_location: BucketLocation,
439    pub post_id: Id,
440    pub token_signature: Signature,
441}
442
443impl SubmitPostClaimTokenV1 {
444    fn get_hash_for_signing(peer: &Peer, bucket_location: &BucketLocation, post_id: &Id) -> Hash {
445        hashing::hash_multiple(&[peer.id.as_ref(), bucket_location.get_hash_for_signing().as_ref(), post_id.as_ref(), "SubmitPostClaimTokenV1".as_bytes()])
446    }
447
448    pub fn new(peer: Peer, bucket_location: BucketLocation, post_id: Id, signature_key: &SignatureKey) -> Self {
449        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer, &bucket_location, &post_id).as_ref());
450        Self { peer, bucket_location, post_id, token_signature }
451    }
452
453    pub fn verify(&self) -> anyhow::Result<()> {
454        self.peer.verify()?;
455        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
456        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer, &self.bucket_location, &self.post_id).as_ref())
457    }
458}
459
460#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
461pub struct SubmitPostCommitTokenV1 {
462    pub peer: Peer,
463    pub bucket_location: BucketLocation,
464    pub post_id: Id,
465    pub token_signature: Signature,
466}
467
468impl SubmitPostCommitTokenV1 {
469    fn get_hash_for_signing(peer: &Peer, bucket_location: &BucketLocation, post_id: &Id) -> Hash {
470        hashing::hash_multiple(&[peer.id.as_ref(), bucket_location.get_hash_for_signing().as_ref(), post_id.as_ref(), "SubmitPostCommitTokenV1".as_bytes()])
471    }
472
473    pub fn new(peer: Peer, bucket_location: BucketLocation, post_id: Id, signature_key: &SignatureKey) -> Self {
474        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer, &bucket_location, &post_id).as_ref());
475        Self { peer, bucket_location, post_id, token_signature }
476    }
477
478    pub fn verify(&self) -> anyhow::Result<()> {
479        self.peer.verify()?;
480        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
481        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer, &self.bucket_location, &self.post_id).as_ref())
482    }
483}
484
485
486#[derive(Debug, PartialEq, Clone)]
487pub struct SubmitPostFeedbackV1 {
488    pub bucket_location: BucketLocation,
489    pub encoded_post_feedback: EncodedPostFeedbackV1,
490}
491
492impl SubmitPostFeedbackV1 {
493    pub fn new_to_bytes(bucket_location: &BucketLocation, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<Bytes> {
494        let mut bytes_gatherer = BytesGatherer::default();
495        tools::write_length_prefixed_json(&mut bytes_gatherer, bucket_location)?;
496        let mut feedback_bytes = BytesMut::new();
497        encoded_post_feedback.append_encode_to_bytes(&mut feedback_bytes)?;
498        bytes_gatherer.put_bytes(feedback_bytes.freeze());
499        Ok(bytes_gatherer.to_bytes())
500    }
501
502    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
503        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
504        let encoded_post_feedback = EncodedPostFeedbackV1::decode_from_bytes(bytes)?;
505        Ok(Self { bucket_location, encoded_post_feedback })
506    }
507
508}
509
510#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
511pub struct SubmitPostFeedbackResponseV1 {
512    pub peers_nearer: Vec<Peer>,
513    pub accepted: bool,
514}
515
516
517
518// ---- Heal Post Bundle (two-phase: claim then commit) ----
519
520#[derive(Debug, PartialEq, Clone)]
521pub struct HealPostBundleClaimV1 {
522    pub bucket_location: BucketLocation,
523    pub donor_header: EncodedPostBundleHeaderV1,
524}
525
526impl HealPostBundleClaimV1 {
527    pub fn new_to_bytes(donor_header: &EncodedPostBundleHeaderV1, bucket_location: &BucketLocation) -> anyhow::Result<Bytes> {
528        let mut g = BytesGatherer::default();
529        tools::write_length_prefixed_json(&mut g, bucket_location)?;
530        tools::write_length_prefixed_json(&mut g, donor_header)?;
531        Ok(g.to_bytes())
532    }
533
534    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
535        let bucket_location = tools::read_length_prefixed_json::<BucketLocation>(bytes)?;
536        let donor_header = tools::read_length_prefixed_json::<EncodedPostBundleHeaderV1>(bytes)?;
537        Ok(Self { donor_header, bucket_location })
538    }
539}
540
541#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
542pub struct HealPostBundleClaimResponseV1 {
543    pub needed_post_ids: Vec<Id>,
544    pub token: Option<HealPostBundleClaimTokenV1>,
545}
546
547#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
548pub struct HealPostBundleClaimTokenV1 {
549    pub peer: Peer,
550    pub bucket_location: BucketLocation,
551    pub donor_header_signature: Signature,
552    pub needed_post_ids: Vec<Id>,
553    pub token_signature: Signature,
554}
555
556impl HealPostBundleClaimTokenV1 {
557    fn get_hash_for_signing(peer_id: &Id, bucket_location: &BucketLocation, donor_header_signature: &Signature, needed_post_ids: &[Id]) -> Hash {
558        let bucket_location_hash = bucket_location.get_hash_for_signing();
559        let mut hash_input: Vec<&[u8]> = vec![
560            peer_id.as_ref(),
561            bucket_location_hash.as_ref(),
562            donor_header_signature.as_ref(),
563            "HealPostBundleClaimTokenV1".as_bytes(),
564        ];
565        for id in needed_post_ids {
566            hash_input.push(id.as_ref());
567        }
568        hashing::hash_multiple(&hash_input)
569    }
570
571    pub fn new(peer: Peer, bucket_location: BucketLocation, needed_post_ids: Vec<Id>, donor_header_signature: Signature, signature_key: &SignatureKey) -> Self {
572        let hash = Self::get_hash_for_signing(&peer.id, &bucket_location, &donor_header_signature, &needed_post_ids);
573        let token_signature = signing::sign(signature_key, hash.as_ref());
574        Self { peer, bucket_location, donor_header_signature, needed_post_ids, token_signature }
575    }
576
577    pub fn verify(&self) -> anyhow::Result<()> {
578        self.peer.verify()?;
579        self.bucket_location.validate()?;
580        let hash = Self::get_hash_for_signing(&self.peer.id, &self.bucket_location, &self.donor_header_signature, &self.needed_post_ids);
581        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
582        signing::verify(&verification_key, &self.token_signature, hash.as_ref())
583    }
584}
585
586#[derive(Debug, PartialEq, Clone)]
587pub struct HealPostBundleCommitV1 {
588    pub token: HealPostBundleClaimTokenV1,
589    pub donor_header: EncodedPostBundleHeaderV1,
590    pub encoded_posts_bytes: Bytes,
591}
592
593impl HealPostBundleCommitV1 {
594    pub fn new_to_bytes(token: &HealPostBundleClaimTokenV1, donor_header: &EncodedPostBundleHeaderV1, encoded_posts_bytes: &[u8]) -> anyhow::Result<Bytes> {
595        let mut bytes_gatherer = BytesGatherer::default();
596        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
597        tools::write_length_prefixed_json(&mut bytes_gatherer, donor_header)?;
598        bytes_gatherer.put_slice(encoded_posts_bytes);
599        Ok(bytes_gatherer.to_bytes())
600    }
601
602    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
603        let token = tools::read_length_prefixed_json::<HealPostBundleClaimTokenV1>(bytes)?;
604        let donor_header = tools::read_length_prefixed_json::<EncodedPostBundleHeaderV1>(bytes)?;
605        let encoded_posts_bytes = bytes.split_to(bytes.len());
606        Ok(Self { token, donor_header, encoded_posts_bytes })
607    }
608}
609
610#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
611pub struct HealPostBundleCommitResponseV1 {
612    pub accepted: bool,
613}
614
615// ---- Heal Post Bundle Feedback (single-phase) ----
616
617#[derive(Debug, PartialEq, Clone)]
618pub struct HealPostBundleFeedbackV1 {
619    pub location_id: Id,
620    pub encoded_post_feedbacks: Vec<EncodedPostFeedbackV1>,
621}
622
623impl HealPostBundleFeedbackV1 {
624    pub fn new_to_bytes(location_id: &Id, feedbacks: &[EncodedPostFeedbackV1]) -> anyhow::Result<Bytes> {
625        let mut bytes = BytesMut::new();
626        bytes.put_slice(location_id.as_ref());
627        for f in feedbacks {
628            f.append_encode_to_bytes(&mut bytes)?;
629        }
630        Ok(bytes.freeze())
631    }
632
633    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
634        use bytes::Buf;
635        anyhow_assert_ge!(bytes.remaining(), ID_BYTES, "Missing location_id in HealPostBundleFeedbackV1");
636        let location_id = Id::from_slice(&bytes.split_to(ID_BYTES))?;
637        let mut encoded_post_feedbacks = Vec::new();
638        while bytes.has_remaining() {
639            encoded_post_feedbacks.push(EncodedPostFeedbackV1::decode_from_bytes(&mut *bytes)?);
640        }
641        Ok(Self { location_id, encoded_post_feedbacks })
642    }
643}
644
645#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
646pub struct HealPostBundleFeedbackResponseV1 {
647    pub accepted_count: u32,
648}
649
650
651/// Issued by an intermediate server during a GetPostBundle/GetPostBundleFeedback miss or stale hit.
652/// The client uploads the bundle to this server after completing its walk.
653#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
654pub struct CacheRequestTokenV1 {
655    pub peer: Peer,
656    pub bucket_location: BucketLocation,
657    pub expires_at: TimeMillis,
658    /// Originator peer IDs already held in this server's cache for this location_id.
659    /// The client should only upload a bundle whose originator peer.id is NOT in this list.
660    pub already_cached_peer_ids: Vec<Id>,
661    pub token_signature: Signature,
662}
663
664impl CacheRequestTokenV1 {
665    fn get_hash_for_signing(peer_id: &Id, bucket_location: &BucketLocation, expires_at: &TimeMillis, already_cached_peer_ids: &[Id]) -> Hash {
666        let expires_at_be = expires_at.encode_be();
667        let bucket_location_hash = bucket_location.get_hash_for_signing();
668        let mut inputs: Vec<&[u8]> = vec![peer_id.as_ref(), bucket_location_hash.as_ref(), expires_at_be.as_ref(), "CacheRequestToken".as_bytes()];
669        for id in already_cached_peer_ids {
670            inputs.push(id.as_ref());
671        }
672        hashing::hash_multiple(&inputs)
673    }
674
675    pub fn new(peer: Peer, bucket_location: BucketLocation, expires_at: TimeMillis, already_cached_peer_ids: Vec<Id>, signature_key: &SignatureKey) -> Self {
676        let token_signature = signing::sign(signature_key, Self::get_hash_for_signing(&peer.id, &bucket_location, &expires_at, &already_cached_peer_ids).as_ref());
677        Self { peer, bucket_location, expires_at, already_cached_peer_ids, token_signature }
678    }
679
680    pub fn verify(&self) -> anyhow::Result<()> {
681        self.peer.verify()?;
682        self.bucket_location.validate()?;
683        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
684        signing::verify(&verification_key, &self.token_signature, Self::get_hash_for_signing(&self.peer.id, &self.bucket_location, &self.expires_at, &self.already_cached_peer_ids).as_ref())
685    }
686
687    pub fn is_expired(&self, current_time: TimeMillis) -> bool {
688        current_time > self.expires_at
689    }
690}
691
692#[derive(Debug, PartialEq, Clone)]
693pub struct CachePostBundleV1 {
694    pub token: CacheRequestTokenV1,
695    /// One entry per originator — the client uploads all bundles it collected during its walk.
696    pub encoded_post_bundles: Vec<Bytes>,
697}
698
699impl CachePostBundleV1 {
700    pub fn new_to_bytes(token: &CacheRequestTokenV1, encoded_post_bundles: &[&[u8]]) -> anyhow::Result<Bytes> {
701        let mut bytes_gatherer = BytesGatherer::default();
702        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
703        bytes_gatherer.put_u16(encoded_post_bundles.len() as u16);
704        for bundle in encoded_post_bundles {
705            bytes_gatherer.put_u32(bundle.len() as u32);
706            bytes_gatherer.put_slice(bundle);
707        }
708        Ok(bytes_gatherer.to_bytes())
709    }
710
711    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
712        let token = tools::read_length_prefixed_json::<CacheRequestTokenV1>(bytes)?;
713        anyhow_assert_ge!(bytes.remaining(), 2, "Missing encoded_post_bundles count");
714        let count = bytes.get_u16() as usize;
715        let mut encoded_post_bundles = Vec::with_capacity(count);
716        for _ in 0..count {
717            anyhow_assert_ge!(bytes.remaining(), 4, "Missing encoded_post_bundle entry length");
718            let len = bytes.get_u32() as usize;
719            anyhow_assert_ge!(bytes.remaining(), len, "Truncated encoded_post_bundle entry");
720            encoded_post_bundles.push(bytes.split_to(len));
721        }
722        Ok(Self { token, encoded_post_bundles })
723    }
724}
725
726#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
727pub struct CachePostBundleResponseV1 {
728    pub accepted: bool,
729}
730
731#[derive(Debug, PartialEq, Clone)]
732pub struct CachePostBundleFeedbackV1 {
733    pub token: CacheRequestTokenV1,
734    pub encoded_post_bundle_feedback_bytes: Bytes,
735}
736
737impl CachePostBundleFeedbackV1 {
738    pub fn new_to_bytes(token: &CacheRequestTokenV1, encoded_post_bundle_feedback_bytes: &[u8]) -> anyhow::Result<Bytes> {
739        let mut bytes_gatherer = BytesGatherer::default();
740        tools::write_length_prefixed_json(&mut bytes_gatherer, token)?;
741        bytes_gatherer.put_slice(encoded_post_bundle_feedback_bytes);
742        Ok(bytes_gatherer.to_bytes())
743    }
744
745    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
746        let token = tools::read_length_prefixed_json::<CacheRequestTokenV1>(bytes)?;
747        let encoded_post_bundle_feedback_bytes = bytes.split_to(bytes.len());
748        Ok(Self { token, encoded_post_bundle_feedback_bytes })
749    }
750}
751
752#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
753pub struct CachePostBundleFeedbackResponseV1 {
754    pub accepted: bool,
755}
756
757/// Client → Server: fetch Open Graph metadata for `url`
758#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
759pub struct FetchUrlPreviewV1 {
760    pub url: String,
761}
762
763impl FetchUrlPreviewV1 {
764    pub fn new_to_bytes(url: &str) -> anyhow::Result<Bytes> {
765        let mut bytes_gatherer = BytesGatherer::default();
766        tools::write_length_prefixed_json(&mut bytes_gatherer, &FetchUrlPreviewV1 { url: url.to_string() })?;
767        Ok(bytes_gatherer.to_bytes())
768    }
769
770    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
771        tools::read_length_prefixed_json(bytes)
772    }
773}
774
775/// Server → Client: extracted preview metadata
776#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
777pub struct FetchUrlPreviewResponseV1 {
778    pub url: String,
779    pub title: String,
780    pub description: String,
781    pub image_url: String,
782}
783
784impl FetchUrlPreviewResponseV1 {
785    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
786        json::struct_to_bytes(self)
787    }
788
789    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
790        json::bytes_to_struct(bytes)
791    }
792}
793
794/// Client → Server: fetch trending hashtags
795#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
796pub struct TrendingHashtagsFetchV1 {
797    pub limit: u16,
798}
799
800impl TrendingHashtagsFetchV1 {
801    pub fn new_to_bytes(limit: u16) -> anyhow::Result<Bytes> {
802        let mut bytes_gatherer = BytesGatherer::default();
803        tools::write_length_prefixed_json(&mut bytes_gatherer, &TrendingHashtagsFetchV1 { limit })?;
804        Ok(bytes_gatherer.to_bytes())
805    }
806
807    pub fn from_bytes(bytes: &mut Bytes) -> anyhow::Result<Self> {
808        tools::read_length_prefixed_json(bytes)
809    }
810}
811
812#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
813pub struct TrendingHashtagV1 {
814    pub hashtag: String,
815    pub count: u64,
816}
817
818/// Server → Client: trending hashtags with unique author counts
819#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
820pub struct TrendingHashtagsFetchResponseV1 {
821    pub trending_hashtags: Vec<TrendingHashtagV1>,
822}
823
824impl TrendingHashtagsFetchResponseV1 {
825    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
826        json::struct_to_bytes(self)
827    }
828
829    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
830        json::bytes_to_struct(bytes)
831    }
832}
833
834/// Client → Server: opt-in peer-introspection request.
835///
836/// Empty payload. The corresponding response carries a signed, compressed JSON
837/// document; per-request PoW is gated by `POW_MINIMUM_PER_PEER_STATS` and the
838/// server caches the response for a short window to absorb bursts of well-PoW'd
839/// callers without re-measuring every time.
840#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
841pub struct PeerStatsRequestV1 {}
842
843impl PeerStatsRequestV1 {
844    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
845        json::struct_to_bytes(self)
846    }
847
848    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
849        json::bytes_to_struct(bytes)
850    }
851}
852
853/// Server → Client: signed peer-introspection response.
854///
855/// `json_compressed` is an lz4-compressed JSON object (the stats document).
856/// `signature` covers `timestamp.encode_be() || json_compressed` using the
857/// server's Ed25519 signing key — clients that re-share the response amongst
858/// themselves verify against the bytes as transmitted, so no canonical-JSON
859/// dance is required.
860#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
861pub struct PeerStatsResponseV1 {
862    pub peer: Peer,
863    pub timestamp: TimeMillis,
864    pub json_compressed: Bytes,
865    pub signature: Signature,
866}
867
868impl PeerStatsResponseV1 {
869    /// The exact byte sequence that `signature` covers.
870    pub fn signing_input(timestamp: TimeMillis, json_compressed: &[u8]) -> Vec<u8> {
871        let mut buf = Vec::with_capacity(crate::tools::time::TIME_MILLIS_BYTES + json_compressed.len());
872        buf.extend_from_slice(timestamp.encode_be().as_ref());
873        buf.extend_from_slice(json_compressed);
874        buf
875    }
876
877    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
878        json::struct_to_bytes(self)
879    }
880
881    pub fn from_bytes(bytes: &Bytes) -> anyhow::Result<Self> {
882        json::bytes_to_struct(bytes)
883    }
884}
885
886#[cfg(test)]
887mod tests {
888    use crate::protocol::payload::payload::{GetPostBundleResponseV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1, SubmitPostClaimTokenV1, SubmitPostCommitTokenV1};
889    use crate::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
890    use crate::tools::server_id::ServerId;
891    use crate::tools::buckets::{BucketLocation, BucketType};
892    use crate::tools::time::{TimeMillis, MILLIS_IN_HOUR};
893    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
894    use crate::tools::{config, json, tools};
895    use crate::tools::types::{Id, Pow, Signature, ID_BYTES};
896    use std::collections::HashSet;
897    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
898    use bytes::Bytes;
899
900    #[tokio::test]
901    #[allow(non_snake_case)]
902    async fn test_to_from_GetPostBundleResponseV1() -> anyhow::Result<()> {
903        let time_provider = RealTimeProvider;
904        let pow_generator = SingleThreadedPowGenerator::new();
905        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
906        let peer = server_id.to_peer(&time_provider)?;
907
908        let mut encoded_posts_ids = Vec::new();
909        let mut encoded_posts_lengths = Vec::new();
910        let mut encoded_posts_bytes = Vec::new();
911
912        for i in 0..16 {
913            let len = (i + 1) * 128;
914            let mut encoded_post_bytes = tools::random_bytes(len);
915            let post_id = Id::from_slice(&encoded_post_bytes[0..ID_BYTES])?;
916            encoded_posts_ids.push(post_id);
917            encoded_posts_lengths.push(encoded_post_bytes.len());
918            encoded_posts_bytes.append(&mut encoded_post_bytes);
919        }
920
921        let header = EncodedPostBundleHeaderV1 {
922            time_millis: time_provider.current_time_millis(),
923            location_id: Id::random(),
924            overflowed: false,
925            sealed: false,
926            num_posts: 0,
927            encoded_post_ids: encoded_posts_ids,
928            encoded_post_lengths: encoded_posts_lengths,
929            encoded_post_healed: HashSet::new(),
930            peer,
931            signature: Signature::zero(),
932        };
933
934        let peers_nearer = {
935            vec![
936                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
937                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
938                ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?.to_peer(&time_provider)?,
939            ]
940        };
941
942        let response = GetPostBundleResponseV1 {
943            peers_nearer,
944            cache_request_token: None,
945            post_bundles_cached: vec![],
946            post_bundle: Some((EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::from(encoded_posts_bytes) }).to_bytes()?),
947        };
948
949        let encoded = response.to_bytes_gatherer()?.to_bytes();
950        let response_decoded = GetPostBundleResponseV1::from_bytes(encoded)?;
951
952        assert_eq!(response_decoded, response);
953
954        Ok(())
955    }
956
957    async fn make_signed_header(time_provider: &RealTimeProvider) -> anyhow::Result<(EncodedPostBundleHeaderV1, ServerId)> {
958        let pow_generator = SingleThreadedPowGenerator::new();
959        let server_id = ServerId::new("own_pow", time_provider, config::SERVER_KEY_POW_MIN, true, &pow_generator).await?;
960        let peer = server_id.to_peer(time_provider)?;
961        let num_posts: u8 = 4;
962        let mut header = EncodedPostBundleHeaderV1 {
963            time_millis: time_provider.current_time_millis(),
964            location_id: Id::random(),
965            overflowed: false,
966            sealed: false,
967            num_posts,
968            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
969            encoded_post_lengths: (0..num_posts).map(|i| (i as usize + 1) * 64).collect(),
970            encoded_post_healed: HashSet::new(),
971            peer,
972            signature: Signature::zero(),
973        };
974        header.signature_generate(&server_id.keys.signature_key)?;
975        Ok((header, server_id))
976    }
977
978    fn make_bucket_location() -> anyhow::Result<BucketLocation> {
979        BucketLocation::new(BucketType::User, Id::random(), MILLIS_IN_HOUR, TimeMillis(1_700_000_000_000))
980    }
981
982    #[tokio::test]
983    #[allow(non_snake_case)]
984    async fn test_SubmitPostClaimTokenV1_verify() -> anyhow::Result<()> {
985        let time_provider = RealTimeProvider;
986        let pow_generator = SingleThreadedPowGenerator::new();
987        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
988        let peer = server_id.to_peer(&time_provider)?;
989
990        let token = SubmitPostClaimTokenV1::new(peer, make_bucket_location()?, Id::random(), &server_id.keys.signature_key);
991        token.verify()?;
992        Ok(())
993    }
994
995    #[tokio::test]
996    #[allow(non_snake_case)]
997    async fn test_SubmitPostCommitTokenV1_verify() -> anyhow::Result<()> {
998        let time_provider = RealTimeProvider;
999        let pow_generator = SingleThreadedPowGenerator::new();
1000        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1001        let peer = server_id.to_peer(&time_provider)?;
1002
1003        let token = SubmitPostCommitTokenV1::new(peer, make_bucket_location()?, Id::random(), &server_id.keys.signature_key);
1004        token.verify()?;
1005        Ok(())
1006    }
1007
1008    #[tokio::test]
1009    #[allow(non_snake_case)]
1010    async fn test_to_from_HealPostBundleClaimV1() -> anyhow::Result<()> {
1011        let time_provider = RealTimeProvider;
1012        let (header, _) = make_signed_header(&time_provider).await?;
1013
1014        let bucket_location = make_bucket_location()?;
1015        let encoded = HealPostBundleClaimV1::new_to_bytes(&header, &bucket_location)?;
1016        let mut bytes = encoded;
1017        let decoded = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
1018
1019        assert_eq!(decoded.donor_header, header);
1020        assert_eq!(decoded.bucket_location, bucket_location);
1021        Ok(())
1022    }
1023
1024    #[tokio::test]
1025    #[allow(non_snake_case)]
1026    async fn test_to_from_HealPostBundleClaimResponseV1() -> anyhow::Result<()> {
1027        let time_provider = RealTimeProvider;
1028        let (header, server_id) = make_signed_header(&time_provider).await?;
1029        let peer = server_id.to_peer(&time_provider)?;
1030
1031        let needed_post_ids = header.encoded_post_ids[0..2].to_vec();
1032        let token = HealPostBundleClaimTokenV1::new(
1033            peer,
1034            make_bucket_location()?,
1035            needed_post_ids.clone(),
1036            header.signature,
1037            &server_id.keys.signature_key,
1038        );
1039
1040        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token: Some(token) };
1041
1042        let encoded = json::struct_to_bytes(&response)?;
1043        let decoded = json::bytes_to_struct::<HealPostBundleClaimResponseV1>(&encoded)?;
1044
1045        assert_eq!(decoded, response);
1046        Ok(())
1047    }
1048
1049    #[tokio::test]
1050    #[allow(non_snake_case)]
1051    async fn test_HealPostBundleClaimTokenV1_verify() -> anyhow::Result<()> {
1052        let time_provider = RealTimeProvider;
1053        let (header, server_id) = make_signed_header(&time_provider).await?;
1054        let peer = server_id.to_peer(&time_provider)?;
1055
1056        let token = HealPostBundleClaimTokenV1::new(
1057            peer,
1058            make_bucket_location()?,
1059            header.encoded_post_ids.clone(),
1060            header.signature,
1061            &server_id.keys.signature_key,
1062        );
1063
1064        token.verify()?;
1065        Ok(())
1066    }
1067
1068    #[tokio::test]
1069    #[allow(non_snake_case)]
1070    async fn test_to_from_HealPostBundleCommitV1() -> anyhow::Result<()> {
1071        let time_provider = RealTimeProvider;
1072        let (header, server_id) = make_signed_header(&time_provider).await?;
1073        let peer = server_id.to_peer(&time_provider)?;
1074
1075        let token = HealPostBundleClaimTokenV1::new(
1076            peer,
1077            make_bucket_location()?,
1078            header.encoded_post_ids.clone(),
1079            header.signature,
1080            &server_id.keys.signature_key,
1081        );
1082
1083        let post_bytes = tools::random_bytes(512);
1084        let encoded = HealPostBundleCommitV1::new_to_bytes(&token, &header, &post_bytes)?;
1085        let mut bytes = encoded;
1086        let decoded = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
1087
1088        assert_eq!(decoded.token, token);
1089        assert_eq!(decoded.donor_header, header);
1090        assert_eq!(decoded.encoded_posts_bytes.as_ref(), post_bytes.as_slice());
1091        Ok(())
1092    }
1093
1094    #[tokio::test]
1095    #[allow(non_snake_case)]
1096    async fn test_to_from_HealPostBundleCommitResponseV1() -> anyhow::Result<()> {
1097        for accepted in [true, false] {
1098            let response = HealPostBundleCommitResponseV1 { accepted };
1099            let encoded = json::struct_to_bytes(&response)?;
1100            let decoded = json::bytes_to_struct::<HealPostBundleCommitResponseV1>(&encoded)?;
1101            assert_eq!(decoded, response);
1102        }
1103        Ok(())
1104    }
1105
1106    #[tokio::test]
1107    #[allow(non_snake_case)]
1108    async fn test_CacheRequestTokenV1_verify() -> anyhow::Result<()> {
1109        use crate::protocol::payload::payload::CacheRequestTokenV1;
1110        use crate::tools::time::MILLIS_IN_MINUTE;
1111
1112        let time_provider = RealTimeProvider;
1113        let pow_generator = SingleThreadedPowGenerator::new();
1114        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1115        let peer = server_id.to_peer(&time_provider)?;
1116        let bucket_location = make_bucket_location()?;
1117        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1118        let already_cached_peer_ids = vec![Id::random(), Id::random()];
1119
1120        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, already_cached_peer_ids, &server_id.keys.signature_key);
1121        token.verify()?;
1122        Ok(())
1123    }
1124
1125    #[tokio::test]
1126    #[allow(non_snake_case)]
1127    async fn test_to_from_CachePostBundleV1() -> anyhow::Result<()> {
1128        use crate::protocol::payload::payload::{CachePostBundleV1, CacheRequestTokenV1};
1129        use crate::tools::time::MILLIS_IN_MINUTE;
1130
1131        let time_provider = RealTimeProvider;
1132        let pow_generator = SingleThreadedPowGenerator::new();
1133        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1134        let peer = server_id.to_peer(&time_provider)?;
1135        let bucket_location = make_bucket_location()?;
1136        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1137        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, vec![], &server_id.keys.signature_key);
1138
1139        // Multiple bundles — one per originator
1140        let bundle_a = tools::random_bytes(512);
1141        let bundle_b = tools::random_bytes(256);
1142        let bundle_c = tools::random_bytes(1024);
1143        let bundles: &[&[u8]] = &[&bundle_a, &bundle_b, &bundle_c];
1144
1145        let encoded = CachePostBundleV1::new_to_bytes(&token, bundles)?;
1146        let mut bytes = encoded;
1147        let decoded = CachePostBundleV1::from_bytes(&mut bytes)?;
1148
1149        assert_eq!(decoded.token, token);
1150        assert_eq!(decoded.encoded_post_bundles.len(), 3);
1151        assert_eq!(decoded.encoded_post_bundles[0].as_ref(), bundle_a.as_slice());
1152        assert_eq!(decoded.encoded_post_bundles[1].as_ref(), bundle_b.as_slice());
1153        assert_eq!(decoded.encoded_post_bundles[2].as_ref(), bundle_c.as_slice());
1154        Ok(())
1155    }
1156
1157    #[tokio::test]
1158    #[allow(non_snake_case)]
1159    async fn test_to_from_CachePostBundleV1_empty() -> anyhow::Result<()> {
1160        use crate::protocol::payload::payload::{CachePostBundleV1, CacheRequestTokenV1};
1161        use crate::tools::time::MILLIS_IN_MINUTE;
1162
1163        let time_provider = RealTimeProvider;
1164        let pow_generator = SingleThreadedPowGenerator::new();
1165        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1166        let peer = server_id.to_peer(&time_provider)?;
1167        let bucket_location = make_bucket_location()?;
1168        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1169        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, vec![], &server_id.keys.signature_key);
1170
1171        let encoded = CachePostBundleV1::new_to_bytes(&token, &[])?;
1172        let mut bytes = encoded;
1173        let decoded = CachePostBundleV1::from_bytes(&mut bytes)?;
1174
1175        assert_eq!(decoded.token, token);
1176        assert!(decoded.encoded_post_bundles.is_empty());
1177        Ok(())
1178    }
1179
1180    #[tokio::test]
1181    #[allow(non_snake_case)]
1182    async fn test_to_from_CachePostBundleFeedbackV1() -> anyhow::Result<()> {
1183        use crate::protocol::payload::payload::{CachePostBundleFeedbackV1, CacheRequestTokenV1};
1184        use crate::tools::time::MILLIS_IN_MINUTE;
1185
1186        let time_provider = RealTimeProvider;
1187        let pow_generator = SingleThreadedPowGenerator::new();
1188        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1189        let peer = server_id.to_peer(&time_provider)?;
1190        let bucket_location = make_bucket_location()?;
1191        let expires_at = time_provider.current_time_millis() + MILLIS_IN_MINUTE;
1192        let already_cached_peer_ids = vec![Id::random()];
1193        let token = CacheRequestTokenV1::new(peer, bucket_location, expires_at, already_cached_peer_ids, &server_id.keys.signature_key);
1194
1195        let feedback_bytes = tools::random_bytes(256);
1196        let encoded = CachePostBundleFeedbackV1::new_to_bytes(&token, &feedback_bytes)?;
1197        let mut bytes = encoded;
1198        let decoded = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1199
1200        assert_eq!(decoded.token, token);
1201        assert_eq!(decoded.encoded_post_bundle_feedback_bytes.as_ref(), feedback_bytes.as_slice());
1202        Ok(())
1203    }
1204
1205    #[tokio::test]
1206    #[allow(non_snake_case)]
1207    async fn test_to_from_FetchUrlPreviewV1() -> anyhow::Result<()> {
1208        use crate::protocol::payload::payload::FetchUrlPreviewV1;
1209
1210        let url = "https://example.com/article?q=1#section";
1211        let encoded = FetchUrlPreviewV1::new_to_bytes(url)?;
1212        let mut bytes = encoded;
1213        let decoded = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1214
1215        assert_eq!(decoded.url, url);
1216        Ok(())
1217    }
1218
1219    #[tokio::test]
1220    #[allow(non_snake_case)]
1221    async fn test_to_from_FetchUrlPreviewResponseV1() -> anyhow::Result<()> {
1222        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1223
1224        let response = FetchUrlPreviewResponseV1 {
1225            url: "https://example.com/canonical".to_string(),
1226            title: "Example Title".to_string(),
1227            description: "A short description.".to_string(),
1228            image_url: "https://example.com/image.png".to_string(),
1229        };
1230
1231        let encoded = response.to_bytes()?;
1232        let decoded = FetchUrlPreviewResponseV1::from_bytes(&encoded)?;
1233
1234        assert_eq!(decoded, response);
1235        Ok(())
1236    }
1237
1238    // ── Robustness tests: CachePostBundleV1::from_bytes ──
1239
1240    #[test]
1241    #[allow(non_snake_case)]
1242    fn test_CachePostBundleV1_from_bytes_empty_input() {
1243        use crate::protocol::payload::payload::CachePostBundleV1;
1244        let mut bytes = Bytes::new();
1245        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1246    }
1247
1248    #[test]
1249    #[allow(non_snake_case)]
1250    fn test_CachePostBundleV1_from_bytes_garbage_input() {
1251        use crate::protocol::payload::payload::CachePostBundleV1;
1252        let mut bytes = Bytes::from_static(&[0xff; 64]);
1253        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1254    }
1255
1256    #[test]
1257    #[allow(non_snake_case)]
1258    fn test_CachePostBundleV1_from_bytes_truncated_count() {
1259        use crate::protocol::payload::payload::CachePostBundleV1;
1260        // Valid length-prefixed JSON for an empty-ish struct won't work here since CacheRequestTokenV1
1261        // has required fields. But a single byte after a valid-looking length prefix will fail at JSON parse.
1262        // The simplest test: just one byte (not enough for the length prefix u64).
1263        let mut bytes = Bytes::from_static(&[0x01]);
1264        assert!(CachePostBundleV1::from_bytes(&mut bytes).is_err());
1265    }
1266
1267    // ── Robustness tests: GetPostBundleResponseV1::from_bytes ──
1268
1269    #[test]
1270    #[allow(non_snake_case)]
1271    fn test_GetPostBundleResponseV1_from_bytes_empty_input() {
1272        let bytes = Bytes::new();
1273        assert!(GetPostBundleResponseV1::from_bytes(bytes).is_err());
1274    }
1275
1276    #[test]
1277    #[allow(non_snake_case)]
1278    fn test_GetPostBundleResponseV1_from_bytes_garbage_input() {
1279        let bytes = Bytes::from_static(&[0xff; 128]);
1280        assert!(GetPostBundleResponseV1::from_bytes(bytes).is_err());
1281    }
1282
1283    // ── Robustness tests: HealPostBundleClaimV1::from_bytes ──
1284
1285    #[test]
1286    #[allow(non_snake_case)]
1287    fn test_HealPostBundleClaimV1_from_bytes_empty_input() {
1288        let mut bytes = Bytes::new();
1289        assert!(HealPostBundleClaimV1::from_bytes(&mut bytes).is_err());
1290    }
1291
1292    #[test]
1293    #[allow(non_snake_case)]
1294    fn test_HealPostBundleClaimV1_from_bytes_garbage_input() {
1295        let mut bytes = Bytes::from_static(&[0xff; 64]);
1296        assert!(HealPostBundleClaimV1::from_bytes(&mut bytes).is_err());
1297    }
1298
1299    // ── Robustness tests: HealPostBundleCommitV1::from_bytes ──
1300
1301    #[test]
1302    #[allow(non_snake_case)]
1303    fn test_HealPostBundleCommitV1_from_bytes_empty_input() {
1304        let mut bytes = Bytes::new();
1305        assert!(HealPostBundleCommitV1::from_bytes(&mut bytes).is_err());
1306    }
1307
1308    #[test]
1309    #[allow(non_snake_case)]
1310    fn test_HealPostBundleCommitV1_from_bytes_garbage_input() {
1311        let mut bytes = Bytes::from_static(&[0xff; 64]);
1312        assert!(HealPostBundleCommitV1::from_bytes(&mut bytes).is_err());
1313    }
1314
1315    // ── Robustness tests: FetchUrlPreviewV1::from_bytes ──
1316
1317    #[test]
1318    #[allow(non_snake_case)]
1319    fn test_FetchUrlPreviewV1_from_bytes_empty_input() {
1320        use crate::protocol::payload::payload::FetchUrlPreviewV1;
1321        let mut bytes = Bytes::new();
1322        assert!(FetchUrlPreviewV1::from_bytes(&mut bytes).is_err());
1323    }
1324
1325    // ── Robustness tests: FetchUrlPreviewResponseV1::from_bytes ──
1326
1327    #[test]
1328    #[allow(non_snake_case)]
1329    fn test_FetchUrlPreviewResponseV1_from_bytes_empty_input() {
1330        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1331        let bytes = Bytes::new();
1332        assert!(FetchUrlPreviewResponseV1::from_bytes(&bytes).is_err());
1333    }
1334
1335    #[test]
1336    #[allow(non_snake_case)]
1337    fn test_FetchUrlPreviewResponseV1_from_bytes_garbage_input() {
1338        use crate::protocol::payload::payload::FetchUrlPreviewResponseV1;
1339        let bytes = Bytes::from_static(&[0xff; 64]);
1340        assert!(FetchUrlPreviewResponseV1::from_bytes(&bytes).is_err());
1341    }
1342
1343    // ── Robustness tests: SubmitPostClaimV1::from_bytes ──
1344
1345    #[test]
1346    #[allow(non_snake_case)]
1347    fn test_SubmitPostClaimV1_from_bytes_empty_input() {
1348        use crate::protocol::payload::payload::SubmitPostClaimV1;
1349        let mut bytes = Bytes::new();
1350        assert!(SubmitPostClaimV1::from_bytes(&mut bytes).is_err());
1351    }
1352
1353    #[test]
1354    #[allow(non_snake_case)]
1355    fn test_SubmitPostClaimV1_from_bytes_garbage_input() {
1356        use crate::protocol::payload::payload::SubmitPostClaimV1;
1357        let mut bytes = Bytes::from_static(&[0xff; 64]);
1358        assert!(SubmitPostClaimV1::from_bytes(&mut bytes).is_err());
1359    }
1360
1361    // ── Robustness tests: SubmitPostCommitV1::from_bytes ──
1362
1363    #[test]
1364    #[allow(non_snake_case)]
1365    fn test_SubmitPostCommitV1_from_bytes_empty_input() {
1366        use crate::protocol::payload::payload::SubmitPostCommitV1;
1367        let mut bytes = Bytes::new();
1368        assert!(SubmitPostCommitV1::from_bytes(&mut bytes).is_err());
1369    }
1370
1371    // ── Robustness tests: SubmitPostFeedbackV1::from_bytes ──
1372
1373    #[test]
1374    #[allow(non_snake_case)]
1375    fn test_SubmitPostFeedbackV1_from_bytes_empty_input() {
1376        use crate::protocol::payload::payload::SubmitPostFeedbackV1;
1377        let mut bytes = Bytes::new();
1378        assert!(SubmitPostFeedbackV1::from_bytes(&mut bytes).is_err());
1379    }
1380
1381    // ── Robustness tests: GetPostBundleFeedbackResponseV1::from_bytes ──
1382
1383    #[test]
1384    #[allow(non_snake_case)]
1385    fn test_GetPostBundleFeedbackResponseV1_from_bytes_empty_input() {
1386        use crate::protocol::payload::payload::GetPostBundleFeedbackResponseV1;
1387        let bytes = Bytes::new();
1388        assert!(GetPostBundleFeedbackResponseV1::from_bytes(bytes).is_err());
1389    }
1390
1391    // ── Robustness tests: CachePostBundleFeedbackV1::from_bytes ──
1392
1393    #[test]
1394    #[allow(non_snake_case)]
1395    fn test_CachePostBundleFeedbackV1_from_bytes_empty_input() {
1396        use crate::protocol::payload::payload::CachePostBundleFeedbackV1;
1397        let mut bytes = Bytes::new();
1398        assert!(CachePostBundleFeedbackV1::from_bytes(&mut bytes).is_err());
1399    }
1400
1401    #[cfg(not(target_arch = "wasm32"))]
1402    mod bolero_fuzz {
1403        use bytes::Bytes;
1404
1405        #[test]
1406        #[allow(non_snake_case)]
1407        fn fuzz_CachePostBundleV1_from_bytes() {
1408            use crate::protocol::payload::payload::CachePostBundleV1;
1409            bolero::check!().for_each(|data: &[u8]| {
1410                let mut bytes = Bytes::copy_from_slice(data);
1411                let _ = CachePostBundleV1::from_bytes(&mut bytes);
1412            });
1413        }
1414
1415        #[test]
1416        #[allow(non_snake_case)]
1417        fn fuzz_GetPostBundleResponseV1_from_bytes() {
1418            use crate::protocol::payload::payload::GetPostBundleResponseV1;
1419            bolero::check!().for_each(|data: &[u8]| {
1420                let _ = GetPostBundleResponseV1::from_bytes(Bytes::copy_from_slice(data));
1421            });
1422        }
1423
1424        #[test]
1425        #[allow(non_snake_case)]
1426        fn fuzz_GetPostBundleFeedbackResponseV1_from_bytes() {
1427            use crate::protocol::payload::payload::GetPostBundleFeedbackResponseV1;
1428            bolero::check!().for_each(|data: &[u8]| {
1429                let _ = GetPostBundleFeedbackResponseV1::from_bytes(Bytes::copy_from_slice(data));
1430            });
1431        }
1432
1433        #[test]
1434        #[allow(non_snake_case)]
1435        fn fuzz_SubmitPostClaimV1_from_bytes() {
1436            use crate::protocol::payload::payload::SubmitPostClaimV1;
1437            bolero::check!().for_each(|data: &[u8]| {
1438                let mut bytes = Bytes::copy_from_slice(data);
1439                let _ = SubmitPostClaimV1::from_bytes(&mut bytes);
1440            });
1441        }
1442
1443        #[test]
1444        #[allow(non_snake_case)]
1445        fn fuzz_HealPostBundleClaimV1_from_bytes() {
1446            use crate::protocol::payload::payload::HealPostBundleClaimV1;
1447            bolero::check!().for_each(|data: &[u8]| {
1448                let mut bytes = Bytes::copy_from_slice(data);
1449                let _ = HealPostBundleClaimV1::from_bytes(&mut bytes);
1450            });
1451        }
1452
1453        #[test]
1454        #[allow(non_snake_case)]
1455        fn fuzz_HealPostBundleCommitV1_from_bytes() {
1456            use crate::protocol::payload::payload::HealPostBundleCommitV1;
1457            bolero::check!().for_each(|data: &[u8]| {
1458                let mut bytes = Bytes::copy_from_slice(data);
1459                let _ = HealPostBundleCommitV1::from_bytes(&mut bytes);
1460            });
1461        }
1462    }
1463
1464    #[test]
1465    #[allow(non_snake_case)]
1466    fn test_PAYLOAD_REQUEST_KIND_COUNT_matches_variants() {
1467        use crate::protocol::payload::payload::{PayloadRequestKind, PAYLOAD_REQUEST_KIND_COUNT};
1468        let last_variant = PayloadRequestKind::from_u16((PAYLOAD_REQUEST_KIND_COUNT - 1) as u16)
1469            .expect("PAYLOAD_REQUEST_KIND_COUNT - 1 must decode to a valid variant");
1470        assert_eq!(last_variant, PayloadRequestKind::AnnounceV2, "last variant changed; bump PAYLOAD_REQUEST_KIND_COUNT");
1471        assert!(PayloadRequestKind::from_u16(PAYLOAD_REQUEST_KIND_COUNT as u16).is_err(), "PAYLOAD_REQUEST_KIND_COUNT must equal the variant count");
1472    }
1473
1474    #[test]
1475    #[allow(non_snake_case)]
1476    fn test_to_from_PeerStatsRequestV1() -> anyhow::Result<()> {
1477        use crate::protocol::payload::payload::PeerStatsRequestV1;
1478        let request = PeerStatsRequestV1 {};
1479        let encoded = request.to_bytes()?;
1480        let decoded = PeerStatsRequestV1::from_bytes(&encoded)?;
1481        assert_eq!(decoded, request);
1482        Ok(())
1483    }
1484
1485    #[tokio::test]
1486    #[allow(non_snake_case)]
1487    async fn test_to_from_PeerStatsResponseV1() -> anyhow::Result<()> {
1488        use crate::protocol::payload::payload::PeerStatsResponseV1;
1489        use crate::tools::compression;
1490
1491        let time_provider = RealTimeProvider;
1492        let pow_generator = SingleThreadedPowGenerator::new();
1493        let server_id = ServerId::new("own_pow", &time_provider, Pow(4), true, &pow_generator).await?;
1494        let peer = server_id.to_peer(&time_provider)?;
1495
1496        let doc = serde_json::json!({ "requests": { "PingV1": 7 }, "system": { "memory_total_bytes": 1234 } });
1497        let json_bytes = serde_json::to_vec(&doc)?;
1498        let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1499
1500        let timestamp = time_provider.current_time_millis();
1501        let signing_input = PeerStatsResponseV1::signing_input(timestamp, &json_compressed);
1502        let signature = crate::tools::signing::sign(&server_id.keys.signature_key, &signing_input);
1503
1504        let response = PeerStatsResponseV1 { peer, timestamp, json_compressed, signature };
1505        let encoded = response.to_bytes()?;
1506        let decoded = PeerStatsResponseV1::from_bytes(&encoded)?;
1507        assert_eq!(decoded, response);
1508        Ok(())
1509    }
1510}