blob: b12caceedb1311398e6ada6ebe30cf39bdcf01c5 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +01004use std::mem::{self, ManuallyDrop};
5use std::pin::Pin;
Stjepan Glavina1479e862019-08-12 20:18:51 +02006use std::ptr::NonNull;
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +01007use std::task::{Context, Poll};
8use std::thread::{self, ThreadId};
Stjepan Glavina1479e862019-08-12 20:18:51 +02009
10use crate::header::Header;
11use crate::raw::RawTask;
12use crate::JoinHandle;
13
14/// Creates a new task.
15///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020016/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020017/// awaits its result.
18///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020019/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
20/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020021///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010022/// If you need to spawn a future that does not implement [`Send`], consider using the
23/// [`spawn_local`] function instead.
24///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020025/// [`Task`]: struct.Task.html
26/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010027/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
28/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020029///
30/// # Examples
31///
32/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020033/// use crossbeam::channel;
34///
35/// // The future inside the task.
36/// let future = async {
37/// println!("Hello, world!");
38/// };
39///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020040/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020041/// let (s, r) = channel::unbounded();
42/// let schedule = move |task| s.send(task).unwrap();
43///
44/// // Create a task with the future and the schedule function.
45/// let (task, handle) = async_task::spawn(future, schedule, ());
46/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020047pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
48where
49 F: Future<Output = R> + Send + 'static,
50 R: Send + 'static,
51 S: Fn(Task<T>) + Send + Sync + 'static,
52 T: Send + Sync + 'static,
53{
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010054 let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
55 let task = Task {
56 raw_task,
57 _marker: PhantomData,
58 };
59 let handle = JoinHandle {
60 raw_task,
61 _marker: PhantomData,
62 };
63 (task, handle)
64}
65
66/// Creates a new local task.
67///
68/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
69/// awaits its result.
70///
71/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
72/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
73///
74/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
75/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
76///
77/// [`Task`]: struct.Task.html
78/// [`JoinHandle`]: struct.JoinHandle.html
79/// [`spawn`]: fn.spawn.html
80/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
81///
82/// # Examples
83///
84/// ```
85/// use crossbeam::channel;
86///
87/// // The future inside the task.
88/// let future = async {
89/// println!("Hello, world!");
90/// };
91///
92/// // If the task gets woken up, it will be sent into this channel.
93/// let (s, r) = channel::unbounded();
94/// let schedule = move |task| s.send(task).unwrap();
95///
96/// // Create a task with the future and the schedule function.
97/// let (task, handle) = async_task::spawn_local(future, schedule, ());
98/// ```
99pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
100where
101 F: Future<Output = R> + 'static,
102 R: 'static,
103 S: Fn(Task<T>) + Send + Sync + 'static,
104 T: Send + Sync + 'static,
105{
106 thread_local! {
107 static ID: ThreadId = thread::current().id();
108 }
109
110 struct Checked<F> {
111 id: ThreadId,
112 inner: ManuallyDrop<F>,
113 }
114
115 impl<F> Drop for Checked<F> {
116 fn drop(&mut self) {
117 if ID.with(|id| *id) != self.id {
118 panic!("local task dropped by a thread that didn't spawn it");
119 }
120 unsafe {
121 ManuallyDrop::drop(&mut self.inner);
122 }
123 }
124 }
125
126 impl<F: Future> Future for Checked<F> {
127 type Output = F::Output;
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 if ID.with(|id| *id) != self.id {
131 panic!("local task polled by a thread that didn't spawn it");
132 }
133 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
134 }
135 }
136
137 let future = Checked {
138 id: ID.with(|id| *id),
139 inner: ManuallyDrop::new(future),
140 };
141
142 let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200143 let task = Task {
144 raw_task,
145 _marker: PhantomData,
146 };
147 let handle = JoinHandle {
148 raw_task,
149 _marker: PhantomData,
150 };
151 (task, handle)
152}
153
/// A task reference that runs its future.
///
/// A particular task has at most one [`Task`] reference at any moment. Calling [`run()`]
/// consumes the reference and polls the task's future. If the future is still pending after
/// the poll, no [`Task`] reference exists until a [`Waker`] notifies the task. If the future
/// completes, its output becomes available to the [`JoinHandle`].
///
/// Waking a task recreates its [`Task`] reference and passes it to the schedule function. In
/// most executors, scheduling simply pushes the reference into a queue of runnable tasks.
///
/// Dropping a [`Task`] reference without running it cancels the task automatically. A
/// cancelled task is not scheduled again even if a [`Waker`] wakes it. The [`JoinHandle`] may
/// also cancel the task while a [`Task`] reference exists, in which case an attempt to run the
/// task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
pub struct Task<T> {
    /// A type-erased pointer to the heap-allocated task.
    pub(crate) raw_task: NonNull<()>,

    /// A marker that ties the tag type `T` to this reference without storing a `T`.
    pub(crate) _marker: PhantomData<T>,
}

// SAFETY: the constructors in this file (`spawn` and `spawn_local`) require the schedule
// function and the tag to be `Send + Sync`; `spawn` additionally requires a `Send` future, and
// `spawn_local` wraps its future in a guard that panics when touched from another thread.
// NOTE(review): soundness also relies on any other crate-internal code that builds a `Task`
// upholding the same bounds — confirm.
unsafe impl<T> Send for Task<T> {}
unsafe impl<T> Sync for Task<T> {}
184
185impl<T> Task<T> {
186 /// Schedules the task.
187 ///
188 /// This is a convenience method that simply reschedules the task by passing it to its schedule
189 /// function.
190 ///
191 /// If the task is cancelled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200192 pub fn schedule(self) {
193 let ptr = self.raw_task.as_ptr();
194 let header = ptr as *const Header;
195 mem::forget(self);
196
197 unsafe {
198 ((*header).vtable.schedule)(ptr);
199 }
200 }
201
202 /// Runs the task.
203 ///
204 /// This method polls the task's future. If the future completes, its result will become
205 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200206 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200207 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200208 /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do
209 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200210 ///
211 /// It is possible that polling the future panics, in which case the panic will be propagated
212 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200213 /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200214 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200215 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200216 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200217 pub fn run(self) {
218 let ptr = self.raw_task.as_ptr();
219 let header = ptr as *const Header;
220 mem::forget(self);
221
222 unsafe {
223 ((*header).vtable.run)(ptr);
224 }
225 }
226
227 /// Cancels the task.
228 ///
229 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200230 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200231 ///
232 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200233 pub fn cancel(&self) {
234 let ptr = self.raw_task.as_ptr();
235 let header = ptr as *const Header;
236
237 unsafe {
238 (*header).cancel();
239 }
240 }
241
242 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200243 pub fn tag(&self) -> &T {
244 let offset = Header::offset_tag::<T>();
245 let ptr = self.raw_task.as_ptr();
246
247 unsafe {
248 let raw = (ptr as *mut u8).add(offset) as *const T;
249 &*raw
250 }
251 }
252}
253
254impl<T> Drop for Task<T> {
255 fn drop(&mut self) {
256 let ptr = self.raw_task.as_ptr();
257 let header = ptr as *const Header;
258
259 unsafe {
260 // Cancel the task.
261 (*header).cancel();
262
263 // Drop the future.
264 ((*header).vtable.drop_future)(ptr);
265
266 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200267 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200268 }
269 }
270}
271
272impl<T: fmt::Debug> fmt::Debug for Task<T> {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 let ptr = self.raw_task.as_ptr();
275 let header = ptr as *const Header;
276
277 f.debug_struct("Task")
278 .field("header", unsafe { &(*header) })
279 .field("tag", self.tag())
280 .finish()
281 }
282}