Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame^] | 1 | use std::fmt; |
| 2 | use std::future::Future; |
| 3 | use std::marker::{PhantomData, Unpin}; |
| 4 | use std::pin::Pin; |
| 5 | use std::ptr::NonNull; |
| 6 | use std::sync::atomic::Ordering; |
| 7 | use std::task::{Context, Poll}; |
| 8 | |
| 9 | use crate::header::Header; |
| 10 | use crate::state::*; |
| 11 | use crate::utils::abort_on_panic; |
| 12 | |
/// A handle used to await the outcome of a task.
///
/// Awaiting the handle yields `Some(value)` once the task has completed with `value`. It yields
/// `None` if the task was cancelled or has panicked. Until one of those things happens, the
/// handle stays pending.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
///
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Run the task. In this example, it will complete after a single run.
/// task.run();
/// assert!(r.is_empty());
///
/// // Await the result of the task.
/// let result = executor::block_on(handle);
/// assert_eq!(result, Some(3));
/// ```
pub struct JoinHandle<R, T> {
    /// Type-erased raw pointer to the task this handle belongs to.
    pub(crate) raw_task: NonNull<()>,

    /// Zero-sized marker tying the handle to the output type `R` and the tag type `T`.
    pub(crate) _marker: PhantomData<(R, T)>,
}
| 52 | |
| 53 | unsafe impl<R, T> Send for JoinHandle<R, T> {} |
| 54 | unsafe impl<R, T> Sync for JoinHandle<R, T> {} |
| 55 | |
| 56 | impl<R, T> Unpin for JoinHandle<R, T> {} |
| 57 | |
| 58 | impl<R, T> JoinHandle<R, T> { |
| 59 | /// Cancels the task. |
| 60 | /// |
| 61 | /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt |
| 62 | /// to run it won't do anything. And if it's completed, awaiting its result evaluates to |
| 63 | /// `None`. |
| 64 | /// |
| 65 | /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html |
| 66 | /// |
| 67 | /// # Examples |
| 68 | /// |
| 69 | /// ``` |
| 70 | /// # #![feature(async_await)] |
| 71 | /// use crossbeam::channel; |
| 72 | /// use futures::executor; |
| 73 | /// |
| 74 | /// // The future inside the task. |
| 75 | /// let future = async { 1 + 2 }; |
| 76 | /// |
| 77 | /// // If the task gets woken, it will be sent into this channel. |
| 78 | /// let (s, r) = channel::unbounded(); |
| 79 | /// let schedule = move |task| s.send(task).unwrap(); |
| 80 | /// |
| 81 | /// // Create a task with the future and the schedule function. |
| 82 | /// let (task, handle) = async_task::spawn(future, schedule, ()); |
| 83 | /// |
| 84 | /// // Cancel the task. |
| 85 | /// handle.cancel(); |
| 86 | /// |
| 87 | /// // Running a cancelled task does nothing. |
| 88 | /// task.run(); |
| 89 | /// |
| 90 | /// // Await the result of the task. |
| 91 | /// let result = executor::block_on(handle); |
| 92 | /// assert_eq!(result, None); |
| 93 | /// ``` |
| 94 | pub fn cancel(&self) { |
| 95 | let ptr = self.raw_task.as_ptr(); |
| 96 | let header = ptr as *const Header; |
| 97 | |
| 98 | unsafe { |
| 99 | let mut state = (*header).state.load(Ordering::Acquire); |
| 100 | |
| 101 | loop { |
| 102 | // If the task has been completed or closed, it can't be cancelled. |
| 103 | if state & (COMPLETED | CLOSED) != 0 { |
| 104 | break; |
| 105 | } |
| 106 | |
| 107 | // If the task is not scheduled nor running, we'll need to schedule it. |
| 108 | let new = if state & (SCHEDULED | RUNNING) == 0 { |
| 109 | (state | SCHEDULED | CLOSED) + REFERENCE |
| 110 | } else { |
| 111 | state | CLOSED |
| 112 | }; |
| 113 | |
| 114 | // Mark the task as closed. |
| 115 | match (*header).state.compare_exchange_weak( |
| 116 | state, |
| 117 | new, |
| 118 | Ordering::AcqRel, |
| 119 | Ordering::Acquire, |
| 120 | ) { |
| 121 | Ok(_) => { |
| 122 | // If the task is not scheduled nor running, schedule it so that its future |
| 123 | // gets dropped by the executor. |
| 124 | if state & (SCHEDULED | RUNNING) == 0 { |
| 125 | ((*header).vtable.schedule)(ptr); |
| 126 | } |
| 127 | |
| 128 | // Notify the awaiter that the task has been closed. |
| 129 | if state & AWAITER != 0 { |
| 130 | (*header).notify(); |
| 131 | } |
| 132 | |
| 133 | break; |
| 134 | } |
| 135 | Err(s) => state = s, |
| 136 | } |
| 137 | } |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | /// Returns a reference to the tag stored inside the task. |
| 142 | /// |
| 143 | /// # Examples |
| 144 | /// |
| 145 | /// ``` |
| 146 | /// # #![feature(async_await)] |
| 147 | /// use crossbeam::channel; |
| 148 | /// |
| 149 | /// // The future inside the task. |
| 150 | /// let future = async { 1 + 2 }; |
| 151 | /// |
| 152 | /// // If the task gets woken, it will be sent into this channel. |
| 153 | /// let (s, r) = channel::unbounded(); |
| 154 | /// let schedule = move |task| s.send(task).unwrap(); |
| 155 | /// |
| 156 | /// // Create a task with the future and the schedule function. |
| 157 | /// let (task, handle) = async_task::spawn(future, schedule, "a simple task"); |
| 158 | /// |
| 159 | /// // Access the tag. |
| 160 | /// assert_eq!(*handle.tag(), "a simple task"); |
| 161 | /// ``` |
| 162 | pub fn tag(&self) -> &T { |
| 163 | let offset = Header::offset_tag::<T>(); |
| 164 | let ptr = self.raw_task.as_ptr(); |
| 165 | |
| 166 | unsafe { |
| 167 | let raw = (ptr as *mut u8).add(offset) as *const T; |
| 168 | &*raw |
| 169 | } |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | impl<R, T> Drop for JoinHandle<R, T> { |
| 174 | fn drop(&mut self) { |
| 175 | let ptr = self.raw_task.as_ptr(); |
| 176 | let header = ptr as *const Header; |
| 177 | |
| 178 | // A place where the output will be stored in case it needs to be dropped. |
| 179 | let mut output = None; |
| 180 | |
| 181 | unsafe { |
| 182 | // Optimistically assume the `JoinHandle` is being dropped just after creating the |
| 183 | // task. This is a common case so if the handle is not used, the overhead of it is only |
| 184 | // one compare-exchange operation. |
| 185 | if let Err(mut state) = (*header).state.compare_exchange_weak( |
| 186 | SCHEDULED | HANDLE | REFERENCE, |
| 187 | SCHEDULED | REFERENCE, |
| 188 | Ordering::AcqRel, |
| 189 | Ordering::Acquire, |
| 190 | ) { |
| 191 | loop { |
| 192 | // If the task has been completed but not yet closed, that means its output |
| 193 | // must be dropped. |
| 194 | if state & COMPLETED != 0 && state & CLOSED == 0 { |
| 195 | // Mark the task as closed in order to grab its output. |
| 196 | match (*header).state.compare_exchange_weak( |
| 197 | state, |
| 198 | state | CLOSED, |
| 199 | Ordering::AcqRel, |
| 200 | Ordering::Acquire, |
| 201 | ) { |
| 202 | Ok(_) => { |
| 203 | // Read the output. |
| 204 | output = |
| 205 | Some((((*header).vtable.get_output)(ptr) as *mut R).read()); |
| 206 | |
| 207 | // Update the state variable because we're continuing the loop. |
| 208 | state |= CLOSED; |
| 209 | } |
| 210 | Err(s) => state = s, |
| 211 | } |
| 212 | } else { |
| 213 | // If this is the last reference to task and it's not closed, then close |
| 214 | // it and schedule one more time so that its future gets dropped by the |
| 215 | // executor. |
| 216 | let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { |
| 217 | SCHEDULED | CLOSED | REFERENCE |
| 218 | } else { |
| 219 | state & !HANDLE |
| 220 | }; |
| 221 | |
| 222 | // Unset the handle flag. |
| 223 | match (*header).state.compare_exchange_weak( |
| 224 | state, |
| 225 | new, |
| 226 | Ordering::AcqRel, |
| 227 | Ordering::Acquire, |
| 228 | ) { |
| 229 | Ok(_) => { |
| 230 | // If this is the last reference to the task, we need to either |
| 231 | // schedule dropping its future or destroy it. |
| 232 | if state & !(REFERENCE - 1) == 0 { |
| 233 | if state & CLOSED == 0 { |
| 234 | ((*header).vtable.schedule)(ptr); |
| 235 | } else { |
| 236 | ((*header).vtable.destroy)(ptr); |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | break; |
| 241 | } |
| 242 | Err(s) => state = s, |
| 243 | } |
| 244 | } |
| 245 | } |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | // Drop the output if it was taken out of the task. |
| 250 | drop(output); |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | impl<R, T> Future for JoinHandle<R, T> { |
| 255 | type Output = Option<R>; |
| 256 | |
| 257 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 258 | let ptr = self.raw_task.as_ptr(); |
| 259 | let header = ptr as *const Header; |
| 260 | |
| 261 | unsafe { |
| 262 | let mut state = (*header).state.load(Ordering::Acquire); |
| 263 | |
| 264 | loop { |
| 265 | // If the task has been closed, notify the awaiter and return `None`. |
| 266 | if state & CLOSED != 0 { |
| 267 | // Even though the awaiter is most likely the current task, it could also be |
| 268 | // another task. |
| 269 | (*header).notify_unless(cx.waker()); |
| 270 | return Poll::Ready(None); |
| 271 | } |
| 272 | |
| 273 | // If the task is not completed, register the current task. |
| 274 | if state & COMPLETED == 0 { |
| 275 | // Replace the waker with one associated with the current task. We need a |
| 276 | // safeguard against panics because dropping the previous waker can panic. |
| 277 | abort_on_panic(|| { |
| 278 | (*header).swap_awaiter(Some(cx.waker().clone())); |
| 279 | }); |
| 280 | |
| 281 | // Reload the state after registering. It is possible that the task became |
| 282 | // completed or closed just before registration so we need to check for that. |
| 283 | state = (*header).state.load(Ordering::Acquire); |
| 284 | |
| 285 | // If the task has been closed, notify the awaiter and return `None`. |
| 286 | if state & CLOSED != 0 { |
| 287 | // Even though the awaiter is most likely the current task, it could also |
| 288 | // be another task. |
| 289 | (*header).notify_unless(cx.waker()); |
| 290 | return Poll::Ready(None); |
| 291 | } |
| 292 | |
| 293 | // If the task is still not completed, we're blocked on it. |
| 294 | if state & COMPLETED == 0 { |
| 295 | return Poll::Pending; |
| 296 | } |
| 297 | } |
| 298 | |
| 299 | // Since the task is now completed, mark it as closed in order to grab its output. |
| 300 | match (*header).state.compare_exchange( |
| 301 | state, |
| 302 | state | CLOSED, |
| 303 | Ordering::AcqRel, |
| 304 | Ordering::Acquire, |
| 305 | ) { |
| 306 | Ok(_) => { |
| 307 | // Notify the awaiter. Even though the awaiter is most likely the current |
| 308 | // task, it could also be another task. |
| 309 | if state & AWAITER != 0 { |
| 310 | (*header).notify_unless(cx.waker()); |
| 311 | } |
| 312 | |
| 313 | // Take the output from the task. |
| 314 | let output = ((*header).vtable.get_output)(ptr) as *mut R; |
| 315 | return Poll::Ready(Some(output.read())); |
| 316 | } |
| 317 | Err(s) => state = s, |
| 318 | } |
| 319 | } |
| 320 | } |
| 321 | } |
| 322 | } |
| 323 | |
| 324 | impl<R, T> fmt::Debug for JoinHandle<R, T> { |
| 325 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 326 | let ptr = self.raw_task.as_ptr(); |
| 327 | let header = ptr as *const Header; |
| 328 | |
| 329 | f.debug_struct("JoinHandle") |
| 330 | .field("header", unsafe { &(*header) }) |
| 331 | .finish() |
| 332 | } |
| 333 | } |