1use crate::anyhow_assert_eq;
17use crate::client::key_locker::key_locker::KeyLocker;
18use crate::client::peer_tracker::peer_tracker::PeerTracker;
19use crate::client::post_bundle::live_post_bundle_manager::LivePostBundleManager;
20use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
21use crate::client::timeline::recent_posts_pen::RecentPostsPen;
22use crate::protocol::posting::encoded_post::{EncodedPostBytesV1, EncodedPostV1};
23use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind, SubmitPostClaimResponseV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1};
24use crate::protocol::rpc;
25use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketLocation, BucketType};
26use crate::tools::client_id::ClientId;
27use crate::tools::time::TimeMillis;
28use crate::tools::tools::spawn_background_task;
29use crate::tools::{config, json};
30use crate::tools::runtime_services::RuntimeServices;
31use bytes::Bytes;
32use futures::channel::mpsc;
33use futures::StreamExt;
34use log::{info, trace, warn};
35use scraper::{Html, Selector};
36use std::collections::HashSet;
37use std::sync::Arc;
38use tokio::sync::RwLock;
39use crate::protocol::posting::amplification;
40use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
41use crate::tools::types::Id;
42
/// One destination a post has to be submitted to, as derived by `build_locations`.
struct LinkedBaseIdDetail {
    /// Id the bucket location is generated from: the author's id, a hashtag id,
    /// a mentioned client id, or a referenced post id.
    linked_base_id: Id,
    /// Kind of bucket this destination belongs to.
    bucket_type: BucketType,
    /// Decoded `post_header_hex` attribute of a `<reply>`/`<sequel>` element.
    /// `None` for every other bucket type, and also when the attribute is
    /// missing or is not valid hex.
    referenced_post_header_bytes: Option<Bytes>,
}
50
/// Message sent over the outcomes channel each time a peer commits the post.
struct SubmissionOutcome {
    /// Bucket type the commit was made for; `submit_post` checks it against
    /// `BucketType::User` to know when the author's own bucket has been written.
    bucket_type: BucketType,
    /// Commit token taken from the peer's `SubmitPostCommitResponseV1`.
    token: SubmitPostCommitTokenV1,
}
60
61#[allow(clippy::too_many_arguments)] pub async fn submit_post(
71 runtime_services: Arc<RuntimeServices>,
72 client_id: &ClientId,
73 key_locker: &Arc<dyn KeyLocker>,
74 post_bundle_manager: Arc<LivePostBundleManager>,
75 peer_tracker: Arc<RwLock<PeerTracker>>,
76 recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
77 post: &str,
78 wait_for_all_submissions: bool,
79) -> anyhow::Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes))> {
80 trace!("submitting post: {}", post);
81
82 if post.is_empty() {
83 anyhow::bail!("Post cannot be empty");
84 }
85
86 let timestamp = runtime_services.time_provider.current_time_millis();
87
88 let (linked_base_id_details, referenced_hashtags) = build_locations(client_id, post)?;
90
91 let linked_base_ids: Vec<Id> = linked_base_id_details.iter().map(|d| d.linked_base_id).collect();
93 let mut encoded_post = EncodedPostV1::new(client_id, timestamp, linked_base_ids, post);
94 let encoded_post_bytes = encoded_post.encode_to_bytes_direct(key_locker).await?;
95 let encoded_post_bytes_raw = Bytes::copy_from_slice(encoded_post_bytes.bytes());
96
97 let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded::<SubmissionOutcome>();
102 spawn_background_task(post_to_locations(
103 runtime_services,
104 client_id.clone(),
105 post_bundle_manager,
106 peer_tracker,
107 recent_posts_pen.clone(),
108 linked_base_id_details,
109 encoded_post.clone(),
110 encoded_post_bytes,
111 encoded_post_bytes_raw.clone(),
112 referenced_hashtags,
113 timestamp,
114 outcomes_tx,
115 ));
116
117 let mut post_commit_tokens = Vec::new();
120 let mut user_committed = false;
121 while let Some(SubmissionOutcome { bucket_type, token }) = outcomes_rx.next().await {
122 if bucket_type == BucketType::User {
123 user_committed = true;
124 }
125 post_commit_tokens.push(token);
126
127 if !wait_for_all_submissions && user_committed {
130 return Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)));
131 }
132 }
133
134 if !user_committed {
135 anyhow::bail!("Failed to post to any User buckets, so bailing,");
136 }
137
138 Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)))
139}
140
141async fn record_in_pen(recent_posts_pen: &Arc<RwLock<RecentPostsPen>>, token: &SubmitPostCommitTokenV1, encoded_post_bytes_raw: &Bytes, timestamp: TimeMillis) {
144 recent_posts_pen.write().await.add_all(&[(token.bucket_location.clone(), token.post_id)], encoded_post_bytes_raw.clone(), timestamp);
145}
146
147fn build_locations(client_id: &ClientId, post: &str) -> anyhow::Result<(Vec<LinkedBaseIdDetail>, Vec<String>)> {
154 let mut linked_base_id_details: Vec<LinkedBaseIdDetail> = vec![];
155 let mut referenced_hashtags: Vec<String> = vec![];
156
157 linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id.id, bucket_type: BucketType::User, referenced_post_header_bytes: None });
159
160 {
161 let html = Html::parse_fragment(post);
162 {
163 let is_quoted = |element: scraper::ElementRef| -> bool {
166 let mut node = element.parent();
167 while let Some(n) = node {
168 if let Some(el) = scraper::ElementRef::wrap(n) {
169 if matches!(el.value().name(), "reply" | "repost" | "sequel") { return true; }
170 }
171 node = n.parent();
172 }
173 false
174 };
175
176 let selector_hashtag = Selector::parse("hashtag").map_err(|e| anyhow::anyhow!("Failed to parse hashtag selector: {}", e))?;
177 let mut seen_hashtags: HashSet<String> = HashSet::new();
178 for element in html.select(&selector_hashtag) {
179 if is_quoted(element) { continue; }
180 if let Some(hashtag) = element.attr("hashtag") {
181 if !seen_hashtags.insert(hashtag.to_string()) { continue; }
182 trace!("hashtag={:?}", hashtag);
183 referenced_hashtags.push(hashtag.to_string());
184 linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: Id::from_hashtag_str(hashtag)?, bucket_type: BucketType::Hashtag, referenced_post_header_bytes: None });
185 } else {
186 warn!("hashtag attribute not found in element {:?}", element);
187 }
188 }
189
190 let selector_mention = Selector::parse("mention").map_err(|e| anyhow::anyhow!("Failed to parse mention selector: {}", e))?;
191 let mut seen_mentions: HashSet<Id> = HashSet::new();
192 for element in html.select(&selector_mention) {
193 if is_quoted(element) { continue; }
194 if let Some(client_id_str) = element.attr("client_id") {
195 match Id::from_hex_str(client_id_str) {
196 Ok(client_id) => {
197 if !seen_mentions.insert(client_id) { continue; }
198 trace!("mention_id={:?}", client_id);
199 linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id, bucket_type: BucketType::Mention, referenced_post_header_bytes: None });
200 }
201 Err(e) => warn!("mention_id corrupted in element {:?}:, {}", element, e),
202 }
203 } else {
204 warn!("mention attribute not found in element {:?}", element);
205 }
206 }
207
208 let selector_reply = Selector::parse("reply").map_err(|e| anyhow::anyhow!("Failed to parse reply selector: {}", e))?;
209 let mut seen_replies: HashSet<Id> = HashSet::new();
210 for element in html.select(&selector_reply) {
211 if is_quoted(element) { continue; }
212 if let Some(post_id_str) = element.attr("post_id") {
213 match Id::from_hex_str(post_id_str) {
214 Ok(post_id) => {
215 if !seen_replies.insert(post_id) { continue; }
216 trace!("reply post_id={:?}", post_id);
217 let referenced_post_header_bytes = element.attr("post_header_hex")
218 .and_then(|hex_str| hex::decode(hex_str).ok())
219 .map(Bytes::from);
220 linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::ReplyToPost, referenced_post_header_bytes });
221 }
222 Err(e) => warn!("reply post_id corrupted in element {:?}: {}", element, e),
223 }
224 } else {
225 warn!("post_id attribute not found in reply element {:?}", element);
226 }
227 }
228
229 let selector_sequel = Selector::parse("sequel").map_err(|e| anyhow::anyhow!("Failed to parse sequel selector: {}", e))?;
230 let mut seen_sequels: HashSet<Id> = HashSet::new();
231 for element in html.select(&selector_sequel) {
232 if is_quoted(element) { continue; }
233 if let Some(post_id_str) = element.attr("post_id") {
234 match Id::from_hex_str(post_id_str) {
235 Ok(post_id) => {
236 if !seen_sequels.insert(post_id) { continue; }
237 trace!("sequel post_id={:?}", post_id);
238 let referenced_post_header_bytes = element.attr("post_header_hex")
239 .and_then(|hex_str| hex::decode(hex_str).ok())
240 .map(Bytes::from);
241 linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::Sequel, referenced_post_header_bytes });
242 }
243 Err(e) => warn!("sequel post_id corrupted in element {:?}: {}", element, e),
244 }
245 } else {
246 warn!("post_id attribute not found in sequel element {:?}", element);
247 }
248 }
249 }
250 }
251
252 Ok((linked_base_id_details, referenced_hashtags))
253}
254
255#[allow(clippy::too_many_arguments)] async fn post_to_locations(
265 runtime_services: Arc<RuntimeServices>,
266 client_id: ClientId,
267 post_bundle_manager: Arc<LivePostBundleManager>,
268 peer_tracker: Arc<RwLock<PeerTracker>>,
269 recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
270 linked_base_id_details: Vec<LinkedBaseIdDetail>,
271 encoded_post: EncodedPostV1,
272 encoded_post_bytes: EncodedPostBytesV1,
273 encoded_post_bytes_raw: Bytes,
274 referenced_hashtags: Vec<String>,
275 timestamp: TimeMillis,
276 outcomes_tx: mpsc::UnboundedSender<SubmissionOutcome>,
277) {
278 for linked_base_id_detail in &linked_base_id_details {
280 trace!("Posting to bucket type: {:?}, linked_base_id: {}", linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id);
281 let mut committed_count = 0;
282 let try_result: anyhow::Result<()> = try {
283 for &bucket_duration in bucket_durations_for_type(linked_base_id_detail.bucket_type) {
285 let bucket_location = generate_bucket_location(linked_base_id_detail.bucket_type, linked_base_id_detail.linked_base_id, bucket_duration, timestamp)?;
286 info!("checking posting availability of {:?}", bucket_location);
287
288 let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;
289 if !post_bundle.header.overflowed && !post_bundle.header.sealed {
290 info!("Posting to {:?}", bucket_location);
291 let result = post_to_location(&runtime_services, &client_id.id, &peer_tracker, &recent_posts_pen, &bucket_location, &encoded_post, &encoded_post_bytes, &encoded_post_bytes_raw, linked_base_id_detail.referenced_post_header_bytes.as_deref(), &referenced_hashtags, linked_base_id_detail.bucket_type, timestamp, &outcomes_tx).await;
292 match result {
293 Ok(result) => {
294 committed_count += result.len();
295 break;
296 }
297 Err(e) => {
298 warn!("Failed to post to {:?}: {}", bucket_location, e);
299 continue;
300 }
301 }
302 }
303
304 else {
305 trace!("no availability: overflowed={} sealed={}", post_bundle.header.overflowed, post_bundle.header.sealed);
306 }
307 }
308 };
309
310 if let Err(e) = try_result {
311 warn!("Failed to post to any bucket location for linked_base_id {:?}: {}", linked_base_id_detail.linked_base_id, e);
312 }
313
314 if linked_base_id_detail.bucket_type == BucketType::User && committed_count == 0 {
318 return;
319 }
320 }
321}
322
323#[allow(clippy::too_many_arguments)] async fn post_to_location(
325 runtime_services: &RuntimeServices,
326 sponsor_id: &Id,
327 peer_tracker: &Arc<RwLock<PeerTracker>>,
328 recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
329 bucket_location: &BucketLocation,
330 encoded_post: &EncodedPostV1,
331 encoded_post_bytes: &EncodedPostBytesV1,
332 encoded_post_bytes_raw: &Bytes,
333 referenced_post_header_bytes: Option<&[u8]>,
334 referenced_hashtags: &[String],
335 bucket_type: BucketType,
336 timestamp: TimeMillis,
337 outcomes_tx: &mpsc::UnboundedSender<SubmissionOutcome>,
338) -> anyhow::Result<Vec<SubmitPostCommitTokenV1>> {
339 let mut peers_visited = Vec::new();
340 let mut submit_post_claim_tokens = Vec::new();
341 let mut submit_post_commit_tokens = Vec::new();
342
343 let mut peer_tracker = peer_tracker.write().await;
344 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
345 while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
346 let result: anyhow::Result<()> = try {
347 info!("Requesting SubmitPostClaim with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
348
349 let request = SubmitPostClaimV1::new_to_bytes(bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes.bytes_without_body())?;
351 let requisite_pow = amplification::get_minimum_post_pow(encoded_post.header.post_length, encoded_post.header.linked_base_ids.len(), bucket_location.duration);
352 let response = rpc::rpc::rpc_server_known_with_requisite_pow_and_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostClaimV1, request, requisite_pow).await?;
353 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostClaimResponseV1, &response.response_request_kind);
354 let response = json::bytes_to_struct::<SubmitPostClaimResponseV1>(&response.bytes)?;
355
356 peers_visited.push(peer.clone());
357
358 peer_iter.add_peers(response.peers_nearer);
359
360 if let Some(submit_post_claim_token) = response.submit_post_claim_token {
361 info!("Received a SubmitPostClaim from peer {}", peer);
362
363 submit_post_claim_tokens.push(submit_post_claim_token.clone());
364
365 info!("Posting to peer {}", peer);
367 let request = SubmitPostCommitV1::new_to_bytes(bucket_location, &submit_post_claim_token, encoded_post_bytes.bytes())?;
368 let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostCommitV1, request).await?;
369 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostCommitResponseV1, &response.response_request_kind);
370 let response = json::bytes_to_struct::<SubmitPostCommitResponseV1>(&response.bytes)?;
371
372 let submit_post_commit_token = response.submit_post_commit_token;
376 record_in_pen(recent_posts_pen, &submit_post_commit_token, encoded_post_bytes_raw, timestamp).await;
377 let _ = outcomes_tx.unbounded_send(SubmissionOutcome { bucket_type, token: submit_post_commit_token.clone() });
378 submit_post_commit_tokens.push(submit_post_commit_token);
379 if submit_post_commit_tokens.len() >= config::REDUNDANT_SERVERS_PER_POST {
380 break;
381 }
382 }
383 };
384
385 if let Err(e) = result {
386 warn!("Error retrieving SubmitPostClaim from peer {}: {}", peer, e);
387 peer_iter.remove_peer(&peer);
388 }
389 }
390
391 if submit_post_claim_tokens.is_empty() {
392 anyhow::bail!("Despite expecting availability, we were unable claim anywhere at {}", bucket_location);
393 }
394
395 if submit_post_commit_tokens.is_empty() {
396 anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location);
397 }
398
399 Ok(submit_post_commit_tokens)
400}
401
402pub async fn post_feedback_to_location(
403 runtime_services: &RuntimeServices,
404 sponsor_id: &Id,
405 peer_tracker: &Arc<RwLock<PeerTracker>>,
406 bucket_location: &BucketLocation,
407 encoded_post_feedback: &EncodedPostFeedbackV1,
408) -> anyhow::Result<()> {
409
410 let mut peers_visited = Vec::new();
411 let mut submit_post_feedback_accepts = Vec::new();
412
413 let mut peer_tracker = peer_tracker.write().await;
414 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
415 while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
416 let result: anyhow::Result<()> = try {
417 info!("Requesting SubmitPostFeedbackV1 with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
418
419 let request = SubmitPostFeedbackV1::new_to_bytes(bucket_location, encoded_post_feedback)?;
420 let response = rpc::rpc::rpc_server_known_with_requisite_pow(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostFeedbackV1, request, config::POW_MINIMUM_PER_FEEDBACK).await?;
421 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostFeedbackResponseV1, &response.response_request_kind);
422 let response = json::bytes_to_struct::<SubmitPostFeedbackResponseV1>(&response.bytes)?;
423
424 peers_visited.push(peer.clone());
425
426 peer_iter.add_peers(response.peers_nearer);
427
428 if response.accepted {
429 info!("Received an accept from peer {}", peer);
430
431 submit_post_feedback_accepts.push(peer.clone());
432 if submit_post_feedback_accepts.len() >= config::REDUNDANT_SERVERS_PER_POST {
433 break;
434 }
435 }
436 };
437
438 if let Err(e) = result {
439 warn!("Error retrieving SubmitPostFeedbackV1 from peer {}: {}", peer, e);
440 peer_iter.remove_peer(&peer);
441 }
442 }
443
444 if submit_post_feedback_accepts.is_empty() {
445 anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location.location_id);
446 }
447
448 Ok(())
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::keys::Keys;

    /// Hex strings used as stand-ins for client ids and post ids.
    const ID_A: &str = "1111111111111111111111111111111111111111111111111111111111111111";
    const ID_B: &str = "2222222222222222222222222222222222222222222222222222222222222222";

    /// Builds a client id from freshly generated keys.
    fn test_client_id() -> anyhow::Result<ClientId> {
        let fresh_keys = Keys::from_rnd(false)?;
        ClientId::new(fresh_keys.verification_key_bytes, fresh_keys.pq_commitment_bytes)
    }

    /// Returns the details whose bucket type equals `bucket_type`, in order.
    fn details_of(details: &[LinkedBaseIdDetail], bucket_type: BucketType) -> Vec<&LinkedBaseIdDetail> {
        let mut matching = Vec::new();
        for detail in details {
            if detail.bucket_type == bucket_type {
                matching.push(detail);
            }
        }
        matching
    }

    /// A post without markup maps to the author's own User bucket only.
    #[tokio::test]
    async fn plain_post_yields_only_self_user_bucket() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let (details, hashtags) = build_locations(&author, "just some text with no references")?;

        assert!(hashtags.is_empty());
        assert_eq!(details.len(), 1);
        let only = &details[0];
        assert_eq!(only.bucket_type, BucketType::User);
        assert_eq!(only.linked_base_id, author.id);
        Ok(())
    }

    /// The same hashtag used twice produces one hashtag and one detail.
    #[tokio::test]
    async fn repeated_hashtag_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = r#"<hashtag hashtag="rust"></hashtag> and again <hashtag hashtag="rust"></hashtag>"#;
        let (details, hashtags) = build_locations(&author, post)?;

        assert_eq!(hashtags, vec!["rust".to_string()]);
        let hashtag_details = details_of(&details, BucketType::Hashtag);
        assert_eq!(hashtag_details.len(), 1);
        Ok(())
    }

    /// Mentioning the same client twice produces a single Mention detail.
    #[tokio::test]
    async fn repeated_mention_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> hi again <mention client_id="{ID_A}"></mention>"#);
        let (details, _hashtags) = build_locations(&author, &post)?;

        let mentions = details_of(&details, BucketType::Mention);
        assert_eq!(mentions.len(), 1);
        Ok(())
    }

    /// Mentions of two different clients are both kept.
    #[tokio::test]
    async fn distinct_mentions_are_kept() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> <mention client_id="{ID_B}"></mention>"#);
        let (details, _hashtags) = build_locations(&author, &post)?;

        let mentions = details_of(&details, BucketType::Mention);
        assert_eq!(mentions.len(), 2);
        Ok(())
    }

    /// A post replied to twice yields one detail carrying the first header.
    #[tokio::test]
    async fn repeated_reply_keeps_first_header_bytes() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = format!(r#"<reply post_id="{ID_A}" post_header_hex="aabb"></reply> <reply post_id="{ID_A}" post_header_hex="ccdd"></reply>"#);
        let (details, _hashtags) = build_locations(&author, &post)?;

        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(replies.len(), 1);
        let first_header: &[u8] = &[0xaa, 0xbb];
        assert_eq!(replies[0].referenced_post_header_bytes.as_deref(), Some(first_header));
        Ok(())
    }

    /// A sequel referenced twice yields one detail carrying the first header.
    #[tokio::test]
    async fn repeated_sequel_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = format!(r#"<sequel post_id="{ID_A}" post_header_hex="aabb"></sequel> <sequel post_id="{ID_A}" post_header_hex="ccdd"></sequel>"#);
        let (details, _hashtags) = build_locations(&author, &post)?;

        let sequels = details_of(&details, BucketType::Sequel);
        assert_eq!(sequels.len(), 1);
        let first_header: &[u8] = &[0xaa, 0xbb];
        assert_eq!(sequels[0].referenced_post_header_bytes.as_deref(), Some(first_header));
        Ok(())
    }

    /// Deduplication is per element kind: one id may be both mention and reply.
    #[tokio::test]
    async fn same_id_as_mention_and_reply_stays_two_details() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let post = format!(r#"<mention client_id="{ID_A}"></mention> <reply post_id="{ID_A}"></reply>"#);
        let (details, _hashtags) = build_locations(&author, &post)?;

        let mentions = details_of(&details, BucketType::Mention);
        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(mentions.len(), 1);
        assert_eq!(replies.len(), 1);
        Ok(())
    }
}