// hashiverse_lib/client/timeline/single_timeline.rs

//! # A single stateful timeline cursor
//!
//! `SingleTimeline` walks one feed (a specific user's posts, a specific hashtag, the
//! mentions of a given client, …) backwards through time, yielding pages of posts via
//! `get_more_posts()`. It owns:
//!
//! - a [`crate::client::timeline::recursive_bucket_visitor::RecursiveBucketVisitor`]
//!   that walks the hierarchical bucket ladder for the timeline's `(BucketType, base_id)`;
//! - a `post_ids_already_seen` set that deduplicates across overlapping buckets and
//!   across pages (important for Sequel / ReplyToPost timelines where posts can surface
//!   from multiple places);
//! - a [`crate::client::timeline::recent_posts_pen::RecentPostsPen`] reference so the
//!   user's own very recent posts appear instantly instead of waiting for the next
//!   DHT read.
//!
//! Timelines never reach an end — the walk just keeps stepping further back in time on
//! demand (a hard-earned invariant: manually triggered "fetch more" must always produce
//! *something*, even if it has to go a year further back to find it).

use bytes::Bytes;
use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
use crate::tools::buckets::{BucketLocation, BucketType};
use crate::tools::time::{DurationMillis, TimeMillis, MILLIS_IN_MONTH};
use crate::tools::types::Id;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;

/// A cursor that walks one feed backward through time, yielding encoded posts page by page.
///
/// Every feed in hashiverse — a user's profile, a hashtag, a mention stream, a reply thread
/// — is represented on the wire as a sequence of time-bucketed post bundles keyed by a
/// single [`Id`] (the "base id" of the feed) and a [`BucketType`]. `SingleTimeline` is the
/// client-side state machine that reads such a feed: given a wall-clock `time_millis` and a
/// desired page size, each call to `get_more_posts` walks to the next-older bucket via
/// [`RecursiveBucketVisitor`], pulls the bundle through a [`PostBundleManager`], and emits
/// the posts in reverse chronological order while deduplicating against `post_ids_already_seen`.
///
/// It also reads from the shared [`RecentPostsPen`] so posts the local user has just
/// authored appear at the top of the feed before they have propagated through the network.
/// For aggregated feeds (following multiple people, multiple hashtags) see `MultipleTimeline`,
/// which composes several `SingleTimeline`s.
pub struct SingleTimeline {
    // Which kind of feed this cursor walks (user posts, hashtag, mentions, …).
    bucket_type: BucketType,
    // The single id that keys every bucket of this feed.
    base_id: Id,
    // Source of encoded post bundles for a given bucket location and read time.
    post_bundle_manager: Arc<dyn PostBundleManager>,
    // Shared pen of locally-authored posts, consulted so they surface before
    // they have propagated through the network.
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    // Oldest bucket start time any walk has been permitted to reach.
    // `TimeMillis::MAX` is a sentinel for "no fetch yet"; afterwards each
    // `get_more_posts` call may extend one month past this watermark.
    oldest_allowed_post_bundle_time_millis: TimeMillis,
    // Oldest bucket start time whose posts have actually been extracted
    // (min over all buckets closed so far). `TimeMillis::MAX` until the first fetch.
    oldest_processed_post_bundle_time_millis: TimeMillis,
    // Ids of posts already yielded — deduplicates across overlapping buckets,
    // across pages, and against pen posts.
    post_ids_already_seen: HashSet<Id>,
}
54
55impl SingleTimeline {
56    pub fn new(bucket_type: BucketType, location_id: &Id, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
57        Self {
58            bucket_type,
59            base_id: *location_id,
60            post_bundle_manager,
61            recent_posts_pen,
62            oldest_allowed_post_bundle_time_millis: TimeMillis::MAX,
63            oldest_processed_post_bundle_time_millis: TimeMillis::MAX,
64            post_ids_already_seen: HashSet::new(),
65        }
66    }
67
68    pub fn bucket_type(&self) -> BucketType {
69        self.bucket_type
70    }
71
72    pub fn base_id(&self) -> Id {
73        self.base_id
74    }
75
76    pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes, bool)>> {
77        let mut encoded_posts = Vec::new();
78
79        if TimeMillis::MAX == self.oldest_allowed_post_bundle_time_millis {
80            self.oldest_allowed_post_bundle_time_millis = time_millis;
81        }
82        if TimeMillis::MAX == self.oldest_processed_post_bundle_time_millis {
83            self.oldest_processed_post_bundle_time_millis = time_millis;
84        }
85
86        let time_millis_max = time_millis;
87        let time_millis_min = self.oldest_allowed_post_bundle_time_millis - MILLIS_IN_MONTH;
88
89        RecursiveBucketVisitor::visit(
90            time_millis_max,
91            time_millis_min,
92            bucket_durations,
93            // on_bucket_open
94            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
95                // log::info!(
96                //     "open {}: bucket_time_millis: {}, bucket_duration_millis: {}, time_millis_min: {}",
97                //     self.base_id, bucket_time_millis, bucket_duration_millis, time_millis_min
98                // );
99
100                // Keep pushing back the oldest time we have queried - so that future calls are allowed to go back even further than that
101                self.oldest_allowed_post_bundle_time_millis = self.oldest_allowed_post_bundle_time_millis.min(bucket_time_millis);
102
103                // Get location's postbundle
104                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
105                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
106
107                // Do we keep going deeper?
108                match encoded_post_bundle.header.overflowed {
109                    true => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren),
110                    false => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren),
111                }
112            },
113            // on_bucket_close
114            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
115                // log::info!("close {}: bucket_time_millis: {}, bucket_duration_millis: {}", self.base_id, bucket_time_millis, bucket_duration_millis);
116
117                // Keep pushing back the oldest time we have processed
118                self.oldest_processed_post_bundle_time_millis = self.oldest_processed_post_bundle_time_millis.min(bucket_time_millis);
119
120                // Get location's postbundle
121                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
122                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
123
124                // Extract all the posts we have not seen before (some post bundles may add posts over time if they are not yet sealed)
125                let mut extraction_start_i = 0;
126                for i in 0..(encoded_post_bundle.header.num_posts as usize) {
127                    let extraction_end_i = extraction_start_i + encoded_post_bundle.header.encoded_post_lengths[i];
128                    let encoded_post_id = encoded_post_bundle.header.encoded_post_ids[i];
129                    if self.post_ids_already_seen.insert(encoded_post_id) {
130                        let encoded_post = encoded_post_bundle.encoded_posts_bytes.slice(extraction_start_i..extraction_end_i);
131                        let healed = encoded_post_bundle.header.encoded_post_healed.contains(&encoded_post_id);
132                        encoded_posts.push((bucket_location.clone(), encoded_post, healed));
133                    }
134                    extraction_start_i = extraction_end_i;
135                }
136
137                // Do we have enough posts this round?
138                match encoded_posts.len() < max_posts {
139                    true => Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
140                    false => Ok(RecursiveBucketVisitorCloseCallbackResult::Stop),
141                }
142            },
143        )
144        .await?;
145
146        // Consult the recent posts pen for any posts we submitted that haven't yet appeared from the network
147        let pen_posts = self.recent_posts_pen.write().await.get_matching_posts(self.bucket_type, &self.base_id, &self.post_ids_already_seen, time_millis);
148        for (bucket_location, encoded_post_bytes, post_id) in pen_posts {
149            if self.post_ids_already_seen.insert(post_id) {
150                encoded_posts.push((bucket_location, encoded_post_bytes, false));
151            }
152        }
153
154        Ok(encoded_posts)
155    }
156
157    pub fn post_count(&self) -> usize {
158        self.post_ids_already_seen.len()
159    }
160
161    pub fn oldest_allowed_post_bundle_time_millis(&self) -> TimeMillis {
162        self.oldest_allowed_post_bundle_time_millis
163    }
164
165    pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
166        self.oldest_processed_post_bundle_time_millis
167    }
168}

#[cfg(test)]
pub mod tests {
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::client::timeline::single_timeline::SingleTimeline;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{TimeMillis, MILLIS_IN_DAY, MILLIS_IN_MONTH, MILLIS_IN_WEEK};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// A pen with no locally-authored posts, for tests that only exercise bundle reads.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        Arc::new(RwLock::new(RecentPostsPen::new()))
    }

    /// One month-duration bucket of 23 posts: a fetch with `max_posts = 5` still
    /// yields all 23, because the page-size check only runs at bucket close.
    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 5, &BUCKET_DURATIONS[1..]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        Ok(())
    }

    /// Four bundles within the first month (27 posts at 1D, 17/6/22 posts at 1W/2W/3W).
    /// Successive fetches of 20 drain them in pages of 22, then 23 (17 + 6), then 27;
    /// a fourth fetch finds nothing and ratchets both watermarks one month before epoch.
    #[tokio::test]
    async fn timeline_multiple_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 27)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1W", 17)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 6)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 22)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 22);
        assert_eq!(single_timeline.post_count(), 22);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("3W")?);
        info!("--- FETCH 2 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 22 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1W")?);
        info!("--- FETCH 3 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 27);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 4 -----------------------");
        // Everything consumed: an empty page, and the allowed watermark steps back a month.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        Ok(())
    }

    /// Bundles spread across two months, including two bundles of different durations
    /// at the same offset ("1M2W"). Five fetches of 20 page through 34 (6 + 21 + 7),
    /// 29, 21 (5 + 16) and 23 posts; each subsequent empty fetch keeps stepping the
    /// watermarks a further month back (the "never reach an end" invariant).
    #[tokio::test]
    async fn timeline_multiple_months_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 5)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 16)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1M1D", 29)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M2W", 21)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_DAY, "1M2W", 7)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M3W", 6)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 34);
        assert_eq!(single_timeline.post_count(), 34);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M2W")?);
        info!("--- FETCH 2 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 29);
        assert_eq!(single_timeline.post_count(), 34 + 29);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        info!("--- FETCH 3 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 21);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("2W")?);
        info!("--- FETCH 4 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 5 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        info!("--- FETCH 6 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        Ok(())
    }

    /// A bundle that gains a post between fetches (unsealed bucket) must yield only
    /// the newly-added post on the next pass — the dedup set filters the other 12.
    #[tokio::test]
    async fn timeline_bundle_update_test() -> anyhow::Result<()> {
        // configure_logging();

        // Note that each time we do the request, the "distance back" it has searched should be increasing by 1 month because we never actually get 20 posts...

        let id = Id::random();
        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager.clone(), empty_pen());

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 0);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 12)?;

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 12);
        assert_eq!(single_timeline.post_count(), 12);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        // Add a post to the old bundle - as if someone has posted since we last checked
        // (zero-length payload is fine here; only the new id matters for the dedup check).
        post_bundle.header.num_posts += 1;
        post_bundle.header.encoded_post_ids.push(Id::random());
        post_bundle.header.encoded_post_lengths.push(0);
        stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 1);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);

        Ok(())
    }

    /// The `healed` flag is per post, not per bundle.
    #[tokio::test]
    async fn timeline_healed_flag_is_per_post() -> anyhow::Result<()> {
        // A bundle marks individual post_ids as healed via header.encoded_post_healed.
        // get_more_posts must surface that as the third tuple element per post.
        let id = Id::random();
        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());

        let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 5)?;
        let healed_post_id = post_bundle.header.encoded_post_ids[2];
        post_bundle.header.encoded_post_healed.insert(healed_post_id);
        stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;

        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..]).await?;
        assert_eq!(posts.len(), 5);

        let healed_count = posts.iter().filter(|(_, _, healed)| *healed).count();
        assert_eq!(healed_count, 1, "exactly one post in this bundle is marked healed");

        // Pen-only posts (locally-submitted, no bundle yet) must be reported as not healed.
        // The existing bundle test already covers the bundle case; here just spot-check the
        // unhealed posts come through with `false`.
        let unhealed_count = posts.iter().filter(|(_, _, healed)| !*healed).count();
        assert_eq!(unhealed_count, 4);

        Ok(())
    }
}