Task only becomes ready when the future is dropped
diff --git a/src/header.rs b/src/header.rs
index e676bef..5d808c7 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -32,8 +32,8 @@
 impl Header {
     /// Cancels the task.
     ///
-    /// This method will mark the task as closed and notify the awaiter, but it won't reschedule
-    /// the task if it's not completed.
+    /// This method will mark the task as closed, but it won't reschedule the task or drop its
+    /// future.
     pub(crate) fn cancel(&self) {
         let mut state = self.state.load(Ordering::Acquire);
 
@@ -50,14 +50,7 @@
                 Ordering::AcqRel,
                 Ordering::Acquire,
             ) {
-                Ok(_) => {
-                    // Notify the awaiter that the task has been closed.
-                    if state & AWAITER != 0 {
-                        self.notify(None);
-                    }
-
-                    break;
-                }
+                Ok(_) => break,
                 Err(s) => state = s,
             }
         }
@@ -107,7 +100,7 @@
             // If we're in the notifying state at this moment, just wake and return without
             // registering.
             if state & NOTIFYING != 0 {
-                waker.wake_by_ref();
+                abort_on_panic(|| waker.wake_by_ref());
                 return;
             }
 
@@ -139,7 +132,7 @@
             // If there was a notification, take the waker out of the awaiter field.
             if state & NOTIFYING != 0 {
                 if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
-                    waker = Some(w);
+                    abort_on_panic(|| waker = Some(w));
                 }
             }
 
diff --git a/src/join_handle.rs b/src/join_handle.rs
index f4710cc..10a189e 100644
--- a/src/join_handle.rs
+++ b/src/join_handle.rs
@@ -8,7 +8,6 @@
 
 use crate::header::Header;
 use crate::state::*;
-use crate::utils::abort_on_panic;
 
 /// A handle that awaits the result of a task.
 ///
@@ -199,6 +198,23 @@
             loop {
                 // If the task has been closed, notify the awaiter and return `None`.
                 if state & CLOSED != 0 {
+                    // If the task is scheduled or running, we need to wait until its future is
+                    // dropped.
+                    if state & (SCHEDULED | RUNNING) != 0 {
+                        // Replace the waker with one associated with the current task.
+                        (*header).register(cx.waker());
+
+                        // Reload the state after registering. It is possible changes occurred just
+                        // before registration so we need to check for that.
+                        state = (*header).state.load(Ordering::Acquire);
+
+                        // If the task is still scheduled or running, we need to wait because its
+                        // future is not dropped yet.
+                        if state & (SCHEDULED | RUNNING) != 0 {
+                            return Poll::Pending;
+                        }
+                    }
+
                     // Even though the awaiter is most likely the current task, it could also be
                     // another task.
                     (*header).notify(Some(cx.waker()));
@@ -207,21 +223,16 @@
 
                 // If the task is not completed, register the current task.
                 if state & COMPLETED == 0 {
-                    // Replace the waker with one associated with the current task. We need a
-                    // safeguard against panics because dropping the previous waker can panic.
-                    abort_on_panic(|| {
-                        (*header).register(cx.waker());
-                    });
+                    // Replace the waker with one associated with the current task.
+                    (*header).register(cx.waker());
 
                     // Reload the state after registering. It is possible that the task became
                     // completed or closed just before registration so we need to check for that.
                     state = (*header).state.load(Ordering::Acquire);
 
-                    // If the task has been closed, return `None`. We do not need to notify the
-                    // awaiter here, since we have replaced the waker above, and the executor can
-                    // only set it back to `None`.
+                    // If the task has been closed, restart.
                     if state & CLOSED != 0 {
-                        return Poll::Ready(None);
+                        continue;
                     }
 
                     // If the task is still not completed, we're blocked on it.
diff --git a/src/raw.rs b/src/raw.rs
index ff02373..05891d7 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -464,14 +464,17 @@
         loop {
             // If the task has already been closed, drop the task reference and return.
             if state & CLOSED != 0 {
-                // Notify the awaiter that the task has been closed.
+                // Drop the future.
+                Self::drop_future(ptr);
+
+                // Mark the task as unscheduled.
+                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+                // Notify the awaiter that the future has been dropped.
                 if state & AWAITER != 0 {
                     (*raw.header).notify(None);
                 }
 
-                // Drop the future.
-                Self::drop_future(ptr);
-
                 // Drop the task reference.
                 Self::drop_task(ptr);
                 return false;
@@ -549,6 +552,8 @@
                 drop(output);
             }
             Poll::Pending => {
+                let mut future_dropped = false;
+
                 // The task is still not completed.
                 loop {
                     // If the task was closed while running, we'll need to unschedule in case it
@@ -559,6 +564,13 @@
                         state & !RUNNING
                     };
 
+                    if state & CLOSED != 0 && !future_dropped {
+                        // The thread that closed the task didn't drop the future because it was
+                        // running so now it's our responsibility to do so.
+                        Self::drop_future(ptr);
+                        future_dropped = true;
+                    }
+
                     // Mark the task as not running.
                     match (*raw.header).state.compare_exchange_weak(
                         state,
@@ -567,14 +579,14 @@
                         Ordering::Acquire,
                     ) {
                         Ok(state) => {
-                            // If the task was closed while running, we need to drop its future.
+                            // If the task was closed while running, we need to notify the awaiter.
                             // If the task was woken up while running, we need to schedule it.
                             // Otherwise, we just drop the task reference.
                             if state & CLOSED != 0 {
-                                // The thread that closed the task didn't drop the future because
-                                // it was running so now it's our responsibility to do so.
-                                Self::drop_future(ptr);
-
+                                // Notify the awaiter that the future has been dropped.
+                                if state & AWAITER != 0 {
+                                    (*raw.header).notify(None);
+                                }
                                 // Drop the task reference.
                                 Self::drop_task(ptr);
                             } else if state & SCHEDULED != 0 {
@@ -618,14 +630,20 @@
                         // If the task was closed while running, then unschedule it, drop its
                         // future, and drop the task reference.
                         if state & CLOSED != 0 {
-                            // We still need to unschedule the task because it is possible it was
-                            // woken up while running.
-                            (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
-
                             // The thread that closed the task didn't drop the future because it
                             // was running so now it's our responsibility to do so.
                             RawTask::<F, R, S, T>::drop_future(ptr);
 
+                            // Mark the task as not running and not scheduled.
+                            (*raw.header)
+                                .state
+                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
+
+                            // Notify the awaiter that the future has been dropped.
+                            if state & AWAITER != 0 {
+                                (*raw.header).notify(None);
+                            }
+
                             // Drop the task reference.
                             RawTask::<F, R, S, T>::drop_task(ptr);
                             break;
@@ -642,7 +660,7 @@
                                 // Drop the future because the task is now closed.
                                 RawTask::<F, R, S, T>::drop_future(ptr);
 
-                                // Notify the awaiter that the task has been closed.
+                                // Notify the awaiter that the future has been dropped.
                                 if state & AWAITER != 0 {
                                     (*raw.header).notify(None);
                                 }
diff --git a/src/task.rs b/src/task.rs
index 7a1a5e0..d4da255 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -4,10 +4,12 @@
 use core::mem::{self, ManuallyDrop};
 use core::pin::Pin;
 use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
 use core::task::{Context, Poll, Waker};
 
 use crate::header::Header;
 use crate::raw::RawTask;
+use crate::state::*;
 use crate::JoinHandle;
 
 /// Creates a new task.
@@ -315,6 +317,14 @@
             // Drop the future.
             ((*header).vtable.drop_future)(ptr);
 
+            // Mark the task as unscheduled.
+            let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+            // Notify the awaiter that the future has been dropped.
+            if state & AWAITER != 0 {
+                (*header).notify(None);
+            }
+
             // Drop the task reference.
             ((*header).vtable.drop_task)(ptr);
         }
diff --git a/tests/basic.rs b/tests/basic.rs
index 432e14c..0c0a10f 100644
--- a/tests/basic.rs
+++ b/tests/basic.rs
@@ -6,7 +6,7 @@
 use async_task::Task;
 use crossbeam::atomic::AtomicCell;
 use crossbeam::channel;
-use futures::future;
+use futures::future::{self, FutureExt};
 use lazy_static::lazy_static;
 
 // Creates a future with event counters.
@@ -238,6 +238,44 @@
 }
 
 #[test]
+fn cancel_and_poll() {
+    future!(f, POLL, DROP_F);
+    schedule!(s, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_T);
+
+    handle.cancel();
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    let mut handle = handle;
+    assert!((&mut handle).now_or_never().is_none());
+
+    task.run();
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    assert!((&mut handle).now_or_never().is_some());
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    drop(handle);
+    assert_eq!(POLL.load(), 0);
+    assert_eq!(SCHEDULE.load(), 0);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
+}
+
+#[test]
 fn schedule() {
     let (s, r) = channel::unbounded();
     let schedule = move |t| s.send(t).unwrap();
diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs
index eb77912..60b7fd6 100644
--- a/tests/waker_panic.rs
+++ b/tests/waker_panic.rs
@@ -10,6 +10,7 @@
 use async_task::Task;
 use crossbeam::atomic::AtomicCell;
 use crossbeam::channel;
+use futures::future::FutureExt;
 use lazy_static::lazy_static;
 
 // Creates a future with event counters.
@@ -353,3 +354,44 @@
     })
     .unwrap();
 }
+
+#[test]
+fn panic_and_poll() {
+    future!(f, waker, POLL, DROP_F);
+    schedule!(s, chan, SCHEDULE, DROP_S);
+    task!(task, handle, f, s, DROP_T);
+
+    task.run();
+    waker().wake();
+    assert_eq!(POLL.load(), 1);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 0);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    let mut handle = handle;
+    assert!((&mut handle).now_or_never().is_none());
+
+    let task = chan.recv().unwrap();
+    assert!(catch_unwind(|| task.run()).is_err());
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    assert!((&mut handle).now_or_never().is_some());
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
+
+    drop(waker());
+    drop(handle);
+    assert_eq!(POLL.load(), 2);
+    assert_eq!(SCHEDULE.load(), 1);
+    assert_eq!(DROP_F.load(), 1);
+    assert_eq!(DROP_S.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
+}