blob: 487adf5ca0cf9e69c2ca66f2574fb2b13f013f27 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::pin::Pin;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
Stjepan Glavinae5d57b72020-09-17 15:02:38 +02007use core::task::{Context, Poll};
Stjepan Glavina1479e862019-08-12 20:18:51 +02008
9use crate::header::Header;
10use crate::state::*;
Stjepan Glavina1479e862019-08-12 20:18:51 +020011
12/// A handle that awaits the result of a task.
13///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020014/// This type is a future that resolves to an `Option<R>` where:
Stjepan Glavina1479e862019-08-12 20:18:51 +020015///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +020016/// * `None` indicates the task has panicked or was canceled.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020017/// * `Some(result)` indicates the task has completed with `result` of type `R`.
Stjepan Glavina42e3a692020-09-17 14:58:40 +020018pub struct JoinHandle<R> {
Stjepan Glavina1479e862019-08-12 20:18:51 +020019 /// A raw task pointer.
20 pub(crate) raw_task: NonNull<()>,
21
Stjepan Glavina42e3a692020-09-17 14:58:40 +020022 /// A marker capturing generic type `R`.
23 pub(crate) _marker: PhantomData<R>,
Stjepan Glavina1479e862019-08-12 20:18:51 +020024}
25
Stjepan Glavina42e3a692020-09-17 14:58:40 +020026unsafe impl<R: Send> Send for JoinHandle<R> {}
27unsafe impl<R> Sync for JoinHandle<R> {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020028
Stjepan Glavina42e3a692020-09-17 14:58:40 +020029impl<R> Unpin for JoinHandle<R> {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020030
Stjepan Glavina42e3a692020-09-17 14:58:40 +020031impl<R> JoinHandle<R> {
Stjepan Glavina1479e862019-08-12 20:18:51 +020032 /// Cancels the task.
33 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020034 /// If the task has already completed, calling this method will have no effect.
Stjepan Glavina1479e862019-08-12 20:18:51 +020035 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +020036 /// When a task is canceled, its future will not be polled again.
Stjepan Glavina1479e862019-08-12 20:18:51 +020037 pub fn cancel(&self) {
38 let ptr = self.raw_task.as_ptr();
39 let header = ptr as *const Header;
40
41 unsafe {
42 let mut state = (*header).state.load(Ordering::Acquire);
43
44 loop {
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +020045 // If the task has been completed or closed, it can't be canceled.
Stjepan Glavina1479e862019-08-12 20:18:51 +020046 if state & (COMPLETED | CLOSED) != 0 {
47 break;
48 }
49
50 // If the task is not scheduled nor running, we'll need to schedule it.
51 let new = if state & (SCHEDULED | RUNNING) == 0 {
52 (state | SCHEDULED | CLOSED) + REFERENCE
53 } else {
54 state | CLOSED
55 };
56
57 // Mark the task as closed.
58 match (*header).state.compare_exchange_weak(
59 state,
60 new,
61 Ordering::AcqRel,
62 Ordering::Acquire,
63 ) {
64 Ok(_) => {
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020065 // If the task is not scheduled nor running, schedule it one more time so
66 // that its future gets dropped by the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +020067 if state & (SCHEDULED | RUNNING) == 0 {
68 ((*header).vtable.schedule)(ptr);
69 }
70
71 // Notify the awaiter that the task has been closed.
72 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060073 (*header).notify(None);
Stjepan Glavina1479e862019-08-12 20:18:51 +020074 }
75
76 break;
77 }
78 Err(s) => state = s,
79 }
80 }
81 }
82 }
Stjepan Glavina1479e862019-08-12 20:18:51 +020083}
84
Stjepan Glavina42e3a692020-09-17 14:58:40 +020085impl<R> Drop for JoinHandle<R> {
Stjepan Glavina1479e862019-08-12 20:18:51 +020086 fn drop(&mut self) {
87 let ptr = self.raw_task.as_ptr();
88 let header = ptr as *const Header;
89
90 // A place where the output will be stored in case it needs to be dropped.
91 let mut output = None;
92
93 unsafe {
94 // Optimistically assume the `JoinHandle` is being dropped just after creating the
95 // task. This is a common case so if the handle is not used, the overhead of it is only
96 // one compare-exchange operation.
97 if let Err(mut state) = (*header).state.compare_exchange_weak(
98 SCHEDULED | HANDLE | REFERENCE,
99 SCHEDULED | REFERENCE,
100 Ordering::AcqRel,
101 Ordering::Acquire,
102 ) {
103 loop {
104 // If the task has been completed but not yet closed, that means its output
105 // must be dropped.
106 if state & COMPLETED != 0 && state & CLOSED == 0 {
107 // Mark the task as closed in order to grab its output.
108 match (*header).state.compare_exchange_weak(
109 state,
110 state | CLOSED,
111 Ordering::AcqRel,
112 Ordering::Acquire,
113 ) {
114 Ok(_) => {
115 // Read the output.
116 output =
117 Some((((*header).vtable.get_output)(ptr) as *mut R).read());
118
119 // Update the state variable because we're continuing the loop.
120 state |= CLOSED;
121 }
122 Err(s) => state = s,
123 }
124 } else {
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200125 // If this is the last reference to the task and it's not closed, then
126 // close it and schedule one more time so that its future gets dropped by
127 // the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200128 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
129 SCHEDULED | CLOSED | REFERENCE
130 } else {
131 state & !HANDLE
132 };
133
134 // Unset the handle flag.
135 match (*header).state.compare_exchange_weak(
136 state,
137 new,
138 Ordering::AcqRel,
139 Ordering::Acquire,
140 ) {
141 Ok(_) => {
142 // If this is the last reference to the task, we need to either
143 // schedule dropping its future or destroy it.
144 if state & !(REFERENCE - 1) == 0 {
145 if state & CLOSED == 0 {
146 ((*header).vtable.schedule)(ptr);
147 } else {
148 ((*header).vtable.destroy)(ptr);
149 }
150 }
151
152 break;
153 }
154 Err(s) => state = s,
155 }
156 }
157 }
158 }
159 }
160
161 // Drop the output if it was taken out of the task.
162 drop(output);
163 }
164}
165
Stjepan Glavina42e3a692020-09-17 14:58:40 +0200166impl<R> Future for JoinHandle<R> {
Stjepan Glavina1479e862019-08-12 20:18:51 +0200167 type Output = Option<R>;
168
169 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170 let ptr = self.raw_task.as_ptr();
171 let header = ptr as *const Header;
172
173 unsafe {
174 let mut state = (*header).state.load(Ordering::Acquire);
175
176 loop {
177 // If the task has been closed, notify the awaiter and return `None`.
178 if state & CLOSED != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200179 // If the task is scheduled or running, we need to wait until its future is
180 // dropped.
181 if state & (SCHEDULED | RUNNING) != 0 {
182 // Replace the waker with one associated with the current task.
183 (*header).register(cx.waker());
184
185 // Reload the state after registering. It is possible changes occurred just
186 // before registration so we need to check for that.
187 state = (*header).state.load(Ordering::Acquire);
188
189 // If the task is still scheduled or running, we need to wait because its
190 // future is not dropped yet.
191 if state & (SCHEDULED | RUNNING) != 0 {
192 return Poll::Pending;
193 }
194 }
195
Stjepan Glavina1479e862019-08-12 20:18:51 +0200196 // Even though the awaiter is most likely the current task, it could also be
197 // another task.
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600198 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200199 return Poll::Ready(None);
200 }
201
202 // If the task is not completed, register the current task.
203 if state & COMPLETED == 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200204 // Replace the waker with one associated with the current task.
205 (*header).register(cx.waker());
Stjepan Glavina1479e862019-08-12 20:18:51 +0200206
207 // Reload the state after registering. It is possible that the task became
208 // completed or closed just before registration so we need to check for that.
209 state = (*header).state.load(Ordering::Acquire);
210
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200211 // If the task has been closed, restart.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200212 if state & CLOSED != 0 {
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200213 continue;
Stjepan Glavina1479e862019-08-12 20:18:51 +0200214 }
215
216 // If the task is still not completed, we're blocked on it.
217 if state & COMPLETED == 0 {
218 return Poll::Pending;
219 }
220 }
221
222 // Since the task is now completed, mark it as closed in order to grab its output.
223 match (*header).state.compare_exchange(
224 state,
225 state | CLOSED,
226 Ordering::AcqRel,
227 Ordering::Acquire,
228 ) {
229 Ok(_) => {
230 // Notify the awaiter. Even though the awaiter is most likely the current
231 // task, it could also be another task.
232 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600233 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200234 }
235
236 // Take the output from the task.
237 let output = ((*header).vtable.get_output)(ptr) as *mut R;
238 return Poll::Ready(Some(output.read()));
239 }
240 Err(s) => state = s,
241 }
242 }
243 }
244 }
245}
246
Stjepan Glavina42e3a692020-09-17 14:58:40 +0200247impl<R> fmt::Debug for JoinHandle<R> {
Stjepan Glavina1479e862019-08-12 20:18:51 +0200248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 let ptr = self.raw_task.as_ptr();
250 let header = ptr as *const Header;
251
252 f.debug_struct("JoinHandle")
253 .field("header", unsafe { &(*header) })
254 .finish()
255 }
256}