Skip to main content

hashiverse_server_lib/server/
post_bundle_caching.rs

//! # Post-bundle read cache
//!
//! A `moka` weighted cache that holds recently-served post bundles in RAM so repeated
//! reads for the same `location_id` don't thrash disk. Multiple versions per `location_id`
//! are kept because different peer originators hold different slices of the same bucket:
//! the moka cache is keyed by `location_id`, and each entry maps `originator_id` → bundle.
//!
//! The cache doubles as a **propagation hint engine**: once a location has been requested
//! [`crate::server::post_bundle_caching_shared::CACHE_HIT_THRESHOLD`] times (currently 10),
//! the response to that request — and to any later one while no token is outstanding —
//! includes a signed [`hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1`]
//! inviting the requester to upload that location's bundles into this server's cache via
//! `CachePostBundleV1` (handled by `PostBundleCache::on_upload`). This turns read pressure
//! into better cache distribution without requiring explicit coordination.
//!
//! An `inflight` sibling cache (with a TTL) remembers locations that have an outstanding
//! token, so the same hot location doesn't generate redundant cache-request tokens in parallel.
17
18use bytes::Bytes;
19use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
20use hashiverse_lib::protocol::peer::Peer;
21use hashiverse_lib::tools::buckets::BucketLocation;
22use hashiverse_lib::tools::server_id::ServerId;
23use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
24use hashiverse_lib::tools::time_provider::moka_clock::TimeProviderMokaClock;
25use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
26use hashiverse_lib::tools::tools::leading_agreement_bits_xor;
27use hashiverse_lib::tools::types::Id;
28use moka::sync::Cache;
29use std::collections::HashMap;
30use std::sync::{Arc, Mutex};
31
32use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
33
/// Weight charged for a location entry whose bundles hold no bytes yet (e.g. the placeholder
/// registered by the first lookup). It is a rough stand-in for a populated entry:
/// ~20 posts × 50 KB × up to 5 originators ≈ 5 MB; the constant itself is 4 MiB.
const POST_BUNDLE_PLACEHOLDER_WEIGHT: u32 = 4 << 20;
37
38// --------------------------------------------------------------------------------------------
39// CachedPostBundleLocationEntry
40// --------------------------------------------------------------------------------------------
41
42/// Per-location entry for post bundles.
43/// Multiple originator versions are stored — different servers may hold different subsets of posts.
44struct CachedPostBundleLocationEntry {
45    /// Originator peer_id → cached bundle.
46    bundles: HashMap<Id, CachedBundle>,
47    hit_count: u32,
48}
49
50impl CachedPostBundleLocationEntry {
51    fn placeholder() -> Self {
52        Self { bundles: HashMap::new(), hit_count: 0 }
53    }
54
55    fn weight(&self) -> u32 {
56        let total: u32 = self.bundles.values().map(|b| b.bytes.len() as u32).sum();
57        if total == 0 { POST_BUNDLE_PLACEHOLDER_WEIGHT } else { total }
58    }
59}
60
61// --------------------------------------------------------------------------------------------
62// PostBundleCache
63// --------------------------------------------------------------------------------------------
64
65/// Intermediate-server cache for `EncodedPostBundleV1` data.
66///
67/// Two Moka caches:
68/// - `bundles`: weighted `Cache<Id, Arc<Mutex<CachedPostBundleLocationEntry>>>` with TTI.
69///   If a location_id hasn't been queried within `CACHE_LOCATION_TTI`, the entire entry is evicted.
70///   Individual bundles within an entry may also be stale (live bundles have a per-bundle
71///   `expires_at`; sealed bundles are never individually stale).
72/// - `inflight`: `Cache<Id, ()>` with 30-second TTL — tracks locations for which a
73///   `CacheRequestToken` has been issued but the client hasn't uploaded yet.
74pub struct PostBundleCache {
75    max_originators_per_location: usize,
76    bundles: Cache<Id, Arc<Mutex<CachedPostBundleLocationEntry>>>,
77    inflight: Cache<Id, ()>,
78}
79
80impl PostBundleCache {
81    pub fn new(max_originators_per_location: usize, max_bytes: u64, time_provider: Arc<dyn TimeProvider>) -> Self {
82        // Drive moka's TTI/TTL from our TimeProvider (scaled in tests) rather than wall time.
83        let clock = Arc::new(TimeProviderMokaClock::new(time_provider));
84
85        let bundles = Cache::builder()
86            .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleLocationEntry>>| {
87                entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_PLACEHOLDER_WEIGHT)
88            })
89            .max_capacity(max_bytes)
90            .time_to_idle(CACHE_LOCATION_TTI)
91            .external_clock(clock.clone())
92            .build();
93
94        let inflight = Cache::builder()
95            .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
96            .external_clock(clock)
97            .build();
98
99        Self { max_originators_per_location, bundles, inflight }
100    }
101
102    /// Called by the dispatch handler when serving a `GetPostBundleV1` request.
103    ///
104    /// - `bucket_location` — used as the cache key (`location_id`) and included in any token issued.
105    /// - `already_retrieved_peer_ids` — originator IDs the client already has; filtered out of `cached_items`.
106    /// - `peer_self` / `server_id` — used to sign the `CacheRequestToken` if one is issued.
107    /// - `now` — current time.
108    pub fn on_get(
109        &self,
110        bucket_location: &BucketLocation,
111        already_retrieved_peer_ids: &[Id],
112        peer_self: &Peer,
113        server_id: &ServerId,
114        now: TimeMillis,
115    ) -> GetCacheResult {
116        let location_id = bucket_location.location_id;
117        let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleLocationEntry::placeholder())));
118
119        let (cached_items, already_cached_peer_ids, should_issue_token) = {
120            let mut entry = entry_arc.lock().unwrap();
121            entry.hit_count += 1;
122
123            let already_retrieved_set: std::collections::HashSet<Id> = already_retrieved_peer_ids.iter().copied().collect();
124            let cached_items: Vec<Bytes> = entry.bundles
125                .iter()
126                .filter(|(originator_id, bundle)| !already_retrieved_set.contains(originator_id) && !bundle.is_stale(now))
127                .map(|(_, bundle)| bundle.bytes.clone())
128                .collect();
129
130            let already_cached_peer_ids: Vec<Id> = entry.bundles.keys().copied().collect();
131            let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
132            (cached_items, already_cached_peer_ids, should_issue_token)
133        };
134
135        let cache_request_token = if should_issue_token {
136            self.inflight.insert(location_id, ());
137            let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
138            Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
139        } else {
140            None
141        };
142
143        GetCacheResult { cached_items, cache_request_token }
144    }
145
146    /// Called by the dispatch handler when a `CachePostBundleV1` upload arrives.
147    /// The token must have been verified and expiry-checked by the caller.
148    /// Returns `true` if accepted, `false` if the entry was evicted before the upload arrived.
149    pub fn on_upload(
150        &self,
151        location_id: Id,
152        originator_peer_id: Id,
153        bundle_bytes: Bytes,
154        server_time: TimeMillis,
155        is_sealed: bool,
156    ) -> bool {
157        let entry_arc = match self.bundles.get(&location_id) {
158            Some(e) => e,
159            None => return false,   // Entry evicted between token issuance and upload — reject.
160        };
161
162        let mut entry = entry_arc.lock().unwrap();
163        let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
164        let bundle = CachedBundle { bytes: bundle_bytes, expires_at };
165
166        // Insert (or update) the new originator
167        entry.bundles.insert(originator_peer_id, bundle);
168
169        // If over capacity, evict the worst entry: furthest from location_id,
170        // breaking ties by expires_at (stalest loses). This may evict the entry
171        // we just inserted if it's the worst — that's correct.
172        // This will also prevent sybils trying to insert garbage cache items - they would have to control the location_id
173        while entry.bundles.len() > self.max_originators_per_location {
174            let evict_key = entry.bundles
175                .iter()
176                .min_by(|(id_a, bundle_a), (id_b, bundle_b)| {
177                    let distance_a = leading_agreement_bits_xor(id_a.as_ref(), location_id.as_ref());
178                    let distance_b = leading_agreement_bits_xor(id_b.as_ref(), location_id.as_ref());
179                    distance_a.cmp(&distance_b).then_with(|| {
180                        let expires_a = bundle_a.expires_at.unwrap_or(TimeMillis(i64::MAX));
181                        let expires_b = bundle_b.expires_at.unwrap_or(TimeMillis(i64::MAX));
182                        expires_a.cmp(&expires_b)
183                    })
184                })
185                .map(|(id, _)| *id);
186            if let Some(k) = evict_key {
187                entry.bundles.remove(&k);
188            }
189        }
190
191        // Return whether our insertion survived eviction
192        entry.bundles.contains_key(&originator_peer_id)
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
    use hashiverse_lib::tools::server_id::ServerId;
    use hashiverse_lib::tools::time::TimeMillis;
    use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
    use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
    use hashiverse_lib::tools::types::{Id, Pow};
    use std::collections::HashSet;

    /// Timestamp passed to every cache call in these tests.
    const NOW: TimeMillis = TimeMillis(1_000_000);

    /// Byte budget for each test cache — far more than any test uploads.
    const TEST_MAX_BYTES: u64 = 64 * 1024 * 1024;

    /// The server identity used to sign any cache-request token.
    type TestIdentity = (ServerId, hashiverse_lib::protocol::peer::Peer);

    async fn make_test_server_and_peer() -> anyhow::Result<TestIdentity> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;
        Ok((server_id, peer))
    }

    fn make_test_cache(max_originators_per_location: usize) -> PostBundleCache {
        PostBundleCache::new(max_originators_per_location, TEST_MAX_BYTES, Arc::new(RealTimeProvider))
    }

    fn make_test_bucket_location() -> BucketLocation {
        BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], NOW).unwrap()
    }

    /// Performs one `on_get` for `bucket_location` at [`NOW`], signing with `identity`.
    fn lookup(cache: &PostBundleCache, bucket_location: &BucketLocation, already_retrieved: &[Id], identity: &TestIdentity) -> GetCacheResult {
        let (server_id, peer_self) = identity;
        cache.on_get(bucket_location, already_retrieved, peer_self, server_id, NOW)
    }

    /// Lookups below the hit threshold neither issue a token nor find anything cached.
    #[tokio::test]
    async fn test_below_threshold_no_token() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(5);
        let bucket_location = make_test_bucket_location();

        for _ in 1..CACHE_HIT_THRESHOLD {
            let result = lookup(&cache, &bucket_location, &[], &identity);
            assert!(result.cache_request_token.is_none());
            assert!(result.cached_items.is_empty());
        }

        Ok(())
    }

    /// The threshold-th lookup gets a token; the one right after is deduplicated by `inflight`.
    #[tokio::test]
    async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(5);
        let bucket_location = make_test_bucket_location();

        for _ in 1..CACHE_HIT_THRESHOLD {
            assert!(lookup(&cache, &bucket_location, &[], &identity).cache_request_token.is_none());
        }

        let at_threshold = lookup(&cache, &bucket_location, &[], &identity);
        assert!(at_threshold.cache_request_token.is_some());

        let after_threshold = lookup(&cache, &bucket_location, &[], &identity);
        assert!(after_threshold.cache_request_token.is_none());

        Ok(())
    }

    /// A bundle uploaded into a registered location is returned by the next lookup.
    #[tokio::test]
    async fn test_upload_and_retrieval() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(5);
        let bucket_location = make_test_bucket_location();
        let originator_id = Id::random();
        let bundle_bytes = Bytes::from_static(b"test_bundle");

        // The first lookup registers the placeholder entry the upload lands in.
        lookup(&cache, &bucket_location, &[], &identity);

        let accepted = cache.on_upload(bucket_location.location_id, originator_id, bundle_bytes.clone(), NOW, false);
        assert!(accepted);

        let second_lookup = lookup(&cache, &bucket_location, &[], &identity);
        assert_eq!(second_lookup.cached_items, vec![bundle_bytes]);

        Ok(())
    }

    /// Bundles from originators the client already has are not returned again.
    #[tokio::test]
    async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(5);
        let bucket_location = make_test_bucket_location();
        let originator_id = Id::random();

        lookup(&cache, &bucket_location, &[], &identity);
        cache.on_upload(bucket_location.location_id, originator_id, Bytes::from_static(b"bundle"), NOW, false);

        let filtered = lookup(&cache, &bucket_location, &[originator_id], &identity);
        assert!(filtered.cached_items.is_empty());

        Ok(())
    }

    /// Uploading to a location that was never looked up is rejected.
    #[tokio::test]
    async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
        let cache = make_test_cache(5);
        let location_id = Id::random();
        let originator_id = Id::random();

        let accepted = cache.on_upload(location_id, originator_id, Bytes::from_static(b"bundle"), NOW, false);
        assert!(!accepted);

        Ok(())
    }

    /// With more originators uploaded than `max_originators_per_location`, the survivors are the
    /// ones closest to the location_id (most leading agreement bits); the furthest are evicted.
    #[tokio::test]
    async fn test_overflow_keeps_closest_originators() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(3); // keep at most 3
        let bucket_location = make_test_bucket_location();
        let location_id = bucket_location.location_id;

        // Register the placeholder entry so uploads are accepted.
        lookup(&cache, &bucket_location, &[], &identity);

        // Flipping bit `flip_bit` of the location_id gives an originator whose leading agreement
        // with the location_id is exactly `flip_bit` bits — a controllable XOR distance.
        let originator_at = |flip_bit: usize| -> Id {
            let mut raw = location_id.0;
            raw[flip_bit / 8] ^= 1 << (7 - flip_bit % 8);
            Id(raw)
        };
        let label = |agreement: usize| format!("bundle-agreement-{}", agreement);

        // Agreements 20..100 in steps of 20; with a cap of 3, only 60, 80 and 100 may survive.
        for agreement in [20usize, 40, 60, 80, 100] {
            // Sealed bundles never go stale individually, so every survivor is returned below.
            cache.on_upload(location_id, originator_at(agreement), Bytes::from(label(agreement)), NOW, true);
        }

        let survivors: HashSet<Vec<u8>> = lookup(&cache, &bucket_location, &[], &identity)
            .cached_items
            .iter()
            .map(|bundle| bundle.to_vec())
            .collect();
        assert_eq!(3, survivors.len(), "cache must keep exactly max_originators_per_location entries");
        for agreement in [60usize, 80, 100] {
            assert!(survivors.contains(label(agreement).as_bytes()), "closest originator (agreement {}) must be kept", agreement);
        }
        for agreement in [20usize, 40] {
            assert!(!survivors.contains(label(agreement).as_bytes()), "furthest originator (agreement {}) must be evicted", agreement);
        }
        Ok(())
    }

    /// An issued `CacheRequestToken` is valid for exactly the TTL window that the
    /// `CachePostBundleV1` handler enforces with `token.is_expired(now)`.
    ///
    /// This is a deterministic check of that window. Integration tests cannot exercise the real
    /// upload RPC reliably: under the scaled clock the 30s TTL is only ~33ms of real time, and the
    /// RPC can outlive it, which is the race that made the cache tests flaky. Signature/PoW
    /// verification needs a fully PoW'd identity and is covered end-to-end by
    /// `test_caching_spreads_via_client_fetches`.
    #[tokio::test]
    async fn test_cache_request_token_expiry() -> anyhow::Result<()> {
        let identity = make_test_server_and_peer().await?;
        let cache = make_test_cache(5);
        let bucket_location = make_test_bucket_location();

        // Hit the location threshold-many times and keep whichever token gets issued.
        let token = (0..CACHE_HIT_THRESHOLD)
            .fold(None::<CacheRequestTokenV1>, |issued, _| {
                lookup(&cache, &bucket_location, &[], &identity).cache_request_token.or(issued)
            })
            .expect("server issues a token at the hit threshold");

        let ttl_millis = CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS.0;
        assert!(!token.is_expired(NOW), "token must be valid at issue time");
        assert!(!token.is_expired(TimeMillis(NOW.0 + ttl_millis - 1)), "token must be valid just before its TTL elapses");
        assert!(token.is_expired(TimeMillis(NOW.0 + ttl_millis + 1)), "token must be expired once its TTL has elapsed");

        Ok(())
    }
}