Skip to main content

hashiverse_lib/client/peer_tracker/
peer_tracker.rs

1//! # Kademlia-style local peer set
2//!
3//! `PeerTracker` is the client's working memory of the network: a deduplicated list of
4//! [`crate::protocol::peer::Peer`] records loaded from `BUCKET_PEER`, validated, and
5//! indexed by XOR distance so [`crate::client::peer_tracker::peer_iterator::PeerIterator`]
6//! can walk them in order of closeness to any target
7//! [`crate::tools::types::Id`].
8//!
9//! Responsibilities:
10//!
11//! - **Bootstrap** — when the tracker is empty (new install, wiped storage) it calls
12//!   `BootstrapV1` against seed domains from [`crate::tools::config::BOOTSTRAP_DOMAINS`]
13//!   to obtain a starting set of peers.
14//! - **Freshness** — stale peers (missed announces, failed RPCs) are demoted or dropped;
15//!   fresh ones from gossip (`AnnounceV1`) or from RPC responses are folded in.
16//! - **Flush** — peer mutations set `peers_need_flush`; the outer client batches those
17//!   into periodic writes to storage so every RPC response doesn't trigger a disk write.
18
19use crate::anyhow_assert_eq;
20use crate::client::client_storage::client_storage;
21use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_PEER};
22use crate::client::peer_tracker::peer_iterator::PeerIterator;
23use crate::protocol::payload::payload::{BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind};
24use crate::protocol::peer::Peer;
25use crate::protocol::rpc;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::tools::LeadingAgreementBits;
28use crate::tools::types::Id;
29use crate::tools::{config, json, tools};
30use log::{info, trace, warn};
31use std::sync::Arc;
32
33/// The client's local view of the known peer set and the entry point for Kademlia-style
34/// routing.
35///
36/// `PeerTracker` owns the in-memory list of [`Peer`] records the client has seen, persists
37/// them to [`ClientStorage`] under `BUCKET_PEER` so they survive restarts, and exposes the
38/// iteration primitives used throughout the client when it needs to answer "who should I
39/// talk to next about this [`Id`]?". When the list is empty (first launch, or after a
40/// reset), it seeds itself via a `BootstrapV1` RPC against the
41/// [`crate::transport::bootstrap_provider::BootstrapProvider`] addresses configured on the
42/// transport.
43///
44/// The tracker is the single source of truth for peer freshness: stale or bad peers get
45/// evicted here, new peers get folded in here, and the `peers_need_flush` flag coalesces
46/// rapid updates into a single disk write.
47pub struct PeerTracker {
48    runtime_services: Arc<RuntimeServices>,
49    client_storage: Arc<dyn ClientStorage>,
50    peers_need_flush: bool,
51    peers: Vec<Peer>,
52}
53
54impl PeerTracker {
55    pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>) -> anyhow::Result<Self> {
56        let peers: anyhow::Result<Vec<Peer>> = try {
57            let peers = client_storage::get_struct::<Vec<Peer>>(client_storage.as_ref(), BUCKET_PEER, "peers", runtime_services.time_provider.current_time_millis()).await?;
58            match peers {
59                Some(peers) => {
60                    info!("PeerTracker is starting with {} peers", peers.len());
61                    trace!("{:?}", peers);
62                    peers
63                }
64                None => Vec::new(),
65            }
66        };
67
68        let peers = peers.unwrap_or_else(|e| {
69            warn!("Failed to load peers from storage: {}", e);
70            Vec::new()
71        });
72
73        Ok(Self {
74            runtime_services,
75            client_storage,
76            peers_need_flush: false,
77            peers,
78        })
79    }
80
81    pub async fn flush(&mut self) -> anyhow::Result<()> {
82        if !self.peers_need_flush {
83            return Ok(());
84        }
85
86        trace!("Flushing peers to storage");
87        self.peers_need_flush = false;
88        client_storage::put_struct(self.client_storage.as_ref(), BUCKET_PEER, "peers", &self.peers, self.runtime_services.time_provider.current_time_millis()).await?;
89
90        Ok(())
91    }
92
93    pub fn add_peer(&mut self, peer: Peer) -> anyhow::Result<()> {
94        // Sanity check that this peer is kosher
95        if let Err(e) = peer.verify() {
96            anyhow::bail!("peer verification error: {}", e);
97        }
98
99        // Check that its pow is reasonable
100        if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
101            anyhow::bail!("peer peer.pow_initial.pow={} < {}", peer.pow_initial.pow, config::SERVER_KEY_POW_MIN);
102        }
103
104        let search_result = self.peers.binary_search_by_key(&peer.id, |peer| peer.id);
105        match search_result {
106            Ok(i) => {
107                // Sanity check that the ids are the same
108                assert_eq!(peer.id, self.peers[i].id);
109
110                // If the newer peer is more recent, replace the older one
111                if peer.timestamp > self.peers[i].timestamp {
112                    self.peers[i] = peer;
113                }
114            }
115            Err(i) => {
116                self.peers.insert(i, peer);
117            }
118        }
119
120        self.peers_need_flush = true;
121
122        Ok(())
123    }
124
125    pub fn remove_peer(&mut self, peer: &Peer) {
126        if let Ok(i) = self.peers.binary_search_by_key(&peer.id, |peer| peer.id) {
127            self.peers.remove(i);
128            self.peers_need_flush = true;
129        }
130    }
131
132    pub fn is_empty(&self) -> bool {
133        self.peers.is_empty()
134    }
135    pub fn len(&self) -> usize {
136        self.peers.len()
137    }
138
139    pub fn peers(&self) -> &Vec<Peer> {
140        &self.peers
141    }
142
143    pub async fn iterate_to_location(&mut self, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> anyhow::Result<PeerIterator<'_>> {
144        self.bootstrap().await?;
145
146        Ok(PeerIterator::new(self, bucket_location_id, max_iterations_since_high_watermark, cache_radius))
147    }
148
149    pub async fn bootstrap(&mut self) -> anyhow::Result<()> {
150        // We only need to bootsrap if we have noone to talk to!
151        if !self.is_empty() {
152            return Ok(());
153        }
154
155        info!("bootstrapping PeerTracker");
156
157        // Lets randomize these addresses so that the first one is not snowed
158        let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
159        tools::shuffle(&mut bootstrap_addresses);
160
161        // Our bootstrap process has handed us a bunch of raw addresses, so we need to convert them into peers
162        for bootstrap_address in bootstrap_addresses {
163            let try_result: anyhow::Result<()> = try {
164                {
165                    info!("bootstrapping {}", bootstrap_address);
166
167                    let request = json::struct_to_bytes(&BootstrapV1 {})?;
168                    let response = rpc::rpc::rpc_server_unknown(&self.runtime_services, &Id::zero(), &bootstrap_address, PayloadRequestKind::BootstrapV1, request).await?;
169                    anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &response.response_request_kind);
170                    let response = json::bytes_to_struct::<BootstrapResponseV1>(&response.bytes)?;
171                    for peer in response.peers_random {
172                        let result = self.add_peer(peer);
173                        if let Err(e) = result {
174                            warn!("problem while adding bootstrapped peer: {}", e);
175                        }
176                    }
177                }
178            };
179
180            if let Err(e) = try_result {
181                warn!("problem bootstrapping peer {}: {}", bootstrap_address, e);
182            }
183
184            // We only need to continue bootstrapping if we still have noone to talk to!
185            if !self.is_empty() {
186                break;
187            }
188        }
189
190        Ok(())
191    }
192}
193
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::tools::config;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::types::Pow;

    /// Builds a freshly generated peer whose server key was requested with exactly the
    /// minimum proof of work that `add_peer` accepts.
    async fn peer_with_min_pow(runtime_services: &RuntimeServices) -> anyhow::Result<crate::protocol::peer::Peer> {
        let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
        let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
        Ok(peer)
    }

    /// Walks through the basic add / dedupe / remove life cycle of the tracker.
    #[tokio::test]
    async fn general_tests() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        assert!(tracker.is_empty());
        assert_eq!(0, tracker.len());

        // Dont accept insufficient pow
        {
            // Keep generating until the key really is below the minimum; the generator
            // may overshoot the requested pow.
            let weak_peer = loop {
                let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
                if server_id.pow < config::SERVER_KEY_POW_MIN {
                    break server_id.to_peer(runtime_services.time_provider.as_ref())?;
                }
            };
            assert!(tracker.add_peer(weak_peer).is_err());
            assert_eq!(0, tracker.len());
        }

        // Add an individual
        {
            let peer = peer_with_min_pow(&runtime_services).await?;
            assert!(tracker.add_peer(peer).is_ok());
            assert_eq!(1, tracker.len());
        }

        // Cant add individual twice
        {
            let peer = peer_with_min_pow(&runtime_services).await?;
            assert!(tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(2, tracker.len());
            assert!(tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(2, tracker.len());
        }

        // Add an individual, then remove it
        {
            let peer = peer_with_min_pow(&runtime_services).await?;
            assert!(tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(3, tracker.len());
            tracker.remove_peer(&peer);
            assert_eq!(2, tracker.len());
        }

        // Remove an unknown individual
        {
            let stranger = peer_with_min_pow(&runtime_services).await?;
            tracker.remove_peer(&stranger);
            assert_eq!(2, tracker.len());
        }

        Ok(())
    }

    /// A peer whose pow_initial has been mutated after signing must fail verify(),
    /// so add_peer rejects it.
    #[tokio::test]
    async fn add_peer_rejects_tampered_pow_initial() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let mut tampered = peer_with_min_pow(&runtime_services).await?;
        tampered.pow_initial.salt = crate::tools::types::Salt::zero();

        let outcome = tracker.add_peer(tampered);
        assert!(outcome.is_err(), "tampered peer should be rejected");
        assert_eq!(0, tracker.len());

        Ok(())
    }

    /// peers() must stay sorted by id after arbitrary insertion order, because
    /// add_peer uses binary_search_by_key to dedupe.
    #[tokio::test]
    async fn add_peer_keeps_peers_sorted_by_id() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 30;
        for _ in 0..NUM_PEERS {
            tracker.add_peer(peer_with_min_pow(&runtime_services).await?)?;
        }

        let peers = tracker.peers();
        assert_eq!(NUM_PEERS, peers.len());
        // Compare every peer with its successor; ids must be strictly increasing.
        for (earlier, later) in peers.iter().zip(peers.iter().skip(1)) {
            assert!(earlier.id < later.id, "peers must be sorted by id; got {} then {}", earlier.id, later.id);
        }

        Ok(())
    }
}