Add waker_fn (#18)
* Add waker_fn
* Add a waker_fn test
* Double sleep times
* More benches
* Prohibit recursive block_on calls
* Reformat code
diff --git a/Cargo.toml b/Cargo.toml
index 2b4a4a9..8f7aa15 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,3 +23,4 @@
crossbeam-utils = "0.7.0"
futures = "0.3.1"
lazy_static = "1.4.0"
+pin-utils = "0.1.0-alpha.4"
diff --git a/benches/bench.rs b/benches/spawn.rs
similarity index 100%
rename from benches/bench.rs
rename to benches/spawn.rs
diff --git a/benches/waker_fn.rs b/benches/waker_fn.rs
new file mode 100644
index 0000000..90d99f9
--- /dev/null
+++ b/benches/waker_fn.rs
@@ -0,0 +1,89 @@
+#![feature(test)]
+
+extern crate test;
+
+use std::cell::RefCell;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll, Waker};
+
+use crossbeam::sync::Parker;
+use test::Bencher;
+
+/// Runs a future to completion on the current thread.
+fn block_on<F: Future>(future: F) -> F::Output {
+ // Pin the future on the stack.
+ pin_utils::pin_mut!(future);
+
+ thread_local! {
+ // Parker and waker associated with the current thread.
+ static CACHE: RefCell<(Parker, Waker)> = {
+ let parker = Parker::new();
+ let unparker = parker.unparker().clone();
+ let waker = async_task::waker_fn(move || unparker.unpark());
+ RefCell::new((parker, waker))
+ };
+ }
+
+ CACHE.with(|cache| {
+ // Panic if `block_on()` is called recursively.
+ let (parker, waker) = &mut *cache.try_borrow_mut().ok().expect("recursive `block_on`");
+
+ // Create the task context.
+ let cx = &mut Context::from_waker(&waker);
+
+ // Keep polling the future until completion.
+ loop {
+ match future.as_mut().poll(cx) {
+ Poll::Ready(output) => return output,
+ Poll::Pending => parker.park(),
+ }
+ }
+ })
+}
+
+#[bench]
+fn custom_block_on_0_yields(b: &mut Bencher) {
+ b.iter(|| block_on(Yields(0)));
+}
+
+#[bench]
+fn custom_block_on_10_yields(b: &mut Bencher) {
+ b.iter(|| block_on(Yields(10)));
+}
+
+#[bench]
+fn custom_block_on_50_yields(b: &mut Bencher) {
+ b.iter(|| block_on(Yields(50)));
+}
+
+#[bench]
+fn futures_block_on_0_yields(b: &mut Bencher) {
+ b.iter(|| futures::executor::block_on(Yields(0)));
+}
+
+#[bench]
+fn futures_block_on_10_yields(b: &mut Bencher) {
+ b.iter(|| futures::executor::block_on(Yields(10)));
+}
+
+#[bench]
+fn futures_block_on_50_yields(b: &mut Bencher) {
+ b.iter(|| futures::executor::block_on(Yields(50)));
+}
+
+struct Yields(u32);
+
+impl Future for Yields {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.0 == 0 {
+ Poll::Ready(())
+ } else {
+ self.0 -= 1;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+}
diff --git a/examples/block.rs b/examples/block.rs
new file mode 100644
index 0000000..e4124b2
--- /dev/null
+++ b/examples/block.rs
@@ -0,0 +1,60 @@
+//! A simple implementation of `block_on`.
+
+use std::cell::RefCell;
+use std::future::Future;
+use std::task::{Context, Poll, Waker};
+use std::thread;
+use std::time::Duration;
+
+use crossbeam::sync::Parker;
+use futures::channel::oneshot;
+
+/// Runs a future to completion on the current thread.
+fn block_on<F: Future>(future: F) -> F::Output {
+ // Pin the future on the stack.
+ pin_utils::pin_mut!(future);
+
+ thread_local! {
+ // Parker and waker associated with the current thread.
+ static CACHE: RefCell<(Parker, Waker)> = {
+ let parker = Parker::new();
+ let unparker = parker.unparker().clone();
+ let waker = async_task::waker_fn(move || unparker.unpark());
+ RefCell::new((parker, waker))
+ };
+ }
+
+ CACHE.with(|cache| {
+ // Panic if `block_on()` is called recursively.
+ let (parker, waker) = &mut *cache.try_borrow_mut().ok().expect("recursive block_on()");
+
+ // Create the task context.
+ let cx = &mut Context::from_waker(&waker);
+
+ // Keep polling the future until completion.
+ loop {
+ match future.as_mut().poll(cx) {
+ Poll::Ready(output) => return output,
+ Poll::Pending => parker.park(),
+ }
+ }
+ })
+}
+
+fn main() {
+ let (s, r) = oneshot::channel();
+
+ // Spawn a thread that will send a message through the channel.
+ thread::spawn(move || {
+ thread::sleep(Duration::from_secs(1));
+ s.send("Hello, world!").unwrap();
+ });
+
+ // Block until the message is received.
+ let msg = block_on(async {
+ println!("Awaiting...");
+ r.await.unwrap()
+ });
+
+ println!("{}", msg);
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5fe858a..1bd7ca8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,13 +1,11 @@
//! Task abstraction for building executors.
//!
+//! # Spawning
+//!
//! To spawn a future onto an executor, we first need to allocate it on the heap and keep some
//! state alongside it. The state indicates whether the future is ready for polling, waiting to be
//! woken up, or completed. Such a future is called a *task*.
//!
-//! This crate helps with task allocation and polling its future to completion.
-//!
-//! # Spawning
-//!
//! All executors have some kind of queue that holds runnable tasks:
//!
//! ```
@@ -89,13 +87,31 @@
//! Task construction incurs a single allocation that holds its state, the schedule function, and
//! the future or the result of the future if completed.
//!
-//! The layout of a task is equivalent to 4 words followed by the schedule function, and then by a
-//! union of the future and its output.
+//! The layout of a task is equivalent to 4 `usize`s followed by the schedule function, and then by
+//! a union of the future and its output.
+//!
+//! # Waking
+//!
+//! The handy [`waker_fn`] constructor converts any function into a [`Waker`]. Every time it is
+//! woken, the function gets called:
+//!
+//! ```
+//! let waker = async_task::waker_fn(|| println!("Wake!"));
+//!
+//! // Prints "Wake!" twice.
+//! waker.wake_by_ref();
+//! waker.wake_by_ref();
+//! ```
+//!
+//! This is useful for implementing single-future executors like [`block_on`].
//!
//! [`spawn`]: fn.spawn.html
//! [`spawn_local`]: fn.spawn_local.html
+//! [`waker_fn`]: fn.waker_fn.html
//! [`Task`]: struct.Task.html
//! [`JoinHandle`]: struct.JoinHandle.html
+//! [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+//! [`block_on`]: https://github.com/async-rs/async-task/blob/master/examples/block.rs
#![no_std]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
@@ -110,6 +126,8 @@
mod state;
mod task;
mod utils;
+mod waker_fn;
pub use crate::join_handle::JoinHandle;
pub use crate::task::{spawn, spawn_local, Task};
+pub use crate::waker_fn::waker_fn;
diff --git a/src/raw.rs b/src/raw.rs
index 1440e7e..ed3ee97 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -15,9 +15,6 @@
/// The vtable for a task.
pub(crate) struct TaskVTable {
- /// The raw waker vtable.
- pub(crate) raw_waker_vtable: RawWakerVTable,
-
/// Schedules the task.
pub(crate) schedule: unsafe fn(*const ()),
@@ -101,6 +98,13 @@
F: Future<Output = R> + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
{
+ const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::drop_waker,
+ );
+
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
@@ -122,12 +126,6 @@
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
awaiter: UnsafeCell::new(None),
vtable: &TaskVTable {
- raw_waker_vtable: RawWakerVTable::new(
- Self::clone_waker,
- Self::wake,
- Self::wake_by_ref,
- Self::drop_waker,
- ),
schedule: Self::schedule,
drop_future: Self::drop_future,
get_output: Self::get_output,
@@ -335,7 +333,6 @@
/// Clones a waker.
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let raw = Self::from_ptr(ptr);
- let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;
// Increment the reference count. With any kind of reference-counted data structure,
// relaxed ordering is appropriate when incrementing the counter.
@@ -346,7 +343,7 @@
abort();
}
- RawWaker::new(ptr, raw_waker_vtable)
+ RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
}
/// Drops a waker.
@@ -464,10 +461,7 @@
let raw = Self::from_ptr(ptr);
// Create a context from the raw task pointer and the vtable inside the its header.
- let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
- ptr,
- &(*raw.header).vtable.raw_waker_vtable,
- )));
+ let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
let cx = &mut Context::from_waker(&waker);
let mut state = (*raw.header).state.load(Ordering::Acquire);
diff --git a/src/waker_fn.rs b/src/waker_fn.rs
new file mode 100644
index 0000000..37105f1
--- /dev/null
+++ b/src/waker_fn.rs
@@ -0,0 +1,43 @@
+use alloc::sync::Arc;
+use core::mem::{self, ManuallyDrop};
+use core::task::{RawWaker, RawWakerVTable, Waker};
+
+/// Creates a waker from a wake function.
+///
+/// The function gets called every time the waker is woken.
+pub fn waker_fn<F: Fn() + Send + Sync + 'static>(f: F) -> Waker {
+ let raw = Arc::into_raw(Arc::new(f)) as *const ();
+ let vtable = &Helper::<F>::VTABLE;
+ unsafe { Waker::from_raw(RawWaker::new(raw, vtable)) }
+}
+
+struct Helper<F>(F);
+
+impl<F: Fn() + Send + Sync + 'static> Helper<F> {
+ const VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::drop_waker,
+ );
+
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
+ mem::forget(arc.clone());
+ RawWaker::new(ptr, &Self::VTABLE)
+ }
+
+ unsafe fn wake(ptr: *const ()) {
+ let arc = Arc::from_raw(ptr as *const F);
+ (arc)();
+ }
+
+ unsafe fn wake_by_ref(ptr: *const ()) {
+ let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
+ (arc)();
+ }
+
+ unsafe fn drop_waker(ptr: *const ()) {
+ drop(Arc::from_raw(ptr as *const F));
+ }
+}
diff --git a/tests/join.rs b/tests/join.rs
index e572062..8e17b34 100644
--- a/tests/join.rs
+++ b/tests/join.rs
@@ -223,12 +223,12 @@
crossbeam::scope(|scope| {
scope.spawn(|_| {
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
task.cancel();
drop(task);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
@@ -241,7 +241,7 @@
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 0);
assert_eq!(DROP_S.load(), 1);
@@ -258,14 +258,14 @@
crossbeam::scope(|scope| {
scope.spawn(|_| {
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
});
@@ -276,7 +276,7 @@
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
})
@@ -291,14 +291,14 @@
crossbeam::scope(|scope| {
scope.spawn(|_| {
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
});
@@ -317,7 +317,7 @@
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
})
@@ -332,7 +332,7 @@
crossbeam::scope(|scope| {
scope.spawn(|_| {
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
task.run();
assert_eq!(POLL.load(), 0);
diff --git a/tests/panic.rs b/tests/panic.rs
index 7b7d1b5..80a8f1b 100644
--- a/tests/panic.rs
+++ b/tests/panic.rs
@@ -33,7 +33,7 @@
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
panic!()
}
}
@@ -128,7 +128,7 @@
assert_eq!(DROP_T.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 1);
@@ -209,19 +209,19 @@
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
})
@@ -244,7 +244,7 @@
assert_eq!(DROP_T.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 1);
@@ -273,7 +273,7 @@
assert_eq!(DROP_T.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
drop(handle);
assert_eq!(POLL.load(), 1);
diff --git a/tests/ready.rs b/tests/ready.rs
index 05b266b..a966760 100644
--- a/tests/ready.rs
+++ b/tests/ready.rs
@@ -34,7 +34,7 @@
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
Poll::Ready(Out(Box::new(0)))
}
}
@@ -138,7 +138,7 @@
assert_eq!(DROP_O.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 1);
@@ -148,7 +148,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(DROP_O.load(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
@@ -181,12 +181,12 @@
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert!(block_on(handle).is_some());
assert_eq!(POLL.load(), 1);
@@ -194,7 +194,7 @@
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_T.load(), 1);
})
@@ -218,7 +218,7 @@
assert_eq!(DROP_O.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 1);
@@ -249,7 +249,7 @@
assert_eq!(DROP_O.load(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
drop(handle);
assert_eq!(POLL.load(), 1);
diff --git a/tests/waker_fn.rs b/tests/waker_fn.rs
new file mode 100644
index 0000000..fdad34c
--- /dev/null
+++ b/tests/waker_fn.rs
@@ -0,0 +1,29 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+#[test]
+fn wake() {
+ let a = Arc::new(AtomicUsize::new(0));
+ let w = async_task::waker_fn({
+ let a = a.clone();
+ move || {
+ a.fetch_add(1, Ordering::SeqCst);
+ }
+ });
+
+ assert_eq!(a.load(Ordering::SeqCst), 0);
+ w.wake_by_ref();
+ assert_eq!(a.load(Ordering::SeqCst), 1);
+
+ let w2 = w.clone();
+ assert_eq!(a.load(Ordering::SeqCst), 1);
+ w2.wake_by_ref();
+ assert_eq!(a.load(Ordering::SeqCst), 2);
+ drop(w2);
+ assert_eq!(a.load(Ordering::SeqCst), 2);
+
+ let w3 = w.clone();
+ assert_eq!(a.load(Ordering::SeqCst), 2);
+ w3.wake();
+ assert_eq!(a.load(Ordering::SeqCst), 3);
+}
diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs
index bacdef6..eb77912 100644
--- a/tests/waker_panic.rs
+++ b/tests/waker_panic.rs
@@ -39,7 +39,7 @@
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
WAKER.store(Some(cx.waker().clone()));
$poll.fetch_add(1);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
if self.0.get() {
panic!()
@@ -156,7 +156,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
w.wake();
drop(handle);
@@ -167,7 +167,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
@@ -202,7 +202,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 2);
@@ -220,7 +220,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
@@ -255,7 +255,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
w.wake();
assert_eq!(POLL.load(), 2);
@@ -281,7 +281,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
@@ -316,7 +316,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 2);
@@ -342,7 +342,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
diff --git a/tests/waker_pending.rs b/tests/waker_pending.rs
index d2c939b..1374c4a 100644
--- a/tests/waker_pending.rs
+++ b/tests/waker_pending.rs
@@ -37,7 +37,7 @@
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
WAKER.store(Some(cx.waker().clone()));
$poll.fetch_add(1);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
Poll::Pending
}
}
@@ -145,7 +145,7 @@
assert_eq!(chan.len(), 1);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
w.wake_by_ref();
assert_eq!(POLL.load(), 2);
@@ -155,7 +155,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 2);
@@ -193,7 +193,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 2);
@@ -211,7 +211,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
@@ -246,7 +246,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
w.wake();
assert_eq!(POLL.load(), 2);
@@ -272,7 +272,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
@@ -307,7 +307,7 @@
assert_eq!(chan.len(), 0);
});
- thread::sleep(ms(100));
+ thread::sleep(ms(200));
handle.cancel();
assert_eq!(POLL.load(), 2);
@@ -333,7 +333,7 @@
assert_eq!(DROP_T.load(), 0);
assert_eq!(chan.len(), 0);
- thread::sleep(ms(200));
+ thread::sleep(ms(400));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);