blob: b3832d07bfeab03ba8cbde4b529390b3e02fc81c [file] [log] [blame]
Stjepan Glavina1479e862019-08-12 20:18:51 +02001//! An executor that assigns an ID to every spawned task.
2
3#![feature(async_await)]
4
5use std::cell::Cell;
6use std::future::Future;
7use std::panic::catch_unwind;
8use std::thread;
9
10use crossbeam::atomic::AtomicCell;
11use crossbeam::channel::{unbounded, Sender};
12use futures::executor;
13use lazy_static::lazy_static;
14
15#[derive(Clone, Copy, Debug)]
16struct TaskId(usize);
17
18thread_local! {
19 /// The ID of the current task.
20 static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
21}
22
23/// Returns the ID of the currently executing task.
24///
25/// Returns `None` if called outside the runtime.
26fn task_id() -> Option<TaskId> {
27 TASK_ID.with(|id| id.get())
28}
29
30/// Spawns a future on the executor.
31fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
32where
33 F: Future<Output = R> + Send + 'static,
34 R: Send + 'static,
35{
36 lazy_static! {
37 // A channel that holds scheduled tasks.
38 static ref QUEUE: Sender<async_task::Task<TaskId>> = {
39 let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
40
41 // Start the executor thread.
42 thread::spawn(|| {
43 TASK_ID.with(|id| {
44 for task in receiver {
45 // Store the task ID into the thread-local before running.
46 id.set(Some(*task.tag()));
47
48 // Ignore panics for simplicity.
49 let _ignore_panic = catch_unwind(|| task.run());
50 }
51 })
52 });
53
54 sender
55 };
56
57 // A counter that assigns IDs to spawned tasks.
58 static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0);
59 }
60
61 // Reserve an ID for the new task.
62 let id = TaskId(COUNTER.fetch_add(1));
63
64 // Create a task that is scheduled by sending itself into the channel.
65 let schedule = |task| QUEUE.send(task).unwrap();
66 let (task, handle) = async_task::spawn(future, schedule, id);
67
68 // Schedule the task by sending it into the channel.
69 task.schedule();
70
71 handle
72}
73
74fn main() {
75 let mut handles = vec![];
76
77 // Spawn a bunch of tasks.
78 for _ in 0..10 {
79 handles.push(spawn(async move {
80 println!("Hello from task with {:?}", task_id());
81 }));
82 }
83
84 // Wait for the tasks to finish.
85 for handle in handles {
86 executor::block_on(handle);
87 }
88}