// Source blob: f4710cc3aa3a15c2a968aa2b029fd089ba68671b
use core::fmt;
use core::future::Future;
use core::marker::{PhantomData, Unpin};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::abort_on_panic;
/// A handle that awaits the result of a task.
///
/// This type is a future that resolves to an `Option<R>` where:
///
/// * `None` indicates the task has panicked or was cancelled.
/// * `Some(result)` indicates the task has completed with `result` of type `R`.
///
/// The handle itself is just a type-erased pointer to the task plus a zero-sized marker, so it
/// is pointer-sized regardless of `R` and `T`.
pub struct JoinHandle<R, T> {
    /// A raw task pointer.
    ///
    /// The methods of this type reinterpret it as a pointer to the task's `Header`.
    pub(crate) raw_task: NonNull<()>,

    /// A marker capturing generic types `R` (the task's output) and `T` (the task's tag).
    pub(crate) _marker: PhantomData<(R, T)>,
}
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010027unsafe impl<R: Send, T> Send for JoinHandle<R, T> {}
Stjepan Glavina1479e862019-08-12 20:18:51 +020028unsafe impl<R, T> Sync for JoinHandle<R, T> {}
29
30impl<R, T> Unpin for JoinHandle<R, T> {}
32impl<R, T> JoinHandle<R, T> {
33 /// Cancels the task.
34 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020035 /// If the task has already completed, calling this method will have no effect.
Stjepan Glavina1479e862019-08-12 20:18:51 +020036 ///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020037 /// When a task is cancelled, its future will not be polled again.
Stjepan Glavina1479e862019-08-12 20:18:51 +020038 pub fn cancel(&self) {
39 let ptr = self.raw_task.as_ptr();
40 let header = ptr as *const Header;
41
42 unsafe {
43 let mut state = (*header).state.load(Ordering::Acquire);
44
45 loop {
46 // If the task has been completed or closed, it can't be cancelled.
47 if state & (COMPLETED | CLOSED) != 0 {
48 break;
49 }
50
51 // If the task is not scheduled nor running, we'll need to schedule it.
52 let new = if state & (SCHEDULED | RUNNING) == 0 {
53 (state | SCHEDULED | CLOSED) + REFERENCE
54 } else {
55 state | CLOSED
56 };
57
58 // Mark the task as closed.
59 match (*header).state.compare_exchange_weak(
60 state,
61 new,
62 Ordering::AcqRel,
63 Ordering::Acquire,
64 ) {
65 Ok(_) => {
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020066 // If the task is not scheduled nor running, schedule it one more time so
67 // that its future gets dropped by the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +020068 if state & (SCHEDULED | RUNNING) == 0 {
69 ((*header).vtable.schedule)(ptr);
70 }
71
72 // Notify the awaiter that the task has been closed.
73 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -060074 (*header).notify(None);
Stjepan Glavina1479e862019-08-12 20:18:51 +020075 }
76
77 break;
78 }
79 Err(s) => state = s,
80 }
81 }
82 }
83 }
84
85 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020086 pub fn tag(&self) -> &T {
87 let offset = Header::offset_tag::<T>();
88 let ptr = self.raw_task.as_ptr();
89
90 unsafe {
91 let raw = (ptr as *mut u8).add(offset) as *const T;
92 &*raw
93 }
94 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -060095
96 /// Returns a waker associated with the task.
97 pub fn waker(&self) -> Waker {
98 let ptr = self.raw_task.as_ptr();
99 let header = ptr as *const Header;
100
101 unsafe {
102 let raw_waker = ((*header).vtable.clone_waker)(ptr);
103 Waker::from_raw(raw_waker)
104 }
105 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200106}
108impl<R, T> Drop for JoinHandle<R, T> {
109 fn drop(&mut self) {
110 let ptr = self.raw_task.as_ptr();
111 let header = ptr as *const Header;
112
113 // A place where the output will be stored in case it needs to be dropped.
114 let mut output = None;
115
116 unsafe {
117 // Optimistically assume the `JoinHandle` is being dropped just after creating the
118 // task. This is a common case so if the handle is not used, the overhead of it is only
119 // one compare-exchange operation.
120 if let Err(mut state) = (*header).state.compare_exchange_weak(
121 SCHEDULED | HANDLE | REFERENCE,
122 SCHEDULED | REFERENCE,
123 Ordering::AcqRel,
124 Ordering::Acquire,
125 ) {
126 loop {
127 // If the task has been completed but not yet closed, that means its output
128 // must be dropped.
129 if state & COMPLETED != 0 && state & CLOSED == 0 {
130 // Mark the task as closed in order to grab its output.
131 match (*header).state.compare_exchange_weak(
132 state,
133 state | CLOSED,
134 Ordering::AcqRel,
135 Ordering::Acquire,
136 ) {
137 Ok(_) => {
138 // Read the output.
139 output =
140 Some((((*header).vtable.get_output)(ptr) as *mut R).read());
141
142 // Update the state variable because we're continuing the loop.
143 state |= CLOSED;
144 }
145 Err(s) => state = s,
146 }
147 } else {
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200148 // If this is the last reference to the task and it's not closed, then
149 // close it and schedule one more time so that its future gets dropped by
150 // the executor.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200151 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
152 SCHEDULED | CLOSED | REFERENCE
153 } else {
154 state & !HANDLE
155 };
156
157 // Unset the handle flag.
158 match (*header).state.compare_exchange_weak(
159 state,
160 new,
161 Ordering::AcqRel,
162 Ordering::Acquire,
163 ) {
164 Ok(_) => {
165 // If this is the last reference to the task, we need to either
166 // schedule dropping its future or destroy it.
167 if state & !(REFERENCE - 1) == 0 {
168 if state & CLOSED == 0 {
169 ((*header).vtable.schedule)(ptr);
170 } else {
171 ((*header).vtable.destroy)(ptr);
172 }
173 }
174
175 break;
176 }
177 Err(s) => state = s,
178 }
179 }
180 }
181 }
182 }
183
184 // Drop the output if it was taken out of the task.
185 drop(output);
186 }
187}
189impl<R, T> Future for JoinHandle<R, T> {
190 type Output = Option<R>;
191
192 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let ptr = self.raw_task.as_ptr();
194 let header = ptr as *const Header;
195
196 unsafe {
197 let mut state = (*header).state.load(Ordering::Acquire);
198
199 loop {
200 // If the task has been closed, notify the awaiter and return `None`.
201 if state & CLOSED != 0 {
202 // Even though the awaiter is most likely the current task, it could also be
203 // another task.
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600204 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200205 return Poll::Ready(None);
206 }
207
208 // If the task is not completed, register the current task.
209 if state & COMPLETED == 0 {
210 // Replace the waker with one associated with the current task. We need a
211 // safeguard against panics because dropping the previous waker can panic.
212 abort_on_panic(|| {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600213 (*header).register(cx.waker());
Stjepan Glavina1479e862019-08-12 20:18:51 +0200214 });
215
216 // Reload the state after registering. It is possible that the task became
217 // completed or closed just before registration so we need to check for that.
218 state = (*header).state.load(Ordering::Acquire);
219
laizy2b0427a2019-11-20 21:55:50 +0800220 // If the task has been closed, return `None`. We do not need to notify the
221 // awaiter here, since we have replaced the waker above, and the executor can
222 // only set it back to `None`.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200223 if state & CLOSED != 0 {
Stjepan Glavina1479e862019-08-12 20:18:51 +0200224 return Poll::Ready(None);
225 }
226
227 // If the task is still not completed, we're blocked on it.
228 if state & COMPLETED == 0 {
229 return Poll::Pending;
230 }
231 }
232
233 // Since the task is now completed, mark it as closed in order to grab its output.
234 match (*header).state.compare_exchange(
235 state,
236 state | CLOSED,
237 Ordering::AcqRel,
238 Ordering::Acquire,
239 ) {
240 Ok(_) => {
241 // Notify the awaiter. Even though the awaiter is most likely the current
242 // task, it could also be another task.
243 if state & AWAITER != 0 {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600244 (*header).notify(Some(cx.waker()));
Stjepan Glavina1479e862019-08-12 20:18:51 +0200245 }
246
247 // Take the output from the task.
248 let output = ((*header).vtable.get_output)(ptr) as *mut R;
249 return Poll::Ready(Some(output.read()));
250 }
251 Err(s) => state = s,
252 }
253 }
254 }
255 }
256}
258impl<R, T> fmt::Debug for JoinHandle<R, T> {
259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 let ptr = self.raw_task.as_ptr();
261 let header = ptr as *const Header;
262
263 f.debug_struct("JoinHandle")
264 .field("header", unsafe { &(*header) })
265 .finish()
266 }
267}