Skip to main content

hashiverse_server_lib/environment/
disk_environment_store.rs

1//! # Production disk [`EnvironmentStore`]
2//!
//! Backing store used by the real server binary. Persists metadata to `fjall` keyspaces
//! and bundle bodies to a two-level directory tree (256² = 65,536 leaf directories, so
//! ≈16 M bundles at only ~256 files per directory). Writes go
//! through a temp-file + rename dance so a mid-write crash never leaves a torn
//! bundle, and feedback-update batches are atomic at the `fjall` level.
8//!
9//! Library choice rationale:
10//! - **fjall** over **redb** (redb compacts synchronously and blocks writes during
11//!   it, which we can't afford on a single-node server) and over **sled** (stalled
12//!   upstream; its v1.0 rewrite hasn't landed).
13//! - **postcard** for metadata serialisation — compact, fast, stable format, no
14//!   schema on disk.
15//!
16//! Feedback entries are stored under composite keys `(location_id, post_id,
17//! feedback_type)` so range iteration can walk all feedback for a given bundle
18//! location during decimation in a single sequential scan.
19
20use bytes::Bytes;
21use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, PostBundleMetadata};
22use crate::environment::environment_store::EnvironmentStore;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use fjall::{Database, Keyspace};
26use fs2::FileExt;
27use hashiverse_lib::tools::time::{TimeMillis, TimeMillisBytes};
28use hashiverse_lib::tools::types::{ID_BYTES, Id, SALT_BYTES, Salt, Pow};
29use log::{info, trace, warn};
30use std::collections::HashMap;
31use std::fs;
32use std::fs::OpenOptions;
33use std::path::PathBuf;
34use std::sync::Arc;
35use hashiverse_lib::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
36
/// Upper bound on the number of environment slots (`base_path/1` … `base_path/256`) that a
/// single node probes when looking for one it can claim.
const MAX_ENVIRONMENTS_PER_NODE: usize = 256;

/// Factory for environments backed by a [`DiskEnvironmentStore`] rooted at a base directory.
pub struct DiskEnvironmentFactory {
    /// Directory under which every environment slot gets its own numbered subdirectory.
    base_path: String,
}
42
43#[async_trait]
44impl EnvironmentFactory for DiskEnvironmentFactory {
45    fn new(base_path: &str) -> Self {
46        Self { base_path: base_path.to_string() }
47    }
48
49    async fn open_next_available(&self, environment_dimensions: EnvironmentDimensions) -> anyhow::Result<Environment> {
50        for env_id in 1..=MAX_ENVIRONMENTS_PER_NODE {
51            let disk_environment_store = DiskEnvironmentStore::new(&self.base_path, env_id);
52            match disk_environment_store {
53                Ok(disk_environment_store) => return Environment::new(Arc::new(disk_environment_store), environment_dimensions).await,
54                Err(_) => continue,
55            }
56        }
57
58        anyhow::bail!("no environments available")
59    }
60}
61
/// DiskEnvironmentStore implements the `EnvironmentStore` trait for production use.  PostBundles are stored as files in a directory tree, while config, metadata, feedback, and post-expiry metadata are stored in a key-value database.
63
/// A lot of time was spent comparing the available databases.  Given the nature of hashiverse, a key-value (KV) database is the natural fit.
/// For simplicity of building and maintenance, we decided to use a pure-Rust implementation.  This narrowed the field down to redb, sled, and fjall.
/// We would have liked to use redb, because of its self-reported performance with individual writes and random range reads - both things that hashiverse does a lot of.
/// Unfortunately it requires occasional compaction of the ever-growing files on disk, something that needs downtime and 2x disk space to do.  This essentially killed it for us.
/// Next was sled, which autocompacts and seemed the next most performant, but it appears to receive little ongoing improvement or support - its own GitHub repo says to use it as beta software only, and work on its v1.0 release seems to have stalled.
/// That leaves fjall - seemingly third place in performance but, judging by its GitHub activity, actively improving and well supported.  It also does autocompaction.
70
71pub struct DiskEnvironmentStore {
72    path: PathBuf,
73    #[allow(dead_code)]
74    lock_file: fs::File, // Blocks other processes from claiming this environment on disk
75    database: Database,
76    keyspace_config: Keyspace,
77    keyspace_post_bundle_last_accessed: Keyspace, // location_id -> TimeMillis
78    keyspace_post_bundle_metadata: Keyspace,      // location_id -> PostBundleMetadata
79    keyspace_post_bundle_feedback: Keyspace,      // [location_id,post_id,feedback_type] -> [salt,pow]
80}
81
82impl DiskEnvironmentStore {
83    fn new(base_path: &str, env_id: usize) -> anyhow::Result<Self> {
84        let path = PathBuf::from(base_path).join(env_id.to_string());
85
86        // Check that the path exists
87        fs::create_dir_all(&path)?;
88
89        // Attempt to grab the lock
90        let lock_path = path.join("lock");
91        let lock_file = OpenOptions::new().create(true).truncate(true).read(true).write(true).open(&lock_path)?;
92        lock_file.try_lock_exclusive()?;
93
94        // Make sure the various stores exist
95        let database = Database::builder(path.join("database")).open()?;
96        let keyspace_config = database.keyspace("config", fjall::KeyspaceCreateOptions::default)?;
97        let keyspace_post_bundle_last_accessed = database.keyspace("post_bundle_last_accessed", fjall::KeyspaceCreateOptions::default)?;
98        let keyspace_post_bundle_metadata = database.keyspace("keyspace_post_bundle_metadata", fjall::KeyspaceCreateOptions::default)?;
99        let keyspace_post_bundle_feedback = database.keyspace("keyspace_post_bundle_feedback", fjall::KeyspaceCreateOptions::default)?;
100
101        info!("using environment {} at {}", env_id, path.to_str().unwrap());
102
103        Ok(Self {
104            path,
105            lock_file,
106            database,
107            keyspace_config,
108            keyspace_post_bundle_last_accessed,
109            keyspace_post_bundle_metadata,
110            keyspace_post_bundle_feedback,
111        })
112    }
113
114    fn path_for_location_id(&self, location_id: &Id) -> (PathBuf, PathBuf) {
115        // Two indirections of with 256 files in the folder allows 256^3 = 16million postbundles
116        // Assuming each is at tiniest 1kb, that implies about 16Gb of disk space, which is what we are aiming for.
117        // If we wanted to substantially increase the disk size supported by a hashiverse node, we might have to intro another level of indirection.
118        // Note that it is likely ext4 will also start running out of inodes if posts are too small - inodes on ext4 assume a 16kb file on average.
119        // Perhaps XFS is mandated?  Though the tradeoff is XFS is slower than ext4 when handling little files...
120        let b0 = format!("{:02x}", location_id.0[0]);
121        let b1 = format!("{:02x}", location_id.0[1]);
122
123        let directory = self.path.join("post_bundles").join(b0).join(b1);
124        let filename = directory.join(location_id.to_hex_str());
125
126        (directory, filename)
127    }
128}
129
130impl EnvironmentStore for DiskEnvironmentStore {
131    fn post_bundle_count(&self) -> anyhow::Result<usize> {
132        let len = self.keyspace_post_bundle_last_accessed.len()?;
133        Ok(len)
134    }
135
136    fn post_bundle_feedback_count(&self) -> anyhow::Result<usize> {
137        let len = self.keyspace_post_bundle_feedback.len()?;
138        Ok(len)
139    }
140    fn post_bundle_metadata_get(&self, location_id: &Id) -> anyhow::Result<Option<PostBundleMetadata>> {
141        let guard = self.keyspace_post_bundle_metadata.get(location_id)?;
142        match guard {
143            Some(guard) => Ok(Some(postcard::from_bytes(&guard)?)),
144            None => Ok(None),
145        }
146    }
147
148    fn post_bundle_metadata_put(&self, location_id: &Id, post_bundle_metadata: &PostBundleMetadata) -> anyhow::Result<()> {
149        // Much faster than allocating a Vec each time...
150        let mut scratch = [0u8; 64];
151        let scratch_used = postcard::to_slice(post_bundle_metadata, &mut scratch)?;
152        self.keyspace_post_bundle_metadata.insert(location_id.0, scratch_used.as_ref())?;
153        Ok(())
154    }
155
156    fn post_bundle_bytes_get(&self, location_id: &Id) -> anyhow::Result<Option<Bytes>> {
157        let (_directory, filename) = self.path_for_location_id(location_id);
158        let result = fs::read(filename).ok().map(Bytes::from);
159        Ok(result)
160    }
161
162    fn post_bundle_bytes_put(&self, location_id: &Id, bytes: &[u8]) -> anyhow::Result<()> {
163        let (directory, filename) = self.path_for_location_id(location_id);
164        let filename_temp = filename.with_added_extension("tmp");
165        fs::create_dir_all(&directory)?;
166        fs::write(&filename_temp, bytes)?;
167        fs::rename(&filename_temp, &filename)?;
168        Ok(())
169    }
170
171    fn post_bundle_feedbacks_bytes_get(&self, post_bundle_location_id: &Id) -> anyhow::Result<Bytes> {
172        let mut bytes = Vec::new();
173
174        self.keyspace_post_bundle_feedback.prefix(post_bundle_location_id).for_each(|guard| {
175            let try_result = try {
176                let (key, value) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
177                let post_bundle_feedback_key = PostBundleFeedbackKey::from_slice(&key)?;
178                let post_bundle_feedback_value = PostBundleFeedbackValue::from_slice(&value)?;
179                EncodedPostFeedbackV1::append_encode_direct_to_bytes(
180                    &mut bytes,
181                    post_bundle_feedback_key.post_id_bytes(),
182                    post_bundle_feedback_key.feedback_type(),
183                    post_bundle_feedback_value.salt_bytes(),
184                    post_bundle_feedback_value.pow(),
185                )?;
186            };
187
188            if let Err(e) = try_result {
189                warn!("unexpectedly unable to encode post bundle feedback: {}", e);
190            }
191        });
192
193        Ok(Bytes::from(bytes))
194    }
195
196
197    fn post_feedback_put_if_more_powerful(&self, location_id: &Id, encoded_post_feedback: &EncodedPostFeedbackV1) -> anyhow::Result<()> {
198        let post_bundle_feedback_key = PostBundleFeedbackKey::new(location_id, &encoded_post_feedback.post_id, encoded_post_feedback.feedback_type);
199
200        // Check that we don't already have much better feedback
201        let post_bundle_feedback_value = self.keyspace_post_bundle_feedback.get(&post_bundle_feedback_key)?;
202        if let Some(post_bundle_feedback_value) = post_bundle_feedback_value {
203            let post_bundle_feedback_value = PostBundleFeedbackValue::from_slice(&post_bundle_feedback_value)?;
204            if post_bundle_feedback_value.pow() >= encoded_post_feedback.pow {
205                trace!("Not storing lesser feedback for location_id={} with existing pow={}: feedback={:?}", location_id, post_bundle_feedback_value.pow(), encoded_post_feedback);
206                return Ok(());
207            }
208        }
209
210        // Commit the improved feedback
211        {
212            let post_bundle_feedback_value = PostBundleFeedbackValue::new(encoded_post_feedback.salt, encoded_post_feedback.pow);
213            self.keyspace_post_bundle_feedback.insert(post_bundle_feedback_key.0, post_bundle_feedback_value.0)?;
214        }
215
216        Ok(())
217    }
218
219    fn post_bundles_delete(&self, location_ids: &[Id]) -> anyhow::Result<()> {
220        let post_bundle_bytes_delete = |location_id: &Id| -> anyhow::Result<()> {
221            let (_directory, filename) = self.path_for_location_id(location_id);
222            let _result = fs::remove_file(filename);
223            Ok(())
224        };
225
226        // Collect feedback keys before opening the batch to avoid borrow conflicts
227        let mut feedback_keys_to_delete: Vec<Vec<u8>> = Vec::new();
228        for location_id in location_ids {
229            self.keyspace_post_bundle_feedback.prefix(location_id).for_each(|guard| {
230                let try_result: anyhow::Result<()> = try {
231                    let (key, _) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
232                    feedback_keys_to_delete.push(key.to_vec());
233                };
234                if let Err(e) = try_result {
235                    warn!("failed to collect feedback key for deletion: {}", e);
236                }
237            });
238        }
239
240        let mut batch = self.database.batch();
241        for location_id in location_ids {
242            batch.remove(&self.keyspace_post_bundle_metadata, location_id.0);
243            batch.remove(&self.keyspace_post_bundle_last_accessed, location_id.0);
244            post_bundle_bytes_delete(location_id)?;
245        }
246        for key in &feedback_keys_to_delete {
247            batch.remove(&self.keyspace_post_bundle_feedback, key.as_slice());
248        }
249        batch.commit()?;
250
251        Ok(())
252    }
253
254    fn post_bundles_last_accessed_flush(&self, post_bundles_last_accessed: &HashMap<Id, TimeMillis>) -> anyhow::Result<()> {
255        let mut batch = self.database.batch();
256        for (location_id, time_millis) in post_bundles_last_accessed.iter() {
257            let time_millis_bytes = time_millis.encode_be();
258            batch.insert(&self.keyspace_post_bundle_last_accessed, location_id.0, time_millis_bytes.0);
259        }
260        batch.commit()?;
261        Ok(())
262    }
263
264    fn post_bundles_last_accessed_iter(&self, location_id: &Id) -> Box<dyn Iterator<Item = Result<(Id, TimeMillisBytes), anyhow::Error>> + '_> {
265        let it = self
266            .keyspace_post_bundle_last_accessed
267            .range(location_id.to_string()..)
268            .chain(self.keyspace_post_bundle_last_accessed.range(..location_id.to_string()))
269            .map(|guard| {
270                let (location_id, time_millis_bytes) = guard.into_inner().map_err(|e| anyhow!("{}", e))?;
271                let location_id = Id::from_slice(&location_id)?;
272                let time_millis_bytes = TimeMillisBytes::from_bytes(&time_millis_bytes)?;
273                Ok((location_id, time_millis_bytes))
274            });
275
276        Box::new(it)
277    }
278
279    fn config_get_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
280        Ok(self.keyspace_config.get(key)?.map(|v| v.to_vec()))
281    }
282
283    fn config_put_bytes(&self, key: &str, v: Vec<u8>) -> anyhow::Result<()> {
284        self.keyspace_config.insert(key, v)?;
285        Ok(())
286    }
287}
288
289const POST_BUNDLE_FEEDBACK_KEY_SIZE: usize = ID_BYTES + ID_BYTES + 1;
290pub struct PostBundleFeedbackKey(pub [u8; POST_BUNDLE_FEEDBACK_KEY_SIZE]);
291
292impl PostBundleFeedbackKey {
293    pub fn new(location_id: &Id, post_id: &Id, feedback_type: u8) -> Self {
294        let mut bytes = [0u8; POST_BUNDLE_FEEDBACK_KEY_SIZE];
295        bytes[0..ID_BYTES].copy_from_slice(location_id.as_bytes());
296        bytes[ID_BYTES..2*ID_BYTES].copy_from_slice(post_id.as_bytes());
297        bytes[2*ID_BYTES] = feedback_type;
298        Self(bytes)
299    }
300
301    pub fn from_slice(bytes: &[u8]) -> anyhow::Result<Self> {
302        let bytes: [u8; POST_BUNDLE_FEEDBACK_KEY_SIZE] = bytes
303            .try_into()
304            .map_err(|_| anyhow::anyhow!("Invalid PostBundleFeedbackKey length: expected {}, got {}", POST_BUNDLE_FEEDBACK_KEY_SIZE, bytes.len()))?;
305        Ok(Self(bytes))
306    }
307
308    pub fn post_bundle_location_id_bytes(&self) -> &[u8] {
309        &self.0[0..ID_BYTES]
310    }
311    pub fn post_id_bytes(&self) -> &[u8] {
312        &self.0[ID_BYTES..2*ID_BYTES]
313    }
314
315    pub fn feedback_type(&self) -> u8 {
316        self.0[2*ID_BYTES]
317    }
318}
319
320impl AsRef<[u8]> for PostBundleFeedbackKey {
321    fn as_ref(&self) -> &[u8] {
322        &self.0
323    }
324}
325
326const POST_BUNDLE_FEEDBACK_VALUE_SIZE: usize = SALT_BYTES + 1;
327pub struct PostBundleFeedbackValue(pub [u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE]);
328
329impl PostBundleFeedbackValue {
330    pub fn new(salt: Salt, pow: Pow) -> Self {
331        let mut bytes = [0u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE];
332        bytes[0..SALT_BYTES].copy_from_slice(salt.as_slice());
333        bytes[SALT_BYTES] = pow.0;
334        Self(bytes)
335    }
336
337    pub fn from_slice(bytes: &[u8]) -> anyhow::Result<Self> {
338        let bytes: [u8; POST_BUNDLE_FEEDBACK_VALUE_SIZE] = bytes
339            .try_into()
340            .map_err(|_| anyhow::anyhow!("Invalid PostBundleFeedbackValue length: expected {}, got {}", POST_BUNDLE_FEEDBACK_VALUE_SIZE, bytes.len()))?;
341        Ok(Self(bytes))
342    }
343    pub fn salt_bytes(&self) -> &[u8] {
344        &self.0[0..SALT_BYTES]
345    }
346    pub fn pow(&self) -> Pow {
347        Pow(self.0[SALT_BYTES])
348    }
349}
350
#[cfg(test)]
mod tests {
    // Runs the shared environment test suite against the on-disk backend.
    use crate::environment::disk_environment_store::DiskEnvironmentFactory;
    use crate::environment::environment::tests as shared;

    #[tokio::test]
    async fn basics_test() -> anyhow::Result<()> {
        shared::basics_test::<DiskEnvironmentFactory>().await
    }

    #[tokio::test]
    async fn feedback_bytes_get_test() -> anyhow::Result<()> {
        shared::feedback_bytes_get_test::<DiskEnvironmentFactory>().await
    }

    #[tokio::test]
    async fn feedback_put_if_more_powerful_test() -> anyhow::Result<()> {
        shared::feedback_put_if_more_powerful_test::<DiskEnvironmentFactory>().await
    }

    // The decimation tests are brutally expensive on disk, so they stay disabled here and we rely
    // on the memory stub to exercise them. To run them against disk, re-enable:
    //
    // #[tokio::test]
    // async fn decimation_feedback_deleted_test() -> anyhow::Result<()> {
    //     shared::decimation_feedback_deleted_test::<DiskEnvironmentFactory>().await
    // }
    // #[tokio::test]
    // async fn decimation_test() -> anyhow::Result<()> {
    //     shared::decimation_test::<DiskEnvironmentFactory>().await
    // }
    // #[tokio::test]
    // async fn decimation_convergence_test() -> anyhow::Result<()> {
    //     shared::decimation_convergence_test::<DiskEnvironmentFactory>(2 * 1000).await
    // }
    // #[tokio::test]
    // async fn decimation_existence_test() -> anyhow::Result<()> {
    //     shared::decimation_existence_test::<DiskEnvironmentFactory>().await
    // }
}