hashiverse_lib/client/peer_tracker/
peer_tracker.rs1use crate::anyhow_assert_eq;
20use crate::client::client_storage::client_storage;
21use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_PEER};
22use crate::client::peer_tracker::peer_iterator::PeerIterator;
23use crate::protocol::payload::payload::{BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind};
24use crate::protocol::peer::Peer;
25use crate::protocol::rpc;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::tools::LeadingAgreementBits;
28use crate::tools::types::Id;
29use crate::tools::{config, json, tools};
30use log::{info, trace, warn};
31use std::sync::Arc;
32
33pub struct PeerTracker {
48 runtime_services: Arc<RuntimeServices>,
49 client_storage: Arc<dyn ClientStorage>,
50 peers_need_flush: bool,
51 peers: Vec<Peer>,
52}
53
54impl PeerTracker {
55 pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>) -> anyhow::Result<Self> {
56 let peers: anyhow::Result<Vec<Peer>> = try {
57 let peers = client_storage::get_struct::<Vec<Peer>>(client_storage.as_ref(), BUCKET_PEER, "peers", runtime_services.time_provider.current_time_millis()).await?;
58 match peers {
59 Some(peers) => {
60 info!("PeerTracker is starting with {} peers", peers.len());
61 trace!("{:?}", peers);
62 peers
63 }
64 None => Vec::new(),
65 }
66 };
67
68 let peers = peers.unwrap_or_else(|e| {
69 warn!("Failed to load peers from storage: {}", e);
70 Vec::new()
71 });
72
73 Ok(Self {
74 runtime_services,
75 client_storage,
76 peers_need_flush: false,
77 peers,
78 })
79 }
80
81 pub async fn flush(&mut self) -> anyhow::Result<()> {
82 if !self.peers_need_flush {
83 return Ok(());
84 }
85
86 trace!("Flushing peers to storage");
87 self.peers_need_flush = false;
88 client_storage::put_struct(self.client_storage.as_ref(), BUCKET_PEER, "peers", &self.peers, self.runtime_services.time_provider.current_time_millis()).await?;
89
90 Ok(())
91 }
92
93 pub fn add_peer(&mut self, peer: Peer) -> anyhow::Result<()> {
94 if let Err(e) = peer.verify() {
96 anyhow::bail!("peer verification error: {}", e);
97 }
98
99 if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
101 anyhow::bail!("peer peer.pow_initial.pow={} < {}", peer.pow_initial.pow, config::SERVER_KEY_POW_MIN);
102 }
103
104 let search_result = self.peers.binary_search_by_key(&peer.id, |peer| peer.id);
105 match search_result {
106 Ok(i) => {
107 assert_eq!(peer.id, self.peers[i].id);
109
110 if peer.timestamp > self.peers[i].timestamp {
112 self.peers[i] = peer;
113 }
114 }
115 Err(i) => {
116 self.peers.insert(i, peer);
117 }
118 }
119
120 self.peers_need_flush = true;
121
122 Ok(())
123 }
124
125 pub fn remove_peer(&mut self, peer: &Peer) {
126 if let Ok(i) = self.peers.binary_search_by_key(&peer.id, |peer| peer.id) {
127 self.peers.remove(i);
128 self.peers_need_flush = true;
129 }
130 }
131
132 pub fn is_empty(&self) -> bool {
133 self.peers.is_empty()
134 }
135 pub fn len(&self) -> usize {
136 self.peers.len()
137 }
138
139 pub fn peers(&self) -> &Vec<Peer> {
140 &self.peers
141 }
142
143 pub async fn iterate_to_location(&mut self, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> anyhow::Result<PeerIterator<'_>> {
144 self.bootstrap().await?;
145
146 Ok(PeerIterator::new(self, bucket_location_id, max_iterations_since_high_watermark, cache_radius))
147 }
148
149 pub async fn bootstrap(&mut self) -> anyhow::Result<()> {
150 if !self.is_empty() {
152 return Ok(());
153 }
154
155 info!("bootstrapping PeerTracker");
156
157 let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
159 tools::shuffle(&mut bootstrap_addresses);
160
161 for bootstrap_address in bootstrap_addresses {
163 let try_result: anyhow::Result<()> = try {
164 {
165 info!("bootstrapping {}", bootstrap_address);
166
167 let request = json::struct_to_bytes(&BootstrapV1 {})?;
168 let response = rpc::rpc::rpc_server_unknown(&self.runtime_services, &Id::zero(), &bootstrap_address, PayloadRequestKind::BootstrapV1, request).await?;
169 anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &response.response_request_kind);
170 let response = json::bytes_to_struct::<BootstrapResponseV1>(&response.bytes)?;
171 for peer in response.peers_random {
172 let result = self.add_peer(peer);
173 if let Err(e) = result {
174 warn!("problem while adding bootstrapped peer: {}", e);
175 }
176 }
177 }
178 };
179
180 if let Err(e) = try_result {
181 warn!("problem bootstrapping peer {}: {}", bootstrap_address, e);
182 }
183
184 if !self.is_empty() {
186 break;
187 }
188 }
189
190 Ok(())
191 }
192}
193
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::tools::config;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::types::Pow;

    /// Builds test runtime services and a tracker backed by a fresh in-memory client storage.
    async fn fresh_tracker() -> anyhow::Result<(std::sync::Arc<RuntimeServices>, PeerTracker)> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        Ok((runtime_services, peer_tracker))
    }

    /// Generates a new server id, requesting the given proof-of-work from the test services.
    async fn mint_server_id(runtime_services: &RuntimeServices, pow: Pow) -> anyhow::Result<ServerId> {
        let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), pow, true, runtime_services.pow_generator.as_ref()).await?;
        Ok(server_id)
    }

    #[tokio::test]
    async fn general_tests() -> anyhow::Result<()> {
        let (runtime_services, mut peer_tracker) = fresh_tracker().await?;

        assert!(peer_tracker.is_empty());
        assert_eq!(0, peer_tracker.len());

        // A peer whose proof-of-work is below the minimum is rejected.
        {
            // Keep generating until the id really is below the minimum.
            let weak_server_id = loop {
                let candidate = mint_server_id(&runtime_services, Pow(config::SERVER_KEY_POW_MIN.0 / 2)).await?;
                if candidate.pow >= config::SERVER_KEY_POW_MIN {
                    continue;
                }
                break candidate;
            };
            let weak_peer = weak_server_id.to_peer(runtime_services.time_provider.as_ref())?;
            assert!(peer_tracker.add_peer(weak_peer).is_err());
            assert_eq!(0, peer_tracker.len());
        }

        // A peer with sufficient proof-of-work is accepted.
        {
            let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
            let first_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            assert!(peer_tracker.add_peer(first_peer).is_ok());
            assert_eq!(1, peer_tracker.len());
        }

        // Adding the same peer twice succeeds but does not create a second entry.
        {
            let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
            let repeated_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            assert!(peer_tracker.add_peer(repeated_peer.clone()).is_ok());
            assert_eq!(2, peer_tracker.len());
            assert!(peer_tracker.add_peer(repeated_peer.clone()).is_ok());
            assert_eq!(2, peer_tracker.len());
        }

        // Removing a tracked peer shrinks the list.
        {
            let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
            let short_lived_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            assert!(peer_tracker.add_peer(short_lived_peer.clone()).is_ok());
            assert_eq!(3, peer_tracker.len());
            peer_tracker.remove_peer(&short_lived_peer);
            assert_eq!(2, peer_tracker.len());
        }

        // Removing a peer that was never added leaves the list unchanged.
        {
            let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
            let unknown_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            peer_tracker.remove_peer(&unknown_peer);
            assert_eq!(2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn add_peer_rejects_tampered_pow_initial() -> anyhow::Result<()> {
        let (runtime_services, mut peer_tracker) = fresh_tracker().await?;

        let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
        let mut tampered_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;

        // Overwrite the salt after the peer was built.
        tampered_peer.pow_initial.salt = crate::tools::types::Salt::zero();

        let outcome = peer_tracker.add_peer(tampered_peer);
        assert!(outcome.is_err(), "tampered peer should be rejected");
        assert_eq!(0, peer_tracker.len());

        Ok(())
    }

    #[tokio::test]
    async fn add_peer_keeps_peers_sorted_by_id() -> anyhow::Result<()> {
        let (runtime_services, mut peer_tracker) = fresh_tracker().await?;

        const NUM_PEERS: usize = 30;
        for _ in 0..NUM_PEERS {
            let server_id = mint_server_id(&runtime_services, config::SERVER_KEY_POW_MIN).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            peer_tracker.add_peer(peer)?;
        }

        let peers = peer_tracker.peers();
        assert_eq!(NUM_PEERS, peers.len());
        // Compare each peer with its successor: ids must be strictly increasing.
        for (earlier, later) in peers.iter().zip(peers.iter().skip(1)) {
            assert!(earlier.id < later.id, "peers must be sorted by id; got {} then {}", earlier.id, later.id);
        }

        Ok(())
    }
}