Skip to main content

hashiverse_client_wasm/
wasm_parallel_pow_generator.rs

1use hashiverse_lib::tools::pow_generator::pow_generator::{JobTracker, PowGenerator};
2use hashiverse_lib::tools::types::{Hash, Pow, Salt};
3use js_sys::{Array, Object, Reflect};
4use log::{info, warn};
5use send_wrapper::SendWrapper;
6use std::sync::{Arc, Mutex};
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{MessageChannel, MessageEvent, Worker};
10
11/// A `PowGenerator` that dispatches chunks to pre-created Web Workers.
12/// The TypeScript side is responsible for spawning and initializing the workers;
13/// this struct simply receives the ready `Worker` handles and exposes one slot per worker.
14/// All scheduling — work-stealing refeed, early exit, tracker registration — lives in the
15/// shared `run_pool` dispatcher in `hashiverse-lib`.
16pub struct WasmParallelPowGenerator {
17    tracker: Arc<Mutex<JobTracker>>,
18    workers: Vec<Worker>,
19}
20
21// Safety: In WASM, everything is single-threaded. Worker handles are not Send
22// in web-sys's type system, but we never actually move them across threads.
23unsafe impl Send for WasmParallelPowGenerator {}
24unsafe impl Sync for WasmParallelPowGenerator {}
25
26impl WasmParallelPowGenerator {
27    /// Create a new generator from pre-initialized Worker handles.
28    pub fn from_workers(workers: Vec<Worker>) -> Self {
29        info!("WasmParallelPowGenerator: received {} pow workers", workers.len());
30        Self {
31            tracker: Arc::new(Mutex::new(JobTracker::default())),
32            workers,
33        }
34    }
35}
36
37#[async_trait::async_trait]
38impl PowGenerator for WasmParallelPowGenerator {
39    fn pool_size(&self) -> usize {
40        self.workers.len()
41    }
42
43    async fn run_chunk(&self, slot: usize, chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
44        if self.workers.is_empty() {
45            anyhow::bail!("No pow workers available");
46        }
47
48        let worker = self.workers.get(slot).ok_or_else(|| anyhow::anyhow!("Invalid pow worker slot {}", slot))?.clone();
49        let data_hash_hex = hex::encode(data_hash);
50
51        // JsFuture / Worker are !Send; in WASM everything is single-threaded so wrap the
52        // !Send call in SendWrapper to satisfy the async-trait Send bound.
53        let inner = async move {
54            let channel = MessageChannel::new().map_err(|e| anyhow::anyhow!("Failed to create MessageChannel: {:?}", e))?;
55            let port1 = channel.port1();
56            let port2 = channel.port2();
57
58            // Three independent paths can end the Promise:
59            //   - port1.onmessage          → success, resolve with the reply payload
60            //   - port1.onmessageerror     → reply failed structured clone, reject
61            //   - worker.onerror           → worker uncaught throw / load failure, reject
62            // Without the two rejection paths the awaiter hangs forever if the worker dies
63            // before it can post `{ error: ... }`.
64            let promise = js_sys::Promise::new(&mut |resolve, reject| {
65                let resolve_clone = resolve.clone();
66                let onmessage = Closure::once_into_js(move |event: MessageEvent| {
67                    resolve_clone.call1(&JsValue::NULL, &event.data()).ok();
68                });
69                port1.set_onmessage(Some(onmessage.unchecked_ref()));
70
71                let reject_clone = reject.clone();
72                let onmessageerror = Closure::once_into_js(move |_event: MessageEvent| {
73                    reject_clone.call1(&JsValue::NULL, &JsValue::from_str("pow worker reply failed structured clone (onmessageerror)")).ok();
74                });
75                port1.set_onmessageerror(Some(onmessageerror.unchecked_ref()));
76
77                let reject_clone = reject.clone();
78                let onerror = Closure::once_into_js(move |event: JsValue| {
79                    reject_clone.call1(&JsValue::NULL, &JsValue::from_str(&format!("pow worker error event: {:?}", event))).ok();
80                });
81                worker.set_onerror(Some(onerror.unchecked_ref()));
82            });
83
84            let msg = Object::new();
85            Reflect::set(&msg, &JsValue::from_str("iteration_limit"), &JsValue::from_f64(chunk_iterations as f64)).ok();
86            Reflect::set(&msg, &JsValue::from_str("pow_min"), &JsValue::from_f64(pow_min.0 as f64)).ok();
87            Reflect::set(&msg, &JsValue::from_str("data_hash_hex"), &JsValue::from_str(&data_hash_hex)).ok();
88
89            let transfer = Array::new();
90            transfer.push(&port2);
91            worker
92                .post_message_with_transfer(&msg, &transfer)
93                .map_err(|e| anyhow::anyhow!("Failed to post message to pow worker: {:?}", e))?;
94
95            let result = JsFuture::from(promise).await;
96
97            // Cleanup regardless of outcome: clearing worker.onerror matters most — the
98            // worker is shared across run_chunk calls, so a leftover handler would mis-
99            // route the next chunk's error. port1.close() releases the channel; port2
100            // was transferred to the worker and is no longer ours to close.
101            port1.set_onmessage(None);
102            port1.set_onmessageerror(None);
103            worker.set_onerror(None);
104            port1.close();
105
106            let response_data = result.map_err(|e| anyhow::anyhow!("Pow worker response error: {:?}", e))?;
107
108            // The TS worker posts either `{ result: "salt:pow:hash" }` on success or
109            // `{ error: "..." }` if `pow_compute_batch` threw. Surface the error end-to-end.
110            if let Some(error_message) = Reflect::get(&response_data, &JsValue::from_str("error")).ok().and_then(|v| v.as_string()) {
111                anyhow::bail!("Pow worker error: {}", error_message);
112            }
113
114            let result_str = Reflect::get(&response_data, &JsValue::from_str("result"))
115                .ok()
116                .and_then(|v| v.as_string())
117                .ok_or_else(|| anyhow::anyhow!("Pow worker reply missing `result` string"))?;
118
119            parse_batch_result(&result_str).ok_or_else(|| anyhow::anyhow!("Invalid pow_compute_batch result format: {}", result_str))
120        };
121
122        SendWrapper::new(inner).await
123    }
124
125    fn tracker(&self) -> &Arc<Mutex<JobTracker>> {
126        &self.tracker
127    }
128}
129
130/// Parse the `salt_hex:pow_u8:hash_hex` result string from `pow_compute_batch`.
131fn parse_batch_result(result: &str) -> Option<(Salt, Pow, Hash)> {
132    let parts: Vec<&str> = result.splitn(3, ':').collect();
133    if parts.len() != 3 {
134        warn!("Invalid pow_compute_batch result format: {}", result);
135        return None;
136    }
137
138    let salt_bytes = hex::decode(parts[0]).ok()?;
139    let pow_val: u8 = parts[1].parse().ok()?;
140    let hash_bytes = hex::decode(parts[2]).ok()?;
141
142    let salt = Salt::from_slice(&salt_bytes).ok()?;
143    let hash = Hash::from_slice(&hash_bytes).ok()?;
144
145    Some((salt, Pow(pow_val), hash))
146}