blob: 0c57a854761c187b8916da14ba0a06ee53e00ef9 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
Stjepan Glavinafad623a2020-04-14 14:33:37 +02007use core::sync::atomic::Ordering;
Stjepan Glavinaaf051a52020-01-06 15:25:52 -06008use core::task::{Context, Poll, Waker};
Stjepan Glavina1479e862019-08-12 20:18:51 +02009
10use crate::header::Header;
11use crate::raw::RawTask;
Stjepan Glavinafad623a2020-04-14 14:33:37 +020012use crate::state::*;
Stjepan Glavina1479e862019-08-12 20:18:51 +020013use crate::JoinHandle;
14
15/// Creates a new task.
16///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020017/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020018/// awaits its result.
19///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020020/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
21/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020022///
Stjepan Glavina951d9712019-11-25 18:47:16 +010023/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
24/// push the task into some kind of queue so that it can be processed later.
25///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010026/// If you need to spawn a future that does not implement [`Send`], consider using the
27/// [`spawn_local`] function instead.
28///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020029/// [`Task`]: struct.Task.html
30/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010031/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
32/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020033///
34/// # Examples
35///
36/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020037/// use crossbeam::channel;
38///
39/// // The future inside the task.
40/// let future = async {
41/// println!("Hello, world!");
42/// };
43///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020044/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020045/// let (s, r) = channel::unbounded();
46/// let schedule = move |task| s.send(task).unwrap();
47///
48/// // Create a task with the future and the schedule function.
49/// let (task, handle) = async_task::spawn(future, schedule, ());
50/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020051pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
52where
53 F: Future<Output = R> + Send + 'static,
54 R: Send + 'static,
55 S: Fn(Task<T>) + Send + Sync + 'static,
56 T: Send + Sync + 'static,
57{
Stjepan Glavinad3011bf2020-04-14 19:52:56 +020058 // Allocate large futures on the heap.
59 let raw_task = if mem::size_of::<F>() >= 2048 {
60 let future = alloc::boxed::Box::pin(future);
61 RawTask::<_, R, S, T>::allocate(future, schedule, tag)
62 } else {
63 RawTask::<F, R, S, T>::allocate(future, schedule, tag)
64 };
65
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010066 let task = Task {
67 raw_task,
68 _marker: PhantomData,
69 };
70 let handle = JoinHandle {
71 raw_task,
72 _marker: PhantomData,
73 };
74 (task, handle)
75}
76
77/// Creates a new local task.
78///
79/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
80/// awaits its result.
81///
82/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
83/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
84///
Stjepan Glavina951d9712019-11-25 18:47:16 +010085/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
86/// push the task into some kind of queue so that it can be processed later.
87///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010088/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
89/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
90///
91/// [`Task`]: struct.Task.html
92/// [`JoinHandle`]: struct.JoinHandle.html
93/// [`spawn`]: fn.spawn.html
94/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
95///
96/// # Examples
97///
98/// ```
99/// use crossbeam::channel;
100///
101/// // The future inside the task.
102/// let future = async {
103/// println!("Hello, world!");
104/// };
105///
106/// // If the task gets woken up, it will be sent into this channel.
107/// let (s, r) = channel::unbounded();
108/// let schedule = move |task| s.send(task).unwrap();
109///
110/// // Create a task with the future and the schedule function.
111/// let (task, handle) = async_task::spawn_local(future, schedule, ());
112/// ```
Stjepan Glavina7e7d19c2020-01-07 22:45:13 +0100113#[cfg(any(unix, windows))]
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100114pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
115where
116 F: Future<Output = R> + 'static,
117 R: 'static,
118 S: Fn(Task<T>) + Send + Sync + 'static,
119 T: Send + Sync + 'static,
120{
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600121 #[cfg(unix)]
122 #[inline]
123 fn thread_id() -> usize {
124 unsafe { libc::pthread_self() as usize }
125 }
126
127 #[cfg(windows)]
128 #[inline]
129 fn thread_id() -> usize {
130 unsafe { winapi::um::processthreadsapi::GetCurrentThreadId() as usize }
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100131 }
132
133 struct Checked<F> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600134 id: usize,
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100135 inner: ManuallyDrop<F>,
136 }
137
138 impl<F> Drop for Checked<F> {
139 fn drop(&mut self) {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600140 assert!(
141 self.id == thread_id(),
142 "local task dropped by a thread that didn't spawn it"
143 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100144 unsafe {
145 ManuallyDrop::drop(&mut self.inner);
146 }
147 }
148 }
149
150 impl<F: Future> Future for Checked<F> {
151 type Output = F::Output;
152
153 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600154 assert!(
155 self.id == thread_id(),
156 "local task polled by a thread that didn't spawn it"
157 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100158 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
159 }
160 }
161
Stjepan Glavinad3011bf2020-04-14 19:52:56 +0200162 // Wrap the future into one that which thread it's on.
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100163 let future = Checked {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600164 id: thread_id(),
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100165 inner: ManuallyDrop::new(future),
166 };
167
Stjepan Glavinad3011bf2020-04-14 19:52:56 +0200168 // Allocate large futures on the heap.
169 let raw_task = if mem::size_of::<F>() >= 2048 {
170 let future = alloc::boxed::Box::pin(future);
171 RawTask::<_, R, S, T>::allocate(future, schedule, tag)
172 } else {
173 RawTask::<_, R, S, T>::allocate(future, schedule, tag)
174 };
175
Stjepan Glavina1479e862019-08-12 20:18:51 +0200176 let task = Task {
177 raw_task,
178 _marker: PhantomData,
179 };
180 let handle = JoinHandle {
181 raw_task,
182 _marker: PhantomData,
183 };
184 (task, handle)
185}
186
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200187/// A task reference that runs its future.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200188///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200189/// At any moment in time, there is at most one [`Task`] reference associated with a particular
190/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is
191/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`]
192/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`].
Stjepan Glavina1479e862019-08-12 20:18:51 +0200193///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200194/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200195/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
196/// runnable tasks.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200197///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200198/// If the [`Task`] reference is dropped without getting run, the task is automatically canceled.
199/// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200200/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
201/// to run the task won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200202///
203/// [`run()`]: struct.Task.html#method.run
Stjepan Glavina1479e862019-08-12 20:18:51 +0200204/// [`JoinHandle`]: struct.JoinHandle.html
205/// [`Task`]: struct.Task.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200206/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200207pub struct Task<T> {
208 /// A pointer to the heap-allocated task.
209 pub(crate) raw_task: NonNull<()>,
210
211 /// A marker capturing the generic type `T`.
212 pub(crate) _marker: PhantomData<T>,
213}
214
215unsafe impl<T> Send for Task<T> {}
216unsafe impl<T> Sync for Task<T> {}
217
218impl<T> Task<T> {
219 /// Schedules the task.
220 ///
221 /// This is a convenience method that simply reschedules the task by passing it to its schedule
222 /// function.
223 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200224 /// If the task is canceled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200225 pub fn schedule(self) {
226 let ptr = self.raw_task.as_ptr();
227 let header = ptr as *const Header;
228 mem::forget(self);
229
230 unsafe {
231 ((*header).vtable.schedule)(ptr);
232 }
233 }
234
235 /// Runs the task.
236 ///
Stjepan Glavina94059052020-04-12 19:46:20 +0200237 /// Returns `true` if the task was woken while running, in which case it gets rescheduled at
238 /// the end of this method invocation.
239 ///
Stjepan Glavina1479e862019-08-12 20:18:51 +0200240 /// This method polls the task's future. If the future completes, its result will become
241 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200242 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200243 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200244 /// If the task was canceled by a [`JoinHandle`] before it gets run, then this method won't do
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200245 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200246 ///
247 /// It is possible that polling the future panics, in which case the panic will be propagated
248 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200249 /// [`catch_unwind`]. If a panic occurs, the task is automatically canceled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200250 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200251 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200252 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina94059052020-04-12 19:46:20 +0200253 pub fn run(self) -> bool {
Stjepan Glavina1479e862019-08-12 20:18:51 +0200254 let ptr = self.raw_task.as_ptr();
255 let header = ptr as *const Header;
256 mem::forget(self);
257
Stjepan Glavina4ab094c2020-04-12 21:22:37 +0200258 unsafe { ((*header).vtable.run)(ptr) }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200259 }
260
261 /// Cancels the task.
262 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200263 /// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200264 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200265 ///
266 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200267 pub fn cancel(&self) {
268 let ptr = self.raw_task.as_ptr();
269 let header = ptr as *const Header;
270
271 unsafe {
272 (*header).cancel();
273 }
274 }
275
276 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200277 pub fn tag(&self) -> &T {
278 let offset = Header::offset_tag::<T>();
279 let ptr = self.raw_task.as_ptr();
280
281 unsafe {
282 let raw = (ptr as *mut u8).add(offset) as *const T;
283 &*raw
284 }
285 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -0600286
287 /// Converts this task into a raw pointer to the tag.
288 pub fn into_raw(self) -> *const T {
289 let offset = Header::offset_tag::<T>();
290 let ptr = self.raw_task.as_ptr();
291 mem::forget(self);
292
293 unsafe { (ptr as *mut u8).add(offset) as *const T }
294 }
295
296 /// Converts a raw pointer to the tag into a task.
297 ///
298 /// This method should only be used with raw pointers returned from [`into_raw`].
299 ///
300 /// [`into_raw`]: #method.into_raw
301 pub unsafe fn from_raw(raw: *const T) -> Task<T> {
302 let offset = Header::offset_tag::<T>();
303 let ptr = (raw as *mut u8).sub(offset) as *mut ();
304
305 Task {
306 raw_task: NonNull::new_unchecked(ptr),
307 _marker: PhantomData,
308 }
309 }
310
311 /// Returns a waker associated with this task.
312 pub fn waker(&self) -> Waker {
313 let ptr = self.raw_task.as_ptr();
314 let header = ptr as *const Header;
315
316 unsafe {
317 let raw_waker = ((*header).vtable.clone_waker)(ptr);
318 Waker::from_raw(raw_waker)
319 }
320 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200321}
322
323impl<T> Drop for Task<T> {
324 fn drop(&mut self) {
325 let ptr = self.raw_task.as_ptr();
326 let header = ptr as *const Header;
327
328 unsafe {
329 // Cancel the task.
330 (*header).cancel();
331
332 // Drop the future.
333 ((*header).vtable.drop_future)(ptr);
334
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200335 // Mark the task as unscheduled.
336 let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
337
338 // Notify the awaiter that the future has been dropped.
339 if state & AWAITER != 0 {
340 (*header).notify(None);
341 }
342
Stjepan Glavina1479e862019-08-12 20:18:51 +0200343 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200344 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200345 }
346 }
347}
348
349impl<T: fmt::Debug> fmt::Debug for Task<T> {
350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351 let ptr = self.raw_task.as_ptr();
352 let header = ptr as *const Header;
353
354 f.debug_struct("Task")
355 .field("header", unsafe { &(*header) })
356 .field("tag", self.tag())
357 .finish()
358 }
359}