Skip to main content

hashiverse_server_lib/server/
post_bundle_feedback_caching.rs

1//! # Post-bundle feedback read cache
2//!
3//! Sister of [`crate::server::post_bundle_caching`] for the feedback layer.
4//! One key difference: feedback is pre-merged by the client before it arrives at
5//! the server, so the cache stores a single canonical entry per `location_id`
6//! instead of one per originator. This also means the cached value is typically
7//! much smaller (~8 KB placeholder weight vs. the post-bundle cache's 4 MB), so
8//! more entries fit into the same memory budget.
9//!
10//! Cache-request token issuance and the hit-count heuristic work identically to
11//! the post-bundle cache, so hot feedback gets gossiped out in the same way hot
12//! bundles do.
13
14use bytes::Bytes;
15use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
16use hashiverse_lib::protocol::peer::Peer;
17use hashiverse_lib::tools::buckets::BucketLocation;
18use hashiverse_lib::tools::server_id::ServerId;
19use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
20use hashiverse_lib::tools::time_provider::moka_clock::TimeProviderMokaClock;
21use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
22use hashiverse_lib::tools::types::Id;
23use moka::sync::Cache;
24use std::sync::{Arc, Mutex};
25
26use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
27
/// Placeholder weight for feedback entries — approximates a merged feedback bundle.
///
/// Ballpark: 20 posts × 10 feedbacks × 50 bytes each ≈ 10 KB. The constant itself is set to
/// 8 KiB, i.e. slightly below that estimate but of the same order of magnitude.
///
/// It is reported as the entry weight while no bundle has been stored yet, and it is also the
/// fallback the cache weigher returns when the entry's mutex cannot be locked.
const POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT: u32 = 8 * 1024;
31
32// --------------------------------------------------------------------------------------------
33// CachedPostBundleFeedbackLocationEntry
34// --------------------------------------------------------------------------------------------
35
/// Per-location cache entry for feedback.
///
/// Only one canonical merged result is stored — the client already merges feedback across
/// servers before uploading, so multiple originator versions are not meaningful here.
struct CachedPostBundleFeedbackLocationEntry {
    /// The single merged feedback bundle, together with the originator's peer_id.
    /// `None` while the entry is still a placeholder, i.e. before any upload has been stored.
    bundle: Option<(Id, CachedBundle)>,
    /// Number of `on_get` calls observed for this location. `on_get` compares it against
    /// `CACHE_HIT_THRESHOLD` to decide whether a cache-request token should be issued.
    hit_count: u32,
}
44
45impl CachedPostBundleFeedbackLocationEntry {
46    fn placeholder() -> Self {
47        Self { bundle: None, hit_count: 0 }
48    }
49
50    fn weight(&self) -> u32 {
51        self.bundle.as_ref().map(|(_, b)| b.bytes.len() as u32).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
52    }
53}
54
55// --------------------------------------------------------------------------------------------
56// PostBundleFeedbackCache
57// --------------------------------------------------------------------------------------------
58
/// Intermediate-server cache for `EncodedPostBundleFeedbackV1` data.
///
/// Two Moka caches:
/// - `bundles`: weighted `Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>` with TTI.
///   If a location_id hasn't been queried within `CACHE_LOCATION_TTI`, the entry is evicted.
/// - `inflight`: `Cache<Id, ()>` with 30-second TTL — tracks pending upload tokens.
pub struct PostBundleFeedbackCache {
    /// Per-location entries keyed by `location_id`. Each value sits behind its own mutex so the
    /// hit counter and the stored bundle can be updated in place.
    bundles: Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>,
    /// Marker set keyed by `location_id`: a key is present while a cache-request token issued by
    /// `on_get` is still within `CACHE_REQUEST_TOKEN_TTL_DURATION`, which suppresses duplicates.
    inflight: Cache<Id, ()>,
}
69
70impl PostBundleFeedbackCache {
71    pub fn new(max_bytes: u64, time_provider: Arc<dyn TimeProvider>) -> Self {
72        // Drive moka's TTI/TTL from our TimeProvider (scaled in tests) rather than wall time.
73        let clock = Arc::new(TimeProviderMokaClock::new(time_provider));
74
75        let bundles = Cache::builder()
76            .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>| {
77                entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
78            })
79            .max_capacity(max_bytes)
80            .time_to_idle(CACHE_LOCATION_TTI)
81            .external_clock(clock.clone())
82            .build();
83
84        let inflight = Cache::builder()
85            .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
86            .external_clock(clock)
87            .build();
88
89        Self { bundles, inflight }
90    }
91
92    /// Called by the dispatch handler when serving a `GetPostBundleFeedbackV1` request.
93    pub fn on_get(
94        &self,
95        bucket_location: &BucketLocation,
96        already_retrieved_peer_ids: &[Id],
97        peer_self: &Peer,
98        server_id: &ServerId,
99        now: TimeMillis,
100    ) -> GetCacheResult {
101        let location_id = bucket_location.location_id;
102        let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleFeedbackLocationEntry::placeholder())));
103
104        let (cached_items, already_cached_peer_ids, should_issue_token) = {
105            let mut entry = entry_arc.lock().unwrap();
106            entry.hit_count += 1;
107
108            let cached_items: Vec<Bytes> = entry.bundle
109                .iter()
110                .filter(|(originator_id, bundle)| !already_retrieved_peer_ids.contains(originator_id) && !bundle.is_stale(now))
111                .map(|(_, bundle)| bundle.bytes.clone())
112                .collect();
113
114            let already_cached_peer_ids: Vec<Id> = entry.bundle.iter().map(|(id, _)| *id).collect();
115            let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
116            (cached_items, already_cached_peer_ids, should_issue_token)
117        };
118
119        let cache_request_token = if should_issue_token {
120            self.inflight.insert(location_id, ());
121            let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
122            Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
123        } else {
124            None
125        };
126
127        GetCacheResult { cached_items, cache_request_token }
128    }
129
130    /// Called by the dispatch handler when a `CachePostBundleFeedbackV1` upload arrives.
131    /// Unconditionally replaces any previously cached feedback — the uploader has the merged best.
132    /// Returns `true` if accepted, `false` if the entry was evicted before the upload arrived.
133    pub fn on_upload(
134        &self,
135        location_id: Id,
136        originator_peer_id: Id,
137        feedback_bytes: Bytes,
138        server_time: TimeMillis,
139        is_sealed: bool,
140    ) -> bool {
141        let entry_arc = match self.bundles.get(&location_id) {
142            Some(e) => e,
143            None => return false,
144        };
145
146        let mut entry = entry_arc.lock().unwrap();
147        let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
148        entry.bundle = Some((originator_peer_id, CachedBundle { bytes: feedback_bytes, expires_at }));
149        true
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use bytes::Bytes;
157    use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
158    use hashiverse_lib::tools::server_id::ServerId;
159    use hashiverse_lib::tools::time::TimeMillis;
160    use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
161    use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
162    use hashiverse_lib::tools::types::{Id, Pow};
163
164    async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, Peer)> {
165        let time_provider = RealTimeProvider;
166        let pow_generator = SingleThreadedPowGenerator::new();
167        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
168        let peer = server_id.to_peer(&time_provider)?;
169        Ok((server_id, peer))
170    }
171
172    fn make_test_bucket_location() -> BucketLocation {
173        BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000)).unwrap()
174    }
175
176    #[tokio::test]
177    async fn test_below_threshold_no_token() -> anyhow::Result<()> {
178        let (server_id, peer_self) = make_test_server_and_peer().await?;
179        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
180        let bucket_location = make_test_bucket_location();
181        let now = TimeMillis(1_000_000);
182
183        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
184            let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
185            assert!(result.cache_request_token.is_none());
186            assert!(result.cached_items.is_empty());
187        }
188
189        Ok(())
190    }
191
192    #[tokio::test]
193    async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
194        let (server_id, peer_self) = make_test_server_and_peer().await?;
195        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
196        let bucket_location = make_test_bucket_location();
197        let now = TimeMillis(1_000_000);
198
199        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
200            let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
201            assert!(result.cache_request_token.is_none());
202        }
203
204        let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
205        assert!(result.cache_request_token.is_some());
206
207        let result2 = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
208        assert!(result2.cache_request_token.is_none());
209
210        Ok(())
211    }
212
213    #[tokio::test]
214    async fn test_upload_and_retrieval() -> anyhow::Result<()> {
215        let (server_id, peer_self) = make_test_server_and_peer().await?;
216        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
217        let bucket_location = make_test_bucket_location();
218        let location_id = bucket_location.location_id;
219        let now = TimeMillis(1_000_000);
220        let originator_id = Id::random();
221        let feedback_bytes = Bytes::from_static(b"test_feedback");
222
223        cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
224
225        let accepted = cache.on_upload(location_id, originator_id, feedback_bytes.clone(), now, false);
226        assert!(accepted);
227
228        let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
229        assert_eq!(result.cached_items, vec![feedback_bytes]);
230
231        Ok(())
232    }
233
234    #[tokio::test]
235    async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
236        let (server_id, peer_self) = make_test_server_and_peer().await?;
237        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
238        let bucket_location = make_test_bucket_location();
239        let location_id = bucket_location.location_id;
240        let now = TimeMillis(1_000_000);
241        let originator_id = Id::random();
242
243        cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
244        cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), now, false);
245
246        let result = cache.on_get(&bucket_location, &[originator_id], &peer_self, &server_id, now);
247        assert!(result.cached_items.is_empty());
248
249        Ok(())
250    }
251
252    #[tokio::test]
253    async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
254        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
255        let location_id = Id::random();
256        let originator_id = Id::random();
257
258        let accepted = cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), TimeMillis(1_000_000), false);
259        assert!(!accepted);
260
261        Ok(())
262    }
263}