Import 'futures-util' rust crate version 0.3.4
Bug: b/151760391
Test: m crosvm.experimental
Change-Id: I03ce20612b0c746bbff5053e98a1ec0310c75fdd
diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs
new file mode 100644
index 0000000..d2a3f90
--- /dev/null
+++ b/src/sink/buffer.rs
@@ -0,0 +1,142 @@
+use futures_core::stream::{Stream, FusedStream};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use core::pin::Pin;
+use alloc::collections::VecDeque;
+
+/// Sink for the [`buffer`](super::SinkExt::buffer) method.
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct Buffer<Si, Item> {
+ sink: Si, // the wrapped sink that queued items are eventually sent to
+ buf: VecDeque<Item>, // items accepted by `start_send` but not yet handed to `sink`
+
+ // Requested capacity, tracked separately from the `VecDeque` (which may round up); 0 disables buffering
+ capacity: usize,
+}
+
+impl<Si: Unpin, Item> Unpin for Buffer<Si, Item> {} // only `sink` is pin-projected (`unsafe_pinned!`), so `Item` needs no `Unpin` bound
+
+impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
+ unsafe_pinned!(sink: Si); // generates `sink()`: `Pin<&mut Self>` -> `Pin<&mut Si>` (structural pinning)
+ unsafe_unpinned!(buf: VecDeque<Item>); // generates `buf()`: `Pin<&mut Self>` -> `&mut VecDeque<Item>`
+ unsafe_unpinned!(capacity: usize); // generates `capacity()`: `Pin<&mut Self>` -> `&mut usize`
+
+ pub(super) fn new(sink: Si, capacity: usize) -> Self { // wraps `sink` with an initially empty buffer
+ Buffer {
+ sink,
+ buf: VecDeque::with_capacity(capacity), // pre-allocate space for at least `capacity` items
+ capacity,
+ }
+ }
+
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &Si {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut Si {
+ &mut self.sink
+ }
+
+ /// Get a pinned mutable reference to the inner sink.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
+ self.sink() // pin projection generated by `unsafe_pinned!` above
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that any items still queued in the internal buffer are dropped, so
+ /// flush the sink first if those items must not be lost.
+ pub fn into_inner(self) -> Si {
+ self.sink
+ }
+
+ fn try_empty_buffer( // moves queued items from `buf` into `sink`, front to back
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Si::Error>> {
+ ready!(self.as_mut().sink().poll_ready(cx))?; // polled even when `buf` is empty; `ready!` presumably returns early on `Pending`
+ while let Some(item) = self.as_mut().buf().pop_front() {
+ self.as_mut().sink().start_send(item)?; // on error the popped item is not put back into `buf`
+ if !self.buf.is_empty() { // only re-check readiness when another item is waiting to be sent
+ ready!(self.as_mut().sink().poll_ready(cx))?;
+ }
+ }
+ Poll::Ready(Ok(())) // reached only once `pop_front` returned `None`, i.e. `buf` is empty
+ }
+}
+
+// Forwarding impl of `Stream` to the underlying sink when it is also a stream; `buf` is not touched
+impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
+ type Item = S::Item; // the stream item type of the inner value, unrelated to the sink's `Item` parameter
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
+ self.sink().poll_next(cx) // delegate through the pin projection
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.sink.size_hint() // delegate unchanged
+ }
+}
+
+impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream { // forwarding impl, like `Stream`
+ fn is_terminated(&self) -> bool {
+ self.sink.is_terminated() // termination is decided solely by the inner value
+ }
+}
+
+impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { // buffering front-end; errors are the inner sink's errors
+ type Error = Si::Error;
+
+ fn poll_ready( // ready when `buf` has a free slot (or, with capacity 0, when the inner sink is ready)
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if self.capacity == 0 { // zero capacity: no buffering, defer entirely to the inner sink
+ return self.as_mut().sink().poll_ready(cx);
+ }
+
+ let _ = self.as_mut().try_empty_buffer(cx)?; // `?` propagates errors; a `Pending` result is deliberately ignored
+
+ if self.buf.len() >= self.capacity { // still full, so the drain above must have stopped on `Pending`
+ Poll::Pending // no waker registered here; presumably the inner `poll_ready` already did so
+ } else {
+ Poll::Ready(Ok(())) // at least one free slot, even if the inner sink is not ready yet
+ }
+ }
+
+ fn start_send( // queues `item`, or forwards it directly when capacity is 0
+ mut self: Pin<&mut Self>,
+ item: Item,
+ ) -> Result<(), Self::Error> {
+ if self.capacity == 0 { // unbuffered mode: hand the item straight to the inner sink
+ self.as_mut().sink().start_send(item)
+ } else {
+ self.as_mut().buf().push_back(item); // no capacity check here; callers are expected to call `poll_ready` first
+ Ok(())
+ }
+ }
+
+ #[allow(clippy::debug_assert_with_mut_call)] // `buf()` takes `Pin<&mut Self>`, but `is_empty` does not mutate
+ fn poll_flush( // drains `buf` into the inner sink, then flushes the inner sink
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_buffer(cx))?; // stop here until every queued item has been sent
+ debug_assert!(self.as_mut().buf().is_empty()); // `try_empty_buffer` returns `Ready(Ok)` only with `buf` empty
+ self.as_mut().sink().poll_flush(cx)
+ }
+
+ #[allow(clippy::debug_assert_with_mut_call)] // `buf()` takes `Pin<&mut Self>`, but `is_empty` does not mutate
+ fn poll_close( // drains `buf` into the inner sink, then closes the inner sink
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().try_empty_buffer(cx))?; // stop here until every queued item has been sent
+ debug_assert!(self.as_mut().buf().is_empty()); // `try_empty_buffer` returns `Ready(Ok)` only with `buf` empty
+ self.as_mut().sink().poll_close(cx)
+ }
+}