// hashiverse_lib/protocol/rpc/rpc.rs

//! # High-level RPC client helpers
//!
//! The user-facing side of the RPC layer. Where
//! [`crate::protocol::rpc::rpc_request`] and
//! [`crate::protocol::rpc::rpc_response`] deal with packet bytes, this module exposes
//! end-to-end helpers that:
//!
//! 1. **Encode** the request with the right PoW difficulty (higher when the peer is
//!    unknown, lower when the peer is already in the tracker — see
//!    [`rpc_server_known_with_requisite_pow`] for a caller-chosen floor).
//! 2. **Dispatch** the encoded request over the supplied transport.
//! 3. **Decode** the response, verifying the signature over the request's
//!    `pow_content_hash` — so a reply can only have come from the specific server we
//!    addressed, not a replay from another peer.
//! 4. **Return** the decoded response packet, or an error built from the server's
//!    `ErrorResponseV1` when the server reported a failure.
//!
//! Call sites either use [`rpc_server_unknown`] (first contact / bootstrap) or
//! [`rpc_server_known`] (everything else). Trust-sensitive handlers can opt into a
//! higher PoW floor via the `_with_requisite_pow` variants, and the
//! `_with_no_compression` variants send the request without the `COMPRESSED` flag.
20
21use bytes::Bytes;
22use crate::protocol::payload::payload::{ErrorResponseV1, PayloadRequestKind, PayloadResponseKind};
23use crate::protocol::peer::Peer;
24use crate::protocol::rpc::rpc_request::{RpcRequestPacketTx, RpcRequestPacketTxFlags};
25use crate::protocol::rpc::rpc_response::RpcResponsePacketRx;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes};
28use crate::tools::{config, json};
29
30pub async fn rpc_server_unknown(
31    runtime_services: &RuntimeServices,
32    sponsor_id: &Id,
33    destination_address: &str,
34    payload_request_kind: PayloadRequestKind,
35    payload: Bytes,
36) -> anyhow::Result<RpcResponsePacketRx> {
37    let destination_id = Id::zero();
38    let destination_verification_key = VerificationKeyBytes::zero();
39    let destination_pq_commitment_bytes = PQCommitmentBytes::zero();
40    let flags = RpcRequestPacketTxFlags::COMPRESSED;
41    rpc_server_xxx(
42        runtime_services,
43        config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN,
44        flags,
45        sponsor_id,
46        destination_address,
47        &destination_id,
48        &destination_verification_key,
49        &destination_pq_commitment_bytes,
50        payload_request_kind,
51        payload,
52    )
53    .await
54}
55
56pub async fn rpc_server_known(
57    runtime_services: &RuntimeServices,
58    sponsor_id: &Id,
59    destination_peer: &Peer,
60    payload_request_kind: PayloadRequestKind,
61    payload: Bytes,
62) -> anyhow::Result<RpcResponsePacketRx> {
63    let flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;
64    rpc_server_xxx(
65        runtime_services,
66        config::POW_MINIMUM_PER_RPC_SERVER_KNOWN,
67        flags,
68        sponsor_id,
69        &destination_peer.address,
70        &destination_peer.id,
71        &destination_peer.verification_key_bytes,
72        &destination_peer.pq_commitment_bytes,
73        payload_request_kind,
74        payload,
75    )
76    .await
77}
78
79pub async fn rpc_server_known_with_no_compression(
80    runtime_services: &RuntimeServices,
81    sponsor_id: &Id,
82    destination_peer: &Peer,
83    payload_request_kind: PayloadRequestKind,
84    payload: Bytes,
85) -> anyhow::Result<RpcResponsePacketRx> {
86    let flags = RpcRequestPacketTxFlags::SERVER_KNOWN;
87    rpc_server_xxx(
88        runtime_services,
89        config::POW_MINIMUM_PER_RPC_SERVER_KNOWN,
90        flags,
91        sponsor_id,
92        &destination_peer.address,
93        &destination_peer.id,
94        &destination_peer.verification_key_bytes,
95        &destination_peer.pq_commitment_bytes,
96        payload_request_kind,
97        payload,
98    )
99    .await
100}
101
102pub async fn rpc_server_known_with_requisite_pow(
103    runtime_services: &RuntimeServices,
104    sponsor_id: &Id,
105    destination_peer: &Peer,
106    payload_request_kind: PayloadRequestKind,
107    payload: Bytes,
108    requisite_pow: Pow,
109) -> anyhow::Result<RpcResponsePacketRx> {
110    let flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;
111    rpc_server_xxx(
112        runtime_services,
113        requisite_pow,
114        flags,
115        sponsor_id,
116        &destination_peer.address,
117        &destination_peer.id,
118        &destination_peer.verification_key_bytes,
119        &destination_peer.pq_commitment_bytes,
120        payload_request_kind,
121        payload,
122    )
123    .await
124}
125
126pub async fn rpc_server_known_with_requisite_pow_and_no_compression(
127    runtime_services: &RuntimeServices,
128    sponsor_id: &Id,
129    destination_peer: &Peer,
130    payload_request_kind: PayloadRequestKind,
131    payload: Bytes,
132    requisite_pow: Pow,
133) -> anyhow::Result<RpcResponsePacketRx> {
134    let flags = RpcRequestPacketTxFlags::SERVER_KNOWN;
135    rpc_server_xxx(
136        runtime_services,
137        requisite_pow,
138        flags,
139        sponsor_id,
140        &destination_peer.address,
141        &destination_peer.id,
142        &destination_peer.verification_key_bytes,
143        &destination_peer.pq_commitment_bytes,
144        payload_request_kind,
145        payload,
146    )
147    .await
148}
149
150#[allow(clippy::too_many_arguments)] // protocol layer — each arg is part of the RPC envelope; bundling into a struct would just rename them
151async fn rpc_server_xxx(
152    runtime_services: &RuntimeServices,
153    pow_minimum_per_rpc: Pow,
154    flags: RpcRequestPacketTxFlags,
155    sponsor_id: &Id,
156    destination_address: &str,
157    destination_id: &Id,
158    destination_verification_key_bytes: &VerificationKeyBytes,
159    destination_pq_commitment_bytes: &PQCommitmentBytes,
160    payload_request_kind: PayloadRequestKind,
161    payload: Bytes,
162) -> anyhow::Result<RpcResponsePacketRx> {
163    let rpc_request_packet = RpcRequestPacketTx::encode(runtime_services.time_provider.as_ref(), pow_minimum_per_rpc, flags, payload_request_kind, sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, payload, runtime_services.pow_generator.as_ref()).await?;
164
165    let response_bytes = runtime_services.transport_factory.rpc(destination_address, rpc_request_packet.bytes).await?;
166
167    let rpc_response_packet = RpcResponsePacketRx::decode(destination_id, &rpc_request_packet.pow_content_hash, config::SERVER_KEY_POW_MIN, response_bytes)?;
168
169    // Check if there was a server error
170    if rpc_response_packet.response_request_kind == PayloadResponseKind::ErrorResponseV1 {
171        let response = json::bytes_to_struct::<ErrorResponseV1>(&rpc_response_packet.bytes)?;
172        return Err(anyhow::anyhow!("server error {}: {}", response.code, response.message));
173    }
174
175    Ok(rpc_response_packet)
176}
177
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use log::trace;

    use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind};
    use crate::protocol::rpc::rpc_request::{RpcRequestPacketRx, RpcRequestPacketTx, RpcRequestPacketTxFlags};
    use crate::protocol::rpc::rpc_response::{RpcResponsePacketRx, RpcResponsePacketTx, RpcResponsePacketTxFlags};
    use crate::tools::hashing;
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::TimeMillis;
    use crate::tools::time_provider::time_provider::RealTimeProvider;
    use crate::tools::tools;
    use crate::tools::types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes};
    use crate::tools::BytesGatherer;

    // ── Shared fixtures ──

    /// Returns a 1024-byte buffer filled by `tools::random_fill_bytes`.
    fn random_payload() -> [u8; 1024] {
        let mut buffer = [0u8; 1024];
        tools::random_fill_bytes(&mut buffer);
        buffer
    }

    /// Decodes `raw` as a request against an all-zero server identity and reports
    /// whether decoding was rejected.
    fn request_decode_fails(raw: Bytes) -> bool {
        let timestamp = TimeMillis::zero();
        let verification_key_bytes = VerificationKeyBytes::zero();
        let pq_commitment_bytes = PQCommitmentBytes::zero();
        RpcRequestPacketRx::decode(&timestamp, &verification_key_bytes, &pq_commitment_bytes, raw).is_err()
    }

    /// Decodes `raw` as a response against a zero destination id, a fixed content hash
    /// and `Pow(0)`, and reports whether decoding was rejected.
    fn response_decode_fails(raw: Bytes) -> bool {
        let destination_id = Id::zero();
        let pow_content_hash = hashing::hash(&[0u8]);
        RpcResponsePacketRx::decode(&destination_id, &pow_content_hash, Pow(0), raw).is_err()
    }

    // ── Round-trip tests ──

    /// Request and response round trip where the client already knows the server.
    #[tokio::test]
    async fn rpc_request_packet_txrx() -> anyhow::Result<()> {
        let services = RuntimeServices::default_for_testing();

        let server_pow_floor = Pow(12);
        let rpc_pow_floor = Pow(12);

        let server = ServerId::new(
            "own_pow",
            services.time_provider.as_ref(),
            server_pow_floor,
            true,
            services.pow_generator.as_ref(),
        )
        .await?;
        trace!("server_id.pow={}", server.pow);

        let request_kind = PayloadRequestKind::AnnounceV1;
        let request_payload = random_payload();
        let request_flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;

        // Client side: build the request addressed to the known server.
        let request_tx = RpcRequestPacketTx::encode(
            services.time_provider.as_ref(),
            rpc_pow_floor,
            request_flags,
            request_kind.clone(),
            &server.id,
            &server.keys.verification_key_bytes,
            &server.keys.pq_commitment_bytes,
            Bytes::copy_from_slice(&request_payload),
            services.pow_generator.as_ref(),
        )
        .await?;

        // Server side: decode it and check nothing was lost in transit.
        let request_rx = RpcRequestPacketRx::decode(
            &server.timestamp,
            &server.keys.verification_key_bytes,
            &server.keys.pq_commitment_bytes,
            request_tx.bytes,
        )?;
        assert_eq!(request_kind, request_rx.payload_request_kind);
        assert_eq!(request_payload, request_rx.bytes.as_ref());
        assert!(request_rx.pow_server_known);

        // Server side: build the response bound to the request's content hash.
        let response_payload = random_payload();
        let response_flags = RpcResponsePacketTxFlags::COMPRESSED;

        let gatherer = RpcResponsePacketTx::encode(
            &server.keys.signature_key,
            &server.keys.verification_key_bytes,
            &server.keys.pq_commitment_bytes,
            &server.sponsor_id,
            &server.timestamp,
            &server.hash,
            &server.salt,
            &request_rx.pow_content_hash,
            response_flags,
            PayloadResponseKind::AnnounceResponseV1,
            BytesGatherer::from_bytes(Bytes::copy_from_slice(&response_payload)),
        )?;

        // Client side: decode the response against the expected server id.
        let response_rx = RpcResponsePacketRx::decode(
            &server.id,
            &request_rx.pow_content_hash,
            server_pow_floor,
            gatherer.to_bytes(),
        )?;

        assert_eq!(response_rx.response_request_kind, PayloadResponseKind::AnnounceResponseV1);
        assert_eq!(response_rx.bytes.as_ref(), response_payload);

        Ok(())
    }

    /// Request and response round trip where the client addresses an unknown server
    /// using zeroed identity values.
    #[tokio::test]
    async fn rpc_request_packet_txrx_server_unknown() -> anyhow::Result<()> {
        let clock = RealTimeProvider;
        let generator: Arc<dyn crate::tools::pow_generator::pow_generator::PowGenerator> =
            Arc::new(SingleThreadedPowGenerator::new());

        let server_pow_floor = Pow(12);
        let rpc_pow_floor = Pow(12);

        let server = ServerId::new("own_pow", &clock, server_pow_floor, true, generator.as_ref()).await?;
        let request_kind = PayloadRequestKind::AnnounceV1;
        let request_payload = random_payload();
        let request_flags = RpcRequestPacketTxFlags::COMPRESSED;

        // Client side: no SERVER_KNOWN flag and an all-zero destination identity.
        let request_tx = RpcRequestPacketTx::encode(
            &clock,
            rpc_pow_floor,
            request_flags,
            request_kind.clone(),
            &Id::zero(),
            &VerificationKeyBytes::zero(),
            &PQCommitmentBytes::zero(),
            Bytes::copy_from_slice(&request_payload),
            generator.as_ref(),
        )
        .await?;

        let request_rx = RpcRequestPacketRx::decode(
            &server.timestamp,
            &server.keys.verification_key_bytes,
            &server.keys.pq_commitment_bytes,
            request_tx.bytes,
        )?;
        assert_eq!(request_kind, request_rx.payload_request_kind);
        assert_eq!(request_payload, request_rx.bytes.as_ref());
        assert!(!request_rx.pow_server_known);

        let response_payload = random_payload();
        let response_flags = RpcResponsePacketTxFlags::COMPRESSED;
        let gatherer = RpcResponsePacketTx::encode(
            &server.keys.signature_key,
            &server.keys.verification_key_bytes,
            &server.keys.pq_commitment_bytes,
            &server.sponsor_id,
            &server.timestamp,
            &server.hash,
            &server.salt,
            &request_rx.pow_content_hash,
            response_flags,
            PayloadResponseKind::AnnounceResponseV1,
            BytesGatherer::from_bytes(Bytes::copy_from_slice(&response_payload)),
        )?;

        // The client decodes with a zero id because it never knew the server's real one.
        let response_rx = RpcResponsePacketRx::decode(
            &Id::zero(),
            &request_rx.pow_content_hash,
            server_pow_floor,
            gatherer.to_bytes(),
        )?;

        assert_eq!(response_rx.response_request_kind, PayloadResponseKind::AnnounceResponseV1);
        assert_eq!(response_rx.bytes.as_ref(), response_payload);

        Ok(())
    }

    // ── Robustness tests: RpcRequestPacketRx::decode ──

    #[test]
    fn rpc_request_decode_empty_input() {
        assert!(request_decode_fails(Bytes::new()));
    }

    #[test]
    fn rpc_request_decode_single_byte() {
        assert!(request_decode_fails(Bytes::from_static(&[1u8])));
    }

    #[test]
    fn rpc_request_decode_garbage() {
        assert!(request_decode_fails(Bytes::from_static(&[0xff; 256])));
    }

    // ── Robustness tests: RpcResponsePacketRx::decode ──

    #[test]
    fn rpc_response_decode_empty_input() {
        assert!(response_decode_fails(Bytes::new()));
    }

    #[test]
    fn rpc_response_decode_single_byte() {
        assert!(response_decode_fails(Bytes::from_static(&[1u8])));
    }

    #[test]
    fn rpc_response_decode_garbage() {
        assert!(response_decode_fails(Bytes::from_static(&[0xff; 256])));
    }

    #[test]
    fn rpc_response_decode_header_too_short_for_payload_len() {
        // The header up to (but not including) the u32 payload_len field is 212 bytes.
        // Providing 214 bytes (header + 2) used to pass the old `+ 2` minimum-length check
        // but panic on get_u32_le() which needs 4 bytes. Now correctly rejected.
        let mut truncated = vec![0u8; 214];
        truncated[0] = 1; // version
        assert!(response_decode_fails(Bytes::from(truncated)));
    }

    /// Fuzz targets: both decoders are fed arbitrary bytes and their results ignored.
    #[cfg(not(target_arch = "wasm32"))]
    mod bolero_fuzz {
        use bytes::Bytes;

        use crate::protocol::rpc::rpc_request::RpcRequestPacketRx;
        use crate::protocol::rpc::rpc_response::RpcResponsePacketRx;
        use crate::tools::hashing;
        use crate::tools::time::TimeMillis;
        use crate::tools::types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes};

        #[test]
        fn fuzz_rpc_request_decode() {
            bolero::check!().for_each(|input: &[u8]| {
                let timestamp = TimeMillis::zero();
                let verification_key_bytes = VerificationKeyBytes::zero();
                let pq_commitment_bytes = PQCommitmentBytes::zero();
                let raw = Bytes::copy_from_slice(input);
                let _ = RpcRequestPacketRx::decode(&timestamp, &verification_key_bytes, &pq_commitment_bytes, raw);
            });
        }

        #[test]
        fn fuzz_rpc_response_decode() {
            bolero::check!().for_each(|input: &[u8]| {
                let destination_id = Id::zero();
                let pow_content_hash = hashing::hash(&[0u8]);
                let raw = Bytes::copy_from_slice(input);
                let _ = RpcResponsePacketRx::decode(&destination_id, &pow_content_hash, Pow(0), raw);
            });
        }
    }
}