Skip to main content

hashiverse_lib/tools/
parallel_pow_generator.rs

1//! # Parallel proof-of-work search engine
2//!
3//! Proof-of-work is mandatory on every outgoing RPC, every peer announcement, and every
4//! piece of report/feedback — so finding a PoW solution quickly is on the hot path for
5//! virtually every client and server action. This module isolates that work behind a
6//! single trait, [`ParallelPowGenerator`], so the calling code doesn't care whether it's
7//! running on a 32-core server or a single-threaded WASM Web Worker.
8//!
9//! ## Implementations
10//!
11//! - [`NativeParallelPowGenerator`] — rayon + `tokio::task::spawn_blocking`, saturates
12//!   every CPU core on native targets.
13//! - [`StubParallelPowGenerator`] — single-threaded fallback that works on every target
14//!   including WASM. Browser clients run this (with a relaxed `pow_min`) because Web
15//!   Workers don't expose thread pools.
16//!
17//! ## Observability
18//!
19//! [`JobTracker`] + [`PowJobStatus`] expose the set of in-flight PoW jobs and the
20//! best-so-far pow for each. The web client surfaces this in the UI so that when a post
21//! feels slow to send the user can see it's because PoW is still grinding.
22//!
23//! ## Shared loop
24//!
25//! [`generate_loop`] is the one-true batching loop used by both implementations: repeatedly
26//! call [`ParallelPowGenerator::generate_best_effort`] in 64K-attempt batches, update the
27//! tracker, and bail as soon as a batch returns `pow >= pow_min`. Every batch also yields
28//! to the runtime so on single-threaded targets other tasks still get a chance to run.
29
30use crate::tools::pow::{pow_generate_with_iteration_limit};
31use crate::tools::pow_required_estimator::PowRequiredEstimator;
32use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
33use crate::tools::tools;
34use crate::tools::types::{Hash, Pow, Salt};
35use log::trace;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39pub struct PowJobStatus {
40    pub label: String,
41    pub pow_min: Pow,
42    pub best_pow_so_far: Pow,
43}
44
45type JobId = u64;
46
47struct JobEntry {
48    label: String,
49    pow_min: Pow,
50    best_pow_so_far: Pow,
51}
52
53#[derive(Default)]
54pub struct JobTracker {
55    next_id: JobId,
56    jobs: HashMap<JobId, JobEntry>,
57}
58
59impl JobTracker {
60    pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
61        let job_id = self.next_id;
62        self.next_id += 1;
63        self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
64        job_id
65    }
66
67    pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
68        if let Some(entry) = self.jobs.get_mut(&job_id) {
69            entry.best_pow_so_far = best_pow_so_far;
70        }
71    }
72
73    pub fn remove(&mut self, job_id: JobId) {
74        self.jobs.remove(&job_id);
75    }
76
77    pub fn snapshot(&self) -> Vec<PowJobStatus> {
78        self.jobs.values().map(|entry| PowJobStatus {
79            label: entry.label.clone(),
80            pow_min: entry.pow_min,
81            best_pow_so_far: entry.best_pow_so_far,
82        }).collect()
83    }
84}
85
86/// A pluggable engine for searching for proof-of-work solutions in parallel.
87///
88/// Proof-of-work is required on every RPC packet, on peer announcements, and on report /
89/// feedback submissions, so finding PoW is on the hot path for every outbound action a client
90/// or server takes. `ParallelPowGenerator` abstracts over the concrete way we parallelize that
91/// search so the calling code stays platform-agnostic:
92///
93/// - [`NativeParallelPowGenerator`] uses `rayon` + `tokio::task::spawn_blocking` to pin the
94///   search across all CPU cores on native targets.
95/// - [`StubParallelPowGenerator`] is a single-threaded fallback that works on every target,
96///   including WASM. Browser clients use this (with a relaxed `pow_min`) because Web Workers
97///   do not expose `rayon` / threads directly.
98///
99/// Implementations must also maintain the `active_jobs()` observability view — the UI surfaces
100/// in-progress PoW searches to end users so they understand why an action is slow.
101#[async_trait::async_trait]
102pub trait ParallelPowGenerator: Send + Sync {
103    /// Run up to `iteration_limit` hash attempts and return the best `(Salt, Pow, Hash)` found.
104    /// Exits early if `pow >= pow_min` is achieved.
105    ///
106    /// `label` is a human-readable job name for observability (e.g. `"rpc:AnnounceV1"`, `"feedback"`).
107    /// `data_hash` must be pre-computed via `pow_compute_data_hash` before calling.
108    ///
109    /// Note: this method does NOT register the job with the tracker. Use it only from inside
110    /// `generate_loop` (which manages its own tracker entry across batches). Direct callers
111    /// that want their single-batch search to show up in `active_jobs()` should use
112    /// [`Self::generate_best_effort_tracked`] instead.
113    async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
114
115    /// Loop `generate_best_effort` in batches until `pow >= pow_min` is achieved.
116    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
117
118    /// Snapshot of all concurrently in-flight tracked jobs.
119    fn active_jobs(&self) -> Vec<PowJobStatus>;
120
121    /// Accessor for the impl's `JobTracker`. Exists so the default `generate_best_effort_tracked`
122    /// implementation can register the job without each impl having to duplicate the wrapping.
123    fn tracker(&self) -> &Arc<Mutex<JobTracker>>;
124
125    /// `generate_best_effort` plus tracker registration for the duration of the call.
126    /// Use this when a single-batch PoW is run directly (i.e. not inside `generate_loop`),
127    /// otherwise the job is invisible to `active_jobs()`.
128    async fn generate_best_effort_tracked(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
129        let job_id = self.tracker().lock().unwrap().add(label, pow_min);
130        let result = self.generate_best_effort(label, iteration_limit, pow_min, data_hash).await;
131        self.tracker().lock().unwrap().remove(job_id);
132        result
133    }
134}
135
136/// Shared loop logic for `generate`: repeatedly calls `generate_best_effort` in
137/// `BATCH_SIZE` batches until `pow >= pow_min`, tracking progress via `JobTracker`.
138///
139/// Future optimization: the current batch-and-wait approach dispatches to all N workers,
140/// then waits for all N to respond before dispatching the next batch. This means fast
141/// workers sit idle while the slowest worker finishes. A better design would feed workers
142/// individually as they complete (work-stealing / pool-style), maintaining a shared
143/// "best result so far" per job and checking pow_min after each worker result. This would
144/// also allow concurrent generate() calls to have their batches truly interleaved at the
145/// individual-worker level rather than at the batch level.
146pub async fn generate_loop(
147    generator: &(dyn ParallelPowGenerator + '_),
148    tracker: &Arc<Mutex<JobTracker>>,
149    label: &str,
150    pow_min: Pow,
151    data_hash: Hash,
152) -> anyhow::Result<(Salt, Pow, Hash)> {
153    const BATCH_SIZE: usize = 64 * 1024;
154    let real_time_provider = RealTimeProvider::default();
155    let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
156    let job_id = tracker.lock().unwrap().add(label, pow_min);
157    loop {
158        let result = generator.generate_best_effort(label, BATCH_SIZE, pow_min, data_hash).await?;
159        if result.1 >= pow_min {
160            tracker.lock().unwrap().remove(job_id);
161            return Ok(result);
162        }
163        tracker.lock().unwrap().update(job_id, result.1);
164        let progress = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), BATCH_SIZE, result.1);
165        trace!("{}", progress);
166        tools::yield_now().await;
167    }
168}
169
170// ──────────────────────────────────────────────────────────────────────────────
171// StubParallelPowGenerator — single-threaded, works on all platforms
172// ──────────────────────────────────────────────────────────────────────────────
173
174pub struct StubParallelPowGenerator {
175    tracker: Arc<Mutex<JobTracker>>,
176}
177
178impl StubParallelPowGenerator {
179    pub fn new() -> Self {
180        Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
181    }
182}
183
184impl Default for StubParallelPowGenerator {
185    fn default() -> Self { Self::new() }
186}
187
188#[async_trait::async_trait]
189impl ParallelPowGenerator for StubParallelPowGenerator {
190    async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
191        pow_generate_with_iteration_limit(iteration_limit, pow_min, &data_hash).await
192    }
193
194    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
195        generate_loop(self, &self.tracker, label, pow_min, data_hash).await
196    }
197
198    fn active_jobs(&self) -> Vec<PowJobStatus> {
199        self.tracker.lock().unwrap().snapshot()
200    }
201
202    fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
203        &self.tracker
204    }
205}
206
207// ──────────────────────────────────────────────────────────────────────────────
208// NativeParallelPowGenerator — rayon + spawn_blocking, non-WASM only
209// ──────────────────────────────────────────────────────────────────────────────
210
211#[cfg(not(target_arch = "wasm32"))]
212pub struct NativeParallelPowGenerator {
213    tracker: Arc<Mutex<JobTracker>>,
214}
215
216#[cfg(not(target_arch = "wasm32"))]
217impl NativeParallelPowGenerator {
218    pub fn new() -> Self {
219        Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
220    }
221}
222
223#[cfg(not(target_arch = "wasm32"))]
224impl Default for NativeParallelPowGenerator {
225    fn default() -> Self { Self::new() }
226}
227
228#[cfg(not(target_arch = "wasm32"))]
229#[async_trait::async_trait]
230impl ParallelPowGenerator for NativeParallelPowGenerator {
231    async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
232        let num_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
233        let per_thread = (iteration_limit / num_threads).max(1);
234        let result = tokio::task::spawn_blocking(move || {
235            use rayon::prelude::*;
236            (0..num_threads)
237                .into_par_iter()
238                .map(|_| {
239                    let mut best = (Salt::zero(), Pow(0), Hash::zero());
240                    for _ in 0..per_thread {
241                        let salt = Salt::random();
242                        if let Ok((pow, hash)) = crate::tools::pow::pow_measure_from_data_hash(&data_hash, &salt) {
243                            if pow > best.1 {
244                                best = (salt, pow, hash);
245                                if pow >= pow_min {
246                                    break;
247                                }
248                            }
249                        }
250                    }
251                    best
252                })
253                .reduce(
254                    || (Salt::zero(), Pow(0), Hash::zero()),
255                    |a, b| if b.1 > a.1 { b } else { a },
256                )
257        })
258        .await?;
259        Ok(result)
260    }
261
262    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
263        generate_loop(self, &self.tracker, label, pow_min, data_hash).await
264    }
265
266    fn active_jobs(&self) -> Vec<PowJobStatus> {
267        self.tracker.lock().unwrap().snapshot()
268    }
269
270    fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
271        &self.tracker
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use crate::tools::parallel_pow_generator::{JobTracker, ParallelPowGenerator, StubParallelPowGenerator};
278    use crate::tools::pow::pow_compute_data_hash;
279    use crate::tools::tools;
280    use crate::tools::types::Pow;
281
282    #[test]
283    fn job_tracker_round_trip() {
284        let mut tracker = JobTracker::default();
285        assert!(tracker.snapshot().is_empty());
286
287        let job_a = tracker.add("rpc", Pow(18));
288        let job_b = tracker.add("post", Pow(22));
289
290        tracker.update(job_a, Pow(7));
291        tracker.update(job_b, Pow(13));
292        tracker.update(99999, Pow(255)); // unknown job_id is silently ignored
293
294        let mut snapshot = tracker.snapshot();
295        snapshot.sort_by(|a, b| a.label.cmp(&b.label));
296        assert_eq!(snapshot.len(), 2);
297        assert_eq!(snapshot[0].label, "post");
298        assert_eq!(snapshot[0].pow_min, Pow(22));
299        assert_eq!(snapshot[0].best_pow_so_far, Pow(13));
300        assert_eq!(snapshot[1].label, "rpc");
301        assert_eq!(snapshot[1].pow_min, Pow(18));
302        assert_eq!(snapshot[1].best_pow_so_far, Pow(7));
303
304        tracker.remove(job_a);
305        let remaining = tracker.snapshot();
306        assert_eq!(remaining.len(), 1);
307        assert_eq!(remaining[0].label, "post");
308
309        tracker.remove(job_b);
310        assert!(tracker.snapshot().is_empty());
311    }
312
313    #[tokio::test]
314    async fn stub_generates_valid_pow() -> anyhow::Result<()> {
315        const POW_MIN: Pow = Pow(12);
316        let mut data = [0u8; 64];
317        tools::random_fill_bytes(&mut data);
318        let data_hash = pow_compute_data_hash(&[&data]);
319        let generator = StubParallelPowGenerator::new();
320        let (_, pow, _) = generator.generate("test", POW_MIN, data_hash).await?;
321        assert!(pow >= POW_MIN);
322        Ok(())
323    }
324
325    #[cfg(not(target_arch = "wasm32"))]
326    #[tokio::test]
327    async fn native_generates_valid_pow() -> anyhow::Result<()> {
328        const POW_MIN: Pow = Pow(12);
329        let mut data = [0u8; 64];
330        tools::random_fill_bytes(&mut data);
331        let data_hash = pow_compute_data_hash(&[&data]);
332        let generator = crate::tools::parallel_pow_generator::NativeParallelPowGenerator::new();
333        let (_, pow, _) = generator.generate("test", POW_MIN, data_hash).await?;
334        assert!(pow >= POW_MIN);
335        Ok(())
336    }
337}