Skip to main content

hashiverse_lib/protocol/rpc/
rpc_request.rs

1//! # RPC request packet encode / decode
2//!
3//! The wire format of every outgoing and incoming RPC request. Split into a strict
4//! Tx / Rx pair so the compiler enforces the asymmetry between "I'm building a request"
5//! and "I'm parsing one":
6//!
7//! - [`RpcRequestPacketTx`] — build a request. `encode` optionally compresses the
8//!   payload, runs the PoW search (via a
9//!   [`crate::tools::parallel_pow_generator::ParallelPowGenerator`]) targeting the
10//!   destination server's identity, and produces a header containing the discriminator,
11//!   sponsor id, PoW timestamp, content hash, salt, and payload length.
12//! - [`RpcRequestPacketRx`] — parse a request on the receive side. Extracts the header,
13//!   verifies PoW against the receiving server's own identity, rejects under-powered
14//!   or stale requests (via [`crate::tools::config::POW_MAX_CLOCK_DRIFT_MILLIS`]) before
15//!   any payload work happens.
16//!
17//! The PoW sits in the *header* and binds to the server's identity, so a request valid
18//! for server A can't be forwarded to server B and still authenticate.
19
20use crate::protocol::payload::payload::PayloadRequestKind;
21use crate::tools::parallel_pow_generator::ParallelPowGenerator;
22use crate::tools::server_id::ServerId;
23use crate::tools::time::{TimeMillis, TimeMillisBytes, TIME_MILLIS_BYTES};
24use crate::tools::time_provider::time_provider::TimeProvider;
25use crate::tools::types::{Hash, Id, PQCommitmentBytes, Pow, Salt, VerificationKeyBytes, HASH_BYTES, ID_BYTES, SALT_BYTES};
26use crate::tools::{compression, config, hashing};
27use bitflags::bitflags;
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29
30bitflags! {
31    pub struct RpcRequestPacketTxFlags: u8 {
32        const COMPRESSED = 1 << 0;
33        const SERVER_KNOWN = 1 << 1;
34    }
35}
36
37/// An outbound RPC request, freshly encoded and ready to be sent over a [`crate::transport::TransportFactory`].
38///
39/// The "Tx" (transmit) side owns the encode path. Constructing one is expensive: in addition
40/// to compressing the payload and prefixing the header, `encode` performs a proof-of-work
41/// search against the destination server's identity via [`ParallelPowGenerator`]. The `pow`
42/// sits in the wire header so the server can reject under-powered requests cheaply before
43/// doing any payload work. `pow_content_hash` is retained on the struct so the caller can
44/// later verify that the response was produced for *this* specific request (see
45/// [`RpcResponsePacketRx`]).
46///
47/// This type and its "Rx" counterpart [`RpcRequestPacketRx`] are intentionally separate so
48/// the encode side and the decode side cannot be accidentally mixed up.
49pub struct RpcRequestPacketTx {
50    pub pow_content_hash: Hash,
51    pub bytes: Bytes,
52}
53impl RpcRequestPacketTx {
54    pub async fn encode(
55        time_provider: &dyn TimeProvider,
56        pow_minimum_per_rpc: Pow,
57        flags: RpcRequestPacketTxFlags,
58        payload_request_kind: PayloadRequestKind,
59        pow_sponsor_id: &Id,
60        destination_verification_key_bytes: &VerificationKeyBytes,
61        destination_pq_commitment_bytes: &PQCommitmentBytes,
62        payload_uncompressed: Bytes,
63        pow_generator: &dyn ParallelPowGenerator,
64    ) -> anyhow::Result<Self> {
65        // Do we actually need to compress this?
66        let payload_compressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
67            true => compression::compress_for_speed(payload_uncompressed.as_ref())?.to_bytes(),
68            false => payload_uncompressed,
69        };
70
71        // Check that it is not too large...
72        if payload_compressed.len() > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
73            anyhow::bail!("request payload size exceeds maximum allowed size: {} > {}", payload_compressed.len(), config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
74        }
75
76        // This will be needed when we verify that the server processed our request
77        let pow_content_hash: Hash = hashing::hash(payload_compressed.as_ref());
78
79        // Do some proof of work on behalf of the destination server, sponsored by the caller
80        let pow_label = format!("rpc:{}", payload_request_kind);
81        let (pow_timestamp, pow_salt, _, _) = ServerId::pow_generate(&pow_label, time_provider, pow_minimum_per_rpc, pow_sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, &pow_content_hash, pow_generator).await?;
82
83        // Write the header
84        let mut bytes_mut = BytesMut::with_capacity(size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() + payload_compressed.len());
85        bytes_mut.put_u8(1);
86        bytes_mut.put_u8(flags.bits());
87        bytes_mut.put_u16_le(payload_request_kind as u16);
88        bytes_mut.put_slice(pow_sponsor_id.as_ref());
89        bytes_mut.put_slice(pow_timestamp.encode_be().as_ref());
90        bytes_mut.put_slice(pow_content_hash.as_ref());
91        bytes_mut.put_slice(pow_salt.as_ref());
92        bytes_mut.put_u32_le(payload_compressed.len() as u32);
93        bytes_mut.put_slice(&payload_compressed);
94
95        let bytes = bytes_mut.freeze();
96
97        Ok(Self { pow_content_hash, bytes })
98    }
99}
100
101/// The server-side view of an inbound RPC request after header parsing and PoW verification.
102///
103/// `RpcRequestPacketRx` is what falls out of `decode` on the receiving end. It exposes the
104/// routing discriminator ([`PayloadRequestKind`]) so the dispatcher knows which handler to
105/// invoke, plus all the PoW fields needed to decide how much trust to assign to the call.
106/// The `bytes` field still contains the compressed payload — payload decoding happens later,
107/// after the dispatcher has picked the right handler.
108///
109/// Paired with [`RpcRequestPacketTx`] as the decode side of the same wire format.
110#[derive(Debug)]
111pub struct RpcRequestPacketRx {
112    pub payload_request_kind: PayloadRequestKind,
113    pub pow_sponsor_id: Id,
114    pub pow_server_known: bool,
115    pub pow: Pow,
116    pub pow_timestamp: TimeMillis,
117    pub pow_content_hash: Hash,
118    pub pow_salt: Salt,
119    pub bytes: Bytes,
120}
121impl RpcRequestPacketRx {
122    pub fn decode(current_time_millis: &TimeMillis, verification_key_bytes: &VerificationKeyBytes, pq_commitment_bytes: &PQCommitmentBytes, mut response_bytes: Bytes) -> anyhow::Result<Self> {
123        // Do we have enough bytes for the header?
124        if response_bytes.len() < size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() {
125            anyhow::bail!("RpcRequestPacket is too short for header");
126        }
127
128        let version = response_bytes.get_u8();
129        if 1 != version {
130            anyhow::bail!("Unsupported RpcRequestPacket version: {}", version);
131        }
132
133        let flags = RpcRequestPacketTxFlags::from_bits(response_bytes.get_u8()).ok_or_else(|| anyhow::anyhow!("Invalid RpcRequestPacket flags"))?;
134        let payload_request_kind = PayloadRequestKind::from_u16(response_bytes.get_u16_le())?;
135        let pow_sponsor_id = Id::from_slice(response_bytes.slice(..ID_BYTES).as_ref())?;
136        response_bytes.advance(ID_BYTES);
137        let pow_timestamp_bytes: TimeMillisBytes = TimeMillisBytes::from_bytes(response_bytes.slice(..TIME_MILLIS_BYTES).as_ref())?;
138        response_bytes.advance(TIME_MILLIS_BYTES);
139        let pow_content_hash: Hash = Hash::from_slice(response_bytes.slice(..HASH_BYTES).as_ref())?;
140        response_bytes.advance(HASH_BYTES);
141        let pow_salt = Salt::from_slice(response_bytes.slice(..SALT_BYTES).as_ref())?;
142        response_bytes.advance(SALT_BYTES);
143
144        // Is the packet timestamp close enough to ours?
145        let pow_timestamp = TimeMillis::timestamp_decode_be(&pow_timestamp_bytes);
146        {
147            let delta_millis = (*current_time_millis - pow_timestamp).abs();
148            if delta_millis.0 > config::POW_MAX_CLOCK_DRIFT_MILLIS.0 {
149                anyhow::bail!("Client pow clock drift too large: us={} them={}", current_time_millis, pow_timestamp);
150            }
151        }
152
153        // Has the client done enough pow for us?
154        let (pow, pow_server_known) = match flags.contains(RpcRequestPacketTxFlags::SERVER_KNOWN) {
155            true => {
156                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, verification_key_bytes, pq_commitment_bytes, &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
157                if pow < config::POW_MINIMUM_PER_RPC_SERVER_KNOWN {
158                    anyhow::bail!("Client has not done enough known pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_KNOWN);
159                }
160                (pow, true)
161            }
162            false => {
163                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, &VerificationKeyBytes::zero(), &PQCommitmentBytes::zero(), &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
164                if pow < config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN {
165                    anyhow::bail!("Client has not done enough unknown pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN);
166                }
167                (pow, false)
168            }
169        };
170
171        let request_payload_len = response_bytes.get_u32_le() as usize;
172
173        if request_payload_len > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
174            anyhow::bail!("RpcRequestPacket payload too large: {} > {}", request_payload_len, config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
175        }
176
177        // Do we have enough bytes for the payload?
178        if response_bytes.len() < request_payload_len {
179            anyhow::bail!("RpcRequestPacket is too short for payload");
180        }
181
182        let response_payload = response_bytes.slice(..request_payload_len);
183        response_bytes.advance(request_payload_len);
184
185        // Sanity check - are we done?
186        if !response_bytes.is_empty() {
187            anyhow::bail!("RpcRequestPacket is too long");
188        }
189
190        // Do we need to decompress?
191        let response_payload_decompressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
192            true => compression::decompress(response_payload.as_ref())?.to_bytes(),
193            false => response_payload,
194        };
195
196        let selfie = Self {
197            payload_request_kind,
198            pow_sponsor_id,
199            pow_server_known,
200            pow,
201            pow_timestamp,
202            pow_content_hash,
203            pow_salt,
204            bytes: response_payload_decompressed,
205        };
206        // trace!("Decoded RpcRequestPacketRx={:?}", selfie);
207
208        Ok(selfie)
209    }
210}
211