hashiverse_client_wasm/
wasm_parallel_pow_generator.rs1use hashiverse_lib::tools::pow_generator::pow_generator::{JobTracker, PowGenerator};
2use hashiverse_lib::tools::types::{Hash, Pow, Salt};
3use js_sys::{Array, Object, Reflect};
4use log::{info, warn};
5use send_wrapper::SendWrapper;
6use std::sync::{Arc, Mutex};
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{MessageChannel, MessageEvent, Worker};
10
11pub struct WasmParallelPowGenerator {
17 tracker: Arc<Mutex<JobTracker>>,
18 workers: Vec<Worker>,
19}
20
21unsafe impl Send for WasmParallelPowGenerator {}
24unsafe impl Sync for WasmParallelPowGenerator {}
25
26impl WasmParallelPowGenerator {
27 pub fn from_workers(workers: Vec<Worker>) -> Self {
29 info!("WasmParallelPowGenerator: received {} pow workers", workers.len());
30 Self {
31 tracker: Arc::new(Mutex::new(JobTracker::default())),
32 workers,
33 }
34 }
35}
36
37#[async_trait::async_trait]
38impl PowGenerator for WasmParallelPowGenerator {
39 fn pool_size(&self) -> usize {
40 self.workers.len()
41 }
42
43 async fn run_chunk(&self, slot: usize, chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
44 if self.workers.is_empty() {
45 anyhow::bail!("No pow workers available");
46 }
47
48 let worker = self.workers.get(slot).ok_or_else(|| anyhow::anyhow!("Invalid pow worker slot {}", slot))?.clone();
49 let data_hash_hex = hex::encode(data_hash);
50
51 let inner = async move {
54 let channel = MessageChannel::new().map_err(|e| anyhow::anyhow!("Failed to create MessageChannel: {:?}", e))?;
55 let port1 = channel.port1();
56 let port2 = channel.port2();
57
58 let promise = js_sys::Promise::new(&mut |resolve, reject| {
65 let resolve_clone = resolve.clone();
66 let onmessage = Closure::once_into_js(move |event: MessageEvent| {
67 resolve_clone.call1(&JsValue::NULL, &event.data()).ok();
68 });
69 port1.set_onmessage(Some(onmessage.unchecked_ref()));
70
71 let reject_clone = reject.clone();
72 let onmessageerror = Closure::once_into_js(move |_event: MessageEvent| {
73 reject_clone.call1(&JsValue::NULL, &JsValue::from_str("pow worker reply failed structured clone (onmessageerror)")).ok();
74 });
75 port1.set_onmessageerror(Some(onmessageerror.unchecked_ref()));
76
77 let reject_clone = reject.clone();
78 let onerror = Closure::once_into_js(move |event: JsValue| {
79 reject_clone.call1(&JsValue::NULL, &JsValue::from_str(&format!("pow worker error event: {:?}", event))).ok();
80 });
81 worker.set_onerror(Some(onerror.unchecked_ref()));
82 });
83
84 let msg = Object::new();
85 Reflect::set(&msg, &JsValue::from_str("iteration_limit"), &JsValue::from_f64(chunk_iterations as f64)).ok();
86 Reflect::set(&msg, &JsValue::from_str("pow_min"), &JsValue::from_f64(pow_min.0 as f64)).ok();
87 Reflect::set(&msg, &JsValue::from_str("data_hash_hex"), &JsValue::from_str(&data_hash_hex)).ok();
88
89 let transfer = Array::new();
90 transfer.push(&port2);
91 worker
92 .post_message_with_transfer(&msg, &transfer)
93 .map_err(|e| anyhow::anyhow!("Failed to post message to pow worker: {:?}", e))?;
94
95 let result = JsFuture::from(promise).await;
96
97 port1.set_onmessage(None);
102 port1.set_onmessageerror(None);
103 worker.set_onerror(None);
104 port1.close();
105
106 let response_data = result.map_err(|e| anyhow::anyhow!("Pow worker response error: {:?}", e))?;
107
108 if let Some(error_message) = Reflect::get(&response_data, &JsValue::from_str("error")).ok().and_then(|v| v.as_string()) {
111 anyhow::bail!("Pow worker error: {}", error_message);
112 }
113
114 let result_str = Reflect::get(&response_data, &JsValue::from_str("result"))
115 .ok()
116 .and_then(|v| v.as_string())
117 .ok_or_else(|| anyhow::anyhow!("Pow worker reply missing `result` string"))?;
118
119 parse_batch_result(&result_str).ok_or_else(|| anyhow::anyhow!("Invalid pow_compute_batch result format: {}", result_str))
120 };
121
122 SendWrapper::new(inner).await
123 }
124
125 fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
126 &self.tracker
127 }
128}
129
130fn parse_batch_result(result: &str) -> Option<(Salt, Pow, Hash)> {
132 let parts: Vec<&str> = result.splitn(3, ':').collect();
133 if parts.len() != 3 {
134 warn!("Invalid pow_compute_batch result format: {}", result);
135 return None;
136 }
137
138 let salt_bytes = hex::decode(parts[0]).ok()?;
139 let pow_val: u8 = parts[1].parse().ok()?;
140 let hash_bytes = hex::decode(parts[2]).ok()?;
141
142 let salt = Salt::from_slice(&salt_bytes).ok()?;
143 let hash = Hash::from_slice(&hash_bytes).ok()?;
144
145 Some((salt, Pow(pow_val), hash))
146}