blob: c250c02ff29cbc0557a8f464443b31c9707039c5 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::alloc::{self, Layout};
2use std::cell::Cell;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::mem::{self, ManuallyDrop};
6use std::pin::Pin;
7use std::ptr::NonNull;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
10
11use crate::header::Header;
12use crate::state::*;
13use crate::utils::{abort_on_panic, extend};
14use crate::Task;
15
/// The vtable for a task.
///
/// All function pointers take the type-erased raw task pointer (`*const ()`);
/// see `RawTask::from_ptr` for how typed field pointers are recovered from it.
pub(crate) struct TaskVTable {
    /// The raw waker vtable.
    pub(crate) raw_waker_vtable: RawWakerVTable,

    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task (decrements the reference count, destroying on zero).
    pub(crate) drop_task: unsafe fn(ptr: *const ()),

    /// Destroys the task (drops the schedule function and tag, then deallocates).
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()),
}
39
/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
///
/// Note: `offset_f` and `offset_r` are equal in practice — the future and the
/// output occupy the same `union { F, R }` slot (see `task_layout()`).
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the tag is stored.
    pub(crate) offset_t: usize,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
63
/// Raw pointers to the fields inside a task.
///
/// All pointers point into a single heap allocation laid out according to
/// `TaskLayout`; they are derived from the base pointer in `RawTask::from_ptr`.
pub(crate) struct RawTask<F, R, S, T> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The tag inside the task.
    pub(crate) tag: *mut T,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}
81
82impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}
83
84impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
85 fn clone(&self) -> Self {
86 Self {
87 header: self.header,
88 schedule: self.schedule,
89 tag: self.tag,
90 future: self.future,
91 output: self.output,
92 }
93 }
94}
95
impl<F, R, S, T> RawTask<F, R, S, T>
where
    F: Future<Output = R> + 'static,
    S: Fn(Task<T>) + Send + Sync + 'static,
{
    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
    pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) {
                None => std::process::abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(raw_task.as_ptr());

            // Write the header as the first field of the task.
            // The initial state encodes the two assumed owners: SCHEDULED | REFERENCE
            // for the `Task` reference and HANDLE for the `JoinHandle`.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                awaiter: Cell::new(None),
                vtable: &TaskVTable {
                    raw_waker_vtable: RawWakerVTable::new(
                        Self::clone_waker,
                        Self::wake,
                        Self::wake_by_ref,
                        Self::drop_waker,
                    ),
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_task: Self::drop_task,
                    destroy: Self::destroy,
                    run: Self::run,
                },
            });

            // Write the tag as the second field of the task.
            (raw.tag as *mut T).write(tag);

            // Write the schedule function as the third field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the fourth field of the task.
            raw.future.write(future);

            raw_task
        }
    }

    /// Creates a `RawTask` from a raw task pointer.
    ///
    /// The field pointers are computed by adding the `TaskLayout` offsets to the
    /// base pointer; the header sits at offset 0.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                tag: p.add(task_layout.offset_t) as *mut T,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut R,
            }
        }
    }

    /// Returns the memory layout for a task.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
        let layout_header = Layout::new::<Header>();
        let layout_t = Layout::new::<T>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<R>();

        // Compute the layout for `union { F, R }`. The size/align of a union is
        // the max over its variants, which is what these two lines compute, so
        // `from_size_align_unchecked` cannot produce an invalid layout here.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
        let layout = layout_header;
        let (layout, offset_t) = extend(layout, layout_t);
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        // The future and the output share the union slot.
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_t,
            offset_s,
            offset_f,
            offset_r,
        }
    }

    /// Wakes a waker.
    ///
    /// This consumes one waker reference (by value semantics of `RawWaker::wake`).
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task. The waker reference we hold is handed over to
                            // the scheduled `Task`, so it is not dropped here.
                            Self::schedule(ptr);
                        } else {
                            // The running thread will reschedule on our behalf when it observes
                            // SCHEDULED; just drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    ///
    /// Unlike `wake`, this must not consume the waker's reference, so when it
    /// hands a new `Task` reference to the schedule function it bumps the
    /// reference count itself (`+ REFERENCE` below).
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away; in that case we also
                // create a new reference (for the `Task` passed to the schedule function).
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                std::process::abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Task {
                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);
        let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            std::process::abort();
        }

        RawWaker::new(ptr, raw_waker_vtable)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// join handle has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then we need to decide how to destroy the task.
        // (`new & !(REFERENCE - 1)` masks off the flag bits below REFERENCE, leaving only the
        // counter bits — zero means no references remain.)
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }

    /// Drops a task.
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated join handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_task(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        // (`_waker` lives until the end of this function, so the extra reference is held across
        // the `(*raw.schedule)(task)` call below.)
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up task's resources and deallocates it.
    ///
    /// The schedule function and the tag will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();

            // Drop the tag.
            (raw.tag as *mut T).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        // `ManuallyDrop` prevents the waker's destructor from running, since this borrow of the
        // task does not own a reference.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
            ptr,
            &(*raw.header).vtable.raw_waker_vtable,
        )));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Notify the awaiter that the task has been closed.
                if state & AWAITER != 0 {
                    (*raw.header).notify();
                }

                // Drop the future.
                Self::drop_future(ptr);

                // Drop the task reference.
                Self::drop_task(ptr);
                return;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics. `mem::forget` disarms the guard on the non-panicking path.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output (they share the same union slot).
                Self::drop_future(ptr);
                raw.output.write(out);

                // A place where the output will be stored in case it needs to be dropped.
                let mut output = None;

                // The task is now completed.
                loop {
                    // If the handle is dropped, we'll need to close it and drop the output.
                    let new = if state & HANDLE == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the handle is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & HANDLE == 0 || state & CLOSED != 0 {
                                // Read the output (dropped after the loop, outside the CAS).
                                output = Some(raw.output.read());
                            }

                            // Notify the awaiter that the task has been completed.
                            if state & AWAITER != 0 {
                                (*raw.header).notify();
                            }

                            // Drop the task reference.
                            Self::drop_task(ptr);
                            break;
                        }
                        Err(s) => state = s,
                    }
                }

                // Drop the output if it was taken out of the task.
                drop(output);
            }
            Poll::Pending => {
                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to drop its future.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // The thread that closed the task didn't drop the future because
                                // it was running so now it's our responsibility to do so.
                                Self::drop_future(ptr);

                                // Drop the task reference.
                                Self::drop_task(ptr);
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr);
                            } else {
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static;

        impl<F, R, S, T> Drop for Guard<F, R, S, T>
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static,
        {
            fn drop(&mut self) {
                let raw = self.0;
                // The header sits at offset 0, so its address doubles as the raw task pointer.
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // We still need to unschedule the task because it is possible it was
                            // woken up while running.
                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, R, S, T>::drop_future(ptr);

                            // Drop the task reference.
                            RawTask::<F, R, S, T>::drop_task(ptr);
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, R, S, T>::drop_future(ptr);

                                // Notify the awaiter that the task has been closed.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify();
                                }

                                // Drop the task reference.
                                RawTask::<F, R, S, T>::drop_task(ptr);
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}