blob: a683f26f42696f659fb4807729dc17f18f684358 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001#![feature(async_await)]
2
3use std::cell::Cell;
4use std::future::Future;
5use std::panic::catch_unwind;
6use std::pin::Pin;
7use std::task::Waker;
8use std::task::{Context, Poll};
9use std::thread;
10use std::time::Duration;
11
12use async_task::Task;
13use crossbeam::atomic::AtomicCell;
14use crossbeam::channel;
15use lazy_static::lazy_static;
16
17// Creates a future with event counters.
18//
19// Usage: `future!(f, waker, POLL, DROP)`
20//
21// The future `f` always sleeps for 200 ms, and panics the second time it is polled.
22// When it gets polled, `POLL` is incremented.
23// When it gets dropped, `DROP` is incremented.
24//
25// Every time the future is run, it stores the waker into a global variable.
26// This waker can be extracted using the `waker` function.
macro_rules! future {
    ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
        // Counters and waker slot private to the invoking scope.
        lazy_static! {
            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
            static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
        }

        let ($name, $waker) = {
            // Field 0 records whether the future has already been polled once.
            struct Fut(Cell<bool>, Box<i32>);

            impl Future for Fut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    // Publish the latest waker before doing anything else, so it is
                    // available even when this poll ends in a panic.
                    WAKER.store(Some(cx.waker().clone()));
                    $poll.fetch_add(1);
                    thread::sleep(ms(200));

                    // `replace` marks the future as polled and hands back the previous
                    // flag; a `true` here means this is not the first poll.
                    if self.0.replace(true) {
                        panic!()
                    }
                    Poll::Pending
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1);
                }
            }

            let fut = Fut(Cell::new(false), Box::new(0));
            // Removes the stored waker from the slot; panics if none is stored.
            let take_waker = || WAKER.swap(None).unwrap();
            (fut, take_waker)
        };
    };
}
67
68// Creates a schedule function with event counters.
69//
70// Usage: `schedule!(s, chan, SCHED, DROP)`
71//
72// The schedule function `s` pushes the task into `chan`.
73// When it gets invoked, `SCHED` is incremented.
74// When it gets dropped, `DROP` is incremented.
75//
76// Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        // `$sched` counts invocations of the schedule function; `$drop` counts drops of
        // the guard owned by that function.
        lazy_static! {
            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
        }

        let ($name, $chan) = {
            let (s, r) = channel::unbounded();

            // Owned by the schedule closure, so dropping the closure bumps `$drop`.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1);
                }
            }

            let guard = Guard(Box::new(0));
            let sched = move |task: Task<_>| {
                // Mention `guard` so the `move` closure takes ownership of it. Binding the
                // borrow to `_` avoids the "unused borrow that must be used" warning that
                // a bare `&guard;` statement produces.
                let _ = &guard;
                $sched.fetch_add(1);
                s.send(task).unwrap();
            };

            (sched, r)
        };
    };
}
106
107// Creates a task with event counters.
108//
// Usage: `task!(task, handle, f, s, DROP)`
110//
111// A task with future `f` and schedule function `s` is created.
112// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
113// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
        // Counts how many times the tag stored in the task has been dropped.
        lazy_static! {
            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
        }

        let ($task, $handle) = {
            struct Tag(Box<i32>);

            impl Drop for Tag {
                fn drop(&mut self) {
                    $drop.fetch_add(1);
                }
            }

            // The tag is handed to `async_task::spawn` together with the future and the
            // schedule function; the returned pair is bound to `$task` and `$handle`.
            let tag = Tag(Box::new(0));
            async_task::spawn($future, $schedule, tag)
        };
    };
}
133
/// Shorthand for a `Duration` of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
137
// Wakes the task while its second, panicking poll is in progress, then drops the handle.
//
// Every checkpoint compares one snapshot laid out as
// `(POLL, SCHEDULE, DROP_F, DROP_S, DROP_D, chan.len())`.
#[test]
fn wake_during_run() {
    future!(f, waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    task!(task, handle, f, s, DROP_D);

    // The first poll returns `Pending`; waking afterwards sends the task to `chan`.
    task.run();
    let w = waker();
    w.wake_by_ref();
    let task = chan.recv().unwrap();

    crossbeam::scope(|scope| {
        scope.spawn(|_| {
            // The second poll sleeps for 200 ms and then panics.
            let panicked = catch_unwind(|| task.run()).is_err();
            assert!(panicked);
            drop(waker());
            assert_eq!(
                (
                    POLL.load(),
                    SCHEDULE.load(),
                    DROP_F.load(),
                    DROP_S.load(),
                    DROP_D.load(),
                    chan.len(),
                ),
                (2, 1, 1, 1, 1, 0)
            );
        });

        // Give the spawned thread time to get into its second poll.
        thread::sleep(ms(100));

        w.wake();
        drop(handle);
        // Nothing is expected to have been dropped or rescheduled at this point.
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        // Wait past the end of the 200 ms sleep inside the second poll.
        thread::sleep(ms(200));

        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 1, 1, 1, 0)
        );
    })
    .unwrap();
}
183
// Cancels the task through its handle while the second, panicking poll is in progress.
//
// Every checkpoint compares one snapshot laid out as
// `(POLL, SCHEDULE, DROP_F, DROP_S, DROP_D, chan.len())`.
#[test]
fn cancel_during_run() {
    future!(f, waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    task!(task, handle, f, s, DROP_D);

    // The first poll returns `Pending`; waking afterwards sends the task to `chan`.
    task.run();
    waker().wake();
    let task = chan.recv().unwrap();

    crossbeam::scope(|scope| {
        scope.spawn(|_| {
            // The second poll sleeps for 200 ms and then panics.
            let panicked = catch_unwind(|| task.run()).is_err();
            assert!(panicked);
            drop(waker());
            assert_eq!(
                (
                    POLL.load(),
                    SCHEDULE.load(),
                    DROP_F.load(),
                    DROP_S.load(),
                    DROP_D.load(),
                    chan.len(),
                ),
                (2, 1, 1, 1, 1, 0)
            );
        });

        // Give the spawned thread time to get into its second poll.
        thread::sleep(ms(100));

        handle.cancel();
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        drop(handle);
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        // Wait past the end of the 200 ms sleep inside the second poll.
        thread::sleep(ms(200));

        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 1, 1, 1, 0)
        );
    })
    .unwrap();
}
236
// Wakes and then cancels the task while the second, panicking poll is in progress.
//
// Every checkpoint compares one snapshot laid out as
// `(POLL, SCHEDULE, DROP_F, DROP_S, DROP_D, chan.len())`.
#[test]
fn wake_and_cancel_during_run() {
    future!(f, waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    task!(task, handle, f, s, DROP_D);

    // The first poll returns `Pending`; waking afterwards sends the task to `chan`.
    task.run();
    let w = waker();
    w.wake_by_ref();
    let task = chan.recv().unwrap();

    crossbeam::scope(|scope| {
        scope.spawn(|_| {
            // The second poll sleeps for 200 ms and then panics.
            let panicked = catch_unwind(|| task.run()).is_err();
            assert!(panicked);
            drop(waker());
            assert_eq!(
                (
                    POLL.load(),
                    SCHEDULE.load(),
                    DROP_F.load(),
                    DROP_S.load(),
                    DROP_D.load(),
                    chan.len(),
                ),
                (2, 1, 1, 1, 1, 0)
            );
        });

        // Give the spawned thread time to get into its second poll.
        thread::sleep(ms(100));

        w.wake();
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        handle.cancel();
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        drop(handle);
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        // Wait past the end of the 200 ms sleep inside the second poll.
        thread::sleep(ms(200));

        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 1, 1, 1, 0)
        );
    })
    .unwrap();
}
297
// Cancels, drops the handle and then wakes the task while the second, panicking poll
// is in progress.
//
// Every checkpoint compares one snapshot laid out as
// `(POLL, SCHEDULE, DROP_F, DROP_S, DROP_D, chan.len())`.
#[test]
fn cancel_and_wake_during_run() {
    future!(f, waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    task!(task, handle, f, s, DROP_D);

    // The first poll returns `Pending`; waking afterwards sends the task to `chan`.
    task.run();
    let w = waker();
    w.wake_by_ref();
    let task = chan.recv().unwrap();

    crossbeam::scope(|scope| {
        scope.spawn(|_| {
            // The second poll sleeps for 200 ms and then panics.
            let panicked = catch_unwind(|| task.run()).is_err();
            assert!(panicked);
            drop(waker());
            assert_eq!(
                (
                    POLL.load(),
                    SCHEDULE.load(),
                    DROP_F.load(),
                    DROP_S.load(),
                    DROP_D.load(),
                    chan.len(),
                ),
                (2, 1, 1, 1, 1, 0)
            );
        });

        // Give the spawned thread time to get into its second poll.
        thread::sleep(ms(100));

        handle.cancel();
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        drop(handle);
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        w.wake();
        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 0, 0, 0, 0)
        );

        // Wait past the end of the 200 ms sleep inside the second poll.
        thread::sleep(ms(200));

        assert_eq!(
            (
                POLL.load(),
                SCHEDULE.load(),
                DROP_F.load(),
                DROP_S.load(),
                DROP_D.load(),
                chan.len(),
            ),
            (2, 1, 1, 1, 1, 0)
        );
    })
    .unwrap();
}