Skip to main content

hashiverse_lib/client/
hashiverse_client.rs

1//! # Top-level client API
2//!
3//! The one thing every external consumer of `hashiverse-lib` ends up holding: a
4//! [`HashiverseClient`] instance. The server binary's cache role, the WASM browser client,
5//! the Python wrapper, and the integration-test harness all build on top of this struct.
6//!
7//! `HashiverseClient` wires together the rest of the `client` module — peer tracking,
8//! post/feedback bundle management, timelines, the meta-post system, the key locker — and
9//! exposes the small set of verbs a social-network client actually performs:
10//!
11//! - submit posts, submit feedback, fetch URL previews, fetch trending hashtags;
12//! - walk per-user / per-hashtag / per-mention timelines;
13//! - manage the logged-in account (via [`crate::client::key_locker`]);
14//! - read and update the profile ([`crate::client::meta_post`]).
15//!
16//! Every dependency that varies by environment (clock, network, PoW engine, storage, key
17//! locker) is injected through [`crate::tools::runtime_services::RuntimeServices`] and
18//! pluggable traits so the same struct works unchanged on native, in the browser (WASM),
19//! and in deterministic in-memory integration tests.
20
21use crate::client::args::Args;
22use crate::client::client_storage::client_storage::{ClientStorage, BUCKETS, BUCKET_TRIMS};
23use crate::client::key_locker::key_locker::KeyLocker;
24use crate::client::meta_post::meta_post::MetaPost;
25use crate::client::meta_post::meta_post_manager::MetaPostManager;
26use crate::client::peer_tracker::peer_tracker::PeerTracker;
27use crate::client::post_bundle::live_post_bundle_manager::LivePostBundleManager;
28use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
29use crate::client::post_bundle::posting;
30use crate::client::timeline::recent_posts_pen::RecentPostsPen;
31use crate::client::timeline::single_timeline::SingleTimeline;
32use crate::protocol::posting::encoded_post::EncodedPostV1;
33use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
34use crate::tools::buckets::{bucket_durations_for_type, BucketLocation, BucketType};
35use crate::tools::client_id::ClientId;
36use crate::tools::runtime_services::RuntimeServices;
37use crate::tools::types::Id;
38use bytes::Bytes;
39use log::{error, info, trace, warn};
40use std::sync::Arc;
41use tokio::sync::{RwLock, RwLockWriteGuard};
42use crate::anyhow_assert_eq;
43use crate::client::post_bundle::live_post_bundle_feedback_manager::LivePostBundleFeedbackManager;
44use crate::client::post_bundle::post_bundle_feedback_manager::PostBundleFeedbackManager;
45use crate::client::timeline::multiple_timeline::MultipleTimeline;
46use crate::protocol::payload::payload::{FetchUrlPreviewResponseV1, FetchUrlPreviewV1, PayloadRequestKind, PayloadResponseKind, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostCommitTokenV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1};
47use crate::tools::compression;
48use crate::tools::signing;
49use crate::tools::types::VerificationKey;
50use crate::protocol::peer::Peer;
51use crate::protocol::rpc;
52use crate::tools::config::CLIENT_FEEDBACK_POW_NUMERAIRE;
53use crate::tools::plain_text_post::convert_text_to_hashiverse_html;
54use crate::tools::tools;
55use crate::tools::time::TimeMillis;
56
57/// The top-level client API for participating in a hashiverse network.
58///
59/// `HashiverseClient` is what every external consumer — the server binary's cache role, the
60/// WASM browser client, the Python wrapper, integration tests — builds on top of. It
61/// orchestrates all the subsystems the client side needs:
62///
63/// - a [`PeerTracker`] for Kademlia-style peer discovery and gossip,
64/// - a [`LivePostBundleManager`] / [`LivePostBundleFeedbackManager`] pair for fetching post
65///   bundles and their feedback from the network (with on-disk caching),
66/// - a [`SingleTimeline`] / [`MultipleTimeline`] pair for recursive bucket traversal when
67///   reading a feed,
68/// - a [`MetaPostManager`] for the user's profile / follow list / settings,
69/// - a [`RecentPostsPen`] so freshly-authored posts appear in the user's own timelines
70///   immediately,
71/// - a [`ClientId`] / [`KeyLocker`] pair for the signed identity used on every outbound
72///   action.
73///
74/// All external dependencies — clock, network, PoW — are carried through a single
75/// [`RuntimeServices`], which is what makes the same code run inside the in-memory integration
76/// test harness, on a native server, and in a browser Web Worker without touching any of the
77/// high-level logic above.
78///
79/// Construct one via [`HashiverseClient::new`] and keep a single instance per logged-in
80/// account; the struct is cheap to share (`Arc<Self>`) and its internal state uses `RwLock`s
81/// for concurrent access.
82pub struct HashiverseClient {
83    runtime_services: Arc<RuntimeServices>,
84    client_storage: Arc<dyn ClientStorage>,
85    key_locker: Arc<dyn KeyLocker>,
86    post_bundle_manager: Arc<LivePostBundleManager>,
87    post_bundle_feedback_manager: Arc<LivePostBundleFeedbackManager>,
88    meta_post_manager: MetaPostManager,
89    client_id: ClientId,
90    peer_tracker: Arc<RwLock<PeerTracker>>,
91    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
92    single_timeline: Arc<RwLock<Option<SingleTimeline>>>,
93    multiple_timeline: Arc<RwLock<Option<MultipleTimeline>>>,
94}
95
96impl HashiverseClient {
97    pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>, key_locker: Arc<dyn KeyLocker>, _args: Args) -> anyhow::Result<Self> {
98        let client_id = key_locker.client_id().clone();
99        info!("client_id={}", client_id);
100
101        let peer_tracker = Arc::new(RwLock::new(PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?));
102        let post_bundle_manager = Arc::new(LivePostBundleManager::new(runtime_services.clone(), client_id.id, client_storage.clone(), peer_tracker.clone()));
103        let post_bundle_feedback_manager = Arc::new(LivePostBundleFeedbackManager::new(runtime_services.clone(), client_id.id, client_storage.clone(), peer_tracker.clone()));
104
105        // Trim the various buckets
106        anyhow_assert_eq!(BUCKETS.len(), BUCKET_TRIMS.len(), "Mismatch in length between BUCKETS and BUCKET_TRIMS");
107        info!("Trimming buckets: {} buckets, {} trims", BUCKETS.len(), BUCKET_TRIMS.len());
108        for i in 0..BUCKETS.len() {
109            let trim = BUCKET_TRIMS[i];
110            if trim > 0 {
111                client_storage.trim(BUCKETS[i], trim).await?;
112            }
113        }
114
115        let meta_post_manager = MetaPostManager::new(runtime_services.clone(), client_storage.clone(), key_locker.clone(), client_id.clone());
116
117        Ok(Self {
118            runtime_services,
119            client_storage,
120            key_locker,
121            post_bundle_manager,
122            post_bundle_feedback_manager,
123            meta_post_manager,
124            client_id,
125            peer_tracker,
126            recent_posts_pen: Arc::new(RwLock::new(RecentPostsPen::new())),
127            single_timeline: Arc::new(RwLock::new(None)),
128            multiple_timeline: Arc::new(RwLock::new(None)),
129        })
130    }
131
132    pub fn client_id(&self) -> &ClientId {
133        &self.client_id
134    }
135
136    pub fn active_pow_jobs(&self) -> Vec<crate::tools::pow_generator::pow_generator::PowJobStatus> {
137        self.runtime_services.pow_generator.active_jobs()
138    }
139
140    /// Whether there is background PoW work happening now, or within the last `within_millis`.
141    /// Drives the GUI "busy" indicator.
142    pub fn is_pow_busy(&self, within_millis: i64) -> bool {
143        self.runtime_services.pow_generator.is_pow_busy(within_millis)
144    }
145
146    pub async fn client_storage_reset(&self) -> anyhow::Result<()> {
147        self.client_storage.reset().await
148    }
149
150    pub async fn submit_post(&self, post: &str) -> Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes)), anyhow::Error> {
151        self.submit_post_with_wait(post, true).await
152    }
153
154    /// Submit a post, choosing whether to wait for the full fan-out. With `wait_for_all_submissions
155    /// == false` this returns as soon as the mandatory User-bucket post secures its first commit and
156    /// completes the remaining redundancy and secondary buckets on a background task.
157    pub async fn submit_post_with_wait(&self, post: &str, wait_for_all_submissions: bool) -> Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes)), anyhow::Error> {
158        posting::submit_post(
159            self.runtime_services.clone(),
160            &self.client_id,
161            &self.key_locker,
162            self.post_bundle_manager.clone(),
163            self.peer_tracker.clone(),
164            &self.recent_posts_pen,
165            post,
166            wait_for_all_submissions,
167        )
168        .await
169    }
170
171    // ------------------------------------------------------------------
172    // MetaPostManager delegation
173    // ------------------------------------------------------------------
174
175    pub fn meta_post_manager(&self) -> &MetaPostManager {
176        &self.meta_post_manager
177    }
178
179    pub async fn submit_meta_post(&self) -> anyhow::Result<()> {
180        let post_json = self.meta_post_manager.build_meta_post_json().await?;
181        self.submit_post(&post_json).await?;
182        Ok(())
183    }
184
185    pub async fn ensure_meta_post_in_current_bucket(&self) -> anyhow::Result<()> {
186        if self.meta_post_manager.should_auto_publish(self.post_bundle_manager.as_ref()).await? {
187            self.submit_meta_post().await?;
188        }
189        Ok(())
190    }
191
192    pub async fn submit_feedback(&self, bucket_location: BucketLocation, post_id: Id, feedback_type: u8) -> anyhow::Result<()> {
193        info!("submit_feedback: bucket_location={}, post_signature={}, feedback_type={}", bucket_location, post_id, feedback_type);
194
195        // Generate our PoW
196        let (salt, pow, _hash) = EncodedPostFeedbackV1::pow_generate(&post_id, feedback_type, self.runtime_services.pow_generator.as_ref()).await?;
197
198        // Is it better than the best we have seen so far?
199        let post_bundle_location_id = bucket_location.location_id;
200        let post_bundle_feedback = self.post_bundle_feedback_manager.get_post_bundle_feedback(bucket_location.clone(), self.runtime_services.time_provider.current_time_millis()).await?;
201        let pow_best_so_far = post_bundle_feedback.get_post_pow_for_feedback_type(&post_id, feedback_type);
202        if pow <= pow_best_so_far {
203            trace!("skipping feedback submission: pow_best_so_far: {}, pow: {}", pow_best_so_far, pow);
204            return Ok(());
205        }
206
207        // Submit our good work to the servers
208        let encoded_post_feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, pow);
209        let result = posting::post_feedback_to_location(
210            &self.runtime_services, &self.client_id.id, &self.peer_tracker,
211            &bucket_location, &encoded_post_feedback,
212        ).await;
213        if let Err(e) = result {
214            warn!("Failed to feedback to {:?}: {}", post_bundle_location_id, e);
215        }
216
217        Ok(())
218    }
219
220    pub async fn get_post(&self, bucket_location: BucketLocation, post_id: &Id) -> anyhow::Result<(BucketLocation, EncodedPostV1, Bytes, bool)>
221    {
222        let post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, self.runtime_services.time_provider.current_time_millis()).await?;
223
224        let mut offset = 0;
225        for i in 0..(post_bundle.header.num_posts as usize) {
226            let len = post_bundle.header.encoded_post_lengths[i];
227            if post_bundle.header.encoded_post_ids[i] == *post_id {
228                let post_bytes = post_bundle.encoded_posts_bytes.slice(offset..offset + len);
229                let encoded_post = EncodedPostV1::decode_from_bytes(post_bytes.clone(), &bucket_location.base_id, true, true)?;
230                let healed = post_bundle.header.encoded_post_healed.contains(post_id);
231                return Ok((bucket_location, encoded_post, post_bytes, healed));
232            }
233            offset += len;
234        }
235
236        anyhow::bail!("Post {} not found in bundle {}", post_id, bucket_location.location_id)
237    }
238
239    pub async fn get_post_feedbacks(&self, bucket_location: BucketLocation, post_id: Id) -> anyhow::Result<[u64; 256]>
240    {
241        let mut post_feedbacks = [0u64; 256];
242
243        let post_bundle_feedback = self.post_bundle_feedback_manager.get_post_bundle_feedback(bucket_location, self.runtime_services.time_provider.current_time_millis()).await?;
244        let post_pows = post_bundle_feedback.get_post_pows(&post_id);
245        for (i, &pow) in post_pows.iter().enumerate() {
246            if pow.0 > 0 {
247                let statistical_attempts = 1u64.checked_shl(pow.0 as u32).unwrap_or(u64::MAX);
248                post_feedbacks[i] = statistical_attempts / CLIENT_FEEDBACK_POW_NUMERAIRE as u64;
249            }
250            // pow=0 is the sentinel for "no feedback submitted" — leave as 0
251        }
252
253        Ok(post_feedbacks)
254    }
255
256
257    async fn post_process_timeline_posts(&self, encoded_posts_bytes: Vec<(BucketLocation, Bytes, bool)>) -> anyhow::Result<Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>> {
258        let mut encoded_posts = Vec::new();
259        for (bucket_location, encoded_post_bytes, healed) in encoded_posts_bytes {
260            let result = try {
261                let encoded_post = EncodedPostV1::decode_from_bytes(encoded_post_bytes.clone(), &bucket_location.base_id, true, true)?;
262                let meta_post = MetaPost::try_parse_meta_post(&encoded_post.post)?;
263                match meta_post {
264                    MetaPost::None => encoded_posts.push((bucket_location, encoded_post, encoded_post_bytes, healed)),
265                    MetaPost::MetaPostV1(meta_post_v1) => {
266                        let post_client_id = encoded_post.header.client_id()?;
267                        self.meta_post_manager.process_incoming_meta_post(&meta_post_v1, &post_client_id).await?;
268                    }
269                }
270            };
271
272            if let Err(e) = result {
273                warn!("Failed to decode post: {}", e);
274            }
275        }
276
277        Ok(encoded_posts)
278    }
279
280    pub async fn single_timeline_reset(&self) -> anyhow::Result<()> {
281        info!("Resetting single timeline");
282        let mut single_timeline = self.single_timeline.write().await;
283        *single_timeline = None;
284        Ok(())
285    }
286
287    pub async fn single_timeline_lock(&self, bucket_type: BucketType, base_id: &Id) -> anyhow::Result<RwLockWriteGuard<'_, Option<SingleTimeline>>> {
288        let mut single_timeline = self.single_timeline.write().await;
289
290        // If our base_id has changed, blow it away
291        if let Some(single_timeline_instance) = single_timeline.as_ref() {
292            if single_timeline_instance.bucket_type() != bucket_type || single_timeline_instance.base_id() != *base_id {
293                *single_timeline = None;
294            }
295        }
296
297        // Create the single_timeline if necessary
298        if single_timeline.is_none() {
299            trace!("Starting a new SingleTimeline for bucket_type={} base_id={}", bucket_type, base_id);
300            *single_timeline = Some(SingleTimeline::new(bucket_type, base_id, self.post_bundle_manager.clone(), self.recent_posts_pen.clone()));
301        }
302
303        Ok(single_timeline)
304    }
305
306    pub async fn single_timeline_get_more(&self, bucket_type: BucketType, base_id: &Id) -> anyhow::Result<(Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>, TimeMillis)> {
307        trace!("Getting more posts for {}", base_id);
308
309        let mut single_timeline = self.single_timeline_lock(bucket_type, base_id).await?;
310        let single_timeline = single_timeline.as_mut().expect("we have ensured that our SingleTimeline exists");
311        let encoded_posts_bytes = single_timeline.get_more_posts(self.runtime_services.time_provider.current_time_millis(), 20, bucket_durations_for_type(bucket_type)).await?;
312        let oldest_processed_time_millis = single_timeline.oldest_processed_post_bundle_time_millis();
313        let posts = self.post_process_timeline_posts(encoded_posts_bytes).await?;
314        Ok((posts, oldest_processed_time_millis))
315    }
316
317    pub async fn multiple_timeline_reset(&self) -> anyhow::Result<()> {
318        info!("Resetting multiple timeline");
319        let mut multiple_timeline = self.multiple_timeline.write().await;
320        *multiple_timeline = None;
321        Ok(())
322    }
323
324    pub async fn multiple_timeline_lock(&self, bucket_type: BucketType, base_ids: &Vec<Id>) -> anyhow::Result<RwLockWriteGuard<'_, Option<MultipleTimeline>>> {
325        let mut multiple_timeline = self.multiple_timeline.write().await;
326
327        // If our base_ids have changed, blow it away
328        if let Some(multiple_timeline_instance) = multiple_timeline.as_ref() {
329            if multiple_timeline_instance.bucket_type() != bucket_type || multiple_timeline_instance.base_ids() != base_ids {
330                *multiple_timeline = None;
331            }
332        }
333
334        // Create the multiple_timeline if necessary
335        if multiple_timeline.is_none() {
336            trace!("Starting a new MultipleTimeline for base_ids.len()={}", base_ids.len());
337            *multiple_timeline = Some(MultipleTimeline::new(bucket_type, base_ids.clone(), self.post_bundle_manager.clone(), self.recent_posts_pen.clone()));
338        }
339
340        Ok(multiple_timeline)
341    }
342
343    pub async fn multiple_timeline_get_more(&self, bucket_type: BucketType, base_ids: &Vec<Id>) -> anyhow::Result<(Vec<(BucketLocation, EncodedPostV1, Bytes, bool)>, TimeMillis)> {
344        trace!("Getting more posts for base_ids.len()={}", base_ids.len());
345
346        let mut multiple_timeline = self.multiple_timeline_lock(bucket_type, base_ids).await?;
347        let multiple_timeline = multiple_timeline.as_mut().expect("we have ensured that our MultipleTimeline exists");
348        let encoded_posts_bytes = multiple_timeline.get_more_posts(self.runtime_services.time_provider.current_time_millis(), 60, 5, bucket_durations_for_type(bucket_type)).await?;
349        let oldest_processed_time_millis = multiple_timeline.oldest_processed_post_bundle_time_millis();
350        let posts = self.post_process_timeline_posts(encoded_posts_bytes).await?;
351        Ok((posts, oldest_processed_time_millis))
352    }
353
354    async fn get_random_peer(&self) -> anyhow::Result<Peer> {
355        {
356            let peer_tracker = self.peer_tracker.read().await;
357            if !peer_tracker.peers().is_empty() {
358                return Ok(tools::random_element(peer_tracker.peers()).clone());
359            }
360        }
361
362        // No peers — try bootstrapping
363        {
364            let mut peer_tracker = self.peer_tracker.write().await;
365            peer_tracker.bootstrap().await?;
366            anyhow::ensure!(!peer_tracker.peers().is_empty(), "Still no known peers available after bootstrap");
367            Ok(tools::random_element(peer_tracker.peers()).clone())
368        }
369    }
370
371    pub async fn get_all_known_peers(&self) -> Vec<Peer> {
372        self.peer_tracker.read().await.peers().clone()
373    }
374
375    pub async fn fetch_url_preview(&self, url: &str) -> anyhow::Result<FetchUrlPreviewResponseV1> {
376        let peer = self.get_random_peer().await?;
377
378        let payload = FetchUrlPreviewV1::new_to_bytes(url)?;
379        let sponsor_id = self.client_id.id;
380
381        let response = rpc::rpc::rpc_server_known_with_requisite_pow(
382            &self.runtime_services,
383            &sponsor_id,
384            &peer,
385            PayloadRequestKind::FetchUrlPreviewV1,
386            payload,
387            crate::tools::config::POW_MINIMUM_PER_URL_FETCH,
388        ).await?;
389
390        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::FetchUrlPreviewResponseV1, "unexpected response kind: {}", response.response_request_kind);
391        FetchUrlPreviewResponseV1::from_bytes(&response.bytes)
392    }
393
394    /// Issue a `PeerStatsRequestV1` to the peer identified by `peer_id` and return
395    /// the verified, decompressed stats document.
396    ///
397    /// The signature is verified against the response's own `peer` field — callers
398    /// receive either a verified `serde_json::Value` or an error. The doc's schema
399    /// is open-ended; the server decides what fields to include and at what nesting.
400    pub async fn fetch_peer_stats(&self, peer_id: &Id) -> anyhow::Result<serde_json::Value> {
401        let peer = {
402            let peer_tracker = self.peer_tracker.read().await;
403            peer_tracker.peers().iter().find(|peer| &peer.id == peer_id).cloned()
404        };
405        let peer = peer.ok_or_else(|| anyhow::anyhow!("peer not known to this client: {}", peer_id))?;
406
407        let payload = PeerStatsRequestV1 {}.to_bytes()?;
408        let sponsor_id = self.client_id.id;
409
410        let response = rpc::rpc::rpc_server_known_with_requisite_pow(
411            &self.runtime_services,
412            &sponsor_id,
413            &peer,
414            PayloadRequestKind::PeerStatsRequestV1,
415            payload,
416            crate::tools::config::POW_MINIMUM_PER_PEER_STATS,
417        ).await?;
418
419        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::PeerStatsResponseV1, "unexpected response kind: {}", response.response_request_kind);
420        let response = PeerStatsResponseV1::from_bytes(&response.bytes)?;
421
422        let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes)?;
423        let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
424        signing::verify(&verification_key, &response.signature, &signing_input)?;
425
426        let json_bytes = compression::decompress(&response.json_compressed)?.to_bytes();
427        let doc: serde_json::Value = serde_json::from_slice(&json_bytes)?;
428        Ok(doc)
429    }
430
431    pub async fn fetch_trending_hashtags(&self, limit: u16) -> anyhow::Result<TrendingHashtagsFetchResponseV1> {
432        let peer = self.get_random_peer().await?;
433
434        let payload = TrendingHashtagsFetchV1::new_to_bytes(limit)?;
435        let sponsor_id = self.client_id.id;
436
437        let response = rpc::rpc::rpc_server_known(
438            &self.runtime_services,
439            &sponsor_id,
440            &peer,
441            PayloadRequestKind::TrendingHashtagsFetchV1,
442            payload,
443        ).await?;
444
445        anyhow::ensure!(response.response_request_kind == PayloadResponseKind::TrendingHashtagsFetchResponseV1, "unexpected response kind: {}", response.response_request_kind);
446        TrendingHashtagsFetchResponseV1::from_bytes(&response.bytes)
447    }
448
449    pub async fn dispatch_command(&self, command: &String) -> Result<(), anyhow::Error> {
450        let mut command_parts: Vec<String> = command.splitn(2, " ").map(|s| s.to_string()).collect();
451        if command_parts.is_empty() {
452            anyhow::bail!("Command cannot be empty")
453        }
454
455        command_parts[0] = command_parts[0].to_uppercase();
456
457        match command_parts[0].as_str() {
458            // ID
459            "I" => {
460                info!("I am hashiverse_client {}", self.client_id);
461            }
462
463            // POST
464            "P" => {
465                if command_parts.len() < 2 {
466                    anyhow::bail!("Post message cannot be empty")
467                }
468                let post_html = convert_text_to_hashiverse_html(&command_parts[1]);
469                let payload_response = self.submit_post(&post_html).await;
470                match payload_response {
471                    Ok(_) => {
472                        info!("post succeeded");
473                    }
474                    Err(e) => {
475                        error!("post error: {}", e);
476                    }
477                }
478            }
479
480            // MY POSTS
481            "M" => {
482                let encoded_posts = self.single_timeline_get_more(BucketType::User, &self.client_id.id).await;
483                match encoded_posts {
484                    Ok((encoded_posts, _oldest_processed_time_millis)) => {
485                        info!("received {} more posts", encoded_posts.len());
486                        for (bucket_location_id, encoded_post, _raw_bytes, _healed) in encoded_posts {
487                            info!("post: {} {} {}", bucket_location_id, encoded_post.header.time_millis, encoded_post.post);
488                        }
489                    }
490                    Err(e) => {
491                        error!("post error: {}", e);
492                    }
493                }
494            }
495
496            _ => {
497                warn!("unknown command: {}", command);
498            }
499        }
500
501        Ok(())
502    }
503}