hashiverse_server_lib/environment/
disk_environment_store.rs1use bytes::Bytes;
21use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
22use crate::environment::environment_store::EnvironmentStore;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use fjall::{Database, Keyspace};
26use fs2::FileExt;
27use hashiverse_lib::tools::time::{TimeMillis, TimeMillisBytes};
28use hashiverse_lib::tools::types::{ID_BYTES, Id, SALT_BYTES, Salt, Pow};
29use log::{info, trace, warn};
30use std::collections::HashMap;
31use std::fs;
32use std::fs::OpenOptions;
33use std::path::PathBuf;
34use std::sync::Arc;
35use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
36
37const MAX_ENVIRONMENTS_PER_NODE: usize = 256;
38
39pub struct DiskEnvironmentFactory {
40 base_path: String,
41}
42
43#[async_trait]
44impl EnvironmentFactory for DiskEnvironmentFactory {
45 fn new(base_path: &str) -> Self {
46 Self { base_path: base_path.to_string() }
47 }
48
49 async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
50 for env_id in 1..=MAX_ENVIRONMENTS_PER_NODE {
51 let disk_environment_store = DiskEnvironmentStore::new(&self.base_path, env_id);
52 match disk_environment_store {
53 Ok(disk_environment_store) => return Environment::new(Arc::new(disk_environment_store), environment_dimensions).await,
54 Err(_) => continue,
55 }
56 }
57
58 anyhow::bail!("no environments available")
59 }
60}
61
62pub struct DiskEnvironmentStore {
72 path: PathBuf,
73 #[allow(dead_code)]
74 lock_file: fs::File, database: Database,
76 keyspace_config: Keyspace,
77 keyspace_post_bundle_last_accessed: Keyspace, keyspace_post_bundle_metadata: Keyspace, keyspace_post_bundle_feedback: Keyspace, }
81
82impl DiskEnvironmentStore {
83 fn new(base_path: &str, env_id: usize) -> anyhow::Result<Self> {
84 let path = PathBuf::from(base_path).join(env_id.to_string());
85
86 fs::create_dir_all(&path)?;
88
89 let lock_path = path.join("lock");
91 let lock_file = OpenOptions::new().create(true).truncate(true).read(true).write(true).open(&lock_path)?;
92 lock_file.try_lock_exclusive()?;
93
94 let database = Database::builder(path.join("database")).open()?;
96 let keyspace_config = database.keyspace("config", fjall::KeyspaceCreateOptions::default)?;
97 let keyspace_post_bundle_last_accessed = database.keyspace("post_bundle_last_accessed", fjall::KeyspaceCreateOptions::default)?;
98 let keyspace_post_bundle_metadata = database.keyspace("keyspace_post_bundle_metadata", fjall::KeyspaceCreateOptions::default)?;
99 let keyspace_post_bundle_feedback = database.keyspace("keyspace_post_bundle_feedback", fjall::KeyspaceCreateOptions::default)?;
100
101 info!("using environment {} at {}", env_id, path.to_str().unwrap());
102
103 Ok(Self {
104 path,
105 lock_file,
106 database,
107 keyspace_config,
108 keyspace_post_bundle_last_accessed,
109 keyspace_post_bundle_metadata,
110 keyspace_post_bundle_feedback,
111 })
112 }
113
114 fn path_for_location_id(&self, location_id: &Id) -> (PathBuf, PathBuf) {
115 let b0 = format!("{:02x}", location_id.0[0]);
121 let b1 = format!("{:02x}", location_id.0[1]);
122
123 let directory = self.path.join("post_bundles").join(b0).join(b1);
124 let filename = directory.join(location_id.to_hex_str());
125
126 (directory, filename)
127 }
128}
129
130impl EnvironmentStore for DiskEnvironmentStore {
131 fn post_bundle_count(&self) -> anyhow::Result<usize> {
132 let len = self.keyspace_post_bundle_last_accessed.len()?;
133 Ok(len)
134 }
135
136 fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
137 let len = self.keyspace_post_bundle_feedback.len()?;
138 Ok(len)
139 }
140 fn post_bundle_metadata_get(&self, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
141 let guard = self.keyspace_post_bundle_metadata.get(location_id)?;
142 match guard {
143 Some(guard) => Ok(Some(postcard::from_bytes(&guard)?)),
144 None => Ok(None),
145 }
146 }
147
148 fn post_bundle_metadata_put(&self, location_id: &Id, post_bundle_metadata: &PostBundleMetadata) -> anyhow::Result<()> {
149 let mut scratch = [0u8; 64];
151 let scratch_used = postcard::to_slice(post_bundle_metadata, &mut scratch)?;
152 self.keyspace_post_bundle_metadata.insert(location_id.0, scratch_used.as_ref())?;
153 Ok(())
154 }
155
156 fn post_bundle_bytes_get(&self, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
157 let (_directory, filename) = self.path_for_location_id(location_id);
158 let result = fs::read(filename).ok().map(Bytes::from);
159 Ok(result)
160 }
161
162 fn post_bundle_bytes_put(&self, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
163 let (directory, filename) = self.path_for_location_id(location_id);
164 let filename_temp = filename.with_added_extension("tmp");
165 fs::create_dir_all(&directory)?;
166 fs::write(&filename_temp, bytes)?;
167 fs::rename(&filename_temp, &filename)?;
168 Ok(())
169 }
170
171 fn post_bundle_feedbacks_bytes_get(&self, post_bundle_location_id: &Id) -> anyhow::Result<Bytes> {
172 let mut bytes = Vec::new();
173
174 self.keyspace_post_bundle_feedback.prefix(post_bundle_location_id).for_each(|guard| {
175 let try_result = try {
176 let (key, value) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
177 let post_bundle_feedback_key = PostBundleFeedbackKey::from_slice(&key)?;
178 let post_bundle_feedback_value = PostBundleFeedbackValue::from_slice(&value)?;
179 EncodedPostFeedbackV1::append_encode_direct_to_bytes(
180 &mut bytes,
181 post_bundle_feedback_key.post_id_bytes(),
182 post_bundle_feedback_key.feedback_type(),
183 post_bundle_feedback_value.salt_bytes(),
184 post_bundle_feedback_value.pow(),
185 )?;
186 };
187
188 if let Err(e) = try_result {
189 warn!("unexpectedly unable to encode post bundle feedback: {}", e);
190 }
191 });
192
193 Ok(Bytes::from(bytes))
194 }
195
196
197 fn post_feedback_put_if_more_powerful(&self, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
198 let post_bundle_feedback_key = PostBundleFeedbackKey::new(location_id, &encoded_post_feedback.post_id, encoded_post_feedback.feedback_type);
199
200 let post_bundle_feedback_value = self.keyspace_post_bundle_feedback.get(&post_bundle_feedback_key)?;
202 if let Some(post_bundle_feedback_value) = post_bundle_feedback_value {
203 let post_bundle_feedback_value = PostBundleFeedbackValue::from_slice(&post_bundle_feedback_value)?;
204 if post_bundle_feedback_value.pow() >= encoded_post_feedback.pow {
205 trace!("Not storing lesser feedback for location_id={} with existing pow={}: feedback={:?}", location_id, post_bundle_feedback_value.pow(), encoded_post_feedback);
206 return Ok(());
207 }
208 }
209
210 {
212 let post_bundle_feedback_value = PostBundleFeedbackValue::new(encoded_post_feedback.salt, encoded_post_feedback.pow);
213 self.keyspace_post_bundle_feedback.insert(post_bundle_feedback_key.0, post_bundle_feedback_value.0)?;
214 }
215
216 Ok(())
217 }
218
219 fn post_bundles_delete(&self, location_ids: &[Id]) -> anyhow::Result<()> {
220 let post_bundle_bytes_delete = |location_id: &Id| -> anyhow::Result<()> {
221 let (_directory, filename) = self.path_for_location_id(location_id);
222 let _result = fs::remove_file(filename);
223 Ok(())
224 };
225
226 let mut feedback_keys_to_delete: Vec<Vec<u8>> = Vec::new();
228 for location_id in location_ids {
229 self.keyspace_post_bundle_feedback.prefix(location_id).for_each(|guard| {
230 let try_result: anyhow::Result<()> = try {
231 let (key, _) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
232 feedback_keys_to_delete.push(key.to_vec());
233 };
234 if let Err(e) = try_result {
235 warn!("failed to collect feedback key for deletion: {}", e);
236 }
237 });
238 }
239
240 let mut batch = self.database.batch();
241 for location_id in location_ids {
242 batch.remove(&self.keyspace_post_bundle_metadata, location_id.0);
243 batch.remove(&self.keyspace_post_bundle_last_accessed, location_id.0);
244 post_bundle_bytes_delete(location_id)?;
245 }
246 for key in &feedback_keys_to_delete {
247 batch.remove(&self.keyspace_post_bundle_feedback, key.as_slice());
248 }
249 batch.commit()?;
250
251 Ok(())
252 }
253
254 fn post_bundles_last_accessed_flush(&self, post_bundles_last_accessed: &HashMap<Id, TimeMillis>) -> anyhow::Result<()> {
255 let mut batch = self.database.batch();
256 for (location_id, time_millis) in post_bundles_last_accessed.iter() {
257 let time_millis_bytes = time_millis.encode_be();
258 batch.insert(&self.keyspace_post_bundle_last_accessed, location_id.0, time_millis_bytes.0);
259 }
260 batch.commit()?;
261 Ok(())
262 }
263
264 fn post_bundles_last_accessed_iter(&self, location_id: &Id) -> Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + '_> {
265 let it = self
266 .keyspace_post_bundle_last_accessed
267 .range(location_id.to_string()..)
268 .chain(self.keyspace_post_bundle_last_accessed.range(..location_id.to_string()))
269 .map(|guard| {
270 let (location_id, time_millis_bytes) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
271 let location_id = Id::from_slice(&location_id)?;
272 let time_millis_bytes = TimeMillisBytes::from_bytes(&time_millis_bytes)?;
273 Ok((location_id, time_millis_bytes))
274 });
275
276 Box::new(it)
277 }
278
279 fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
280 Ok(self.keyspace_config.get(key)?.map(|v| v.to_vec()))
281 }
282
283 fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
284 self.keyspace_config.insert(key, v)?;
285 Ok(())
286 }
287}
288
289const POST_BUNDLE_FEEDBACK_KEY_SIZE: usize = ID_BYTES + ID_BYTES + 1;
290pub struct PostBundleFeedbackKey(pub [u8; POST_BUNDLE_FEEDBACK_KEY_SIZE]);
291
292impl PostBundleFeedbackKey {
293 pub fn new(location_id: &Id, post_id: &Id, feedback_type: u8) -> Self {
294 let mut bytes = [0u8; POST_BUNDLE_FEEDBACK_KEY_SIZE];
295 bytes[0..ID_BYTES].copy_from_slice(location_id.as_bytes());
296 bytes[ID_BYTES..2*ID_BYTES].copy_from_slice(post_id.as_bytes());
297 bytes[2*ID_BYTES] = feedback_type;
298 Self(bytes)
299 }
300
301 pub fn from_slice(bytes: &[u8]) -> anyhow::Result<Self> {
302 let bytes: [u8; POST_BUNDLE_FEEDBACK_KEY_SIZE] = bytes
303 .try_into()
304 .map_err(|_| anyhow::anyhow!("Invalid PostBundleFeedbackKey length: expected {}, got {}", POST_BUNDLE_FEEDBACK_KEY_SIZE, bytes.len()))?;
305 Ok(Self(bytes))
306 }
307
308 pub fn post_bundle_location_id_bytes(&self) -> &[u8] {
309 &self.0[0..ID_BYTES]
310 }
311 pub fn post_id_bytes(&self) -> &[u8] {
312 &self.0[ID_BYTES..2*ID_BYTES]
313 }
314
315 pub fn feedback_type(&self) -> u8 {
316 self.0[2*ID_BYTES]
317 }
318}
319
320impl AsRef<[u8]> for PostBundleFeedbackKey {
321 fn as_ref(&self) -> &[u8] {
322 &self.0
323 }
324}
325
326const POST_BUNDLE_FEEDBACK_VALUE_SIZE: usize = SALT_BYTES + 1;
327pub struct PostBundleFeedbackValue(pub [u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE]);
328
329impl PostBundleFeedbackValue {
330 pub fn new(salt: Salt, pow: Pow) -> Self {
331 let mut bytes = [0u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE];
332 bytes[0..SALT_BYTES].copy_from_slice(salt.as_slice());
333 bytes[SALT_BYTES] = pow.0;
334 Self(bytes)
335 }
336
337 pub fn from_slice(bytes: &[u8]) -> anyhow::Result<Self> {
338 let bytes: [u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE] = bytes
339 .try_into()
340 .map_err(|_| anyhow::anyhow!("Invalid PostBundleFeedbackValue length: expected {}, got {}", POST_BUNDLE_FEEDBACK_VALUE_SIZE, bytes.len()))?;
341 Ok(Self(bytes))
342 }
343 pub fn salt_bytes(&self) -> &[u8] {
344 &self.0[0..SALT_BYTES]
345 }
346 pub fn pow(&self) -> Pow {
347 Pow(self.0[SALT_BYTES])
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use crate::environment;
354 use crate::environment::disk_environment_store::DiskEnvironmentFactory;
355
356 #[tokio::test]
357 async fn basics_test() -> anyhow::Result<()> {
358 environment::environment::tests::basics_test::<DiskEnvironmentFactory>().await
359 }
360
361
362 #[tokio::test]
363 async fn feedback_bytes_get_test() -> anyhow::Result<()> {
364 environment::environment::tests::feedback_bytes_get_test::<DiskEnvironmentFactory>().await
365 }
366
367 #[tokio::test]
368 async fn feedback_put_if_more_powerful_test() -> anyhow::Result<()> {
369 environment::environment::tests::feedback_put_if_more_powerful_test::<DiskEnvironmentFactory>().await
370 }
371
372 }