Initial commit
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6936990
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+/target
+**/*.rs.bk
+Cargo.lock
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..2286f1e
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "async-task"
+version = "0.1.0"
+authors = ["Stjepan Glavina <stjepang@gmail.com>"]
+edition = "2018"
+license = "Apache-2.0/MIT"
+repository = "https://github.com/stjepang/async-task"
+homepage = "https://github.com/stjepang/async-task"
+documentation = "https://docs.rs/async-task"
+description = "Task abstraction for building executors"
+keywords = ["future", "task", "executor", "spawn"]
+categories = ["asynchronous", "concurrency"]
+
+[dependencies]
+crossbeam-utils = "0.6.5"
+
+[dev-dependencies]
+crossbeam = "0.7.1"
+futures-preview = "0.3.0-alpha.17"
+lazy_static = "1.3.0"
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..16fe87b
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,201 @@
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..31aa793
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,23 @@
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..22fe9f2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,21 @@
+# async-task
+
+A task abstraction for building executors.
+
+This crate makes it possible to build an efficient and extendable executor in a few lines of
+code.
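+
+A minimal sketch of the intended usage (the channel-backed executor below is illustrative and
+mirrors `examples/spawn-on-thread.rs`; it is not the only way to drive tasks):
+
+```rust
+#![feature(async_await)] // nightly-only at the time of writing
+
+use std::thread;
+
+use crossbeam::channel;
+use futures::executor;
+
+fn main() {
+    // A channel that holds tasks scheduled for running.
+    let (sender, receiver) = channel::unbounded();
+
+    // The task schedules itself by sending itself into the channel.
+    let schedule = move |task| sender.send(task).unwrap();
+    let (task, handle) = async_task::spawn(async { 1 + 2 }, schedule, ());
+
+    // Schedule the task and run it to completion on a dedicated thread.
+    task.schedule();
+    thread::spawn(move || {
+        for task in receiver {
+            task.run();
+        }
+    });
+
+    // Await the output of the future.
+    assert_eq!(executor::block_on(handle), Some(3));
+}
+```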
+
+## License
+
+Licensed under either of
+
+ * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
+ * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
+
+at your option.
+
+#### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
+dual licensed as above, without any additional terms or conditions.
diff --git a/benches/bench.rs b/benches/bench.rs
new file mode 100644
index 0000000..6fd7935
--- /dev/null
+++ b/benches/bench.rs
@@ -0,0 +1,43 @@
+#![feature(async_await, test)]
+
+extern crate test;
+
+use futures::channel::oneshot;
+use futures::executor;
+use futures::future::TryFutureExt;
+use test::Bencher;
+
+#[bench]
+fn task_create(b: &mut Bencher) {
+    b.iter(|| {
+        async_task::spawn(async {}, drop, ());
+    });
+}
+
+#[bench]
+fn task_run(b: &mut Bencher) {
+    b.iter(|| {
+        let (task, handle) = async_task::spawn(async {}, drop, ());
+        task.run();
+        executor::block_on(handle).unwrap();
+    });
+}
+
+#[bench]
+fn oneshot_create(b: &mut Bencher) {
+    b.iter(|| {
+        let (tx, _rx) = oneshot::channel::<()>();
+        let _task = Box::new(async move { tx.send(()).map_err(|_| ()) });
+    });
+}
+
+#[bench]
+fn oneshot_run(b: &mut Bencher) {
+    b.iter(|| {
+        let (tx, rx) = oneshot::channel::<()>();
+        let task = Box::new(async move { tx.send(()).map_err(|_| ()) });
+
+        let future = task.and_then(|_| rx.map_err(|_| ()));
+        executor::block_on(future).unwrap();
+    });
+}
diff --git a/examples/panic-propagation.rs b/examples/panic-propagation.rs
new file mode 100644
index 0000000..9c4f081
--- /dev/null
+++ b/examples/panic-propagation.rs
@@ -0,0 +1,75 @@
+//! A single-threaded executor where join handles propagate panics from tasks.
+
+#![feature(async_await)]
+
+use std::future::Future;
+use std::panic::{resume_unwind, AssertUnwindSafe};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::thread;
+
+use crossbeam::channel::{unbounded, Sender};
+use futures::executor;
+use futures::future::FutureExt;
+use lazy_static::lazy_static;
+
+/// Spawns a future on the executor.
+fn spawn<F, R>(future: F) -> JoinHandle<R>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+{
+    lazy_static! {
+        // A channel that holds scheduled tasks.
+        static ref QUEUE: Sender<async_task::Task<()>> = {
+            let (sender, receiver) = unbounded::<async_task::Task<()>>();
+
+            // Start the executor thread.
+            thread::spawn(|| {
+                for task in receiver {
+                    // No need for `catch_unwind()` here because panics are already caught.
+                    task.run();
+                }
+            });
+
+            sender
+        };
+    }
+
+    // Create a future that catches panics within itself.
+    let future = AssertUnwindSafe(future).catch_unwind();
+
+    // Create a task that is scheduled by sending itself into the channel.
+    let schedule = |t| QUEUE.send(t).unwrap();
+    let (task, handle) = async_task::spawn(future, schedule, ());
+
+    // Schedule the task by sending it into the channel.
+    task.schedule();
+
+    // Wrap the handle into one that propagates panics.
+    JoinHandle(handle)
+}
+
+/// A join handle that propagates panics inside the task.
+struct JoinHandle<R>(async_task::JoinHandle<thread::Result<R>, ()>);
+
+impl<R> Future for JoinHandle<R> {
+    type Output = Option<R>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match Pin::new(&mut self.0).poll(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)),
+            Poll::Ready(Some(Err(err))) => resume_unwind(err),
+        }
+    }
+}
+
+fn main() {
+    // Spawn a future that panics and block on it.
+    let handle = spawn(async {
+        panic!("Ooops!");
+    });
+    executor::block_on(handle);
+}
diff --git a/examples/panic-result.rs b/examples/panic-result.rs
new file mode 100644
index 0000000..b1200a3
--- /dev/null
+++ b/examples/panic-result.rs
@@ -0,0 +1,74 @@
+//! A single-threaded executor where join handles catch panics inside tasks.
+
+#![feature(async_await)]
+
+use std::future::Future;
+use std::panic::AssertUnwindSafe;
+use std::thread;
+
+use crossbeam::channel::{unbounded, Sender};
+use futures::executor;
+use futures::future::FutureExt;
+use lazy_static::lazy_static;
+
+/// Spawns a future on the executor.
+fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+{
+    lazy_static! {
+        // A channel that holds scheduled tasks.
+        static ref QUEUE: Sender<async_task::Task<()>> = {
+            let (sender, receiver) = unbounded::<async_task::Task<()>>();
+
+            // Start the executor thread.
+            thread::spawn(|| {
+                for task in receiver {
+                    // No need for `catch_unwind()` here because panics are already caught.
+                    task.run();
+                }
+            });
+
+            sender
+        };
+    }
+
+    // Create a future that catches panics within itself.
+    let future = AssertUnwindSafe(future).catch_unwind();
+
+    // Create a task that is scheduled by sending itself into the channel.
+    let schedule = |t| QUEUE.send(t).unwrap();
+    let (task, handle) = async_task::spawn(future, schedule, ());
+
+    // Schedule the task by sending it into the channel.
+    task.schedule();
+
+    handle
+}
+
+fn main() {
+    // Spawn a future that completes successfully.
+    let handle = spawn(async {
+        println!("Hello, world!");
+    });
+
+    // Block on the future and report its result.
+    match executor::block_on(handle) {
+        None => println!("The task was cancelled."),
+        Some(Ok(val)) => println!("The task completed with {:?}", val),
+        Some(Err(_)) => println!("The task has panicked"),
+    }
+
+    // Spawn a future that panics.
+    let handle = spawn(async {
+        panic!("Ooops!");
+    });
+
+    // Block on the future and report its result.
+    match executor::block_on(handle) {
+        None => println!("The task was cancelled."),
+        Some(Ok(val)) => println!("The task completed with {:?}", val),
+        Some(Err(_)) => println!("The task has panicked"),
+    }
+}
diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs
new file mode 100644
index 0000000..6d5b9a2
--- /dev/null
+++ b/examples/spawn-on-thread.rs
@@ -0,0 +1,55 @@
+//! A function that runs a future to completion on a dedicated thread.
+
+#![feature(async_await)]
+
+use std::future::Future;
+use std::sync::Arc;
+use std::thread;
+
+use crossbeam::channel;
+use futures::executor;
+
+/// Spawns a future on a new dedicated thread.
+///
+/// The returned handle can be used to await the output of the future.
+fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+{
+    // Create a channel that holds the task when it is scheduled for running.
+    let (sender, receiver) = channel::unbounded();
+    let sender = Arc::new(sender);
+    let s = Arc::downgrade(&sender);
+
+    // Wrap the future into one that disconnects the channel on completion.
+    let future = async move {
+        // When the inner future completes, the sender gets dropped and disconnects the channel.
+        let _sender = sender;
+        future.await
+    };
+
+    // Create a task that is scheduled by sending itself into the channel.
+    let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
+    let (task, handle) = async_task::spawn(future, schedule, ());
+
+    // Schedule the task by sending it into the channel.
+    task.schedule();
+
+    // Spawn a thread running the task to completion.
+    thread::spawn(move || {
+        // Keep taking the task from the channel and running it until completion.
+        for task in receiver {
+            task.run();
+        }
+    });
+
+    handle
+}
+
+fn main() {
+    // Spawn a future on a dedicated thread.
+    executor::block_on(spawn_on_thread(async {
+        println!("Hello, world!");
+    }));
+}
diff --git a/examples/spawn.rs b/examples/spawn.rs
new file mode 100644
index 0000000..6e798c0
--- /dev/null
+++ b/examples/spawn.rs
@@ -0,0 +1,52 @@
+//! A simple single-threaded executor.
+
+#![feature(async_await)]
+
+use std::future::Future;
+use std::panic::catch_unwind;
+use std::thread;
+
+use crossbeam::channel::{unbounded, Sender};
+use futures::executor;
+use lazy_static::lazy_static;
+
+/// Spawns a future on the executor.
+fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+{
+    lazy_static! {
+        // A channel that holds scheduled tasks.
+        static ref QUEUE: Sender<async_task::Task<()>> = {
+            let (sender, receiver) = unbounded::<async_task::Task<()>>();
+
+            // Start the executor thread.
+            thread::spawn(|| {
+                for task in receiver {
+                    // Ignore panics for simplicity.
+                    let _ignore_panic = catch_unwind(|| task.run());
+                }
+            });
+
+            sender
+        };
+    }
+
+    // Create a task that is scheduled by sending itself into the channel.
+    let schedule = |t| QUEUE.send(t).unwrap();
+    let (task, handle) = async_task::spawn(future, schedule, ());
+
+    // Schedule the task by sending it into the channel.
+    task.schedule();
+
+    handle
+}
+
+fn main() {
+    // Spawn a future and await its result.
+    let handle = spawn(async {
+        println!("Hello, world!");
+    });
+    executor::block_on(handle);
+}
diff --git a/examples/task-id.rs b/examples/task-id.rs
new file mode 100644
index 0000000..b3832d0
--- /dev/null
+++ b/examples/task-id.rs
@@ -0,0 +1,88 @@
+//! An executor that assigns an ID to every spawned task.
+
+#![feature(async_await)]
+
+use std::cell::Cell;
+use std::future::Future;
+use std::panic::catch_unwind;
+use std::thread;
+
+use crossbeam::atomic::AtomicCell;
+use crossbeam::channel::{unbounded, Sender};
+use futures::executor;
+use lazy_static::lazy_static;
+
+#[derive(Clone, Copy, Debug)]
+struct TaskId(usize);
+
+thread_local! {
+    /// The ID of the current task.
+    static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
+}
+
+/// Returns the ID of the currently executing task.
+///
+/// Returns `None` if called outside the runtime.
+fn task_id() -> Option<TaskId> {
+    TASK_ID.with(|id| id.get())
+}
+
+/// Spawns a future on the executor.
+fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+{
+    lazy_static! {
+        // A channel that holds scheduled tasks.
+        static ref QUEUE: Sender<async_task::Task<TaskId>> = {
+            let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
+
+            // Start the executor thread.
+            thread::spawn(|| {
+                TASK_ID.with(|id| {
+                    for task in receiver {
+                        // Store the task ID into the thread-local before running.
+                        id.set(Some(*task.tag()));
+
+                        // Ignore panics for simplicity.
+                        let _ignore_panic = catch_unwind(|| task.run());
+                    }
+                })
+            });
+
+            sender
+        };
+
+        // A counter that assigns IDs to spawned tasks.
+        static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0);
+    }
+
+    // Reserve an ID for the new task.
+    let id = TaskId(COUNTER.fetch_add(1));
+
+    // Create a task that is scheduled by sending itself into the channel.
+    let schedule = |task| QUEUE.send(task).unwrap();
+    let (task, handle) = async_task::spawn(future, schedule, id);
+
+    // Schedule the task by sending it into the channel.
+    task.schedule();
+
+    handle
+}
+
+fn main() {
+    let mut handles = vec![];
+
+    // Spawn a bunch of tasks.
+    for _ in 0..10 {
+        handles.push(spawn(async move {
+            println!("Hello from task with {:?}", task_id());
+        }));
+    }
+
+    // Wait for the tasks to finish.
+    for handle in handles {
+        executor::block_on(handle);
+    }
+}
diff --git a/src/header.rs b/src/header.rs
new file mode 100644
index 0000000..0ce5164
--- /dev/null
+++ b/src/header.rs
@@ -0,0 +1,158 @@
+use std::alloc::Layout;
+use std::cell::Cell;
+use std::fmt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::Waker;
+
+use crossbeam_utils::Backoff;
+
+use crate::raw::TaskVTable;
+use crate::state::*;
+use crate::utils::{abort_on_panic, extend};
+
+/// The header of a task.
+///
+/// This header is stored right at the beginning of every heap-allocated task.
+pub(crate) struct Header {
+    /// Current state of the task.
+    ///
+    /// Contains flags representing the current state and the reference count.
+    pub(crate) state: AtomicUsize,
+
+    /// The waker of the task that is blocked on the `JoinHandle`.
+    ///
+    /// This waker needs to be woken once the task completes or is closed.
+    pub(crate) awaiter: Cell<Option<Waker>>,
+
+    /// The virtual table.
+    ///
+    /// In addition to the actual waker virtual table, it also contains pointers to several other
+    /// methods necessary for bookkeeping the heap-allocated task.
+    pub(crate) vtable: &'static TaskVTable,
+}
+
+impl Header {
+    /// Cancels the task.
+    ///
+    /// This method will only mark the task as closed and will notify the awaiter, but it won't
+    /// reschedule the task if it's not completed.
+    pub(crate) fn cancel(&self) {
+        let mut state = self.state.load(Ordering::Acquire);
+
+        loop {
+            // If the task has been completed or closed, it can't be cancelled.
+            if state & (COMPLETED | CLOSED) != 0 {
+                break;
+            }
+
+            // Mark the task as closed.
+            match self.state.compare_exchange_weak(
+                state,
+                state | CLOSED,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    // Notify the awaiter that the task has been closed.
+                    if state & AWAITER != 0 {
+                        self.notify();
+                    }
+
+                    break;
+                }
+                Err(s) => state = s,
+            }
+        }
+    }
+
+    /// Notifies the awaiter blocked on this task.
+    ///
+    /// If there is a registered waker, it will be removed from the header and woken.
+    #[inline]
+    pub(crate) fn notify(&self) {
+        if let Some(waker) = self.swap_awaiter(None) {
+            // We need a safeguard against panics because waking can panic.
+            abort_on_panic(|| {
+                waker.wake();
+            });
+        }
+    }
+
+    /// Notifies the awaiter blocked on this task unless its waker matches `current`.
+    ///
+    /// If there is a registered waker, it will be removed from the header.
+    #[inline]
+    pub(crate) fn notify_unless(&self, current: &Waker) {
+        if let Some(waker) = self.swap_awaiter(None) {
+            if !waker.will_wake(current) {
+                // We need a safeguard against panics because waking can panic.
+                abort_on_panic(|| {
+                    waker.wake();
+                });
+            }
+        }
+    }
+
+    /// Swaps the awaiter and returns the previous value.
+    #[inline]
+    pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
+        let new_is_none = new.is_none();
+
+        // We're about to try acquiring the lock in a loop. If it's already being held by another
+        // thread, we'll have to spin for a while so it's best to employ a backoff strategy.
+        let backoff = Backoff::new();
+        loop {
+            // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
+            let state = if new_is_none {
+                self.state.fetch_or(LOCKED, Ordering::Acquire)
+            } else {
+                self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
+            };
+
+            // If the lock was acquired, break from the loop.
+            if state & LOCKED == 0 {
+                break;
+            }
+
+            // Snooze for a little while because the lock is held by another thread.
+            backoff.snooze();
+        }
+
+        // Replace the awaiter.
+        let old = self.awaiter.replace(new);
+
+        // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
+        if new_is_none {
+            self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
+        } else {
+            self.state.fetch_and(!LOCKED, Ordering::Release);
+        }
+
+        old
+    }
+
+    /// Returns the offset at which the tag of type `T` is stored.
+    #[inline]
+    pub(crate) fn offset_tag<T>() -> usize {
+        let layout_header = Layout::new::<Header>();
+        let layout_t = Layout::new::<T>();
+        let (_, offset_t) = extend(layout_header, layout_t);
+        offset_t
+    }
+}
+
+impl fmt::Debug for Header {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let state = self.state.load(Ordering::SeqCst);
+
+        f.debug_struct("Header")
+            .field("scheduled", &(state & SCHEDULED != 0))
+            .field("running", &(state & RUNNING != 0))
+            .field("completed", &(state & COMPLETED != 0))
+            .field("closed", &(state & CLOSED != 0))
+            .field("awaiter", &(state & AWAITER != 0))
+            .field("handle", &(state & HANDLE != 0))
+            .field("ref_count", &(state / REFERENCE))
+            .finish()
+    }
+}
diff --git a/src/join_handle.rs b/src/join_handle.rs
new file mode 100644
index 0000000..fb5c275
--- /dev/null
+++ b/src/join_handle.rs
@@ -0,0 +1,333 @@
+use std::fmt;
+use std::future::Future;
+use std::marker::{PhantomData, Unpin};
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering;
+use std::task::{Context, Poll};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::abort_on_panic;
+
+/// A handle that awaits the result of a task.
+///
+/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was
+/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until
+/// the task completes, panics, or gets cancelled.
+///
+/// # Examples
+///
+/// ```
+/// #![feature(async_await)]
+///
+/// use crossbeam::channel;
+/// use futures::executor;
+///
+/// // The future inside the task.
+/// let future = async { 1 + 2 };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+///
+/// // Run the task. In this example, it will complete after a single run.
+/// task.run();
+/// assert!(r.is_empty());
+///
+/// // Await the result of the task.
+/// let result = executor::block_on(handle);
+/// assert_eq!(result, Some(3));
+/// ```
+pub struct JoinHandle<R, T> {
+    /// A raw task pointer.
+    pub(crate) raw_task: NonNull<()>,
+
+    /// A marker capturing the generic types `R` and `T`.
+    pub(crate) _marker: PhantomData<(R, T)>,
+}
+
+unsafe impl<R, T> Send for JoinHandle<R, T> {}
+unsafe impl<R, T> Sync for JoinHandle<R, T> {}
+
+impl<R, T> Unpin for JoinHandle<R, T> {}
+
+impl<R, T> JoinHandle<R, T> {
+    /// Cancels the task.
+    ///
+    /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
+    /// to run it won't do anything. Even if the future manages to complete, awaiting the
+    /// cancelled task evaluates to `None`.
+    ///
+    /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    /// use futures::executor;
+    ///
+    /// // The future inside the task.
+    /// let future = async { 1 + 2 };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, ());
+    ///
+    /// // Cancel the task.
+    /// handle.cancel();
+    ///
+    /// // Running a cancelled task does nothing.
+    /// task.run();
+    ///
+    /// // Await the result of the task.
+    /// let result = executor::block_on(handle);
+    /// assert_eq!(result, None);
+    /// ```
+    pub fn cancel(&self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        unsafe {
+            let mut state = (*header).state.load(Ordering::Acquire);
+
+            loop {
+                // If the task has been completed or closed, it can't be cancelled.
+                if state & (COMPLETED | CLOSED) != 0 {
+                    break;
+                }
+
+                // If the task is neither scheduled nor running, we'll need to schedule it.
+                let new = if state & (SCHEDULED | RUNNING) == 0 {
+                    (state | SCHEDULED | CLOSED) + REFERENCE
+                } else {
+                    state | CLOSED
+                };
+
+                // Mark the task as closed.
+                match (*header).state.compare_exchange_weak(
+                    state,
+                    new,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => {
+                        // If the task is neither scheduled nor running, schedule it so that its future
+                        // gets dropped by the executor.
+                        if state & (SCHEDULED | RUNNING) == 0 {
+                            ((*header).vtable.schedule)(ptr);
+                        }
+
+                        // Notify the awaiter that the task has been closed.
+                        if state & AWAITER != 0 {
+                            (*header).notify();
+                        }
+
+                        break;
+                    }
+                    Err(s) => state = s,
+                }
+            }
+        }
+    }
+
+    /// Returns a reference to the tag stored inside the task.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    ///
+    /// // The future inside the task.
+    /// let future = async { 1 + 2 };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
+    ///
+    /// // Access the tag.
+    /// assert_eq!(*handle.tag(), "a simple task");
+    /// ```
+    pub fn tag(&self) -> &T {
+        let offset = Header::offset_tag::<T>();
+        let ptr = self.raw_task.as_ptr();
+
+        unsafe {
+            let raw = (ptr as *mut u8).add(offset) as *const T;
+            &*raw
+        }
+    }
+}
+
+impl<R, T> Drop for JoinHandle<R, T> {
+    fn drop(&mut self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        // A place where the output will be stored in case it needs to be dropped.
+        let mut output = None;
+
+        unsafe {
+            // Optimistically assume the `JoinHandle` is being dropped just after creating the
+            // task. This is a common case so if the handle is not used, the overhead of it is only
+            // one compare-exchange operation.
+            if let Err(mut state) = (*header).state.compare_exchange_weak(
+                SCHEDULED | HANDLE | REFERENCE,
+                SCHEDULED | REFERENCE,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                loop {
+                    // If the task has been completed but not yet closed, that means its output
+                    // must be dropped.
+                    if state & COMPLETED != 0 && state & CLOSED == 0 {
+                        // Mark the task as closed in order to grab its output.
+                        match (*header).state.compare_exchange_weak(
+                            state,
+                            state | CLOSED,
+                            Ordering::AcqRel,
+                            Ordering::Acquire,
+                        ) {
+                            Ok(_) => {
+                                // Read the output.
+                                output =
+                                    Some((((*header).vtable.get_output)(ptr) as *mut R).read());
+
+                                // Update the state variable because we're continuing the loop.
+                                state |= CLOSED;
+                            }
+                            Err(s) => state = s,
+                        }
+                    } else {
+                        // If this is the last reference to the task and it's not closed, then close
+                        // it and schedule one more time so that its future gets dropped by the
+                        // executor.
+                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
+                            SCHEDULED | CLOSED | REFERENCE
+                        } else {
+                            state & !HANDLE
+                        };
+
+                        // Unset the handle flag.
+                        match (*header).state.compare_exchange_weak(
+                            state,
+                            new,
+                            Ordering::AcqRel,
+                            Ordering::Acquire,
+                        ) {
+                            Ok(_) => {
+                                // If this is the last reference to the task, we need to either
+                                // schedule dropping its future or destroy it.
+                                if state & !(REFERENCE - 1) == 0 {
+                                    if state & CLOSED == 0 {
+                                        ((*header).vtable.schedule)(ptr);
+                                    } else {
+                                        ((*header).vtable.destroy)(ptr);
+                                    }
+                                }
+
+                                break;
+                            }
+                            Err(s) => state = s,
+                        }
+                    }
+                }
+            }
+        }
+
+        // Drop the output if it was taken out of the task.
+        drop(output);
+    }
+}
+
+impl<R, T> Future for JoinHandle<R, T> {
+    type Output = Option<R>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        unsafe {
+            let mut state = (*header).state.load(Ordering::Acquire);
+
+            loop {
+                // If the task has been closed, notify the awaiter and return `None`.
+                if state & CLOSED != 0 {
+                    // Even though the awaiter is most likely the current task, it could also be
+                    // another task.
+                    (*header).notify_unless(cx.waker());
+                    return Poll::Ready(None);
+                }
+
+                // If the task is not completed, register the current task.
+                if state & COMPLETED == 0 {
+                    // Replace the waker with one associated with the current task. We need a
+                    // safeguard against panics because dropping the previous waker can panic.
+                    abort_on_panic(|| {
+                        (*header).swap_awaiter(Some(cx.waker().clone()));
+                    });
+
+                    // Reload the state after registering. It is possible that the task became
+                    // completed or closed just before registration so we need to check for that.
+                    state = (*header).state.load(Ordering::Acquire);
+
+                    // If the task has been closed, notify the awaiter and return `None`.
+                    if state & CLOSED != 0 {
+                        // Even though the awaiter is most likely the current task, it could also
+                        // be another task.
+                        (*header).notify_unless(cx.waker());
+                        return Poll::Ready(None);
+                    }
+
+                    // If the task is still not completed, we're blocked on it.
+                    if state & COMPLETED == 0 {
+                        return Poll::Pending;
+                    }
+                }
+
+                // Since the task is now completed, mark it as closed in order to grab its output.
+                match (*header).state.compare_exchange(
+                    state,
+                    state | CLOSED,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => {
+                        // Notify the awaiter. Even though the awaiter is most likely the current
+                        // task, it could also be another task.
+                        if state & AWAITER != 0 {
+                            (*header).notify_unless(cx.waker());
+                        }
+
+                        // Take the output from the task.
+                        let output = ((*header).vtable.get_output)(ptr) as *mut R;
+                        return Poll::Ready(Some(output.read()));
+                    }
+                    Err(s) => state = s,
+                }
+            }
+        }
+    }
+}
+
+impl<R, T> fmt::Debug for JoinHandle<R, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        f.debug_struct("JoinHandle")
+            .field("header", unsafe { &(*header) })
+            .finish()
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..5518515
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,149 @@
+//! Task abstraction for building executors.
+//!
+//! # What is an executor?
+//!
+//! An async block creates a future and an async function returns one. But futures don't do
+//! anything unless they are awaited inside other async blocks or async functions. So the question
+//! arises: who or what awaits the main future that awaits others?
+//!
+//! One solution is to call [`block_on()`] on the main future, which will block
+//! the current thread and keep polling the future until it completes. But sometimes we don't want
+//! to block the current thread and would prefer to *spawn* the future to let a background thread
+//! block on it instead.
+//!
+//! This is where executors step in - they create a number of threads (typically equal to the
+//! number of CPU cores on the system) that are dedicated to polling spawned futures. Each executor
+//! thread keeps polling spawned futures in a loop and only blocks when all spawned futures are
+//! either sleeping or running.
+//!
+//! # What is a task?
+//!
+//! In order to spawn a future on an executor, one needs to allocate the future on the heap and
+//! keep some state alongside it, like whether the future is ready for polling, waiting to be woken
+//! up, or completed. This allocation is usually called a *task*.
+//!
+//! The executor then runs the spawned task by polling its future. If the future is pending on a
+//! resource, a [`Waker`] associated with the task will be registered somewhere so that the task
+//! can be woken up and run again at a later time.
+//!
+//! For example, if the future wants to read something from a TCP socket that is not ready yet, the
+//! networking system will clone the task's waker and wake it up once the socket becomes ready.
+//!
+//! # Task construction
+//!
+//! A task is constructed with the `spawn()` function:
+//!
+//! ```
+//! # #![feature(async_await)]
+//! let future = async { 1 + 2 };
+//! let schedule = |task| unimplemented!();
+//!
+//! let (task, handle) = async_task::spawn(future, schedule, ());
+//! ```
+//!
+//! The first argument is the future that gets polled when the task is run.
+//!
+//! The second argument is the schedule function, which is called every time the task gets
+//! woken up. This function should push the received task into some kind of queue of runnable
+//! tasks.
+//!
+//! The third argument, `()` in this example, is an arbitrary piece of data called a *tag*.
+//! This can be a task identifier, a task name, task-local storage, or something of similar
+//! nature.
+//!
+//! The constructor returns a runnable [`Task`] and a [`JoinHandle`] that can await the result of
+//! the future.
+//!
+//! # Task scheduling
+//!
+//! TODO
+//!
+//! # Join handles
+//!
+//! TODO
+//!
+//! # Cancellation
+//!
+//! TODO
+//!
+//! # Performance
+//!
+//! TODO: explain single allocation, etc.
+//!
+//! Task [construction] incurs a single allocation only. The [`Task`] can then be run and its
+//! result awaited through the [`JoinHandle`]. When woken, the task gets automatically rescheduled.
+//! It's also possible to cancel the task so that it stops running and can't be awaited anymore.
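+//!
+//! A small sketch of cancellation (the channel-based schedule function is only illustrative and
+//! mirrors the other examples in this documentation):
+//!
+//! ```
+//! # #![feature(async_await)]
+//! use crossbeam::channel;
+//! use futures::executor;
+//!
+//! // If the task gets woken, it will be sent into this channel.
+//! let (s, r) = channel::unbounded();
+//! let schedule = move |task| s.send(task).unwrap();
+//!
+//! let (task, handle) = async_task::spawn(async { 1 + 2 }, schedule, ());
+//!
+//! // Cancel the task; running it afterwards does nothing and awaiting it yields `None`.
+//! handle.cancel();
+//! task.run();
+//! assert_eq!(executor::block_on(handle), None);
+//! ```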
+//!
+//! [construction]: fn.spawn.html
+//! [`JoinHandle`]: struct.JoinHandle.html
+//! [`Task`]: struct.Task.html
+//! [`Future`]: https://doc.rust-lang.org/nightly/std/future/trait.Future.html
+//! [`Waker`]: https://doc.rust-lang.org/nightly/std/task/struct.Waker.html
+//! [`block_on()`]: https://docs.rs/futures-preview/*/futures/executor/fn.block_on.html
+//!
+//! # Examples
+//!
+//! A simple single-threaded executor:
+//!
+//! ```
+//! # #![feature(async_await)]
+//! use std::future::Future;
+//! use std::panic::catch_unwind;
+//! use std::thread;
+//!
+//! use async_task::{JoinHandle, Task};
+//! use crossbeam::channel::{unbounded, Sender};
+//! use futures::executor;
+//! use lazy_static::lazy_static;
+//!
+//! /// Spawns a future on the executor.
+//! fn spawn<F, R>(future: F) -> JoinHandle<R, ()>
+//! where
+//!     F: Future<Output = R> + Send + 'static,
+//!     R: Send + 'static,
+//! {
+//!     lazy_static! {
+//!         // A channel that holds scheduled tasks.
+//!         static ref QUEUE: Sender<Task<()>> = {
+//!             let (sender, receiver) = unbounded::<Task<()>>();
+//!
+//!             // Start the executor thread.
+//!             thread::spawn(|| {
+//!                 for task in receiver {
+//!                     // Ignore panics for simplicity.
+//!                     let _ignore_panic = catch_unwind(|| task.run());
+//!                 }
+//!             });
+//!
+//!             sender
+//!         };
+//!     }
+//!
+//!     // Create a task that is scheduled by sending itself into the channel.
+//!     let schedule = |t| QUEUE.send(t).unwrap();
+//!     let (task, handle) = async_task::spawn(future, schedule, ());
+//!
+//!     // Schedule the task by sending it into the channel.
+//!     task.schedule();
+//!
+//!     handle
+//! }
+//!
+//! // Spawn a future and await its result.
+//! let handle = spawn(async {
+//!     println!("Hello, world!");
+//! });
+//! executor::block_on(handle);
+//! ```
+
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+
+mod header;
+mod join_handle;
+mod raw;
+mod state;
+mod task;
+mod utils;
+
+pub use crate::join_handle::JoinHandle;
+pub use crate::task::{spawn, Task};
diff --git a/src/raw.rs b/src/raw.rs
new file mode 100644
index 0000000..6928427
--- /dev/null
+++ b/src/raw.rs
@@ -0,0 +1,629 @@
+use std::alloc::{self, Layout};
+use std::cell::Cell;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem::{self, ManuallyDrop};
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::{abort_on_panic, extend};
+use crate::Task;
+
+/// The vtable for a task.
+pub(crate) struct TaskVTable {
+    /// The raw waker vtable.
+    pub(crate) raw_waker: RawWakerVTable,
+
+    /// Schedules the task.
+    pub(crate) schedule: unsafe fn(*const ()),
+
+    /// Drops the future inside the task.
+    pub(crate) drop_future: unsafe fn(*const ()),
+
+    /// Returns a pointer to the output stored after completion.
+    pub(crate) get_output: unsafe fn(*const ()) -> *const (),
+
+    /// Drops a waker or a task.
+    pub(crate) decrement: unsafe fn(ptr: *const ()),
+
+    /// Destroys the task.
+    pub(crate) destroy: unsafe fn(*const ()),
+
+    /// Runs the task.
+    pub(crate) run: unsafe fn(*const ()),
+}
+
+/// Memory layout of a task.
+///
+/// This struct contains information on:
+///
+/// 1. How to allocate and deallocate the task.
+/// 2. How to access the fields inside the task.
+#[derive(Clone, Copy)]
+pub(crate) struct TaskLayout {
+    /// Memory layout of the whole task.
+    pub(crate) layout: Layout,
+
+    /// Offset into the task at which the tag is stored.
+    pub(crate) offset_t: usize,
+
+    /// Offset into the task at which the schedule function is stored.
+    pub(crate) offset_s: usize,
+
+    /// Offset into the task at which the future is stored.
+    pub(crate) offset_f: usize,
+
+    /// Offset into the task at which the output is stored.
+    pub(crate) offset_r: usize,
+}
+
+/// Raw pointers to the fields of a task.
+pub(crate) struct RawTask<F, R, S, T> {
+    /// The task header.
+    pub(crate) header: *const Header,
+
+    /// The schedule function.
+    pub(crate) schedule: *const S,
+
+    /// The tag inside the task.
+    pub(crate) tag: *mut T,
+
+    /// The future.
+    pub(crate) future: *mut F,
+
+    /// The output of the future.
+    pub(crate) output: *mut R,
+}
+
+impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}
+
+impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
+    fn clone(&self) -> Self {
+        Self {
+            header: self.header,
+            schedule: self.schedule,
+            tag: self.tag,
+            future: self.future,
+            output: self.output,
+        }
+    }
+}
+
+impl<F, R, S, T> RawTask<F, R, S, T>
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+    S: Fn(Task<T>) + Send + Sync + 'static,
+    T: Send + 'static,
+{
+    /// Allocates a task with the given `future` and `schedule` function.
+    ///
+    /// It is assumed there are initially only the `Task` reference and the `JoinHandle`.
+    pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
+        // Compute the layout of the task for allocation. Abort if the computation fails.
+        let task_layout = abort_on_panic(|| Self::task_layout());
+
+        unsafe {
+            // Allocate enough space for the entire task.
+            let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) {
+                None => std::process::abort(),
+                Some(p) => p,
+            };
+
+            let raw = Self::from_ptr(raw_task.as_ptr());
+
+            // Write the header as the first field of the task.
+            (raw.header as *mut Header).write(Header {
+                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
+                awaiter: Cell::new(None),
+                vtable: &TaskVTable {
+                    raw_waker: RawWakerVTable::new(
+                        Self::clone_waker,
+                        Self::wake,
+                        Self::wake_by_ref,
+                        Self::decrement,
+                    ),
+                    schedule: Self::schedule,
+                    drop_future: Self::drop_future,
+                    get_output: Self::get_output,
+                    decrement: Self::decrement,
+                    destroy: Self::destroy,
+                    run: Self::run,
+                },
+            });
+
+            // Write the tag as the second field of the task.
+            (raw.tag as *mut T).write(tag);
+
+            // Write the schedule function as the third field of the task.
+            (raw.schedule as *mut S).write(schedule);
+
+            // Write the future as the fourth field of the task.
+            raw.future.write(future);
+
+            raw_task
+        }
+    }
+
+    /// Creates a `RawTask` from a raw task pointer.
+    #[inline]
+    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+        let task_layout = Self::task_layout();
+        let p = ptr as *const u8;
+
+        unsafe {
+            Self {
+                header: p as *const Header,
+                tag: p.add(task_layout.offset_t) as *mut T,
+                schedule: p.add(task_layout.offset_s) as *const S,
+                future: p.add(task_layout.offset_f) as *mut F,
+                output: p.add(task_layout.offset_r) as *mut R,
+            }
+        }
+    }
+
+    /// Returns the memory layout for a task.
+    #[inline]
+    fn task_layout() -> TaskLayout {
+        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
+        let layout_header = Layout::new::<Header>();
+        let layout_t = Layout::new::<T>();
+        let layout_s = Layout::new::<S>();
+        let layout_f = Layout::new::<F>();
+        let layout_r = Layout::new::<R>();
+
+        // Compute the layout for `union { F, R }`.
+        let size_union = layout_f.size().max(layout_r.size());
+        let align_union = layout_f.align().max(layout_r.align());
+        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
+
+        // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`.
+        let layout = layout_header;
+        let (layout, offset_t) = extend(layout, layout_t);
+        let (layout, offset_s) = extend(layout, layout_s);
+        let (layout, offset_union) = extend(layout, layout_union);
+        let offset_f = offset_union;
+        let offset_r = offset_union;
+
+        TaskLayout {
+            layout,
+            offset_t,
+            offset_s,
+            offset_f,
+            offset_r,
+        }
+    }
+
+    /// Wakes a waker.
+    unsafe fn wake(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+        loop {
+            // If the task is completed or closed, it can't be woken.
+            if state & (COMPLETED | CLOSED) != 0 {
+                // Drop the waker.
+                Self::decrement(ptr);
+                break;
+            }
+
+            // If the task is already scheduled, we just need to synchronize with the thread that
+            // will run the task by "publishing" our current view of the memory.
+            if state & SCHEDULED != 0 {
+                // Update the state without actually modifying it.
+                match (*raw.header).state.compare_exchange_weak(
+                    state,
+                    state,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => {
+                        // Drop the waker.
+                        Self::decrement(ptr);
+                        break;
+                    }
+                    Err(s) => state = s,
+                }
+            } else {
+                // Mark the task as scheduled.
+                match (*raw.header).state.compare_exchange_weak(
+                    state,
+                    state | SCHEDULED,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => {
+                        // If the task is not yet scheduled and isn't currently running, now is the
+                        // time to schedule it.
+                        if state & (SCHEDULED | RUNNING) == 0 {
+                            // Schedule the task.
+                            let task = Task {
+                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
+                                _marker: PhantomData,
+                            };
+                            (*raw.schedule)(task);
+                        } else {
+                            // Drop the waker.
+                            Self::decrement(ptr);
+                        }
+
+                        break;
+                    }
+                    Err(s) => state = s,
+                }
+            }
+        }
+    }
+
+    /// Wakes a waker by reference.
+    unsafe fn wake_by_ref(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+        loop {
+            // If the task is completed or closed, it can't be woken.
+            if state & (COMPLETED | CLOSED) != 0 {
+                break;
+            }
+
+            // If the task is already scheduled, we just need to synchronize with the thread that
+            // will run the task by "publishing" our current view of the memory.
+            if state & SCHEDULED != 0 {
+                // Update the state without actually modifying it.
+                match (*raw.header).state.compare_exchange_weak(
+                    state,
+                    state,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => break,
+                    Err(s) => state = s,
+                }
+            } else {
+                // If the task is neither scheduled nor running, we'll need to schedule it after waking.
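+                // (Scheduling hands out a new `Task` reference, which is why the reference count
+                // is bumped by `REFERENCE` in that case.)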
+                let new = if state & (SCHEDULED | RUNNING) == 0 {
+                    (state | SCHEDULED) + REFERENCE
+                } else {
+                    state | SCHEDULED
+                };
+
+                // Mark the task as scheduled.
+                match (*raw.header).state.compare_exchange_weak(
+                    state,
+                    new,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => {
+                        // If the task was neither scheduled nor running, now is the time to schedule it.
+                        if state & (SCHEDULED | RUNNING) == 0 {
+                            // If the reference count overflowed, abort.
+                            if state > isize::max_value() as usize {
+                                std::process::abort();
+                            }
+
+                            // Schedule the task.
+                            let task = Task {
+                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
+                                _marker: PhantomData,
+                            };
+                            (*raw.schedule)(task);
+                        }
+
+                        break;
+                    }
+                    Err(s) => state = s,
+                }
+            }
+        }
+    }
+
+    /// Clones a waker.
+    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+        let raw = Self::from_ptr(ptr);
+        let raw_waker = &(*raw.header).vtable.raw_waker;
+
+        // Increment the reference count. With any kind of reference-counted data structure,
+        // relaxed ordering is fine when the reference is being cloned.
+        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
+
+        // If the reference count overflowed, abort.
+        if state > isize::max_value() as usize {
+            std::process::abort();
+        }
+
+        RawWaker::new(ptr, raw_waker)
+    }
+
+    /// Drops a waker or a task.
+    ///
+    /// This function will decrement the reference count. If it drops down to zero and the
+    /// associated join handle has been dropped too, then the task gets destroyed.
+    #[inline]
+    unsafe fn decrement(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        // Decrement the reference count.
+        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+        // If this was the last reference to the task and the `JoinHandle` has been dropped as
+        // well, then destroy the task.
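+        // (`new & !(REFERENCE - 1)` isolates the reference count stored above the flag bits, so
+        // this checks that the count has dropped to zero.)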
+        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
+            Self::destroy(ptr);
+        }
+    }
+
+    /// Schedules a task for running.
+    ///
+    /// This function doesn't modify the state of the task. It only passes the task reference to
+    /// its schedule function.
+    unsafe fn schedule(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        (*raw.schedule)(Task {
+            raw_task: NonNull::new_unchecked(ptr as *mut ()),
+            _marker: PhantomData,
+        });
+    }
+
+    /// Drops the future inside a task.
+    #[inline]
+    unsafe fn drop_future(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        // We need a safeguard against panics because the destructor can panic.
+        abort_on_panic(|| {
+            raw.future.drop_in_place();
+        })
+    }
+
+    /// Returns a pointer to the output inside a task.
+    unsafe fn get_output(ptr: *const ()) -> *const () {
+        let raw = Self::from_ptr(ptr);
+        raw.output as *const ()
+    }
+
+    /// Cleans up the task's resources and deallocates it.
+    ///
+    /// By this point, the future or its output must have already been dropped. This function
+    /// drops the schedule function and the tag, and then deallocates the task's memory.
+    #[inline]
+    unsafe fn destroy(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+        let task_layout = Self::task_layout();
+
+        // We need a safeguard against panics because destructors can panic.
+        abort_on_panic(|| {
+            // Drop the schedule function.
+            (raw.schedule as *mut S).drop_in_place();
+
+            // Drop the tag.
+            (raw.tag as *mut T).drop_in_place();
+        });
+
+        // Finally, deallocate the memory reserved by the task.
+        alloc::dealloc(ptr as *mut u8, task_layout.layout);
+    }
+
+    /// Runs a task.
+    ///
+    /// If polling its future panics, the task will be closed and the panic propagated into the
+    /// caller.
+    unsafe fn run(ptr: *const ()) {
+        let raw = Self::from_ptr(ptr);
+
+        // Create a context from the raw task pointer and the vtable inside its header.
+        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
+            ptr,
+            &(*raw.header).vtable.raw_waker,
+        )));
+        let cx = &mut Context::from_waker(&waker);
+
+        let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+        // Update the task's state before polling its future.
+        loop {
+            // If the task has been closed, drop the task reference and return.
+            if state & CLOSED != 0 {
+                // Notify the awaiter that the task has been closed.
+                if state & AWAITER != 0 {
+                    (*raw.header).notify();
+                }
+
+                // Drop the future.
+                Self::drop_future(ptr);
+
+                // Drop the task reference.
+                Self::decrement(ptr);
+                return;
+            }
+
+            // Mark the task as unscheduled and running.
+            match (*raw.header).state.compare_exchange_weak(
+                state,
+                (state & !SCHEDULED) | RUNNING,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    // Update the state because we're continuing with polling the future.
+                    state = (state & !SCHEDULED) | RUNNING;
+                    break;
+                }
+                Err(s) => state = s,
+            }
+        }
+
+        // Poll the inner future, but surround it with a guard that closes the task in case polling
+        // panics.
+        let guard = Guard(raw);
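+        // The future lives inside the task's heap allocation and is never moved out of it, so
+        // creating a `Pin` from the raw pointer is sound.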
+        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
+        mem::forget(guard);
+
+        match poll {
+            Poll::Ready(out) => {
+                // Replace the future with its output.
+                Self::drop_future(ptr);
+                raw.output.write(out);
+
+                // A place where the output will be stored in case it needs to be dropped.
+                let mut output = None;
+
+                // The task is now completed.
+                loop {
+                    // If the handle is dropped, we'll need to close the task and drop the output.
+                    let new = if state & HANDLE == 0 {
+                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
+                    } else {
+                        (state & !RUNNING & !SCHEDULED) | COMPLETED
+                    };
+
+                    // Mark the task as not running and completed.
+                    match (*raw.header).state.compare_exchange_weak(
+                        state,
+                        new,
+                        Ordering::AcqRel,
+                        Ordering::Acquire,
+                    ) {
+                        Ok(_) => {
+                            // If the handle is dropped or if the task was closed while running,
+                            // now it's time to drop the output.
+                            if state & HANDLE == 0 || state & CLOSED != 0 {
+                                // Read the output.
+                                output = Some(raw.output.read());
+                            }
+
+                            // Notify the awaiter that the task has been completed.
+                            if state & AWAITER != 0 {
+                                (*raw.header).notify();
+                            }
+
+                            // Drop the task reference.
+                            Self::decrement(ptr);
+                            break;
+                        }
+                        Err(s) => state = s,
+                    }
+                }
+
+                // Drop the output if it was taken out of the task.
+                drop(output);
+            }
+            Poll::Pending => {
+                // The task is still not completed.
+                loop {
+                    // If the task was closed while running, we'll need to unschedule in case it
+                    // was woken and then clean up its resources.
+                    let new = if state & CLOSED != 0 {
+                        state & !RUNNING & !SCHEDULED
+                    } else {
+                        state & !RUNNING
+                    };
+
+                    // Mark the task as not running.
+                    match (*raw.header).state.compare_exchange_weak(
+                        state,
+                        new,
+                        Ordering::AcqRel,
+                        Ordering::Acquire,
+                    ) {
+                        Ok(state) => {
+                            // If the task was closed while running, we need to drop its future.
+                            // If the task was woken while running, we need to schedule it.
+                            // Otherwise, we just drop the task reference.
+                            if state & CLOSED != 0 {
+                                // The thread that closed the task didn't drop the future because
+                                // it was running so now it's our responsibility to do so.
+                                Self::drop_future(ptr);
+
+                                // Drop the task reference.
+                                Self::decrement(ptr);
+                            } else if state & SCHEDULED != 0 {
+                                // The thread that has woken the task didn't reschedule it because
+                                // it was running so now it's our responsibility to do so.
+                                Self::schedule(ptr);
+                            } else {
+                                // Drop the task reference.
+                                Self::decrement(ptr);
+                            }
+                            break;
+                        }
+                        Err(s) => state = s,
+                    }
+                }
+            }
+        }
+
+        /// A guard that closes the task if polling its future panics.
+        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
+        where
+            F: Future<Output = R> + Send + 'static,
+            R: Send + 'static,
+            S: Fn(Task<T>) + Send + Sync + 'static,
+            T: Send + 'static;
+
+        impl<F, R, S, T> Drop for Guard<F, R, S, T>
+        where
+            F: Future<Output = R> + Send + 'static,
+            R: Send + 'static,
+            S: Fn(Task<T>) + Send + Sync + 'static,
+            T: Send + 'static,
+        {
+            fn drop(&mut self) {
+                let raw = self.0;
+                let ptr = raw.header as *const ();
+
+                unsafe {
+                    let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+                    loop {
+                        // If the task was closed while running, then unschedule it, drop its
+                        // future, and drop the task reference.
+                        if state & CLOSED != 0 {
+                            // We still need to unschedule the task because it is possible it was
+                            // woken while running.
+                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+                            // The thread that closed the task didn't drop the future because it
+                            // was running so now it's our responsibility to do so.
+                            RawTask::<F, R, S, T>::drop_future(ptr);
+
+                            // Drop the task reference.
+                            RawTask::<F, R, S, T>::decrement(ptr);
+                            break;
+                        }
+
+                        // Mark the task as not running, not scheduled, and closed.
+                        match (*raw.header).state.compare_exchange_weak(
+                            state,
+                            (state & !RUNNING & !SCHEDULED) | CLOSED,
+                            Ordering::AcqRel,
+                            Ordering::Acquire,
+                        ) {
+                            Ok(state) => {
+                                // Drop the future because the task is now closed.
+                                RawTask::<F, R, S, T>::drop_future(ptr);
+
+                                // Notify the awaiter that the task has been closed.
+                                if state & AWAITER != 0 {
+                                    (*raw.header).notify();
+                                }
+
+                                // Drop the task reference.
+                                RawTask::<F, R, S, T>::decrement(ptr);
+                                break;
+                            }
+                            Err(s) => state = s,
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..d6ce34f
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,65 @@
+/// Set if the task is scheduled for running.
+///
+/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled
+/// state at the moment of creation and whenever it gets rescheduled, either by its `JoinHandle`
+/// or by a `Waker` waking it.
+///
+/// This flag can't be set when the task is completed. However, it can be set while the task is
+/// running, in which case it will be rescheduled as soon as polling finishes.
+pub(crate) const SCHEDULED: usize = 1 << 0;
+
+/// Set if the task is running.
+///
+/// A task is in the running state while its future is being polled.
+///
+/// This flag can't be set when the task is completed. However, it can be in scheduled state while
+/// it is running, in which case it will be rescheduled when it stops being polled.
+pub(crate) const RUNNING: usize = 1 << 1;
+
+/// Set if the task has been completed.
+///
+/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
+/// inside the task until it becomes closed. In fact, `JoinHandle` picks the output up by marking
+/// the task as closed.
+///
+/// This flag can't be set when the task is scheduled or running.
+pub(crate) const COMPLETED: usize = 1 << 2;
+
+/// Set if the task is closed.
+///
+/// If a task is closed, that means it's either cancelled or its output has been consumed by the
+/// `JoinHandle`. A task becomes closed when:
+///
+/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`.
+/// 2. Its output is awaited by the `JoinHandle`.
+/// 3. It panics while polling the future.
+/// 4. It is completed and the `JoinHandle` is dropped.
+pub(crate) const CLOSED: usize = 1 << 3;
+
+/// Set if the `JoinHandle` still exists.
+///
+/// The `JoinHandle` is a special case in that it is only tracked by this flag, while all other
+/// task references (`Task` and `Waker`s) are tracked by the reference count.
+pub(crate) const HANDLE: usize = 1 << 4;
+
+/// Set if the `JoinHandle` is awaiting the output.
+///
+/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the
+/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
+/// check that tells us if we need to wake anyone without acquiring the lock inside the task.
+pub(crate) const AWAITER: usize = 1 << 5;
+
+/// Set if the awaiter is locked.
+///
+/// This lock is acquired before a new awaiter is registered or the existing one is woken.
+pub(crate) const LOCKED: usize = 1 << 6;
+
+/// A single reference.
+///
+/// The lower bits in the state contain various flags representing the task state, while the upper
+/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the
+/// total reference count.
+///
+/// Note that the reference counter only tracks the `Task` and `Waker`s. The `JoinHandle` is
+/// tracked separately by the `HANDLE` flag.
+pub(crate) const REFERENCE: usize = 1 << 7;
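+
+// A minimal illustrative test of the layout described above: all flag bits sit below `REFERENCE`,
+// so a state word splits into `state / REFERENCE` (the reference count) and
+// `state & (REFERENCE - 1)` (the flags). A freshly spawned task starts out as
+// `SCHEDULED | HANDLE | REFERENCE`: scheduled, with a live `JoinHandle`, and one reference.
+#[cfg(test)]
+mod state_layout_sketch {
+    use super::*;
+
+    #[test]
+    fn initial_state_decomposes() {
+        let state = SCHEDULED | HANDLE | REFERENCE;
+        assert_eq!(state / REFERENCE, 1);
+        assert_eq!(state & (REFERENCE - 1), SCHEDULED | HANDLE);
+    }
+}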
diff --git a/src/task.rs b/src/task.rs
new file mode 100644
index 0000000..8bfc164
--- /dev/null
+++ b/src/task.rs
@@ -0,0 +1,390 @@
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr::NonNull;
+
+use crate::header::Header;
+use crate::raw::RawTask;
+use crate::JoinHandle;
+
+/// Creates a new task.
+///
+/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that
+/// awaits its result.
+///
+/// The `tag` is stored inside the allocated task.
+///
+/// When run, the task polls `future`. When woken, it gets scheduled for running by the
+/// `schedule` function.
+///
+/// # Examples
+///
+/// ```
+/// # #![feature(async_await)]
+/// use crossbeam::channel;
+///
+/// // The future inside the task.
+/// let future = async {
+///     println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+/// ```
+///
+/// [`JoinHandle`]: struct.JoinHandle.html
+pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
+where
+    F: Future<Output = R> + Send + 'static,
+    R: Send + 'static,
+    S: Fn(Task<T>) + Send + Sync + 'static,
+    T: Send + Sync + 'static,
+{
+    let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
+    let task = Task {
+        raw_task,
+        _marker: PhantomData,
+    };
+    let handle = JoinHandle {
+        raw_task,
+        _marker: PhantomData,
+    };
+    (task, handle)
+}
+
+/// A task that runs a future.
+///
+/// # Construction
+///
+/// A task is a heap-allocated structure containing:
+///
+/// * A reference counter.
+/// * The state of the task.
+/// * An arbitrary piece of data called a *tag*.
+/// * A function that schedules the task when woken.
+/// * A future or its result if polling has completed.
+///
+/// Constructor [`spawn()`] returns a [`Task`] and a [`JoinHandle`]. Those two references
+/// are like two sides of the task: one runs the future and the other awaits its result.
+///
+/// # Behavior
+///
+/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the
+/// [`Task`] reference and polls its internal future. If the future is still pending after being
+/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future
+/// completes, its result becomes available to the [`JoinHandle`].
+///
+/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task.
+///
+/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function
+/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a
+/// queue of runnable tasks.
+///
+/// If the [`Task`] reference is dropped without being run, the task is cancelled.
+///
+/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task
+/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An
+/// attempt to run a cancelled task won't do anything. And if the cancelled task has already
+/// completed, awaiting its result through [`JoinHandle`] will return `None`.
+///
+/// If polling the task's future panics, it gets cancelled automatically.
+///
+/// # Task states
+///
+/// A task can be in the following states:
+///
+/// * Sleeping: The [`Task`] reference doesn't exist; the task is waiting for a [`Waker`] to wake
+///   it.
+/// * Scheduled: The [`Task`] reference exists and is waiting to be [run].
+/// * Completed: The [`Task`] reference doesn't exist anymore and the task can't be rescheduled,
+///   but its result is available to the [`JoinHandle`].
+/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and
+///   awaiting the [`JoinHandle`] returns `None`.
+///
+/// When constructed, the task is initially in the scheduled state.
+///
+/// # Destruction
+///
+/// The future inside the task gets dropped in the following cases:
+///
+/// * When [`Task`] is dropped.
+/// * When [`Task`] is run to completion.
+///
+/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if
+/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its
+/// future gets dropped by the executor. In other words, the task's future can be dropped only by
+/// [`Task`].
+///
+/// When the task completes, the result of its future is stored inside the allocation. This result
+/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the
+/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too.
+///
+/// The task gets deallocated when all references to it are dropped, which includes the [`Task`],
+/// the [`JoinHandle`], and any associated [`Waker`]s.
+///
+/// The tag inside the task and the schedule function get dropped at the time of deallocation.
+///
+/// # Panics
+///
+/// If polling the inner future inside [`run()`] panics, the panic will be propagated into
+/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other
+/// panics result in the process being aborted.
+///
+/// More precisely, the process is aborted if a panic occurs:
+///
+/// * Inside the schedule function.
+/// * While dropping the tag.
+/// * While dropping the future.
+/// * While dropping the schedule function.
+/// * While waking the task awaiting the [`JoinHandle`].
+///
+/// [`run()`]: struct.Task.html#method.run
+/// [run]: struct.Task.html#method.run
+/// [`JoinHandle`]: struct.JoinHandle.html
+/// [`Task`]: struct.Task.html
+/// [`spawn()`]: fn.spawn.html
+/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
+/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+///
+/// # Examples
+///
+/// ```
+/// # #![feature(async_await)]
+/// use async_task::Task;
+/// use crossbeam::channel;
+/// use futures::executor;
+///
+/// // The future inside the task.
+/// let future = async {
+///     println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+///
+/// // Run the task. In this example, it will complete after a single run.
+/// task.run();
+/// assert!(r.is_empty());
+///
+/// // Await its result.
+/// executor::block_on(handle);
+/// ```
+pub struct Task<T> {
+    /// A pointer to the heap-allocated task.
+    pub(crate) raw_task: NonNull<()>,
+
+    /// A marker capturing the generic type `T`.
+    pub(crate) _marker: PhantomData<T>,
+}
+
+unsafe impl<T> Send for Task<T> {}
+unsafe impl<T> Sync for Task<T> {}
+
+impl<T> Task<T> {
+    /// Schedules the task.
+    ///
+    /// This is a convenience method that simply reschedules the task by passing it to its schedule
+    /// function.
+    ///
+    /// If the task is cancelled, this method won't do anything.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    ///
+    /// // The future inside the task.
+    /// let future = async {
+    ///     println!("Hello, world!");
+    /// };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, ());
+    ///
+    /// // Send the task into the channel.
+    /// task.schedule();
+    ///
+    /// // Retrieve the task back from the channel.
+    /// let task = r.recv().unwrap();
+    /// ```
+    pub fn schedule(self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+        mem::forget(self);
+
+        unsafe {
+            ((*header).vtable.schedule)(ptr);
+        }
+    }
+
+    /// Runs the task.
+    ///
+    /// This method polls the task's future. If the future completes, its result will become
+    /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
+    /// be woken in order to be rescheduled and then run again.
+    ///
+    /// If the task is cancelled, running it won't do anything.
+    ///
+    /// # Panics
+    ///
+    /// It is possible that polling the future panics, in which case the panic will be propagated
+    /// into the caller. It is advised that invocations of this method are wrapped inside
+    /// [`catch_unwind`].
+    ///
+    /// If a panic occurs, the task is automatically cancelled.
+    ///
+    /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    /// use futures::executor;
+    ///
+    /// // The future inside the task.
+    /// let future = async { 1 + 2 };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, ());
+    ///
+    /// // Run the task. In this example, it will complete after a single run.
+    /// task.run();
+    /// assert!(r.is_empty());
+    ///
+    /// // Await the result of the task.
+    /// let result = executor::block_on(handle);
+    /// assert_eq!(result, Some(3));
+    /// ```
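+    ///
+    /// A minimal sketch of wrapping a `run()` call in [`catch_unwind`], so that a panicking
+    /// future doesn't take down the thread driving the task:
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use std::panic::catch_unwind;
+    /// use crossbeam::channel;
+    ///
+    /// let future = async { 1 + 2 };
+    ///
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    /// let (task, handle) = async_task::spawn(future, schedule, ());
+    ///
+    /// // If polling panicked, the panic would be caught here and the task would be cancelled.
+    /// assert!(catch_unwind(|| task.run()).is_ok());
+    /// ```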
+    pub fn run(self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+        mem::forget(self);
+
+        unsafe {
+            ((*header).vtable.run)(ptr);
+        }
+    }
+
+    /// Cancels the task.
+    ///
+    /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
+    /// to run it won't do anything. And if it's completed, awaiting its result evaluates to
+    /// `None`.
+    ///
+    /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    /// use futures::executor;
+    ///
+    /// // The future inside the task.
+    /// let future = async { 1 + 2 };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, ());
+    ///
+    /// // Cancel the task.
+    /// task.cancel();
+    ///
+    /// // Running a cancelled task does nothing.
+    /// task.run();
+    ///
+    /// // Await the result of the task.
+    /// let result = executor::block_on(handle);
+    /// assert_eq!(result, None);
+    /// ```
+    pub fn cancel(&self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        unsafe {
+            (*header).cancel();
+        }
+    }
+
+    /// Returns a reference to the tag stored inside the task.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #![feature(async_await)]
+    /// use crossbeam::channel;
+    ///
+    /// // The future inside the task.
+    /// let future = async { 1 + 2 };
+    ///
+    /// // If the task gets woken, it will be sent into this channel.
+    /// let (s, r) = channel::unbounded();
+    /// let schedule = move |task| s.send(task).unwrap();
+    ///
+    /// // Create a task with the future and the schedule function.
+    /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
+    ///
+    /// // Access the tag.
+    /// assert_eq!(*task.tag(), "a simple task");
+    /// ```
+    pub fn tag(&self) -> &T {
+        let offset = Header::offset_tag::<T>();
+        let ptr = self.raw_task.as_ptr();
+
+        unsafe {
+            let raw = (ptr as *mut u8).add(offset) as *const T;
+            &*raw
+        }
+    }
+}
+
+impl<T> Drop for Task<T> {
+    fn drop(&mut self) {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        unsafe {
+            // Cancel the task.
+            (*header).cancel();
+
+            // Drop the future.
+            ((*header).vtable.drop_future)(ptr);
+
+            // Drop the task reference.
+            ((*header).vtable.decrement)(ptr);
+        }
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for Task<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ptr = self.raw_task.as_ptr();
+        let header = ptr as *const Header;
+
+        f.debug_struct("Task")
+            .field("header", unsafe { &(*header) })
+            .field("tag", self.tag())
+            .finish()
+    }
+}
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 0000000..441ead1
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,48 @@
+use std::alloc::Layout;
+use std::mem;
+
+/// Calls a function and aborts if it panics.
+///
+/// This is useful in unsafe code where we can't recover from panics.
+#[inline]
+pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
+    struct Bomb;
+
+    impl Drop for Bomb {
+        fn drop(&mut self) {
+            std::process::abort();
+        }
+    }
+
+    let bomb = Bomb;
+    let t = f();
+    mem::forget(bomb);
+    t
+}
+
+/// Returns the layout for `a` followed by `b` and the offset of `b`.
+///
+/// This function was adapted from the currently unstable `Layout::extend()`:
+/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend
+#[inline]
+pub(crate) fn extend(a: Layout, b: Layout) -> (Layout, usize) {
+    let new_align = a.align().max(b.align());
+    let pad = padding_needed_for(a, b.align());
+
+    let offset = a.size().checked_add(pad).unwrap();
+    let new_size = offset.checked_add(b.size()).unwrap();
+
+    let layout = Layout::from_size_align(new_size, new_align).unwrap();
+    (layout, offset)
+}
+
+/// Returns the padding after `layout` that aligns the following address to `align`.
+///
+/// This function was adapted from the currently unstable `Layout::padding_needed_for()`:
+/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for
+#[inline]
+pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize {
+    let len = layout.size();
+    let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
+    len_rounded_up.wrapping_sub(len)
+}
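+
+// A minimal illustrative test: `extend` inserts just enough padding so that the second layout
+// begins at a properly aligned offset.
+#[cfg(test)]
+mod layout_sketch {
+    use std::alloc::Layout;
+
+    use super::extend;
+
+    #[test]
+    fn extend_pads_to_alignment() {
+        // `u8` followed by `u64`: the second field must start at a multiple of `u64`'s alignment.
+        let a = Layout::new::<u8>();
+        let b = Layout::new::<u64>();
+        let (layout, offset) = extend(a, b);
+        assert_eq!(offset % b.align(), 0);
+        assert_eq!(layout.size(), offset + b.size());
+        assert_eq!(layout.align(), a.align().max(b.align()));
+    }
+}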
diff --git a/tests/basic.rs b/tests/basic.rs
new file mode 100644
index 0000000..b9e181b
--- /dev/null
+++ b/tests/basic.rs
@@ -0,0 +1,314 @@
+#![feature(async_await)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::{Context, Poll};
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use crossbeam::channel;
+use futures::future;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, POLL, DROP)`
+//
+// The future `f` always returns `Poll::Ready`.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! future {
+    ($name:pat, $poll:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Fut(Box<i32>);
+
+            impl Future for Fut {
+                type Output = Box<i32>;
+
+                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    $poll.fetch_add(1);
+                    Poll::Ready(Box::new(0))
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            Fut(Box::new(0))
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, SCHED, DROP)`
+//
+// The schedule function `s` does nothing.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! schedule {
+    ($name:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            move |_task| {
+                &guard;
+                $sched.fetch_add(1);
+            }
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+#[test]
+fn cancel_and_drop_handle() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    task.cancel();
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    drop(task);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn run_and_drop_handle() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn drop_handle_and_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn cancel_and_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    handle.cancel();
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn run_and_cancel() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    handle.cancel();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn schedule() {
+    let (s, r) = channel::unbounded();
+    let schedule = move |t| s.send(t).unwrap();
+    let (task, _handle) = async_task::spawn(
+        future::poll_fn(|_| Poll::<()>::Pending),
+        schedule,
+        Box::new(0),
+    );
+
+    assert!(r.is_empty());
+    task.schedule();
+
+    let task = r.recv().unwrap();
+    assert!(r.is_empty());
+    task.schedule();
+
+    let task = r.recv().unwrap();
+    assert!(r.is_empty());
+    task.schedule();
+
+    r.recv().unwrap();
+}
+
+#[test]
+fn tag() {
+    let (s, r) = channel::unbounded();
+    let schedule = move |t| s.send(t).unwrap();
+    let (task, handle) = async_task::spawn(
+        future::poll_fn(|_| Poll::<()>::Pending),
+        schedule,
+        AtomicUsize::new(7),
+    );
+
+    assert!(r.is_empty());
+    task.schedule();
+
+    let task = r.recv().unwrap();
+    assert!(r.is_empty());
+    handle.tag().fetch_add(1, Ordering::SeqCst);
+    task.schedule();
+
+    let task = r.recv().unwrap();
+    assert_eq!(task.tag().load(Ordering::SeqCst), 8);
+    assert!(r.is_empty());
+    task.schedule();
+
+    r.recv().unwrap();
+}
+
+#[test]
+fn schedule_counter() {
+    let (s, r) = channel::unbounded();
+    let schedule = move |t: Task<AtomicUsize>| {
+        t.tag().fetch_add(1, Ordering::SeqCst);
+        s.send(t).unwrap();
+    };
+    let (task, handle) = async_task::spawn(
+        future::poll_fn(|_| Poll::<()>::Pending),
+        schedule,
+        AtomicUsize::new(0),
+    );
+    task.schedule();
+
+    assert_eq!(handle.tag().load(Ordering::SeqCst), 1);
+    r.recv().unwrap().schedule();
+
+    assert_eq!(handle.tag().load(Ordering::SeqCst), 2);
+    r.recv().unwrap().schedule();
+
+    assert_eq!(handle.tag().load(Ordering::SeqCst), 3);
+    r.recv().unwrap();
+}
diff --git a/tests/join.rs b/tests/join.rs
new file mode 100644
index 0000000..e082939
--- /dev/null
+++ b/tests/join.rs
@@ -0,0 +1,454 @@
+#![feature(async_await)]
+
+use std::cell::Cell;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use futures::executor::block_on;
+use futures::future;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, POLL, DROP_F, DROP_O)`
+//
+// The future `f` outputs `Poll::Ready`.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP_F` is incremented.
+// When the output gets dropped, `DROP_O` is incremented.
+macro_rules! future {
+    ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Fut(Box<i32>);
+
+            impl Future for Fut {
+                type Output = Out;
+
+                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    $poll.fetch_add(1);
+                    Poll::Ready(Out(Box::new(0)))
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop_f.fetch_add(1);
+                }
+            }
+
+            struct Out(Box<i32>);
+
+            impl Drop for Out {
+                fn drop(&mut self) {
+                    $drop_o.fetch_add(1);
+                }
+            }
+
+            Fut(Box::new(0))
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, SCHED, DROP)`
+//
+// The schedule function `s` does nothing.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! schedule {
+    ($name:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            move |task: Task<_>| {
+                &guard;
+                task.schedule();
+                $sched.fetch_add(1);
+            }
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn cancel_and_join() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    assert_eq!(DROP_O.load(), 0);
+
+    task.cancel();
+    drop(task);
+    assert_eq!(DROP_O.load(), 0);
+
+    assert!(block_on(handle).is_none());
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_O.load(), 0);
+}
+
+#[test]
+fn run_and_join() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    assert_eq!(DROP_O.load(), 0);
+
+    task.run();
+    assert_eq!(DROP_O.load(), 0);
+
+    assert!(block_on(handle).is_some());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_O.load(), 1);
+}
+
+#[test]
+fn drop_handle_and_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    assert_eq!(DROP_O.load(), 0);
+
+    drop(handle);
+    assert_eq!(DROP_O.load(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_O.load(), 1);
+}
+
+#[test]
+fn join_twice() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    assert_eq!(DROP_O.load(), 0);
+
+    task.run();
+    assert_eq!(DROP_O.load(), 0);
+
+    assert!(block_on(&mut handle).is_some());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_O.load(), 1);
+
+    assert!(block_on(&mut handle).is_none());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_O.load(), 1);
+
+    drop(handle);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn join_and_cancel() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            thread::sleep(ms(100));
+
+            task.cancel();
+            drop(task);
+
+            thread::sleep(ms(200));
+            assert_eq!(POLL.load(), 0);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_O.load(), 0);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        assert!(block_on(handle).is_none());
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+
+        thread::sleep(ms(100));
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_O.load(), 0);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn join_and_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            thread::sleep(ms(200));
+
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+
+            thread::sleep(ms(100));
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        assert!(block_on(handle).is_some());
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_O.load(), 1);
+
+        thread::sleep(ms(100));
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn try_join_and_run_and_join() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            thread::sleep(ms(200));
+
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+
+            thread::sleep(ms(100));
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        block_on(future::select(&mut handle, future::ready(())));
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        assert!(block_on(handle).is_some());
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_O.load(), 1);
+
+        thread::sleep(ms(100));
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn try_join_and_cancel_and_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            thread::sleep(ms(200));
+
+            task.run();
+            assert_eq!(POLL.load(), 0);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        block_on(future::select(&mut handle, future::ready(())));
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn try_join_and_run_and_cancel() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            thread::sleep(ms(200));
+
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 0);
+            assert_eq!(DROP_D.load(), 0);
+        });
+
+        block_on(future::select(&mut handle, future::ready(())));
+        assert_eq!(POLL.load(), 0);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        thread::sleep(ms(400));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(DROP_O.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn await_output() {
+    struct Fut<T>(Cell<Option<T>>);
+
+    impl<T> Fut<T> {
+        fn new(t: T) -> Fut<T> {
+            Fut(Cell::new(Some(t)))
+        }
+    }
+
+    impl<T> Future for Fut<T> {
+        type Output = T;
+
+        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+            Poll::Ready(self.0.take().unwrap())
+        }
+    }
+
+    for i in 0..10 {
+        let (task, handle) = async_task::spawn(Fut::new(i), drop, Box::new(0));
+        task.run();
+        assert_eq!(block_on(handle), Some(i));
+    }
+
+    for i in 0..10 {
+        let (task, handle) = async_task::spawn(Fut::new(vec![7; i]), drop, Box::new(0));
+        task.run();
+        assert_eq!(block_on(handle), Some(vec![7; i]));
+    }
+
+    let (task, handle) = async_task::spawn(Fut::new("foo".to_string()), drop, Box::new(0));
+    task.run();
+    assert_eq!(block_on(handle), Some("foo".to_string()));
+}
diff --git a/tests/panic.rs b/tests/panic.rs
new file mode 100644
index 0000000..68058a2
--- /dev/null
+++ b/tests/panic.rs
@@ -0,0 +1,288 @@
+#![feature(async_await)]
+
+use std::future::Future;
+use std::panic::catch_unwind;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use futures::executor::block_on;
+use futures::future;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, POLL, DROP)`
+//
+// The future `f` sleeps for 200 ms and then panics.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! future {
+    ($name:pat, $poll:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Fut(Box<i32>);
+
+            impl Future for Fut {
+                type Output = ();
+
+                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    $poll.fetch_add(1);
+                    thread::sleep(ms(200));
+                    panic!()
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            Fut(Box::new(0))
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, SCHED, DROP)`
+//
+// The schedule function `s` does nothing.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! schedule {
+    ($name:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            move |_task: Task<_>| {
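+                // Mention `guard` so the `move` closure captures it and drops it when the schedule function is dropped.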
+                &guard;
+                $sched.fetch_add(1);
+            }
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn cancel_during_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn run_and_join() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    assert!(catch_unwind(|| task.run()).is_err());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    assert!(block_on(handle).is_none());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn try_join_and_run_and_join() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    block_on(future::select(&mut handle, future::ready(())));
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    assert!(catch_unwind(|| task.run()).is_err());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+
+    assert!(block_on(handle).is_none());
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn join_during_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+
+            thread::sleep(ms(100));
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        assert!(block_on(handle).is_none());
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+
+        thread::sleep(ms(100));
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn try_join_during_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        block_on(future::select(&mut handle, future::ready(())));
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        drop(handle);
+    })
+    .unwrap();
+}
+
+#[test]
+fn drop_handle_during_run() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        drop(handle);
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+    })
+    .unwrap();
+}
diff --git a/tests/ready.rs b/tests/ready.rs
new file mode 100644
index 0000000..ecca328
--- /dev/null
+++ b/tests/ready.rs
@@ -0,0 +1,265 @@
+#![feature(async_await)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use futures::executor::block_on;
+use futures::future;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, POLL, DROP_F, DROP_O)`
+//
+// The future `f` sleeps for 200 ms and then returns `Poll::Ready` with an output value.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP_F` is incremented.
+// When the output gets dropped, `DROP_O` is incremented.
+macro_rules! future {
+    ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Fut(Box<i32>);
+
+            impl Future for Fut {
+                type Output = Out;
+
+                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    $poll.fetch_add(1);
+                    thread::sleep(ms(200));
+                    Poll::Ready(Out(Box::new(0)))
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop_f.fetch_add(1);
+                }
+            }
+
+            struct Out(Box<i32>);
+
+            impl Drop for Out {
+                fn drop(&mut self) {
+                    $drop_o.fetch_add(1);
+                }
+            }
+
+            Fut(Box::new(0))
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, SCHED, DROP)`
+//
+// The schedule function `s` does nothing.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+macro_rules! schedule {
+    ($name:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let $name = {
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            move |_task: Task<_>| {
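+                // Mention `guard` so the `move` closure captures it and drops it when the schedule function is dropped.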
+                &guard;
+                $sched.fetch_add(1);
+            }
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn cancel_during_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 0);
+            assert_eq!(DROP_D.load(), 0);
+            assert_eq!(DROP_O.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 1);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(DROP_O.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn join_during_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+
+            thread::sleep(ms(100));
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        assert!(block_on(handle).is_some());
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_O.load(), 1);
+
+        thread::sleep(ms(100));
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+    })
+    .unwrap();
+}
+
+#[test]
+fn try_join_during_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, mut handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(DROP_O.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        block_on(future::select(&mut handle, future::ready(())));
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+        drop(handle);
+    })
+    .unwrap();
+}
+
+#[test]
+fn drop_handle_during_run() {
+    future!(f, POLL, DROP_F, DROP_O);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            assert_eq!(POLL.load(), 1);
+            assert_eq!(SCHEDULE.load(), 0);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(DROP_O.load(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        drop(handle);
+        assert_eq!(POLL.load(), 1);
+        assert_eq!(SCHEDULE.load(), 0);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(DROP_O.load(), 0);
+    })
+    .unwrap();
+}
diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs
new file mode 100644
index 0000000..a683f26
--- /dev/null
+++ b/tests/waker_panic.rs
@@ -0,0 +1,357 @@
+#![feature(async_await)]
+
+use std::cell::Cell;
+use std::future::Future;
+use std::panic::catch_unwind;
+use std::pin::Pin;
+use std::task::Waker;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use crossbeam::channel;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, waker, POLL, DROP)`
+//
+// The future `f` always sleeps for 200 ms, and panics the second time it is polled.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Every time the future is run, it stores the waker into a global variable.
+// This waker can be extracted using the `waker` function.
+macro_rules! future {
+    ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+            static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
+        }
+
+        let ($name, $waker) = {
+            struct Fut(Cell<bool>, Box<i32>);
+
+            impl Future for Fut {
+                type Output = ();
+
+                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    WAKER.store(Some(cx.waker().clone()));
+                    $poll.fetch_add(1);
+                    thread::sleep(ms(200));
+
+                    if self.0.get() {
+                        panic!()
+                    } else {
+                        self.0.set(true);
+                        Poll::Pending
+                    }
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            (Fut(Cell::new(false), Box::new(0)), || {
+                WAKER.swap(None).unwrap()
+            })
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, chan, SCHED, DROP)`
+//
+// The schedule function `s` pushes the task into `chan`.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Receiver `chan` extracts the task when it is scheduled.
+macro_rules! schedule {
+    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($name, $chan) = {
+            let (s, r) = channel::unbounded();
+
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            let sched = move |task: Task<_>| {
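+                // Mention `guard` so the `move` closure captures it and drops it when the schedule function is dropped.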
+                &guard;
+                $sched.fetch_add(1);
+                s.send(task).unwrap();
+            };
+
+            (sched, r)
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn wake_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        w.wake();
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn cancel_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn wake_and_cancel_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        w.wake();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn cancel_and_wake_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            assert!(catch_unwind(|| task.run()).is_err());
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        w.wake();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
diff --git a/tests/waker_pending.rs b/tests/waker_pending.rs
new file mode 100644
index 0000000..547ff7a
--- /dev/null
+++ b/tests/waker_pending.rs
@@ -0,0 +1,348 @@
+#![feature(async_await)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Waker;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use crossbeam::channel;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, waker, POLL, DROP)`
+//
+// The future `f` always sleeps for 200 ms and returns `Poll::Pending`.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Every time the future is run, it stores the waker into a global variable.
+// This waker can be extracted using the `waker` function.
+macro_rules! future {
+    ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+            static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
+        }
+
+        let ($name, $waker) = {
+            struct Fut(Box<i32>);
+
+            impl Future for Fut {
+                type Output = ();
+
+                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    WAKER.store(Some(cx.waker().clone()));
+                    $poll.fetch_add(1);
+                    thread::sleep(ms(200));
+                    Poll::Pending
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            (Fut(Box::new(0)), || WAKER.swap(None).unwrap())
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, chan, SCHED, DROP)`
+//
+// The schedule function `s` pushes the task into `chan`.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Receiver `chan` extracts the task when it is scheduled.
+macro_rules! schedule {
+    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($name, $chan) = {
+            let (s, r) = channel::unbounded();
+
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            let sched = move |task: Task<_>| {
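+                // Mention `guard` so the `move` closure captures it and drops it when the schedule function is dropped.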
+                &guard;
+                $sched.fetch_add(1);
+                s.send(task).unwrap();
+            };
+
+            (sched, r)
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn wake_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, _handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 2);
+            assert_eq!(DROP_F.load(), 0);
+            assert_eq!(DROP_S.load(), 0);
+            assert_eq!(DROP_D.load(), 0);
+            assert_eq!(chan.len(), 1);
+        });
+
+        thread::sleep(ms(100));
+
+        w.wake_by_ref();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 2);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 1);
+    })
+    .unwrap();
+
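+    // Clean up: take the rescheduled task out of the channel and drop the stored waker.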
+    chan.recv().unwrap();
+    drop(waker());
+}
+
+#[test]
+fn cancel_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn wake_and_cancel_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        w.wake();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
+
+#[test]
+fn cancel_and_wake_during_run() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    w.wake_by_ref();
+    let task = chan.recv().unwrap();
+
+    crossbeam::scope(|scope| {
+        scope.spawn(|_| {
+            task.run();
+            drop(waker());
+            assert_eq!(POLL.load(), 2);
+            assert_eq!(SCHEDULE.load(), 1);
+            assert_eq!(DROP_F.load(), 1);
+            assert_eq!(DROP_S.load(), 1);
+            assert_eq!(DROP_D.load(), 1);
+            assert_eq!(chan.len(), 0);
+        });
+
+        thread::sleep(ms(100));
+
+        handle.cancel();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        drop(handle);
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        w.wake();
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 0);
+        assert_eq!(DROP_S.load(), 0);
+        assert_eq!(DROP_D.load(), 0);
+        assert_eq!(chan.len(), 0);
+
+        thread::sleep(ms(200));
+
+        assert_eq!(POLL.load(), 2);
+        assert_eq!(SCHEDULE.load(), 1);
+        assert_eq!(DROP_F.load(), 1);
+        assert_eq!(DROP_S.load(), 1);
+        assert_eq!(DROP_D.load(), 1);
+        assert_eq!(chan.len(), 0);
+    })
+    .unwrap();
+}
diff --git a/tests/waker_ready.rs b/tests/waker_ready.rs
new file mode 100644
index 0000000..e64cc55
--- /dev/null
+++ b/tests/waker_ready.rs
@@ -0,0 +1,328 @@
+#![feature(async_await)]
+
+use std::cell::Cell;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Waker;
+use std::task::{Context, Poll};
+use std::thread;
+use std::time::Duration;
+
+use async_task::Task;
+use crossbeam::atomic::AtomicCell;
+use crossbeam::channel;
+use lazy_static::lazy_static;
+
+// Creates a future with event counters.
+//
+// Usage: `future!(f, waker, POLL, DROP)`
+//
+// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled.
+// When it gets polled, `POLL` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Every time the future is run, it stores the waker into a global variable.
+// This waker can be extracted using the `waker` function.
+macro_rules! future {
+    ($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+            static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
+        }
+
+        let ($name, $waker) = {
+            struct Fut(Cell<bool>, Box<i32>);
+
+            impl Future for Fut {
+                type Output = Box<i32>;
+
+                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+                    WAKER.store(Some(cx.waker().clone()));
+                    $poll.fetch_add(1);
+                    thread::sleep(ms(200));
+
+                    if self.0.get() {
+                        Poll::Ready(Box::new(0))
+                    } else {
+                        self.0.set(true);
+                        Poll::Pending
+                    }
+                }
+            }
+
+            impl Drop for Fut {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            (Fut(Cell::new(false), Box::new(0)), || {
+                WAKER.swap(None).unwrap()
+            })
+        };
+    };
+}
+
+// Creates a schedule function with event counters.
+//
+// Usage: `schedule!(s, chan, SCHED, DROP)`
+//
+// The schedule function `s` pushes the task into `chan`.
+// When it gets invoked, `SCHED` is incremented.
+// When it gets dropped, `DROP` is incremented.
+//
+// Receiver `chan` extracts the task when it is scheduled.
+macro_rules! schedule {
+    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
+        lazy_static! {
+            static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($name, $chan) = {
+            let (s, r) = channel::unbounded();
+
+            struct Guard(Box<i32>);
+
+            impl Drop for Guard {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            let guard = Guard(Box::new(0));
+            let sched = move |task: Task<_>| {
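+                // Mention `guard` so the `move` closure captures it and drops it when the schedule function is dropped.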
+                &guard;
+                $sched.fetch_add(1);
+                s.send(task).unwrap();
+            };
+
+            (sched, r)
+        };
+    };
+}
+
+// Creates a task with event counters.
+//
+// Usage: `task!(task, handle, f, s, DROP)`
+//
+// A task with future `f` and schedule function `s` is created.
+// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
+// When the tag inside the task gets dropped, `DROP` is incremented.
+macro_rules! task {
+    ($task:pat, $handle:pat, $future:expr, $schedule:expr, $drop:ident) => {
+        lazy_static! {
+            static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
+        }
+
+        let ($task, $handle) = {
+            struct Tag(Box<i32>);
+
+            impl Drop for Tag {
+                fn drop(&mut self) {
+                    $drop.fetch_add(1);
+                }
+            }
+
+            async_task::spawn($future, $schedule, Tag(Box::new(0)))
+        };
+    };
+}
+
+fn ms(ms: u64) -> Duration {
+    Duration::from_millis(ms)
+}
+
+#[test]
+fn wake() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(mut task, _, f, s, DROP_D);
+
+    assert!(chan.is_empty());
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    waker().wake();
+    task = chan.recv().unwrap();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    waker().wake();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(chan.len(), 0);
+}
+
+#[test]
+fn wake_by_ref() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(mut task, _, f, s, DROP_D);
+
+    assert!(chan.is_empty());
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    waker().wake_by_ref();
+    task = chan.recv().unwrap();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    task.run();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    waker().wake_by_ref();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(chan.len(), 0);
+}
+
+#[test]
+fn clone() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(mut task, _, f, s, DROP_D);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
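+    // The schedule function and tag are dropped only after every waker clone has been dropped.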
+    let w2 = waker().clone();
+    let w3 = w2.clone();
+    let w4 = w3.clone();
+    w4.wake();
+
+    task = chan.recv().unwrap();
+    task.run();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    w3.wake();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    drop(w2);
+    drop(waker());
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+}
+
+#[test]
+fn wake_cancelled() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, _, f, s, DROP_D);
+
+    task.run();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    let w = waker();
+
+    w.wake_by_ref();
+    chan.recv().unwrap().cancel();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    w.wake();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(chan.len(), 0);
+}
+
+#[test]
+fn wake_completed() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, _, f, s, DROP_D);
+
+    task.run();
+    let w = waker();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    w.wake();
+    chan.recv().unwrap().run();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(chan.len(), 0);
+
+    waker().wake();
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(chan.len(), 0);
+}