blob: e64cc554a6715d452b008efc8dc0b4ba1db6d487 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001#![feature(async_await)]
2
3use std::cell::Cell;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::Waker;
7use std::task::{Context, Poll};
8use std::thread;
9use std::time::Duration;
10
11use async_task::Task;
12use crossbeam::atomic::AtomicCell;
13use crossbeam::channel;
14use lazy_static::lazy_static;
15
16// Creates a future with event counters.
17//
18// Usage: `future!(f, waker, POLL, DROP)`
19//
20// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled.
21// When it gets polled, `POLL` is incremented.
22// When it gets dropped, `DROP` is incremented.
23//
24// Every time the future is run, it stores the waker into a global variable.
25// This waker can be extracted using the `waker` function.
26macro_rules! future {
27 ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
28 lazy_static! {
29 static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
30 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
31 static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
32 }
33
34 let ($name, $waker) = {
35 struct Fut(Cell<bool>, Box<i32>);
36
37 impl Future for Fut {
38 type Output = Box<i32>;
39
40 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41 WAKER.store(Some(cx.waker().clone()));
42 $poll.fetch_add(1);
43 thread::sleep(ms(200));
44
45 if self.0.get() {
46 Poll::Ready(Box::new(0))
47 } else {
48 self.0.set(true);
49 Poll::Pending
50 }
51 }
52 }
53
54 impl Drop for Fut {
55 fn drop(&mut self) {
56 $drop.fetch_add(1);
57 }
58 }
59
60 (Fut(Cell::new(false), Box::new(0)), || {
61 WAKER.swap(None).unwrap()
62 })
63 };
64 };
65}
66
67// Creates a schedule function with event counters.
68//
69// Usage: `schedule!(s, chan, SCHED, DROP)`
70//
71// The schedule function `s` pushes the task into `chan`.
72// When it gets invoked, `SCHED` is incremented.
73// When it gets dropped, `DROP` is incremented.
74//
75// Receiver `chan` extracts the task when it is scheduled.
76macro_rules! schedule {
77 ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
78 lazy_static! {
79 static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
80 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
81 }
82
83 let ($name, $chan) = {
84 let (s, r) = channel::unbounded();
85
86 struct Guard(Box<i32>);
87
88 impl Drop for Guard {
89 fn drop(&mut self) {
90 $drop.fetch_add(1);
91 }
92 }
93
94 let guard = Guard(Box::new(0));
95 let sched = move |task: Task<_>| {
96 &guard;
97 $sched.fetch_add(1);
98 s.send(task).unwrap();
99 };
100
101 (sched, r)
102 };
103 };
104}
105
106// Creates a task with event counters.
107//
108// Usage: `task!(task, handle f, s, DROP)`
109//
110// A task with future `f` and schedule function `s` is created.
111// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
112// When the tag inside the task gets dropped, `DROP` is incremented.
113macro_rules! task {
114 ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
115 lazy_static! {
116 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
117 }
118
119 let ($task, $handle) = {
120 struct Tag(Box<i32>);
121
122 impl Drop for Tag {
123 fn drop(&mut self) {
124 $drop.fetch_add(1);
125 }
126 }
127
128 async_task::spawn($future, $schedule, Tag(Box::new(0)))
129 };
130 };
131}
132
133fn ms(ms: u64) -> Duration {
134 Duration::from_millis(ms)
135}
136
137#[test]
138fn wake() {
139 future!(f, waker, POLL, DROP_F);
140 schedule!(s, chan, SCHEDULE, DROP_S);
141 task!(mut task, _, f, s, DROP_D);
142
143 assert!(chan.is_empty());
144
145 task.run();
146 assert_eq!(POLL.load(), 1);
147 assert_eq!(SCHEDULE.load(), 0);
148 assert_eq!(DROP_F.load(), 0);
149 assert_eq!(DROP_S.load(), 0);
150 assert_eq!(DROP_D.load(), 0);
151 assert_eq!(chan.len(), 0);
152
153 waker().wake();
154 task = chan.recv().unwrap();
155 assert_eq!(POLL.load(), 1);
156 assert_eq!(SCHEDULE.load(), 1);
157 assert_eq!(DROP_F.load(), 0);
158 assert_eq!(DROP_S.load(), 0);
159 assert_eq!(DROP_D.load(), 0);
160 assert_eq!(chan.len(), 0);
161
162 task.run();
163 assert_eq!(POLL.load(), 2);
164 assert_eq!(SCHEDULE.load(), 1);
165 assert_eq!(DROP_F.load(), 1);
166 assert_eq!(DROP_S.load(), 0);
167 assert_eq!(DROP_D.load(), 0);
168 assert_eq!(chan.len(), 0);
169
170 waker().wake();
171 assert_eq!(POLL.load(), 2);
172 assert_eq!(SCHEDULE.load(), 1);
173 assert_eq!(DROP_F.load(), 1);
174 assert_eq!(DROP_S.load(), 1);
175 assert_eq!(DROP_D.load(), 1);
176 assert_eq!(chan.len(), 0);
177}
178
179#[test]
180fn wake_by_ref() {
181 future!(f, waker, POLL, DROP_F);
182 schedule!(s, chan, SCHEDULE, DROP_S);
183 task!(mut task, _, f, s, DROP_D);
184
185 assert!(chan.is_empty());
186
187 task.run();
188 assert_eq!(POLL.load(), 1);
189 assert_eq!(SCHEDULE.load(), 0);
190 assert_eq!(DROP_F.load(), 0);
191 assert_eq!(DROP_S.load(), 0);
192 assert_eq!(DROP_D.load(), 0);
193 assert_eq!(chan.len(), 0);
194
195 waker().wake_by_ref();
196 task = chan.recv().unwrap();
197 assert_eq!(POLL.load(), 1);
198 assert_eq!(SCHEDULE.load(), 1);
199 assert_eq!(DROP_F.load(), 0);
200 assert_eq!(DROP_S.load(), 0);
201 assert_eq!(DROP_D.load(), 0);
202 assert_eq!(chan.len(), 0);
203
204 task.run();
205 assert_eq!(POLL.load(), 2);
206 assert_eq!(SCHEDULE.load(), 1);
207 assert_eq!(DROP_F.load(), 1);
208 assert_eq!(DROP_S.load(), 0);
209 assert_eq!(DROP_D.load(), 0);
210 assert_eq!(chan.len(), 0);
211
212 waker().wake_by_ref();
213 assert_eq!(POLL.load(), 2);
214 assert_eq!(SCHEDULE.load(), 1);
215 assert_eq!(DROP_F.load(), 1);
216 assert_eq!(DROP_S.load(), 1);
217 assert_eq!(DROP_D.load(), 1);
218 assert_eq!(chan.len(), 0);
219}
220
221#[test]
222fn clone() {
223 future!(f, waker, POLL, DROP_F);
224 schedule!(s, chan, SCHEDULE, DROP_S);
225 task!(mut task, _, f, s, DROP_D);
226
227 task.run();
228 assert_eq!(POLL.load(), 1);
229 assert_eq!(SCHEDULE.load(), 0);
230 assert_eq!(DROP_F.load(), 0);
231 assert_eq!(DROP_S.load(), 0);
232 assert_eq!(DROP_D.load(), 0);
233 assert_eq!(chan.len(), 0);
234
235 let w2 = waker().clone();
236 let w3 = w2.clone();
237 let w4 = w3.clone();
238 w4.wake();
239
240 task = chan.recv().unwrap();
241 task.run();
242 assert_eq!(POLL.load(), 2);
243 assert_eq!(SCHEDULE.load(), 1);
244 assert_eq!(DROP_F.load(), 1);
245 assert_eq!(DROP_S.load(), 0);
246 assert_eq!(DROP_D.load(), 0);
247 assert_eq!(chan.len(), 0);
248
249 w3.wake();
250 assert_eq!(POLL.load(), 2);
251 assert_eq!(SCHEDULE.load(), 1);
252 assert_eq!(DROP_F.load(), 1);
253 assert_eq!(DROP_S.load(), 0);
254 assert_eq!(DROP_D.load(), 0);
255 assert_eq!(chan.len(), 0);
256
257 drop(w2);
258 drop(waker());
259 assert_eq!(DROP_S.load(), 1);
260 assert_eq!(DROP_D.load(), 1);
261}
262
263#[test]
264fn wake_cancelled() {
265 future!(f, waker, POLL, DROP_F);
266 schedule!(s, chan, SCHEDULE, DROP_S);
267 task!(task, _, f, s, DROP_D);
268
269 task.run();
270 assert_eq!(POLL.load(), 1);
271 assert_eq!(SCHEDULE.load(), 0);
272 assert_eq!(DROP_F.load(), 0);
273 assert_eq!(DROP_S.load(), 0);
274 assert_eq!(DROP_D.load(), 0);
275 assert_eq!(chan.len(), 0);
276
277 let w = waker();
278
279 w.wake_by_ref();
280 chan.recv().unwrap().cancel();
281 assert_eq!(POLL.load(), 1);
282 assert_eq!(SCHEDULE.load(), 1);
283 assert_eq!(DROP_F.load(), 1);
284 assert_eq!(DROP_S.load(), 0);
285 assert_eq!(DROP_D.load(), 0);
286 assert_eq!(chan.len(), 0);
287
288 w.wake();
289 assert_eq!(POLL.load(), 1);
290 assert_eq!(SCHEDULE.load(), 1);
291 assert_eq!(DROP_F.load(), 1);
292 assert_eq!(DROP_S.load(), 1);
293 assert_eq!(DROP_D.load(), 1);
294 assert_eq!(chan.len(), 0);
295}
296
297#[test]
298fn wake_completed() {
299 future!(f, waker, POLL, DROP_F);
300 schedule!(s, chan, SCHEDULE, DROP_S);
301 task!(task, _, f, s, DROP_D);
302
303 task.run();
304 let w = waker();
305 assert_eq!(POLL.load(), 1);
306 assert_eq!(SCHEDULE.load(), 0);
307 assert_eq!(DROP_F.load(), 0);
308 assert_eq!(DROP_S.load(), 0);
309 assert_eq!(DROP_D.load(), 0);
310 assert_eq!(chan.len(), 0);
311
312 w.wake();
313 chan.recv().unwrap().run();
314 assert_eq!(POLL.load(), 2);
315 assert_eq!(SCHEDULE.load(), 1);
316 assert_eq!(DROP_F.load(), 1);
317 assert_eq!(DROP_S.load(), 0);
318 assert_eq!(DROP_D.load(), 0);
319 assert_eq!(chan.len(), 0);
320
321 waker().wake();
322 assert_eq!(POLL.load(), 2);
323 assert_eq!(SCHEDULE.load(), 1);
324 assert_eq!(DROP_F.load(), 1);
325 assert_eq!(DROP_S.load(), 1);
326 assert_eq!(DROP_D.load(), 1);
327 assert_eq!(chan.len(), 0);
328}