blob: 8ef209c1b383e5bb9b4264ad07773c03f295f237 [file] [log] [blame]
Stjepan Glavina921e8a02020-01-06 14:31:28 -06001use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
Stjepan Glavinaaf051a52020-01-06 15:25:52 -06007use core::task::{Context, Poll, Waker};
Stjepan Glavina1479e862019-08-12 20:18:51 +02008
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::JoinHandle;
12
13/// Creates a new task.
14///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020015/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020016/// awaits its result.
17///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020018/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
19/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020020///
Stjepan Glavina951d9712019-11-25 18:47:16 +010021/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
22/// push the task into some kind of queue so that it can be processed later.
23///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010024/// If you need to spawn a future that does not implement [`Send`], consider using the
25/// [`spawn_local`] function instead.
26///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020027/// [`Task`]: struct.Task.html
28/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010029/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
30/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020031///
32/// # Examples
33///
34/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020035/// use crossbeam::channel;
36///
37/// // The future inside the task.
38/// let future = async {
39/// println!("Hello, world!");
40/// };
41///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020042/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020043/// let (s, r) = channel::unbounded();
44/// let schedule = move |task| s.send(task).unwrap();
45///
46/// // Create a task with the future and the schedule function.
47/// let (task, handle) = async_task::spawn(future, schedule, ());
48/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020049pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
50where
51 F: Future<Output = R> + Send + 'static,
52 R: Send + 'static,
53 S: Fn(Task<T>) + Send + Sync + 'static,
54 T: Send + Sync + 'static,
55{
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010056 let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
57 let task = Task {
58 raw_task,
59 _marker: PhantomData,
60 };
61 let handle = JoinHandle {
62 raw_task,
63 _marker: PhantomData,
64 };
65 (task, handle)
66}
67
68/// Creates a new local task.
69///
70/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
71/// awaits its result.
72///
73/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
74/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
75///
Stjepan Glavina951d9712019-11-25 18:47:16 +010076/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
77/// push the task into some kind of queue so that it can be processed later.
78///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010079/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
80/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
81///
82/// [`Task`]: struct.Task.html
83/// [`JoinHandle`]: struct.JoinHandle.html
84/// [`spawn`]: fn.spawn.html
85/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
86///
87/// # Examples
88///
89/// ```
90/// use crossbeam::channel;
91///
92/// // The future inside the task.
93/// let future = async {
94/// println!("Hello, world!");
95/// };
96///
97/// // If the task gets woken up, it will be sent into this channel.
98/// let (s, r) = channel::unbounded();
99/// let schedule = move |task| s.send(task).unwrap();
100///
101/// // Create a task with the future and the schedule function.
102/// let (task, handle) = async_task::spawn_local(future, schedule, ());
103/// ```
Stjepan Glavina7e7d19c2020-01-07 22:45:13 +0100104#[cfg(any(unix, windows))]
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100105pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
106where
107 F: Future<Output = R> + 'static,
108 R: 'static,
109 S: Fn(Task<T>) + Send + Sync + 'static,
110 T: Send + Sync + 'static,
111{
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600112 #[cfg(unix)]
113 #[inline]
114 fn thread_id() -> usize {
115 unsafe { libc::pthread_self() as usize }
116 }
117
118 #[cfg(windows)]
119 #[inline]
120 fn thread_id() -> usize {
121 unsafe { winapi::um::processthreadsapi::GetCurrentThreadId() as usize }
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100122 }
123
124 struct Checked<F> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600125 id: usize,
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100126 inner: ManuallyDrop<F>,
127 }
128
129 impl<F> Drop for Checked<F> {
130 fn drop(&mut self) {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600131 assert!(
132 self.id == thread_id(),
133 "local task dropped by a thread that didn't spawn it"
134 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100135 unsafe {
136 ManuallyDrop::drop(&mut self.inner);
137 }
138 }
139 }
140
141 impl<F: Future> Future for Checked<F> {
142 type Output = F::Output;
143
144 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600145 assert!(
146 self.id == thread_id(),
147 "local task polled by a thread that didn't spawn it"
148 );
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100149 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
150 }
151 }
152
153 let future = Checked {
Stjepan Glavina921e8a02020-01-06 14:31:28 -0600154 id: thread_id(),
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +0100155 inner: ManuallyDrop::new(future),
156 };
157
158 let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200159 let task = Task {
160 raw_task,
161 _marker: PhantomData,
162 };
163 let handle = JoinHandle {
164 raw_task,
165 _marker: PhantomData,
166 };
167 (task, handle)
168}
169
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200170/// A task reference that runs its future.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200171///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200172/// At any moment in time, there is at most one [`Task`] reference associated with a particular
173/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is
174/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`]
175/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`].
Stjepan Glavina1479e862019-08-12 20:18:51 +0200176///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200177/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200178/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
179/// runnable tasks.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200180///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200181/// If the [`Task`] reference is dropped without getting run, the task is automatically cancelled.
182/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
183/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
184/// to run the task won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200185///
186/// [`run()`]: struct.Task.html#method.run
Stjepan Glavina1479e862019-08-12 20:18:51 +0200187/// [`JoinHandle`]: struct.JoinHandle.html
188/// [`Task`]: struct.Task.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200189/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200190pub struct Task<T> {
191 /// A pointer to the heap-allocated task.
192 pub(crate) raw_task: NonNull<()>,
193
194 /// A marker capturing the generic type `T`.
195 pub(crate) _marker: PhantomData<T>,
196}
197
198unsafe impl<T> Send for Task<T> {}
199unsafe impl<T> Sync for Task<T> {}
200
201impl<T> Task<T> {
202 /// Schedules the task.
203 ///
204 /// This is a convenience method that simply reschedules the task by passing it to its schedule
205 /// function.
206 ///
207 /// If the task is cancelled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200208 pub fn schedule(self) {
209 let ptr = self.raw_task.as_ptr();
210 let header = ptr as *const Header;
211 mem::forget(self);
212
213 unsafe {
214 ((*header).vtable.schedule)(ptr);
215 }
216 }
217
218 /// Runs the task.
219 ///
220 /// This method polls the task's future. If the future completes, its result will become
221 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200222 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200223 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200224 /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do
225 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200226 ///
227 /// It is possible that polling the future panics, in which case the panic will be propagated
228 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200229 /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200230 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200231 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200232 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200233 pub fn run(self) {
234 let ptr = self.raw_task.as_ptr();
235 let header = ptr as *const Header;
236 mem::forget(self);
237
238 unsafe {
239 ((*header).vtable.run)(ptr);
240 }
241 }
242
243 /// Cancels the task.
244 ///
245 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200246 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200247 ///
248 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200249 pub fn cancel(&self) {
250 let ptr = self.raw_task.as_ptr();
251 let header = ptr as *const Header;
252
253 unsafe {
254 (*header).cancel();
255 }
256 }
257
258 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200259 pub fn tag(&self) -> &T {
260 let offset = Header::offset_tag::<T>();
261 let ptr = self.raw_task.as_ptr();
262
263 unsafe {
264 let raw = (ptr as *mut u8).add(offset) as *const T;
265 &*raw
266 }
267 }
Stjepan Glavinaaf051a52020-01-06 15:25:52 -0600268
269 /// Converts this task into a raw pointer to the tag.
270 pub fn into_raw(self) -> *const T {
271 let offset = Header::offset_tag::<T>();
272 let ptr = self.raw_task.as_ptr();
273 mem::forget(self);
274
275 unsafe { (ptr as *mut u8).add(offset) as *const T }
276 }
277
278 /// Converts a raw pointer to the tag into a task.
279 ///
280 /// This method should only be used with raw pointers returned from [`into_raw`].
281 ///
282 /// [`into_raw`]: #method.into_raw
283 pub unsafe fn from_raw(raw: *const T) -> Task<T> {
284 let offset = Header::offset_tag::<T>();
285 let ptr = (raw as *mut u8).sub(offset) as *mut ();
286
287 Task {
288 raw_task: NonNull::new_unchecked(ptr),
289 _marker: PhantomData,
290 }
291 }
292
293 /// Returns a waker associated with this task.
294 pub fn waker(&self) -> Waker {
295 let ptr = self.raw_task.as_ptr();
296 let header = ptr as *const Header;
297
298 unsafe {
299 let raw_waker = ((*header).vtable.clone_waker)(ptr);
300 Waker::from_raw(raw_waker)
301 }
302 }
Stjepan Glavina1479e862019-08-12 20:18:51 +0200303}
304
305impl<T> Drop for Task<T> {
306 fn drop(&mut self) {
307 let ptr = self.raw_task.as_ptr();
308 let header = ptr as *const Header;
309
310 unsafe {
311 // Cancel the task.
312 (*header).cancel();
313
314 // Drop the future.
315 ((*header).vtable.drop_future)(ptr);
316
317 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200318 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200319 }
320 }
321}
322
323impl<T: fmt::Debug> fmt::Debug for Task<T> {
324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325 let ptr = self.raw_task.as_ptr();
326 let header = ptr as *const Header;
327
328 f.debug_struct("Task")
329 .field("header", unsafe { &(*header) })
330 .field("tag", self.tag())
331 .finish()
332 }
333}