use alloc::alloc::Layout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::{abort, abort_on_panic, extend};
use crate::Task;

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task.
    pub(crate) drop_task: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()),

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
}
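
// `Header` stores a `&'static TaskVTable` whose function pointers are instantiated
// from `RawTask::<F, R, S, T>` at allocation time, so the rest of the crate can
// operate on a task through a type-erased `*const ()` pointer. A minimal sketch of
// dispatching through the vtable (hypothetical caller, not code from this file):
//
//     let header = ptr as *const Header;
//     unsafe { ((*header).vtable.run)(ptr) } // polls the concrete future behind `ptr`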

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the tag is stored.
    pub(crate) offset_t: usize,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
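
// The task is one allocation laid out as follows (see `task_layout` below):
//
//     +--------+---+---+----------------+
//     | Header | T | S | union { F, R } |
//     +--------+---+---+----------------+
//     0        ^   ^   ^
//              |   |   offset_f == offset_r
//              |   offset_s
//              offset_t
//
// The future `F` and the output `R` share one slot because they are never alive at
// the same time: when the future completes, it is dropped and the output overwrites it.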

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, R, S, T> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The tag inside the task.
    pub(crate) tag: *mut T,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}

impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}

impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
    fn clone(&self) -> Self {
        Self {
            header: self.header,
            schedule: self.schedule,
            tag: self.tag,
            future: self.future,
            output: self.output,
        }
    }
}
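
// `Copy` and `Clone` are implemented by hand because `#[derive(Clone, Copy)]` would
// add `F: Copy`, `R: Copy`, `S: Copy`, and `T: Copy` bounds, while `RawTask` only
// holds raw pointers, which are `Copy` regardless of what they point to.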

impl<F, R, S, T> RawTask<F, R, S, T>
where
    F: Future<Output = R> + 'static,
    S: Fn(Task<T>) + Send + Sync + 'static,
{
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );
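
    // `RawWakerVTable::new` takes its function pointers in the fixed order
    // `(clone, wake, wake_by_ref, drop)`, so the four entries above must stay in
    // exactly this order.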

    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
    pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(raw_task.as_ptr());

            // Write the header as the first field of the task.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_task: Self::drop_task,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                },
            });

            // Write the tag as the second field of the task.
            (raw.tag as *mut T).write(tag);

            // Write the schedule function as the third field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the fourth field of the task.
            raw.future.write(future);

            raw_task
        }
    }
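
    // A sketch of how a spawn function might drive `allocate` (hypothetical caller,
    // not code from this file; the handle's field names are assumptions):
    //
    //     let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
    //     let task = Task { raw_task, _marker: PhantomData };
    //     let handle = JoinHandle { raw_task, _marker: PhantomData };
    //
    // The initial state `SCHEDULED | HANDLE | REFERENCE` accounts for exactly these
    // two owners: one `Task` reference and one `JoinHandle`.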

    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                tag: p.add(task_layout.offset_t) as *mut T,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut R,
            }
        }
    }

    /// Returns the memory layout for a task.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
        let layout_header = Layout::new::<Header>();
        let layout_t = Layout::new::<T>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<R>();

        // Compute the layout for `union { F, R }`.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
        let layout = layout_header;
        let (layout, offset_t) = extend(layout, layout_t);
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_t,
            offset_s,
            offset_f,
            offset_r,
        }
    }
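
    // A worked example of the offset arithmetic above, assuming hypothetical sizes on
    // a 64-bit target: `Header` is 24 bytes (align 8), `T = u8` (1 byte, align 1),
    // `S` is a 16-byte closure (align 8), and `union { F, R }` has size 32 (align 8):
    //
    //     offset_t = 24            // directly after the header
    //     offset_s = 32            // 25 rounded up to `S`'s alignment of 8
    //     offset_f = offset_r = 48 // 32 + 16, already 8-aligned
    //     layout.size() = 80       // 48 + 32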

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Task {
                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }
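
    // The overflow check mirrors the policy `Arc` uses for its reference count: the
    // counter occupies the bits at and above `REFERENCE`, and once the state word
    // exceeds `isize::MAX` further increments risk wrapping into the flag bits, so
    // the process aborts while there is still plenty of headroom.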

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// join handle has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then we need to decide how to destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }
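
    // Bit arithmetic in the check above: the flag bits sit below `REFERENCE`, so
    // `new & !(REFERENCE - 1)` masks them off and leaves just the reference counter.
    // For example, with a hypothetical `REFERENCE = 1 << 6`:
    //
    //     let new = HANDLE | COMPLETED;          // flags only
    //     assert_eq!(new & !(REFERENCE - 1), 0); // no references remain
    //     assert_ne!(new & HANDLE, 0);           // but the handle is still alive
    //
    // so the task is kept around until the `JoinHandle` is dropped as well.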

    /// Drops a task.
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated join handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_task(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function and the tag will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200438 #[inline]
439 unsafe fn destroy(ptr: *const ()) {
440 let raw = Self::from_ptr(ptr);
441 let task_layout = Self::task_layout();
442
443 // We need a safeguard against panics because destructors can panic.
444 abort_on_panic(|| {
445 // Drop the schedule function.
446 (raw.schedule as *mut S).drop_in_place();
447
448 // Drop the tag.
449 (raw.tag as *mut T).drop_in_place();
450 });
451
452 // Finally, deallocate the memory reserved by the task.
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600453 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200454 }
455
456 /// Runs a task.
457 ///
    /// If polling its future panics, the task will be closed and the panic will be propagated to
    /// the caller.
    unsafe fn run(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Notify the awaiter that the task has been closed.
                if state & AWAITER != 0 {
                    (*raw.header).notify(None);
                }

                // Drop the future.
                Self::drop_future(ptr);

                // Drop the task reference.
                Self::drop_task(ptr);
                return;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // A place where the output will be stored in case it needs to be dropped.
                let mut output = None;

                // The task is now completed.
                loop {
                    // If the handle is dropped, we'll need to close the task and drop the output.
                    let new = if state & HANDLE == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the handle is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & HANDLE == 0 || state & CLOSED != 0 {
                                // Read the output.
                                output = Some(raw.output.read());
                            }

                            // Notify the awaiter that the task has been completed.
                            if state & AWAITER != 0 {
                                (*raw.header).notify(None);
                            }

                            // Drop the task reference.
                            Self::drop_task(ptr);
                            break;
                        }
                        Err(s) => state = s,
                    }
                }

                // Drop the output if it was taken out of the task.
                drop(output);
            }
            Poll::Pending => {
                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule it in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to drop its future.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // The thread that closed the task didn't drop the future because
                                // it was running so now it's our responsibility to do so.
                                Self::drop_future(ptr);

                                // Drop the task reference.
                                Self::drop_task(ptr);
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr);
                            } else {
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static;

        impl<F, R, S, T> Drop for Guard<F, R, S, T>
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static,
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // We still need to unschedule the task because it is possible it was
                            // woken up while running.
                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, R, S, T>::drop_future(ptr);

                            // Drop the task reference.
                            RawTask::<F, R, S, T>::drop_task(ptr);
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, R, S, T>::drop_future(ptr);

                                // Notify the awaiter that the task has been closed.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, R, S, T>::drop_task(ptr);
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}