blob: 8bfc1643e8b5c972615ed1692c6aca1fe9b8203f [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::mem;
5use std::ptr::NonNull;
6
7use crate::header::Header;
8use crate::raw::RawTask;
9use crate::JoinHandle;
10
11/// Creates a new task.
12///
13/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that
14/// awaits its result.
15///
16/// The `tag` is stored inside the allocated task.
17///
18/// When run, the task polls `future`. When woken, it gets scheduled for running by the
19/// `schedule` function.
20///
21/// # Examples
22///
23/// ```
24/// # #![feature(async_await)]
25/// use crossbeam::channel;
26///
27/// // The future inside the task.
28/// let future = async {
29/// println!("Hello, world!");
30/// };
31///
32/// // If the task gets woken, it will be sent into this channel.
33/// let (s, r) = channel::unbounded();
34/// let schedule = move |task| s.send(task).unwrap();
35///
36/// // Create a task with the future and the schedule function.
37/// let (task, handle) = async_task::spawn(future, schedule, ());
38/// ```
39///
40/// [`JoinHandle`]: struct.JoinHandle.html
41pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
42where
43 F: Future<Output = R> + Send + 'static,
44 R: Send + 'static,
45 S: Fn(Task<T>) + Send + Sync + 'static,
46 T: Send + Sync + 'static,
47{
48 let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
49 let task = Task {
50 raw_task,
51 _marker: PhantomData,
52 };
53 let handle = JoinHandle {
54 raw_task,
55 _marker: PhantomData,
56 };
57 (task, handle)
58}
59
60/// A task that runs a future.
61///
62/// # Construction
63///
64/// A task is a heap-allocated structure containing:
65///
66/// * A reference counter.
67/// * The state of the task.
68/// * Arbitrary piece of data called a *tag*.
69/// * A function that schedules the task when woken.
70/// * A future or its result if polling has completed.
71///
72/// Constructor [`Task::create()`] returns a [`Task`] and a [`JoinHandle`]. Those two references
73/// are like two sides of the task: one runs the future and the other awaits its result.
74///
75/// # Behavior
76///
77/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the
78/// [`Task`] reference and polls its internal future. If the future is still pending after being
79/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future
80/// completes, its result becomes available to the [`JoinHandle`].
81///
82/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task.
83///
84/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function
85/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a
86/// queue of runnable tasks.
87///
88/// If the [`Task`] reference is dropped without being run, the task is cancelled.
89///
90/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task
91/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An
92/// attempt to run a cancelled task won't do anything. And if the cancelled task has already
93/// completed, awaiting its result through [`JoinHandle`] will return `None`.
94///
95/// If polling the task's future panics, it gets cancelled automatically.
96///
97/// # Task states
98///
99/// A task can be in the following states:
100///
101/// * Sleeping: The [`Task`] reference doesn't exist and is waiting to be scheduled by a [`Waker`].
102/// * Scheduled: The [`Task`] reference exists and is waiting to be [run].
103/// * Completed: The [`Task`] reference doesn't exist anymore and can't be rescheduled, but its
104/// result is available to the [`JoinHandle`].
105/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and
106/// awaiting the [`JoinHandle`] returns `None`.
107///
108/// When constructed, the task is initially in the scheduled state.
109///
110/// # Destruction
111///
112/// The future inside the task gets dropped in the following cases:
113///
114/// * When [`Task`] is dropped.
115/// * When [`Task`] is run to completion.
116///
117/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if
118/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its
119/// future gets dropped by the executor. In other words, the task's future can be dropped only by
120/// [`Task`].
121///
122/// When the task completes, the result of its future is stored inside the allocation. This result
123/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the
124/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too.
125///
126/// The task gets deallocated when all references to it are dropped, which includes the [`Task`],
127/// the [`JoinHandle`], and any associated [`Waker`]s.
128///
129/// The tag inside the task and the schedule function get dropped at the time of deallocation.
130///
131/// # Panics
132///
133/// If polling the inner future inside [`run()`] panics, the panic will be propagated into
134/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other
135/// panics result in the process being aborted.
136///
137/// More precisely, the process is aborted if a panic occurs:
138///
139/// * Inside the schedule function.
140/// * While dropping the tag.
141/// * While dropping the future.
142/// * While dropping the schedule function.
143/// * While waking the task awaiting the [`JoinHandle`].
144///
145/// [`run()`]: struct.Task.html#method.run
146/// [run]: struct.Task.html#method.run
147/// [`JoinHandle`]: struct.JoinHandle.html
148/// [`Task`]: struct.Task.html
149/// [`Task::create()`]: struct.Task.html#method.create
150/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
151/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
152///
153/// # Examples
154///
155/// ```
156/// # #![feature(async_await)]
157/// use async_task::Task;
158/// use crossbeam::channel;
159/// use futures::executor;
160///
161/// // The future inside the task.
162/// let future = async {
163/// println!("Hello, world!");
164/// };
165///
166/// // If the task gets woken, it will be sent into this channel.
167/// let (s, r) = channel::unbounded();
168/// let schedule = move |task| s.send(task).unwrap();
169///
170/// // Create a task with the future and the schedule function.
171/// let (task, handle) = async_task::spawn(future, schedule, ());
172///
173/// // Run the task. In this example, it will complete after a single run.
174/// task.run();
175/// assert!(r.is_empty());
176///
177/// // Await its result.
178/// executor::block_on(handle);
179/// ```
180pub struct Task<T> {
181 /// A pointer to the heap-allocated task.
182 pub(crate) raw_task: NonNull<()>,
183
184 /// A marker capturing the generic type `T`.
185 pub(crate) _marker: PhantomData<T>,
186}
187
188unsafe impl<T> Send for Task<T> {}
189unsafe impl<T> Sync for Task<T> {}
190
191impl<T> Task<T> {
192 /// Schedules the task.
193 ///
194 /// This is a convenience method that simply reschedules the task by passing it to its schedule
195 /// function.
196 ///
197 /// If the task is cancelled, this method won't do anything.
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// # #![feature(async_await)]
203 /// use crossbeam::channel;
204 ///
205 /// // The future inside the task.
206 /// let future = async {
207 /// println!("Hello, world!");
208 /// };
209 ///
210 /// // If the task gets woken, it will be sent into this channel.
211 /// let (s, r) = channel::unbounded();
212 /// let schedule = move |task| s.send(task).unwrap();
213 ///
214 /// // Create a task with the future and the schedule function.
215 /// let (task, handle) = async_task::spawn(future, schedule, ());
216 ///
217 /// // Send the task into the channel.
218 /// task.schedule();
219 ///
220 /// // Retrieve the task back from the channel.
221 /// let task = r.recv().unwrap();
222 /// ```
223 pub fn schedule(self) {
224 let ptr = self.raw_task.as_ptr();
225 let header = ptr as *const Header;
226 mem::forget(self);
227
228 unsafe {
229 ((*header).vtable.schedule)(ptr);
230 }
231 }
232
233 /// Runs the task.
234 ///
235 /// This method polls the task's future. If the future completes, its result will become
236 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
237 /// be woken in order to be rescheduled and then run again.
238 ///
239 /// If the task is cancelled, running it won't do anything.
240 ///
241 /// # Panics
242 ///
243 /// It is possible that polling the future panics, in which case the panic will be propagated
244 /// into the caller. It is advised that invocations of this method are wrapped inside
245 /// [`catch_unwind`].
246 ///
247 /// If a panic occurs, the task is automatically cancelled.
248 ///
249 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
250 ///
251 /// # Examples
252 ///
253 /// ```
254 /// # #![feature(async_await)]
255 /// use crossbeam::channel;
256 /// use futures::executor;
257 ///
258 /// // The future inside the task.
259 /// let future = async { 1 + 2 };
260 ///
261 /// // If the task gets woken, it will be sent into this channel.
262 /// let (s, r) = channel::unbounded();
263 /// let schedule = move |task| s.send(task).unwrap();
264 ///
265 /// // Create a task with the future and the schedule function.
266 /// let (task, handle) = async_task::spawn(future, schedule, ());
267 ///
268 /// // Run the task. In this example, it will complete after a single run.
269 /// task.run();
270 /// assert!(r.is_empty());
271 ///
272 /// // Await the result of the task.
273 /// let result = executor::block_on(handle);
274 /// assert_eq!(result, Some(3));
275 /// ```
276 pub fn run(self) {
277 let ptr = self.raw_task.as_ptr();
278 let header = ptr as *const Header;
279 mem::forget(self);
280
281 unsafe {
282 ((*header).vtable.run)(ptr);
283 }
284 }
285
286 /// Cancels the task.
287 ///
288 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
289 /// to run it won't do anything. And if it's completed, awaiting its result evaluates to
290 /// `None`.
291 ///
292 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// # #![feature(async_await)]
298 /// use crossbeam::channel;
299 /// use futures::executor;
300 ///
301 /// // The future inside the task.
302 /// let future = async { 1 + 2 };
303 ///
304 /// // If the task gets woken, it will be sent into this channel.
305 /// let (s, r) = channel::unbounded();
306 /// let schedule = move |task| s.send(task).unwrap();
307 ///
308 /// // Create a task with the future and the schedule function.
309 /// let (task, handle) = async_task::spawn(future, schedule, ());
310 ///
311 /// // Cancel the task.
312 /// task.cancel();
313 ///
314 /// // Running a cancelled task does nothing.
315 /// task.run();
316 ///
317 /// // Await the result of the task.
318 /// let result = executor::block_on(handle);
319 /// assert_eq!(result, None);
320 /// ```
321 pub fn cancel(&self) {
322 let ptr = self.raw_task.as_ptr();
323 let header = ptr as *const Header;
324
325 unsafe {
326 (*header).cancel();
327 }
328 }
329
330 /// Returns a reference to the tag stored inside the task.
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// # #![feature(async_await)]
336 /// use crossbeam::channel;
337 ///
338 /// // The future inside the task.
339 /// let future = async { 1 + 2 };
340 ///
341 /// // If the task gets woken, it will be sent into this channel.
342 /// let (s, r) = channel::unbounded();
343 /// let schedule = move |task| s.send(task).unwrap();
344 ///
345 /// // Create a task with the future and the schedule function.
346 /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
347 ///
348 /// // Access the tag.
349 /// assert_eq!(*task.tag(), "a simple task");
350 /// ```
351 pub fn tag(&self) -> &T {
352 let offset = Header::offset_tag::<T>();
353 let ptr = self.raw_task.as_ptr();
354
355 unsafe {
356 let raw = (ptr as *mut u8).add(offset) as *const T;
357 &*raw
358 }
359 }
360}
361
362impl<T> Drop for Task<T> {
363 fn drop(&mut self) {
364 let ptr = self.raw_task.as_ptr();
365 let header = ptr as *const Header;
366
367 unsafe {
368 // Cancel the task.
369 (*header).cancel();
370
371 // Drop the future.
372 ((*header).vtable.drop_future)(ptr);
373
374 // Drop the task reference.
375 ((*header).vtable.decrement)(ptr);
376 }
377 }
378}
379
380impl<T: fmt::Debug> fmt::Debug for Task<T> {
381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382 let ptr = self.raw_task.as_ptr();
383 let header = ptr as *const Header;
384
385 f.debug_struct("Task")
386 .field("header", unsafe { &(*header) })
387 .field("tag", self.tag())
388 .finish()
389 }
390}