blob: b26c082bf30741c67e1955e1a91475da17ed7d28 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
Stjepan Glavinaaf051a52020-01-06 15:25:52 -06007use core::task::{Context, Poll, Waker};
Stjepan Glavina1479e862019-08-12 20:18:51 +02008
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::JoinHandle;
12
13/// Creates a new task.
14///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020015/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020016/// awaits its result.
17///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020018/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
19/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020020///
Stjepan Glavina951d9712019-11-25 18:47:16 +010021/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
22/// push the task into some kind of queue so that it can be processed later.
23///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010024/// If you need to spawn a future that does not implement [`Send`], consider using the
25/// [`spawn_local`] function instead.
26///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020027/// [`Task`]: struct.Task.html
28/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010029/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
30/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020031///
32/// # Examples
33///
34/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020035/// use crossbeam::channel;
36///
37/// // The future inside the task.
38/// let future = async {
39/// println!("Hello, world!");
40/// };
41///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020042/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020043/// let (s, r) = channel::unbounded();
44/// let schedule = move |task| s.send(task).unwrap();
45///
46/// // Create a task with the future and the schedule function.
47/// let (task, handle) = async_task::spawn(future, schedule, ());
48/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020049pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
50where
51 F: Future<Output = R> + Send + 'static,
52 R: Send + 'static,
53 S: Fn(Task<T>) + Send + Sync + 'static,
54 T: Send + Sync + 'static,
55{
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010056 let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
57 let task = Task {
58 raw_task,
59 _marker: PhantomData,
60 };
61 let handle = JoinHandle {
62 raw_task,
63 _marker: PhantomData,
64 };
65 (task, handle)
66}
67
68/// Creates a new local task.
69///
70/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
71/// awaits its result.
72///
73/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
74/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
75///
Stjepan Glavina951d9712019-11-25 18:47:16 +010076/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
77/// push the task into some kind of queue so that it can be processed later.
78///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010079/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
80/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
81///
82/// [`Task`]: struct.Task.html
83/// [`JoinHandle`]: struct.JoinHandle.html
84/// [`spawn`]: fn.spawn.html
85/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
86///
87/// # Examples
88///
89/// ```
90/// use crossbeam::channel;
91///
92/// // The future inside the task.
93/// let future = async {
94/// println!("Hello, world!");
95/// };
96///
97/// // If the task gets woken up, it will be sent into this channel.
98/// let (s, r) = channel::unbounded();
99/// let schedule = move |task| s.send(task).unwrap();
100///
101/// // Create a task with the future and the schedule function.
102/// let (task, handle) = async_task::spawn_local(future, schedule, ());
103/// ```
104pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
105where
106 F: Future<Output = R> + 'static,
107 R: 'static,
108 S: Fn(Task<T>) + Send + Sync + 'static,
109 T: Send + Sync + 'static,
110{
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600111 #[cfg(unix)]
112 #[inline]
113 fn thread_id() -> usize {
114 unsafe { libc::pthread_self() as usize }
115 }
116
117 #[cfg(windows)]
118 #[inline]
119 fn thread_id() -> usize {
120 unsafe { winapi::um::processthreadsapi::GetCurrentThreadId() as usize }
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100121 }
122
123 struct Checked<F> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600124 id: usize,
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100125 inner: ManuallyDrop<F>,
126 }
127
128 impl<F> Drop for Checked<F> {
129 fn drop(&mut self) {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600130 assert!(
131 self.id == thread_id(),
132 "local task dropped by a thread that didn't spawn it"
133 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100134 unsafe {
135 ManuallyDrop::drop(&mut self.inner);
136 }
137 }
138 }
139
140 impl<F: Future> Future for Checked<F> {
141 type Output = F::Output;
142
143 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600144 assert!(
145 self.id == thread_id(),
146 "local task polled by a thread that didn't spawn it"
147 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100148 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
149 }
150 }
151
152 let future = Checked {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600153 id: thread_id(),
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100154 inner: ManuallyDrop::new(future),
155 };
156
157 let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200158 let task = Task {
159 raw_task,
160 _marker: PhantomData,
161 };
162 let handle = JoinHandle {
163 raw_task,
164 _marker: PhantomData,
165 };
166 (task, handle)
167}
168
/// A task reference that runs its future.
///
/// A particular task has at most one [`Task`] reference associated with it at any moment.
/// Running the task (see [`run()`]) consumes the [`Task`] reference and polls the future stored
/// inside. If the future is still pending afterwards, no [`Task`] reference exists until a
/// [`Waker`] notifies the task. If the future completes, its result becomes available to the
/// [`JoinHandle`].
///
/// Waking a task recreates its [`Task`] reference and passes it to the schedule function. In
/// most executors, scheduling simply pushes the [`Task`] reference into a queue of runnable
/// tasks.
///
/// Dropping the [`Task`] reference without running it cancels the task automatically. A
/// cancelled task is never scheduled again, even if a [`Waker`] wakes it. The [`JoinHandle`] may
/// also cancel the task while the [`Task`] reference exists, in which case an attempt to run the
/// task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
pub struct Task<T> {
    /// A pointer to the heap-allocated task.
    pub(crate) raw_task: NonNull<()>,

    /// A marker capturing the generic type `T`.
    pub(crate) _marker: PhantomData<T>,
}

// SAFETY: the safe constructors in this file, `spawn` and `spawn_local`, both require the
// schedule function and the tag to be `Send + Sync`. `spawn` additionally requires a `Send`
// future, while `spawn_local` wraps its future in a guard that panics when it is polled or
// dropped on a thread other than the spawning one.
// NOTE(review): these impls carry no bound on `T`, so soundness relies on every way of
// obtaining a `Task` (including the unsafe `from_raw`) upholding those requirements.
unsafe impl<T> Send for Task<T> {}
unsafe impl<T> Sync for Task<T> {}
199
200impl<T> Task<T> {
201 /// Schedules the task.
202 ///
203 /// This is a convenience method that simply reschedules the task by passing it to its schedule
204 /// function.
205 ///
206 /// If the task is cancelled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200207 pub fn schedule(self) {
208 let ptr = self.raw_task.as_ptr();
209 let header = ptr as *const Header;
210 mem::forget(self);
211
212 unsafe {
213 ((*header).vtable.schedule)(ptr);
214 }
215 }
216
217 /// Runs the task.
218 ///
219 /// This method polls the task's future. If the future completes, its result will become
220 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200221 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200222 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200223 /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do
224 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200225 ///
226 /// It is possible that polling the future panics, in which case the panic will be propagated
227 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200228 /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200229 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200230 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200231 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200232 pub fn run(self) {
233 let ptr = self.raw_task.as_ptr();
234 let header = ptr as *const Header;
235 mem::forget(self);
236
237 unsafe {
238 ((*header).vtable.run)(ptr);
239 }
240 }
241
242 /// Cancels the task.
243 ///
244 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200245 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200246 ///
247 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200248 pub fn cancel(&self) {
249 let ptr = self.raw_task.as_ptr();
250 let header = ptr as *const Header;
251
252 unsafe {
253 (*header).cancel();
254 }
255 }
256
257 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200258 pub fn tag(&self) -> &T {
259 let offset = Header::offset_tag::<T>();
260 let ptr = self.raw_task.as_ptr();
261
262 unsafe {
263 let raw = (ptr as *mut u8).add(offset) as *const T;
264 &*raw
265 }
266 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -0600267
268 /// Converts this task into a raw pointer to the tag.
269 pub fn into_raw(self) -> *const T {
270 let offset = Header::offset_tag::<T>();
271 let ptr = self.raw_task.as_ptr();
272 mem::forget(self);
273
274 unsafe { (ptr as *mut u8).add(offset) as *const T }
275 }
276
277 /// Converts a raw pointer to the tag into a task.
278 ///
279 /// This method should only be used with raw pointers returned from [`into_raw`].
280 ///
281 /// [`into_raw`]: #method.into_raw
282 pub unsafe fn from_raw(raw: *const T) -> Task<T> {
283 let offset = Header::offset_tag::<T>();
284 let ptr = (raw as *mut u8).sub(offset) as *mut ();
285
286 Task {
287 raw_task: NonNull::new_unchecked(ptr),
288 _marker: PhantomData,
289 }
290 }
291
292 /// Returns a waker associated with this task.
293 pub fn waker(&self) -> Waker {
294 let ptr = self.raw_task.as_ptr();
295 let header = ptr as *const Header;
296
297 unsafe {
298 let raw_waker = ((*header).vtable.clone_waker)(ptr);
299 Waker::from_raw(raw_waker)
300 }
301 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200302}
303
304impl<T> Drop for Task<T> {
305 fn drop(&mut self) {
306 let ptr = self.raw_task.as_ptr();
307 let header = ptr as *const Header;
308
309 unsafe {
310 // Cancel the task.
311 (*header).cancel();
312
313 // Drop the future.
314 ((*header).vtable.drop_future)(ptr);
315
316 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200317 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200318 }
319 }
320}
321
322impl<T: fmt::Debug> fmt::Debug for Task<T> {
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324 let ptr = self.raw_task.as_ptr();
325 let header = ptr as *const Header;
326
327 f.debug_struct("Task")
328 .field("header", unsafe { &(*header) })
329 .field("tag", self.tag())
330 .finish()
331 }
332}