blob: d2a3f9098b33300d3a6eac30a50839c8b7d3d58f [file] [log] [blame]
Jason Macnakc417d3b2020-04-06 10:30:28 -07001use futures_core::stream::{Stream, FusedStream};
2use futures_core::task::{Context, Poll};
3use futures_sink::Sink;
4use pin_utils::{unsafe_pinned, unsafe_unpinned};
5use core::pin::Pin;
6use alloc::collections::VecDeque;
7
8/// Sink for the [`buffer`](super::SinkExt::buffer) method.
9#[derive(Debug)]
10#[must_use = "sinks do nothing unless polled"]
11pub struct Buffer<Si, Item> {
12 sink: Si,
13 buf: VecDeque<Item>,
14
15 // Track capacity separately from the `VecDeque`, which may be rounded up
16 capacity: usize,
17}
18
19impl<Si: Unpin, Item> Unpin for Buffer<Si, Item> {}
20
21impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
22 unsafe_pinned!(sink: Si);
23 unsafe_unpinned!(buf: VecDeque<Item>);
24 unsafe_unpinned!(capacity: usize);
25
26 pub(super) fn new(sink: Si, capacity: usize) -> Self {
27 Buffer {
28 sink,
29 buf: VecDeque::with_capacity(capacity),
30 capacity,
31 }
32 }
33
34 /// Get a shared reference to the inner sink.
35 pub fn get_ref(&self) -> &Si {
36 &self.sink
37 }
38
39 /// Get a mutable reference to the inner sink.
40 pub fn get_mut(&mut self) -> &mut Si {
41 &mut self.sink
42 }
43
44 /// Get a pinned mutable reference to the inner sink.
45 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
46 self.sink()
47 }
48
49 /// Consumes this combinator, returning the underlying sink.
50 ///
51 /// Note that this may discard intermediate state of this combinator, so
52 /// care should be taken to avoid losing resources when this is called.
53 pub fn into_inner(self) -> Si {
54 self.sink
55 }
56
57 fn try_empty_buffer(
58 mut self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 ) -> Poll<Result<(), Si::Error>> {
61 ready!(self.as_mut().sink().poll_ready(cx))?;
62 while let Some(item) = self.as_mut().buf().pop_front() {
63 self.as_mut().sink().start_send(item)?;
64 if !self.buf.is_empty() {
65 ready!(self.as_mut().sink().poll_ready(cx))?;
66 }
67 }
68 Poll::Ready(Ok(()))
69 }
70}
71
72// Forwarding impl of Stream from the underlying sink
73impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
74 type Item = S::Item;
75
76 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
77 self.sink().poll_next(cx)
78 }
79
80 fn size_hint(&self) -> (usize, Option<usize>) {
81 self.sink.size_hint()
82 }
83}
84
85impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
86 fn is_terminated(&self) -> bool {
87 self.sink.is_terminated()
88 }
89}
90
91impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
92 type Error = Si::Error;
93
94 fn poll_ready(
95 mut self: Pin<&mut Self>,
96 cx: &mut Context<'_>,
97 ) -> Poll<Result<(), Self::Error>> {
98 if self.capacity == 0 {
99 return self.as_mut().sink().poll_ready(cx);
100 }
101
102 let _ = self.as_mut().try_empty_buffer(cx)?;
103
104 if self.buf.len() >= self.capacity {
105 Poll::Pending
106 } else {
107 Poll::Ready(Ok(()))
108 }
109 }
110
111 fn start_send(
112 mut self: Pin<&mut Self>,
113 item: Item,
114 ) -> Result<(), Self::Error> {
115 if self.capacity == 0 {
116 self.as_mut().sink().start_send(item)
117 } else {
118 self.as_mut().buf().push_back(item);
119 Ok(())
120 }
121 }
122
123 #[allow(clippy::debug_assert_with_mut_call)]
124 fn poll_flush(
125 mut self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 ) -> Poll<Result<(), Self::Error>> {
128 ready!(self.as_mut().try_empty_buffer(cx))?;
129 debug_assert!(self.as_mut().buf().is_empty());
130 self.as_mut().sink().poll_flush(cx)
131 }
132
133 #[allow(clippy::debug_assert_with_mut_call)]
134 fn poll_close(
135 mut self: Pin<&mut Self>,
136 cx: &mut Context<'_>,
137 ) -> Poll<Result<(), Self::Error>> {
138 ready!(self.as_mut().try_empty_buffer(cx))?;
139 debug_assert!(self.as_mut().buf().is_empty());
140 self.as_mut().sink().poll_close(cx)
141 }
142}