Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 1 | //! An executor that assigns an ID to every spawned task. |
| 2 | |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 3 | use std::cell::Cell; |
| 4 | use std::future::Future; |
| 5 | use std::panic::catch_unwind; |
| 6 | use std::thread; |
| 7 | |
| 8 | use crossbeam::atomic::AtomicCell; |
| 9 | use crossbeam::channel::{unbounded, Sender}; |
| 10 | use futures::executor; |
| 11 | use lazy_static::lazy_static; |
| 12 | |
/// Identifier attached to every spawned task.
///
/// Wraps the `usize` value assigned to the task when it is spawned.
#[derive(Debug, Clone, Copy)]
struct TaskId(usize);
| 15 | |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 16 | type Task = async_task::Task<TaskId>; |
| 17 | type JoinHandle<T> = async_task::JoinHandle<T, TaskId>; |
| 18 | |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 19 | thread_local! { |
| 20 | /// The ID of the current task. |
| 21 | static TASK_ID: Cell<Option<TaskId>> = Cell::new(None); |
| 22 | } |
| 23 | |
| 24 | /// Returns the ID of the currently executing task. |
| 25 | /// |
| 26 | /// Returns `None` if called outside the runtime. |
| 27 | fn task_id() -> Option<TaskId> { |
| 28 | TASK_ID.with(|id| id.get()) |
| 29 | } |
| 30 | |
| 31 | /// Spawns a future on the executor. |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 32 | fn spawn<F, R>(future: F) -> JoinHandle<R> |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 33 | where |
| 34 | F: Future<Output = R> + Send + 'static, |
| 35 | R: Send + 'static, |
| 36 | { |
| 37 | lazy_static! { |
| 38 | // A channel that holds scheduled tasks. |
Stjepan Glavina | fcfa4ab | 2019-11-25 18:39:17 +0100 | [diff] [blame] | 39 | static ref QUEUE: Sender<Task> = { |
| 40 | let (sender, receiver) = unbounded::<Task>(); |
Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame] | 41 | |
| 42 | // Start the executor thread. |
| 43 | thread::spawn(|| { |
| 44 | TASK_ID.with(|id| { |
| 45 | for task in receiver { |
| 46 | // Store the task ID into the thread-local before running. |
| 47 | id.set(Some(*task.tag())); |
| 48 | |
| 49 | // Ignore panics for simplicity. |
| 50 | let _ignore_panic = catch_unwind(|| task.run()); |
| 51 | } |
| 52 | }) |
| 53 | }); |
| 54 | |
| 55 | sender |
| 56 | }; |
| 57 | |
| 58 | // A counter that assigns IDs to spawned tasks. |
| 59 | static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0); |
| 60 | } |
| 61 | |
| 62 | // Reserve an ID for the new task. |
| 63 | let id = TaskId(COUNTER.fetch_add(1)); |
| 64 | |
| 65 | // Create a task that is scheduled by sending itself into the channel. |
| 66 | let schedule = |task| QUEUE.send(task).unwrap(); |
| 67 | let (task, handle) = async_task::spawn(future, schedule, id); |
| 68 | |
| 69 | // Schedule the task by sending it into the channel. |
| 70 | task.schedule(); |
| 71 | |
| 72 | handle |
| 73 | } |
| 74 | |
| 75 | fn main() { |
| 76 | let mut handles = vec![]; |
| 77 | |
| 78 | // Spawn a bunch of tasks. |
| 79 | for _ in 0..10 { |
| 80 | handles.push(spawn(async move { |
| 81 | println!("Hello from task with {:?}", task_id()); |
| 82 | })); |
| 83 | } |
| 84 | |
| 85 | // Wait for the tasks to finish. |
| 86 | for handle in handles { |
| 87 | executor::block_on(handle); |
| 88 | } |
| 89 | } |