1use crate::tools::pow::pow_measure_from_data_hash;
16use crate::tools::pow_required_estimator::PowRequiredEstimator;
17use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
18use crate::tools::types::{Hash, Pow, Salt};
19use futures::stream::{FuturesUnordered, StreamExt};
20use log::trace;
21use std::collections::HashMap;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::{Arc, Mutex};
25
26pub struct PowJobStatus {
27 pub label: String,
28 pub pow_min: Pow,
29 pub best_pow_so_far: Pow,
30}
31
/// Identifier handed out by [`JobTracker::add`] and used to address a job afterwards.
type JobId = u64;
33
34struct JobEntry {
35 label: String,
36 pow_min: Pow,
37 best_pow_so_far: Pow,
38}
39
40#[derive(Default)]
41pub struct JobTracker {
42 next_id: JobId,
43 jobs: HashMap<JobId, JobEntry>,
44 last_work_time_millis: i64,
47}
48
49impl JobTracker {
50 pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
51 let job_id = self.next_id;
52 self.next_id += 1;
53 self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
54 job_id
55 }
56
57 pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
58 if let Some(entry) = self.jobs.get_mut(&job_id) {
59 entry.best_pow_so_far = best_pow_so_far;
60 }
61 }
62
63 pub fn remove(&mut self, job_id: JobId) {
64 self.jobs.remove(&job_id);
65 }
66
67 pub fn mark_work(&mut self, now_millis: i64) {
69 self.last_work_time_millis = now_millis;
70 }
71
72 pub fn is_busy(&self, now_millis: i64, within_millis: i64) -> bool {
76 !self.jobs.is_empty() || (self.last_work_time_millis != 0 && now_millis - self.last_work_time_millis <= within_millis)
77 }
78
79 pub fn snapshot(&self) -> Vec<PowJobStatus> {
80 self.jobs.values().map(|entry| PowJobStatus {
81 label: entry.label.clone(),
82 pow_min: entry.pow_min,
83 best_pow_so_far: entry.best_pow_so_far,
84 }).collect()
85 }
86}
87
88struct TrackedJobGuard {
91 tracker: Arc<Mutex<JobTracker>>,
92 job_id: JobId,
93}
94
95impl TrackedJobGuard {
96 fn new(tracker: Arc<Mutex<JobTracker>>, label: &str, pow_min: Pow) -> Self {
97 let now = RealTimeProvider.current_time_millis().0;
98 let job_id = {
99 let mut tracker = tracker.lock().unwrap();
100 let job_id = tracker.add(label, pow_min);
101 tracker.mark_work(now);
102 job_id
103 };
104 Self { tracker, job_id }
105 }
106
107 fn update(&self, best_pow_so_far: Pow) {
108 self.tracker.lock().unwrap().update(self.job_id, best_pow_so_far);
109 }
110}
111
112impl Drop for TrackedJobGuard {
113 fn drop(&mut self) {
114 let now = RealTimeProvider.current_time_millis().0;
117 let mut tracker = self.tracker.lock().unwrap();
118 tracker.remove(self.job_id);
119 tracker.mark_work(now);
120 }
121}
122
123#[async_trait::async_trait]
148pub trait PowGenerator: Send + Sync {
149 fn pool_size(&self) -> usize;
153
154 async fn run_chunk(&self, slot: usize, chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
160
161 fn tracker(&self) -> &Arc<Mutex<JobTracker>>;
165
166 fn active_jobs(&self) -> Vec<PowJobStatus> {
168 self.tracker().lock().unwrap().snapshot()
169 }
170
171 fn is_pow_busy(&self, within_millis: i64) -> bool {
175 let now = RealTimeProvider.current_time_millis().0;
176 self.tracker().lock().unwrap().is_busy(now, within_millis)
177 }
178
179 async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
184 run_pool(self, label, Some(iteration_limit), pow_min, data_hash).await
185 }
186
187 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
190 run_pool(self, label, None, pow_min, data_hash).await
191 }
192}
193
/// Iterations handed to a worker slot per chunk when the remaining budget allows (4096).
const CHUNK_ITERATIONS: usize = 1 << 12;
200
201type SlotFuture<'a> = Pin<Box<dyn Future<Output = (usize, usize, anyhow::Result<(Salt, Pow, Hash)>)> + Send + 'a>>;
202
203pub async fn run_pool<'a, G: PowGenerator + ?Sized>(
216 generator: &'a G,
217 label: &'a str,
218 iteration_cap: Option<usize>,
219 pow_min: Pow,
220 data_hash: Hash,
221) -> anyhow::Result<(Salt, Pow, Hash)> {
222 let tracker = generator.tracker().clone();
223 let guard = TrackedJobGuard::new(tracker, label, pow_min);
224
225 let real_time_provider = RealTimeProvider;
226 let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
227
228 let pool_size = generator.pool_size().max(1);
229 let mut remaining_iterations: Option<usize> = iteration_cap;
230
231 let mut best = {
238 let seed_salt = Salt::random();
239 let (seed_pow, seed_hash) = pow_measure_from_data_hash(&data_hash, &seed_salt)?;
240 (seed_salt, seed_pow, seed_hash)
241 };
242
243 guard.update(best.1);
244
245 if best.1 >= pow_min {
247 return Ok(best);
248 }
249
250 let mut in_flight: FuturesUnordered<SlotFuture<'a>> = FuturesUnordered::new();
251
252 for slot in 0..pool_size {
254 let chunk_size = pick_next_chunk_size(&mut remaining_iterations);
255 if chunk_size == 0 {
256 break;
257 }
258
259 in_flight.push(Box::pin(async move {
260 let chunk_result = generator.run_chunk(slot, chunk_size, pow_min, data_hash).await;
261 (slot, chunk_size, chunk_result)
262 }));
263 }
264
265 while let Some((slot, chunk_size, chunk_result)) = in_flight.next().await {
267 let chunk_best = chunk_result?;
268 if chunk_best.1 > best.1 {
269 best = chunk_best;
270 guard.update(best.1);
271 }
272 if best.1 >= pow_min {
273 return Ok(best);
274 }
275 if let Some(progress) = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), chunk_size, best.1) {
276 trace!("{}", progress);
277 }
278
279 let next_chunk_size = pick_next_chunk_size(&mut remaining_iterations);
280 if next_chunk_size == 0 {
281 continue;
282 }
283
284 in_flight.push(Box::pin(async move {
285 let chunk_result = generator.run_chunk(slot, next_chunk_size, pow_min, data_hash).await;
286 (slot, next_chunk_size, chunk_result)
287 }));
288 }
289
290 Ok(best)
291}
292
293fn pick_next_chunk_size(remaining_iterations: &mut Option<usize>) -> usize {
294 match remaining_iterations {
295 Some(0) => 0,
296 Some(remaining) => {
297 let chunk_size = (*remaining).min(CHUNK_ITERATIONS);
298 *remaining -= chunk_size;
299 chunk_size
300 }
301 None => CHUNK_ITERATIONS,
302 }
303}
304
305pub fn run_pool_chunk(chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
306 let mut best = {
307 let salt = Salt::random();
308 let (pow, hash) = pow_measure_from_data_hash(&data_hash, &salt)?;
309 (salt, pow, hash)
310 };
311
312 if best.1 >= pow_min {
313 return Ok(best);
314 }
315
316 for _ in 1..chunk_iterations {
317 let salt = Salt::random();
318 let (pow, hash) = pow_measure_from_data_hash(&data_hash, &salt)?;
319 if pow > best.1 {
320 best = (salt, pow, hash);
321 if best.1 >= pow_min {
322 return Ok(best);
323 }
324 }
325 }
326
327 Ok(best)
328}
329
#[cfg(test)]
mod tests {
    use super::{JobTracker, PowGenerator, TrackedJobGuard};
    use crate::tools::pow::{pow_compute_data_hash, pow_measure_from_data_hash};
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::types::Pow;
    use std::sync::{Arc, Mutex};

    /// Fresh, empty tracker wrapped the way `TrackedJobGuard` expects it.
    fn shared_tracker() -> Arc<Mutex<JobTracker>> {
        Arc::new(Mutex::new(JobTracker::default()))
    }

    /// Add, update, snapshot and remove behave consistently on a plain tracker.
    #[test]
    fn job_tracker_round_trip() {
        let mut tracker = JobTracker::default();
        assert!(tracker.snapshot().is_empty());

        let rpc_job = tracker.add("rpc", Pow(18));
        let post_job = tracker.add("post", Pow(22));

        tracker.update(rpc_job, Pow(7));
        tracker.update(post_job, Pow(13));
        // An id that was never handed out must not create or alter any entry.
        tracker.update(99999, Pow(255));

        // Snapshot order is unspecified, so sort by label before comparing.
        let mut statuses = tracker.snapshot();
        statuses.sort_by(|left, right| left.label.cmp(&right.label));
        assert_eq!(statuses.len(), 2);

        let post = &statuses[0];
        assert_eq!(post.label, "post");
        assert_eq!(post.pow_min, Pow(22));
        assert_eq!(post.best_pow_so_far, Pow(13));

        let rpc = &statuses[1];
        assert_eq!(rpc.label, "rpc");
        assert_eq!(rpc.pow_min, Pow(18));
        assert_eq!(rpc.best_pow_so_far, Pow(7));

        tracker.remove(rpc_job);
        let after_first_removal = tracker.snapshot();
        assert_eq!(after_first_removal.len(), 1);
        assert_eq!(after_first_removal[0].label, "post");

        tracker.remove(post_job);
        assert!(tracker.snapshot().is_empty());
    }

    /// The guard's job is visible while the guard lives and gone once it is dropped.
    #[test]
    fn tracked_job_guard_removes_on_drop() {
        let tracker = shared_tracker();
        {
            let _guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
            assert_eq!(tracker.lock().unwrap().snapshot().len(), 1);
        }
        assert!(tracker.lock().unwrap().snapshot().is_empty());
    }

    /// `TrackedJobGuard::update` is reflected in the tracker's snapshot.
    #[test]
    fn tracked_job_guard_update_writes_through() {
        let tracker = shared_tracker();
        let guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
        guard.update(Pow(42));

        let statuses = tracker.lock().unwrap().snapshot();
        assert_eq!(statuses.len(), 1);

        let only = &statuses[0];
        assert_eq!(only.label, "rpc");
        assert_eq!(only.pow_min, Pow(18));
        assert_eq!(only.best_pow_so_far, Pow(42));
    }

    /// Busy means "has active jobs" or "work was marked within the window".
    #[test]
    fn is_busy_tracks_active_jobs_and_recent_work_window() {
        let mut tracker = JobTracker::default();

        // Nothing registered and nothing marked: idle.
        assert!(!tracker.is_busy(1_000_000, 1000));

        // An active job keeps the tracker busy regardless of elapsed time.
        let job = tracker.add("rpc", Pow(18));
        tracker.mark_work(1_000_000);
        assert!(tracker.is_busy(1_000_000, 1000));
        assert!(tracker.is_busy(9_999_999, 1000));

        // With no jobs left, only the recent-work window counts (boundary inclusive).
        tracker.remove(job);
        tracker.mark_work(2_000_000);
        assert!(tracker.is_busy(2_000_500, 1000));
        assert!(tracker.is_busy(2_001_000, 1000));
        assert!(!tracker.is_busy(2_001_500, 1000));
    }

    /// Dropping the guard marks work using the real clock.
    #[test]
    fn tracked_job_guard_marks_work_via_real_clock() {
        let tracker = shared_tracker();

        {
            let _guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
            // Busy purely because a job is registered.
            assert!(tracker.lock().unwrap().is_busy(0, 0));
        }

        assert!(tracker.lock().unwrap().snapshot().is_empty());

        let now = RealTimeProvider.current_time_millis().0;
        assert!(tracker.lock().unwrap().is_busy(now, 1000));
        assert!(!tracker.lock().unwrap().is_busy(now + 60_000, 1000));
    }

    /// A zero iteration budget still yields a salt whose measured pow matches the returned pow.
    #[tokio::test]
    async fn run_pool_returns_consistent_sample_when_iteration_limit_is_zero() {
        let data_hash = pow_compute_data_hash(&[b"zero-budget"]);
        let generator = SingleThreadedPowGenerator::new();

        let (salt, achieved_pow, _) = generator.generate_best_effort("zero", 0, Pow(255), data_hash).await.unwrap();

        let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
        assert_eq!(recomputed_pow, achieved_pow);
    }

    /// No job remains registered after a search completes.
    #[tokio::test]
    async fn run_pool_tracker_clears_after_completion() {
        let data_hash = pow_compute_data_hash(&[b"tracker-cleanup"]);
        let generator = SingleThreadedPowGenerator::new();

        let _ = generator.generate("clean", Pow(0), data_hash).await.unwrap();

        assert!(generator.active_jobs().is_empty());
    }

    /// An unbounded search returns a result that reaches the target.
    #[tokio::test]
    async fn run_pool_returns_as_soon_as_pow_min_is_met() {
        const POW_MIN: Pow = Pow(8);
        let data_hash = pow_compute_data_hash(&[b"early-exit"]);
        let generator = SingleThreadedPowGenerator::new();

        let (_, achieved_pow, _) = generator.generate("early", POW_MIN, data_hash).await.unwrap();

        assert!(achieved_pow >= POW_MIN);
    }

    /// With a target of zero, the returned salt and pow must belong together.
    #[tokio::test]
    async fn run_pool_pow_min_zero_returns_consistent_salt_and_pow() {
        let generator = SingleThreadedPowGenerator::new();
        for trial in 0u32..256 {
            let data_hash = pow_compute_data_hash(&[&trial.to_le_bytes()]);
            let (salt, achieved_pow, _) = generator.generate_best_effort("regression", 1, Pow(0), data_hash).await.unwrap();
            let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
            assert_eq!(recomputed_pow, achieved_pow, "trial {}: salt and pow drifted apart", trial);
        }
    }

    /// With the budget exhausted before the target is met, the returned salt must still
    /// reproduce the returned pow.
    #[tokio::test]
    async fn run_pool_returns_consistent_sample_when_budget_exhausted() {
        let generator = SingleThreadedPowGenerator::new();
        for trial in 0u32..256 {
            let data_hash = pow_compute_data_hash(&[b"exhaust", &trial.to_le_bytes()]);
            let (salt, achieved_pow, _) = generator.generate_best_effort("exhaust", 1, Pow(255), data_hash).await.unwrap();
            let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
            assert_eq!(recomputed_pow, achieved_pow, "trial {}: returned salt does not produce returned pow", trial);
        }
    }
}