blob: 547ff7a3abdb5c145c6d29869620dd35464a647d [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001#![feature(async_await)]
2
3use std::future::Future;
4use std::pin::Pin;
5use std::task::Waker;
6use std::task::{Context, Poll};
7use std::thread;
8use std::time::Duration;
9
10use async_task::Task;
11use crossbeam::atomic::AtomicCell;
12use crossbeam::channel;
13use lazy_static::lazy_static;
14
15// Creates a future with event counters.
16//
17// Usage: `future!(f, waker, POLL, DROP)`
18//
19// The future `f` always sleeps for 200 ms and returns `Poll::Pending`.
20// When it gets polled, `POLL` is incremented.
21// When it gets dropped, `DROP` is incremented.
22//
23// Every time the future is run, it stores the waker into a global variable.
24// This waker can be extracted using the `waker` function.
25macro_rules! future {
26 ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
27 lazy_static! {
28 static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
29 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
30 static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
31 }
32
33 let ($name, $waker) = {
34 struct Fut(Box<i32>);
35
36 impl Future for Fut {
37 type Output = ();
38
39 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40 WAKER.store(Some(cx.waker().clone()));
41 $poll.fetch_add(1);
42 thread::sleep(ms(200));
43 Poll::Pending
44 }
45 }
46
47 impl Drop for Fut {
48 fn drop(&mut self) {
49 $drop.fetch_add(1);
50 }
51 }
52
53 (Fut(Box::new(0)), || WAKER.swap(None).unwrap())
54 };
55 };
56}
57
58// Creates a schedule function with event counters.
59//
60// Usage: `schedule!(s, chan, SCHED, DROP)`
61//
62// The schedule function `s` pushes the task into `chan`.
63// When it gets invoked, `SCHED` is incremented.
64// When it gets dropped, `DROP` is incremented.
65//
66// Receiver `chan` extracts the task when it is scheduled.
67macro_rules! schedule {
68 ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
69 lazy_static! {
70 static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
71 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
72 }
73
74 let ($name, $chan) = {
75 let (s, r) = channel::unbounded();
76
77 struct Guard(Box<i32>);
78
79 impl Drop for Guard {
80 fn drop(&mut self) {
81 $drop.fetch_add(1);
82 }
83 }
84
85 let guard = Guard(Box::new(0));
86 let sched = move |task: Task<_>| {
87 &guard;
88 $sched.fetch_add(1);
89 s.send(task).unwrap();
90 };
91
92 (sched, r)
93 };
94 };
95}
96
97// Creates a task with event counters.
98//
99// Usage: `task!(task, handle f, s, DROP)`
100//
101// A task with future `f` and schedule function `s` is created.
102// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
103// When the tag inside the task gets dropped, `DROP` is incremented.
104macro_rules! task {
105 ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
106 lazy_static! {
107 static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
108 }
109
110 let ($task, $handle) = {
111 struct Tag(Box<i32>);
112
113 impl Drop for Tag {
114 fn drop(&mut self) {
115 $drop.fetch_add(1);
116 }
117 }
118
119 async_task::spawn($future, $schedule, Tag(Box::new(0)))
120 };
121 };
122}
123
124fn ms(ms: u64) -> Duration {
125 Duration::from_millis(ms)
126}
127
128#[test]
129fn wake_during_run() {
130 future!(f, waker, POLL, DROP_F);
131 schedule!(s, chan, SCHEDULE, DROP_S);
132 task!(task, _handle, f, s, DROP_D);
133
134 task.run();
135 let w = waker();
136 w.wake_by_ref();
137 let task = chan.recv().unwrap();
138
139 crossbeam::scope(|scope| {
140 scope.spawn(|_| {
141 task.run();
142 assert_eq!(POLL.load(), 2);
143 assert_eq!(SCHEDULE.load(), 2);
144 assert_eq!(DROP_F.load(), 0);
145 assert_eq!(DROP_S.load(), 0);
146 assert_eq!(DROP_D.load(), 0);
147 assert_eq!(chan.len(), 1);
148 });
149
150 thread::sleep(ms(100));
151
152 w.wake_by_ref();
153 assert_eq!(POLL.load(), 2);
154 assert_eq!(SCHEDULE.load(), 1);
155 assert_eq!(DROP_F.load(), 0);
156 assert_eq!(DROP_S.load(), 0);
157 assert_eq!(DROP_D.load(), 0);
158 assert_eq!(chan.len(), 0);
159
160 thread::sleep(ms(200));
161
162 assert_eq!(POLL.load(), 2);
163 assert_eq!(SCHEDULE.load(), 2);
164 assert_eq!(DROP_F.load(), 0);
165 assert_eq!(DROP_S.load(), 0);
166 assert_eq!(DROP_D.load(), 0);
167 assert_eq!(chan.len(), 1);
168 })
169 .unwrap();
170
171 chan.recv().unwrap();
172 drop(waker());
173}
174
175#[test]
176fn cancel_during_run() {
177 future!(f, waker, POLL, DROP_F);
178 schedule!(s, chan, SCHEDULE, DROP_S);
179 task!(task, handle, f, s, DROP_D);
180
181 task.run();
182 let w = waker();
183 w.wake();
184 let task = chan.recv().unwrap();
185
186 crossbeam::scope(|scope| {
187 scope.spawn(|_| {
188 task.run();
189 drop(waker());
190 assert_eq!(POLL.load(), 2);
191 assert_eq!(SCHEDULE.load(), 1);
192 assert_eq!(DROP_F.load(), 1);
193 assert_eq!(DROP_S.load(), 1);
194 assert_eq!(DROP_D.load(), 1);
195 assert_eq!(chan.len(), 0);
196 });
197
198 thread::sleep(ms(100));
199
200 handle.cancel();
201 assert_eq!(POLL.load(), 2);
202 assert_eq!(SCHEDULE.load(), 1);
203 assert_eq!(DROP_F.load(), 0);
204 assert_eq!(DROP_S.load(), 0);
205 assert_eq!(DROP_D.load(), 0);
206 assert_eq!(chan.len(), 0);
207
208 drop(handle);
209 assert_eq!(POLL.load(), 2);
210 assert_eq!(SCHEDULE.load(), 1);
211 assert_eq!(DROP_F.load(), 0);
212 assert_eq!(DROP_S.load(), 0);
213 assert_eq!(DROP_D.load(), 0);
214 assert_eq!(chan.len(), 0);
215
216 thread::sleep(ms(200));
217
218 assert_eq!(POLL.load(), 2);
219 assert_eq!(SCHEDULE.load(), 1);
220 assert_eq!(DROP_F.load(), 1);
221 assert_eq!(DROP_S.load(), 1);
222 assert_eq!(DROP_D.load(), 1);
223 assert_eq!(chan.len(), 0);
224 })
225 .unwrap();
226}
227
228#[test]
229fn wake_and_cancel_during_run() {
230 future!(f, waker, POLL, DROP_F);
231 schedule!(s, chan, SCHEDULE, DROP_S);
232 task!(task, handle, f, s, DROP_D);
233
234 task.run();
235 let w = waker();
236 w.wake_by_ref();
237 let task = chan.recv().unwrap();
238
239 crossbeam::scope(|scope| {
240 scope.spawn(|_| {
241 task.run();
242 drop(waker());
243 assert_eq!(POLL.load(), 2);
244 assert_eq!(SCHEDULE.load(), 1);
245 assert_eq!(DROP_F.load(), 1);
246 assert_eq!(DROP_S.load(), 1);
247 assert_eq!(DROP_D.load(), 1);
248 assert_eq!(chan.len(), 0);
249 });
250
251 thread::sleep(ms(100));
252
253 w.wake();
254 assert_eq!(POLL.load(), 2);
255 assert_eq!(SCHEDULE.load(), 1);
256 assert_eq!(DROP_F.load(), 0);
257 assert_eq!(DROP_S.load(), 0);
258 assert_eq!(DROP_D.load(), 0);
259 assert_eq!(chan.len(), 0);
260
261 handle.cancel();
262 assert_eq!(POLL.load(), 2);
263 assert_eq!(SCHEDULE.load(), 1);
264 assert_eq!(DROP_F.load(), 0);
265 assert_eq!(DROP_S.load(), 0);
266 assert_eq!(DROP_D.load(), 0);
267 assert_eq!(chan.len(), 0);
268
269 drop(handle);
270 assert_eq!(POLL.load(), 2);
271 assert_eq!(SCHEDULE.load(), 1);
272 assert_eq!(DROP_F.load(), 0);
273 assert_eq!(DROP_S.load(), 0);
274 assert_eq!(DROP_D.load(), 0);
275 assert_eq!(chan.len(), 0);
276
277 thread::sleep(ms(200));
278
279 assert_eq!(POLL.load(), 2);
280 assert_eq!(SCHEDULE.load(), 1);
281 assert_eq!(DROP_F.load(), 1);
282 assert_eq!(DROP_S.load(), 1);
283 assert_eq!(DROP_D.load(), 1);
284 assert_eq!(chan.len(), 0);
285 })
286 .unwrap();
287}
288
289#[test]
290fn cancel_and_wake_during_run() {
291 future!(f, waker, POLL, DROP_F);
292 schedule!(s, chan, SCHEDULE, DROP_S);
293 task!(task, handle, f, s, DROP_D);
294
295 task.run();
296 let w = waker();
297 w.wake_by_ref();
298 let task = chan.recv().unwrap();
299
300 crossbeam::scope(|scope| {
301 scope.spawn(|_| {
302 task.run();
303 drop(waker());
304 assert_eq!(POLL.load(), 2);
305 assert_eq!(SCHEDULE.load(), 1);
306 assert_eq!(DROP_F.load(), 1);
307 assert_eq!(DROP_S.load(), 1);
308 assert_eq!(DROP_D.load(), 1);
309 assert_eq!(chan.len(), 0);
310 });
311
312 thread::sleep(ms(100));
313
314 handle.cancel();
315 assert_eq!(POLL.load(), 2);
316 assert_eq!(SCHEDULE.load(), 1);
317 assert_eq!(DROP_F.load(), 0);
318 assert_eq!(DROP_S.load(), 0);
319 assert_eq!(DROP_D.load(), 0);
320 assert_eq!(chan.len(), 0);
321
322 drop(handle);
323 assert_eq!(POLL.load(), 2);
324 assert_eq!(SCHEDULE.load(), 1);
325 assert_eq!(DROP_F.load(), 0);
326 assert_eq!(DROP_S.load(), 0);
327 assert_eq!(DROP_D.load(), 0);
328 assert_eq!(chan.len(), 0);
329
330 w.wake();
331 assert_eq!(POLL.load(), 2);
332 assert_eq!(SCHEDULE.load(), 1);
333 assert_eq!(DROP_F.load(), 0);
334 assert_eq!(DROP_S.load(), 0);
335 assert_eq!(DROP_D.load(), 0);
336 assert_eq!(chan.len(), 0);
337
338 thread::sleep(ms(200));
339
340 assert_eq!(POLL.load(), 2);
341 assert_eq!(SCHEDULE.load(), 1);
342 assert_eq!(DROP_F.load(), 1);
343 assert_eq!(DROP_S.load(), 1);
344 assert_eq!(DROP_D.load(), 1);
345 assert_eq!(chan.len(), 0);
346 })
347 .unwrap();
348}