1use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
21use crate::client::timeline::recent_posts_pen::RecentPostsPen;
22use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
23use crate::tools::buckets::{BucketLocation, BucketType};
24use crate::tools::time::{DurationMillis, TimeMillis, MILLIS_IN_MONTH};
25use crate::tools::types::Id;
26use bytes::Bytes;
27use std::collections::HashSet;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30
/// Incremental reader over the post bundles of one bucket type / base id pair.
///
/// Each call to `get_more_posts` walks further back in time and returns only posts whose ids
/// have not been returned by this instance before.
pub struct SingleTimeline {
    /// Bucket type used to build every `BucketLocation` and to query the recent-posts pen.
    bucket_type: BucketType,
    /// Id the bucket locations are derived from (copied from `location_id` in `new`).
    base_id: Id,
    /// Source of the post bundles for each visited bucket location.
    post_bundle_manager: Arc<dyn PostBundleManager>,
    /// Shared pen that is queried for matching posts after every bucket walk.
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    /// Smallest bucket time handed to the visitor's open callback so far; `TimeMillis::MAX` until
    /// the first fetch. The next walk reaches one month further back than this value.
    oldest_allowed_post_bundle_time_millis: TimeMillis,
    /// Smallest bucket time handed to the visitor's close callback so far; `TimeMillis::MAX` until
    /// the first fetch.
    oldest_processed_post_bundle_time_millis: TimeMillis,
    /// Ids of every post already returned, used to suppress duplicates across fetches.
    post_ids_already_seen: HashSet<Id>,
}
54
55impl SingleTimeline {
56 pub fn new(bucket_type: BucketType, location_id: &Id, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
57 Self {
58 bucket_type,
59 base_id: *location_id,
60 post_bundle_manager,
61 recent_posts_pen,
62 oldest_allowed_post_bundle_time_millis: TimeMillis::MAX,
63 oldest_processed_post_bundle_time_millis: TimeMillis::MAX,
64 post_ids_already_seen: HashSet::new(),
65 }
66 }
67
68 pub fn bucket_type(&self) -> BucketType {
69 self.bucket_type
70 }
71
72 pub fn base_id(&self) -> Id {
73 self.base_id
74 }
75
76 pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes, bool)>> {
77 let mut encoded_posts = Vec::new();
78
79 if TimeMillis::MAX == self.oldest_allowed_post_bundle_time_millis {
80 self.oldest_allowed_post_bundle_time_millis = time_millis;
81 }
82 if TimeMillis::MAX == self.oldest_processed_post_bundle_time_millis {
83 self.oldest_processed_post_bundle_time_millis = time_millis;
84 }
85
86 let time_millis_max = time_millis;
87 let time_millis_min = self.oldest_allowed_post_bundle_time_millis - MILLIS_IN_MONTH;
88
89 RecursiveBucketVisitor::visit(
90 time_millis_max,
91 time_millis_min,
92 bucket_durations,
93 &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
95 self.oldest_allowed_post_bundle_time_millis = self.oldest_allowed_post_bundle_time_millis.min(bucket_time_millis);
102
103 let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
105 let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
106
107 match encoded_post_bundle.header.overflowed {
109 true => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren),
110 false => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren),
111 }
112 },
113 &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
115 self.oldest_processed_post_bundle_time_millis = self.oldest_processed_post_bundle_time_millis.min(bucket_time_millis);
119
120 let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
122 let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
123
124 let mut extraction_start_i = 0;
126 for i in 0..(encoded_post_bundle.header.num_posts as usize) {
127 let extraction_end_i = extraction_start_i + encoded_post_bundle.header.encoded_post_lengths[i];
128 let encoded_post_id = encoded_post_bundle.header.encoded_post_ids[i];
129 if self.post_ids_already_seen.insert(encoded_post_id) {
130 let encoded_post = encoded_post_bundle.encoded_posts_bytes.slice(extraction_start_i..extraction_end_i);
131 let healed = encoded_post_bundle.header.encoded_post_healed.contains(&encoded_post_id);
132 encoded_posts.push((bucket_location.clone(), encoded_post, healed));
133 }
134 extraction_start_i = extraction_end_i;
135 }
136
137 match encoded_posts.len() < max_posts {
139 true => Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
140 false => Ok(RecursiveBucketVisitorCloseCallbackResult::Stop),
141 }
142 },
143 )
144 .await?;
145
146 let pen_posts = self.recent_posts_pen.write().await.get_matching_posts(self.bucket_type, &self.base_id, &self.post_ids_already_seen, time_millis);
148 for (bucket_location, encoded_post_bytes, post_id) in pen_posts {
149 if self.post_ids_already_seen.insert(post_id) {
150 encoded_posts.push((bucket_location, encoded_post_bytes, false));
151 }
152 }
153
154 Ok(encoded_posts)
155 }
156
157 pub fn post_count(&self) -> usize {
158 self.post_ids_already_seen.len()
159 }
160
161 pub fn oldest_allowed_post_bundle_time_millis(&self) -> TimeMillis {
162 self.oldest_allowed_post_bundle_time_millis
163 }
164
165 pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
166 self.oldest_processed_post_bundle_time_millis
167 }
168}
169
#[cfg(test)]
pub mod tests {
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::client::timeline::single_timeline::SingleTimeline;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{TimeMillis, MILLIS_IN_DAY, MILLIS_IN_MONTH, MILLIS_IN_WEEK};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Builds a fresh, shareable recent-posts pen for a test timeline.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        let pen = RecentPostsPen::new();
        Arc::new(RwLock::new(pen))
    }

    /// A single monthly bundle is returned whole, even though it exceeds `max_posts`.
    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        let id = Id::random();
        let manager = Arc::new(StubPostBundleManager::default());
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;

        let mut timeline = SingleTimeline::new(BucketType::User, &id, manager, empty_pen());
        let now = TimeMillis::from_epoch_offset_str("1M")?;

        let batch = timeline.get_more_posts(now, 5, &BUCKET_DURATIONS[1..]).await?;
        assert_eq!(batch.len(), 23);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        Ok(())
    }

    /// Repeated fetches over one month of monthly and weekly bundles.
    #[tokio::test]
    async fn timeline_multiple_test() -> anyhow::Result<()> {
        let id = Id::random();
        let manager = Arc::new(StubPostBundleManager::default());
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 27)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1W", 17)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 6)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 22)?;

        let mut timeline = SingleTimeline::new(BucketType::User, &id, manager, empty_pen());
        let now = TimeMillis::from_epoch_offset_str("1M")?;

        info!("--- FETCH 1 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 22);
        assert_eq!(timeline.post_count(), 22);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("3W")?);

        info!("--- FETCH 2 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 23);
        assert_eq!(timeline.post_count(), 22 + 23);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1W")?);

        info!("--- FETCH 3 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 27);
        assert_eq!(timeline.post_count(), 22 + 23 + 27);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        // Nothing is left: the walk only reaches one more month back.
        info!("--- FETCH 4 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 0);
        assert_eq!(timeline.post_count(), 22 + 23 + 27);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        Ok(())
    }

    /// Repeated fetches across two months of monthly, weekly and daily bundles.
    #[tokio::test]
    async fn timeline_multiple_months_test() -> anyhow::Result<()> {
        let id = Id::random();
        let manager = Arc::new(StubPostBundleManager::default());
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 5)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 16)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1M1D", 29)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M2W", 21)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_DAY, "1M2W", 7)?;
        manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M3W", 6)?;

        let mut timeline = SingleTimeline::new(BucketType::User, &id, manager, empty_pen());
        let now = TimeMillis::from_epoch_offset_str("2M")?;

        info!("--- FETCH 1 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 34);
        assert_eq!(timeline.post_count(), 34);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M2W")?);

        info!("--- FETCH 2 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 29);
        assert_eq!(timeline.post_count(), 34 + 29);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);

        info!("--- FETCH 3 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 21);
        assert_eq!(timeline.post_count(), 34 + 29 + 21);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("2W")?);

        info!("--- FETCH 4 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 23);
        assert_eq!(timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        // From here on every fetch is empty and only moves the watermarks one month back.
        info!("--- FETCH 5 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 0);
        assert_eq!(timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        info!("--- FETCH 6 -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 0);
        assert_eq!(timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        Ok(())
    }

    /// Bundles that appear or grow between fetches only contribute their new posts.
    #[tokio::test]
    async fn timeline_bundle_update_test() -> anyhow::Result<()> {
        let id = Id::random();
        let manager = Arc::new(StubPostBundleManager::default());
        let mut timeline = SingleTimeline::new(BucketType::User, &id, manager.clone(), empty_pen());
        let now = TimeMillis::from_epoch_offset_str("1M")?;

        // Nothing stored yet.
        info!("--- FETCH -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 0);
        assert_eq!(timeline.post_count(), 0);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        // A bundle shows up after the first fetch.
        let mut grown_bundle = manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 12)?;

        info!("--- FETCH -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 12);
        assert_eq!(timeline.post_count(), 12);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        // Append one zero-length post to the stored bundle.
        grown_bundle.header.num_posts += 1;
        grown_bundle.header.encoded_post_ids.push(Id::random());
        grown_bundle.header.encoded_post_lengths.push(0);
        manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &grown_bundle)?;

        info!("--- FETCH -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 1);
        assert_eq!(timeline.post_count(), 13);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        info!("--- FETCH -----------------------");
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(batch.len(), 0);
        assert_eq!(timeline.post_count(), 13);
        assert_eq!(timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
        assert_eq!(timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);

        Ok(())
    }

    /// The healed flag follows the individual post id, not the bundle as a whole.
    #[tokio::test]
    async fn timeline_healed_flag_is_per_post() -> anyhow::Result<()> {
        let id = Id::random();
        let manager = Arc::new(StubPostBundleManager::default());

        // Mark exactly one of the five posts as healed and store the bundle again.
        let mut bundle = manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 5)?;
        let healed_post_id = bundle.header.encoded_post_ids[2];
        bundle.header.encoded_post_healed.insert(healed_post_id);
        manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &bundle)?;

        let mut timeline = SingleTimeline::new(BucketType::User, &id, manager, empty_pen());
        let now = TimeMillis::from_epoch_offset_str("1M")?;
        let batch = timeline.get_more_posts(now, 20, &BUCKET_DURATIONS[1..]).await?;
        assert_eq!(batch.len(), 5);

        let healed_count = batch.iter().filter(|post| post.2).count();
        assert_eq!(healed_count, 1, "exactly one post in this bundle is marked healed");

        let unhealed_count = batch.iter().filter(|post| !post.2).count();
        assert_eq!(unhealed_count, 4);

        Ok(())
    }
}