use alloc::alloc::Layout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::{abort, abort_on_panic, extend};
use crate::Task;

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task.
    pub(crate) drop_task: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
}
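
// A minimal sketch of type-erased dispatch through this vtable, assuming `ptr`
// was returned by `RawTask::allocate` (illustration only, not part of the
// crate's public API):
//
//     let header = ptr as *const Header;
//     unsafe { ((*header).vtable.run)(ptr) };
//
// `RawTask::allocate` fills the function pointers with the monomorphized
// methods below, so a caller holding only a `*const ()` never needs to know
// the concrete `F`, `R`, `S`, or `T`.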

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the tag is stored.
    pub(crate) offset_t: usize,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
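
// In memory, a task allocated with this layout looks as follows (alignment
// padding omitted; a sketch, not a guarantee about exact offsets):
//
//     +--------+-----+----------+----------------+
//     | Header | tag | schedule | union { F, R } |
//     +--------+-----+----------+----------------+
//     0        off_t  off_s      off_f == off_r
//
// The future `F` and its output `R` can share storage because the output is
// only written after the future has been dropped.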

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, R, S, T> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The tag inside the task.
    pub(crate) tag: *mut T,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}

impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}

impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<F, R, S, T> RawTask<F, R, S, T>
where
    F: Future<Output = R> + 'static,
    S: Fn(Task<T>) + Send + Sync + 'static,
{
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
    pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(raw_task.as_ptr());

            // Write the header as the first field of the task.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_task: Self::drop_task,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                },
            });

            // Write the tag as the second field of the task.
            (raw.tag as *mut T).write(tag);

            // Write the schedule function as the third field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the fourth field of the task.
            raw.future.write(future);

            raw_task
        }
    }
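
    // A sketch of the freshly written state word, assuming the flag bits from
    // `crate::state` occupy the low bits and `REFERENCE` is the unit of the
    // reference count stored above them:
    //
    //     SCHEDULED | HANDLE | REFERENCE
    //     = queued for its first run + a `JoinHandle` exists + a count of 1
    //
    // The single counted reference is the `Task` that will be handed to the
    // schedule function; the `JoinHandle` is tracked by the HANDLE flag rather
    // than by the count.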

    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                tag: p.add(task_layout.offset_t) as *mut T,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut R,
            }
        }
    }

    /// Returns the memory layout for a task.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
        let layout_header = Layout::new::<Header>();
        let layout_t = Layout::new::<T>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<R>();

        // Compute the layout for `union { F, R }`.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
        let layout = layout_header;
        let (layout, offset_t) = extend(layout, layout_t);
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_t,
            offset_s,
            offset_f,
            offset_r,
        }
    }
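
    // Each `extend` step appends a field to the layout, inserting padding so
    // the new field is aligned, and returns the field's offset from the start
    // of the task. A sketch of the equivalent computation using the standard
    // library (assuming `utils::extend` differs only in how it reports
    // errors):
    //
    //     let (layout, offset_t) = layout_header.extend(layout_t).unwrap();
    //     let (layout, offset_s) = layout.extend(layout_s).unwrap();
    //     let (layout, offset_union) = layout.extend(layout_union).unwrap();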

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
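
    // The transitions above, in sketch form (W is the waker reference consumed
    // by this by-value wake):
    //
    //     COMPLETED or CLOSED -> drop W; there is nothing to wake
    //     SCHEDULED           -> drop W; the CAS only publishes memory
    //     idle                -> set SCHEDULED and reuse W's reference as the
    //                            `Task` passed to the schedule function
    //     RUNNING             -> set SCHEDULED and drop W; the runner will
    //                            reschedule when it sees the flag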

    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Task {
                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// join handle has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then we need to decide how to destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }
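
    // Example scenario for the branch above: a task is spawned, its
    // `JoinHandle` is dropped, and the last `Waker` goes away before the task
    // ever runs. The reference count is now zero while the task is neither
    // COMPLETED nor CLOSED, so the state is reset to
    // SCHEDULED | CLOSED | REFERENCE and the task is queued once more. `run`
    // will then see CLOSED, drop the future, and let `drop_task` destroy the
    // allocation.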

    /// Drops a task.
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated join handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_task(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function and the tag will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();

            // Drop the tag.
            (raw.tag as *mut T).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Notify the awaiter that the task has been closed.
                if state & AWAITER != 0 {
                    (*raw.header).notify(None);
                }

                // Drop the future.
                Self::drop_future(ptr);

                // Drop the task reference.
                Self::drop_task(ptr);
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // A place where the output will be stored in case it needs to be dropped.
                let mut output = None;

                // The task is now completed.
                loop {
                    // If the handle is dropped, we'll need to close it and drop the output.
                    let new = if state & HANDLE == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the handle is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & HANDLE == 0 || state & CLOSED != 0 {
                                // Read the output.
                                output = Some(raw.output.read());
                            }

                            // Notify the awaiter that the task has been completed.
                            if state & AWAITER != 0 {
                                (*raw.header).notify(None);
                            }

                            // Drop the task reference.
                            Self::drop_task(ptr);
                            break;
                        }
                        Err(s) => state = s,
                    }
                }

                // Drop the output if it was taken out of the task.
                drop(output);
            }
            Poll::Pending => {
                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule it in case
                    // it was woken up, and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to drop its future.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // The thread that closed the task didn't drop the future because
                                // it was running so now it's our responsibility to do so.
                                Self::drop_future(ptr);

                                // Drop the task reference.
                                Self::drop_task(ptr);
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr);
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static;

        impl<F, R, S, T> Drop for Guard<F, R, S, T>
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static,
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // We still need to unschedule the task because it is possible it was
                            // woken up while running.
                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, R, S, T>::drop_future(ptr);

                            // Drop the task reference.
                            RawTask::<F, R, S, T>::drop_task(ptr);
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, R, S, T>::drop_future(ptr);

                                // Notify the awaiter that the task has been closed.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, R, S, T>::drop_task(ptr);
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}
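
// A minimal end-to-end sketch of how an executor built on these primitives
// might drive a task (hypothetical code, not part of this crate; `future` and
// `schedule` stand in for the executor's own values):
//
//     let ptr = RawTask::<F, R, S, ()>::allocate(future, schedule, ());
//     // Elsewhere, the crate wraps `ptr` in a `Task<()>` and a `JoinHandle`;
//     // whenever the `Task` is passed to `schedule` and later dequeued, the
//     // executor polls it through the vtable:
//     let header = ptr.as_ptr() as *const Header;
//     unsafe {
//         // `run` returns true if the task rescheduled itself while running.
//         ((*header).vtable.run)(ptr.as_ptr());
//     }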