// hashiverse_lib/client/hashiverse_client.rs
//! # Top-level client API
//!
//! The one thing every external consumer of `hashiverse-lib` ends up holding: a
//! [`HashiverseClient`] instance. The server binary's cache role, the WASM browser client,
//! the Python wrapper, and the integration-test harness all build on top of this struct.
//!
//! `HashiverseClient` wires together the rest of the `client` module — peer tracking,
//! post/feedback bundle management, timelines, the meta-post system, the key locker — and
//! exposes the small set of verbs a social-network client actually performs:
//!
//! - submit posts, submit feedback, fetch URL previews, fetch trending hashtags;
//! - walk per-user / per-hashtag / per-mention timelines;
//! - manage the logged-in account (via [`crate::client::key_locker`]);
//! - read and update the profile ([`crate::client::meta_post`]).
//!
//! Every dependency that varies by environment (clock, network, PoW engine, storage, key
//! locker) is injected through [`crate::tools::runtime_services::RuntimeServices`] and
//! pluggable traits so the same struct works unchanged on native, in the browser (WASM),
//! and in deterministic in-memory integration tests.

21use crate::client::args::Args;
22use crate::client::client_storage::client_storage::{ClientStorage, BUCKETS, BUCKET_TRIMS};
23use crate::client::key_locker::key_locker::KeyLocker;
24use crate::client::meta_post::meta_post::MetaPost;
25use crate::client::meta_post::meta_post_manager::MetaPostManager;
26use crate::client::peer_tracker::peer_tracker::PeerTracker;
27use crate::client::post_bundle::live_post_bundle_manager::LivePostBundleManager;
28use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
29use crate::client::post_bundle::posting;
30use crate::client::timeline::recent_posts_pen::RecentPostsPen;
31use crate::client::timeline::single_timeline::SingleTimeline;
32use crate::protocol::posting::encoded_post::EncodedPostV1;
33use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketLocation, BucketType};
35use crate::tools::client_id::ClientId;
36use crate::tools::runtime_services::RuntimeServices;
37use crate::tools::types::Id;
38use bytes::Bytes;
39use log::{error, info, trace, warn};
40use scraper::{Html, Selector};
41use std::sync::Arc;
42use tokio::sync::{RwLock, RwLockWriteGuard};
43use crate::anyhow_assert_eq;
44use crate::client::post_bundle::live_post_bundle_feedback_manager::LivePostBundleFeedbackManager;
45use crate::client::post_bundle::post_bundle_feedback_manager::PostBundleFeedbackManager;
46use crate::client::timeline::multiple_timeline::MultipleTimeline;
47use crate::protocol::payload::payload::{FetchUrlPreviewResponseV1, FetchUrlPreviewV1, PayloadRequestKind, PayloadResponseKind, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostCommitTokenV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1};
48use crate::tools::compression;
49use crate::tools::signing;
50use crate::tools::types::VerificationKey;
51use crate::protocol::peer::Peer;
52use crate::protocol::rpc;
53use crate::tools::config::CLIENT_FEEDBACK_POW_NUMERAIRE;
54use crate::tools::plain_text_post::convert_text_to_hashiverse_html;
55use crate::tools::tools;
56use crate::tools::time::TimeMillis;
57
/// The top-level client API for participating in a hashiverse network.
///
/// `HashiverseClient` is what every external consumer — the server binary's cache role, the
/// WASM browser client, the Python wrapper, integration tests — builds on top of. It
/// orchestrates all the subsystems the client side needs:
///
/// - a [`PeerTracker`] for Kademlia-style peer discovery and gossip,
/// - a [`LivePostBundleManager`] / [`LivePostBundleFeedbackManager`] pair for fetching post
///   bundles and their feedback from the network (with on-disk caching),
/// - a [`SingleTimeline`] / [`MultipleTimeline`] pair for recursive bucket traversal when
///   reading a feed,
/// - a [`MetaPostManager`] for the user's profile / follow list / settings,
/// - a [`RecentPostsPen`] so freshly-authored posts appear in the user's own timelines
///   immediately,
/// - a [`ClientId`] / [`KeyLocker`] pair for the signed identity used on every outbound
///   action.
///
/// All external dependencies — clock, network, PoW — are carried through a single
/// [`RuntimeServices`], which is what makes the same code run inside the in-memory integration
/// test harness, on a native server, and in a browser Web Worker without touching any of the
/// high-level logic above.
///
/// Construct one via [`HashiverseClient::new`] and keep a single instance per logged-in
/// account; the struct is cheap to share (`Arc<Self>`) and its internal state uses `RwLock`s
/// for concurrent access.
pub struct HashiverseClient {
    // Injected environment services (clock, network, PoW generator, ...).
    runtime_services: Arc<RuntimeServices>,
    // Local persistent storage backend; trimmed at startup, resettable via client_storage_reset().
    client_storage: Arc<dyn ClientStorage>,
    // Custodian of the account's signing keys; also the source of the client id.
    key_locker: Arc<dyn KeyLocker>,
    // Network-backed (cached) fetcher for post bundles.
    post_bundle_manager: Arc<LivePostBundleManager>,
    // Network-backed (cached) fetcher for post-feedback bundles.
    post_bundle_feedback_manager: Arc<LivePostBundleFeedbackManager>,
    // Profile / follow-list / settings management (meta-posts).
    meta_post_manager: MetaPostManager,
    // The identity this client acts as (cloned from the key locker at construction).
    client_id: ClientId,
    // Known peers + bootstrap; read lock for lookups, write lock for bootstrap/gossip updates.
    peer_tracker: Arc<RwLock<PeerTracker>>,
    // Freshly-authored posts, merged into timeline reads so they show up immediately.
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    // Lazily-created cached timeline cursors; None until first use, dropped when the
    // requested bucket_type/base id(s) change.
    single_timeline: Arc<RwLock<Option<SingleTimeline>>>,
    multiple_timeline: Arc<RwLock<Option<MultipleTimeline>>>,
}

impl HashiverseClient {
    /// Build a fully wired client for the identity held in `key_locker`.
    ///
    /// Constructs the peer tracker and the two live bundle managers, trims the local
    /// storage buckets down to their configured sizes, and wires up the meta-post
    /// manager. Timeline cursors start empty and are created lazily on first use.
    pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>, key_locker: Arc<dyn KeyLocker>, _args: Args) -> anyhow::Result<Self> {
        let client_id = key_locker.client_id().clone();
        info!("client_id={}", client_id);

        let peer_tracker = Arc::new(RwLock::new(PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?));
        let post_bundle_manager = Arc::new(LivePostBundleManager::new(runtime_services.clone(), client_id.id, client_storage.clone(), peer_tracker.clone()));
        let post_bundle_feedback_manager = Arc::new(LivePostBundleFeedbackManager::new(runtime_services.clone(), client_id.id, client_storage.clone(), peer_tracker.clone()));

        // Trim the various buckets down to their configured retention; a trim of 0 means
        // "keep everything".
        anyhow_assert_eq!(BUCKETS.len(), BUCKET_TRIMS.len(), "Mismatch in length between BUCKETS and BUCKET_TRIMS");
        info!("Trimming buckets: {} buckets, {} trims", BUCKETS.len(), BUCKET_TRIMS.len());
        for (bucket, trim) in BUCKETS.iter().zip(BUCKET_TRIMS.iter()) {
            if *trim > 0 {
                client_storage.trim(*bucket, *trim).await?;
            }
        }

        let meta_post_manager = MetaPostManager::new(runtime_services.clone(), client_storage.clone(), key_locker.clone(), client_id.clone());

        Ok(Self {
            runtime_services,
            client_storage,
            key_locker,
            post_bundle_manager,
            post_bundle_feedback_manager,
            meta_post_manager,
            client_id,
            peer_tracker,
            recent_posts_pen: Arc::new(RwLock::new(RecentPostsPen::new())),
            single_timeline: Arc::new(RwLock::new(None)),
            multiple_timeline: Arc::new(RwLock::new(None)),
        })
    }

    /// The signed identity this client acts as on every outbound action.
    pub fn client_id(&self) -> &ClientId {
        &self.client_id
    }

    /// Status snapshots of every proof-of-work job the shared generator is currently
    /// running — e.g. for progress display in a UI.
    pub fn active_pow_jobs(&self) -> Vec<crate::tools::parallel_pow_generator::PowJobStatus> {
        self.runtime_services.pow_generator.active_jobs()
    }

    /// Reset the local client storage. Delegates to [`ClientStorage::reset`]; exactly
    /// what gets cleared is up to the storage implementation.
    pub async fn client_storage_reset(&self) -> anyhow::Result<()> {
        self.client_storage.reset().await
    }

145    pub async fn submit_post(&self, post: &str) -> Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes)), anyhow::Error> {
146        trace!("submitting post: {}", post);
147
148        if post.is_empty() {
149            anyhow::bail!("Post cannot be empty");
150        }
151
152        let timestamp = self.runtime_services.time_provider.current_time_millis();
153
154        struct LinkedBaseIdDetail {
155            linked_base_id: Id,
156            bucket_type: BucketType,
157            referenced_post_header_bytes: Option<Bytes>,
158        }
159
160        // Parse the post for #hashtags, @mentions, replies, sequels, etc.
161        let mut linked_base_id_details: Vec<LinkedBaseIdDetail> = vec![];
162        let mut referenced_hashtags: Vec<String> = vec![];
163
164        // The original post itself is always present
165        linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: self.client_id.id, bucket_type: BucketType::User, referenced_post_header_bytes: None });
166
167        {
168            let html = Html::parse_fragment(post);
169            {
170                // Returns true if the element is nested inside a <reply> or <repost> — meaning it
171                // belongs to a quoted post and should not be indexed under the new post's buckets.
172                let is_quoted = |element: scraper::ElementRef| -> bool {
173                    let mut node = element.parent();
174                    while let Some(n) = node {
175                        if let Some(el) = scraper::ElementRef::wrap(n) {
176                            if matches!(el.value().name(), "reply" | "repost" | "sequel") { return true; }
177                        }
178                        node = n.parent();
179                    }
180                    false
181                };
182
183                let selector_hashtag = Selector::parse("hashtag").map_err(|e| anyhow::anyhow!("Failed to parse hashtag selector: {}", e))?;
184                for element in html.select(&selector_hashtag) {
185                    if is_quoted(element) { continue; }
186                    if let Some(hashtag) = element.attr("hashtag") {
187                        trace!("hashtag={:?}", hashtag);
188                        referenced_hashtags.push(hashtag.to_string());
189                        linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: Id::from_hashtag_str(hashtag)?, bucket_type: BucketType::Hashtag, referenced_post_header_bytes: None });
190                    } else {
191                        warn!("hashtag attribute not found in element {:?}", element);
192                    }
193                }
194
195                let selector_mention = Selector::parse("mention").map_err(|e| anyhow::anyhow!("Failed to parse mention selector: {}", e))?;
196                for element in html.select(&selector_mention) {
197                    if is_quoted(element) { continue; }
198                    if let Some(client_id_str) = element.attr("client_id") {
199                        match Id::from_hex_str(client_id_str) {
200                            Ok(client_id) => {
201                                trace!("mention_id={:?}", client_id);
202                                linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id, bucket_type: BucketType::Mention, referenced_post_header_bytes: None });
203                            }
204                            Err(e) => warn!("mention_id corrupted in element {:?}:, {}", element, e),
205                        }
206                    } else {
207                        warn!("mention attribute not found in element {:?}", element);
208                    }
209                }
210
211                let selector_reply = Selector::parse("reply").map_err(|e| anyhow::anyhow!("Failed to parse reply selector: {}", e))?;
212                for element in html.select(&selector_reply) {
213                    if is_quoted(element) { continue; }
214                    if let Some(post_id_str) = element.attr("post_id") {
215                        match Id::from_hex_str(post_id_str) {
216                            Ok(post_id) => {
217                                trace!("reply post_id={:?}", post_id);
218                                let referenced_post_header_bytes = element.attr("post_header_hex")
219                                    .and_then(|hex_str| hex::decode(hex_str).ok())
220                                    .map(Bytes::from);
221                                linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::ReplyToPost, referenced_post_header_bytes });
222                            }
223                            Err(e) => warn!("reply post_id corrupted in element {:?}: {}", element, e),
224                        }
225                    } else {
226                        warn!("post_id attribute not found in reply element {:?}", element);
227                    }
228                }
229
230                let selector_sequel = Selector::parse("sequel").map_err(|e| anyhow::anyhow!("Failed to parse sequel selector: {}", e))?;
231                for element in html.select(&selector_sequel) {
232                    if is_quoted(element) { continue; }
233                    if let Some(post_id_str) = element.attr("post_id") {
234                        match Id::from_hex_str(post_id_str) {
235                            Ok(post_id) => {
236                                trace!("sequel post_id={:?}", post_id);
237                                let referenced_post_header_bytes = element.attr("post_header_hex")
238                                    .and_then(|hex_str| hex::decode(hex_str).ok())
239                                    .map(Bytes::from);
240                                linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::Sequel, referenced_post_header_bytes });
241                            }
242                            Err(e) => warn!("sequel post_id corrupted in element {:?}: {}", element, e),
243                        }
244                    } else {
245                        warn!("post_id attribute not found in sequel element {:?}", element);
246                    }
247                }
248            }
249        }
250
251        // Encode the post
252        let linked_base_ids: Vec<Id> = linked_base_id_details.iter().map(|d| d.linked_base_id).collect();
253        let mut encoded_post = EncodedPostV1::new(&self.client_id, timestamp, linked_base_ids, post);
254        let encoded_post_bytes = encoded_post.encode_to_bytes_direct(&self.key_locker).await?;
255
256        // The commit tokens for the User's post
257        let mut post_commit_tokens = Vec::new();
258
259        // Start the post machine
260        for linked_base_id_detail in &linked_base_id_details {
261            trace!("Posting to bucket type: {:?}, linked_base_id: {}", linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id);
262            let try_result = try {
263                // Where are we going to post it?  Start at the widest bucket and work our way narrower till we succeed
264                for &bucket_duration in bucket_durations_for_type(linked_base_id_detail.bucket_type) {
265                    let bucket_location = generate_bucket_location(linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id, bucket_duration, timestamp)?;
266                    info!("checking posting availability of {:?}", bucket_location);
267
268                    let post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;
269                    if !post_bundle.header.overflowed && !post_bundle.header.sealed {
270                        info!("Posting to {:?}", bucket_location);
271                        let result = posting::post_to_location(&self.runtime_services, &self.client_id.id, &self.peer_tracker, &bucket_location, &encoded_post, &encoded_post_bytes, linked_base_id_detail.referenced_post_header_bytes.as_deref(), &referenced_hashtags).await;
272                        match result {
273                            Ok(mut result) => {
274                                post_commit_tokens.append(&mut result);
275                                break;
276                            }
277                            Err(e) => {
278                                warn!("Failed to post to {:?}: {}", bucket_location, e);
279                                continue;
280                            }
281                        }
282                    }
283
284                    else {
285                        trace!("no availability: overflowed={} sealed={}", post_bundle.header.overflowed, post_bundle.header.sealed);
286                    }
287                }
288            };
289
290            if let Err(e) = try_result {
291                warn!("Failed to post to any bucket location for linked_base_id {:?}: {}", linked_base_id_detail.linked_base_id, e);
292            }
293
294            // Check that we managed at least one commit by the end of the User bucket (healing should fix the rest).  We can happily continue with a failed hashtag, mention, etc.
295            if linked_base_id_detail.bucket_type == BucketType::User && post_commit_tokens.is_empty() {
296                anyhow::bail!("Failed to post to any User buckets, so bailing,");
297            }
298        }
299
300        let encoded_post_bytes_raw = Bytes::copy_from_slice(encoded_post_bytes.bytes());
301
302        // Record in the recent posts pen so these posts appear immediately in timeline fetches
303        {
304            let bucket_locations_and_post_ids: Vec<_> = post_commit_tokens.iter()
305                .map(|token| (token.bucket_location.clone(), token.post_id))
306                .collect();
307            self.recent_posts_pen.write().await.add_all(&bucket_locations_and_post_ids, encoded_post_bytes_raw.clone(), timestamp);
308        }
309
310        Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)))
311    }
312
    // ------------------------------------------------------------------
    // MetaPostManager delegation
    // ------------------------------------------------------------------

    /// Direct access to the profile / follow-list / settings manager.
    pub fn meta_post_manager(&self) -> &MetaPostManager {
        &self.meta_post_manager
    }

321    pub async fn submit_meta_post(&self) -> anyhow::Result<()> {
322        let post_json = self.meta_post_manager.build_meta_post_json().await?;
323        self.submit_post(&post_json).await?;
324        Ok(())
325    }
326
327    pub async fn ensure_meta_post_in_current_bucket(&self) -> anyhow::Result<()> {
328        if self.meta_post_manager.should_auto_publish(self.post_bundle_manager.as_ref()).await? {
329            self.submit_meta_post().await?;
330        }
331        Ok(())
332    }
333
334    pub async fn submit_feedback(&self, bucket_location: BucketLocation, post_id: Id, feedback_type: u8) -> anyhow::Result<()> {
335        info!("submit_feedback: bucket_location={}, post_signature={}, feedback_type={}", bucket_location, post_id, feedback_type);
336
337        // Generate our PoW
338        let (salt, pow, _hash) = EncodedPostFeedbackV1::pow_generate(&post_id, feedback_type, self.runtime_services.pow_generator.as_ref()).await?;
339
340        // Is it better than the best we have seen so far?
341        let post_bundle_location_id = bucket_location.location_id;
342        let post_bundle_feedback = self.post_bundle_feedback_manager.get_post_bundle_feedback(bucket_location.clone(), self.runtime_services.time_provider.current_time_millis()).await?;
343        let pow_best_so_far = post_bundle_feedback.get_post_pow_for_feedback_type(&post_id, feedback_type);
344        if pow <= pow_best_so_far {
345            trace!("skipping feedback submission: pow_best_so_far: {}, pow: {}", pow_best_so_far, pow);
346            return Ok(());
347        }
348
349        // Submit our good work to the servers
350        let encoded_post_feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, pow);
351        let result = posting::post_feedback_to_location(
352            &self.runtime_services, &self.client_id.id, &self.peer_tracker,
353            &bucket_location, &encoded_post_feedback,
354        ).await;
355        if let Err(e) = result {
356            warn!("Failed to feedback to {:?}: {}", post_bundle_location_id, e);
357        }
358
359        Ok(())
360    }
361
362    pub async fn get_post(&self, bucket_location: BucketLocation, post_id: &Id) -> anyhow::Result<(BucketLocation, EncodedPostV1, Bytes, bool)>
363    {
364        let post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, self.runtime_services.time_provider.current_time_millis()).await?;
365
366        let mut offset = 0;
367        for i in 0..(post_bundle.header.num_posts as usize) {
368            let len = post_bundle.header.encoded_post_lengths[i];
369            if post_bundle.header.encoded_post_ids[i] == *post_id {
370                let post_bytes = post_bundle.encoded_posts_bytes.slice(offset..offset + len);
371                let encoded_post = EncodedPostV1::decode_from_bytes(post_bytes.clone(), &bucket_location.base_id, true, true)?;
372                let healed = post_bundle.header.encoded_post_healed.contains(post_id);
373                return Ok((bucket_location, encoded_post, post_bytes, healed));
374            }
375            offset += len;
376        }
377
378        anyhow::bail!("Post {} not found in bundle {}", post_id, bucket_location.location_id)
379    }
380
381    pub async fn get_post_feedbacks(&self, bucket_location: BucketLocation, post_id: Id) -> anyhow::Result<[u64; 256]>
382    {
383        let mut post_feedbacks = [0u64; 256];
384
385        let post_bundle_feedback = self.post_bundle_feedback_manager.get_post_bundle_feedback(bucket_location, self.runtime_services.time_provider.current_time_millis()).await?;
386        let post_pows = post_bundle_feedback.get_post_pows(&post_id);
387        for (i, &pow) in post_pows.iter().enumerate() {
388            if pow.0 > 0 {
389                let statistical_attempts = 1u64.checked_shl(pow.0 as u32).unwrap_or(u64::MAX);
390                post_feedbacks[i] = statistical_attempts / CLIENT_FEEDBACK_POW_NUMERAIRE as u64;
391            }
392            // pow=0 is the sentinel for "no feedback submitted" — leave as 0
393        }
394
395        Ok(post_feedbacks)
396    }
397
398
    /// Decode raw timeline entries into posts, siphoning off meta-posts.
    ///
    /// Each `(location, bytes, healed)` entry is decoded (the two `true` flags to
    /// `decode_from_bytes` presumably enable full validation — confirm against its
    /// definition) and then classified: ordinary posts are returned to the caller, while
    /// meta-posts (profile updates etc.) are routed to the [`MetaPostManager`] and NOT
    /// returned. Entries that fail to decode or process are logged and skipped so a
    /// single corrupt post cannot break the whole timeline fetch.
    async fn post_process_timeline_posts(&self, encoded_posts_bytes: Vec<(BucketLocation, Bytes, bool)>) -> anyhow::Result<Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>> {
        let mut encoded_posts = Vec::new();
        for (bucket_location, encoded_post_bytes, healed) in encoded_posts_bytes {
            // `try` block (nightly): confines any `?` failure to this one entry.
            let result = try {
                let encoded_post = EncodedPostV1::decode_from_bytes(encoded_post_bytes.clone(), &bucket_location.base_id, true, true)?;
                let meta_post = MetaPost::try_parse_meta_post(&encoded_post.post)?;
                match meta_post {
                    MetaPost::None => encoded_posts.push((bucket_location, encoded_post, encoded_post_bytes, healed)),
                    MetaPost::MetaPostV1(meta_post_v1) => {
                        // Meta-posts are consumed here rather than surfaced in the feed.
                        let post_client_id = encoded_post.header.client_id()?;
                        self.meta_post_manager.process_incoming_meta_post(&meta_post_v1, &post_client_id).await?;
                    }
                }
            };

            if let Err(e) = result {
                warn!("Failed to decode post: {}", e);
            }
        }

        Ok(encoded_posts)
    }

422    pub async fn single_timeline_reset(&self) -> anyhow::Result<()> {
423        info!("Resetting single timeline");
424        let mut single_timeline = self.single_timeline.write().await;
425        *single_timeline = None;
426        Ok(())
427    }
428
429    pub async fn single_timeline_lock(&self, bucket_type: BucketType, base_id: &Id) -> anyhow::Result<RwLockWriteGuard<'_, Option<SingleTimeline>>> {
430        let mut single_timeline = self.single_timeline.write().await;
431
432        // If our base_id has changed, blow it away
433        if let Some(single_timeline_instance) = single_timeline.as_ref() {
434            if single_timeline_instance.bucket_type() != bucket_type || single_timeline_instance.base_id() != *base_id {
435                *single_timeline = None;
436            }
437        }
438
439        // Create the single_timeline if necessary
440        if single_timeline.is_none() {
441            trace!("Starting a new SingleTimeline for bucket_type={} base_id={}", bucket_type, base_id);
442            *single_timeline = Some(SingleTimeline::new(bucket_type, base_id, self.post_bundle_manager.clone(), self.recent_posts_pen.clone()));
443        }
444
445        Ok(single_timeline)
446    }
447
448    pub async fn single_timeline_get_more(&self, bucket_type: BucketType, base_id: &Id) -> anyhow::Result<(Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>, TimeMillis)> {
449        trace!("Getting more posts for {}", base_id);
450
451        let mut single_timeline = self.single_timeline_lock(bucket_type, base_id).await?;
452        let single_timeline = single_timeline.as_mut().expect("we have ensured that our SingleTimeline exists");
453        let encoded_posts_bytes = single_timeline.get_more_posts(self.runtime_services.time_provider.current_time_millis(), 20, bucket_durations_for_type(bucket_type)).await?;
454        let oldest_processed_time_millis = single_timeline.oldest_processed_post_bundle_time_millis();
455        let posts = self.post_process_timeline_posts(encoded_posts_bytes).await?;
456        Ok((posts, oldest_processed_time_millis))
457    }
458
459    pub async fn multiple_timeline_reset(&self) -> anyhow::Result<()> {
460        info!("Resetting multiple timeline");
461        let mut multiple_timeline = self.multiple_timeline.write().await;
462        *multiple_timeline = None;
463        Ok(())
464    }
465
466    pub async fn multiple_timeline_lock(&self, bucket_type: BucketType, base_ids: &Vec<Id>) -> anyhow::Result<RwLockWriteGuard<'_, Option<MultipleTimeline>>> {
467        let mut multiple_timeline = self.multiple_timeline.write().await;
468
469        // If our base_ids have changed, blow it away
470        if let Some(multiple_timeline_instance) = multiple_timeline.as_ref() {
471            if multiple_timeline_instance.bucket_type() != bucket_type || multiple_timeline_instance.base_ids() != base_ids {
472                *multiple_timeline = None;
473            }
474        }
475
476        // Create the multiple_timeline if necessary
477        if multiple_timeline.is_none() {
478            trace!("Starting a new MultipleTimeline for base_ids.len()={}", base_ids.len());
479            *multiple_timeline = Some(MultipleTimeline::new(bucket_type, base_ids.clone(), self.post_bundle_manager.clone(), self.recent_posts_pen.clone()));
480        }
481
482        Ok(multiple_timeline)
483    }
484
485    pub async fn multiple_timeline_get_more(&self, bucket_type: BucketType, base_ids: &Vec<Id>) -> anyhow::Result<(Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>, TimeMillis)> {
486        trace!("Getting more posts for base_ids.len()={}", base_ids.len());
487
488        let mut multiple_timeline = self.multiple_timeline_lock(bucket_type, base_ids).await?;
489        let multiple_timeline = multiple_timeline.as_mut().expect("we have ensured that our MultipleTimeline exists");
490        let encoded_posts_bytes = multiple_timeline.get_more_posts(self.runtime_services.time_provider.current_time_millis(), 60, 5, bucket_durations_for_type(bucket_type)).await?;
491        let oldest_processed_time_millis = multiple_timeline.oldest_processed_post_bundle_time_millis();
492        let posts = self.post_process_timeline_posts(encoded_posts_bytes).await?;
493        Ok((posts, oldest_processed_time_millis))
494    }
495
496    async fn get_random_peer(&self) -> anyhow::Result<Peer> {
497        {
498            let peer_tracker = self.peer_tracker.read().await;
499            if !peer_tracker.peers().is_empty() {
500                return Ok(tools::random_element(peer_tracker.peers()).clone());
501            }
502        }
503
504        // No peers — try bootstrapping
505        {
506            let mut peer_tracker = self.peer_tracker.write().await;
507            peer_tracker.bootstrap().await?;
508            anyhow::ensure!(!peer_tracker.peers().is_empty(), "Still no known peers available after bootstrap");
509            Ok(tools::random_element(peer_tracker.peers()).clone())
510        }
511    }
512
    /// Snapshot of every peer currently known to the tracker (cloned out from under the
    /// read lock, so the caller holds no lock).
    pub async fn get_all_known_peers(&self) -> Vec<Peer> {
        self.peer_tracker.read().await.peers().clone()
    }

517    pub async fn fetch_url_preview(&self, url: &str) -> anyhow::Result<FetchUrlPreviewResponseV1> {
518        let peer = self.get_random_peer().await?;
519
520        let payload = FetchUrlPreviewV1::new_to_bytes(url)?;
521        let sponsor_id = self.client_id.id;
522
523        let response = rpc::rpc::rpc_server_known_with_requisite_pow(
524            &self.runtime_services,
525            &sponsor_id,
526            &peer,
527            PayloadRequestKind::FetchUrlPreviewV1,
528            payload,
529            crate::tools::config::POW_MINIMUM_PER_URL_FETCH,
530        ).await?;
531
532        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::FetchUrlPreviewResponseV1, "unexpected response kind: {}", response.response_request_kind);
533        FetchUrlPreviewResponseV1::from_bytes(&response.bytes)
534    }
535
    /// Issue a `PeerStatsRequestV1` to the peer identified by `peer_id` and return
    /// the verified, decompressed stats document.
    ///
    /// The signature is verified against the response's own `peer` field — callers
    /// receive either a verified `serde_json::Value` or an error. The doc's schema
    /// is open-ended; the server decides what fields to include and at what nesting.
    pub async fn fetch_peer_stats(&self, peer_id: &Id) -> anyhow::Result<serde_json::Value> {
        // Only peers already in the tracker can be queried; no dialing unknown addresses.
        let peer = {
            let peer_tracker = self.peer_tracker.read().await;
            peer_tracker.peers().iter().find(|peer| &peer.id == peer_id).cloned()
        };
        let peer = peer.ok_or_else(|| anyhow::anyhow!("peer not known to this client: {}", peer_id))?;

        let payload = PeerStatsRequestV1 {}.to_bytes()?;
        let sponsor_id = self.client_id.id;

        let response = rpc::rpc::rpc_server_known_with_requisite_pow(
            &self.runtime_services,
            &sponsor_id,
            &peer,
            PayloadRequestKind::PeerStatsRequestV1,
            payload,
            crate::tools::config::POW_MINIMUM_PER_PEER_STATS,
        ).await?;

        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::PeerStatsResponseV1, "unexpected response kind: {}", response.response_request_kind);
        let response = PeerStatsResponseV1::from_bytes(&response.bytes)?;

        // Verify the signature over (timestamp, compressed json) BEFORE decompressing or
        // parsing, so nothing unverified reaches serde_json.
        // NOTE(review): the verification key is taken from the response itself, so this
        // authenticates the document to the embedded peer identity, not to `peer_id` —
        // confirm that binding is checked elsewhere (or intended to be open).
        let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes)?;
        let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
        signing::verify(&verification_key, &response.signature, &signing_input)?;

        let json_bytes = compression::decompress(&response.json_compressed)?.to_bytes();
        let doc: serde_json::Value = serde_json::from_slice(&json_bytes)?;
        Ok(doc)
    }

573    pub async fn fetch_trending_hashtags(&self, limit: u16) -> anyhow::Result<TrendingHashtagsFetchResponseV1> {
574        let peer = self.get_random_peer().await?;
575
576        let payload = TrendingHashtagsFetchV1::new_to_bytes(limit)?;
577        let sponsor_id = self.client_id.id;
578
579        let response = rpc::rpc::rpc_server_known(
580            &self.runtime_services,
581            &sponsor_id,
582            &peer,
583            PayloadRequestKind::TrendingHashtagsFetchV1,
584            payload,
585        ).await?;
586
587        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::TrendingHashtagsFetchResponseV1, "unexpected response kind: {}", response.response_request_kind);
588        TrendingHashtagsFetchResponseV1::from_bytes(&response.bytes)
589    }
590
591    pub async fn dispatch_command(&self, command: &String) -> Result<(), anyhow::Error> {
592        let mut command_parts: Vec<String> = command.splitn(2, " ").map(|s| s.to_string()).collect();
593        if command_parts.is_empty() {
594            anyhow::bail!("Command cannot be empty")
595        }
596
597        command_parts[0] = command_parts[0].to_uppercase();
598
599        match command_parts[0].as_str() {
600            // ID
601            "I" => {
602                info!("I am hashiverse_client {}", self.client_id);
603            }
604
605            // POST
606            "P" => {
607                if command_parts.len() < 2 {
608                    anyhow::bail!("Post message cannot be empty")
609                }
610                let post_html = convert_text_to_hashiverse_html(&command_parts[1]);
611                let payload_response = self.submit_post(&post_html).await;
612                match payload_response {
613                    Ok(_) => {
614                        info!("post succeeded");
615                    }
616                    Err(e) => {
617                        error!("post error: {}", e);
618                    }
619                }
620            }
621
622            // MY POSTS
623            "M" => {
624                let encoded_posts = self.single_timeline_get_more(BucketType::User, &self.client_id.id).await;
625                match encoded_posts {
626                    Ok((encoded_posts, _oldest_processed_time_millis)) => {
627                        info!("received {} more posts", encoded_posts.len());
628                        for (bucket_location_id, encoded_post, _raw_bytes, _healed) in encoded_posts {
629                            info!("post: {} {} {}", bucket_location_id, encoded_post.header.time_millis, encoded_post.post);
630                        }
631                    }
632                    Err(e) => {
633                        error!("post error: {}", e);
634                    }
635                }
636            }
637
638            _ => {
639                warn!("unknown command: {}", command);
640            }
641        }
642
643        Ok(())
644    }
645}