blob: e676befb34593322f622665a7b81a326c7ff1d4a [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::alloc::Layout;
2use core::cell::UnsafeCell;
3use core::fmt;
4use core::sync::atomic::{AtomicUsize, Ordering};
5use core::task::Waker;
Stjepan Glavina1479e862019-08-12 20:18:51 +02006
7use crate::raw::TaskVTable;
8use crate::state::*;
9use crate::utils::{abort_on_panic, extend};
10
11/// The header of a task.
12///
13/// This header is stored right at the beginning of every heap-allocated task.
14pub(crate) struct Header {
15 /// Current state of the task.
16 ///
17 /// Contains flags representing the current state and the reference count.
18 pub(crate) state: AtomicUsize,
19
20 /// The task that is blocked on the `JoinHandle`.
21 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020022 /// This waker needs to be woken up once the task completes or is closed.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060023 pub(crate) awaiter: UnsafeCell<Option<Waker>>,
Stjepan Glavina1479e862019-08-12 20:18:51 +020024
25 /// The virtual table.
26 ///
27 /// In addition to the actual waker virtual table, it also contains pointers to several other
28 /// methods necessary for bookkeeping the heap-allocated task.
29 pub(crate) vtable: &'static TaskVTable,
30}
31
32impl Header {
33 /// Cancels the task.
34 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020035 /// This method will mark the task as closed and notify the awaiter, but it won't reschedule
36 /// the task if it's not completed.
Stjepan Glavina1479e862019-08-12 20:18:51 +020037 pub(crate) fn cancel(&self) {
38 let mut state = self.state.load(Ordering::Acquire);
39
40 loop {
41 // If the task has been completed or closed, it can't be cancelled.
42 if state & (COMPLETED | CLOSED) != 0 {
43 break;
44 }
45
46 // Mark the task as closed.
47 match self.state.compare_exchange_weak(
48 state,
49 state | CLOSED,
50 Ordering::AcqRel,
51 Ordering::Acquire,
52 ) {
53 Ok(_) => {
54 // Notify the awaiter that the task has been closed.
55 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060056 self.notify(None);
Stjepan Glavina1479e862019-08-12 20:18:51 +020057 }
58
59 break;
60 }
61 Err(s) => state = s,
62 }
63 }
64 }
65
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020066 /// Notifies the awaiter blocked on this task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020067 ///
Stjepan Glavina921e8a02020-01-06 14:31:28 -060068 /// If the awaiter is the same as the current waker, it will not be notified.
Stjepan Glavina1479e862019-08-12 20:18:51 +020069 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060070 pub(crate) fn notify(&self, current: Option<&Waker>) {
71 // Mark the awaiter as being notified.
72 let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
Stjepan Glavina1479e862019-08-12 20:18:51 +020073
Stjepan Glavina921e8a02020-01-06 14:31:28 -060074 // If the awaiter was not being notified nor registered...
75 if state & (NOTIFYING | REGISTERING) == 0 {
76 // Take the waker out.
77 let waker = unsafe { (*self.awaiter.get()).take() };
78
79 // Mark the state as not being notified anymore nor containing an awaiter.
80 self.state
81 .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
82
83 if let Some(w) = waker {
Stjepan Glavina1479e862019-08-12 20:18:51 +020084 // We need a safeguard against panics because waking can panic.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060085 abort_on_panic(|| match current {
86 None => w.wake(),
87 Some(c) if !w.will_wake(c) => w.wake(),
88 Some(_) => {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020089 });
90 }
91 }
92 }
93
Stjepan Glavina921e8a02020-01-06 14:31:28 -060094 /// Registers a new awaiter blocked on this task.
95 ///
96 /// This method is called when `JoinHandle` is polled and the task has not completed.
Stjepan Glavina1479e862019-08-12 20:18:51 +020097 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060098 pub(crate) fn register(&self, waker: &Waker) {
99 // Load the state and synchronize with it.
100 let mut state = self.state.fetch_or(0, Ordering::Acquire);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200101
Stjepan Glavina1479e862019-08-12 20:18:51 +0200102 loop {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600103 // There can't be two concurrent registrations because `JoinHandle` can only be polled
104 // by a unique pinned reference.
105 debug_assert!(state & REGISTERING == 0);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200106
Stjepan Glavinaf4ae6f52020-01-06 21:49:10 +0100107 // If we're in the notifying state at this moment, just wake and return without
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600108 // registering.
109 if state & NOTIFYING != 0 {
110 waker.wake_by_ref();
111 return;
Stjepan Glavina1479e862019-08-12 20:18:51 +0200112 }
113
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600114 // Mark the state to let other threads know we're registering a new awaiter.
115 match self.state.compare_exchange_weak(
116 state,
117 state | REGISTERING,
118 Ordering::AcqRel,
119 Ordering::Acquire,
120 ) {
121 Ok(_) => {
122 state |= REGISTERING;
123 break;
124 }
125 Err(s) => state = s,
126 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200127 }
128
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600129 // Put the waker into the awaiter field.
130 unsafe {
131 abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200132 }
133
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600134 // This variable will contain the newly registered waker if a notification comes in before
135 // we complete registration.
136 let mut waker = None;
137
138 loop {
139 // If there was a notification, take the waker out of the awaiter field.
140 if state & NOTIFYING != 0 {
141 if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
142 waker = Some(w);
143 }
144 }
145
146 // The new state is not being notified nor registered, but there might or might not be
147 // an awaiter depending on whether there was a concurrent notification.
148 let new = if waker.is_none() {
149 (state & !NOTIFYING & !REGISTERING) | AWAITER
150 } else {
151 state & !NOTIFYING & !REGISTERING & !AWAITER
152 };
153
154 match self
155 .state
156 .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
157 {
158 Ok(_) => break,
159 Err(s) => state = s,
160 }
161 }
162
163 // If there was a notification during registration, wake the awaiter now.
164 if let Some(w) = waker {
165 abort_on_panic(|| w.wake());
166 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200167 }
168
169 /// Returns the offset at which the tag of type `T` is stored.
170 #[inline]
171 pub(crate) fn offset_tag<T>() -> usize {
172 let layout_header = Layout::new::<Header>();
173 let layout_t = Layout::new::<T>();
174 let (_, offset_t) = extend(layout_header, layout_t);
175 offset_t
176 }
177}
178
179impl fmt::Debug for Header {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 let state = self.state.load(Ordering::SeqCst);
182
183 f.debug_struct("Header")
184 .field("scheduled", &(state & SCHEDULED != 0))
185 .field("running", &(state & RUNNING != 0))
186 .field("completed", &(state & COMPLETED != 0))
187 .field("closed", &(state & CLOSED != 0))
188 .field("awaiter", &(state & AWAITER != 0))
189 .field("handle", &(state & HANDLE != 0))
190 .field("ref_count", &(state / REFERENCE))
191 .finish()
192 }
193}