blob: a300d63f870c639b5cb23f85e5c51c12025a37b0 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
Stjepan Glavinafad623a2020-04-14 14:33:37 +02007use core::sync::atomic::Ordering;
Stjepan Glavinaaf051a52020-01-06 15:25:52 -06008use core::task::{Context, Poll, Waker};
Stjepan Glavina1479e862019-08-12 20:18:51 +02009
10use crate::header::Header;
11use crate::raw::RawTask;
Stjepan Glavinafad623a2020-04-14 14:33:37 +020012use crate::state::*;
Stjepan Glavina1479e862019-08-12 20:18:51 +020013use crate::JoinHandle;
14
15/// Creates a new task.
16///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020017/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020018/// awaits its result.
19///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020020/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
21/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020022///
Stjepan Glavina951d9712019-11-25 18:47:16 +010023/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
24/// push the task into some kind of queue so that it can be processed later.
25///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010026/// If you need to spawn a future that does not implement [`Send`], consider using the
27/// [`spawn_local`] function instead.
28///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020029/// [`Task`]: struct.Task.html
30/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010031/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
32/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020033///
34/// # Examples
35///
36/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020037/// use crossbeam::channel;
38///
39/// // The future inside the task.
40/// let future = async {
41/// println!("Hello, world!");
42/// };
43///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020044/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020045/// let (s, r) = channel::unbounded();
46/// let schedule = move |task| s.send(task).unwrap();
47///
48/// // Create a task with the future and the schedule function.
49/// let (task, handle) = async_task::spawn(future, schedule, ());
50/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020051pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
52where
53 F: Future<Output = R> + Send + 'static,
54 R: Send + 'static,
55 S: Fn(Task<T>) + Send + Sync + 'static,
56 T: Send + Sync + 'static,
57{
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010058 let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
59 let task = Task {
60 raw_task,
61 _marker: PhantomData,
62 };
63 let handle = JoinHandle {
64 raw_task,
65 _marker: PhantomData,
66 };
67 (task, handle)
68}
69
70/// Creates a new local task.
71///
72/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
73/// awaits its result.
74///
75/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
76/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
77///
Stjepan Glavina951d9712019-11-25 18:47:16 +010078/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
79/// push the task into some kind of queue so that it can be processed later.
80///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010081/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
82/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
83///
84/// [`Task`]: struct.Task.html
85/// [`JoinHandle`]: struct.JoinHandle.html
86/// [`spawn`]: fn.spawn.html
87/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
88///
89/// # Examples
90///
91/// ```
92/// use crossbeam::channel;
93///
94/// // The future inside the task.
95/// let future = async {
96/// println!("Hello, world!");
97/// };
98///
99/// // If the task gets woken up, it will be sent into this channel.
100/// let (s, r) = channel::unbounded();
101/// let schedule = move |task| s.send(task).unwrap();
102///
103/// // Create a task with the future and the schedule function.
104/// let (task, handle) = async_task::spawn_local(future, schedule, ());
105/// ```
Stjepan Glavina7e7d19c2020-01-07 22:45:13 +0100106#[cfg(any(unix, windows))]
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100107pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
108where
109 F: Future<Output = R> + 'static,
110 R: 'static,
111 S: Fn(Task<T>) + Send + Sync + 'static,
112 T: Send + Sync + 'static,
113{
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600114 #[cfg(unix)]
115 #[inline]
116 fn thread_id() -> usize {
117 unsafe { libc::pthread_self() as usize }
118 }
119
120 #[cfg(windows)]
121 #[inline]
122 fn thread_id() -> usize {
123 unsafe { winapi::um::processthreadsapi::GetCurrentThreadId() as usize }
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100124 }
125
126 struct Checked<F> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600127 id: usize,
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100128 inner: ManuallyDrop<F>,
129 }
130
131 impl<F> Drop for Checked<F> {
132 fn drop(&mut self) {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600133 assert!(
134 self.id == thread_id(),
135 "local task dropped by a thread that didn't spawn it"
136 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100137 unsafe {
138 ManuallyDrop::drop(&mut self.inner);
139 }
140 }
141 }
142
143 impl<F: Future> Future for Checked<F> {
144 type Output = F::Output;
145
146 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600147 assert!(
148 self.id == thread_id(),
149 "local task polled by a thread that didn't spawn it"
150 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100151 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
152 }
153 }
154
155 let future = Checked {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600156 id: thread_id(),
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100157 inner: ManuallyDrop::new(future),
158 };
159
160 let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200161 let task = Task {
162 raw_task,
163 _marker: PhantomData,
164 };
165 let handle = JoinHandle {
166 raw_task,
167 _marker: PhantomData,
168 };
169 (task, handle)
170}
171
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200172/// A task reference that runs its future.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200173///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200174/// At any moment in time, there is at most one [`Task`] reference associated with a particular
175/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is
176/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`]
177/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`].
Stjepan Glavina1479e862019-08-12 20:18:51 +0200178///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200179/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200180/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
181/// runnable tasks.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200182///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200183/// If the [`Task`] reference is dropped without getting run, the task is automatically canceled.
184/// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200185/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
186/// to run the task won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200187///
188/// [`run()`]: struct.Task.html#method.run
Stjepan Glavina1479e862019-08-12 20:18:51 +0200189/// [`JoinHandle`]: struct.JoinHandle.html
190/// [`Task`]: struct.Task.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200191/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200192pub struct Task<T> {
193 /// A pointer to the heap-allocated task.
194 pub(crate) raw_task: NonNull<()>,
195
196 /// A marker capturing the generic type `T`.
197 pub(crate) _marker: PhantomData<T>,
198}
199
200unsafe impl<T> Send for Task<T> {}
201unsafe impl<T> Sync for Task<T> {}
202
203impl<T> Task<T> {
204 /// Schedules the task.
205 ///
206 /// This is a convenience method that simply reschedules the task by passing it to its schedule
207 /// function.
208 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200209 /// If the task is canceled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200210 pub fn schedule(self) {
211 let ptr = self.raw_task.as_ptr();
212 let header = ptr as *const Header;
213 mem::forget(self);
214
215 unsafe {
216 ((*header).vtable.schedule)(ptr);
217 }
218 }
219
220 /// Runs the task.
221 ///
Stjepan Glavina94059052020-04-12 19:46:20 +0200222 /// Returns `true` if the task was woken while running, in which case it gets rescheduled at
223 /// the end of this method invocation.
224 ///
Stjepan Glavina1479e862019-08-12 20:18:51 +0200225 /// This method polls the task's future. If the future completes, its result will become
226 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200227 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200228 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200229 /// If the task was canceled by a [`JoinHandle`] before it gets run, then this method won't do
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200230 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200231 ///
232 /// It is possible that polling the future panics, in which case the panic will be propagated
233 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200234 /// [`catch_unwind`]. If a panic occurs, the task is automatically canceled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200235 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200236 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200237 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina94059052020-04-12 19:46:20 +0200238 pub fn run(self) -> bool {
Stjepan Glavina1479e862019-08-12 20:18:51 +0200239 let ptr = self.raw_task.as_ptr();
240 let header = ptr as *const Header;
241 mem::forget(self);
242
Stjepan Glavina4ab094c2020-04-12 21:22:37 +0200243 unsafe { ((*header).vtable.run)(ptr) }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200244 }
245
246 /// Cancels the task.
247 ///
Stjepan Glavinad7b17fb2020-04-14 14:38:57 +0200248 /// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200249 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200250 ///
251 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200252 pub fn cancel(&self) {
253 let ptr = self.raw_task.as_ptr();
254 let header = ptr as *const Header;
255
256 unsafe {
257 (*header).cancel();
258 }
259 }
260
261 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200262 pub fn tag(&self) -> &T {
263 let offset = Header::offset_tag::<T>();
264 let ptr = self.raw_task.as_ptr();
265
266 unsafe {
267 let raw = (ptr as *mut u8).add(offset) as *const T;
268 &*raw
269 }
270 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -0600271
272 /// Converts this task into a raw pointer to the tag.
273 pub fn into_raw(self) -> *const T {
274 let offset = Header::offset_tag::<T>();
275 let ptr = self.raw_task.as_ptr();
276 mem::forget(self);
277
278 unsafe { (ptr as *mut u8).add(offset) as *const T }
279 }
280
281 /// Converts a raw pointer to the tag into a task.
282 ///
283 /// This method should only be used with raw pointers returned from [`into_raw`].
284 ///
285 /// [`into_raw`]: #method.into_raw
286 pub unsafe fn from_raw(raw: *const T) -> Task<T> {
287 let offset = Header::offset_tag::<T>();
288 let ptr = (raw as *mut u8).sub(offset) as *mut ();
289
290 Task {
291 raw_task: NonNull::new_unchecked(ptr),
292 _marker: PhantomData,
293 }
294 }
295
296 /// Returns a waker associated with this task.
297 pub fn waker(&self) -> Waker {
298 let ptr = self.raw_task.as_ptr();
299 let header = ptr as *const Header;
300
301 unsafe {
302 let raw_waker = ((*header).vtable.clone_waker)(ptr);
303 Waker::from_raw(raw_waker)
304 }
305 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200306}
307
308impl<T> Drop for Task<T> {
309 fn drop(&mut self) {
310 let ptr = self.raw_task.as_ptr();
311 let header = ptr as *const Header;
312
313 unsafe {
314 // Cancel the task.
315 (*header).cancel();
316
317 // Drop the future.
318 ((*header).vtable.drop_future)(ptr);
319
Stjepan Glavinafad623a2020-04-14 14:33:37 +0200320 // Mark the task as unscheduled.
321 let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
322
323 // Notify the awaiter that the future has been dropped.
324 if state & AWAITER != 0 {
325 (*header).notify(None);
326 }
327
Stjepan Glavina1479e862019-08-12 20:18:51 +0200328 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200329 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200330 }
331 }
332}
333
334impl<T: fmt::Debug> fmt::Debug for Task<T> {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 let ptr = self.raw_task.as_ptr();
337 let header = ptr as *const Header;
338
339 f.debug_struct("Task")
340 .field("header", unsafe { &(*header) })
341 .field("tag", self.tag())
342 .finish()
343 }
344}