Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 1 | //! A channel for sending a single message between asynchronous tasks. |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 2 | //! |
| 3 | //! This is a single-producer, single-consumer channel. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 4 | |
| 5 | use alloc::sync::Arc; |
| 6 | use core::fmt; |
| 7 | use core::pin::Pin; |
| 8 | use core::sync::atomic::AtomicBool; |
| 9 | use core::sync::atomic::Ordering::SeqCst; |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 10 | use futures_core::future::{FusedFuture, Future}; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 11 | use futures_core::task::{Context, Poll, Waker}; |
| 12 | |
| 13 | use crate::lock::Lock; |
| 14 | |
| 15 | /// A future for a value that will be provided by another asynchronous task. |
| 16 | /// |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 17 | /// This is created by the [`channel`](channel) function. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 18 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 19 | pub struct Receiver<T> { |
| 20 | inner: Arc<Inner<T>>, |
| 21 | } |
| 22 | |
| 23 | /// A means of transmitting a single value to another task. |
| 24 | /// |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 25 | /// This is created by the [`channel`](channel) function. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 26 | pub struct Sender<T> { |
| 27 | inner: Arc<Inner<T>>, |
| 28 | } |
| 29 | |
| 30 | // The channels do not ever project Pin to the inner T |
| 31 | impl<T> Unpin for Receiver<T> {} |
| 32 | impl<T> Unpin for Sender<T> {} |
| 33 | |
| 34 | /// Internal state of the `Receiver`/`Sender` pair above. This is all used as |
| 35 | /// the internal synchronization between the two for send/recv operations. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 36 | struct Inner<T> { |
| 37 | /// Indicates whether this oneshot is complete yet. This is filled in both |
| 38 | /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it |
| 39 | /// appropriately. |
| 40 | /// |
| 41 | /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is |
| 42 | /// unlocked and ready to be inspected. |
| 43 | /// |
| 44 | /// For `Sender` if this is `true` then the oneshot has gone away and it |
| 45 | /// can return ready from `poll_canceled`. |
| 46 | complete: AtomicBool, |
| 47 | |
| 48 | /// The actual data being transferred as part of this `Receiver`. This is |
| 49 | /// filled in by `Sender::complete` and read by `Receiver::poll`. |
| 50 | /// |
| 51 | /// Note that this is protected by `Lock`, but it is in theory safe to |
| 52 | /// replace with an `UnsafeCell` as it's actually protected by `complete` |
| 53 | /// above. I wouldn't recommend doing this, however, unless someone is |
| 54 | /// supremely confident in the various atomic orderings here and there. |
| 55 | data: Lock<Option<T>>, |
| 56 | |
| 57 | /// Field to store the task which is blocked in `Receiver::poll`. |
| 58 | /// |
| 59 | /// This is filled in when a oneshot is polled but not ready yet. Note that |
| 60 | /// the `Lock` here, unlike in `data` above, is important to resolve races. |
| 61 | /// Both the `Receiver` and the `Sender` halves understand that if they |
| 62 | /// can't acquire the lock then some important interference is happening. |
| 63 | rx_task: Lock<Option<Waker>>, |
| 64 | |
| 65 | /// Like `rx_task` above, except for the task blocked in |
| 66 | /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`. |
| 67 | tx_task: Lock<Option<Waker>>, |
| 68 | } |
| 69 | |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 70 | /// Creates a new one-shot channel for sending a single value across asynchronous tasks. |
| 71 | /// |
| 72 | /// The channel works for a spsc (single-producer, single-consumer) scheme. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 73 | /// |
| 74 | /// This function is similar to Rust's channel constructor found in the standard |
| 75 | /// library. Two halves are returned, the first of which is a `Sender` handle, |
| 76 | /// used to signal the end of a computation and provide its value. The second |
| 77 | /// half is a `Receiver` which implements the `Future` trait, resolving to the |
| 78 | /// value that was given to the `Sender` handle. |
| 79 | /// |
| 80 | /// Each half can be separately owned and sent across tasks. |
| 81 | /// |
| 82 | /// # Examples |
| 83 | /// |
| 84 | /// ``` |
| 85 | /// use futures::channel::oneshot; |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 86 | /// use std::{thread, time::Duration}; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 87 | /// |
| 88 | /// let (sender, receiver) = oneshot::channel::<i32>(); |
| 89 | /// |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 90 | /// thread::spawn(|| { |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 91 | /// println!("THREAD: sleeping zzz..."); |
| 92 | /// thread::sleep(Duration::from_millis(1000)); |
| 93 | /// println!("THREAD: i'm awake! sending."); |
| 94 | /// sender.send(3).unwrap(); |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 95 | /// }); |
| 96 | /// |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 97 | /// println!("MAIN: doing some useful stuff"); |
| 98 | /// |
| 99 | /// futures::executor::block_on(async { |
| 100 | /// println!("MAIN: waiting for msg..."); |
| 101 | /// println!("MAIN: got: {:?}", receiver.await) |
| 102 | /// }); |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 103 | /// ``` |
| 104 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
| 105 | let inner = Arc::new(Inner::new()); |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 106 | let receiver = Receiver { inner: inner.clone() }; |
| 107 | let sender = Sender { inner }; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 108 | (sender, receiver) |
| 109 | } |
| 110 | |
| 111 | impl<T> Inner<T> { |
Haibo Huang | 09da603 | 2021-02-09 17:02:02 -0800 | [diff] [blame] | 112 | fn new() -> Self { |
| 113 | Self { |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 114 | complete: AtomicBool::new(false), |
| 115 | data: Lock::new(None), |
| 116 | rx_task: Lock::new(None), |
| 117 | tx_task: Lock::new(None), |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | fn send(&self, t: T) -> Result<(), T> { |
| 122 | if self.complete.load(SeqCst) { |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 123 | return Err(t); |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 124 | } |
| 125 | |
| 126 | // Note that this lock acquisition may fail if the receiver |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 127 | // is closed and sets the `complete` flag to `true`, whereupon |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 128 | // the receiver may call `poll()`. |
| 129 | if let Some(mut slot) = self.data.try_lock() { |
| 130 | assert!(slot.is_none()); |
| 131 | *slot = Some(t); |
| 132 | drop(slot); |
| 133 | |
| 134 | // If the receiver called `close()` between the check at the |
| 135 | // start of the function, and the lock being released, then |
| 136 | // the receiver may not be around to receive it, so try to |
| 137 | // pull it back out. |
| 138 | if self.complete.load(SeqCst) { |
| 139 | // If lock acquisition fails, then receiver is actually |
| 140 | // receiving it, so we're good. |
| 141 | if let Some(mut slot) = self.data.try_lock() { |
| 142 | if let Some(t) = slot.take() { |
| 143 | return Err(t); |
| 144 | } |
| 145 | } |
| 146 | } |
| 147 | Ok(()) |
| 148 | } else { |
| 149 | // Must have been closed |
| 150 | Err(t) |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 155 | // Fast path up first, just read the flag and see if our other half is |
| 156 | // gone. This flag is set both in our destructor and the oneshot |
| 157 | // destructor, but our destructor hasn't run yet so if it's set then the |
| 158 | // oneshot is gone. |
| 159 | if self.complete.load(SeqCst) { |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 160 | return Poll::Ready(()); |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 161 | } |
| 162 | |
| 163 | // If our other half is not gone then we need to park our current task |
| 164 | // and move it into the `tx_task` slot to get notified when it's |
| 165 | // actually gone. |
| 166 | // |
| 167 | // If `try_lock` fails, then the `Receiver` is in the process of using |
| 168 | // it, so we can deduce that it's now in the process of going away and |
| 169 | // hence we're canceled. If it succeeds then we just store our handle. |
| 170 | // |
| 171 | // Crucially we then check `complete` *again* before we return. |
| 172 | // While we were storing our handle inside `tx_task` the |
| 173 | // `Receiver` may have been dropped. The first thing it does is set the |
| 174 | // flag, and if it fails to acquire the lock it assumes that we'll see |
| 175 | // the flag later on. So... we then try to see the flag later on! |
| 176 | let handle = cx.waker().clone(); |
| 177 | match self.tx_task.try_lock() { |
| 178 | Some(mut p) => *p = Some(handle), |
| 179 | None => return Poll::Ready(()), |
| 180 | } |
| 181 | if self.complete.load(SeqCst) { |
| 182 | Poll::Ready(()) |
| 183 | } else { |
| 184 | Poll::Pending |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | fn is_canceled(&self) -> bool { |
| 189 | self.complete.load(SeqCst) |
| 190 | } |
| 191 | |
| 192 | fn drop_tx(&self) { |
| 193 | // Flag that we're a completed `Sender` and try to wake up a receiver. |
| 194 | // Whether or not we actually stored any data will get picked up and |
| 195 | // translated to either an item or cancellation. |
| 196 | // |
| 197 | // Note that if we fail to acquire the `rx_task` lock then that means |
| 198 | // we're in one of two situations: |
| 199 | // |
| 200 | // 1. The receiver is trying to block in `poll` |
| 201 | // 2. The receiver is being dropped |
| 202 | // |
| 203 | // In the first case it'll check the `complete` flag after it's done |
| 204 | // blocking to see if it succeeded. In the latter case we don't need to |
| 205 | // wake up anyone anyway. So in both cases it's ok to ignore the `None` |
| 206 | // case of `try_lock` and bail out. |
| 207 | // |
| 208 | // The first case crucially depends on `Lock` using `SeqCst` ordering |
| 209 | // under the hood. If it instead used `Release` / `Acquire` ordering, |
| 210 | // then it would not necessarily synchronize with `inner.complete` |
| 211 | // and deadlock might be possible, as was observed in |
| 212 | // https://github.com/rust-lang/futures-rs/pull/219. |
| 213 | self.complete.store(true, SeqCst); |
| 214 | |
| 215 | if let Some(mut slot) = self.rx_task.try_lock() { |
| 216 | if let Some(task) = slot.take() { |
| 217 | drop(slot); |
| 218 | task.wake(); |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | // If we registered a task for cancel notification drop it to reduce |
| 223 | // spurious wakeups |
| 224 | if let Some(mut slot) = self.tx_task.try_lock() { |
| 225 | drop(slot.take()); |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | fn close_rx(&self) { |
| 230 | // Flag our completion and then attempt to wake up the sender if it's |
| 231 | // blocked. See comments in `drop` below for more info |
| 232 | self.complete.store(true, SeqCst); |
| 233 | if let Some(mut handle) = self.tx_task.try_lock() { |
| 234 | if let Some(task) = handle.take() { |
| 235 | drop(handle); |
| 236 | task.wake() |
| 237 | } |
| 238 | } |
| 239 | } |
| 240 | |
| 241 | fn try_recv(&self) -> Result<Option<T>, Canceled> { |
| 242 | // If we're complete, either `::close_rx` or `::drop_tx` was called. |
| 243 | // We can assume a successful send if data is present. |
| 244 | if self.complete.load(SeqCst) { |
| 245 | if let Some(mut slot) = self.data.try_lock() { |
| 246 | if let Some(data) = slot.take() { |
| 247 | return Ok(Some(data)); |
| 248 | } |
| 249 | } |
| 250 | Err(Canceled) |
| 251 | } else { |
| 252 | Ok(None) |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
| 257 | // Check to see if some data has arrived. If it hasn't then we need to |
| 258 | // block our task. |
| 259 | // |
| 260 | // Note that the acquisition of the `rx_task` lock might fail below, but |
| 261 | // the only situation where this can happen is during `Sender::drop` |
| 262 | // when we are indeed completed already. If that's happening then we |
| 263 | // know we're completed so keep going. |
| 264 | let done = if self.complete.load(SeqCst) { |
| 265 | true |
| 266 | } else { |
| 267 | let task = cx.waker().clone(); |
| 268 | match self.rx_task.try_lock() { |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 269 | Some(mut slot) => { |
| 270 | *slot = Some(task); |
| 271 | false |
| 272 | } |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 273 | None => true, |
| 274 | } |
| 275 | }; |
| 276 | |
| 277 | // If we're `done` via one of the paths above, then look at the data and |
| 278 | // figure out what the answer is. If, however, we stored `rx_task` |
| 279 | // successfully above we need to check again if we're completed in case |
| 280 | // a message was sent while `rx_task` was locked and couldn't notify us |
| 281 | // otherwise. |
| 282 | // |
| 283 | // If we're not done, and we're not complete, though, then we've |
| 284 | // successfully blocked our task and we return `Pending`. |
| 285 | if done || self.complete.load(SeqCst) { |
| 286 | // If taking the lock fails, the sender will realise that the we're |
| 287 | // `done` when it checks the `complete` flag on the way out, and |
| 288 | // will treat the send as a failure. |
| 289 | if let Some(mut slot) = self.data.try_lock() { |
| 290 | if let Some(data) = slot.take() { |
| 291 | return Poll::Ready(Ok(data)); |
| 292 | } |
| 293 | } |
| 294 | Poll::Ready(Err(Canceled)) |
| 295 | } else { |
| 296 | Poll::Pending |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | fn drop_rx(&self) { |
| 301 | // Indicate to the `Sender` that we're done, so any future calls to |
| 302 | // `poll_canceled` are weeded out. |
| 303 | self.complete.store(true, SeqCst); |
| 304 | |
| 305 | // If we've blocked a task then there's no need for it to stick around, |
| 306 | // so we need to drop it. If this lock acquisition fails, though, then |
| 307 | // it's just because our `Sender` is trying to take the task, so we |
| 308 | // let them take care of that. |
| 309 | if let Some(mut slot) = self.rx_task.try_lock() { |
| 310 | let task = slot.take(); |
| 311 | drop(slot); |
| 312 | drop(task); |
| 313 | } |
| 314 | |
| 315 | // Finally, if our `Sender` wants to get notified of us going away, it |
| 316 | // would have stored something in `tx_task`. Here we try to peel that |
| 317 | // out and unpark it. |
| 318 | // |
| 319 | // Note that the `try_lock` here may fail, but only if the `Sender` is |
| 320 | // in the process of filling in the task. If that happens then we |
| 321 | // already flagged `complete` and they'll pick that up above. |
| 322 | if let Some(mut handle) = self.tx_task.try_lock() { |
| 323 | if let Some(task) = handle.take() { |
| 324 | drop(handle); |
| 325 | task.wake() |
| 326 | } |
| 327 | } |
| 328 | } |
| 329 | } |
| 330 | |
| 331 | impl<T> Sender<T> { |
| 332 | /// Completes this oneshot with a successful result. |
| 333 | /// |
| 334 | /// This function will consume `self` and indicate to the other end, the |
| 335 | /// [`Receiver`](Receiver), that the value provided is the result of the |
| 336 | /// computation this represents. |
| 337 | /// |
| 338 | /// If the value is successfully enqueued for the remote end to receive, |
| 339 | /// then `Ok(())` is returned. If the receiving end was dropped before |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 340 | /// this function was called, however, then `Err(t)` is returned. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 341 | pub fn send(self, t: T) -> Result<(), T> { |
| 342 | self.inner.send(t) |
| 343 | } |
| 344 | |
| 345 | /// Polls this `Sender` half to detect whether its associated |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 346 | /// [`Receiver`](Receiver) has been dropped. |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 347 | /// |
| 348 | /// # Return values |
| 349 | /// |
| 350 | /// If `Ready(())` is returned then the associated `Receiver` has been |
| 351 | /// dropped, which means any work required for sending should be canceled. |
| 352 | /// |
| 353 | /// If `Pending` is returned then the associated `Receiver` is still |
| 354 | /// alive and may be able to receive a message if sent. The current task, |
| 355 | /// however, is scheduled to receive a notification if the corresponding |
| 356 | /// `Receiver` goes away. |
| 357 | pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| 358 | self.inner.poll_canceled(cx) |
| 359 | } |
| 360 | |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 361 | /// Creates a future that resolves when this `Sender`'s corresponding |
| 362 | /// [`Receiver`](Receiver) half has hung up. |
| 363 | /// |
| 364 | /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) |
Haibo Huang | 09da603 | 2021-02-09 17:02:02 -0800 | [diff] [blame] | 365 | /// to expose a [`Future`](core::future::Future). |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 366 | pub fn cancellation(&mut self) -> Cancellation<'_, T> { |
| 367 | Cancellation { inner: self } |
| 368 | } |
| 369 | |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 370 | /// Tests to see whether this `Sender`'s corresponding `Receiver` |
| 371 | /// has been dropped. |
| 372 | /// |
| 373 | /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not |
| 374 | /// enqueue a task for wakeup upon cancellation, but merely reports the |
| 375 | /// current state, which may be subject to concurrent modification. |
| 376 | pub fn is_canceled(&self) -> bool { |
| 377 | self.inner.is_canceled() |
| 378 | } |
Chih-Hung Hsieh | bc88f08 | 2020-10-25 23:16:20 -0700 | [diff] [blame] | 379 | |
| 380 | /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether |
| 381 | /// they were created by the same call to `channel`. |
| 382 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
| 383 | Arc::ptr_eq(&self.inner, &receiver.inner) |
| 384 | } |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 385 | } |
| 386 | |
| 387 | impl<T> Drop for Sender<T> { |
| 388 | fn drop(&mut self) { |
| 389 | self.inner.drop_tx() |
| 390 | } |
| 391 | } |
| 392 | |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 393 | impl<T: fmt::Debug> fmt::Debug for Sender<T> { |
| 394 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 395 | f.debug_struct("Sender").field("complete", &self.inner.complete).finish() |
| 396 | } |
| 397 | } |
| 398 | |
Haibo Huang | 0cf3d2c | 2020-05-08 19:24:52 -0700 | [diff] [blame] | 399 | /// A future that resolves when the receiving end of a channel has hung up. |
| 400 | /// |
| 401 | /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). |
| 402 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 403 | #[derive(Debug)] |
| 404 | pub struct Cancellation<'a, T> { |
| 405 | inner: &'a mut Sender<T>, |
| 406 | } |
| 407 | |
| 408 | impl<T> Future for Cancellation<'_, T> { |
| 409 | type Output = (); |
| 410 | |
| 411 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
| 412 | self.inner.poll_canceled(cx) |
| 413 | } |
| 414 | } |
| 415 | |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 416 | /// Error returned from a [`Receiver`](Receiver) when the corresponding |
| 417 | /// [`Sender`](Sender) is dropped. |
| 418 | #[derive(Clone, Copy, PartialEq, Eq, Debug)] |
| 419 | pub struct Canceled; |
| 420 | |
| 421 | impl fmt::Display for Canceled { |
| 422 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 423 | write!(f, "oneshot canceled") |
| 424 | } |
| 425 | } |
| 426 | |
| 427 | #[cfg(feature = "std")] |
| 428 | impl std::error::Error for Canceled {} |
| 429 | |
| 430 | impl<T> Receiver<T> { |
| 431 | /// Gracefully close this receiver, preventing any subsequent attempts to |
| 432 | /// send to it. |
| 433 | /// |
| 434 | /// Any `send` operation which happens after this method returns is |
| 435 | /// guaranteed to fail. After calling this method, you can use |
| 436 | /// [`Receiver::poll`](core::future::Future::poll) to determine whether a |
| 437 | /// message had previously been sent. |
| 438 | pub fn close(&mut self) { |
| 439 | self.inner.close_rx() |
| 440 | } |
| 441 | |
| 442 | /// Attempts to receive a message outside of the context of a task. |
| 443 | /// |
| 444 | /// Does not schedule a task wakeup or have any other side effects. |
| 445 | /// |
| 446 | /// A return value of `None` must be considered immediately stale (out of |
| 447 | /// date) unless [`close`](Receiver::close) has been called first. |
| 448 | /// |
| 449 | /// Returns an error if the sender was dropped. |
| 450 | pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { |
| 451 | self.inner.try_recv() |
| 452 | } |
| 453 | } |
| 454 | |
| 455 | impl<T> Future for Receiver<T> { |
| 456 | type Output = Result<T, Canceled>; |
| 457 | |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 458 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 459 | self.inner.recv(cx) |
| 460 | } |
| 461 | } |
| 462 | |
Haibo Huang | 09da603 | 2021-02-09 17:02:02 -0800 | [diff] [blame] | 463 | impl<T> FusedFuture for Receiver<T> { |
| 464 | fn is_terminated(&self) -> bool { |
| 465 | if self.inner.complete.load(SeqCst) { |
| 466 | if let Some(slot) = self.inner.data.try_lock() { |
| 467 | if slot.is_some() { |
| 468 | return false; |
| 469 | } |
| 470 | } |
| 471 | true |
| 472 | } else { |
| 473 | false |
| 474 | } |
| 475 | } |
| 476 | } |
| 477 | |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 478 | impl<T> Drop for Receiver<T> { |
| 479 | fn drop(&mut self) { |
| 480 | self.inner.drop_rx() |
| 481 | } |
| 482 | } |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 483 | |
| 484 | impl<T: fmt::Debug> fmt::Debug for Receiver<T> { |
| 485 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 486 | f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() |
| 487 | } |
| 488 | } |