blob: c3df3b93c941e7e044d7b9cebbfeba199edd978f [file] [log] [blame]
Jason Macnakc417d3b2020-04-06 10:30:28 -07001use futures_core::stream::{Stream, FusedStream};
2use futures_core::task::{Context, Poll};
3use futures_sink::Sink;
Haibo Huang52627c82020-05-08 19:26:17 -07004use pin_project::{pin_project, project};
Jason Macnakc417d3b2020-04-06 10:30:28 -07005use core::pin::Pin;
6use alloc::collections::VecDeque;
7
8/// Sink for the [`buffer`](super::SinkExt::buffer) method.
Haibo Huang52627c82020-05-08 19:26:17 -07009#[pin_project]
Jason Macnakc417d3b2020-04-06 10:30:28 -070010#[derive(Debug)]
11#[must_use = "sinks do nothing unless polled"]
12pub struct Buffer<Si, Item> {
Haibo Huang52627c82020-05-08 19:26:17 -070013 #[pin]
Jason Macnakc417d3b2020-04-06 10:30:28 -070014 sink: Si,
15 buf: VecDeque<Item>,
16
17 // Track capacity separately from the `VecDeque`, which may be rounded up
18 capacity: usize,
19}
20
Jason Macnakc417d3b2020-04-06 10:30:28 -070021impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
Jason Macnakc417d3b2020-04-06 10:30:28 -070022 pub(super) fn new(sink: Si, capacity: usize) -> Self {
23 Buffer {
24 sink,
25 buf: VecDeque::with_capacity(capacity),
26 capacity,
27 }
28 }
29
Haibo Huang52627c82020-05-08 19:26:17 -070030 delegate_access_inner!(sink, Si, ());
Jason Macnakc417d3b2020-04-06 10:30:28 -070031
Haibo Huang52627c82020-05-08 19:26:17 -070032 #[project]
Jason Macnakc417d3b2020-04-06 10:30:28 -070033 fn try_empty_buffer(
Haibo Huang52627c82020-05-08 19:26:17 -070034 self: Pin<&mut Self>,
Jason Macnakc417d3b2020-04-06 10:30:28 -070035 cx: &mut Context<'_>,
36 ) -> Poll<Result<(), Si::Error>> {
Haibo Huang52627c82020-05-08 19:26:17 -070037 #[project]
38 let Buffer { mut sink, buf, .. } = self.project();
39 ready!(sink.as_mut().poll_ready(cx))?;
40 while let Some(item) = buf.pop_front() {
41 sink.as_mut().start_send(item)?;
42 if !buf.is_empty() {
43 ready!(sink.as_mut().poll_ready(cx))?;
Jason Macnakc417d3b2020-04-06 10:30:28 -070044 }
45 }
46 Poll::Ready(Ok(()))
47 }
48}
49
50// Forwarding impl of Stream from the underlying sink
51impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
52 type Item = S::Item;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
Haibo Huang52627c82020-05-08 19:26:17 -070055 self.project().sink.poll_next(cx)
Jason Macnakc417d3b2020-04-06 10:30:28 -070056 }
57
58 fn size_hint(&self) -> (usize, Option<usize>) {
59 self.sink.size_hint()
60 }
61}
62
63impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
64 fn is_terminated(&self) -> bool {
65 self.sink.is_terminated()
66 }
67}
68
69impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
70 type Error = Si::Error;
71
72 fn poll_ready(
73 mut self: Pin<&mut Self>,
74 cx: &mut Context<'_>,
75 ) -> Poll<Result<(), Self::Error>> {
76 if self.capacity == 0 {
Haibo Huang52627c82020-05-08 19:26:17 -070077 return self.project().sink.poll_ready(cx);
Jason Macnakc417d3b2020-04-06 10:30:28 -070078 }
79
80 let _ = self.as_mut().try_empty_buffer(cx)?;
81
82 if self.buf.len() >= self.capacity {
83 Poll::Pending
84 } else {
85 Poll::Ready(Ok(()))
86 }
87 }
88
89 fn start_send(
Haibo Huang52627c82020-05-08 19:26:17 -070090 self: Pin<&mut Self>,
Jason Macnakc417d3b2020-04-06 10:30:28 -070091 item: Item,
92 ) -> Result<(), Self::Error> {
93 if self.capacity == 0 {
Haibo Huang52627c82020-05-08 19:26:17 -070094 self.project().sink.start_send(item)
Jason Macnakc417d3b2020-04-06 10:30:28 -070095 } else {
Haibo Huang52627c82020-05-08 19:26:17 -070096 self.project().buf.push_back(item);
Jason Macnakc417d3b2020-04-06 10:30:28 -070097 Ok(())
98 }
99 }
100
101 #[allow(clippy::debug_assert_with_mut_call)]
102 fn poll_flush(
103 mut self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<Result<(), Self::Error>> {
106 ready!(self.as_mut().try_empty_buffer(cx))?;
Haibo Huang52627c82020-05-08 19:26:17 -0700107 debug_assert!(self.buf.is_empty());
108 self.project().sink.poll_flush(cx)
Jason Macnakc417d3b2020-04-06 10:30:28 -0700109 }
110
111 #[allow(clippy::debug_assert_with_mut_call)]
112 fn poll_close(
113 mut self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 ) -> Poll<Result<(), Self::Error>> {
116 ready!(self.as_mut().try_empty_buffer(cx))?;
Haibo Huang52627c82020-05-08 19:26:17 -0700117 debug_assert!(self.buf.is_empty());
118 self.project().sink.poll_close(cx)
Jason Macnakc417d3b2020-04-06 10:30:28 -0700119 }
120}