| use crate::Stream; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| use tokio::sync::mpsc::Receiver; |
| |
| /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. |
| /// |
| /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver |
| /// [`Stream`]: trait@crate::Stream |
| #[derive(Debug)] |
| pub struct ReceiverStream<T> { |
| inner: Receiver<T>, |
| } |
| |
| impl<T> ReceiverStream<T> { |
| /// Create a new `ReceiverStream`. |
| pub fn new(recv: Receiver<T>) -> Self { |
| Self { inner: recv } |
| } |
| |
| /// Get back the inner `Receiver`. |
| pub fn into_inner(self) -> Receiver<T> { |
| self.inner |
| } |
| |
| /// Closes the receiving half of a channel without dropping it. |
| /// |
| /// This prevents any further messages from being sent on the channel while |
| /// still enabling the receiver to drain messages that are buffered. Any |
| /// outstanding [`Permit`] values will still be able to send messages. |
| /// |
| /// To guarantee no messages are dropped, after calling `close()`, you must |
| /// receive all items from the stream until `None` is returned. |
| /// |
| /// [`Permit`]: struct@tokio::sync::mpsc::Permit |
| pub fn close(&mut self) { |
| self.inner.close() |
| } |
| } |
| |
| impl<T> Stream for ReceiverStream<T> { |
| type Item = T; |
| |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| self.inner.poll_recv(cx) |
| } |
| } |
| |
| impl<T> AsRef<Receiver<T>> for ReceiverStream<T> { |
| fn as_ref(&self) -> &Receiver<T> { |
| &self.inner |
| } |
| } |
| |
| impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { |
| fn as_mut(&mut self) -> &mut Receiver<T> { |
| &mut self.inner |
| } |
| } |