blob: 66b7aeced7ef48e4711bad08570c9ca0b11d7b7f [file] [log] [blame]
//! An executor that assigns an ID to every spawned task.
2
Stjepan Glavina1479e862019-08-12 20:18:51 +02003use std::cell::Cell;
4use std::future::Future;
5use std::panic::catch_unwind;
6use std::thread;
7
8use crossbeam::atomic::AtomicCell;
9use crossbeam::channel::{unbounded, Sender};
10use futures::executor;
11use lazy_static::lazy_static;
12
/// Identifier attached to a spawned task.
///
/// A thin wrapper around the `usize` value that `spawn` reserves for each new
/// task. It is `Copy`, so it can be stored in a `Cell` and handed out by value,
/// and it compares and hashes by its inner number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct TaskId(usize);
15
16thread_local! {
17 /// The ID of the current task.
18 static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
19}
20
21/// Returns the ID of the currently executing task.
22///
23/// Returns `None` if called outside the runtime.
24fn task_id() -> Option<TaskId> {
25 TASK_ID.with(|id| id.get())
26}
27
28/// Spawns a future on the executor.
29fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
30where
31 F: Future<Output = R> + Send + 'static,
32 R: Send + 'static,
33{
34 lazy_static! {
35 // A channel that holds scheduled tasks.
36 static ref QUEUE: Sender<async_task::Task<TaskId>> = {
37 let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
38
39 // Start the executor thread.
40 thread::spawn(|| {
41 TASK_ID.with(|id| {
42 for task in receiver {
43 // Store the task ID into the thread-local before running.
44 id.set(Some(*task.tag()));
45
46 // Ignore panics for simplicity.
47 let _ignore_panic = catch_unwind(|| task.run());
48 }
49 })
50 });
51
52 sender
53 };
54
55 // A counter that assigns IDs to spawned tasks.
56 static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0);
57 }
58
59 // Reserve an ID for the new task.
60 let id = TaskId(COUNTER.fetch_add(1));
61
62 // Create a task that is scheduled by sending itself into the channel.
63 let schedule = |task| QUEUE.send(task).unwrap();
64 let (task, handle) = async_task::spawn(future, schedule, id);
65
66 // Schedule the task by sending it into the channel.
67 task.schedule();
68
69 handle
70}
71
72fn main() {
73 let mut handles = vec![];
74
75 // Spawn a bunch of tasks.
76 for _ in 0..10 {
77 handles.push(spawn(async move {
78 println!("Hello from task with {:?}", task_id());
79 }));
80 }
81
82 // Wait for the tasks to finish.
83 for handle in handles {
84 executor::block_on(handle);
85 }
86}