use alloc::alloc::Layout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::{abort_on_panic, extend};
use crate::Task;

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// The raw waker vtable.
    pub(crate) raw_waker_vtable: RawWakerVTable,

    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task.
    pub(crate) drop_task: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()),

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
}
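
// Note: this vtable is stored in the task's `Header`, which is how the type-erased handles
// (holding only a raw task pointer) can schedule, poll, wake, and destroy the task without
// knowing the concrete `F`, `R`, `S` and `T` types.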

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the tag is stored.
    pub(crate) offset_t: usize,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
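
// A rough picture of the allocation that `TaskLayout` describes (field order is fixed by
// `task_layout()` below):
//
//     +--------+--------+-------------+----------------+
//     | Header | tag: T | schedule: S | union { F, R } |
//     +--------+--------+-------------+----------------+
//     0        offset_t offset_s      offset_f == offset_r
//
// The future `F` and its output `R` share one slot: when the future completes, it is dropped
// in place and the output is written over the same memory.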

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, R, S, T> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The tag inside the task.
    pub(crate) tag: *mut T,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}

impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}

impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
    fn clone(&self) -> Self {
        Self {
            header: self.header,
            schedule: self.schedule,
            tag: self.tag,
            future: self.future,
            output: self.output,
        }
    }
}
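
// `Copy` and `Clone` are implemented manually instead of derived, so that they apply even when
// `F`, `R`, `S` or `T` themselves are not `Copy`/`Clone`: `RawTask` only holds raw pointers.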

impl<F, R, S, T> RawTask<F, R, S, T>
where
    F: Future<Output = R> + 'static,
    S: Fn(Task<T>) + Send + Sync + 'static,
{
    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
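    ///
    /// A sketch of how a spawn function might use this (illustrative only; the `Task` and
    /// `JoinHandle` construction here is an assumption, not the crate's exact code):
    ///
    /// ```ignore
    /// let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
    /// let task = Task { raw_task, _marker: PhantomData };
    /// let handle = JoinHandle { raw_task, _marker: PhantomData };
    /// ```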
    pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => libc::abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(raw_task.as_ptr());

            // Write the header as the first field of the task.
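            // The initial state: one reference held by the new `Task`, the HANDLE bit for the
            // live `JoinHandle`, and SCHEDULED set because the task starts out as scheduled.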
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    raw_waker_vtable: RawWakerVTable::new(
                        Self::clone_waker,
                        Self::wake,
                        Self::wake_by_ref,
                        Self::drop_waker,
                    ),
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_task: Self::drop_task,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                },
            });

            // Write the tag as the second field of the task.
            (raw.tag as *mut T).write(tag);

            // Write the schedule function as the third field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the fourth field of the task.
            raw.future.write(future);

            raw_task
        }
    }

    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                tag: p.add(task_layout.offset_t) as *mut T,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut R,
            }
        }
    }

    /// Returns the memory layout for a task.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
        let layout_header = Layout::new::<Header>();
        let layout_t = Layout::new::<T>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<R>();

        // Compute the layout for `union { F, R }`.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
        let layout = layout_header;
        let (layout, offset_t) = extend(layout, layout_t);
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_t,
            offset_s,
            offset_f,
            offset_r,
        }
    }
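
    // Note on `extend`: it is assumed to mirror `Layout::extend`, padding the current layout to
    // the next field's alignment and returning that field's byte offset, e.g.:
    //
    //     let (layout, offset_t) = extend(layout_header, layout_t);
    //     // offset_t == size_of::<Header>() rounded up to align_of::<T>()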

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
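                // A compare-exchange that writes the old value back is still a read-modify-write
                // operation, so with `AcqRel` it publishes our prior writes to the thread that
                // runs the task; a plain load would not provide that guarantee.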
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };
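
                // In the not-running case, the reference count is also bumped; that extra
                // reference is handed to the `Task` passed to the schedule function below.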

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                libc::abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Task {
                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);
        let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            libc::abort();
        }

        RawWaker::new(ptr, raw_waker_vtable)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// join handle has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then we need to decide how to destroy the task.
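        // (`REFERENCE` is the smallest reference-count increment and, per the bit layout in
        // `state.rs`, all flag bits sit below it, so `new & !(REFERENCE - 1) == 0` means the
        // count has reached zero.)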
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }

    /// Drops a task.
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated join handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_task(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }
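        // (When `S` is zero-sized, the call below reads no captured state from the allocation,
        // so it stays sound even if the task is deallocated while the schedule function runs.)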

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function and the tag will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();

            // Drop the tag.
            (raw.tag as *mut T).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
            ptr,
            &(*raw.header).vtable.raw_waker_vtable,
        )));
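        // The waker is wrapped in `ManuallyDrop` because it borrows the task's existing
        // reference rather than incrementing the reference count.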
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Notify the awaiter that the task has been closed.
                if state & AWAITER != 0 {
                    (*raw.header).notify(None);
                }

                // Drop the future.
                Self::drop_future(ptr);

                // Drop the task reference.
                Self::drop_task(ptr);
                return;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);
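        // The guard is defused here: its `Drop` runs only if the poll above panicked, in which
        // case `mem::forget` is never reached.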

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // A place where the output will be stored in case it needs to be dropped.
                let mut output = None;

                // The task is now completed.
                loop {
                    // If the handle is dropped, we'll need to close it and drop the output.
                    let new = if state & HANDLE == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the handle is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & HANDLE == 0 || state & CLOSED != 0 {
                                // Read the output.
                                output = Some(raw.output.read());
                            }

                            // Notify the awaiter that the task has been completed.
                            if state & AWAITER != 0 {
                                (*raw.header).notify(None);
                            }

                            // Drop the task reference.
                            Self::drop_task(ptr);
                            break;
                        }
                        Err(s) => state = s,
                    }
                }

                // Drop the output if it was taken out of the task.
                drop(output);
            }
            Poll::Pending => {
                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to drop its future.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // The thread that closed the task didn't drop the future because
                                // it was running so now it's our responsibility to do so.
                                Self::drop_future(ptr);

                                // Drop the task reference.
                                Self::drop_task(ptr);
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr);
                            } else {
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static;

        impl<F, R, S, T> Drop for Guard<F, R, S, T>
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static,
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // We still need to unschedule the task because it is possible it was
                            // woken up while running.
                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, R, S, T>::drop_future(ptr);

                            // Drop the task reference.
                            RawTask::<F, R, S, T>::drop_task(ptr);
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, R, S, T>::drop_future(ptr);

                                // Notify the awaiter that the task has been closed.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, R, S, T>::drop_task(ptr);
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}