blob: fb5c275e6ccaef4ef2c494b8407321179bbb0824 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::fmt;
2use std::future::Future;
3use std::marker::{PhantomData, Unpin};
4use std::pin::Pin;
5use std::ptr::NonNull;
6use std::sync::atomic::Ordering;
7use std::task::{Context, Poll};
8
9use crate::header::Header;
10use crate::state::*;
11use crate::utils::abort_on_panic;
12
13/// A handle that awaits the result of a task.
14///
15/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was
16/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until
17/// the task completes, panics, or gets cancelled.
18///
19/// # Examples
20///
21/// ```
22/// #![feature(async_await)]
23///
24/// use crossbeam::channel;
25/// use futures::executor;
26///
27/// // The future inside the task.
28/// let future = async { 1 + 2 };
29///
30/// // If the task gets woken, it will be sent into this channel.
31/// let (s, r) = channel::unbounded();
32/// let schedule = move |task| s.send(task).unwrap();
33///
34/// // Create a task with the future and the schedule function.
35/// let (task, handle) = async_task::spawn(future, schedule, ());
36///
37/// // Run the task. In this example, it will complete after a single run.
38/// task.run();
39/// assert!(r.is_empty());
40///
41/// // Await the result of the task.
42/// let result = executor::block_on(handle);
43/// assert_eq!(result, Some(3));
44/// ```
45pub struct JoinHandle<R, T> {
46 /// A raw task pointer.
47 pub(crate) raw_task: NonNull<()>,
48
49 /// A marker capturing the generic type `R`.
50 pub(crate) _marker: PhantomData<(R, T)>,
51}
52
53unsafe impl<R, T> Send for JoinHandle<R, T> {}
54unsafe impl<R, T> Sync for JoinHandle<R, T> {}
55
56impl<R, T> Unpin for JoinHandle<R, T> {}
57
58impl<R, T> JoinHandle<R, T> {
59 /// Cancels the task.
60 ///
61 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
62 /// to run it won't do anything. And if it's completed, awaiting its result evaluates to
63 /// `None`.
64 ///
65 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
66 ///
67 /// # Examples
68 ///
69 /// ```
70 /// # #![feature(async_await)]
71 /// use crossbeam::channel;
72 /// use futures::executor;
73 ///
74 /// // The future inside the task.
75 /// let future = async { 1 + 2 };
76 ///
77 /// // If the task gets woken, it will be sent into this channel.
78 /// let (s, r) = channel::unbounded();
79 /// let schedule = move |task| s.send(task).unwrap();
80 ///
81 /// // Create a task with the future and the schedule function.
82 /// let (task, handle) = async_task::spawn(future, schedule, ());
83 ///
84 /// // Cancel the task.
85 /// handle.cancel();
86 ///
87 /// // Running a cancelled task does nothing.
88 /// task.run();
89 ///
90 /// // Await the result of the task.
91 /// let result = executor::block_on(handle);
92 /// assert_eq!(result, None);
93 /// ```
94 pub fn cancel(&self) {
95 let ptr = self.raw_task.as_ptr();
96 let header = ptr as *const Header;
97
98 unsafe {
99 let mut state = (*header).state.load(Ordering::Acquire);
100
101 loop {
102 // If the task has been completed or closed, it can't be cancelled.
103 if state & (COMPLETED | CLOSED) != 0 {
104 break;
105 }
106
107 // If the task is not scheduled nor running, we'll need to schedule it.
108 let new = if state & (SCHEDULED | RUNNING) == 0 {
109 (state | SCHEDULED | CLOSED) + REFERENCE
110 } else {
111 state | CLOSED
112 };
113
114 // Mark the task as closed.
115 match (*header).state.compare_exchange_weak(
116 state,
117 new,
118 Ordering::AcqRel,
119 Ordering::Acquire,
120 ) {
121 Ok(_) => {
122 // If the task is not scheduled nor running, schedule it so that its future
123 // gets dropped by the executor.
124 if state & (SCHEDULED | RUNNING) == 0 {
125 ((*header).vtable.schedule)(ptr);
126 }
127
128 // Notify the awaiter that the task has been closed.
129 if state & AWAITER != 0 {
130 (*header).notify();
131 }
132
133 break;
134 }
135 Err(s) => state = s,
136 }
137 }
138 }
139 }
140
141 /// Returns a reference to the tag stored inside the task.
142 ///
143 /// # Examples
144 ///
145 /// ```
146 /// # #![feature(async_await)]
147 /// use crossbeam::channel;
148 ///
149 /// // The future inside the task.
150 /// let future = async { 1 + 2 };
151 ///
152 /// // If the task gets woken, it will be sent into this channel.
153 /// let (s, r) = channel::unbounded();
154 /// let schedule = move |task| s.send(task).unwrap();
155 ///
156 /// // Create a task with the future and the schedule function.
157 /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
158 ///
159 /// // Access the tag.
160 /// assert_eq!(*handle.tag(), "a simple task");
161 /// ```
162 pub fn tag(&self) -> &T {
163 let offset = Header::offset_tag::<T>();
164 let ptr = self.raw_task.as_ptr();
165
166 unsafe {
167 let raw = (ptr as *mut u8).add(offset) as *const T;
168 &*raw
169 }
170 }
171}
172
173impl<R, T> Drop for JoinHandle<R, T> {
174 fn drop(&mut self) {
175 let ptr = self.raw_task.as_ptr();
176 let header = ptr as *const Header;
177
178 // A place where the output will be stored in case it needs to be dropped.
179 let mut output = None;
180
181 unsafe {
182 // Optimistically assume the `JoinHandle` is being dropped just after creating the
183 // task. This is a common case so if the handle is not used, the overhead of it is only
184 // one compare-exchange operation.
185 if let Err(mut state) = (*header).state.compare_exchange_weak(
186 SCHEDULED | HANDLE | REFERENCE,
187 SCHEDULED | REFERENCE,
188 Ordering::AcqRel,
189 Ordering::Acquire,
190 ) {
191 loop {
192 // If the task has been completed but not yet closed, that means its output
193 // must be dropped.
194 if state & COMPLETED != 0 && state & CLOSED == 0 {
195 // Mark the task as closed in order to grab its output.
196 match (*header).state.compare_exchange_weak(
197 state,
198 state | CLOSED,
199 Ordering::AcqRel,
200 Ordering::Acquire,
201 ) {
202 Ok(_) => {
203 // Read the output.
204 output =
205 Some((((*header).vtable.get_output)(ptr) as *mut R).read());
206
207 // Update the state variable because we're continuing the loop.
208 state |= CLOSED;
209 }
210 Err(s) => state = s,
211 }
212 } else {
213 // If this is the last reference to task and it's not closed, then close
214 // it and schedule one more time so that its future gets dropped by the
215 // executor.
216 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
217 SCHEDULED | CLOSED | REFERENCE
218 } else {
219 state & !HANDLE
220 };
221
222 // Unset the handle flag.
223 match (*header).state.compare_exchange_weak(
224 state,
225 new,
226 Ordering::AcqRel,
227 Ordering::Acquire,
228 ) {
229 Ok(_) => {
230 // If this is the last reference to the task, we need to either
231 // schedule dropping its future or destroy it.
232 if state & !(REFERENCE - 1) == 0 {
233 if state & CLOSED == 0 {
234 ((*header).vtable.schedule)(ptr);
235 } else {
236 ((*header).vtable.destroy)(ptr);
237 }
238 }
239
240 break;
241 }
242 Err(s) => state = s,
243 }
244 }
245 }
246 }
247 }
248
249 // Drop the output if it was taken out of the task.
250 drop(output);
251 }
252}
253
254impl<R, T> Future for JoinHandle<R, T> {
255 type Output = Option<R>;
256
257 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
258 let ptr = self.raw_task.as_ptr();
259 let header = ptr as *const Header;
260
261 unsafe {
262 let mut state = (*header).state.load(Ordering::Acquire);
263
264 loop {
265 // If the task has been closed, notify the awaiter and return `None`.
266 if state & CLOSED != 0 {
267 // Even though the awaiter is most likely the current task, it could also be
268 // another task.
269 (*header).notify_unless(cx.waker());
270 return Poll::Ready(None);
271 }
272
273 // If the task is not completed, register the current task.
274 if state & COMPLETED == 0 {
275 // Replace the waker with one associated with the current task. We need a
276 // safeguard against panics because dropping the previous waker can panic.
277 abort_on_panic(|| {
278 (*header).swap_awaiter(Some(cx.waker().clone()));
279 });
280
281 // Reload the state after registering. It is possible that the task became
282 // completed or closed just before registration so we need to check for that.
283 state = (*header).state.load(Ordering::Acquire);
284
285 // If the task has been closed, notify the awaiter and return `None`.
286 if state & CLOSED != 0 {
287 // Even though the awaiter is most likely the current task, it could also
288 // be another task.
289 (*header).notify_unless(cx.waker());
290 return Poll::Ready(None);
291 }
292
293 // If the task is still not completed, we're blocked on it.
294 if state & COMPLETED == 0 {
295 return Poll::Pending;
296 }
297 }
298
299 // Since the task is now completed, mark it as closed in order to grab its output.
300 match (*header).state.compare_exchange(
301 state,
302 state | CLOSED,
303 Ordering::AcqRel,
304 Ordering::Acquire,
305 ) {
306 Ok(_) => {
307 // Notify the awaiter. Even though the awaiter is most likely the current
308 // task, it could also be another task.
309 if state & AWAITER != 0 {
310 (*header).notify_unless(cx.waker());
311 }
312
313 // Take the output from the task.
314 let output = ((*header).vtable.get_output)(ptr) as *mut R;
315 return Poll::Ready(Some(output.read()));
316 }
317 Err(s) => state = s,
318 }
319 }
320 }
321 }
322}
323
324impl<R, T> fmt::Debug for JoinHandle<R, T> {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 let ptr = self.raw_task.as_ptr();
327 let header = ptr as *const Header;
328
329 f.debug_struct("JoinHandle")
330 .field("header", unsafe { &(*header) })
331 .finish()
332 }
333}