blob: a559e48c0bb26256ac127ca0e824ad63b22f5813 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::alloc::Layout;
2use core::cell::UnsafeCell;
3use core::fmt;
4use core::sync::atomic::{AtomicUsize, Ordering};
5use core::task::Waker;
Stjepan Glavina1479e862019-08-12 20:18:51 +02006
7use crate::raw::TaskVTable;
8use crate::state::*;
9use crate::utils::{abort_on_panic, extend};
10
11/// The header of a task.
12///
13/// This header is stored right at the beginning of every heap-allocated task.
14pub(crate) struct Header {
15 /// Current state of the task.
16 ///
17 /// Contains flags representing the current state and the reference count.
18 pub(crate) state: AtomicUsize,
19
20 /// The task that is blocked on the `JoinHandle`.
21 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020022 /// This waker needs to be woken up once the task completes or is closed.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060023 pub(crate) awaiter: UnsafeCell<Option<Waker>>,
Stjepan Glavina1479e862019-08-12 20:18:51 +020024
25 /// The virtual table.
26 ///
27 /// In addition to the actual waker virtual table, it also contains pointers to several other
28 /// methods necessary for bookkeeping the heap-allocated task.
29 pub(crate) vtable: &'static TaskVTable,
30}
31
32impl Header {
33 /// Cancels the task.
34 ///
Stjepan Glavinafad623a2020-04-14 14:33:37 +020035 /// This method will mark the task as closed, but it won't reschedule the task or drop its
36 /// future.
Stjepan Glavina1479e862019-08-12 20:18:51 +020037 pub(crate) fn cancel(&self) {
38 let mut state = self.state.load(Ordering::Acquire);
39
40 loop {
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +020041 // If the task has been completed or closed, it can't be canceled.
Stjepan Glavina1479e862019-08-12 20:18:51 +020042 if state & (COMPLETED | CLOSED) != 0 {
43 break;
44 }
45
46 // Mark the task as closed.
47 match self.state.compare_exchange_weak(
48 state,
49 state | CLOSED,
50 Ordering::AcqRel,
51 Ordering::Acquire,
52 ) {
Stjepan Glavinafad623a2020-04-14 14:33:37 +020053 Ok(_) => break,
Stjepan Glavina1479e862019-08-12 20:18:51 +020054 Err(s) => state = s,
55 }
56 }
57 }
58
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020059 /// Notifies the awaiter blocked on this task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020060 ///
Stjepan Glavina921e8a02020-01-06 14:31:28 -060061 /// If the awaiter is the same as the current waker, it will not be notified.
Stjepan Glavina1479e862019-08-12 20:18:51 +020062 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060063 pub(crate) fn notify(&self, current: Option<&Waker>) {
64 // Mark the awaiter as being notified.
65 let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
Stjepan Glavina1479e862019-08-12 20:18:51 +020066
Stjepan Glavina921e8a02020-01-06 14:31:28 -060067 // If the awaiter was not being notified nor registered...
68 if state & (NOTIFYING | REGISTERING) == 0 {
69 // Take the waker out.
70 let waker = unsafe { (*self.awaiter.get()).take() };
71
72 // Mark the state as not being notified anymore nor containing an awaiter.
73 self.state
74 .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
75
76 if let Some(w) = waker {
Stjepan Glavina1479e862019-08-12 20:18:51 +020077 // We need a safeguard against panics because waking can panic.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060078 abort_on_panic(|| match current {
79 None => w.wake(),
80 Some(c) if !w.will_wake(c) => w.wake(),
81 Some(_) => {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020082 });
83 }
84 }
85 }
86
Stjepan Glavina921e8a02020-01-06 14:31:28 -060087 /// Registers a new awaiter blocked on this task.
88 ///
89 /// This method is called when `JoinHandle` is polled and the task has not completed.
Stjepan Glavina1479e862019-08-12 20:18:51 +020090 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060091 pub(crate) fn register(&self, waker: &Waker) {
92 // Load the state and synchronize with it.
93 let mut state = self.state.fetch_or(0, Ordering::Acquire);
Stjepan Glavina1479e862019-08-12 20:18:51 +020094
Stjepan Glavina1479e862019-08-12 20:18:51 +020095 loop {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060096 // There can't be two concurrent registrations because `JoinHandle` can only be polled
97 // by a unique pinned reference.
98 debug_assert!(state & REGISTERING == 0);
Stjepan Glavina1479e862019-08-12 20:18:51 +020099
Stjepan Glavinaf4ae6f52020-01-06 21:49:10 +0100100 // If we're in the notifying state at this moment, just wake and return without
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600101 // registering.
102 if state & NOTIFYING != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200103 abort_on_panic(|| waker.wake_by_ref());
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600104 return;
Stjepan Glavina1479e862019-08-12 20:18:51 +0200105 }
106
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600107 // Mark the state to let other threads know we're registering a new awaiter.
108 match self.state.compare_exchange_weak(
109 state,
110 state | REGISTERING,
111 Ordering::AcqRel,
112 Ordering::Acquire,
113 ) {
114 Ok(_) => {
115 state |= REGISTERING;
116 break;
117 }
118 Err(s) => state = s,
119 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200120 }
121
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600122 // Put the waker into the awaiter field.
123 unsafe {
124 abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200125 }
126
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600127 // This variable will contain the newly registered waker if a notification comes in before
128 // we complete registration.
129 let mut waker = None;
130
131 loop {
132 // If there was a notification, take the waker out of the awaiter field.
133 if state & NOTIFYING != 0 {
134 if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200135 abort_on_panic(|| waker = Some(w));
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600136 }
137 }
138
139 // The new state is not being notified nor registered, but there might or might not be
140 // an awaiter depending on whether there was a concurrent notification.
141 let new = if waker.is_none() {
142 (state & !NOTIFYING & !REGISTERING) | AWAITER
143 } else {
144 state & !NOTIFYING & !REGISTERING & !AWAITER
145 };
146
147 match self
148 .state
149 .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
150 {
151 Ok(_) => break,
152 Err(s) => state = s,
153 }
154 }
155
156 // If there was a notification during registration, wake the awaiter now.
157 if let Some(w) = waker {
158 abort_on_panic(|| w.wake());
159 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200160 }
161
162 /// Returns the offset at which the tag of type `T` is stored.
163 #[inline]
164 pub(crate) fn offset_tag<T>() -> usize {
165 let layout_header = Layout::new::<Header>();
166 let layout_t = Layout::new::<T>();
167 let (_, offset_t) = extend(layout_header, layout_t);
168 offset_t
169 }
170}
171
172impl fmt::Debug for Header {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 let state = self.state.load(Ordering::SeqCst);
175
176 f.debug_struct("Header")
177 .field("scheduled", &(state & SCHEDULED != 0))
178 .field("running", &(state & RUNNING != 0))
179 .field("completed", &(state & COMPLETED != 0))
180 .field("closed", &(state & CLOSED != 0))
181 .field("awaiter", &(state & AWAITER != 0))
182 .field("handle", &(state & HANDLE != 0))
183 .field("ref_count", &(state / REFERENCE))
184 .finish()
185 }
186}