blob: 82fcf7c88ea96a92cb7669d4921dd2783ec88c74 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::cell::UnsafeCell;
2use core::fmt;
3use core::sync::atomic::{AtomicUsize, Ordering};
4use core::task::Waker;
Stjepan Glavina1479e862019-08-12 20:18:51 +02005
6use crate::raw::TaskVTable;
7use crate::state::*;
Stjepan Glavina42e3a692020-09-17 14:58:40 +02008use crate::utils::abort_on_panic;
Stjepan Glavina1479e862019-08-12 20:18:51 +02009
10/// The header of a task.
11///
12/// This header is stored right at the beginning of every heap-allocated task.
13pub(crate) struct Header {
14 /// Current state of the task.
15 ///
16 /// Contains flags representing the current state and the reference count.
17 pub(crate) state: AtomicUsize,
18
19 /// The task that is blocked on the `JoinHandle`.
20 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020021 /// This waker needs to be woken up once the task completes or is closed.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060022 pub(crate) awaiter: UnsafeCell<Option<Waker>>,
Stjepan Glavina1479e862019-08-12 20:18:51 +020023
24 /// The virtual table.
25 ///
26 /// In addition to the actual waker virtual table, it also contains pointers to several other
27 /// methods necessary for bookkeeping the heap-allocated task.
28 pub(crate) vtable: &'static TaskVTable,
29}
30
31impl Header {
32 /// Cancels the task.
33 ///
Stjepan Glavinafad623a2020-04-14 14:33:37 +020034 /// This method will mark the task as closed, but it won't reschedule the task or drop its
35 /// future.
Stjepan Glavina1479e862019-08-12 20:18:51 +020036 pub(crate) fn cancel(&self) {
37 let mut state = self.state.load(Ordering::Acquire);
38
39 loop {
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +020040 // If the task has been completed or closed, it can't be canceled.
Stjepan Glavina1479e862019-08-12 20:18:51 +020041 if state & (COMPLETED | CLOSED) != 0 {
42 break;
43 }
44
45 // Mark the task as closed.
46 match self.state.compare_exchange_weak(
47 state,
48 state | CLOSED,
49 Ordering::AcqRel,
50 Ordering::Acquire,
51 ) {
Stjepan Glavinafad623a2020-04-14 14:33:37 +020052 Ok(_) => break,
Stjepan Glavina1479e862019-08-12 20:18:51 +020053 Err(s) => state = s,
54 }
55 }
56 }
57
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020058 /// Notifies the awaiter blocked on this task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020059 ///
Stjepan Glavina921e8a02020-01-06 14:31:28 -060060 /// If the awaiter is the same as the current waker, it will not be notified.
Stjepan Glavina1479e862019-08-12 20:18:51 +020061 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060062 pub(crate) fn notify(&self, current: Option<&Waker>) {
63 // Mark the awaiter as being notified.
64 let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
Stjepan Glavina1479e862019-08-12 20:18:51 +020065
Stjepan Glavina921e8a02020-01-06 14:31:28 -060066 // If the awaiter was not being notified nor registered...
67 if state & (NOTIFYING | REGISTERING) == 0 {
68 // Take the waker out.
69 let waker = unsafe { (*self.awaiter.get()).take() };
70
71 // Mark the state as not being notified anymore nor containing an awaiter.
72 self.state
73 .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
74
75 if let Some(w) = waker {
Stjepan Glavina1479e862019-08-12 20:18:51 +020076 // We need a safeguard against panics because waking can panic.
Stjepan Glavina921e8a02020-01-06 14:31:28 -060077 abort_on_panic(|| match current {
78 None => w.wake(),
79 Some(c) if !w.will_wake(c) => w.wake(),
80 Some(_) => {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020081 });
82 }
83 }
84 }
85
Stjepan Glavina921e8a02020-01-06 14:31:28 -060086 /// Registers a new awaiter blocked on this task.
87 ///
88 /// This method is called when `JoinHandle` is polled and the task has not completed.
Stjepan Glavina1479e862019-08-12 20:18:51 +020089 #[inline]
Stjepan Glavina921e8a02020-01-06 14:31:28 -060090 pub(crate) fn register(&self, waker: &Waker) {
91 // Load the state and synchronize with it.
92 let mut state = self.state.fetch_or(0, Ordering::Acquire);
Stjepan Glavina1479e862019-08-12 20:18:51 +020093
Stjepan Glavina1479e862019-08-12 20:18:51 +020094 loop {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060095 // There can't be two concurrent registrations because `JoinHandle` can only be polled
96 // by a unique pinned reference.
97 debug_assert!(state & REGISTERING == 0);
Stjepan Glavina1479e862019-08-12 20:18:51 +020098
Stjepan Glavinaf4ae6f52020-01-06 21:49:10 +010099 // If we're in the notifying state at this moment, just wake and return without
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600100 // registering.
101 if state & NOTIFYING != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200102 abort_on_panic(|| waker.wake_by_ref());
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600103 return;
Stjepan Glavina1479e862019-08-12 20:18:51 +0200104 }
105
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600106 // Mark the state to let other threads know we're registering a new awaiter.
107 match self.state.compare_exchange_weak(
108 state,
109 state | REGISTERING,
110 Ordering::AcqRel,
111 Ordering::Acquire,
112 ) {
113 Ok(_) => {
114 state |= REGISTERING;
115 break;
116 }
117 Err(s) => state = s,
118 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200119 }
120
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600121 // Put the waker into the awaiter field.
122 unsafe {
123 abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200124 }
125
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600126 // This variable will contain the newly registered waker if a notification comes in before
127 // we complete registration.
128 let mut waker = None;
129
130 loop {
131 // If there was a notification, take the waker out of the awaiter field.
132 if state & NOTIFYING != 0 {
133 if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200134 abort_on_panic(|| waker = Some(w));
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600135 }
136 }
137
138 // The new state is not being notified nor registered, but there might or might not be
139 // an awaiter depending on whether there was a concurrent notification.
140 let new = if waker.is_none() {
141 (state & !NOTIFYING & !REGISTERING) | AWAITER
142 } else {
143 state & !NOTIFYING & !REGISTERING & !AWAITER
144 };
145
146 match self
147 .state
148 .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
149 {
150 Ok(_) => break,
151 Err(s) => state = s,
152 }
153 }
154
155 // If there was a notification during registration, wake the awaiter now.
156 if let Some(w) = waker {
157 abort_on_panic(|| w.wake());
158 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200159 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200160}
161
162impl fmt::Debug for Header {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 let state = self.state.load(Ordering::SeqCst);
165
166 f.debug_struct("Header")
167 .field("scheduled", &(state & SCHEDULED != 0))
168 .field("running", &(state & RUNNING != 0))
169 .field("completed", &(state & COMPLETED != 0))
170 .field("closed", &(state & CLOSED != 0))
171 .field("awaiter", &(state & AWAITER != 0))
172 .field("handle", &(state & HANDLE != 0))
173 .field("ref_count", &(state / REFERENCE))
174 .finish()
175 }
176}