hashiverse_server_lib/environment/
mem_environment_store.rs1use bytes::Bytes;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use async_trait::async_trait;
19use log::info;
20use ouroboros::self_referencing;
21use parking_lot::{RwLock,RwLockReadGuard};
22use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
23use hashiverse_lib::tools::time::{TimeMillis, TimeMillisBytes};
24use hashiverse_lib::tools::types::Id;
25use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
26use crate::environment::environment_store::EnvironmentStore;
27
28pub struct MemEnvironmentFactory {
29 next_env_id: RwLock<usize>,
30}
31#[async_trait]
32impl EnvironmentFactory for MemEnvironmentFactory {
33 fn new(_base_path: &str) -> Self {
34 Self {
35 next_env_id: RwLock::new(0),
36 }
37 }
38 async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
39 let env_id = {
40 let mut next_env_id = self.next_env_id.write();
41 *next_env_id += 1;
42 *next_env_id
43 };
44
45 let mem_environment_store = Arc::new(MemEnvironmentStore::new(env_id));
46 let environment = Environment::new(mem_environment_store, environment_dimensions).await?;
47 Ok(environment)
48 }
49}
50
51pub struct MemEnvironmentStore {
53 config: RwLock<HashMap<String, Vec<u8>>>,
54 post_bundle_metadata: RwLock<HashMap<Id, Vec<u8>>>,
55 post_bundle_last_accessed: RwLock<BTreeMap<Id, TimeMillisBytes>>,
56 post_bundle_bytes: RwLock<HashMap<Id, Bytes>>,
57 post_bundle_feedbacks: RwLock<HashMap<Id, Vec<EncodedPostFeedbackV1>>> }
59
60impl MemEnvironmentStore {
61 fn new(env_id: usize) -> Self {
62 info!("using MemEnvironmentStore {}", env_id);
63
64 Self {
65 config: RwLock::new(HashMap::new()),
66 post_bundle_metadata: RwLock::new(HashMap::new()),
67 post_bundle_last_accessed: RwLock::new(BTreeMap::new()),
68 post_bundle_bytes: RwLock::new(HashMap::new()),
69 post_bundle_feedbacks: RwLock::new(HashMap::new()),
70 }
71 }
72}
73
74impl EnvironmentStore for MemEnvironmentStore {
75
76 fn post_bundle_count(&self) -> anyhow::Result<usize> {
77 let len = self.post_bundle_last_accessed.read().len();
78 Ok(len)
79 }
80
81 fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
82 let total: usize = self.post_bundle_feedbacks.read().values().map(|entries| entries.len()).sum();
83 Ok(total)
84 }
85
86 fn post_bundle_metadata_get(&self, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
87 let post_bundle_metadata = self.post_bundle_metadata.read();
88 let bytes = post_bundle_metadata.get(location_id);
89 match bytes {
90 Some(bytes) => Ok(Some(postcard::from_bytes(bytes)?)),
91 None => Ok(None),
92 }
93 }
94
95 fn post_bundle_metadata_put(&self, location_id: &Id, post_bundle_metadata: &PostBundleMetadata) -> anyhow::Result<()> {
96 let bytes = postcard::to_stdvec(post_bundle_metadata)?;
97 self.post_bundle_metadata.write().insert(*location_id, bytes.to_vec());
98 Ok(())
99 }
100
101 fn post_bundle_bytes_get(&self, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
102 Ok(self.post_bundle_bytes.read().get(location_id).cloned())
103 }
104
105 fn post_bundle_bytes_put(&self, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
106 self.post_bundle_bytes.write().insert(*location_id, Bytes::copy_from_slice(bytes));
107 Ok(())
108 }
109
110 fn post_bundles_last_accessed_flush(&self, post_bundles_last_accessed: &HashMap<Id, TimeMillis>) -> anyhow::Result<()> {
111 let mut self_post_bundle_last_accessed = self.post_bundle_last_accessed.write();
112 for (location_id, time_millis) in post_bundles_last_accessed.iter() {
113 self_post_bundle_last_accessed.insert(*location_id, time_millis.encode_be());
114 }
115 Ok(())
116 }
117
118 fn post_bundles_delete(&self, location_ids: &[Id]) -> anyhow::Result<()> {
119 let mut post_bundle_metadata = self.post_bundle_metadata.write();
120 let mut post_bundle_last_accessed = self.post_bundle_last_accessed.write();
121 let mut post_bundle_bytes = self.post_bundle_bytes.write();
122 let mut post_bundle_feedbacks = self.post_bundle_feedbacks.write();
123
124 for location_id in location_ids {
125 post_bundle_metadata.remove(location_id);
126 post_bundle_last_accessed.remove(location_id);
127 post_bundle_bytes.remove(location_id);
128 post_bundle_feedbacks.remove(location_id);
129 }
130
131 Ok(())
132 }
133
134 fn post_bundles_last_accessed_iter(&self, location_id: &Id) -> Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + '_> {
135 let post_bundle_last_accessed = self.post_bundle_last_accessed.read();
136
137 let iter = LockedPostBundleIterBuilder {
138 guard: post_bundle_last_accessed,
139 inner_builder: |post_bundle_last_accessed| {
140 let it = post_bundle_last_accessed.range(location_id..)
141 .chain(post_bundle_last_accessed.range(..location_id))
142 .map(|(k, v)| Ok((*k, *v)));
143 Box::new(it)
144 },
145 }.build();
146
147 Box::new(iter)
148 }
149
150 fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
151 Ok(self.config.read().get(key).map(|v| v.to_vec()))
152 }
153
154 fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
155 self.config.write().insert(key.to_string(), v);
156 Ok(())
157 }
158
159 fn post_bundle_feedbacks_bytes_get(&self, post_bundle_location_id: &Id) -> anyhow::Result<Bytes> {
160 let mut bytes = Vec::new();
161 let feedbacks = self.post_bundle_feedbacks.read();
162 if let Some(entries) = feedbacks.get(post_bundle_location_id) {
163 for f in entries {
164 EncodedPostFeedbackV1::append_encode_direct_to_bytes(&mut bytes, f.post_id.as_ref(), f.feedback_type, f.salt.as_ref(), f.pow)?;
165 }
166 }
167 Ok(Bytes::from(bytes))
168 }
169
170 fn post_feedback_put_if_more_powerful(&self, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
171 let mut feedbacks = self.post_bundle_feedbacks.write();
172 let entries = feedbacks.entry(*location_id).or_default();
173
174 if let Some(existing) = entries.iter_mut().find(|f| f.post_id == encoded_post_feedback.post_id && f.feedback_type == encoded_post_feedback.feedback_type) {
176 if existing.pow >= encoded_post_feedback.pow {
177 return Ok(());
179 }
180 *existing = encoded_post_feedback.clone();
181 } else {
182 entries.push(encoded_post_feedback.clone());
183 }
184
185 Ok(())
186 }
187}
188
189
190#[self_referencing]
191pub struct LockedPostBundleIter<'a> {
192 guard: RwLockReadGuard<'a, BTreeMap<Id, TimeMillisBytes>>,
193
194 #[borrows(guard)]
195 #[not_covariant] inner: Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + 'this>,
197}
198
199impl<'a> Iterator for LockedPostBundleIter<'a> {
200 type Item = Result<(Id, TimeMillisBytes), anyhow::Error>;
201
202 fn next(&mut self) -> Option<Self::Item> {
203 self.with_inner_mut(|it| it.next())
204 }
205}
206
207
208
209#[cfg(test)]
210mod tests {
211 use crate::environment;
212 use crate::environment::mem_environment_store::MemEnvironmentFactory;
213
214 #[tokio::test]
215 async fn basics_test() -> anyhow::Result<()> {
216 environment::environment::tests::basics_test::<MemEnvironmentFactory>().await
217 }
218
219 #[tokio::test]
220 async fn decimation_test() -> anyhow::Result<()> {
221 environment::environment::tests::decimation_test::<MemEnvironmentFactory>().await
222 }
223
224 #[tokio::test]
225 async fn decimation_convergence_test() -> anyhow::Result<()> {
226 environment::environment::tests::decimation_convergence_test::<MemEnvironmentFactory>(128 * 1000).await
227 }
228 #[tokio::test]
229 async fn decimation_existence_test() -> anyhow::Result<()> {
230 environment::environment::tests::decimation_existence_test::<MemEnvironmentFactory>().await
231 }
232 #[tokio::test]
233 async fn decimation_feedback_deleted_test() -> anyhow::Result<()> {
234 environment::environment::tests::decimation_feedback_deleted_test::<MemEnvironmentFactory>().await
235 }
236
237 #[tokio::test]
238 async fn feedback_bytes_get_test() -> anyhow::Result<()> {
239 environment::environment::tests::feedback_bytes_get_test::<MemEnvironmentFactory>().await
240 }
241
242 #[tokio::test]
243 async fn feedback_put_if_more_powerful_test() -> anyhow::Result<()> {
244 environment::environment::tests::feedback_put_if_more_powerful_test::<MemEnvironmentFactory>().await
245 }
246}