hashiverse_lib/tools/time_provider/
manual_time_provider.rs1use crate::tools::time::{TimeMillis};
9use parking_lot::RwLock;
10use std::cmp::Ordering;
11use std::collections::BinaryHeap;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll, Waker};
16use std::time::Duration;
17use tokio::sync::Notify;
18use tokio_util::sync::CancellationToken;
19use crate::tools::time_provider::time_provider::TimeProvider;
20
21#[derive(Debug)]
23struct ManualTimeProviderWakeTime {
24 time: TimeMillis,
25 waker: Waker,
26}
27
28impl PartialEq for ManualTimeProviderWakeTime {
29 fn eq(&self, other: &Self) -> bool {
30 self.time == other.time
31 }
32}
33
34impl Eq for ManualTimeProviderWakeTime {}
35
36impl PartialOrd for ManualTimeProviderWakeTime {
37 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
38 Some(self.cmp(other))
39 }
40}
41
42impl Ord for ManualTimeProviderWakeTime {
43 fn cmp(&self, other: &Self) -> Ordering {
44 other.time.cmp(&self.time)
46 }
47}
48
49#[derive(Clone)]
54pub struct ManualTimeProvider {
55 current_time: Arc<RwLock<TimeMillis>>,
56 wake_times: Arc<RwLock<BinaryHeap<ManualTimeProviderWakeTime>>>,
57 new_sleepers_notify: Arc<Notify>,
58}
59
60impl Default for ManualTimeProvider {
61 fn default() -> Self {
62 Self::new(TimeMillis::zero())
63 }
64}
65
66impl ManualTimeProvider {
67 pub fn new(start_time_millis: TimeMillis) -> Self {
69 Self {
70 current_time: Arc::new(RwLock::new(start_time_millis)),
71 wake_times: Arc::new(RwLock::new(BinaryHeap::new())),
72 new_sleepers_notify: Arc::new(Notify::new()),
73 }
74 }
75
76 pub async fn run_all_sleepers_till_done(&self, cancellation_token: &CancellationToken) {
82 while !cancellation_token.is_cancelled() {
83 if self.wake_times.read().is_empty() {
84 tokio::select! {
85 _ = self.new_sleepers_notify.notified() => {},
86 _ = cancellation_token.cancelled() => {},
87 }
88 }
89
90 tokio::task::yield_now().await;
91 self.advance_time_until_next_sleeper().await;
92 }
93 }
94
95 pub async fn advance_time_until_next_sleeper(&self) {
99 let mut current = self.current_time.write();
100 let mut wake_times = self.wake_times.write();
101
102 let new_time = match wake_times.peek() {
103 Some(wake_time) => wake_time.time,
104 None => *current,
105 };
106
107 *current = new_time;
109
110 let mut wakers_to_wake = Vec::new();
112
113 while let Some(wake_time) = wake_times.peek() {
114 if wake_time.time <= new_time {
115 if let Some(entry) = wake_times.pop() {
116 wakers_to_wake.push(entry.waker);
117 }
118 }
119 else {
120 break;
121 }
122 }
123
124 drop(current);
126 drop(wake_times);
127
128 for waker in wakers_to_wake {
130 waker.wake();
131 }
132 }
133
134 pub fn set_time(&self, time: TimeMillis) {
139 *self.current_time.write() = time;
140
141 let mut wake_times = self.wake_times.write();
142 let mut wakers_to_wake = Vec::new();
143 while let Some(wake_time) = wake_times.peek() {
144 if wake_time.time <= time {
145 if let Some(entry) = wake_times.pop() {
146 wakers_to_wake.push(entry.waker);
147 }
148 } else {
149 break;
150 }
151 }
152 drop(wake_times);
153 for waker in wakers_to_wake {
154 waker.wake();
155 }
156 }
157
158 fn register_wake_time(&self, wake_time: TimeMillis, waker: Waker) {
160 let mut wake_times = self.wake_times.write();
161 wake_times.push(ManualTimeProviderWakeTime { time: wake_time, waker });
162
163 self.new_sleepers_notify.notify_one();
165 }
166}
167
168pub struct ManualTimeProviderSleep {
170 provider: ManualTimeProvider,
171 wake_time: TimeMillis,
172}
173
174impl Future for ManualTimeProviderSleep {
175 type Output = ();
176
177 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
178 let current_time = self.provider.current_time_millis();
180 if current_time >= self.wake_time {
181 return Poll::Ready(());
182 }
183
184 let waker = cx.waker().clone();
186 self.provider.register_wake_time(self.wake_time, waker);
187
188 Poll::Pending
189 }
190}
191
192impl TimeProvider for ManualTimeProvider {
193 fn current_time_millis(&self) -> TimeMillis {
194 *self.current_time.read()
195 }
196
197 fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
198 let current_time = self.current_time_millis();
199 let wake_time = current_time + duration;
200
201 Box::pin(ManualTimeProviderSleep {
202 provider: self.clone(), wake_time,
204 })
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use crate::tools::time::{MILLIS_IN_SECOND, TimeMillis};
211 use log::info;
212 use std::sync::Arc;
213 use tokio_util::sync::CancellationToken;
214 use crate::tools::time_provider::manual_time_provider::ManualTimeProvider;
215 use crate::tools::time_provider::time_provider::TimeProvider;
216
217 #[tokio::test]
218 async fn generic_test() {
219 let time_provider = Arc::new(ManualTimeProvider::new(TimeMillis::zero()));
220 let cancellation_token = CancellationToken::new();
223
224 tokio::join!(
225 async {
226 info!("Thread 1 start");
227 for _ in 0..10 {
228 info!("Thread 1 tick");
229 time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
230 }
231 info!("Thread 1 end");
232 },
233 async {
234 info!("Thread 2 start");
235 for _ in 0..10 {
236 tokio::task::yield_now().await;
237 tokio::task::yield_now().await;
238 tokio::task::yield_now().await;
239 info!("Thread 2 tick");
240 tokio::task::yield_now().await;
241 tokio::task::yield_now().await;
242 tokio::task::yield_now().await;
243 time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;
244 tokio::task::yield_now().await;
245 tokio::task::yield_now().await;
246 tokio::task::yield_now().await;
247 }
248 cancellation_token.cancel();
249 info!("Thread 2 end");
250 },
251 async {
252 info!("Time driver start");
253 time_provider.run_all_sleepers_till_done(&cancellation_token).await;
254 info!("Time driver end");
255 },
256 );
257 }
258}