Skip to main content

hashiverse_lib/client/post_bundle/
posting.rs

1//! # Submit-post and submit-feedback client-side orchestration
2//!
//! Two async functions that the higher-level client API calls when a user posts or
//! reacts:
//!
//! - `submit_post` — parses the post into its target buckets, encodes it, and fans it out on a
//!   background task. Each target bucket is written via `post_to_location`, a two-phase commit
//!   against the server(s) responsible for that bucket: `SubmitPostClaimV1` first reserves a slot
//!   and returns a claim token (the server has now done its admission-control work), then
//!   `SubmitPostCommitV1` uploads the full post bytes. Splitting it in two means the server has
//!   agreed it wants the post before the client sends the full, potentially large, body.
12//! - `post_feedback_to_location` — smaller single-phase RPC (`SubmitPostFeedbackV1`)
13//!   used for reactions / flags which carry their own PoW but no big payload to
14//!   reserve against.
15
16use crate::anyhow_assert_eq;
17use crate::client::key_locker::key_locker::KeyLocker;
18use crate::client::peer_tracker::peer_tracker::PeerTracker;
19use crate::client::post_bundle::live_post_bundle_manager::LivePostBundleManager;
20use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
21use crate::client::timeline::recent_posts_pen::RecentPostsPen;
22use crate::protocol::posting::encoded_post::{EncodedPostBytesV1, EncodedPostV1};
23use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind, SubmitPostClaimResponseV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1};
24use crate::protocol::rpc;
25use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketLocation, BucketType};
26use crate::tools::client_id::ClientId;
27use crate::tools::time::TimeMillis;
28use crate::tools::tools::spawn_background_task;
29use crate::tools::{config, json};
30use crate::tools::runtime_services::RuntimeServices;
31use bytes::Bytes;
32use futures::channel::mpsc;
33use futures::StreamExt;
34use log::{info, trace, warn};
35use scraper::{Html, Selector};
36use std::collections::HashSet;
37use std::sync::Arc;
38use tokio::sync::RwLock;
39use crate::protocol::posting::amplification;
40use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
41use crate::tools::types::Id;
42
43/// One target the post needs indexing under: the author's own User bucket, plus one per
44/// referenced #hashtag / @mention / reply / sequel parsed out of the post HTML.
45struct LinkedBaseIdDetail {
46    linked_base_id: Id,
47    bucket_type: BucketType,
48    referenced_post_header_bytes: Option<Bytes>,
49}
50
51/// One successful server commit, streamed back from the background poster to the foreground.
52///
53/// The channel only carries successes — a per-server failure isn't reported here because it's
54/// an internal retry (a dead peer just means we walk to the next one, it does not mean the post
55/// failed). Total failure is conveyed by the channel closing with no `User` outcome ever sent.
56struct SubmissionOutcome {
57    bucket_type: BucketType,
58    token: SubmitPostCommitTokenV1,
59}
60
61/// Submit a post to the network.
62///
63/// The parse + encode happen in the foreground, then [`post_to_locations`] is run as a background
64/// task that streams each successful server commit back over a channel. With
65/// `wait_for_all_submissions == true` the foreground drains every outcome before returning (so the
66/// returned token list and recent-posts pen are fully populated); with `false` it returns the
67/// instant the mandatory User-bucket post secures its first commit and lets the background finish
68/// the remaining redundancy and secondary buckets (visible via the PoW job tracker).
69#[allow(clippy::too_many_arguments)] // each arg is a distinct piece of the client state submitting a post; bundling adds indirection without simplifying the call site
70pub async fn submit_post(
71    runtime_services: Arc<RuntimeServices>,
72    client_id: &ClientId,
73    key_locker: &Arc<dyn KeyLocker>,
74    post_bundle_manager: Arc<LivePostBundleManager>,
75    peer_tracker: Arc<RwLock<PeerTracker>>,
76    recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
77    post: &str,
78    wait_for_all_submissions: bool,
79) -> anyhow::Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes))> {
80    trace!("submitting post: {}", post);
81
82    if post.is_empty() {
83        anyhow::bail!("Post cannot be empty");
84    }
85
86    let timestamp = runtime_services.time_provider.current_time_millis();
87
88    // Parse the post for the buckets it should be indexed under (User + #hashtags / @mentions / replies / sequels).
89    let (linked_base_id_details, referenced_hashtags) = build_locations(client_id, post)?;
90
91    // Encode the post
92    let linked_base_ids: Vec<Id> = linked_base_id_details.iter().map(|d| d.linked_base_id).collect();
93    let mut encoded_post = EncodedPostV1::new(client_id, timestamp, linked_base_ids, post);
94    let encoded_post_bytes = encoded_post.encode_to_bytes_direct(key_locker).await?;
95    let encoded_post_bytes_raw = Bytes::copy_from_slice(encoded_post_bytes.bytes());
96
97    // Fan the post out to every target bucket on a background task. It records each successful server
98    // commit in the recent posts pen itself (so committed posts show up immediately in local timelines
99    // regardless of whether we're still listening) and streams the commit tokens back to us over this
100    // channel. It keeps running even after we return.
101    let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded::<SubmissionOutcome>();
102    spawn_background_task(post_to_locations(
103        runtime_services,
104        client_id.clone(),
105        post_bundle_manager,
106        peer_tracker,
107        recent_posts_pen.clone(),
108        linked_base_id_details,
109        encoded_post.clone(),
110        encoded_post_bytes,
111        encoded_post_bytes_raw.clone(),
112        referenced_hashtags,
113        timestamp,
114        outcomes_tx,
115    ));
116
117    // Collect the commit tokens as they land. We must have at least one User-bucket commit (healing
118    // fixes the rest); the channel closing without one is the same hard-fail as before.
119    let mut post_commit_tokens = Vec::new();
120    let mut user_committed = false;
121    while let Some(SubmissionOutcome { bucket_type, token }) = outcomes_rx.next().await {
122        if bucket_type == BucketType::User {
123            user_committed = true;
124        }
125        post_commit_tokens.push(token);
126
127        // Once the User's post is durably committed once we can hand back control and let the background
128        // task complete the redundancy and secondary buckets (and pen those commits as it goes).
129        if !wait_for_all_submissions && user_committed {
130            return Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)));
131        }
132    }
133
134    if !user_committed {
135        anyhow::bail!("Failed to post to any User buckets, so bailing,");
136    }
137
138    Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)))
139}
140
141/// Record a single freshly-committed post in the recent posts pen so it appears immediately in the
142/// author's own (and the relevant hashtag / mention) timelines, before any network fetch.
143async fn record_in_pen(recent_posts_pen: &Arc<RwLock<RecentPostsPen>>, token: &SubmitPostCommitTokenV1, encoded_post_bytes_raw: &Bytes, timestamp: TimeMillis) {
144    recent_posts_pen.write().await.add_all(&[(token.bucket_location.clone(), token.post_id)], encoded_post_bytes_raw.clone(), timestamp);
145}
146
147/// Parse the post HTML into the set of buckets it should be indexed under.
148///
149/// The author's own User bucket is always present; on top of that, each #hashtag / @mention /
150/// reply / sequel referenced in the post (but not those nested inside a quoted reply/repost/sequel)
151/// contributes its own target. Returns the targets together with the list of referenced hashtags
152/// (needed by the claim RPC for hashtag-spam accounting).
153fn build_locations(client_id: &ClientId, post: &str) -> anyhow::Result<(Vec<LinkedBaseIdDetail>, Vec<String>)> {
154    let mut linked_base_id_details: Vec<LinkedBaseIdDetail> = vec![];
155    let mut referenced_hashtags: Vec<String> = vec![];
156
157    // The original post itself is always present
158    linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id.id, bucket_type: BucketType::User, referenced_post_header_bytes: None });
159
160    {
161        let html = Html::parse_fragment(post);
162        {
163            // Returns true if the element is nested inside a <reply> or <repost> — meaning it
164            // belongs to a quoted post and should not be indexed under the new post's buckets.
165            let is_quoted = |element: scraper::ElementRef| -> bool {
166                let mut node = element.parent();
167                while let Some(n) = node {
168                    if let Some(el) = scraper::ElementRef::wrap(n) {
169                        if matches!(el.value().name(), "reply" | "repost" | "sequel") { return true; }
170                    }
171                    node = n.parent();
172                }
173                false
174            };
175
176            let selector_hashtag = Selector::parse("hashtag").map_err(|e| anyhow::anyhow!("Failed to parse hashtag selector: {}", e))?;
177            let mut seen_hashtags: HashSet<String> = HashSet::new();
178            for element in html.select(&selector_hashtag) {
179                if is_quoted(element) { continue; }
180                if let Some(hashtag) = element.attr("hashtag") {
181                    if !seen_hashtags.insert(hashtag.to_string()) { continue; }
182                    trace!("hashtag={:?}", hashtag);
183                    referenced_hashtags.push(hashtag.to_string());
184                    linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: Id::from_hashtag_str(hashtag)?, bucket_type: BucketType::Hashtag, referenced_post_header_bytes: None });
185                } else {
186                    warn!("hashtag attribute not found in element {:?}", element);
187                }
188            }
189
190            let selector_mention = Selector::parse("mention").map_err(|e| anyhow::anyhow!("Failed to parse mention selector: {}", e))?;
191            let mut seen_mentions: HashSet<Id> = HashSet::new();
192            for element in html.select(&selector_mention) {
193                if is_quoted(element) { continue; }
194                if let Some(client_id_str) = element.attr("client_id") {
195                    match Id::from_hex_str(client_id_str) {
196                        Ok(client_id) => {
197                            if !seen_mentions.insert(client_id) { continue; }
198                            trace!("mention_id={:?}", client_id);
199                            linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id, bucket_type: BucketType::Mention, referenced_post_header_bytes: None });
200                        }
201                        Err(e) => warn!("mention_id corrupted in element {:?}:, {}", element, e),
202                    }
203                } else {
204                    warn!("mention attribute not found in element {:?}", element);
205                }
206            }
207
208            let selector_reply = Selector::parse("reply").map_err(|e| anyhow::anyhow!("Failed to parse reply selector: {}", e))?;
209            let mut seen_replies: HashSet<Id> = HashSet::new();
210            for element in html.select(&selector_reply) {
211                if is_quoted(element) { continue; }
212                if let Some(post_id_str) = element.attr("post_id") {
213                    match Id::from_hex_str(post_id_str) {
214                        Ok(post_id) => {
215                            if !seen_replies.insert(post_id) { continue; }
216                            trace!("reply post_id={:?}", post_id);
217                            let referenced_post_header_bytes = element.attr("post_header_hex")
218                                .and_then(|hex_str| hex::decode(hex_str).ok())
219                                .map(Bytes::from);
220                            linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::ReplyToPost, referenced_post_header_bytes });
221                        }
222                        Err(e) => warn!("reply post_id corrupted in element {:?}: {}", element, e),
223                    }
224                } else {
225                    warn!("post_id attribute not found in reply element {:?}", element);
226                }
227            }
228
229            let selector_sequel = Selector::parse("sequel").map_err(|e| anyhow::anyhow!("Failed to parse sequel selector: {}", e))?;
230            let mut seen_sequels: HashSet<Id> = HashSet::new();
231            for element in html.select(&selector_sequel) {
232                if is_quoted(element) { continue; }
233                if let Some(post_id_str) = element.attr("post_id") {
234                    match Id::from_hex_str(post_id_str) {
235                        Ok(post_id) => {
236                            if !seen_sequels.insert(post_id) { continue; }
237                            trace!("sequel post_id={:?}", post_id);
238                            let referenced_post_header_bytes = element.attr("post_header_hex")
239                                .and_then(|hex_str| hex::decode(hex_str).ok())
240                                .map(Bytes::from);
241                            linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::Sequel, referenced_post_header_bytes });
242                        }
243                        Err(e) => warn!("sequel post_id corrupted in element {:?}: {}", element, e),
244                    }
245                } else {
246                    warn!("post_id attribute not found in sequel element {:?}", element);
247                }
248            }
249        }
250    }
251
252    Ok((linked_base_id_details, referenced_hashtags))
253}
254
255/// Post the encoded post into every target bucket (User + #hashtags / @mentions / replies / sequels).
256///
257/// Runs as a background task: it owns its inputs and streams each successful server commit back to
258/// the caller over `outcomes_tx`. For each target it starts at the widest bucket duration and
259/// narrows until a non-overflowed, non-sealed bucket accepts it. The User bucket is mandatory — if
260/// it secures no commit we stop early (dropping `outcomes_tx` closes the channel, which the caller
261/// reads as the hard-fail); secondary buckets are best-effort (a failed hashtag / mention is logged
262/// and skipped, healing reconciles the rest later).
263#[allow(clippy::too_many_arguments)] // each arg is a distinct piece of the post-to-locations operation; bundling adds indirection without simplifying the call site
264async fn post_to_locations(
265    runtime_services: Arc<RuntimeServices>,
266    client_id: ClientId,
267    post_bundle_manager: Arc<LivePostBundleManager>,
268    peer_tracker: Arc<RwLock<PeerTracker>>,
269    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
270    linked_base_id_details: Vec<LinkedBaseIdDetail>,
271    encoded_post: EncodedPostV1,
272    encoded_post_bytes: EncodedPostBytesV1,
273    encoded_post_bytes_raw: Bytes,
274    referenced_hashtags: Vec<String>,
275    timestamp: TimeMillis,
276    outcomes_tx: mpsc::UnboundedSender<SubmissionOutcome>,
277) {
278    // Start the post machine
279    for linked_base_id_detail in &linked_base_id_details {
280        trace!("Posting to bucket type: {:?}, linked_base_id: {}", linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id);
281        let mut committed_count = 0;
282        let try_result: anyhow::Result<()> = try {
283            // Where are we going to post it?  Start at the widest bucket and work our way narrower till we succeed
284            for &bucket_duration in bucket_durations_for_type(linked_base_id_detail.bucket_type) {
285                let bucket_location = generate_bucket_location(linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id, bucket_duration, timestamp)?;
286                info!("checking posting availability of {:?}", bucket_location);
287
288                let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;
289                if !post_bundle.header.overflowed && !post_bundle.header.sealed {
290                    info!("Posting to {:?}", bucket_location);
291                    let result = post_to_location(&runtime_services, &client_id.id, &peer_tracker, &recent_posts_pen, &bucket_location, &encoded_post, &encoded_post_bytes, &encoded_post_bytes_raw, linked_base_id_detail.referenced_post_header_bytes.as_deref(), &referenced_hashtags, linked_base_id_detail.bucket_type, timestamp, &outcomes_tx).await;
292                    match result {
293                        Ok(result) => {
294                            committed_count += result.len();
295                            break;
296                        }
297                        Err(e) => {
298                            warn!("Failed to post to {:?}: {}", bucket_location, e);
299                            continue;
300                        }
301                    }
302                }
303
304                else {
305                    trace!("no availability: overflowed={} sealed={}", post_bundle.header.overflowed, post_bundle.header.sealed);
306                }
307            }
308        };
309
310        if let Err(e) = try_result {
311            warn!("Failed to post to any bucket location for linked_base_id {:?}: {}", linked_base_id_detail.linked_base_id, e);
312        }
313
314        // Stop if we managed no commit by the end of the User bucket (healing should fix the rest).  We can
315        // happily continue with a failed hashtag, mention, etc.  Returning here drops `outcomes_tx` and closes
316        // the channel, which the caller reads as the User-bucket hard-fail.
317        if linked_base_id_detail.bucket_type == BucketType::User && committed_count == 0 {
318            return;
319        }
320    }
321}
322
323#[allow(clippy::too_many_arguments)] // each arg is a distinct piece of the post-to-location operation; bundling adds indirection without simplifying call sites
324async fn post_to_location(
325    runtime_services: &RuntimeServices,
326    sponsor_id: &Id,
327    peer_tracker: &Arc<RwLock<PeerTracker>>,
328    recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
329    bucket_location: &BucketLocation,
330    encoded_post: &EncodedPostV1,
331    encoded_post_bytes: &EncodedPostBytesV1,
332    encoded_post_bytes_raw: &Bytes,
333    referenced_post_header_bytes: Option<&[u8]>,
334    referenced_hashtags: &[String],
335    bucket_type: BucketType,
336    timestamp: TimeMillis,
337    outcomes_tx: &mpsc::UnboundedSender<SubmissionOutcome>,
338) -> anyhow::Result<Vec<SubmitPostCommitTokenV1>> {
339    let mut peers_visited = Vec::new();
340    let mut submit_post_claim_tokens = Vec::new();
341    let mut submit_post_commit_tokens = Vec::new();
342
343    let mut peer_tracker = peer_tracker.write().await;
344    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
345    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
346        let result: anyhow::Result<()> = try {
347            info!("Requesting SubmitPostClaim with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
348
349            // First we need to get permission to post
350            let request = SubmitPostClaimV1::new_to_bytes(bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes.bytes_without_body())?;
351            let requisite_pow = amplification::get_minimum_post_pow(encoded_post.header.post_length, encoded_post.header.linked_base_ids.len(), bucket_location.duration);
352            let response = rpc::rpc::rpc_server_known_with_requisite_pow_and_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostClaimV1, request, requisite_pow).await?;
353            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostClaimResponseV1, &response.response_request_kind);
354            let response = json::bytes_to_struct::<SubmitPostClaimResponseV1>(&response.bytes)?;
355
356            peers_visited.push(peer.clone());
357
358            peer_iter.add_peers(response.peers_nearer);
359
360            if let Some(submit_post_claim_token) = response.submit_post_claim_token {
361                info!("Received a SubmitPostClaim from peer {}", peer);
362
363                submit_post_claim_tokens.push(submit_post_claim_token.clone());
364
365                // We have permission, so do the post!
366                info!("Posting to peer {}", peer);
367                let request = SubmitPostCommitV1::new_to_bytes(bucket_location, &submit_post_claim_token, encoded_post_bytes.bytes())?;
368                let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostCommitV1, request).await?;
369                anyhow_assert_eq!(&PayloadResponseKind::SubmitPostCommitResponseV1, &response.response_request_kind);
370                let response = json::bytes_to_struct::<SubmitPostCommitResponseV1>(&response.bytes)?;
371
372                // Record this commit in the recent posts pen (so it shows up immediately in local
373                // timelines) then stream it back to the submitter. The receiver may have already returned
374                // (the fast path), in which case the send simply no-ops — but the pen is still populated.
375                let submit_post_commit_token = response.submit_post_commit_token;
376                record_in_pen(recent_posts_pen, &submit_post_commit_token, encoded_post_bytes_raw, timestamp).await;
377                let _ = outcomes_tx.unbounded_send(SubmissionOutcome { bucket_type, token: submit_post_commit_token.clone() });
378                submit_post_commit_tokens.push(submit_post_commit_token);
379                if submit_post_commit_tokens.len() >= config::REDUNDANT_SERVERS_PER_POST {
380                    break;
381                }
382            }
383        };
384
385        if let Err(e) = result {
386            warn!("Error retrieving SubmitPostClaim from peer {}: {}", peer, e);
387            peer_iter.remove_peer(&peer);
388        }
389    }
390
391    if submit_post_claim_tokens.is_empty() {
392        anyhow::bail!("Despite expecting availability, we were unable claim anywhere at {}", bucket_location);
393    }
394
395    if submit_post_commit_tokens.is_empty() {
396        anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location);
397    }
398
399    Ok(submit_post_commit_tokens)
400}
401
402pub async fn post_feedback_to_location(
403    runtime_services: &RuntimeServices,
404    sponsor_id: &Id,
405    peer_tracker: &Arc<RwLock<PeerTracker>>,
406    bucket_location: &BucketLocation,
407    encoded_post_feedback: &EncodedPostFeedbackV1,
408) -> anyhow::Result<()> {
409
410    let mut peers_visited = Vec::new();
411    let mut submit_post_feedback_accepts = Vec::new();
412
413    let mut peer_tracker = peer_tracker.write().await;
414    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
415    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
416        let result: anyhow::Result<()> = try {
417            info!("Requesting SubmitPostFeedbackV1 with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
418
419            let request = SubmitPostFeedbackV1::new_to_bytes(bucket_location, encoded_post_feedback)?;
420            let response = rpc::rpc::rpc_server_known_with_requisite_pow(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostFeedbackV1, request, config::POW_MINIMUM_PER_FEEDBACK).await?;
421            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostFeedbackResponseV1, &response.response_request_kind);
422            let response = json::bytes_to_struct::<SubmitPostFeedbackResponseV1>(&response.bytes)?;
423
424            peers_visited.push(peer.clone());
425
426            peer_iter.add_peers(response.peers_nearer);
427
428            if response.accepted {
429                info!("Received an accept from peer {}", peer);
430
431                submit_post_feedback_accepts.push(peer.clone());
432                if submit_post_feedback_accepts.len() >= config::REDUNDANT_SERVERS_PER_POST {
433                    break;
434                }
435            }
436        };
437
438        if let Err(e) = result {
439            warn!("Error retrieving SubmitPostFeedbackV1 from peer {}: {}", peer, e);
440            peer_iter.remove_peer(&peer);
441        }
442    }
443
444    if submit_post_feedback_accepts.is_empty() {
445        anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location.location_id);
446    }
447
448    Ok(())
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::keys::Keys;

    fn test_client_id() -> anyhow::Result<ClientId> {
        let keys = Keys::from_rnd(false)?;
        ClientId::new(keys.verification_key_bytes, keys.pq_commitment_bytes)
    }

    // Two distinct 64-hex-char ids for use as mention / reply / sequel targets.
    const ID_A: &str = "1111111111111111111111111111111111111111111111111111111111111111";
    const ID_B: &str = "2222222222222222222222222222222222222222222222222222222222222222";

    fn details_of(details: &[LinkedBaseIdDetail], bucket_type: BucketType) -> Vec<&LinkedBaseIdDetail> {
        details.iter().filter(|d| d.bucket_type == bucket_type).collect()
    }

    // `build_locations` is synchronous, so plain `#[test]` is enough; no async runtime is needed.

    #[test]
    fn plain_post_yields_only_self_user_bucket() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let (details, hashtags) = build_locations(&client_id, "just some text with no references")?;
        assert_eq!(details.len(), 1);
        assert_eq!(details[0].bucket_type, BucketType::User);
        assert_eq!(details[0].linked_base_id, client_id.id);
        assert!(hashtags.is_empty());
        Ok(())
    }

    #[test]
    fn repeated_hashtag_is_deduplicated() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let (details, hashtags) = build_locations(&client_id, r#"<hashtag hashtag="rust"></hashtag> and again <hashtag hashtag="rust"></hashtag>"#)?;
        assert_eq!(hashtags, vec!["rust".to_string()]);
        assert_eq!(details_of(&details, BucketType::Hashtag).len(), 1);
        Ok(())
    }

    #[test]
    fn repeated_mention_is_deduplicated() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> hi again <mention client_id="{ID_A}"></mention>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        assert_eq!(details_of(&details, BucketType::Mention).len(), 1);
        Ok(())
    }

    #[test]
    fn distinct_mentions_are_kept() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> <mention client_id="{ID_B}"></mention>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        assert_eq!(details_of(&details, BucketType::Mention).len(), 2);
        Ok(())
    }

    #[test]
    fn repeated_reply_keeps_first_header_bytes() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<reply post_id="{ID_A}" post_header_hex="aabb"></reply> <reply post_id="{ID_A}" post_header_hex="ccdd"></reply>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0].referenced_post_header_bytes.as_deref(), Some(&[0xaa, 0xbb][..]));
        Ok(())
    }

    #[test]
    fn repeated_sequel_is_deduplicated() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<sequel post_id="{ID_A}" post_header_hex="aabb"></sequel> <sequel post_id="{ID_A}" post_header_hex="ccdd"></sequel>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        let sequels = details_of(&details, BucketType::Sequel);
        assert_eq!(sequels.len(), 1);
        assert_eq!(sequels[0].referenced_post_header_bytes.as_deref(), Some(&[0xaa, 0xbb][..]));
        Ok(())
    }

    #[test]
    fn same_id_as_mention_and_reply_stays_two_details() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> <reply post_id="{ID_A}"></reply>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        assert_eq!(details_of(&details, BucketType::Mention).len(), 1);
        assert_eq!(details_of(&details, BucketType::ReplyToPost).len(), 1);
        Ok(())
    }

    #[test]
    fn references_inside_a_quoted_post_are_ignored() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<reply post_id="{ID_A}"><hashtag hashtag="quoted"></hashtag><mention client_id="{ID_B}"></mention></reply> <hashtag hashtag="rust"></hashtag>"#);
        let (details, hashtags) = build_locations(&client_id, &post)?;
        assert_eq!(hashtags, vec!["rust".to_string()]);
        assert_eq!(details_of(&details, BucketType::Hashtag).len(), 1);
        assert!(details_of(&details, BucketType::Mention).is_empty());
        assert_eq!(details_of(&details, BucketType::ReplyToPost).len(), 1);
        Ok(())
    }

    #[test]
    fn replies_and_sequels_nested_in_quotes_are_ignored() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<repost><sequel post_id="{ID_A}"></sequel></repost> <reply post_id="{ID_B}" post_header_hex="aabb"><reply post_id="{ID_A}" post_header_hex="ccdd"></reply></reply>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        assert!(details_of(&details, BucketType::Sequel).is_empty());
        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(replies.len(), 1);
        // Only the outer (unquoted) reply survives, identified by its header bytes.
        assert_eq!(replies[0].referenced_post_header_bytes.as_deref(), Some(&[0xaa, 0xbb][..]));
        Ok(())
    }

    #[test]
    fn malformed_references_are_skipped_not_fatal() -> anyhow::Result<()> {
        let client_id = test_client_id()?;
        let post = format!(r#"<mention client_id="nothex"></mention> <mention></mention> <reply post_id="{ID_A}" post_header_hex="zz"></reply>"#);
        let (details, _) = build_locations(&client_id, &post)?;
        assert!(details_of(&details, BucketType::Mention).is_empty());
        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(replies.len(), 1);
        assert!(replies[0].referenced_post_header_bytes.is_none());
        Ok(())
    }
}