blob: 10a189e96a2fb0de307bd428fec0f7b96166340f [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::pin::Pin;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
Stjepan Glavinaaf051a52020-01-06 15:25:52 -06007use core::task::{Context, Poll, Waker};
Stjepan Glavina1479e862019-08-12 20:18:51 +02008
9use crate::header::Header;
10use crate::state::*;
Stjepan Glavina1479e862019-08-12 20:18:51 +020011
12/// A handle that awaits the result of a task.
13///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020014/// This type is a future that resolves to an `Option<R>` where:
Stjepan Glavina1479e862019-08-12 20:18:51 +020015///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020016/// * `None` indicates the task has panicked or was cancelled.
17/// * `Some(result)` indicates the task has completed with `result` of type `R`.
Stjepan Glavina1479e862019-08-12 20:18:51 +020018pub struct JoinHandle<R, T> {
19 /// A raw task pointer.
20 pub(crate) raw_task: NonNull<()>,
21
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020022 /// A marker capturing generic types `R` and `T`.
Stjepan Glavina1479e862019-08-12 20:18:51 +020023 pub(crate) _marker: PhantomData<(R, T)>,
24}
25
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010026unsafe impl<R: Send, T> Send for JoinHandle<R, T> {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020027unsafe impl<R, T> Sync for JoinHandle<R, T> {}
28
29impl<R, T> Unpin for JoinHandle<R, T> {}
30
31impl<R, T> JoinHandle<R, T> {
32 /// Cancels the task.
33 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020034 /// If the task has already completed, calling this method will have no effect.
Stjepan Glavina1479e862019-08-12 20:18:51 +020035 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020036 /// When a task is cancelled, its future will not be polled again.
Stjepan Glavina1479e862019-08-12 20:18:51 +020037 pub fn cancel(&self) {
38 let ptr = self.raw_task.as_ptr();
39 let header = ptr as *const Header;
40
41 unsafe {
42 let mut state = (*header).state.load(Ordering::Acquire);
43
44 loop {
45 // If the task has been completed or closed, it can't be cancelled.
46 if state & (COMPLETED | CLOSED) != 0 {
47 break;
48 }
49
50 // If the task is not scheduled nor running, we'll need to schedule it.
51 let new = if state & (SCHEDULED | RUNNING) == 0 {
52 (state | SCHEDULED | CLOSED) + REFERENCE
53 } else {
54 state | CLOSED
55 };
56
57 // Mark the task as closed.
58 match (*header).state.compare_exchange_weak(
59 state,
60 new,
61 Ordering::AcqRel,
62 Ordering::Acquire,
63 ) {
64 Ok(_) => {
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020065 // If the task is not scheduled nor running, schedule it one more time so
66 // that its future gets dropped by the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +020067 if state & (SCHEDULED | RUNNING) == 0 {
68 ((*header).vtable.schedule)(ptr);
69 }
70
71 // Notify the awaiter that the task has been closed.
72 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060073 (*header).notify(None);
Stjepan Glavina1479e862019-08-12 20:18:51 +020074 }
75
76 break;
77 }
78 Err(s) => state = s,
79 }
80 }
81 }
82 }
83
84 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020085 pub fn tag(&self) -> &T {
86 let offset = Header::offset_tag::<T>();
87 let ptr = self.raw_task.as_ptr();
88
89 unsafe {
90 let raw = (ptr as *mut u8).add(offset) as *const T;
91 &*raw
92 }
93 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -060094
95 /// Returns a waker associated with the task.
96 pub fn waker(&self) -> Waker {
97 let ptr = self.raw_task.as_ptr();
98 let header = ptr as *const Header;
99
100 unsafe {
101 let raw_waker = ((*header).vtable.clone_waker)(ptr);
102 Waker::from_raw(raw_waker)
103 }
104 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200105}
106
107impl<R, T> Drop for JoinHandle<R, T> {
108 fn drop(&mut self) {
109 let ptr = self.raw_task.as_ptr();
110 let header = ptr as *const Header;
111
112 // A place where the output will be stored in case it needs to be dropped.
113 let mut output = None;
114
115 unsafe {
116 // Optimistically assume the `JoinHandle` is being dropped just after creating the
117 // task. This is a common case so if the handle is not used, the overhead of it is only
118 // one compare-exchange operation.
119 if let Err(mut state) = (*header).state.compare_exchange_weak(
120 SCHEDULED | HANDLE | REFERENCE,
121 SCHEDULED | REFERENCE,
122 Ordering::AcqRel,
123 Ordering::Acquire,
124 ) {
125 loop {
126 // If the task has been completed but not yet closed, that means its output
127 // must be dropped.
128 if state & COMPLETED != 0 && state & CLOSED == 0 {
129 // Mark the task as closed in order to grab its output.
130 match (*header).state.compare_exchange_weak(
131 state,
132 state | CLOSED,
133 Ordering::AcqRel,
134 Ordering::Acquire,
135 ) {
136 Ok(_) => {
137 // Read the output.
138 output =
139 Some((((*header).vtable.get_output)(ptr) as *mut R).read());
140
141 // Update the state variable because we're continuing the loop.
142 state |= CLOSED;
143 }
144 Err(s) => state = s,
145 }
146 } else {
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200147 // If this is the last reference to the task and it's not closed, then
148 // close it and schedule one more time so that its future gets dropped by
149 // the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200150 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
151 SCHEDULED | CLOSED | REFERENCE
152 } else {
153 state & !HANDLE
154 };
155
156 // Unset the handle flag.
157 match (*header).state.compare_exchange_weak(
158 state,
159 new,
160 Ordering::AcqRel,
161 Ordering::Acquire,
162 ) {
163 Ok(_) => {
164 // If this is the last reference to the task, we need to either
165 // schedule dropping its future or destroy it.
166 if state & !(REFERENCE - 1) == 0 {
167 if state & CLOSED == 0 {
168 ((*header).vtable.schedule)(ptr);
169 } else {
170 ((*header).vtable.destroy)(ptr);
171 }
172 }
173
174 break;
175 }
176 Err(s) => state = s,
177 }
178 }
179 }
180 }
181 }
182
183 // Drop the output if it was taken out of the task.
184 drop(output);
185 }
186}
187
188impl<R, T> Future for JoinHandle<R, T> {
189 type Output = Option<R>;
190
191 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
192 let ptr = self.raw_task.as_ptr();
193 let header = ptr as *const Header;
194
195 unsafe {
196 let mut state = (*header).state.load(Ordering::Acquire);
197
198 loop {
199 // If the task has been closed, notify the awaiter and return `None`.
200 if state & CLOSED != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200201 // If the task is scheduled or running, we need to wait until its future is
202 // dropped.
203 if state & (SCHEDULED | RUNNING) != 0 {
204 // Replace the waker with one associated with the current task.
205 (*header).register(cx.waker());
206
207 // Reload the state after registering. It is possible changes occurred just
208 // before registration so we need to check for that.
209 state = (*header).state.load(Ordering::Acquire);
210
211 // If the task is still scheduled or running, we need to wait because its
212 // future is not dropped yet.
213 if state & (SCHEDULED | RUNNING) != 0 {
214 return Poll::Pending;
215 }
216 }
217
Stjepan Glavina1479e862019-08-12 20:18:51 +0200218 // Even though the awaiter is most likely the current task, it could also be
219 // another task.
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600220 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200221 return Poll::Ready(None);
222 }
223
224 // If the task is not completed, register the current task.
225 if state & COMPLETED == 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200226 // Replace the waker with one associated with the current task.
227 (*header).register(cx.waker());
Stjepan Glavina1479e862019-08-12 20:18:51 +0200228
229 // Reload the state after registering. It is possible that the task became
230 // completed or closed just before registration so we need to check for that.
231 state = (*header).state.load(Ordering::Acquire);
232
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200233 // If the task has been closed, restart.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200234 if state & CLOSED != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200235 continue;
Stjepan Glavina1479e862019-08-12 20:18:51 +0200236 }
237
238 // If the task is still not completed, we're blocked on it.
239 if state & COMPLETED == 0 {
240 return Poll::Pending;
241 }
242 }
243
244 // Since the task is now completed, mark it as closed in order to grab its output.
245 match (*header).state.compare_exchange(
246 state,
247 state | CLOSED,
248 Ordering::AcqRel,
249 Ordering::Acquire,
250 ) {
251 Ok(_) => {
252 // Notify the awaiter. Even though the awaiter is most likely the current
253 // task, it could also be another task.
254 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600255 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200256 }
257
258 // Take the output from the task.
259 let output = ((*header).vtable.get_output)(ptr) as *mut R;
260 return Poll::Ready(Some(output.read()));
261 }
262 Err(s) => state = s,
263 }
264 }
265 }
266 }
267}
268
269impl<R, T> fmt::Debug for JoinHandle<R, T> {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 let ptr = self.raw_task.as_ptr();
272 let header = ptr as *const Header;
273
274 f.debug_struct("JoinHandle")
275 .field("header", unsafe { &(*header) })
276 .finish()
277 }
278}