1use crate::environment::environment_store::EnvironmentStore;
21use async_trait::async_trait;
22use bytes::Bytes;
23use hashiverse_lib::tools::time::TimeMillis;
24use hashiverse_lib::tools::types::Id;
25use log::{info, warn};
26use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use serde::{Deserialize, Serialize};
28use std::collections::{BinaryHeap, HashMap};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use serde::de::DeserializeOwned;
32use tokio_util::sync::CancellationToken;
33use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use hashiverse_lib::tools::{compression, json};
35
36pub const CONFIG_SERVER_ID: &str = "server_id";
39pub const CONFIG_KADEMLIA_PEER_BUCKETS: &str = "kademlia_peer_buckets";
40pub const CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES: &str = "post_bundle_current_size_bytes";
41
42#[derive(Clone, Copy)]
43pub struct EnvironmentDimensions {
44 pub max_size_bytes: usize,
45 pub max_quota_used: f64,
46}
47
48impl EnvironmentDimensions {
49 pub fn with_max_size_bytes(&self, max_size_bytes: usize) -> Self {
50 Self { max_size_bytes, ..*self }
51 }
52}
53
54impl Default for EnvironmentDimensions {
55 fn default() -> Self {
56 Self {
57 max_size_bytes: 20 * 1024 * 1024 * 1024,
58 max_quota_used: 0.9,
59 }
60 }
61}
62
63#[async_trait]
64pub trait EnvironmentFactory: Send + Sync {
65 fn new(base_path: &str) -> Self
66 where
67 Self: Sized;
68 async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment>;
69}
70
71#[derive(Serialize, Deserialize, Debug, PartialEq)]
72pub struct PostBundleMetadata {
73 pub size: usize,
74 pub num_posts: u8,
75 pub num_posts_granted: u8,
76 pub overflowed: bool,
77 pub sealed: bool,
78}
79
80impl PostBundleMetadata {
81 pub fn zero() -> PostBundleMetadata {
82 Self {
83 size: 0,
84 num_posts: 0,
85 num_posts_granted: 0,
86 overflowed: false,
87 sealed: false,
88 }
89 }
90}
91
92pub struct Environment {
93 environment_dimensions: EnvironmentDimensions,
94 environment_store: Arc<dyn EnvironmentStore>,
95
96 post_bundle_locks: [RwLock<()>; 256], post_bundles_last_touched_batch: RwLock<HashMap<Id, TimeMillis>>,
98 post_bundle_current_size_bytes: AtomicUsize,
99 decimation_lock: RwLock<()>, }
101
102impl Environment {
103 pub async fn new(environment_store: Arc<dyn EnvironmentStore>, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
104 let post_bundle_locks: [RwLock<()>; 256] = std::array::from_fn(|_| RwLock::new(()));
105 let post_bundles_last_touched = RwLock::new(HashMap::new());
106 let post_bundle_current_size_bytes = environment_store.config_get_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES)?.unwrap_or(0);
107 let post_bundle_current_size_bytes = AtomicUsize::new(post_bundle_current_size_bytes);
108
109 Ok(Environment {
110 environment_dimensions,
111 environment_store,
112 post_bundle_locks,
113 post_bundles_last_touched_batch: post_bundles_last_touched,
114 post_bundle_current_size_bytes,
115 decimation_lock: RwLock::new(()),
116 })
117 }
118
119 pub fn get_read_lock_for_location_id(&self, location_id: &Id) -> RwLockReadGuard<'_, ()> {
120 self.post_bundle_locks[location_id.0[0] as usize].read()
121 }
122
123 pub fn get_write_lock_for_location_id(&self, location_id: &Id) -> RwLockWriteGuard<'_, ()> {
124 self.post_bundle_locks[location_id.0[0] as usize].write()
125 }
126
127 pub fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
128 self.environment_store.config_get_bytes(key)
129 }
130
131 pub fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
132 self.environment_store.config_put_bytes(key, v)
133 }
134
135 pub fn config_put_struct<T: Serialize>(&self, key: &str, value: &T) -> anyhow::Result<()> {
136 let bytes = json::struct_to_bytes(value)?;
137 let bytes_compressed = compression::compress_for_speed(&bytes)?.to_bytes();
138 self.config_put_bytes(key, bytes_compressed.to_vec())
139 }
140
141 pub fn config_get_struct<T: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<T>> {
142 let result = self.config_get_bytes(key)?;
143 match result {
144 Some(bytes_compressed) => {
145 let bytes = compression::decompress(&bytes_compressed)?.to_bytes();
146 let value = json::bytes_to_struct::<T>(&bytes)?;
147 Ok(Some(value))
148 }
149 None => Ok(None),
150 }
151 }
152
153
154 pub fn get_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
155 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
156 self.environment_store.post_bundle_metadata_get(location_id)
157 }
158
159 pub fn get_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
160 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
161 self.environment_store.post_bundle_bytes_get(location_id)
162 }
163
164 pub fn get_post_bundle_encoded_post_feedbacks_bytes(&self, time_millis: TimeMillis, location_id: &Id) -> anyhow::Result<Bytes> {
165 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
166 self.environment_store.post_bundle_feedbacks_bytes_get(location_id)
167 }
168
169 pub fn put_post_feedback_if_more_powerful(&self, time_millis: TimeMillis, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
170 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
171 self.environment_store.post_feedback_put_if_more_powerful(location_id, encoded_post_feedback)
172 }
173
174 pub fn put_post_bundle_bytes(&self, time_millis: TimeMillis, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
175 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
176 self.environment_store.post_bundle_bytes_put(location_id, bytes)?;
177 Ok(())
178 }
179
180 pub fn put_post_bundle_metadata(&self, time_millis: TimeMillis, location_id: &Id, post_bundle_metadata: &PostBundleMetadata, additional_bytes: usize) -> anyhow::Result<()> {
181 self.post_bundles_last_touched_batch.write().insert(*location_id, time_millis);
182 self.environment_store.post_bundle_metadata_put(location_id, post_bundle_metadata)?;
183 self.post_bundle_current_size_bytes.fetch_add(additional_bytes, Ordering::Relaxed);
184 Ok(())
185 }
186
187 fn post_bundle_current_size_bytes(&self) -> usize {
188 self.post_bundle_current_size_bytes.load(Ordering::Relaxed)
189 }
190
191 pub fn post_bundle_total_bytes(&self) -> usize {
194 self.post_bundle_current_size_bytes()
195 }
196
197 pub fn post_bundle_count(&self) -> anyhow::Result<usize> {
199 self.environment_store.post_bundle_count()
200 }
201
202 pub fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
205 self.environment_store.post_bundle_feedback_count()
206 }
207
208 pub async fn do_maintenance(self: &Arc<Self>, cancellation_token: &CancellationToken, time_millis: TimeMillis) -> anyhow::Result<()> {
209 let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
213 self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes)?;
214
215 {
217 let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
218 if !post_bundles_last_touched.is_empty() {
219 log::trace!("Flushing {} last accessed post bundles", post_bundles_last_touched.len());
220 self.environment_store.post_bundles_last_accessed_flush(&post_bundles_last_touched)?;
221 post_bundles_last_touched.clear();
222 }
223 }
224
225 let quota_used = post_bundle_current_size_bytes as f64 / self.environment_dimensions.max_size_bytes as f64;
227 if quota_used > self.environment_dimensions.max_quota_used {
228 let env = self.clone();
229 let cancellation_token = cancellation_token.clone();
230 let max_quota_used = self.environment_dimensions.max_quota_used;
231 tokio::task::spawn_blocking(move || env.do_decimation(cancellation_token, time_millis, quota_used, max_quota_used)).await??;
232 }
233
234 Ok(())
235 }
236
237 fn do_decimation(self: &Arc<Self>, cancellation_token: CancellationToken, _time_millis: TimeMillis, quota_used: f64, max_quota_used: f64) -> anyhow::Result<()> {
238 let lock = self.decimation_lock.try_read();
239 if lock.is_none() {
240 log::trace!("A decimation is already in progress.");
241 return Ok(());
242 }
243
244 scopeguard::defer! {
246 let post_bundle_current_size_bytes = self.post_bundle_current_size_bytes();
247 let _ = self.environment_store.config_put_usize(CONFIG_POST_BUNDLE_CURRENT_SIZE_BYTES, post_bundle_current_size_bytes);
248 }
249
250 let total_rows = self.environment_store.post_bundle_count()?;
252 let decimation_count = (total_rows as f64) * (1f64 - 0.98f64 * max_quota_used / quota_used);
253
254 if 0.0 >= decimation_count {
255 warn!("Decimation count is unexpectedly zero, skipping decimation");
256 return Ok(());
257 }
258
259 let rows_per_batch = 100f64.min(decimation_count);
261 let num_batches = decimation_count / rows_per_batch;
262 let num_batches = num_batches.ceil() as usize;
263 let rows_per_batch = decimation_count as usize / num_batches;
264
265 info!("Decimation of {}/{} items will be done in {} batches of {} deletes each", decimation_count, total_rows, num_batches, rows_per_batch);
266 let post_bundle_current_size_bytes_before = self.post_bundle_current_size_bytes();
267 info!(
268 "Total size before decimation is {} ({}%)",
269 post_bundle_current_size_bytes_before,
270 100 * post_bundle_current_size_bytes_before / self.environment_dimensions.max_size_bytes
271 );
272
273 for batch in 0..num_batches {
274 if cancellation_token.is_cancelled() {
275 break;
276 }
277
278 let mut heap_of_locations_to_decimate = BinaryHeap::new();
280 {
281 let random_prefix = Id::random();
282 log::trace!("Decimation batch {} prefix: {}", batch, random_prefix);
283 self.environment_store.post_bundles_last_accessed_iter(&random_prefix).take(5 * rows_per_batch).for_each(|pair| {
284 match pair {
285 Ok((location_id, time_millis_bytes)) => {
286 if heap_of_locations_to_decimate.len() >= rows_per_batch {
287 let &(most_recent_time_millis_bytes, _) = heap_of_locations_to_decimate.peek().expect("should always work as we have checked the length");
288 if most_recent_time_millis_bytes > time_millis_bytes {
289 heap_of_locations_to_decimate.pop();
290 heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
291 }
292 }
293 else {
294 heap_of_locations_to_decimate.push((time_millis_bytes, location_id));
295 }
296 }
297 Err(e) => {
298 warn!("Error while decimating: {}", e);
299 }
300 }
301 });
302 }
303
304 {
306 let mut location_ids: Vec<Id> = Vec::new();
307 {
308 let mut post_bundles_last_touched = self.post_bundles_last_touched_batch.write();
309
310 for (_time_millis_bytes, location_id) in heap_of_locations_to_decimate {
311 location_ids.push(location_id);
312 post_bundles_last_touched.remove(&location_id);
313
314 let metadata = self.environment_store.post_bundle_metadata_get(&location_id);
315 if let Ok(metadata) = metadata {
316 if let Some(metadata) = metadata {
317 self.post_bundle_current_size_bytes.fetch_sub(metadata.size, Ordering::Relaxed);
318 }
319 }
320 }
321 }
322
323 self.environment_store.post_bundles_delete(&location_ids)?;
324 }
325 }
326
327 let post_bundle_current_size_bytes_after = self.post_bundle_current_size_bytes();
328 info!(
329 "Total size after decimation is {} ({}%): delta={}",
330 post_bundle_current_size_bytes_after,
331 100 * post_bundle_current_size_bytes_after / self.environment_dimensions.max_size_bytes,
332 post_bundle_current_size_bytes_before - post_bundle_current_size_bytes_after
333 );
334
335 Ok(())
336 }
337}
338
339#[cfg(test)]
342pub mod tests {
343 use crate::environment::environment::{EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
344 use hashiverse_lib::anyhow_assert_ge;
345 use hashiverse_lib::tools::tools;
346 use hashiverse_lib::tools::tools::get_temp_dir;
347 use hashiverse_lib::tools::types::Id;
348 use std::sync::Arc;
349 use tokio_util::sync::CancellationToken;
350 use hashiverse_lib::tools::time_provider::time_provider::{ScaledTimeProvider, TimeProvider};
351
352 pub async fn basics_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
353 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
354 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
357 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(1024 * 1024);
358 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
359 let environment = environment_factory.open_next_available(environment_dimensions).await?;
360
361 let delete_post_bundle = |location_id: &Id| -> anyhow::Result<()> {
362 let vec = [*location_id].to_vec();
363 environment.environment_store.post_bundles_delete(&vec)
364 };
365
366 {
368 let num_bytes = 16 * 1024;
369 let time_millis = time_provider.current_time_millis();
370 let location_id = Id::random();
371 let post_bundle_metadata = PostBundleMetadata::zero();
372 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, num_bytes)?;
373
374 let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
375 assert_eq!(post_bundle_metadata_check, Some(post_bundle_metadata));
376
377 delete_post_bundle(&location_id)?;
378 let post_bundle_metadata_check = environment.get_post_bundle_metadata(time_millis, &location_id)?;
379 assert_eq!(post_bundle_metadata_check, None);
380 }
381
382 {
384 let num_bytes = 16 * 1024;
385 let time_millis = time_provider.current_time_millis();
386 let location_id = Id::random();
387 let bytes = tools::random_bytes(num_bytes);
388 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
389 assert_eq!(num_bytes, environment.post_bundle_current_size_bytes());
390
391 let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
392 assert_eq!(bytes_check, Some(Bytes::from(bytes)));
393
394 delete_post_bundle(&location_id)?;
395 let bytes_check = environment.get_post_bundle_bytes(time_millis, &location_id)?;
396 assert_eq!(bytes_check, None);
397 }
398
399 Ok(())
400 }
401
402 fn get_random_post_bundle(max_size: usize) -> (Id, PostBundleMetadata, Vec<u8>) {
403 let num_bytes = tools::random_usize_bounded(max_size);
404 let location_id = Id::random();
405 let bytes = tools::random_bytes(num_bytes);
406 let mut post_bundle_metadata = PostBundleMetadata::zero();
407 post_bundle_metadata.size = num_bytes;
408
409 (location_id, post_bundle_metadata, bytes)
410 }
411
412 pub async fn decimation_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
413 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
414 let cancellation_token = CancellationToken::new();
416
417 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
418 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
419 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
420 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
421
422 let num_iterations = 10000;
424 for i in 0..num_iterations {
425 let time_millis = time_provider.current_time_millis();
426 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(8 * 1024);
427
428 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
429 if 0 == i % 10 {
433 environment.do_maintenance(&cancellation_token, time_millis).await?;
434 }
435 }
436
437 {
438 let time_millis = time_provider.current_time_millis();
439 environment.do_maintenance(&cancellation_token, time_millis).await?;
440
441 anyhow_assert_ge!(environment_dimensions.max_size_bytes, environment.post_bundle_current_size_bytes());
442 }
443
444 Ok(())
445 }
446
447 pub async fn decimation_convergence_test<TEnvironmentFactory: EnvironmentFactory>(num_posts: usize) -> anyhow::Result<()> {
448 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
449 let cancellation_token = CancellationToken::new();
451
452 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
453 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
454 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
455 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
456
457 for i in 0..num_posts {
459 let time_millis = time_provider.current_time_millis();
460 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
461
462 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
463 if 0 == i % 100 {
467 environment.do_maintenance(&cancellation_token, time_millis).await?;
468 }
469 }
470
471 {
472 let time_millis = time_provider.current_time_millis();
473 environment.do_maintenance(&cancellation_token, time_millis).await?;
474 assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
475 }
476
477 Ok(())
478 }
479
480 pub async fn decimation_existence_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
481 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
482 let cancellation_token = CancellationToken::new();
484
485 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
486 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
487 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
488 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
489
490 let time_millis = time_provider.current_time_millis();
491
492 let location_id_should_exist_metadata = {
493 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
494 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
495 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
496 location_id
497 };
498
499 let location_id_should_exist_bytes = {
500 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
501 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
502 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
503 location_id
504 };
505
506 let location_id_should_not_exist = {
507 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
508 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
509 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
510 location_id
511 };
512
513 let num_iterations = 128 * 1000;
515 for i in 0..num_iterations {
516 let time_millis = time_provider.current_time_millis();
517 let (location_id, post_bundle_metadata, _bytes) = get_random_post_bundle(1024);
518
519 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
520 if 0 == i % 30 {
525 let _ = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
526 let _ = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes);
527 }
528
529 if 0 == i % 100 {
530 environment.do_maintenance(&cancellation_token, time_millis).await?;
531 }
532 }
533
534 {
535 let time_millis = time_provider.current_time_millis();
536 environment.do_maintenance(&cancellation_token, time_millis).await?;
537 assert!(environment.post_bundle_current_size_bytes() < environment_dimensions.max_size_bytes * 11 / 10);
538 }
539
540 {
542 {
544 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_metadata)?;
545 assert!(result.is_some());
546 }
547 {
548 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_metadata)?;
549 assert!(result.is_some());
550 }
551 }
552 {
553 {
555 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_exist_bytes)?;
556 assert!(result.is_some());
557 }
558 {
559 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_exist_bytes)?;
560 assert!(result.is_some());
561 }
562 }
563
564 {
566 {
567 let result = environment.get_post_bundle_metadata(time_millis, &location_id_should_not_exist)?;
568 assert!(result.is_none());
569 }
570 {
571 let result = environment.get_post_bundle_bytes(time_millis, &location_id_should_not_exist)?;
572 assert!(result.is_none());
573 }
574 }
575
576 Ok(())
577 }
578
579 pub async fn decimation_feedback_deleted_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
580 let time_provider = Arc::new(ScaledTimeProvider::new(60.0));
581 let cancellation_token = CancellationToken::new();
582
583 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
584 let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(4 * 1024 * 1024);
585 let environment_factory = TEnvironmentFactory::new(&temp_dir_path);
586 let environment = Arc::new(environment_factory.open_next_available(environment_dimensions).await?);
587 let store = &environment.environment_store;
588
589 let time_millis = time_provider.current_time_millis();
590
591 let (location_id, post_bundle_metadata, bytes) = get_random_post_bundle(1024);
593 environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, post_bundle_metadata.size)?;
594 environment.put_post_bundle_bytes(time_millis, &location_id, &bytes)?;
595
596 let feedback = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
597 store.post_feedback_put_if_more_powerful(&location_id, &feedback)?;
598 assert!(!store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
599
600 for i in 0..128 * 1000usize {
602 let time_millis = time_provider.current_time_millis();
603 let (flood_id, flood_meta, _) = get_random_post_bundle(1024);
604 environment.put_post_bundle_metadata(time_millis, &flood_id, &flood_meta, flood_meta.size)?;
605 if i % 100 == 0 {
606 environment.do_maintenance(&cancellation_token, time_millis).await?;
607 }
608 }
609 environment.do_maintenance(&cancellation_token, time_provider.current_time_millis()).await?;
610
611 assert!(environment.get_post_bundle_metadata(time_millis, &location_id)?.is_none(), "bundle should have been decimated");
613
614 assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty(), "feedback should have been deleted with the bundle");
616
617 Ok(())
618 }
619
620 use bytes::{Buf, Bytes};
623 use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
624 use hashiverse_lib::tools::types::{Pow, Salt};
625
626 fn decode_all_feedbacks(mut bytes: Bytes) -> anyhow::Result<Vec<EncodedPostFeedbackV1>> {
627 let mut result = Vec::new();
628 while bytes.has_remaining() {
629 result.push(EncodedPostFeedbackV1::decode_from_bytes(&mut bytes)?);
630 }
631 Ok(result)
632 }
633
634 pub async fn feedback_bytes_get_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
635 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
636 let factory = TEnvironmentFactory::new(&temp_dir_path);
637 let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
638 let store = &env.environment_store;
639
640 let location_id = Id::random();
641 let other_location_id = Id::random();
642
643 assert!(store.post_bundle_feedbacks_bytes_get(&location_id)?.is_empty());
645
646 let f1 = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(10));
647 let f2 = EncodedPostFeedbackV1::new(Id::random(), 2, Salt::random(), Pow(20));
648 let f_other = EncodedPostFeedbackV1::new(Id::random(), 1, Salt::random(), Pow(5));
649
650 store.post_feedback_put_if_more_powerful(&location_id, &f1)?;
651 store.post_feedback_put_if_more_powerful(&location_id, &f2)?;
652 store.post_feedback_put_if_more_powerful(&other_location_id, &f_other)?;
653
654 let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
656 let mut decoded = decode_all_feedbacks(bytes)?;
657 decoded.sort_by_key(|f| f.feedback_type);
658 assert_eq!(decoded.len(), 2);
659 assert_eq!(decoded[0], f1);
660 assert_eq!(decoded[1], f2);
661
662 let other_bytes = store.post_bundle_feedbacks_bytes_get(&other_location_id)?;
664 let other_decoded = decode_all_feedbacks(other_bytes)?;
665 assert_eq!(other_decoded.len(), 1);
666 assert_eq!(other_decoded[0], f_other);
667
668 Ok(())
669 }
670
671 pub async fn feedback_put_if_more_powerful_test<TEnvironmentFactory: EnvironmentFactory>() -> anyhow::Result<()> {
672 let (_temp_dir, temp_dir_path) = get_temp_dir()?;
673 let factory = TEnvironmentFactory::new(&temp_dir_path);
674 let env = factory.open_next_available(EnvironmentDimensions::default()).await?;
675 let store = &env.environment_store;
676
677 let location_id = Id::random();
678 let post_id = Id::random();
679 let feedback_type = 3u8;
680
681 let get_single = |pow_expected: Pow| -> anyhow::Result<()> {
682 let bytes = store.post_bundle_feedbacks_bytes_get(&location_id)?;
683 let decoded = decode_all_feedbacks(bytes)?;
684 assert_eq!(decoded.len(), 1);
685 assert_eq!(decoded[0].post_id, post_id);
686 assert_eq!(decoded[0].feedback_type, feedback_type);
687 assert_eq!(decoded[0].pow, pow_expected);
688 Ok(())
689 };
690
691 let f_low = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(10));
693 store.post_feedback_put_if_more_powerful(&location_id, &f_low)?;
694 get_single(Pow(10))?;
695
696 let f_high = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
698 store.post_feedback_put_if_more_powerful(&location_id, &f_high)?;
699 get_single(Pow(20))?;
700
701 let salt_before = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
703 let f_equal = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(20));
704 store.post_feedback_put_if_more_powerful(&location_id, &f_equal)?;
705 let salt_after = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?[0].salt;
706 assert_eq!(salt_before, salt_after);
707
708 let f_weaker = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(5));
710 store.post_feedback_put_if_more_powerful(&location_id, &f_weaker)?;
711 get_single(Pow(20))?;
712
713 let f_other_type = EncodedPostFeedbackV1::new(post_id, feedback_type + 1, Salt::random(), Pow(1));
715 store.post_feedback_put_if_more_powerful(&location_id, &f_other_type)?;
716 let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
717 assert_eq!(all.len(), 2);
718
719 let f_other_post = EncodedPostFeedbackV1::new(Id::random(), feedback_type, Salt::random(), Pow(1));
721 store.post_feedback_put_if_more_powerful(&location_id, &f_other_post)?;
722 let all = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
723 assert_eq!(all.len(), 3);
724
725 let f_other_loc = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), Pow(99));
727 store.post_feedback_put_if_more_powerful(&Id::random(), &f_other_loc)?;
728 let all_original = decode_all_feedbacks(store.post_bundle_feedbacks_bytes_get(&location_id)?)?;
729 assert_eq!(all_original.len(), 3);
730
731 Ok(())
732 }
733}