blob: 2a7bcf70424ca0ae3d12a5fced0b0b9c47135f15 [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001//! An executor that assigns an ID to every spawned task.
2
Stjepan Glavina1479e862019-08-12 20:18:51 +02003use std::cell::Cell;
4use std::future::Future;
5use std::panic::catch_unwind;
6use std::thread;
7
8use crossbeam::atomic::AtomicCell;
9use crossbeam::channel::{unbounded, Sender};
10use futures::executor;
11use lazy_static::lazy_static;
12
13#[derive(Clone, Copy, Debug)]
14struct TaskId(usize);
15
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010016type Task = async_task::Task<TaskId>;
17type JoinHandle<T> = async_task::JoinHandle<T, TaskId>;
18
Stjepan Glavina1479e862019-08-12 20:18:51 +020019thread_local! {
20 /// The ID of the current task.
21 static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
22}
23
24/// Returns the ID of the currently executing task.
25///
26/// Returns `None` if called outside the runtime.
27fn task_id() -> Option<TaskId> {
28 TASK_ID.with(|id| id.get())
29}
30
31/// Spawns a future on the executor.
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010032fn spawn<F, R>(future: F) -> JoinHandle<R>
Stjepan Glavina1479e862019-08-12 20:18:51 +020033where
34 F: Future<Output = R> + Send + 'static,
35 R: Send + 'static,
36{
37 lazy_static! {
38 // A channel that holds scheduled tasks.
Stjepan Glavinafcfa4ab2019-11-25 18:39:17 +010039 static ref QUEUE: Sender<Task> = {
40 let (sender, receiver) = unbounded::<Task>();
Stjepan Glavina1479e862019-08-12 20:18:51 +020041
42 // Start the executor thread.
43 thread::spawn(|| {
44 TASK_ID.with(|id| {
45 for task in receiver {
46 // Store the task ID into the thread-local before running.
47 id.set(Some(*task.tag()));
48
49 // Ignore panics for simplicity.
50 let _ignore_panic = catch_unwind(|| task.run());
51 }
52 })
53 });
54
55 sender
56 };
57
58 // A counter that assigns IDs to spawned tasks.
59 static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0);
60 }
61
62 // Reserve an ID for the new task.
63 let id = TaskId(COUNTER.fetch_add(1));
64
65 // Create a task that is scheduled by sending itself into the channel.
66 let schedule = |task| QUEUE.send(task).unwrap();
67 let (task, handle) = async_task::spawn(future, schedule, id);
68
69 // Schedule the task by sending it into the channel.
70 task.schedule();
71
72 handle
73}
74
75fn main() {
76 let mut handles = vec![];
77
78 // Spawn a bunch of tasks.
79 for _ in 0..10 {
80 handles.push(spawn(async move {
81 println!("Hello from task with {:?}", task_id());
82 }));
83 }
84
85 // Wait for the tasks to finish.
86 for handle in handles {
87 executor::block_on(handle);
88 }
89}