hashiverse_server_lib/server/
post_bundle_feedback_caching.rs1use bytes::Bytes;
15use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
16use hashiverse_lib::protocol::peer::Peer;
17use hashiverse_lib::tools::buckets::BucketLocation;
18use hashiverse_lib::tools::server_id::ServerId;
19use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
20use hashiverse_lib::tools::time_provider::moka_clock::TimeProviderMokaClock;
21use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
22use hashiverse_lib::tools::types::Id;
23use moka::sync::Cache;
24use std::sync::{Arc, Mutex};
25
26use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
27
28const POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT: u32 = 8 * 1024;
31
32struct CachedPostBundleFeedbackLocationEntry {
40 bundle: Option<(Id, CachedBundle)>,
42 hit_count: u32,
43}
44
45impl CachedPostBundleFeedbackLocationEntry {
46 fn placeholder() -> Self {
47 Self { bundle: None, hit_count: 0 }
48 }
49
50 fn weight(&self) -> u32 {
51 self.bundle.as_ref().map(|(_, b)| b.bytes.len() as u32).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
52 }
53}
54
55pub struct PostBundleFeedbackCache {
66 bundles: Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>,
67 inflight: Cache<Id, ()>,
68}
69
70impl PostBundleFeedbackCache {
71 pub fn new(max_bytes: u64, time_provider: Arc<dyn TimeProvider>) -> Self {
72 let clock = Arc::new(TimeProviderMokaClock::new(time_provider));
74
75 let bundles = Cache::builder()
76 .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>| {
77 entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
78 })
79 .max_capacity(max_bytes)
80 .time_to_idle(CACHE_LOCATION_TTI)
81 .external_clock(clock.clone())
82 .build();
83
84 let inflight = Cache::builder()
85 .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
86 .external_clock(clock)
87 .build();
88
89 Self { bundles, inflight }
90 }
91
92 pub fn on_get(
94 &self,
95 bucket_location: &BucketLocation,
96 already_retrieved_peer_ids: &[Id],
97 peer_self: &Peer,
98 server_id: &ServerId,
99 now: TimeMillis,
100 ) -> GetCacheResult {
101 let location_id = bucket_location.location_id;
102 let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleFeedbackLocationEntry::placeholder())));
103
104 let (cached_items, already_cached_peer_ids, should_issue_token) = {
105 let mut entry = entry_arc.lock().unwrap();
106 entry.hit_count += 1;
107
108 let cached_items: Vec<Bytes> = entry.bundle
109 .iter()
110 .filter(|(originator_id, bundle)| !already_retrieved_peer_ids.contains(originator_id) && !bundle.is_stale(now))
111 .map(|(_, bundle)| bundle.bytes.clone())
112 .collect();
113
114 let already_cached_peer_ids: Vec<Id> = entry.bundle.iter().map(|(id, _)| *id).collect();
115 let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
116 (cached_items, already_cached_peer_ids, should_issue_token)
117 };
118
119 let cache_request_token = if should_issue_token {
120 self.inflight.insert(location_id, ());
121 let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
122 Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
123 } else {
124 None
125 };
126
127 GetCacheResult { cached_items, cache_request_token }
128 }
129
130 pub fn on_upload(
134 &self,
135 location_id: Id,
136 originator_peer_id: Id,
137 feedback_bytes: Bytes,
138 server_time: TimeMillis,
139 is_sealed: bool,
140 ) -> bool {
141 let entry_arc = match self.bundles.get(&location_id) {
142 Some(e) => e,
143 None => return false,
144 };
145
146 let mut entry = entry_arc.lock().unwrap();
147 let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
148 entry.bundle = Some((originator_peer_id, CachedBundle { bytes: feedback_bytes, expires_at }));
149 true
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use bytes::Bytes;
157 use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
158 use hashiverse_lib::tools::server_id::ServerId;
159 use hashiverse_lib::tools::time::TimeMillis;
160 use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
161 use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
162 use hashiverse_lib::tools::types::{Id, Pow};
163
164 async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, Peer)> {
165 let time_provider = RealTimeProvider;
166 let pow_generator = SingleThreadedPowGenerator::new();
167 let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
168 let peer = server_id.to_peer(&time_provider)?;
169 Ok((server_id, peer))
170 }
171
172 fn make_test_bucket_location() -> BucketLocation {
173 BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000)).unwrap()
174 }
175
176 #[tokio::test]
177 async fn test_below_threshold_no_token() -> anyhow::Result<()> {
178 let (server_id, peer_self) = make_test_server_and_peer().await?;
179 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
180 let bucket_location = make_test_bucket_location();
181 let now = TimeMillis(1_000_000);
182
183 for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
184 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
185 assert!(result.cache_request_token.is_none());
186 assert!(result.cached_items.is_empty());
187 }
188
189 Ok(())
190 }
191
192 #[tokio::test]
193 async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
194 let (server_id, peer_self) = make_test_server_and_peer().await?;
195 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
196 let bucket_location = make_test_bucket_location();
197 let now = TimeMillis(1_000_000);
198
199 for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
200 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
201 assert!(result.cache_request_token.is_none());
202 }
203
204 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
205 assert!(result.cache_request_token.is_some());
206
207 let result2 = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
208 assert!(result2.cache_request_token.is_none());
209
210 Ok(())
211 }
212
213 #[tokio::test]
214 async fn test_upload_and_retrieval() -> anyhow::Result<()> {
215 let (server_id, peer_self) = make_test_server_and_peer().await?;
216 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
217 let bucket_location = make_test_bucket_location();
218 let location_id = bucket_location.location_id;
219 let now = TimeMillis(1_000_000);
220 let originator_id = Id::random();
221 let feedback_bytes = Bytes::from_static(b"test_feedback");
222
223 cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
224
225 let accepted = cache.on_upload(location_id, originator_id, feedback_bytes.clone(), now, false);
226 assert!(accepted);
227
228 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
229 assert_eq!(result.cached_items, vec![feedback_bytes]);
230
231 Ok(())
232 }
233
234 #[tokio::test]
235 async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
236 let (server_id, peer_self) = make_test_server_and_peer().await?;
237 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
238 let bucket_location = make_test_bucket_location();
239 let location_id = bucket_location.location_id;
240 let now = TimeMillis(1_000_000);
241 let originator_id = Id::random();
242
243 cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
244 cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), now, false);
245
246 let result = cache.on_get(&bucket_location, &[originator_id], &peer_self, &server_id, now);
247 assert!(result.cached_items.is_empty());
248
249 Ok(())
250 }
251
252 #[tokio::test]
253 async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
254 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024, Arc::new(RealTimeProvider));
255 let location_id = Id::random();
256 let originator_id = Id::random();
257
258 let accepted = cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), TimeMillis(1_000_000), false);
259 assert!(!accepted);
260
261 Ok(())
262 }
263}