1use crate::environment::environment_store::EnvironmentStore;
21use async_trait::async_trait;
22use bytes::Bytes;
23use hashiverse_lib::tools::time::TimeMillis;
24use hashiverse_lib::tools::types::Id;
25use log::{info, warn};
26use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use serde::{Deserialize, Serialize};
28use std::collections::{BinaryHeap, HashMap};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use serde::de::DeserializeOwned;
32use tokio_util::sync::CancellationToken;
33use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use hashiverse_lib::tools::{compression, json};
35
// Keys used with the environment's config store.

/// Config key named for the server id.
/// NOTE(review): not referenced in the code visible here — confirm where it is read/written.
pub const CONFIG_SERVER_ID: &str = "server_id";
/// Config key named for the Kademlia peer buckets.
/// NOTE(review): not referenced in the code visible here — confirm where it is read/written.
pub const CONFIG_KADEMLIA_PEER_BUCKETS: &str = "kademlia_peer_buckets";
/// Config key under which the running total of post-bundle bytes is persisted.
/// It is read by `Environment::new` and written by `do_maintenance` / `do_decimation`.
pub const CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES: &str = "post_bundle_current_size_bytes";
42
/// Sizing limits applied to an `Environment`.
///
/// * `max_size_bytes` – the denominator of the quota computed during maintenance.
/// * `max_quota_used` – the fraction of `max_size_bytes` above which maintenance
///   triggers a decimation.
///
/// `Debug` and `PartialEq` are derived so values can be logged and compared in tests.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct EnvironmentDimensions {
    pub max_size_bytes: usize,
    pub max_quota_used: f64,
}

impl EnvironmentDimensions {
    /// Returns a copy of `self` with `max_size_bytes` replaced; all other fields are kept.
    pub fn with_max_size_bytes(&self, max_size_bytes: usize) -> Self {
        Self { max_size_bytes, ..*self }
    }
}

impl Default for EnvironmentDimensions {
    /// Defaults to a 20 GiB size limit with a 90% quota threshold.
    fn default() -> Self {
        Self {
            max_size_bytes: 20 * 1024 * 1024 * 1024,
            max_quota_used: 0.9,
        }
    }
}
63
/// Factory that produces [`Environment`] instances.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait EnvironmentFactory: Send + Sync {
    /// Creates a factory from `base_path`.
    /// NOTE(review): presumably the directory under which environments are stored
    /// (the tests pass a temp-dir path) — confirm against implementors.
    fn new(base_path: &str) -> Self
    where
        Self: Sized;
    /// Opens an environment configured with `environment_dimensions`.
    /// NOTE(review): what "next available" means is decided by the implementor and is not visible here.
    async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment>;
}
71
/// Serializable metadata record stored per post bundle.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct PostBundleMetadata {
    /// Size attributed to the bundle; this is the amount subtracted from the
    /// environment's running byte total when the bundle is decimated.
    pub size: usize,
    // NOTE(review): the exact semantics of the fields below are not established by
    // the code visible here — confirm against the code that populates them.
    pub num_posts: u8,
    pub num_posts_granted: u8,
    pub overflowed: bool,
    pub sealed: bool,
}
80
81impl PostBundleMetadata {
82 pub fn zero() -> PostBundleMetadata {
83 Self {
84 size: 0,
85 num_posts: 0,
86 num_posts_granted: 0,
87 overflowed: false,
88 sealed: false,
89 }
90 }
91}
92
/// A storage environment: wraps an [`EnvironmentStore`] and tracks the bookkeeping
/// needed to keep the stored post bundles within the configured dimensions.
pub struct Environment {
    /// Size limit and quota threshold used by maintenance.
    environment_dimensions: EnvironmentDimensions,
    /// Backing store to which all reads and writes are delegated.
    environment_store: Arc<dyn EnvironmentStore>,

    /// 256 locks, selected by the first byte of a location id.
    post_bundle_locks: [RwLock<()>; 256],
    /// Location ids touched since the last maintenance flush, with the time of the latest touch.
    post_bundles_last_touched_batch: RwLock<HashMap<Id, TimeMillis>>,
    /// Running total of post-bundle bytes; loaded from and persisted to the config store.
    post_bundle_current_size_bytes: AtomicUsize,
    /// Acquired (without blocking) by decimation to detect a run that is already in progress.
    decimation_lock: RwLock<()>,
}
102
103impl Environment {
104 pub async fn new(environment_store: Arc<dyn EnvironmentStore>, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
105 let post_bundle_locks: [RwLock<()>; 256] = std::array::from_fn(|_| RwLock::new(()));
106 let post_bundles_last_touched = RwLock::new(HashMap::new());
107 let post_bundle_current_size_bytes = environment_store.config_get_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES)?.unwrap_or(0);
108 let post_bundle_current_size_bytes = AtomicUsize::new(post_bundle_current_size_bytes);
109
110 Ok(Environment {
111 environment_dimensions,
112 environment_store,
113 post_bundle_locks,
114 post_bundles_last_touched_batch: post_bundles_last_touched,
115 post_bundle_current_size_bytes,
116 decimation_lock: RwLock::new(()),
117 })
118 }
119
120 pub fn get_read_lock_for_location_id(&self, location_id: &Id) -> RwLockReadGuard<'_, ()> {
121 self.post_bundle_locks[location_id.0[0] as usize].read()
122 }
123
124 pub fn get_write_lock_for_location_id(&self, location_id: &Id) -> RwLockWriteGuard<'_, ()> {
125 self.post_bundle_locks[location_id.0[0] as usize].write()
126 }
127
128 pub fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
129 self.environment_store.config_get_bytes(key)
130 }
131
132 pub fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
133 self.environment_store.config_put_bytes(key, v)
134 }
135
136 pub fn config_put_struct<T: Serialize>(&self, key: &str, value: &T) -> anyhow::Result<()> {
137 let bytes = json::struct_to_bytes(value)?;
138 let bytes_compressed = compression::compress_for_speed(&bytes)?.to_bytes();
139 self.config_put_bytes(key, bytes_compressed.to_vec())
140 }
141
142 pub fn config_get_struct<T: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<T>> {
143 let result = self.config_get_bytes(key)?;
144 match result {
145 Some(bytes_compressed) => {
146 let bytes = compression::decompress(&bytes_compressed)?.to_bytes();
147 let value = json::bytes_to_struct::<T>(&bytes)?;
148 Ok(Some(value))
149 }
150 None => Ok(None),
151 }
152 }
153
154
155 pub fn get_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
156 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
157 self.environment_store.post_bundle_metadata_get(location_id)
158 }
159
160 pub fn get_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
161 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
162 self.environment_store.post_bundle_bytes_get(location_id)
163 }
164
165 pub fn get_post_bundle_encoded_post_feedbacks_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Bytes> {
166 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
167 self.environment_store.post_bundle_feedbacks_bytes_get(location_id)
168 }
169
170 pub fn put_post_feedback_if_more_powerful(&self, time_millis: TimeMillis, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
171 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
172 self.environment_store.post_feedback_put_if_more_powerful(location_id, encoded_post_feedback)
173 }
174
175 pub fn put_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
176 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
177 self.environment_store.post_bundle_bytes_put(location_id, bytes)?;
178 Ok(())
179 }
180
181 pub fn put_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id, post_bundle_metadata: &PostBundleMetadata, additional_bytes: usize) -> anyhow::Result<()> {
182 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
183 self.environment_store.post_bundle_metadata_put(location_id, post_bundle_metadata)?;
184 self.post_bundle_current_size_bytes.fetch_add(additional_bytes, Ordering::Relaxed);
185 Ok(())
186 }
187
188 fn post_bundle_current_size_bytes(&self) -> usize {
189 self.post_bundle_current_size_bytes.load(Ordering::Relaxed)
190 }
191
192 pub fn post_bundle_total_bytes(&self) -> usize {
195 self.post_bundle_current_size_bytes()
196 }
197
198 pub fn post_bundle_count(&self) -> anyhow::Result<usize> {
200 self.environment_store.post_bundle_count()
201 }
202
203 pub fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
206 self.environment_store.post_bundle_feedback_count()
207 }
208
209 pub async fn do_maintenance(self: &Arc<Self>, cancellation_token: &CancellationToken, time_millis: TimeMillis) -> anyhow::Result<()> {
210 let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
214 self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes)?;
215
216 {
218 let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
219 if !post_bundles_last_touched.is_empty() {
220 log::trace!("Flushing {} last accessed post bundles", post_bundles_last_touched.len());
221 self.environment_store.post_bundles_last_accessed_flush(&post_bundles_last_touched)?;
222 post_bundles_last_touched.clear();
223 }
224 }
225
226 let quota_used = post_bundle_current_size_bytes as f64 / self.environment_dimensions.max_size_bytes as f64;
228 if quota_used > self.environment_dimensions.max_quota_used {
229 let env = self.clone();
230 let cancellation_token = cancellation_token.clone();
231 let max_quota_used = self.environment_dimensions.max_quota_used;
232 tokio::task::spawn_blocking(move || env.do_decimation(cancellation_token, time_millis, quota_used, max_quota_used)).await??;
233 }
234
235 Ok(())
236 }
237
238 fn do_decimation(self: &Arc<Self>, cancellation_token: CancellationToken, _time_millis: TimeMillis, quota_used: f64, max_quota_used: f64) -> anyhow::Result<()> {
239 let lock = self.decimation_lock.try_read();
240 if lock.is_none() {
241 log::trace!("A decimation is already in progress.");
242 return Ok(());
243 }
244
245 scopeguard::defer! {
247 let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
248 let _ = self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes);
249 }
250
251 let total_rows = self.environment_store.post_bundle_count()?;
253 let decimation_count = (total_rows as f64) * (1f64 - 0.98f64 * max_quota_used / quota_used);
254
255 if 0.0 >= decimation_count {
256 warn!("Decimation count is unexpectedly zero, skipping decimation");
257 return Ok(());
258 }
259
260 let rows_per_batch = 100f64.min(decimation_count);
262 let num_batches = decimation_count / rows_per_batch;
263 let num_batches = num_batches.ceil() as usize;
264 let rows_per_batch = decimation_count as usize / num_batches;
265
266 info!("Decimation of {}/{} items will be done in {} batches of {} deletes each", decimation_count, total_rows, num_batches, rows_per_batch);
267 let post_bundle_current_size_bytes_before = self.post_bundle_current_size_bytes();
268 info!(
269 "Total size before decimation is {} ({}%)",
270 post_bundle_current_size_bytes_before,
271 100 * post_bundle_current_size_bytes_before / self.environment_dimensions.max_size_bytes
272 );
273
274 for batch in 0..num_batches {
275 if cancellation_token.is_cancelled() {
276 break;
277 }
278
279 let mut heap_of_locations_to_decimate = BinaryHeap::new();
281 {
282 let random_prefix = Id::random();
283 log::trace!("Decimation batch {} prefix: {}", batch, random_prefix);
284 self.environment_store.post_bundles_last_accessed_iter(&random_prefix).take(5 * rows_per_batch).for_each(|pair| {
285 match pair {
286 Ok((location_id, time_millis_bytes)) => {
287 if heap_of_locations_to_decimate.len() >= rows_per_batch {
288 let &(most_recent_time_millis_bytes, _) = heap_of_locations_to_decimate.peek().expect("should always work as we have checked the length");
289 if most_recent_time_millis_bytes > time_millis_bytes {
290 heap_of_locations_to_decimate.pop();
291 heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
292 }
293 }
294 else {
295 heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
296 }
297 }
298 Err(e) => {
299 warn!("Error while decimating: {}", e);
300 }
301 }
302 });
303 }
304
305 {
307 let mut location_ids: Vec<Id> = Vec::new();
308 {
309 let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
310
311 for (_time_millis_bytes, location_id) in heap_of_locations_to_decimate {
312 location_ids.push(location_id);
313 post_bundles_last_touched.remove(&location_id);
314
315 let metadata = self.environment_store.post_bundle_metadata_get(&location_id);
316 if let Ok(metadata) = metadata {
317 if let Some(metadata) = metadata {
318 self.post_bundle_current_size_bytes.fetch_sub(metadata.size, Ordering::Relaxed);
319 }
320 }
321 }
322 }
323
324 self.environment_store.post_bundles_delete(&location_ids)?;
325 }
326 }
327
328 let post_bundle_current_size_bytes_after = self.post_bundle_current_size_bytes();
329 info!(
330 "Total size after decimation is {} ({}%): delta={}",
331 post_bundle_current_size_bytes_after,
332 100 * post_bundle_current_size_bytes_after / self.environment_dimensions.max_size_bytes,
333 post_bundle_current_size_bytes_before - post_bundle_current_size_bytes_after
334 );
335
336 Ok(())
337 }
338}
339
340#[cfg(test)]
343pub mod tests {
344 use crate::environment::environment::{EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
345 use hashiverse_lib::anyhow_assert_ge;
346 use hashiverse_lib::tools::tools;
347 use hashiverse_lib::tools::tools::get_temp_dir;
348 use hashiverse_lib::tools::types::Id;
349 use std::sync::Arc;
350 use tokio_util::sync::CancellationToken;
351 use hashiverse_lib::tools::time_provider::time_provider::{ScaledTimeProvider, TimeProvider};
352
353 pub async fn basics_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
354 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
355 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
358 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(1024 * 1024);
359 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
360 let environment = environment_factory.open_next_available(environment_dimensions).await?;
361
362 let delete_post_bundle = |location_id: &Id| -> anyhow::Result<()> {
363 let vec = vec![*location_id].to_vec();
364 environment.environment_store.post_bundles_delete(&vec)
365 };
366
367 {
369 let num_bytes = 16 * 1024;
370 let time_millis = time_provider.current_time_millis();
371 let location_id = Id::random();
372 let post_bundle_metadata = PostBundleMetadata::zero();
373 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, num_bytes)?;
374
375 let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
376 assert_eq!(post_bundle_metadata_check, Some(post_bundle_metadata));
377
378 delete_post_bundle(&location_id)?;
379 let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
380 assert_eq!(post_bundle_metadata_check, None);
381 }
382
383 {
385 let num_bytes = 16 * 1024;
386 let time_millis = time_provider.current_time_millis();
387 let location_id = Id::random();
388 let bytes = tools::random_bytes(num_bytes);
389 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
390 assert_eq!(num_bytes, environment.post_bundle_current_size_bytes());
391
392 let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
393 assert_eq!(bytes_check, Some(Bytes::from(bytes)));
394
395 delete_post_bundle(&location_id)?;
396 let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
397 assert_eq!(bytes_check, None);
398 }
399
400 Ok(())
401 }
402
403 fn get_random_post_bundle(max_size: usize) -> (Id, PostBundleMetadata, Vec<u8>) {
404 let num_bytes = tools::random_usize_bounded(max_size);
405 let location_id = Id::random();
406 let bytes = tools::random_bytes(num_bytes);
407 let mut post_bundle_metadata = PostBundleMetadata::zero();
408 post_bundle_metadata.size = num_bytes;
409
410 (location_id, post_bundle_metadata, bytes)
411 }
412
413 pub async fn decimation_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
414 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
415 let cancellation_token = CancellationToken::new();
417
418 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
419 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
420 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
421 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
422
423 let num_iterations = 10000;
425 for i in 0..num_iterations {
426 let time_millis = time_provider.current_time_millis();
427 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(8 * 1024);
428
429 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
430 if 0 == i % 10 {
434 environment.do_maintenance(&cancellation_token, time_millis).await?;
435 }
436 }
437
438 {
439 let time_millis = time_provider.current_time_millis();
440 environment.do_maintenance(&cancellation_token, time_millis).await?;
441
442 anyhow_assert_ge!(environment_dimensions.max_size_bytes, environment.post_bundle_current_size_bytes());
443 }
444
445 Ok(())
446 }
447
448 pub async fn decimation_convergence_test<TEnvironmentFactory: EnvironmentFactory>(num_posts: usize) -> anyhow::Result<()> {
449 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
450 let cancellation_token = CancellationToken::new();
452
453 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
454 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
455 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
456 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
457
458 for i in 0..num_posts {
460 let time_millis = time_provider.current_time_millis();
461 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
462
463 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
464 if 0 == i % 100 {
468 environment.do_maintenance(&cancellation_token, time_millis).await?;
469 }
470 }
471
472 {
473 let time_millis = time_provider.current_time_millis();
474 environment.do_maintenance(&cancellation_token, time_millis).await?;
475 assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
476 }
477
478 Ok(())
479 }
480
481 pub async fn decimation_existence_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
482 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
483 let cancellation_token = CancellationToken::new();
485
486 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
487 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
488 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
489 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
490
491 let time_millis = time_provider.current_time_millis();
492
493 let location_id_should_exist_metadata = {
494 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
495 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
496 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
497 location_id
498 };
499
500 let location_id_should_exist_bytes = {
501 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
502 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
503 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
504 location_id
505 };
506
507 let location_id_should_not_exist = {
508 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
509 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
510 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
511 location_id
512 };
513
514 let num_iterations = 128 * 1000;
516 for i in 0..num_iterations {
517 let time_millis = time_provider.current_time_millis();
518 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
519
520 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
521 if 0 == i % 30 {
526 let _ = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
527 let _ = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes);
528 }
529
530 if 0 == i % 100 {
531 environment.do_maintenance(&cancellation_token, time_millis).await?;
532 }
533 }
534
535 {
536 let time_millis = time_provider.current_time_millis();
537 environment.do_maintenance(&cancellation_token, time_millis).await?;
538 assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
539 }
540
541 {
543 {
545 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
546 assert!(result.is_some());
547 }
548 {
549 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_metadata)?;
550 assert!(result.is_some());
551 }
552 }
553 {
554 {
556 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_bytes)?;
557 assert!(result.is_some());
558 }
559 {
560 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes)?;
561 assert!(result.is_some());
562 }
563 }
564
565 {
567 {
568 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_not_exist)?;
569 assert!(result.is_none());
570 }
571 {
572 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_not_exist)?;
573 assert!(result.is_none());
574 }
575 }
576
577 Ok(())
578 }
579
580 pub async fn decimation_feedback_deleted_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
581 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
582 let cancellation_token = CancellationToken::new();
583
584 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
585 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
586 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
587 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
588 let store = &environment.environment_store;
589
590 let time_millis = time_provider.current_time_millis();
591
592 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
594 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
595 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
596
597 let feedback = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
598 store.post_feedback_put_if_more_powerful(&location_id, &feedback)?;
599 assert!(!store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
600
601 for i in 0..128 * 1000usize {
603 let time_millis = time_provider.current_time_millis();
604 let (flood_id, flood_meta, _) = get_random_post_bundle(1024);
605 environment.put_post_bundle_metadata(time_millis, &flood_id, &flood_meta, flood_meta.size)?;
606 if i % 100 == 0 {
607 environment.do_maintenance(&cancellation_token, time_millis).await?;
608 }
609 }
610 environment.do_maintenance(&cancellation_token, time_provider.current_time_millis()).await?;
611
612 assert!(environment.get_post_bundle_metadata(time_millis, &location_id)?.is_none(), "bundle should have been decimated");
614
615 assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty(), "feedback should have been deleted with the bundle");
617
618 Ok(())
619 }
620
621 use bytes::{Buf, Bytes};
624 use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
625 use hashiverse_lib::tools::types::{Pow, Salt};
626
627 fn decode_all_feedbacks(mut bytes: Bytes) -> anyhow::Result<Vec<EncodedPostFeedbackV1>> {
628 let mut result = Vec::new();
629 while bytes.has_remaining() {
630 result.push(EncodedPostFeedbackV1::decode_from_bytes(&mut bytes)?);
631 }
632 Ok(result)
633 }
634
635 pub async fn feedback_bytes_get_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
636 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
637 let factory = TEnvironmentFactory::new(&temp_dir_path);
638 let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
639 let store = &env.environment_store;
640
641 let location_id = Id::random();
642 let other_location_id = Id::random();
643
644 assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
646
647 let f1 = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
648 let f2 = EncodedPostFeedbackV1::new(Id::random(), 2, Salt::random(), Pow(20));
649 let f_other = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(5));
650
651 store.post_feedback_put_if_more_powerful(&location_id, &f1)?;
652 store.post_feedback_put_if_more_powerful(&location_id, &f2)?;
653 store.post_feedback_put_if_more_powerful(&other_location_id, &f_other)?;
654
655 let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
657 let mut decoded = decode_all_feedbacks(bytes)?;
658 decoded.sort_by_key(|f| f.feedback_type);
659 assert_eq!(decoded.len(), 2);
660 assert_eq!(decoded[0], f1);
661 assert_eq!(decoded[1], f2);
662
663 let other_bytes = store.post_bundle_feedbacks_bytes_get(&other_location_id)?;
665 let other_decoded = decode_all_feedbacks(other_bytes)?;
666 assert_eq!(other_decoded.len(), 1);
667 assert_eq!(other_decoded[0], f_other);
668
669 Ok(())
670 }
671
672 pub async fn feedback_put_if_more_powerful_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
673 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
674 let factory = TEnvironmentFactory::new(&temp_dir_path);
675 let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
676 let store = &env.environment_store;
677
678 let location_id = Id::random();
679 let post_id = Id::random();
680 let feedback_type = 3u8;
681
682 let get_single = |pow_expected: Pow| -> anyhow::Result<()> {
683 let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
684 let decoded = decode_all_feedbacks(bytes)?;
685 assert_eq!(decoded.len(), 1);
686 assert_eq!(decoded[0].post_id, post_id);
687 assert_eq!(decoded[0].feedback_type, feedback_type);
688 assert_eq!(decoded[0].pow, pow_expected);
689 Ok(())
690 };
691
692 let f_low = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(10));
694 store.post_feedback_put_if_more_powerful(&location_id, &f_low)?;
695 get_single(Pow(10))?;
696
697 let f_high = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
699 store.post_feedback_put_if_more_powerful(&location_id, &f_high)?;
700 get_single(Pow(20))?;
701
702 let salt_before = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
704 let f_equal = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
705 store.post_feedback_put_if_more_powerful(&location_id, &f_equal)?;
706 let salt_after = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
707 assert_eq!(salt_before, salt_after);
708
709 let f_weaker = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(5));
711 store.post_feedback_put_if_more_powerful(&location_id, &f_weaker)?;
712 get_single(Pow(20))?;
713
714 let f_other_type = EncodedPostFeedbackV1::new(post_id, feedback_type + 1, Salt::random(), Pow(1));
716 store.post_feedback_put_if_more_powerful(&location_id, &f_other_type)?;
717 let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
718 assert_eq!(all.len(), 2);
719
720 let f_other_post = EncodedPostFeedbackV1::new(Id::random(), feedback_type, Salt::random(), Pow(1));
722 store.post_feedback_put_if_more_powerful(&location_id, &f_other_post)?;
723 let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
724 assert_eq!(all.len(), 3);
725
726 let f_other_loc = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(99));
728 store.post_feedback_put_if_more_powerful(&Id::random(), &f_other_loc)?;
729 let all_original = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
730 assert_eq!(all_original.len(), 3);
731
732 Ok(())
733 }
734}