blob: 0ce51645dd203071fe589464e54ff87cd9a64484 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::alloc::Layout;
2use std::cell::Cell;
3use std::fmt;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::task::Waker;
6
7use crossbeam_utils::Backoff;
8
9use crate::raw::TaskVTable;
10use crate::state::*;
11use crate::utils::{abort_on_panic, extend};
12
13/// The header of a task.
14///
15/// This header is stored right at the beginning of every heap-allocated task.
16pub(crate) struct Header {
17 /// Current state of the task.
18 ///
19 /// Contains flags representing the current state and the reference count.
20 pub(crate) state: AtomicUsize,
21
22 /// The task that is blocked on the `JoinHandle`.
23 ///
24 /// This waker needs to be woken once the task completes or is closed.
25 pub(crate) awaiter: Cell<Option<Waker>>,
26
27 /// The virtual table.
28 ///
29 /// In addition to the actual waker virtual table, it also contains pointers to several other
30 /// methods necessary for bookkeeping the heap-allocated task.
31 pub(crate) vtable: &'static TaskVTable,
32}
33
34impl Header {
35 /// Cancels the task.
36 ///
37 /// This method will only mark the task as closed and will notify the awaiter, but it won't
38 /// reschedule the task if it's not completed.
39 pub(crate) fn cancel(&self) {
40 let mut state = self.state.load(Ordering::Acquire);
41
42 loop {
43 // If the task has been completed or closed, it can't be cancelled.
44 if state & (COMPLETED | CLOSED) != 0 {
45 break;
46 }
47
48 // Mark the task as closed.
49 match self.state.compare_exchange_weak(
50 state,
51 state | CLOSED,
52 Ordering::AcqRel,
53 Ordering::Acquire,
54 ) {
55 Ok(_) => {
56 // Notify the awaiter that the task has been closed.
57 if state & AWAITER != 0 {
58 self.notify();
59 }
60
61 break;
62 }
63 Err(s) => state = s,
64 }
65 }
66 }
67
68 /// Notifies the task blocked on the task.
69 ///
70 /// If there is a registered waker, it will be removed from the header and woken.
71 #[inline]
72 pub(crate) fn notify(&self) {
73 if let Some(waker) = self.swap_awaiter(None) {
74 // We need a safeguard against panics because waking can panic.
75 abort_on_panic(|| {
76 waker.wake();
77 });
78 }
79 }
80
81 /// Notifies the task blocked on the task unless its waker matches `current`.
82 ///
83 /// If there is a registered waker, it will be removed from the header.
84 #[inline]
85 pub(crate) fn notify_unless(&self, current: &Waker) {
86 if let Some(waker) = self.swap_awaiter(None) {
87 if !waker.will_wake(current) {
88 // We need a safeguard against panics because waking can panic.
89 abort_on_panic(|| {
90 waker.wake();
91 });
92 }
93 }
94 }
95
96 /// Swaps the awaiter and returns the previous value.
97 #[inline]
98 pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
99 let new_is_none = new.is_none();
100
101 // We're about to try acquiring the lock in a loop. If it's already being held by another
102 // thread, we'll have to spin for a while so it's best to employ a backoff strategy.
103 let backoff = Backoff::new();
104 loop {
105 // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
106 let state = if new_is_none {
107 self.state.fetch_or(LOCKED, Ordering::Acquire)
108 } else {
109 self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
110 };
111
112 // If the lock was acquired, break from the loop.
113 if state & LOCKED == 0 {
114 break;
115 }
116
117 // Snooze for a little while because the lock is held by another thread.
118 backoff.snooze();
119 }
120
121 // Replace the awaiter.
122 let old = self.awaiter.replace(new);
123
124 // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
125 if new_is_none {
126 self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
127 } else {
128 self.state.fetch_and(!LOCKED, Ordering::Release);
129 }
130
131 old
132 }
133
134 /// Returns the offset at which the tag of type `T` is stored.
135 #[inline]
136 pub(crate) fn offset_tag<T>() -> usize {
137 let layout_header = Layout::new::<Header>();
138 let layout_t = Layout::new::<T>();
139 let (_, offset_t) = extend(layout_header, layout_t);
140 offset_t
141 }
142}
143
144impl fmt::Debug for Header {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 let state = self.state.load(Ordering::SeqCst);
147
148 f.debug_struct("Header")
149 .field("scheduled", &(state & SCHEDULED != 0))
150 .field("running", &(state & RUNNING != 0))
151 .field("completed", &(state & COMPLETED != 0))
152 .field("closed", &(state & CLOSED != 0))
153 .field("awaiter", &(state & AWAITER != 0))
154 .field("handle", &(state & HANDLE != 0))
155 .field("ref_count", &(state / REFERENCE))
156 .finish()
157 }
158}