Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 1 | use std::fmt; |
| 2 | use std::future::Future; |
| 3 | use std::marker::PhantomData; |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 4 | use std::mem::{self, ManuallyDrop}; |
| 5 | use std::pin::Pin; |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 6 | use std::ptr::NonNull; |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 7 | use std::task::{Context, Poll}; |
| 8 | use std::thread::{self, ThreadId}; |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 9 | |
| 10 | use crate::header::Header; |
| 11 | use crate::raw::RawTask; |
| 12 | use crate::JoinHandle; |
| 13 | |
| 14 | /// Creates a new task. |
| 15 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 16 | /// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 17 | /// awaits its result. |
| 18 | /// |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 19 | /// When run, the task polls `future`. When woken up, it gets scheduled for running by the |
| 20 | /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 21 | /// |
Stjepan Glavina | 951d971 | 2019-11-25 18:47:16 +0100 | [diff] [blame^] | 22 | /// The schedule function should not attempt to run the task nor to drop it. Instead, it should |
| 23 | /// push the task into some kind of queue so that it can be processed later. |
| 24 | /// |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 25 | /// If you need to spawn a future that does not implement [`Send`], consider using the |
| 26 | /// [`spawn_local`] function instead. |
| 27 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 28 | /// [`Task`]: struct.Task.html |
| 29 | /// [`JoinHandle`]: struct.JoinHandle.html |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 30 | /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html |
| 31 | /// [`spawn_local`]: fn.spawn_local.html |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 32 | /// |
| 33 | /// # Examples |
| 34 | /// |
| 35 | /// ``` |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 36 | /// use crossbeam::channel; |
| 37 | /// |
| 38 | /// // The future inside the task. |
| 39 | /// let future = async { |
| 40 | /// println!("Hello, world!"); |
| 41 | /// }; |
| 42 | /// |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 43 | /// // If the task gets woken up, it will be sent into this channel. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 44 | /// let (s, r) = channel::unbounded(); |
| 45 | /// let schedule = move |task| s.send(task).unwrap(); |
| 46 | /// |
| 47 | /// // Create a task with the future and the schedule function. |
| 48 | /// let (task, handle) = async_task::spawn(future, schedule, ()); |
| 49 | /// ``` |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 50 | pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) |
| 51 | where |
| 52 | F: Future<Output = R> + Send + 'static, |
| 53 | R: Send + 'static, |
| 54 | S: Fn(Task<T>) + Send + Sync + 'static, |
| 55 | T: Send + Sync + 'static, |
| 56 | { |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 57 | let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag); |
| 58 | let task = Task { |
| 59 | raw_task, |
| 60 | _marker: PhantomData, |
| 61 | }; |
| 62 | let handle = JoinHandle { |
| 63 | raw_task, |
| 64 | _marker: PhantomData, |
| 65 | }; |
| 66 | (task, handle) |
| 67 | } |
| 68 | |
| 69 | /// Creates a new local task. |
| 70 | /// |
| 71 | /// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that |
| 72 | /// awaits its result. |
| 73 | /// |
| 74 | /// When run, the task polls `future`. When woken up, it gets scheduled for running by the |
| 75 | /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. |
| 76 | /// |
Stjepan Glavina | 951d971 | 2019-11-25 18:47:16 +0100 | [diff] [blame^] | 77 | /// The schedule function should not attempt to run the task nor to drop it. Instead, it should |
| 78 | /// push the task into some kind of queue so that it can be processed later. |
| 79 | /// |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 80 | /// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the |
| 81 | /// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur. |
| 82 | /// |
| 83 | /// [`Task`]: struct.Task.html |
| 84 | /// [`JoinHandle`]: struct.JoinHandle.html |
| 85 | /// [`spawn`]: fn.spawn.html |
| 86 | /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html |
| 87 | /// |
| 88 | /// # Examples |
| 89 | /// |
| 90 | /// ``` |
| 91 | /// use crossbeam::channel; |
| 92 | /// |
| 93 | /// // The future inside the task. |
| 94 | /// let future = async { |
| 95 | /// println!("Hello, world!"); |
| 96 | /// }; |
| 97 | /// |
| 98 | /// // If the task gets woken up, it will be sent into this channel. |
| 99 | /// let (s, r) = channel::unbounded(); |
| 100 | /// let schedule = move |task| s.send(task).unwrap(); |
| 101 | /// |
| 102 | /// // Create a task with the future and the schedule function. |
| 103 | /// let (task, handle) = async_task::spawn_local(future, schedule, ()); |
| 104 | /// ``` |
| 105 | pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) |
| 106 | where |
| 107 | F: Future<Output = R> + 'static, |
| 108 | R: 'static, |
| 109 | S: Fn(Task<T>) + Send + Sync + 'static, |
| 110 | T: Send + Sync + 'static, |
| 111 | { |
| 112 | thread_local! { |
| 113 | static ID: ThreadId = thread::current().id(); |
| 114 | } |
| 115 | |
| 116 | struct Checked<F> { |
| 117 | id: ThreadId, |
| 118 | inner: ManuallyDrop<F>, |
| 119 | } |
| 120 | |
| 121 | impl<F> Drop for Checked<F> { |
| 122 | fn drop(&mut self) { |
| 123 | if ID.with(|id| *id) != self.id { |
| 124 | panic!("local task dropped by a thread that didn't spawn it"); |
| 125 | } |
| 126 | unsafe { |
| 127 | ManuallyDrop::drop(&mut self.inner); |
| 128 | } |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | impl<F: Future> Future for Checked<F> { |
| 133 | type Output = F::Output; |
| 134 | |
| 135 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 136 | if ID.with(|id| *id) != self.id { |
| 137 | panic!("local task polled by a thread that didn't spawn it"); |
| 138 | } |
| 139 | unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | let future = Checked { |
| 144 | id: ID.with(|id| *id), |
| 145 | inner: ManuallyDrop::new(future), |
| 146 | }; |
| 147 | |
| 148 | let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 149 | let task = Task { |
| 150 | raw_task, |
| 151 | _marker: PhantomData, |
| 152 | }; |
| 153 | let handle = JoinHandle { |
| 154 | raw_task, |
| 155 | _marker: PhantomData, |
| 156 | }; |
| 157 | (task, handle) |
| 158 | } |
| 159 | |
/// A task reference that runs its future.
///
/// At most one [`Task`] reference is associated with a particular task at any moment in time.
/// Running the task (see [`run()`]) consumes the [`Task`] reference and polls the internal
/// future. If the future is still pending after that poll, no [`Task`] reference exists until a
/// [`Waker`] notifies the task. If the future completes, its result becomes available to the
/// [`JoinHandle`].
///
/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
/// runnable tasks.
///
/// Dropping the [`Task`] reference without running it automatically cancels the task. A
/// cancelled task won't be scheduled again even if a [`Waker`] wakes it. The [`JoinHandle`] may
/// also cancel the task while the [`Task`] reference exists, in which case an attempt to run the
/// task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
pub struct Task<T> {
    /// A pointer to the heap-allocated task.
    pub(crate) raw_task: NonNull<()>,

    /// A marker capturing the generic type `T`.
    pub(crate) _marker: PhantomData<T>,
}

// SAFETY: the public constructors `spawn` and `spawn_local` both require the schedule function
// and the tag to be `Send + Sync`. `spawn` also requires a `Send` future, while `spawn_local`
// wraps its future so that polling or dropping it on a foreign thread panics instead.
// NOTE(review): the fields are `pub(crate)`, so any crate-internal code that builds a `Task`
// must uphold the same guarantees - confirm against the rest of the crate.
unsafe impl<T> Send for Task<T> {}
unsafe impl<T> Sync for Task<T> {}
| 190 | |
| 191 | impl<T> Task<T> { |
| 192 | /// Schedules the task. |
| 193 | /// |
| 194 | /// This is a convenience method that simply reschedules the task by passing it to its schedule |
| 195 | /// function. |
| 196 | /// |
| 197 | /// If the task is cancelled, this method won't do anything. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 198 | pub fn schedule(self) { |
| 199 | let ptr = self.raw_task.as_ptr(); |
| 200 | let header = ptr as *const Header; |
| 201 | mem::forget(self); |
| 202 | |
| 203 | unsafe { |
| 204 | ((*header).vtable.schedule)(ptr); |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | /// Runs the task. |
| 209 | /// |
| 210 | /// This method polls the task's future. If the future completes, its result will become |
| 211 | /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 212 | /// be woken up in order to be rescheduled and run again. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 213 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 214 | /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do |
| 215 | /// anything. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 216 | /// |
| 217 | /// It is possible that polling the future panics, in which case the panic will be propagated |
| 218 | /// into the caller. It is advised that invocations of this method are wrapped inside |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 219 | /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 220 | /// |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 221 | /// [`JoinHandle`]: struct.JoinHandle.html |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 222 | /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 223 | pub fn run(self) { |
| 224 | let ptr = self.raw_task.as_ptr(); |
| 225 | let header = ptr as *const Header; |
| 226 | mem::forget(self); |
| 227 | |
| 228 | unsafe { |
| 229 | ((*header).vtable.run)(ptr); |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | /// Cancels the task. |
| 234 | /// |
| 235 | /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt |
Stjepan Glavina | 7a8962b | 2019-08-16 11:25:25 +0200 | [diff] [blame] | 236 | /// to run it won't do anything. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 237 | /// |
| 238 | /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 239 | pub fn cancel(&self) { |
| 240 | let ptr = self.raw_task.as_ptr(); |
| 241 | let header = ptr as *const Header; |
| 242 | |
| 243 | unsafe { |
| 244 | (*header).cancel(); |
| 245 | } |
| 246 | } |
| 247 | |
| 248 | /// Returns a reference to the tag stored inside the task. |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 249 | pub fn tag(&self) -> &T { |
| 250 | let offset = Header::offset_tag::<T>(); |
| 251 | let ptr = self.raw_task.as_ptr(); |
| 252 | |
| 253 | unsafe { |
| 254 | let raw = (ptr as *mut u8).add(offset) as *const T; |
| 255 | &*raw |
| 256 | } |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | impl<T> Drop for Task<T> { |
| 261 | fn drop(&mut self) { |
| 262 | let ptr = self.raw_task.as_ptr(); |
| 263 | let header = ptr as *const Header; |
| 264 | |
| 265 | unsafe { |
| 266 | // Cancel the task. |
| 267 | (*header).cancel(); |
| 268 | |
| 269 | // Drop the future. |
| 270 | ((*header).vtable.drop_future)(ptr); |
| 271 | |
| 272 | // Drop the task reference. |
Stjepan Glavina | 5c398cf | 2019-08-20 15:29:43 +0200 | [diff] [blame] | 273 | ((*header).vtable.drop_task)(ptr); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 274 | } |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | impl<T: fmt::Debug> fmt::Debug for Task<T> { |
| 279 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 280 | let ptr = self.raw_task.as_ptr(); |
| 281 | let header = ptr as *const Header; |
| 282 | |
| 283 | f.debug_struct("Task") |
| 284 | .field("header", unsafe { &(*header) }) |
| 285 | .field("tag", self.tag()) |
| 286 | .finish() |
| 287 | } |
| 288 | } |