hashiverse_server_lib/server/
hashiverse_server.rs1use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, CONFIG_KADEMLIA_PEER_BUCKETS, CONFIG_SERVER_ID};
25use crate::server::kademlia::kademlia;
26use crate::server::kademlia::kademlia::Kademlia;
27use crate::server::post_bundle_caching::PostBundleCache;
28use crate::server::post_bundle_feedback_caching::PostBundleFeedbackCache;
29use hashiverse_lib::anyhow_assert_eq;
30use hashiverse_lib::protocol::payload::payload::{AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind, PeerStatsResponseV1, PAYLOAD_REQUEST_KIND_COUNT};
31use hashiverse_lib::protocol::peer::Peer;
32use hashiverse_lib::protocol::rpc;
33use hashiverse_lib::tools::runtime_services::RuntimeServices;
34use hashiverse_lib::tools::server_id::ServerId;
35use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE, MILLIS_IN_SECOND};
36use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
37use hashiverse_lib::tools::types::{Id, Salt};
38use hashiverse_lib::tools::{config, tools};
39use hashiverse_lib::tools::json;
40use hashiverse_lib::transport::transport::{IncomingRequest, TransportServer};
41use log::{error, info, trace, warn};
42use moka::sync::Cache;
43use parking_lot::{Mutex, RwLock};
44use std::sync::Arc;
45use std::sync::atomic::AtomicU64;
46use std::time::Duration;
47use bytes::Bytes;
48use tokio::sync::mpsc;
49use tokio_util::sync::CancellationToken;
50use hashiverse_lib::protocol::rpc::rpc_response::RpcResponsePacketRx;
51use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
52use hashiverse_lib::protocol::payload::payload::TrendingHashtagsFetchResponseV1;
53use crate::server::args::Args;
54
55pub struct HashiverseServer {
56 pub runtime_services: Arc<RuntimeServices>,
57 pub environment: Arc<Environment>,
58 pub server_id: ServerId,
59 pub kademlia: Arc<RwLock<Kademlia<Id, Peer>>>,
60 pub transport_server: Arc<dyn TransportServer>,
61 pub peer_self: Arc<RwLock<Peer>>,
62 pub heal_in_progress: Cache<Id, ()>,
63 pub seen_salts: Cache<Salt, ()>,
64 pub post_bundle_cache: PostBundleCache,
65 pub post_bundle_feedback_cache: PostBundleFeedbackCache,
66 pub trending_hashtags: Cache<String, HyperLogLog>,
67 pub trending_hashtags_response_cache: Mutex<Option<(TimeMillis, TrendingHashtagsFetchResponseV1)>>,
68 pub request_counters: Arc<[AtomicU64; PAYLOAD_REQUEST_KIND_COUNT]>,
72 pub peer_stats_response_cache: Mutex<Option<(TimeMillis, PeerStatsResponseV1)>>,
76}
77
78impl HashiverseServer {
79 pub async fn new(runtime_services: Arc<RuntimeServices>, environment_factory: Arc<dyn EnvironmentFactory>, args: Args) -> anyhow::Result<Arc<Self>> {
80 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(args.max_post_database_size_megabytes * 1024 * 1024);
81 let environment = environment_factory.open_next_available(environment_dimensions).await?;
82
83 let config_server_id = environment.config_get_bytes(CONFIG_SERVER_ID)?;
85 let server_id = match config_server_id {
86 None => {
87 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, args.skip_pq_commitment_bytes, runtime_services.pow_generator.as_ref()).await?;
88 environment.config_put_bytes(CONFIG_SERVER_ID, server_id.encode()?)?;
89 info!("starting new server with server_id={}", server_id);
90 server_id
91 }
92 Some(config_server_id) => {
93 let server_id = ServerId::decode(config_server_id.as_ref())?;
94 server_id.verify()?;
95 info!("restarting existing server with server_id={}", server_id);
96 server_id
97 }
98 };
99
100 let transport_server = runtime_services.transport_factory.create_server(&args.base_path, args.port, args.force_local_network).await?;
101
102 let mut peer_self = server_id.to_peer(runtime_services.time_provider.as_ref())?;
104 peer_self.address = transport_server.get_address().to_string();
105 peer_self.sign(runtime_services.time_provider.as_ref(), &server_id.keys.signature_key)?;
106
107 let mut kademlia = Kademlia::<Id, Peer>::new(server_id.id, config::SERVER_KADEMLIA_MAX_PEERS_PER_BUCKET);
109 {
110 let now = runtime_services.time_provider.current_time_millis();
111
112 kademlia.add_peer(peer_self.clone(), now)?;
114
115 {
117 let try_result = try {
118 let peer_buckets = environment.config_get_struct::<Vec<Vec<Peer>>>(CONFIG_KADEMLIA_PEER_BUCKETS)?;
119 if let Some(peer_buckets) = peer_buckets {
120 for peer_bucket in peer_buckets {
121 for peer in peer_bucket {
122 kademlia.add_peer(peer, now)?;
123 }
124 }
125 }
126 };
127
128 if let Err(e) = try_result {
129 warn!("problem depersisting peer_buckets: {}", e);
130 }
131 }
132 }
133
134 info!("server_id={}", server_id);
135 info!("peer_self={}", peer_self);
136
137 let hashiverse_server = HashiverseServer {
138 runtime_services,
139 environment: Arc::new(environment),
140 server_id,
141 kademlia: Arc::new(RwLock::new(kademlia)),
142 transport_server,
143 peer_self: Arc::new(RwLock::new(peer_self)),
144 heal_in_progress: Cache::builder().time_to_live(Duration::from_secs(60)).build(),
145 seen_salts: Cache::builder().time_to_live(Duration::from_mins(5)).max_capacity(100_000).build(),
146 post_bundle_cache: PostBundleCache::new(config::SERVER_POST_BUNDLE_CACHE_MAX_ORIGINATORS_PER_LOCATION, config::SERVER_POST_BUNDLE_CACHE_MAX_BYTES),
147 post_bundle_feedback_cache: PostBundleFeedbackCache::new(config::SERVER_POST_BUNDLE_FEEDBACK_CACHE_MAX_BYTES),
148 trending_hashtags: Cache::builder().max_capacity(256).build(),
149 trending_hashtags_response_cache: Mutex::new(None),
150 request_counters: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
151 peer_stats_response_cache: Mutex::new(None),
152 };
153
154 Ok(Arc::new(hashiverse_server))
155 }
156
157 pub async fn run(&self, cancellation_token: CancellationToken) {
158 info!("server started");
159
160 let (tx, rx) = mpsc::channel::<IncomingRequest>(32);
161
162 let res = tokio::try_join!(
163 self.wrap_and_dispatch_network_envelopes(cancellation_token.clone(), rx),
164 self.maintain_environment(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
165 self.maintain_kademlia(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
166 self.transport_server.listen(cancellation_token.clone(), tx),
167 );
168
169 match res {
170 Ok(_) => info!("server stopped"),
171 Err(e) => error!("server stopped with error: {}", e),
172 }
173 }
174
175 pub async fn add_potential_peer_to_kademlia(&self, peer: Peer, time_millis: TimeMillis) {
176 let result = peer.verify();
178 if let Err(e) = result {
179 warn!("peer {} failed verification: {}", peer, e);
180 return;
181 }
182
183 if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
185 warn!("peer {} failed pow so not adding to our kademlia", peer);
186 return;
187 }
188
189 let result = self.kademlia.write().add_peer(peer, time_millis);
190 if let Err(e) = result {
191 warn!("problem adding peer: {}", e);
192 }
193 }
194
195 async fn rpc_server_unknown(&self, address: &String, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
196 rpc::rpc::rpc_server_unknown(&self.runtime_services, &self.server_id.id, address, payload_request_kind, payload).await
197 }
198
199 async fn rpc_server_known(&self, destination_peer: &Peer, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
200 rpc::rpc::rpc_server_known(&self.runtime_services, &self.server_id.id, destination_peer, payload_request_kind, payload).await
201 }
202
203 async fn maintain_environment(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
204 loop {
205 if cancellation_token.is_cancelled() {
206 break;
207 }
208
209 let time_millis = time_provider.current_time_millis();
210
211 self.environment.do_maintenance(&cancellation_token, time_millis).await?;
212
213 tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_MINUTE, &cancellation_token).await;
214 }
215
216 Ok(())
217 }
218
219 async fn maintain_kademlia(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
220 let mut last_bootstrap = TimeMillis::zero();
221 let mut last_announce = TimeMillis::zero();
222 let mut last_peers_dump_to_storage = self.runtime_services.time_provider.current_time_millis();
223
224 loop {
225 if cancellation_token.is_cancelled() {
226 break;
227 }
228
229 let now = time_provider.current_time_millis();
230
231 if now - last_bootstrap > config::MILLIS_TO_WAIT_BETWEEN_BOOTSTRAPS {
233 last_bootstrap = now;
234
235 let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };
236
237 if needs_bootstrapping {
238
239 let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
241 tools::shuffle(&mut bootstrap_addresses);
242 trace!("bootstrap addresses: {:?}", bootstrap_addresses);
243
244 for bootstrap_address in bootstrap_addresses {
245 if cancellation_token.is_cancelled() {
246 break;
247 }
248
249 let try_result: anyhow::Result<()> = try {
250 {
251 trace!("bootstrapping {}", bootstrap_address);
252 let rpc_response_packet_rx = self.rpc_server_unknown(&bootstrap_address, PayloadRequestKind::BootstrapV1, json::struct_to_bytes(&BootstrapV1 {})?).await?;
253 anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &rpc_response_packet_rx.response_request_kind);
254 let response = json::bytes_to_struct::<BootstrapResponseV1>(&rpc_response_packet_rx.bytes)?;
255 for peer in response.peers_random {
256 self.add_potential_peer_to_kademlia(peer, now).await;
257 }
258 }
259 };
260
261 if let Err(e) = try_result {
262 warn!("problem bootstrapping {}: {}", bootstrap_address, e);
263 }
264
265 let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };
266 if !needs_bootstrapping {
267 break;
268 }
269
270 }
271
272 trace!("We now have {} peers", self.kademlia.read().len());
273 }
274 }
275
276 if now - last_announce > config::MILLIS_TO_WAIT_BETWEEN_ANNOUNCES {
278 last_announce = now;
279
280 let peer_self = self.peer_self.read().clone();
281
282
283 let mut announce_peers = Vec::<Peer>::new();
285 {
286 let kademlia = self.kademlia.read();
287
288 let peer_with_lowest_score = kademlia.get_peer_with_lowest_score();
290 if let Some(peer_with_lowest_score) = peer_with_lowest_score {
291 announce_peers.push(peer_with_lowest_score.clone());
292 }
293
294 let (peers_nearest, _) = kademlia.get_peers_for_key(&peer_self.id, 8);
296 if !peers_nearest.is_empty() {
297 announce_peers.push(tools::random_element(&peers_nearest).clone());
298 }
299 }
300
301 for announce_peer in announce_peers {
303 if cancellation_token.is_cancelled() {
304 break;
305 }
306
307 if announce_peer == peer_self {
309 self.add_potential_peer_to_kademlia(peer_self.clone(), now).await;
311 continue;
312 }
313
314 let try_result: anyhow::Result<()> = try {
315 {
316 let rpc_response_packet_rx = self.rpc_server_known(&announce_peer, PayloadRequestKind::AnnounceV1, json::struct_to_bytes(&AnnounceV1 { peer_self: peer_self.clone() })?).await?;
319 anyhow_assert_eq!(&PayloadResponseKind::AnnounceResponseV1, &rpc_response_packet_rx.response_request_kind);
320 let response = json::bytes_to_struct::<AnnounceResponseV1>(&rpc_response_packet_rx.bytes)?;
321 self.add_potential_peer_to_kademlia(response.peer_self, now).await;
322 for peer in response.peers_nearest {
323 self.add_potential_peer_to_kademlia(peer, now).await;
324 }
325 }
326 };
327
328 if let Err(e) = try_result {
329 warn!("problem announcing {}: {}", announce_peer, e);
330 self.kademlia.write().remove_peer(&announce_peer.id, now);
331 }
332 }
333 }
334
335 {
337 let try_result = try {
338 let kademlia = self.kademlia.read();
339 if last_peers_dump_to_storage < kademlia.peers_last_changed() && now - last_peers_dump_to_storage > config::MILLIS_TO_WAIT_BETWEEN_PEER_DUMPS {
340 last_peers_dump_to_storage = kademlia.peers_last_changed();
341 let peer_buckets = kademlia.get_peer_buckets();
342 self.environment.config_put_struct(CONFIG_KADEMLIA_PEER_BUCKETS, &peer_buckets)?;
345 }
346 };
347
348 if let Err(e) = try_result {
349 warn!("problem persisting peer_buckets: {}", e);
350 }
351 }
352
353 tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_SECOND.const_mul(30), &cancellation_token).await;
354 }
355
356 Ok(())
357 }
358}
359
360impl kademlia::Peer<Id> for Peer {
361 fn id(&self) -> &Id {
362 &self.id
363 }
364 fn score(&self, time_millis: TimeMillis) -> f64 {
365 self.pow_current_day.pow_decayed_day(time_millis) + self.pow_current_month.pow_decayed_month(time_millis)
367 }
368}