blob: 83cdf7988321975afc8f0db3912421c40013c000 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +01004use std::mem::{self, ManuallyDrop};
5use std::pin::Pin;
Stjepan Glavina1479e862019-08-12 20:18:51 +02006use std::ptr::NonNull;
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +01007use std::task::{Context, Poll};
8use std::thread::{self, ThreadId};
Stjepan Glavina1479e862019-08-12 20:18:51 +02009
10use crate::header::Header;
11use crate::raw::RawTask;
12use crate::JoinHandle;
13
14/// Creates a new task.
15///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020016/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
Stjepan Glavina1479e862019-08-12 20:18:51 +020017/// awaits its result.
18///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020019/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
20/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +020021///
Stjepan Glavina951d9712019-11-25 18:47:16 +010022/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
23/// push the task into some kind of queue so that it can be processed later.
24///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010025/// If you need to spawn a future that does not implement [`Send`], consider using the
26/// [`spawn_local`] function instead.
27///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +020028/// [`Task`]: struct.Task.html
29/// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010030/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
31/// [`spawn_local`]: fn.spawn_local.html
Stjepan Glavina1479e862019-08-12 20:18:51 +020032///
33/// # Examples
34///
35/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020036/// use crossbeam::channel;
37///
38/// // The future inside the task.
39/// let future = async {
40/// println!("Hello, world!");
41/// };
42///
Stjepan Glavina5c398cf2019-08-20 15:29:43 +020043/// // If the task gets woken up, it will be sent into this channel.
Stjepan Glavina1479e862019-08-12 20:18:51 +020044/// let (s, r) = channel::unbounded();
45/// let schedule = move |task| s.send(task).unwrap();
46///
47/// // Create a task with the future and the schedule function.
48/// let (task, handle) = async_task::spawn(future, schedule, ());
49/// ```
Stjepan Glavina1479e862019-08-12 20:18:51 +020050pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
51where
52 F: Future<Output = R> + Send + 'static,
53 R: Send + 'static,
54 S: Fn(Task<T>) + Send + Sync + 'static,
55 T: Send + Sync + 'static,
56{
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010057 let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
58 let task = Task {
59 raw_task,
60 _marker: PhantomData,
61 };
62 let handle = JoinHandle {
63 raw_task,
64 _marker: PhantomData,
65 };
66 (task, handle)
67}
68
69/// Creates a new local task.
70///
71/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
72/// awaits its result.
73///
74/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
75/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
76///
Stjepan Glavina951d9712019-11-25 18:47:16 +010077/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
78/// push the task into some kind of queue so that it can be processed later.
79///
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010080/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
81/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
82///
83/// [`Task`]: struct.Task.html
84/// [`JoinHandle`]: struct.JoinHandle.html
85/// [`spawn`]: fn.spawn.html
86/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
87///
88/// # Examples
89///
90/// ```
91/// use crossbeam::channel;
92///
93/// // The future inside the task.
94/// let future = async {
95/// println!("Hello, world!");
96/// };
97///
98/// // If the task gets woken up, it will be sent into this channel.
99/// let (s, r) = channel::unbounded();
100/// let schedule = move |task| s.send(task).unwrap();
101///
102/// // Create a task with the future and the schedule function.
103/// let (task, handle) = async_task::spawn_local(future, schedule, ());
104/// ```
105pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
106where
107 F: Future<Output = R> + 'static,
108 R: 'static,
109 S: Fn(Task<T>) + Send + Sync + 'static,
110 T: Send + Sync + 'static,
111{
112 thread_local! {
113 static ID: ThreadId = thread::current().id();
114 }
115
116 struct Checked<F> {
117 id: ThreadId,
118 inner: ManuallyDrop<F>,
119 }
120
121 impl<F> Drop for Checked<F> {
122 fn drop(&mut self) {
123 if ID.with(|id| *id) != self.id {
124 panic!("local task dropped by a thread that didn't spawn it");
125 }
126 unsafe {
127 ManuallyDrop::drop(&mut self.inner);
128 }
129 }
130 }
131
132 impl<F: Future> Future for Checked<F> {
133 type Output = F::Output;
134
135 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 if ID.with(|id| *id) != self.id {
137 panic!("local task polled by a thread that didn't spawn it");
138 }
139 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
140 }
141 }
142
143 let future = Checked {
144 id: ID.with(|id| *id),
145 inner: ManuallyDrop::new(future),
146 };
147
148 let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200149 let task = Task {
150 raw_task,
151 _marker: PhantomData,
152 };
153 let handle = JoinHandle {
154 raw_task,
155 _marker: PhantomData,
156 };
157 (task, handle)
158}
159
/// A task reference that runs its future.
///
/// At any moment in time, there is at most one [`Task`] reference associated with a particular
/// task. Calling [`run()`] consumes the [`Task`] reference and polls its internal future. If the
/// future is still pending after getting polled, the [`Task`] reference simply won't exist until
/// a [`Waker`] notifies the task. If the future completes, its result becomes available to the
/// [`JoinHandle`].
///
/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
/// runnable tasks.
///
/// If the [`Task`] reference is dropped without getting run, the task is automatically cancelled.
/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
/// to run the task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
pub struct Task<T> {
    /// A pointer to the heap-allocated task.
    pub(crate) raw_task: NonNull<()>,

    /// A marker capturing the generic type `T`.
    pub(crate) _marker: PhantomData<T>,
}
187
188unsafe impl<T> Send for Task<T> {}
189unsafe impl<T> Sync for Task<T> {}
190
191impl<T> Task<T> {
192 /// Schedules the task.
193 ///
194 /// This is a convenience method that simply reschedules the task by passing it to its schedule
195 /// function.
196 ///
197 /// If the task is cancelled, this method won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200198 pub fn schedule(self) {
199 let ptr = self.raw_task.as_ptr();
200 let header = ptr as *const Header;
201 mem::forget(self);
202
203 unsafe {
204 ((*header).vtable.schedule)(ptr);
205 }
206 }
207
208 /// Runs the task.
209 ///
210 /// This method polls the task's future. If the future completes, its result will become
211 /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200212 /// be woken up in order to be rescheduled and run again.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200213 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200214 /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do
215 /// anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200216 ///
217 /// It is possible that polling the future panics, in which case the panic will be propagated
218 /// into the caller. It is advised that invocations of this method are wrapped inside
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200219 /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200220 ///
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200221 /// [`JoinHandle`]: struct.JoinHandle.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200222 /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200223 pub fn run(self) {
224 let ptr = self.raw_task.as_ptr();
225 let header = ptr as *const Header;
226 mem::forget(self);
227
228 unsafe {
229 ((*header).vtable.run)(ptr);
230 }
231 }
232
233 /// Cancels the task.
234 ///
235 /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
Stjepan Glavina7a8962b2019-08-16 11:25:25 +0200236 /// to run it won't do anything.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200237 ///
238 /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
Stjepan Glavina1479e862019-08-12 20:18:51 +0200239 pub fn cancel(&self) {
240 let ptr = self.raw_task.as_ptr();
241 let header = ptr as *const Header;
242
243 unsafe {
244 (*header).cancel();
245 }
246 }
247
248 /// Returns a reference to the tag stored inside the task.
Stjepan Glavina1479e862019-08-12 20:18:51 +0200249 pub fn tag(&self) -> &T {
250 let offset = Header::offset_tag::<T>();
251 let ptr = self.raw_task.as_ptr();
252
253 unsafe {
254 let raw = (ptr as *mut u8).add(offset) as *const T;
255 &*raw
256 }
257 }
258}
259
260impl<T> Drop for Task<T> {
261 fn drop(&mut self) {
262 let ptr = self.raw_task.as_ptr();
263 let header = ptr as *const Header;
264
265 unsafe {
266 // Cancel the task.
267 (*header).cancel();
268
269 // Drop the future.
270 ((*header).vtable.drop_future)(ptr);
271
272 // Drop the task reference.
Stjepan Glavina5c398cf2019-08-20 15:29:43 +0200273 ((*header).vtable.drop_task)(ptr);
Stjepan Glavina1479e862019-08-12 20:18:51 +0200274 }
275 }
276}
277
278impl<T: fmt::Debug> fmt::Debug for Task<T> {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 let ptr = self.raw_task.as_ptr();
281 let header = ptr as *const Header;
282
283 f.debug_struct("Task")
284 .field("header", unsafe { &(*header) })
285 .field("tag", self.tag())
286 .finish()
287 }
288}