Skip to main content

hashiverse_server_lib/environment/
mem_environment_store.rs

1//! # In-memory [`EnvironmentStore`] for tests
2//!
3//! Volatile implementation of
4//! [`crate::environment::environment_store::EnvironmentStore`] backed by a
5//! `HashMap` for bundles/metadata and a `BTreeMap` for feedback (so range queries
6//! over composite `(location, post, feedback_type)` keys work identically to the
7//! disk store).
8//!
9//! One subtlety: the decimation path needs to hold a write lock while *also*
10//! iterating the bundle set, and Rust's borrow rules don't allow that directly.
11//! The module uses an `ouroboros` self-referential struct to safely carry the
12//! guard alongside the iterator, matching what the disk store gets from its
13//! `fjall` cursor.
14
15use bytes::Bytes;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use async_trait::async_trait;
19use log::info;
20use ouroboros::self_referencing;
21use parking_lot::{RwLock,RwLockReadGuard};
22use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
23use hashiverse_lib::tools::time::{TimeMillis, TimeMillisBytes};
24use hashiverse_lib::tools::types::Id;
25use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
26use crate::environment::environment_store::EnvironmentStore;
27
28pub struct MemEnvironmentFactory {
29    next_env_id: RwLock<usize>,
30}
31#[async_trait]
32impl EnvironmentFactory for MemEnvironmentFactory {
33    fn new(_base_path: &str) -> Self {
34        Self {
35            next_env_id: RwLock::new(0),
36        }
37    }
38    async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
39        let env_id = {
40            let mut next_env_id = self.next_env_id.write();
41            *next_env_id += 1;
42            *next_env_id
43        };
44
45        let mem_environment_store = Arc::new(MemEnvironmentStore::new(env_id));
46        let environment = Environment::new(mem_environment_store, environment_dimensions).await?;
47        Ok(environment)
48    }
49}
50
51/// MemEnvironmentStore implements the `EnvironmentStore` trait for use during testing.  Everything is stored in memory and is volatile to restarts.
52
53pub struct MemEnvironmentStore {
54    config: RwLock<HashMap<String, Vec<u8>>>,
55    post_bundle_metadata: RwLock<HashMap<Id, Vec<u8>>>,
56    post_bundle_last_accessed: RwLock<BTreeMap<Id, TimeMillisBytes>>,
57    post_bundle_bytes: RwLock<HashMap<Id, Bytes>>,
58    post_bundle_feedbacks: RwLock<HashMap<Id, Vec<EncodedPostFeedbackV1>>> // Map from post_bundle_location_id -> EncodedPostFeedbackV1
59}
60
61impl MemEnvironmentStore {
62    fn new(env_id: usize) -> Self {
63        info!("using MemEnvironmentStore {}", env_id);
64
65        Self {
66            config: RwLock::new(HashMap::new()),
67            post_bundle_metadata: RwLock::new(HashMap::new()),
68            post_bundle_last_accessed: RwLock::new(BTreeMap::new()),
69            post_bundle_bytes: RwLock::new(HashMap::new()),
70            post_bundle_feedbacks: RwLock::new(HashMap::new()),
71        }
72    }
73}
74
75impl EnvironmentStore for MemEnvironmentStore {
76
77    fn post_bundle_count(&self) -> anyhow::Result<usize> {
78        let len = self.post_bundle_last_accessed.read().len();
79        Ok(len)
80    }
81
82    fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
83        let total: usize = self.post_bundle_feedbacks.read().values().map(|entries| entries.len()).sum();
84        Ok(total)
85    }
86
87    fn post_bundle_metadata_get(&self, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
88        let post_bundle_metadata = self.post_bundle_metadata.read();
89        let bytes = post_bundle_metadata.get(location_id);
90        match bytes {
91            Some(bytes) => Ok(Some(postcard::from_bytes(bytes)?)),
92            None => Ok(None),
93        }
94    }
95
96    fn post_bundle_metadata_put(&self, location_id: &Id, post_bundle_metadata: &PostBundleMetadata) -> anyhow::Result<()> {
97        let bytes = postcard::to_stdvec(post_bundle_metadata)?;
98        self.post_bundle_metadata.write().insert(*location_id, bytes.to_vec());
99        Ok(())
100    }
101
102    fn post_bundle_bytes_get(&self, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
103        Ok(self.post_bundle_bytes.read().get(location_id).cloned())
104    }
105
106    fn post_bundle_bytes_put(&self, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
107        self.post_bundle_bytes.write().insert(*location_id, Bytes::copy_from_slice(bytes));
108        Ok(())
109    }
110
111    fn post_bundles_last_accessed_flush(&self, post_bundles_last_accessed: &HashMap<Id, TimeMillis>) -> anyhow::Result<()> {
112        let mut self_post_bundle_last_accessed = self.post_bundle_last_accessed.write();
113        for (location_id, time_millis) in post_bundles_last_accessed.iter() {
114            self_post_bundle_last_accessed.insert(*location_id, time_millis.encode_be());
115        }
116        Ok(())
117    }
118
119    fn post_bundles_delete(&self, location_ids: &[Id]) -> anyhow::Result<()> {
120        let mut post_bundle_metadata = self.post_bundle_metadata.write();
121        let mut post_bundle_last_accessed = self.post_bundle_last_accessed.write();
122        let mut post_bundle_bytes = self.post_bundle_bytes.write();
123        let mut post_bundle_feedbacks = self.post_bundle_feedbacks.write();
124
125        for location_id in location_ids {
126            post_bundle_metadata.remove(location_id);
127            post_bundle_last_accessed.remove(location_id);
128            post_bundle_bytes.remove(location_id);
129            post_bundle_feedbacks.remove(location_id);
130        }
131
132        Ok(())
133    }
134
135    fn post_bundles_last_accessed_iter(&self, location_id: &Id) -> Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + '_> {
136        let post_bundle_last_accessed = self.post_bundle_last_accessed.read();
137
138        let iter = LockedPostBundleIterBuilder {
139            guard: post_bundle_last_accessed,
140            inner_builder: |post_bundle_last_accessed| {
141                let it = post_bundle_last_accessed.range(location_id..)
142                    .chain(post_bundle_last_accessed.range(..location_id))
143                    .map(|(k, v)| Ok((*k, *v)));
144                Box::new(it)
145            },
146        }.build();
147
148        Box::new(iter)
149    }
150
151    fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
152        Ok(self.config.read().get(key).map(|v| v.to_vec()))
153    }
154
155    fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
156        self.config.write().insert(key.to_string(), v);
157        Ok(())
158    }
159
160    fn post_bundle_feedbacks_bytes_get(&self, post_bundle_location_id: &Id) -> anyhow::Result<Bytes> {
161        let mut bytes = Vec::new();
162        let feedbacks = self.post_bundle_feedbacks.read();
163        if let Some(entries) = feedbacks.get(post_bundle_location_id) {
164            for f in entries {
165                EncodedPostFeedbackV1::append_encode_direct_to_bytes(&mut bytes, f.post_id.as_ref(), f.feedback_type, f.salt.as_ref(), f.pow)?;
166            }
167        }
168        Ok(Bytes::from(bytes))
169    }
170
171    fn post_feedback_put_if_more_powerful(&self, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
172        let mut feedbacks = self.post_bundle_feedbacks.write();
173        let entries = feedbacks.entry(*location_id).or_default();
174
175        // Find existing entry for this (post_id, feedback_type) pair
176        if let Some(existing) = entries.iter_mut().find(|f| f.post_id == encoded_post_feedback.post_id && f.feedback_type == encoded_post_feedback.feedback_type) {
177            if existing.pow >= encoded_post_feedback.pow {
178                // Already have equal or better feedback — do not downgrade
179                return Ok(());
180            }
181            *existing = encoded_post_feedback.clone();
182        } else {
183            entries.push(encoded_post_feedback.clone());
184        }
185
186        Ok(())
187    }
188}
189
190
191#[self_referencing]
192pub struct LockedPostBundleIter<'a> {
193    guard: RwLockReadGuard<'a, BTreeMap<Id, TimeMillisBytes>>,
194
195    #[borrows(guard)]
196    #[not_covariant] // Required for iterators
197    inner: Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + 'this>,
198}
199
200impl<'a> Iterator for LockedPostBundleIter<'a> {
201    type Item = Result<(Id, TimeMillisBytes), anyhow::Error>;
202
203    fn next(&mut self) -> Option<Self::Item> {
204        self.with_inner_mut(|it| it.next())
205    }
206}
207
208
209
210#[cfg(test)]
211mod tests {
212    use crate::environment;
213    use crate::environment::mem_environment_store::MemEnvironmentFactory;
214
215    #[tokio::test]
216    async fn basics_test() -> anyhow::Result<()> {
217        environment::environment::tests::basics_test::<MemEnvironmentFactory>().await
218    }
219
220    #[tokio::test]
221    async fn decimation_test() -> anyhow::Result<()> {
222        environment::environment::tests::decimation_test::<MemEnvironmentFactory>().await
223    }
224
225    #[tokio::test]
226    async fn decimation_convergence_test() -> anyhow::Result<()> {
227        environment::environment::tests::decimation_convergence_test::<MemEnvironmentFactory>(128 * 1000).await
228    }
229    #[tokio::test]
230    async fn decimation_existence_test() -> anyhow::Result<()> {
231        environment::environment::tests::decimation_existence_test::<MemEnvironmentFactory>().await
232    }
233    #[tokio::test]
234    async fn decimation_feedback_deleted_test() -> anyhow::Result<()> {
235        environment::environment::tests::decimation_feedback_deleted_test::<MemEnvironmentFactory>().await
236    }
237
238    #[tokio::test]
239    async fn feedback_bytes_get_test() -> anyhow::Result<()> {
240        environment::environment::tests::feedback_bytes_get_test::<MemEnvironmentFactory>().await
241    }
242
243    #[tokio::test]
244    async fn feedback_put_if_more_powerful_test() -> anyhow::Result<()> {
245        environment::environment::tests::feedback_put_if_more_powerful_test::<MemEnvironmentFactory>().await
246    }
247}