Stjepan Glavina | 1479e86 | 2019-08-12 20:18:51 +0200 | [diff] [blame^] | 1 | use std::alloc::Layout; |
| 2 | use std::cell::Cell; |
| 3 | use std::fmt; |
| 4 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 5 | use std::task::Waker; |
| 6 | |
| 7 | use crossbeam_utils::Backoff; |
| 8 | |
| 9 | use crate::raw::TaskVTable; |
| 10 | use crate::state::*; |
| 11 | use crate::utils::{abort_on_panic, extend}; |
| 12 | |
| 13 | /// The header of a task. |
| 14 | /// |
| 15 | /// This header is stored right at the beginning of every heap-allocated task. |
| 16 | pub(crate) struct Header { |
| 17 | /// Current state of the task. |
| 18 | /// |
| 19 | /// Contains flags representing the current state and the reference count. |
| 20 | pub(crate) state: AtomicUsize, |
| 21 | |
| 22 | /// The task that is blocked on the `JoinHandle`. |
| 23 | /// |
| 24 | /// This waker needs to be woken once the task completes or is closed. |
| 25 | pub(crate) awaiter: Cell<Option<Waker>>, |
| 26 | |
| 27 | /// The virtual table. |
| 28 | /// |
| 29 | /// In addition to the actual waker virtual table, it also contains pointers to several other |
| 30 | /// methods necessary for bookkeeping the heap-allocated task. |
| 31 | pub(crate) vtable: &'static TaskVTable, |
| 32 | } |
| 33 | |
| 34 | impl Header { |
| 35 | /// Cancels the task. |
| 36 | /// |
| 37 | /// This method will only mark the task as closed and will notify the awaiter, but it won't |
| 38 | /// reschedule the task if it's not completed. |
| 39 | pub(crate) fn cancel(&self) { |
| 40 | let mut state = self.state.load(Ordering::Acquire); |
| 41 | |
| 42 | loop { |
| 43 | // If the task has been completed or closed, it can't be cancelled. |
| 44 | if state & (COMPLETED | CLOSED) != 0 { |
| 45 | break; |
| 46 | } |
| 47 | |
| 48 | // Mark the task as closed. |
| 49 | match self.state.compare_exchange_weak( |
| 50 | state, |
| 51 | state | CLOSED, |
| 52 | Ordering::AcqRel, |
| 53 | Ordering::Acquire, |
| 54 | ) { |
| 55 | Ok(_) => { |
| 56 | // Notify the awaiter that the task has been closed. |
| 57 | if state & AWAITER != 0 { |
| 58 | self.notify(); |
| 59 | } |
| 60 | |
| 61 | break; |
| 62 | } |
| 63 | Err(s) => state = s, |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | /// Notifies the task blocked on the task. |
| 69 | /// |
| 70 | /// If there is a registered waker, it will be removed from the header and woken. |
| 71 | #[inline] |
| 72 | pub(crate) fn notify(&self) { |
| 73 | if let Some(waker) = self.swap_awaiter(None) { |
| 74 | // We need a safeguard against panics because waking can panic. |
| 75 | abort_on_panic(|| { |
| 76 | waker.wake(); |
| 77 | }); |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | /// Notifies the task blocked on the task unless its waker matches `current`. |
| 82 | /// |
| 83 | /// If there is a registered waker, it will be removed from the header. |
| 84 | #[inline] |
| 85 | pub(crate) fn notify_unless(&self, current: &Waker) { |
| 86 | if let Some(waker) = self.swap_awaiter(None) { |
| 87 | if !waker.will_wake(current) { |
| 88 | // We need a safeguard against panics because waking can panic. |
| 89 | abort_on_panic(|| { |
| 90 | waker.wake(); |
| 91 | }); |
| 92 | } |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | /// Swaps the awaiter and returns the previous value. |
| 97 | #[inline] |
| 98 | pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> { |
| 99 | let new_is_none = new.is_none(); |
| 100 | |
| 101 | // We're about to try acquiring the lock in a loop. If it's already being held by another |
| 102 | // thread, we'll have to spin for a while so it's best to employ a backoff strategy. |
| 103 | let backoff = Backoff::new(); |
| 104 | loop { |
| 105 | // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag. |
| 106 | let state = if new_is_none { |
| 107 | self.state.fetch_or(LOCKED, Ordering::Acquire) |
| 108 | } else { |
| 109 | self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire) |
| 110 | }; |
| 111 | |
| 112 | // If the lock was acquired, break from the loop. |
| 113 | if state & LOCKED == 0 { |
| 114 | break; |
| 115 | } |
| 116 | |
| 117 | // Snooze for a little while because the lock is held by another thread. |
| 118 | backoff.snooze(); |
| 119 | } |
| 120 | |
| 121 | // Replace the awaiter. |
| 122 | let old = self.awaiter.replace(new); |
| 123 | |
| 124 | // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag. |
| 125 | if new_is_none { |
| 126 | self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release); |
| 127 | } else { |
| 128 | self.state.fetch_and(!LOCKED, Ordering::Release); |
| 129 | } |
| 130 | |
| 131 | old |
| 132 | } |
| 133 | |
| 134 | /// Returns the offset at which the tag of type `T` is stored. |
| 135 | #[inline] |
| 136 | pub(crate) fn offset_tag<T>() -> usize { |
| 137 | let layout_header = Layout::new::<Header>(); |
| 138 | let layout_t = Layout::new::<T>(); |
| 139 | let (_, offset_t) = extend(layout_header, layout_t); |
| 140 | offset_t |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | impl fmt::Debug for Header { |
| 145 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 146 | let state = self.state.load(Ordering::SeqCst); |
| 147 | |
| 148 | f.debug_struct("Header") |
| 149 | .field("scheduled", &(state & SCHEDULED != 0)) |
| 150 | .field("running", &(state & RUNNING != 0)) |
| 151 | .field("completed", &(state & COMPLETED != 0)) |
| 152 | .field("closed", &(state & CLOSED != 0)) |
| 153 | .field("awaiter", &(state & AWAITER != 0)) |
| 154 | .field("handle", &(state & HANDLE != 0)) |
| 155 | .field("ref_count", &(state / REFERENCE)) |
| 156 | .finish() |
| 157 | } |
| 158 | } |