Skip to main content

hashiverse_server_lib/environment/
environment.rs

1//! # Server-side data environment — persistence, quotas, decimation
2//!
3//! [`Environment`] is the facade every handler in
4//! [`crate::server`] writes through when it needs to read or mutate persisted
5//! data: post bundles, per-bundle metadata, feedback, and server config. It wraps a
6//! pluggable [`crate::environment::environment_store::EnvironmentStore`] and layers
7//! on top:
8//!
9//! - **Striped locks** — 256 mutexes keyed by the first byte of the bundle id, so
10//!   unrelated bundles never contend. Hot locations stay local to their own stripe.
11//! - **Quota enforcement** — disk usage is tracked against the operator's configured
12//!   cap (from [`crate::server::args::Args`]). When the cap is crossed, decimation
13//!   runs as an offline `tokio::task::spawn_blocking` job, evicting the
14//!   least-recently-accessed bundles until the store is back under quota — without
15//!   stalling the live request loop.
16//! - **Feedback merging** — writes are monotonic: a new feedback entry is kept only
17//!   if its proof-of-work is at least as strong as any existing entry for the same
18//!   `(location, post, feedback_type)`, so sybil floods can't overwrite real votes.
19
20use crate::environment::environment_store::EnvironmentStore;
21use async_trait::async_trait;
22use bytes::Bytes;
23use hashiverse_lib::tools::time::TimeMillis;
24use hashiverse_lib::tools::types::Id;
25use log::{info, warn};
26use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use serde::{Deserialize, Serialize};
28use std::collections::{BinaryHeap, HashMap};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use serde::de::DeserializeOwned;
32use tokio_util::sync::CancellationToken;
33use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use hashiverse_lib::tools::{compression, json};
35
// An environment is a 'folder' location that contains everything needed to persist a
// server - e.g. config, keys, posts, etc.

/// Config key: this server's persistent identity.
pub const CONFIG_SERVER_ID: &str = "server_id";
/// Config key: serialized Kademlia peer routing buckets.
pub const CONFIG_KADEMLIA_PEER_BUCKETS: &str = "kademlia_peer_buckets";
/// Config key: persisted running total of on-disk bytes used by post bundles.
pub const CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES: &str = "post_bundle_current_size_bytes";
42
/// Size and quota limits an [`Environment`] must stay within.
#[derive(Clone, Copy)]
pub struct EnvironmentDimensions {
    /// Hard cap, in bytes, on the total disk space post bundles may occupy.
    pub max_size_bytes: usize,
    /// Fraction of `max_size_bytes` (0..1) above which decimation is triggered.
    pub max_quota_used: f64,
}
48
49impl EnvironmentDimensions {
50    pub fn with_max_size_bytes(&self, max_size_bytes: usize) -> Self {
51        Self { max_size_bytes, ..*self }
52    }
53}
54
55impl Default for EnvironmentDimensions {
56    fn default() -> Self {
57        Self {
58            max_size_bytes: 20 * 1024 * 1024 * 1024,
59            max_quota_used: 0.9,
60        }
61    }
62}
63
/// Creates and opens [`Environment`]s rooted under a base path.
#[async_trait]
pub trait EnvironmentFactory: Send + Sync {
    /// Builds a factory whose environments will live under `base_path`.
    fn new(base_path: &str) -> Self
    where
        Self: Sized;
    /// Opens the next available environment, applying the given dimensions.
    async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment>;
}
71
/// Per-bundle bookkeeping persisted alongside each post bundle.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct PostBundleMetadata {
    /// On-disk size of the bundle in bytes (counted against the quota).
    pub size: usize,
    /// Number of posts currently held in the bundle.
    pub num_posts: u8,
    /// Number of post slots granted for this bundle.
    pub num_posts_granted: u8,
    /// Overflow flag — NOTE(review): set by callers elsewhere; semantics not visible here.
    pub overflowed: bool,
    /// Sealed flag — NOTE(review): set by callers elsewhere; semantics not visible here.
    pub sealed: bool,
}
80
81impl PostBundleMetadata {
82    pub fn zero() -> PostBundleMetadata {
83        Self {
84            size: 0,
85            num_posts: 0,
86            num_posts_granted: 0,
87            overflowed: false,
88            sealed: false,
89        }
90    }
91}
92
/// Facade over an [`EnvironmentStore`] that adds striped locking, disk-quota
/// tracking and least-recently-accessed decimation for persisted post bundles.
pub struct Environment {
    environment_dimensions: EnvironmentDimensions,
    environment_store: Arc<dyn EnvironmentStore>,

    post_bundle_locks: [RwLock<()>; 256], // We do striped locking (keyed by the first id byte) to allow better parallelism
    // Batched "last touched" timestamps, flushed to the store during maintenance.
    post_bundles_last_touched_batch: RwLock<HashMap<Id, TimeMillis>>,
    // Running total of bundle bytes on disk; persisted under CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES.
    post_bundle_current_size_bytes: AtomicUsize,
    decimation_lock: RwLock<()>, // Allows only one decimation process at a time
}
102
103impl Environment {
104    pub async fn new(environment_store: Arc<dyn EnvironmentStore>, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
105        let post_bundle_locks: [RwLock<()>; 256] = std::array::from_fn(|_| RwLock::new(()));
106        let post_bundles_last_touched = RwLock::new(HashMap::new());
107        let post_bundle_current_size_bytes = environment_store.config_get_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES)?.unwrap_or(0);
108        let post_bundle_current_size_bytes = AtomicUsize::new(post_bundle_current_size_bytes);
109
110        Ok(Environment {
111            environment_dimensions,
112            environment_store,
113            post_bundle_locks,
114            post_bundles_last_touched_batch: post_bundles_last_touched,
115            post_bundle_current_size_bytes,
116            decimation_lock: RwLock::new(()),
117        })
118    }
119
120    pub fn get_read_lock_for_location_id(&self, location_id: &Id) -> RwLockReadGuard<'_, ()> {
121        self.post_bundle_locks[location_id.0[0] as usize].read()
122    }
123
124    pub fn get_write_lock_for_location_id(&self, location_id: &Id) -> RwLockWriteGuard<'_, ()> {
125        self.post_bundle_locks[location_id.0[0] as usize].write()
126    }
127
128    pub fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
129        self.environment_store.config_get_bytes(key)
130    }
131
132    pub fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
133        self.environment_store.config_put_bytes(key, v)
134    }
135
136    pub fn config_put_struct<T: Serialize>(&self, key: &str, value: &T) -> anyhow::Result<()> {
137        let bytes = json::struct_to_bytes(value)?;
138        let bytes_compressed = compression::compress_for_speed(&bytes)?.to_bytes();
139        self.config_put_bytes(key, bytes_compressed.to_vec())
140    }
141
142    pub fn config_get_struct<T: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<T>> {
143        let result = self.config_get_bytes(key)?;
144        match result {
145            Some(bytes_compressed) => {
146                let bytes = compression::decompress(&bytes_compressed)?.to_bytes();
147                let value = json::bytes_to_struct::<T>(&bytes)?;
148                Ok(Some(value))
149            }
150            None => Ok(None),
151        }
152    }
153
154
155    pub fn get_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
156        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
157        self.environment_store.post_bundle_metadata_get(location_id)
158    }
159
160    pub fn get_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
161        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
162        self.environment_store.post_bundle_bytes_get(location_id)
163    }
164
165    pub fn get_post_bundle_encoded_post_feedbacks_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Bytes> {
166        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
167        self.environment_store.post_bundle_feedbacks_bytes_get(location_id)
168    }
169    
170    pub fn put_post_feedback_if_more_powerful(&self, time_millis: TimeMillis, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
171        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
172        self.environment_store.post_feedback_put_if_more_powerful(location_id, encoded_post_feedback)
173    }
174
175    pub fn put_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
176        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
177        self.environment_store.post_bundle_bytes_put(location_id, bytes)?;
178        Ok(())
179    }
180
181    pub fn put_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id, post_bundle_metadata: &PostBundleMetadata, additional_bytes: usize) -> anyhow::Result<()> {
182        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
183        self.environment_store.post_bundle_metadata_put(location_id, post_bundle_metadata)?;
184        self.post_bundle_current_size_bytes.fetch_add(additional_bytes, Ordering::Relaxed);
185        Ok(())
186    }
187
188    fn post_bundle_current_size_bytes(&self) -> usize {
189        self.post_bundle_current_size_bytes.load(Ordering::Relaxed)
190    }
191
192    /// Total on-disk bytes occupied by stored post bundles, as tracked by the
193    /// quota counter. Exposed for the server-stats RPC.
194    pub fn post_bundle_total_bytes(&self) -> usize {
195        self.post_bundle_current_size_bytes()
196    }
197
198    /// Number of post bundles currently in the store.
199    pub fn post_bundle_count(&self) -> anyhow::Result<usize> {
200        self.environment_store.post_bundle_count()
201    }
202
203    /// Number of distinct (location, post, feedback_type) feedback entries currently
204    /// in the store.
205    pub fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
206        self.environment_store.post_bundle_feedback_count()
207    }
208
209    pub async fn do_maintenance(self: &Arc<Self>, cancellation_token: &CancellationToken, time_millis: TimeMillis) -> anyhow::Result<()> {
210        // log::trace!("do_maintenance");
211
212        // Push the current total disk used to store
213        let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
214        self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes)?;
215
216        // Flush the last_accessed times to store
217        {
218            let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
219            if !post_bundles_last_touched.is_empty() {
220                log::trace!("Flushing {} last accessed post bundles", post_bundles_last_touched.len());
221                self.environment_store.post_bundles_last_accessed_flush(&post_bundles_last_touched)?;
222                post_bundles_last_touched.clear();
223            }
224        }
225
226        // Have we passed our allowed quota of available space?
227        let quota_used = post_bundle_current_size_bytes as f64 / self.environment_dimensions.max_size_bytes as f64;
228        if quota_used > self.environment_dimensions.max_quota_used {
229            let env = self.clone();
230            let cancellation_token = cancellation_token.clone();
231            let max_quota_used = self.environment_dimensions.max_quota_used;
232            tokio::task::spawn_blocking(move || env.do_decimation(cancellation_token, time_millis, quota_used, max_quota_used)).await??;
233        }
234
235        Ok(())
236    }
237
238    fn do_decimation(self: &Arc<Self>, cancellation_token: CancellationToken, _time_millis: TimeMillis, quota_used: f64, max_quota_used: f64) -> anyhow::Result<()> {
239        let lock = self.decimation_lock.try_read();
240        if lock.is_none() {
241            log::trace!("A decimation is already in progress.");
242            return Ok(());
243        }
244
245        // Make sure we flush our size update to the database
246        scopeguard::defer! {
247            let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
248            let _ = self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes);
249        }
250
251        // Work out how much we need to decimate
252        let total_rows = self.environment_store.post_bundle_count()?;
253        let decimation_count = (total_rows as f64) * (1f64 - 0.98f64 * max_quota_used / quota_used);
254
255        if 0.0 >= decimation_count {
256            warn!("Decimation count is unexpectedly zero, skipping decimation");
257            return Ok(());
258        }
259
260        // Find a neatish rounding of batches and rows per batch
261        let rows_per_batch = 100f64.min(decimation_count);
262        let num_batches = decimation_count / rows_per_batch;
263        let num_batches = num_batches.ceil() as usize;
264        let rows_per_batch = decimation_count as usize / num_batches;
265
266        info!("Decimation of {}/{} items will be done in {} batches of {} deletes each", decimation_count, total_rows, num_batches, rows_per_batch);
267        let post_bundle_current_size_bytes_before = self.post_bundle_current_size_bytes();
268        info!(
269            "Total size before decimation is {} ({}%)",
270            post_bundle_current_size_bytes_before,
271            100 * post_bundle_current_size_bytes_before / self.environment_dimensions.max_size_bytes
272        );
273
274        for batch in 0..num_batches {
275            if cancellation_token.is_cancelled() {
276                break;
277            }
278
279            // Who will we kill
280            let mut heap_of_locations_to_decimate = BinaryHeap::new();
281            {
282                let random_prefix = Id::random();
283                log::trace!("Decimation batch {} prefix: {}", batch, random_prefix);
284                self.environment_store.post_bundles_last_accessed_iter(&random_prefix).take(5 * rows_per_batch).for_each(|pair| {
285                    match pair {
286                        Ok((location_id, time_millis_bytes)) => {
287                            if heap_of_locations_to_decimate.len() >= rows_per_batch {
288                                let &(most_recent_time_millis_bytes, _) = heap_of_locations_to_decimate.peek().expect("should always work as we have checked the length");
289                                if most_recent_time_millis_bytes > time_millis_bytes {
290                                    heap_of_locations_to_decimate.pop();
291                                    heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
292                                }
293                            }
294                            else {
295                                heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
296                            }
297                        }
298                        Err(e) => {
299                            warn!("Error while decimating: {}", e);
300                        }
301                    }
302                });
303            }
304
305            // Kill the victims
306            {
307                let mut location_ids: Vec<Id> = Vec::new();
308                {
309                    let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
310
311                    for (_time_millis_bytes, location_id) in heap_of_locations_to_decimate {
312                        location_ids.push(location_id);
313                        post_bundles_last_touched.remove(&location_id);
314
315                        let metadata = self.environment_store.post_bundle_metadata_get(&location_id);
316                        if let Ok(metadata) = metadata {
317                            if let Some(metadata) = metadata {
318                                self.post_bundle_current_size_bytes.fetch_sub(metadata.size, Ordering::Relaxed);
319                            }
320                        }
321                    }
322                }
323
324                self.environment_store.post_bundles_delete(&location_ids)?;
325            }
326        }
327
328        let post_bundle_current_size_bytes_after = self.post_bundle_current_size_bytes();
329        info!(
330            "Total size after decimation is {} ({}%): delta={}",
331            post_bundle_current_size_bytes_after,
332            100 * post_bundle_current_size_bytes_after / self.environment_dimensions.max_size_bytes,
333            post_bundle_current_size_bytes_before - post_bundle_current_size_bytes_after
334        );
335
336        Ok(())
337    }
338}
339
340// ----------------------------------------------------------------------------------------------------------------------------------
341
342#[cfg(test)]
343pub mod tests {
344    use crate::environment::environment::{EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
345    use hashiverse_lib::anyhow_assert_ge;
346    use hashiverse_lib::tools::tools;
347    use hashiverse_lib::tools::tools::get_temp_dir;
348    use hashiverse_lib::tools::types::Id;
349    use std::sync::Arc;
350    use tokio_util::sync::CancellationToken;
351    use hashiverse_lib::tools::time_provider::time_provider::{ScaledTimeProvider, TimeProvider};
352
    /// Smoke-test of metadata and byte round-trips through a freshly opened
    /// environment: put, read back, delete, confirm gone.
    pub async fn basics_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
        // Scaled time so last-accessed timestamps advance quickly in test time.
        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
        // configure_logging_with_time_provider("trace", time_provider.clone());

        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(1024 * 1024);
        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
        let environment = environment_factory.open_next_available(environment_dimensions).await?;

        // Helper: delete a single bundle directly through the underlying store.
        let delete_post_bundle = |location_id: &Id| -> anyhow::Result<()> {
            let vec = vec![*location_id].to_vec();
            environment.environment_store.post_bundles_delete(&vec)
        };

        // Put some metadata
        {
            let num_bytes = 16 * 1024;
            let time_millis = time_provider.current_time_millis();
            let location_id = Id::random();
            let post_bundle_metadata = PostBundleMetadata::zero();
            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, num_bytes)?;

            let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
            assert_eq!(post_bundle_metadata_check, Some(post_bundle_metadata));

            delete_post_bundle(&location_id)?;
            let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
            assert_eq!(post_bundle_metadata_check, None);
        }

        // Put some bytes
        {
            let num_bytes = 16 * 1024;
            let time_millis = time_provider.current_time_millis();
            let location_id = Id::random();
            let bytes = tools::random_bytes(num_bytes);
            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
            // NOTE(review): put_post_bundle_bytes itself does not touch the quota
            // counter; this equality holds because the metadata put in the previous
            // section added exactly num_bytes (and the store-level delete above
            // does not decrement the counter).
            assert_eq!(num_bytes, environment.post_bundle_current_size_bytes());

            let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
            assert_eq!(bytes_check, Some(Bytes::from(bytes)));

            delete_post_bundle(&location_id)?;
            let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
            assert_eq!(bytes_check, None);
        }

        Ok(())
    }
402
403    fn get_random_post_bundle(max_size: usize) -> (Id, PostBundleMetadata, Vec<u8>) {
404        let num_bytes = tools::random_usize_bounded(max_size);
405        let location_id = Id::random();
406        let bytes = tools::random_bytes(num_bytes);
407        let mut post_bundle_metadata = PostBundleMetadata::zero();
408        post_bundle_metadata.size = num_bytes;
409
410        (location_id, post_bundle_metadata, bytes)
411    }
412
    /// Writes far more bundle metadata than the 4 MiB cap allows, running
    /// maintenance periodically, then checks decimation has brought tracked
    /// usage back within the cap.
    pub async fn decimation_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
        // configure_logging_with_time_provider("trace", time_provider.clone());
        let cancellation_token = CancellationToken::new();

        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);

        // Post to many locations (up to ~8 KiB each, 10k bundles >> 4 MiB cap)
        let num_iterations = 10000;
        for i in 0..num_iterations {
            let time_millis = time_provider.current_time_millis();
            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(8 * 1024);

            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
            // environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;

            if 0 == i % 10 {
                environment.do_maintenance(&cancellation_token, time_millis).await?;
            }
        }

        {
            let time_millis = time_provider.current_time_millis();
            environment.do_maintenance(&cancellation_token, time_millis).await?;

            // After a final maintenance pass, tracked usage must be within the cap.
            anyhow_assert_ge!(environment_dimensions.max_size_bytes, environment.post_bundle_current_size_bytes());
        }

        Ok(())
    }
447
    /// Like `decimation_test` but with a caller-chosen number of small (≤1 KiB)
    /// bundles, checking decimation converges to within 10% over the cap.
    pub async fn decimation_convergence_test<TEnvironmentFactory: EnvironmentFactory>(num_posts: usize) -> anyhow::Result<()> {
        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
        // configure_logging_with_time_provider("trace", time_provider.clone());
        let cancellation_token = CancellationToken::new();

        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);

        // Post to many locations
        for i in 0..num_posts {
            let time_millis = time_provider.current_time_millis();
            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);

            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
            //environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;

            if 0 == i % 100 {
                environment.do_maintenance(&cancellation_token, time_millis).await?;
            }
        }

        {
            let time_millis = time_provider.current_time_millis();
            environment.do_maintenance(&cancellation_token, time_millis).await?;
            // Allow a 10% overshoot: decimation only targets ~98% of the quota
            // threshold, not a hard stop at the cap.
            assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
        }

        Ok(())
    }
480
    /// Verifies decimation is LRU-shaped: two bundles kept "warm" by periodic
    /// reads (one via metadata gets, one via byte gets) must survive a flood of
    /// writes, while an identical never-touched bundle must be evicted.
    pub async fn decimation_existence_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
        // configure_logging_with_time_provider("trace", time_provider.clone());
        let cancellation_token = CancellationToken::new();

        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);

        let time_millis = time_provider.current_time_millis();

        // Bundle that will be kept warm through metadata reads.
        let location_id_should_exist_metadata = {
            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
            location_id
        };

        // Bundle that will be kept warm through byte reads.
        let location_id_should_exist_bytes = {
            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
            location_id
        };

        // Bundle that is never touched again and so should be decimated.
        let location_id_should_not_exist = {
            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
            location_id
        };

        // Post to many locations
        let num_iterations = 128 * 1000;
        for i in 0..num_iterations {
            let time_millis = time_provider.current_time_millis();
            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);

            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
            //environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;

            // Lets keep some posts active
            if 0 == i % 30 {
                let _ = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
                let _ = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes);
            }

            if 0 == i % 100 {
                environment.do_maintenance(&cancellation_token, time_millis).await?;
            }
        }

        {
            let time_millis = time_provider.current_time_millis();
            environment.do_maintenance(&cancellation_token, time_millis).await?;
            // Decimation targets ~98% of the quota threshold, so allow 10% slack.
            assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
        }

        // Check the things that should still exist
        {
            // Existence by metadata queries
            {
                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
                assert!(result.is_some());
            }
            {
                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_metadata)?;
                assert!(result.is_some());
            }
        }
        {
            // Existence by bytes queries
            {
                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_bytes)?;
                assert!(result.is_some());
            }
            {
                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes)?;
                assert!(result.is_some());
            }
        }

        // Check the thing that should have died
        {
            {
                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_not_exist)?;
                assert!(result.is_none());
            }
            {
                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_not_exist)?;
                assert!(result.is_none());
            }
        }

        Ok(())
    }
579
    /// Verifies that when a bundle is decimated, its attached feedback entries
    /// are deleted along with it rather than leaking in the store.
    pub async fn decimation_feedback_deleted_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
        let cancellation_token = CancellationToken::new();

        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
        let store = &environment.environment_store;

        let time_millis = time_provider.current_time_millis();

        // Create a bundle and attach feedback to it
        let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
        environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
        environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;

        let feedback = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
        store.post_feedback_put_if_more_powerful(&location_id, &feedback)?;
        assert!(!store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());

        // Flood with bundles (never re-access ours) to trigger decimation
        for i in 0..128 * 1000usize {
            let time_millis = time_provider.current_time_millis();
            let (flood_id, flood_meta, _) = get_random_post_bundle(1024);
            environment.put_post_bundle_metadata(time_millis, &flood_id, &flood_meta, flood_meta.size)?;
            if i % 100 == 0 {
                environment.do_maintenance(&cancellation_token, time_millis).await?;
            }
        }
        environment.do_maintenance(&cancellation_token, time_provider.current_time_millis()).await?;

        // The bundle must have been evicted
        assert!(environment.get_post_bundle_metadata(time_millis, &location_id)?.is_none(), "bundle should have been decimated");

        // Its feedback must be gone too
        assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty(), "feedback should have been deleted with the bundle");

        Ok(())
    }
620
621    // ── feedback tests ────────────────────────────────────────────────────────
622
623    use bytes::{Buf, Bytes};
624    use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
625    use hashiverse_lib::tools::types::{Pow, Salt};
626
627    fn decode_all_feedbacks(mut bytes: Bytes) -> anyhow::Result<Vec<EncodedPostFeedbackV1>> {
628        let mut result = Vec::new();
629        while bytes.has_remaining() {
630            result.push(EncodedPostFeedbackV1::decode_from_bytes(&mut bytes)?);
631        }
632        Ok(result)
633    }
634
    /// Exercises feedback storage/retrieval: empty location yields empty bytes,
    /// entries round-trip through the concatenated byte encoding, and entries
    /// for different locations are kept separate.
    pub async fn feedback_bytes_get_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
        let factory = TEnvironmentFactory::new(&temp_dir_path);
        let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
        let store = &env.environment_store;

        let location_id = Id::random();
        let other_location_id = Id::random();

        // Empty: unknown location returns empty bytes
        assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());

        let f1 = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
        let f2 = EncodedPostFeedbackV1::new(Id::random(), 2, Salt::random(), Pow(20));
        let f_other = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(5));

        store.post_feedback_put_if_more_powerful(&location_id, &f1)?;
        store.post_feedback_put_if_more_powerful(&location_id, &f2)?;
        store.post_feedback_put_if_more_powerful(&other_location_id, &f_other)?;

        // Correct entries returned for location_id (sorted for a stable comparison,
        // as the store's iteration order is not asserted here)
        let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
        let mut decoded = decode_all_feedbacks(bytes)?;
        decoded.sort_by_key(|f| f.feedback_type);
        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded[0], f1);
        assert_eq!(decoded[1], f2);

        // Other location unaffected
        let other_bytes = store.post_bundle_feedbacks_bytes_get(&other_location_id)?;
        let other_decoded = decode_all_feedbacks(other_bytes)?;
        assert_eq!(other_decoded.len(), 1);
        assert_eq!(other_decoded[0], f_other);

        Ok(())
    }
671
672    pub async fn feedback_put_if_more_powerful_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
673        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
674        let factory = TEnvironmentFactory::new(&temp_dir_path);
675        let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
676        let store = &env.environment_store;
677
678        let location_id = Id::random();
679        let post_id = Id::random();
680        let feedback_type = 3u8;
681
682        let get_single = |pow_expected: Pow| -> anyhow::Result<()> {
683            let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
684            let decoded = decode_all_feedbacks(bytes)?;
685            assert_eq!(decoded.len(), 1);
686            assert_eq!(decoded[0].post_id, post_id);
687            assert_eq!(decoded[0].feedback_type, feedback_type);
688            assert_eq!(decoded[0].pow, pow_expected);
689            Ok(())
690        };
691
692        // First put: stored
693        let f_low = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(10));
694        store.post_feedback_put_if_more_powerful(&location_id, &f_low)?;
695        get_single(Pow(10))?;
696
697        // Higher pow: replaces
698        let f_high = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
699        store.post_feedback_put_if_more_powerful(&location_id, &f_high)?;
700        get_single(Pow(20))?;
701
702        // Equal pow: existing kept (salt unchanged)
703        let salt_before = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
704        let f_equal = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
705        store.post_feedback_put_if_more_powerful(&location_id, &f_equal)?;
706        let salt_after = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
707        assert_eq!(salt_before, salt_after);
708
709        // Lower pow: existing kept
710        let f_weaker = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(5));
711        store.post_feedback_put_if_more_powerful(&location_id, &f_weaker)?;
712        get_single(Pow(20))?;
713
714        // Different feedback_type for same post: stored as a separate entry
715        let f_other_type = EncodedPostFeedbackV1::new(post_id, feedback_type + 1, Salt::random(), Pow(1));
716        store.post_feedback_put_if_more_powerful(&location_id, &f_other_type)?;
717        let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
718        assert_eq!(all.len(), 2);
719
720        // Different post_id for same feedback_type: stored as a separate entry
721        let f_other_post = EncodedPostFeedbackV1::new(Id::random(), feedback_type, Salt::random(), Pow(1));
722        store.post_feedback_put_if_more_powerful(&location_id, &f_other_post)?;
723        let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
724        assert_eq!(all.len(), 3);
725
726        // Different location_id: does not affect original location
727        let f_other_loc = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(99));
728        store.post_feedback_put_if_more_powerful(&Id::random(), &f_other_loc)?;
729        let all_original = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
730        assert_eq!(all_original.len(), 3);
731
732        Ok(())
733    }
734}