1use crate::tools::pow::{pow_generate_with_iteration_limit};
31use crate::tools::pow_required_estimator::PowRequiredEstimator;
32use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
33use crate::tools::tools;
34use crate::tools::types::{Hash, Pow, Salt};
35use log::trace;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39pub struct PowJobStatus {
40 pub label: String,
41 pub pow_min: Pow,
42 pub best_pow_so_far: Pow,
43}
44
45type JobId = u64;
46
47struct JobEntry {
48 label: String,
49 pow_min: Pow,
50 best_pow_so_far: Pow,
51}
52
53#[derive(Default)]
54pub struct JobTracker {
55 next_id: JobId,
56 jobs: HashMap<JobId, JobEntry>,
57}
58
59impl JobTracker {
60 pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
61 let job_id = self.next_id;
62 self.next_id += 1;
63 self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
64 job_id
65 }
66
67 pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
68 if let Some(entry) = self.jobs.get_mut(&job_id) {
69 entry.best_pow_so_far = best_pow_so_far;
70 }
71 }
72
73 pub fn remove(&mut self, job_id: JobId) {
74 self.jobs.remove(&job_id);
75 }
76
77 pub fn snapshot(&self) -> Vec<PowJobStatus> {
78 self.jobs.values().map(|entry| PowJobStatus {
79 label: entry.label.clone(),
80 pow_min: entry.pow_min,
81 best_pow_so_far: entry.best_pow_so_far,
82 }).collect()
83 }
84}
85
86#[async_trait::async_trait]
102pub trait ParallelPowGenerator: Send + Sync {
103 async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
114
115 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
117
118 fn active_jobs(&self) -> Vec<PowJobStatus>;
120
121 fn tracker(&self) -> &Arc<Mutex<JobTracker>>;
124
125 async fn generate_best_effort_tracked(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
129 let job_id = self.tracker().lock().unwrap().add(label, pow_min);
130 let result = self.generate_best_effort(label, iteration_limit, pow_min, data_hash).await;
131 self.tracker().lock().unwrap().remove(job_id);
132 result
133 }
134}
135
136pub async fn generate_loop(
147 generator: &(dyn ParallelPowGenerator + '_),
148 tracker: &Arc<Mutex<JobTracker>>,
149 label: &str,
150 pow_min: Pow,
151 data_hash: Hash,
152) -> anyhow::Result<(Salt, Pow, Hash)> {
153 const BATCH_SIZE: usize = 64 * 1024;
154 let real_time_provider = RealTimeProvider::default();
155 let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
156 let job_id = tracker.lock().unwrap().add(label, pow_min);
157 loop {
158 let result = generator.generate_best_effort(label, BATCH_SIZE, pow_min, data_hash).await?;
159 if result.1 >= pow_min {
160 tracker.lock().unwrap().remove(job_id);
161 return Ok(result);
162 }
163 tracker.lock().unwrap().update(job_id, result.1);
164 let progress = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), BATCH_SIZE, result.1);
165 trace!("{}", progress);
166 tools::yield_now().await;
167 }
168}
169
170pub struct StubParallelPowGenerator {
175 tracker: Arc<Mutex<JobTracker>>,
176}
177
178impl StubParallelPowGenerator {
179 pub fn new() -> Self {
180 Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
181 }
182}
183
184impl Default for StubParallelPowGenerator {
185 fn default() -> Self { Self::new() }
186}
187
188#[async_trait::async_trait]
189impl ParallelPowGenerator for StubParallelPowGenerator {
190 async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
191 pow_generate_with_iteration_limit(iteration_limit, pow_min, &data_hash).await
192 }
193
194 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
195 generate_loop(self, &self.tracker, label, pow_min, data_hash).await
196 }
197
198 fn active_jobs(&self) -> Vec<PowJobStatus> {
199 self.tracker.lock().unwrap().snapshot()
200 }
201
202 fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
203 &self.tracker
204 }
205}
206
207#[cfg(not(target_arch = "wasm32"))]
212pub struct NativeParallelPowGenerator {
213 tracker: Arc<Mutex<JobTracker>>,
214}
215
216#[cfg(not(target_arch = "wasm32"))]
217impl NativeParallelPowGenerator {
218 pub fn new() -> Self {
219 Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
220 }
221}
222
223#[cfg(not(target_arch = "wasm32"))]
224impl Default for NativeParallelPowGenerator {
225 fn default() -> Self { Self::new() }
226}
227
228#[cfg(not(target_arch = "wasm32"))]
229#[async_trait::async_trait]
230impl ParallelPowGenerator for NativeParallelPowGenerator {
231 async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
232 let num_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
233 let per_thread = (iteration_limit / num_threads).max(1);
234 let result = tokio::task::spawn_blocking(move || {
235 use rayon::prelude::*;
236 (0..num_threads)
237 .into_par_iter()
238 .map(|_| {
239 let mut best = (Salt::zero(), Pow(0), Hash::zero());
240 for _ in 0..per_thread {
241 let salt = Salt::random();
242 if let Ok((pow, hash)) = crate::tools::pow::pow_measure_from_data_hash(&data_hash, &salt) {
243 if pow > best.1 {
244 best = (salt, pow, hash);
245 if pow >= pow_min {
246 break;
247 }
248 }
249 }
250 }
251 best
252 })
253 .reduce(
254 || (Salt::zero(), Pow(0), Hash::zero()),
255 |a, b| if b.1 > a.1 { b } else { a },
256 )
257 })
258 .await?;
259 Ok(result)
260 }
261
262 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
263 generate_loop(self, &self.tracker, label, pow_min, data_hash).await
264 }
265
266 fn active_jobs(&self) -> Vec<PowJobStatus> {
267 self.tracker.lock().unwrap().snapshot()
268 }
269
270 fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
271 &self.tracker
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use crate::tools::parallel_pow_generator::{JobTracker, ParallelPowGenerator, StubParallelPowGenerator};
278 use crate::tools::pow::pow_compute_data_hash;
279 use crate::tools::tools;
280 use crate::tools::types::Pow;
281
282 #[test]
283 fn job_tracker_round_trip() {
284 let mut tracker = JobTracker::default();
285 assert!(tracker.snapshot().is_empty());
286
287 let job_a = tracker.add("rpc", Pow(18));
288 let job_b = tracker.add("post", Pow(22));
289
290 tracker.update(job_a, Pow(7));
291 tracker.update(job_b, Pow(13));
292 tracker.update(99999, Pow(255)); let mut snapshot = tracker.snapshot();
295 snapshot.sort_by(|a, b| a.label.cmp(&b.label));
296 assert_eq!(snapshot.len(), 2);
297 assert_eq!(snapshot[0].label, "post");
298 assert_eq!(snapshot[0].pow_min, Pow(22));
299 assert_eq!(snapshot[0].best_pow_so_far, Pow(13));
300 assert_eq!(snapshot[1].label, "rpc");
301 assert_eq!(snapshot[1].pow_min, Pow(18));
302 assert_eq!(snapshot[1].best_pow_so_far, Pow(7));
303
304 tracker.remove(job_a);
305 let remaining = tracker.snapshot();
306 assert_eq!(remaining.len(), 1);
307 assert_eq!(remaining[0].label, "post");
308
309 tracker.remove(job_b);
310 assert!(tracker.snapshot().is_empty());
311 }
312
313 #[tokio::test]
314 async fn stub_generates_valid_pow() -> anyhow::Result<()> {
315 const POW_MIN: Pow = Pow(12);
316 let mut data = [0u8; 64];
317 tools::random_fill_bytes(&mut data);
318 let data_hash = pow_compute_data_hash(&[&data]);
319 let generator = StubParallelPowGenerator::new();
320 let (_, pow, _) = generator.generate("test", POW_MIN, data_hash).await?;
321 assert!(pow >= POW_MIN);
322 Ok(())
323 }
324
325 #[cfg(not(target_arch = "wasm32"))]
326 #[tokio::test]
327 async fn native_generates_valid_pow() -> anyhow::Result<()> {
328 const POW_MIN: Pow = Pow(12);
329 let mut data = [0u8; 64];
330 tools::random_fill_bytes(&mut data);
331 let data_hash = pow_compute_data_hash(&[&data]);
332 let generator = crate::tools::parallel_pow_generator::NativeParallelPowGenerator::new();
333 let (_, pow, _) = generator.generate("test", POW_MIN, data_hash).await?;
334 assert!(pow >= POW_MIN);
335 Ok(())
336 }
337}