Stjepan Glavina | 921e8a0 | 2020-01-06 14:31:28 -0600 | [diff] [blame] | 1 | use core::fmt; |
| 2 | use core::future::Future; |
| 3 | use core::marker::{PhantomData, Unpin}; |
| 4 | use core::pin::Pin; |
| 5 | use core::ptr::NonNull; |
| 6 | use core::sync::atomic::Ordering; |
Stjepan Glavina | af051a5 | 2020-01-06 15:25:52 -0600 | [diff] [blame] | 7 | use core::task::{Context, Poll, Waker}; |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 8 | |
| 9 | use crate::header::Header; |
| 10 | use crate::state::*; |
| 11 | use crate::utils::abort_on_panic; |
| 12 | |
| 13 | /// A handle that awaits the result of a task. |
| 14 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 15 | /// This type is a future that resolves to an `Option<R>` where: |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 16 | /// |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 17 | /// * `None` indicates the task has panicked or was cancelled. |
| 18 | /// * `Some(result)` indicates the task has completed with `result` of type `R`. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 19 | pub struct JoinHandle<R, T> { |
| 20 | /// A raw task pointer. |
| 21 | pub(crate) raw_task: NonNull<()>, |
| 22 | |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 23 | /// A marker capturing generic types `R` and `T`. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 24 | pub(crate) _marker: PhantomData<(R, T)>, |
| 25 | } |
| 26 | |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 27 | unsafe impl<R: Send, T> Send for JoinHandle<R, T> {} |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 28 | unsafe impl<R, T> Sync for JoinHandle<R, T> {} |
| 29 | |
| 30 | impl<R, T> Unpin for JoinHandle<R, T> {} |
| 31 | |
| 32 | impl<R, T> JoinHandle<R, T> { |
| 33 | /// Cancels the task. |
| 34 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 35 | /// If the task has already completed, calling this method will have no effect. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 36 | /// |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 37 | /// When a task is cancelled, its future will not be polled again. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 38 | pub fn cancel(&self) { |
| 39 | let ptr = self.raw_task.as_ptr(); |
| 40 | let header = ptr as *const Header; |
| 41 | |
| 42 | unsafe { |
| 43 | let mut state = (*header).state.load(Ordering::Acquire); |
| 44 | |
| 45 | loop { |
| 46 | // If the task has been completed or closed, it can't be cancelled. |
| 47 | if state & (COMPLETED | CLOSED) != 0 { |
| 48 | break; |
| 49 | } |
| 50 | |
| 51 | // If the task is not scheduled nor running, we'll need to schedule it. |
| 52 | let new = if state & (SCHEDULED | RUNNING) == 0 { |
| 53 | (state | SCHEDULED | CLOSED) + REFERENCE |
| 54 | } else { |
| 55 | state | CLOSED |
| 56 | }; |
| 57 | |
| 58 | // Mark the task as closed. |
| 59 | match (*header).state.compare_exchange_weak( |
| 60 | state, |
| 61 | new, |
| 62 | Ordering::AcqRel, |
| 63 | Ordering::Acquire, |
| 64 | ) { |
| 65 | Ok(_) => { |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 66 | // If the task is not scheduled nor running, schedule it one more time so |
| 67 | // that its future gets dropped by the executor. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 68 | if state & (SCHEDULED | RUNNING) == 0 { |
| 69 | ((*header).vtable.schedule)(ptr); |
| 70 | } |
| 71 | |
| 72 | // Notify the awaiter that the task has been closed. |
| 73 | if state & AWAITER != 0 { |
Stjepan Glavina | 921e8a0 | 2020-01-06 14:31:28 -0600 | [diff] [blame] | 74 | (*header).notify(None); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 75 | } |
| 76 | |
| 77 | break; |
| 78 | } |
| 79 | Err(s) => state = s, |
| 80 | } |
| 81 | } |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | /// Returns a reference to the tag stored inside the task. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 86 | pub fn tag(&self) -> &T { |
| 87 | let offset = Header::offset_tag::<T>(); |
| 88 | let ptr = self.raw_task.as_ptr(); |
| 89 | |
| 90 | unsafe { |
| 91 | let raw = (ptr as *mut u8).add(offset) as *const T; |
| 92 | &*raw |
| 93 | } |
| 94 | } |
Stjepan Glavina | af051a5 | 2020-01-06 15:25:52 -0600 | [diff] [blame] | 95 | |
| 96 | /// Returns a waker associated with the task. |
| 97 | pub fn waker(&self) -> Waker { |
| 98 | let ptr = self.raw_task.as_ptr(); |
| 99 | let header = ptr as *const Header; |
| 100 | |
| 101 | unsafe { |
| 102 | let raw_waker = ((*header).vtable.clone_waker)(ptr); |
| 103 | Waker::from_raw(raw_waker) |
| 104 | } |
| 105 | } |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 106 | } |
| 107 | |
| 108 | impl<R, T> Drop for JoinHandle<R, T> { |
| 109 | fn drop(&mut self) { |
| 110 | let ptr = self.raw_task.as_ptr(); |
| 111 | let header = ptr as *const Header; |
| 112 | |
| 113 | // A place where the output will be stored in case it needs to be dropped. |
| 114 | let mut output = None; |
| 115 | |
| 116 | unsafe { |
| 117 | // Optimistically assume the `JoinHandle` is being dropped just after creating the |
| 118 | // task. This is a common case so if the handle is not used, the overhead of it is only |
| 119 | // one compare-exchange operation. |
| 120 | if let Err(mut state) = (*header).state.compare_exchange_weak( |
| 121 | SCHEDULED | HANDLE | REFERENCE, |
| 122 | SCHEDULED | REFERENCE, |
| 123 | Ordering::AcqRel, |
| 124 | Ordering::Acquire, |
| 125 | ) { |
| 126 | loop { |
| 127 | // If the task has been completed but not yet closed, that means its output |
| 128 | // must be dropped. |
| 129 | if state & COMPLETED != 0 && state & CLOSED == 0 { |
| 130 | // Mark the task as closed in order to grab its output. |
| 131 | match (*header).state.compare_exchange_weak( |
| 132 | state, |
| 133 | state | CLOSED, |
| 134 | Ordering::AcqRel, |
| 135 | Ordering::Acquire, |
| 136 | ) { |
| 137 | Ok(_) => { |
| 138 | // Read the output. |
| 139 | output = |
| 140 | Some((((*header).vtable.get_output)(ptr) as *mut R).read()); |
| 141 | |
| 142 | // Update the state variable because we're continuing the loop. |
| 143 | state |= CLOSED; |
| 144 | } |
| 145 | Err(s) => state = s, |
| 146 | } |
| 147 | } else { |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 148 | // If this is the last reference to the task and it's not closed, then |
| 149 | // close it and schedule one more time so that its future gets dropped by |
| 150 | // the executor. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 151 | let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { |
| 152 | SCHEDULED | CLOSED | REFERENCE |
| 153 | } else { |
| 154 | state & !HANDLE |
| 155 | }; |
| 156 | |
| 157 | // Unset the handle flag. |
| 158 | match (*header).state.compare_exchange_weak( |
| 159 | state, |
| 160 | new, |
| 161 | Ordering::AcqRel, |
| 162 | Ordering::Acquire, |
| 163 | ) { |
| 164 | Ok(_) => { |
| 165 | // If this is the last reference to the task, we need to either |
| 166 | // schedule dropping its future or destroy it. |
| 167 | if state & !(REFERENCE - 1) == 0 { |
| 168 | if state & CLOSED == 0 { |
| 169 | ((*header).vtable.schedule)(ptr); |
| 170 | } else { |
| 171 | ((*header).vtable.destroy)(ptr); |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | break; |
| 176 | } |
| 177 | Err(s) => state = s, |
| 178 | } |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | // Drop the output if it was taken out of the task. |
| 185 | drop(output); |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | impl<R, T> Future for JoinHandle<R, T> { |
| 190 | type Output = Option<R>; |
| 191 | |
| 192 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 193 | let ptr = self.raw_task.as_ptr(); |
| 194 | let header = ptr as *const Header; |
| 195 | |
| 196 | unsafe { |
| 197 | let mut state = (*header).state.load(Ordering::Acquire); |
| 198 | |
| 199 | loop { |
| 200 | // If the task has been closed, notify the awaiter and return `None`. |
| 201 | if state & CLOSED != 0 { |
| 202 | // Even though the awaiter is most likely the current task, it could also be |
| 203 | // another task. |
Stjepan Glavina | 921e8a0 | 2020-01-06 14:31:28 -0600 | [diff] [blame] | 204 | (*header).notify(Some(cx.waker())); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 205 | return Poll::Ready(None); |
| 206 | } |
| 207 | |
| 208 | // If the task is not completed, register the current task. |
| 209 | if state & COMPLETED == 0 { |
| 210 | // Replace the waker with one associated with the current task. We need a |
| 211 | // safeguard against panics because dropping the previous waker can panic. |
| 212 | abort_on_panic(|| { |
Stjepan Glavina | 921e8a0 | 2020-01-06 14:31:28 -0600 | [diff] [blame] | 213 | (*header).register(cx.waker()); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 214 | }); |
| 215 | |
| 216 | // Reload the state after registering. It is possible that the task became |
| 217 | // completed or closed just before registration so we need to check for that. |
| 218 | state = (*header).state.load(Ordering::Acquire); |
| 219 | |
laizy | 2b0427a | 2019-11-20 21:55:50 +0800 | [diff] [blame] | 220 | // If the task has been closed, return `None`. We do not need to notify the |
| 221 | // awaiter here, since we have replaced the waker above, and the executor can |
| 222 | // only set it back to `None`. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 223 | if state & CLOSED != 0 { |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 224 | return Poll::Ready(None); |
| 225 | } |
| 226 | |
| 227 | // If the task is still not completed, we're blocked on it. |
| 228 | if state & COMPLETED == 0 { |
| 229 | return Poll::Pending; |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | // Since the task is now completed, mark it as closed in order to grab its output. |
| 234 | match (*header).state.compare_exchange( |
| 235 | state, |
| 236 | state | CLOSED, |
| 237 | Ordering::AcqRel, |
| 238 | Ordering::Acquire, |
| 239 | ) { |
| 240 | Ok(_) => { |
| 241 | // Notify the awaiter. Even though the awaiter is most likely the current |
| 242 | // task, it could also be another task. |
| 243 | if state & AWAITER != 0 { |
Stjepan Glavina | 921e8a0 | 2020-01-06 14:31:28 -0600 | [diff] [blame] | 244 | (*header).notify(Some(cx.waker())); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 245 | } |
| 246 | |
| 247 | // Take the output from the task. |
| 248 | let output = ((*header).vtable.get_output)(ptr) as *mut R; |
| 249 | return Poll::Ready(Some(output.read())); |
| 250 | } |
| 251 | Err(s) => state = s, |
| 252 | } |
| 253 | } |
| 254 | } |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | impl<R, T> fmt::Debug for JoinHandle<R, T> { |
| 259 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 260 | let ptr = self.raw_task.as_ptr(); |
| 261 | let header = ptr as *const Header; |
| 262 | |
| 263 | f.debug_struct("JoinHandle") |
| 264 | .field("header", unsafe { &(*header) }) |
| 265 | .finish() |
| 266 | } |
| 267 | } |