Skip to main content

hashiverse_server_lib/environment/
environment.rs

1//! # Server-side data environment — persistence, quotas, decimation
2//!
//! [`Environment`] is the facade that every handler in
//! [`crate::server`] goes through to read or mutate persisted
//! data: post bundles, per-bundle metadata, feedback, and server config. It wraps a
//! pluggable [`crate::environment::environment_store::EnvironmentStore`] and layers
//! the following on top:
8//!
//! - **Striped locks** — 256 reader–writer locks keyed by the first byte of the bundle id, so
//!   bundles in different stripes never contend. Hot locations stay local to their own stripe.
//! - **Quota enforcement** — disk usage is tracked against the operator's configured
//!   cap (from [`crate::server::args::Args`]). When usage exceeds the `max_quota_used`
//!   fraction of that cap (90% by default), decimation runs as an offline
//!   `tokio::task::spawn_blocking` job, evicting approximately least-recently-accessed
//!   bundles (sampled in batches from random key prefixes) until the store is back under
//!   quota — without stalling the live request loop.
16//! - **Feedback merging** — writes are monotonic: a new feedback entry is kept only
17//!   if its proof-of-work is at least as strong as any existing entry for the same
18//!   `(location, post, feedback_type)`, so sybil floods can't overwrite real votes.
19
20use crate::environment::environment_store::EnvironmentStore;
21use async_trait::async_trait;
22use bytes::Bytes;
23use hashiverse_lib::tools::time::TimeMillis;
24use hashiverse_lib::tools::types::Id;
25use log::{info, warn};
26use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use serde::{Deserialize, Serialize};
28use std::collections::{BinaryHeap, HashMap};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use serde::de::DeserializeOwned;
32use tokio_util::sync::CancellationToken;
33use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use hashiverse_lib::tools::{compression, json};
35
// Config keys. An environment is a 'folder' location that holds everything a server
// persists (config, keys, posts, etc.); these keys name entries in its config store.

/// Config key for the persisted server id.
pub const CONFIG_SERVER_ID: &str = "server_id";
/// Config key for the persisted Kademlia peer buckets.
pub const CONFIG_KADEMLIA_PEER_BUCKETS: &str = "kademlia_peer_buckets";
/// Config key under which [`Environment`] persists its running total of post-bundle bytes
/// (read when the environment is created, written during maintenance and decimation).
pub const CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES: &str = "post_bundle_current_size_bytes";
41
/// Sizing limits for an environment's post-bundle storage.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct EnvironmentDimensions {
    /// Storage cap, in bytes, that post-bundle usage is measured against.
    pub max_size_bytes: usize,
    /// Fraction of `max_size_bytes` above which maintenance triggers decimation
    /// (e.g. `0.9` means decimate once usage exceeds 90% of the cap).
    pub max_quota_used: f64,
}

impl EnvironmentDimensions {
    /// Returns a copy of these dimensions with `max_size_bytes` replaced; `self` is unchanged.
    #[must_use]
    pub fn with_max_size_bytes(&self, max_size_bytes: usize) -> Self {
        Self { max_size_bytes, ..*self }
    }
}

impl Default for EnvironmentDimensions {
    /// A 20 GiB cap, decimating once usage exceeds 90% of it.
    fn default() -> Self {
        Self {
            max_size_bytes: 20 * 1024 * 1024 * 1024,
            max_quota_used: 0.9,
        }
    }
}
62
63#[async_trait]
64pub trait EnvironmentFactory: Send + Sync {
65    fn new(base_path: &str) -> Self
66    where
67        Self: Sized;
68    async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment>;
69}
70
71#[derive(Serialize, Deserialize, Debug, PartialEq)]
72pub struct PostBundleMetadata {
73    pub size: usize,
74    pub num_posts: u8,
75    pub num_posts_granted: u8,
76    pub overflowed: bool,
77    pub sealed: bool,
78}
79
80impl PostBundleMetadata {
81    pub fn zero() -> PostBundleMetadata {
82        Self {
83            size: 0,
84            num_posts: 0,
85            num_posts_granted: 0,
86            overflowed: false,
87            sealed: false,
88        }
89    }
90}
91
/// Facade over an [`EnvironmentStore`] adding striped per-location locks, batched
/// last-access tracking and a post-bundle byte counter used for quota enforcement.
pub struct Environment {
    /// Storage cap and decimation threshold.
    environment_dimensions: EnvironmentDimensions,
    /// Backing persistence layer that every read and write is delegated to.
    environment_store: Arc<dyn EnvironmentStore>,

    /// Striped locks indexed by the first byte of a location id; we do striped locking to
    /// allow better parallelism.
    post_bundle_locks: [RwLock<()>; 256],
    /// Last-access times recorded since the previous maintenance flush.
    post_bundles_last_touched_batch: RwLock<HashMap<Id, TimeMillis>>,
    /// Running total of post-bundle bytes, persisted under `CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES`.
    post_bundle_current_size_bytes: AtomicUsize,
    /// Held for the duration of a decimation run; intended to allow only one decimation
    /// process at a time.
    decimation_lock: RwLock<()>,
}
101
102impl Environment {
103    pub async fn new(environment_store: Arc<dyn EnvironmentStore>, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
104        let post_bundle_locks: [RwLock<()>; 256] = std::array::from_fn(|_| RwLock::new(()));
105        let post_bundles_last_touched = RwLock::new(HashMap::new());
106        let post_bundle_current_size_bytes = environment_store.config_get_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES)?.unwrap_or(0);
107        let post_bundle_current_size_bytes = AtomicUsize::new(post_bundle_current_size_bytes);
108
109        Ok(Environment {
110            environment_dimensions,
111            environment_store,
112            post_bundle_locks,
113            post_bundles_last_touched_batch: post_bundles_last_touched,
114            post_bundle_current_size_bytes,
115            decimation_lock: RwLock::new(()),
116        })
117    }
118
119    pub fn get_read_lock_for_location_id(&self, location_id: &Id) -> RwLockReadGuard<'_, ()> {
120        self.post_bundle_locks[location_id.0[0] as usize].read()
121    }
122
123    pub fn get_write_lock_for_location_id(&self, location_id: &Id) -> RwLockWriteGuard<'_, ()> {
124        self.post_bundle_locks[location_id.0[0] as usize].write()
125    }
126
127    pub fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
128        self.environment_store.config_get_bytes(key)
129    }
130
131    pub fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
132        self.environment_store.config_put_bytes(key, v)
133    }
134
135    pub fn config_put_struct<T: Serialize>(&self, key: &str, value: &T) -> anyhow::Result<()> {
136        let bytes = json::struct_to_bytes(value)?;
137        let bytes_compressed = compression::compress_for_speed(&bytes)?.to_bytes();
138        self.config_put_bytes(key, bytes_compressed.to_vec())
139    }
140
141    pub fn config_get_struct<T: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<T>> {
142        let result = self.config_get_bytes(key)?;
143        match result {
144            Some(bytes_compressed) => {
145                let bytes = compression::decompress(&bytes_compressed)?.to_bytes();
146                let value = json::bytes_to_struct::<T>(&bytes)?;
147                Ok(Some(value))
148            }
149            None => Ok(None),
150        }
151    }
152
153
154    pub fn get_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
155        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
156        self.environment_store.post_bundle_metadata_get(location_id)
157    }
158
159    pub fn get_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
160        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
161        self.environment_store.post_bundle_bytes_get(location_id)
162    }
163
164    pub fn get_post_bundle_encoded_post_feedbacks_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Bytes> {
165        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
166        self.environment_store.post_bundle_feedbacks_bytes_get(location_id)
167    }
168    
169    pub fn put_post_feedback_if_more_powerful(&self, time_millis: TimeMillis, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
170        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
171        self.environment_store.post_feedback_put_if_more_powerful(location_id, encoded_post_feedback)
172    }
173
174    pub fn put_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
175        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
176        self.environment_store.post_bundle_bytes_put(location_id, bytes)?;
177        Ok(())
178    }
179
180    pub fn put_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id, post_bundle_metadata: &PostBundleMetadata, additional_bytes: usize) -> anyhow::Result<()> {
181        self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
182        self.environment_store.post_bundle_metadata_put(location_id, post_bundle_metadata)?;
183        self.post_bundle_current_size_bytes.fetch_add(additional_bytes, Ordering::Relaxed);
184        Ok(())
185    }
186
187    fn post_bundle_current_size_bytes(&self) -> usize {
188        self.post_bundle_current_size_bytes.load(Ordering::Relaxed)
189    }
190
191    /// Total on-disk bytes occupied by stored post bundles, as tracked by the
192    /// quota counter. Exposed for the server-stats RPC.
193    pub fn post_bundle_total_bytes(&self) -> usize {
194        self.post_bundle_current_size_bytes()
195    }
196
197    /// Number of post bundles currently in the store.
198    pub fn post_bundle_count(&self) -> anyhow::Result<usize> {
199        self.environment_store.post_bundle_count()
200    }
201
202    /// Number of distinct (location, post, feedback_type) feedback entries currently
203    /// in the store.
204    pub fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
205        self.environment_store.post_bundle_feedback_count()
206    }
207
208    pub async fn do_maintenance(self: &Arc<Self>, cancellation_token: &CancellationToken, time_millis: TimeMillis) -> anyhow::Result<()> {
209        // log::trace!("do_maintenance");
210
211        // Push the current total disk used to store
212        let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
213        self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes)?;
214
215        // Flush the last_accessed times to store
216        {
217            let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
218            if !post_bundles_last_touched.is_empty() {
219                log::trace!("Flushing {} last accessed post bundles", post_bundles_last_touched.len());
220                self.environment_store.post_bundles_last_accessed_flush(&post_bundles_last_touched)?;
221                post_bundles_last_touched.clear();
222            }
223        }
224
225        // Have we passed our allowed quota of available space?
226        let quota_used = post_bundle_current_size_bytes as f64 / self.environment_dimensions.max_size_bytes as f64;
227        if quota_used > self.environment_dimensions.max_quota_used {
228            let env = self.clone();
229            let cancellation_token = cancellation_token.clone();
230            let max_quota_used = self.environment_dimensions.max_quota_used;
231            tokio::task::spawn_blocking(move || env.do_decimation(cancellation_token, time_millis, quota_used, max_quota_used)).await??;
232        }
233
234        Ok(())
235    }
236
237    fn do_decimation(self: &Arc<Self>, cancellation_token: CancellationToken, _time_millis: TimeMillis, quota_used: f64, max_quota_used: f64) -> anyhow::Result<()> {
238        let lock = self.decimation_lock.try_read();
239        if lock.is_none() {
240            log::trace!("A decimation is already in progress.");
241            return Ok(());
242        }
243
244        // Make sure we flush our size update to the database
245        scopeguard::defer! {
246            let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
247            let _ = self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes);
248        }
249
250        // Work out how much we need to decimate
251        let total_rows = self.environment_store.post_bundle_count()?;
252        let decimation_count = (total_rows as f64) * (1f64 - 0.98f64 * max_quota_used / quota_used);
253
254        if 0.0 >= decimation_count {
255            warn!("Decimation count is unexpectedly zero, skipping decimation");
256            return Ok(());
257        }
258
259        // Find a neatish rounding of batches and rows per batch
260        let rows_per_batch = 100f64.min(decimation_count);
261        let num_batches = decimation_count / rows_per_batch;
262        let num_batches = num_batches.ceil() as usize;
263        let rows_per_batch = decimation_count as usize / num_batches;
264
265        info!("Decimation of {}/{} items will be done in {} batches of {} deletes each", decimation_count, total_rows, num_batches, rows_per_batch);
266        let post_bundle_current_size_bytes_before = self.post_bundle_current_size_bytes();
267        info!(
268            "Total size before decimation is {} ({}%)",
269            post_bundle_current_size_bytes_before,
270            100 * post_bundle_current_size_bytes_before / self.environment_dimensions.max_size_bytes
271        );
272
273        for batch in 0..num_batches {
274            if cancellation_token.is_cancelled() {
275                break;
276            }
277
278            // Who will we kill
279            let mut heap_of_locations_to_decimate = BinaryHeap::new();
280            {
281                let random_prefix = Id::random();
282                log::trace!("Decimation batch {} prefix: {}", batch, random_prefix);
283                self.environment_store.post_bundles_last_accessed_iter(&random_prefix).take(5 * rows_per_batch).for_each(|pair| {
284                    match pair {
285                        Ok((location_id, time_millis_bytes)) => {
286                            if heap_of_locations_to_decimate.len() >= rows_per_batch {
287                                let &(most_recent_time_millis_bytes, _) = heap_of_locations_to_decimate.peek().expect("should always work as we have checked the length");
288                                if most_recent_time_millis_bytes > time_millis_bytes {
289                                    heap_of_locations_to_decimate.pop();
290                                    heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
291                                }
292                            }
293                            else {
294                                heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
295                            }
296                        }
297                        Err(e) => {
298                            warn!("Error while decimating: {}", e);
299                        }
300                    }
301                });
302            }
303
304            // Kill the victims
305            {
306                let mut location_ids: Vec<Id> = Vec::new();
307                {
308                    let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
309
310                    for (_time_millis_bytes, location_id) in heap_of_locations_to_decimate {
311                        location_ids.push(location_id);
312                        post_bundles_last_touched.remove(&location_id);
313
314                        let metadata = self.environment_store.post_bundle_metadata_get(&location_id);
315                        if let Ok(metadata) = metadata {
316                            if let Some(metadata) = metadata {
317                                self.post_bundle_current_size_bytes.fetch_sub(metadata.size, Ordering::Relaxed);
318                            }
319                        }
320                    }
321                }
322
323                self.environment_store.post_bundles_delete(&location_ids)?;
324            }
325        }
326
327        let post_bundle_current_size_bytes_after = self.post_bundle_current_size_bytes();
328        info!(
329            "Total size after decimation is {} ({}%): delta={}",
330            post_bundle_current_size_bytes_after,
331            100 * post_bundle_current_size_bytes_after / self.environment_dimensions.max_size_bytes,
332            post_bundle_current_size_bytes_before - post_bundle_current_size_bytes_after
333        );
334
335        Ok(())
336    }
337}
338
339// ----------------------------------------------------------------------------------------------------------------------------------
340
341#[cfg(test)]
342pub mod tests {
343    use crate::environment::environment::{EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
344    use hashiverse_lib::anyhow_assert_ge;
345    use hashiverse_lib::tools::tools;
346    use hashiverse_lib::tools::tools::get_temp_dir;
347    use hashiverse_lib::tools::types::Id;
348    use std::sync::Arc;
349    use tokio_util::sync::CancellationToken;
350    use hashiverse_lib::tools::time_provider::time_provider::{ScaledTimeProvider, TimeProvider};
351
352    pub async fn basics_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
353        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
354        // configure_logging_with_time_provider("trace", time_provider.clone());
355
356        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
357        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(1024 * 1024);
358        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
359        let environment = environment_factory.open_next_available(environment_dimensions).await?;
360
361        let delete_post_bundle = |location_id: &Id| -> anyhow::Result<()> {
362            let vec = [*location_id].to_vec();
363            environment.environment_store.post_bundles_delete(&vec)
364        };
365
366        // Put some metadata
367        {
368            let num_bytes = 16 * 1024;
369            let time_millis = time_provider.current_time_millis();
370            let location_id = Id::random();
371            let post_bundle_metadata = PostBundleMetadata::zero();
372            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, num_bytes)?;
373
374            let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
375            assert_eq!(post_bundle_metadata_check, Some(post_bundle_metadata));
376
377            delete_post_bundle(&location_id)?;
378            let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
379            assert_eq!(post_bundle_metadata_check, None);
380        }
381
382        // Put some bytes
383        {
384            let num_bytes = 16 * 1024;
385            let time_millis = time_provider.current_time_millis();
386            let location_id = Id::random();
387            let bytes = tools::random_bytes(num_bytes);
388            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
389            assert_eq!(num_bytes, environment.post_bundle_current_size_bytes());
390
391            let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
392            assert_eq!(bytes_check, Some(Bytes::from(bytes)));
393
394            delete_post_bundle(&location_id)?;
395            let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
396            assert_eq!(bytes_check, None);
397        }
398
399        Ok(())
400    }
401
402    fn get_random_post_bundle(max_size: usize) -> (Id, PostBundleMetadata, Vec<u8>) {
403        let num_bytes = tools::random_usize_bounded(max_size);
404        let location_id = Id::random();
405        let bytes = tools::random_bytes(num_bytes);
406        let mut post_bundle_metadata = PostBundleMetadata::zero();
407        post_bundle_metadata.size = num_bytes;
408
409        (location_id, post_bundle_metadata, bytes)
410    }
411
412    pub async fn decimation_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
413        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
414        // configure_logging_with_time_provider("trace", time_provider.clone());
415        let cancellation_token = CancellationToken::new();
416
417        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
418        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
419        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
420        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
421
422        // Post to many locations
423        let num_iterations = 10000;
424        for i in 0..num_iterations {
425            let time_millis = time_provider.current_time_millis();
426            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(8 * 1024);
427
428            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
429            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
430            // environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
431
432            if 0 == i % 10 {
433                environment.do_maintenance(&cancellation_token, time_millis).await?;
434            }
435        }
436
437        {
438            let time_millis = time_provider.current_time_millis();
439            environment.do_maintenance(&cancellation_token, time_millis).await?;
440
441            anyhow_assert_ge!(environment_dimensions.max_size_bytes, environment.post_bundle_current_size_bytes());
442        }
443
444        Ok(())
445    }
446
447    pub async fn decimation_convergence_test<TEnvironmentFactory: EnvironmentFactory>(num_posts: usize) -> anyhow::Result<()> {
448        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
449        // configure_logging_with_time_provider("trace", time_provider.clone());
450        let cancellation_token = CancellationToken::new();
451
452        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
453        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
454        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
455        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
456
457        // Post to many locations
458        for i in 0..num_posts {
459            let time_millis = time_provider.current_time_millis();
460            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
461
462            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
463            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
464            //environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
465
466            if 0 == i % 100 {
467                environment.do_maintenance(&cancellation_token, time_millis).await?;
468            }
469        }
470
471        {
472            let time_millis = time_provider.current_time_millis();
473            environment.do_maintenance(&cancellation_token, time_millis).await?;
474            assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
475        }
476
477        Ok(())
478    }
479
480    pub async fn decimation_existence_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
481        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
482        // configure_logging_with_time_provider("trace", time_provider.clone());
483        let cancellation_token = CancellationToken::new();
484
485        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
486        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
487        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
488        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
489
490        let time_millis = time_provider.current_time_millis();
491
492        let location_id_should_exist_metadata = {
493            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
494            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
495            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
496            location_id
497        };
498
499        let location_id_should_exist_bytes = {
500            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
501            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
502            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
503            location_id
504        };
505
506        let location_id_should_not_exist = {
507            let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
508            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
509            environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
510            location_id
511        };
512
513        // Post to many locations
514        let num_iterations = 128 * 1000;
515        for i in 0..num_iterations {
516            let time_millis = time_provider.current_time_millis();
517            let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
518
519            environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
520            // We dont actually have to write the bundles to disk for this test - we just want to confirm the eviction is taking place...
521            //environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
522
523            // Lets keep some posts active
524            if 0 == i % 30 {
525                let _ = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
526                let _ = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes);
527            }
528
529            if 0 == i % 100 {
530                environment.do_maintenance(&cancellation_token, time_millis).await?;
531            }
532        }
533
534        {
535            let time_millis = time_provider.current_time_millis();
536            environment.do_maintenance(&cancellation_token, time_millis).await?;
537            assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
538        }
539
540        // Check the things that should still exist
541        {
542            // Existence by metadata queries
543            {
544                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
545                assert!(result.is_some());
546            }
547            {
548                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_metadata)?;
549                assert!(result.is_some());
550            }
551        }
552        {
553            // Existence by bytes queries
554            {
555                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_bytes)?;
556                assert!(result.is_some());
557            }
558            {
559                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes)?;
560                assert!(result.is_some());
561            }
562        }
563
564        // Check the thing that should have died
565        {
566            {
567                let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_not_exist)?;
568                assert!(result.is_none());
569            }
570            {
571                let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_not_exist)?;
572                assert!(result.is_none());
573            }
574        }
575
576        Ok(())
577    }
578
579    pub async fn decimation_feedback_deleted_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
580        let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
581        let cancellation_token = CancellationToken::new();
582
583        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
584        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
585        let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
586        let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
587        let store = &environment.environment_store;
588
589        let time_millis = time_provider.current_time_millis();
590
591        // Create a bundle and attach feedback to it
592        let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
593        environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
594        environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
595
596        let feedback = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
597        store.post_feedback_put_if_more_powerful(&location_id, &feedback)?;
598        assert!(!store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
599
600        // Flood with bundles (never re-access ours) to trigger decimation
601        for i in 0..128 * 1000usize {
602            let time_millis = time_provider.current_time_millis();
603            let (flood_id, flood_meta, _) = get_random_post_bundle(1024);
604            environment.put_post_bundle_metadata(time_millis, &flood_id, &flood_meta, flood_meta.size)?;
605            if i % 100 == 0 {
606                environment.do_maintenance(&cancellation_token, time_millis).await?;
607            }
608        }
609        environment.do_maintenance(&cancellation_token, time_provider.current_time_millis()).await?;
610
611        // The bundle must have been evicted
612        assert!(environment.get_post_bundle_metadata(time_millis, &location_id)?.is_none(), "bundle should have been decimated");
613
614        // Its feedback must be gone too
615        assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty(), "feedback should have been deleted with the bundle");
616
617        Ok(())
618    }
619
620    // ── feedback tests ────────────────────────────────────────────────────────
621
622    use bytes::{Buf, Bytes};
623    use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
624    use hashiverse_lib::tools::types::{Pow, Salt};
625
626    fn decode_all_feedbacks(mut bytes: Bytes) -> anyhow::Result<Vec<EncodedPostFeedbackV1>> {
627        let mut result = Vec::new();
628        while bytes.has_remaining() {
629            result.push(EncodedPostFeedbackV1::decode_from_bytes(&mut bytes)?);
630        }
631        Ok(result)
632    }
633
634    pub async fn feedback_bytes_get_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
635        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
636        let factory = TEnvironmentFactory::new(&temp_dir_path);
637        let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
638        let store = &env.environment_store;
639
640        let location_id = Id::random();
641        let other_location_id = Id::random();
642
643        // Empty: unknown location returns empty bytes
644        assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
645
646        let f1 = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
647        let f2 = EncodedPostFeedbackV1::new(Id::random(), 2, Salt::random(), Pow(20));
648        let f_other = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(5));
649
650        store.post_feedback_put_if_more_powerful(&location_id, &f1)?;
651        store.post_feedback_put_if_more_powerful(&location_id, &f2)?;
652        store.post_feedback_put_if_more_powerful(&other_location_id, &f_other)?;
653
654        // Correct entries returned for location_id
655        let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
656        let mut decoded = decode_all_feedbacks(bytes)?;
657        decoded.sort_by_key(|f| f.feedback_type);
658        assert_eq!(decoded.len(), 2);
659        assert_eq!(decoded[0], f1);
660        assert_eq!(decoded[1], f2);
661
662        // Other location unaffected
663        let other_bytes = store.post_bundle_feedbacks_bytes_get(&other_location_id)?;
664        let other_decoded = decode_all_feedbacks(other_bytes)?;
665        assert_eq!(other_decoded.len(), 1);
666        assert_eq!(other_decoded[0], f_other);
667
668        Ok(())
669    }
670
671    pub async fn feedback_put_if_more_powerful_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
672        let (_temp_dir, temp_dir_path) = get_temp_dir()?;
673        let factory = TEnvironmentFactory::new(&temp_dir_path);
674        let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
675        let store = &env.environment_store;
676
677        let location_id = Id::random();
678        let post_id = Id::random();
679        let feedback_type = 3u8;
680
681        let get_single = |pow_expected: Pow| -> anyhow::Result<()> {
682            let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
683            let decoded = decode_all_feedbacks(bytes)?;
684            assert_eq!(decoded.len(), 1);
685            assert_eq!(decoded[0].post_id, post_id);
686            assert_eq!(decoded[0].feedback_type, feedback_type);
687            assert_eq!(decoded[0].pow, pow_expected);
688            Ok(())
689        };
690
691        // First put: stored
692        let f_low = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(10));
693        store.post_feedback_put_if_more_powerful(&location_id, &f_low)?;
694        get_single(Pow(10))?;
695
696        // Higher pow: replaces
697        let f_high = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
698        store.post_feedback_put_if_more_powerful(&location_id, &f_high)?;
699        get_single(Pow(20))?;
700
701        // Equal pow: existing kept (salt unchanged)
702        let salt_before = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
703        let f_equal = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
704        store.post_feedback_put_if_more_powerful(&location_id, &f_equal)?;
705        let salt_after = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
706        assert_eq!(salt_before, salt_after);
707
708        // Lower pow: existing kept
709        let f_weaker = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(5));
710        store.post_feedback_put_if_more_powerful(&location_id, &f_weaker)?;
711        get_single(Pow(20))?;
712
713        // Different feedback_type for same post: stored as a separate entry
714        let f_other_type = EncodedPostFeedbackV1::new(post_id, feedback_type + 1, Salt::random(), Pow(1));
715        store.post_feedback_put_if_more_powerful(&location_id, &f_other_type)?;
716        let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
717        assert_eq!(all.len(), 2);
718
719        // Different post_id for same feedback_type: stored as a separate entry
720        let f_other_post = EncodedPostFeedbackV1::new(Id::random(), feedback_type, Salt::random(), Pow(1));
721        store.post_feedback_put_if_more_powerful(&location_id, &f_other_post)?;
722        let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
723        assert_eq!(all.len(), 3);
724
725        // Different location_id: does not affect original location
726        let f_other_loc = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(99));
727        store.post_feedback_put_if_more_powerful(&Id::random(), &f_other_loc)?;
728        let all_original = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
729        assert_eq!(all_original.len(), 3);
730
731        Ok(())
732    }
733}