Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 1 | use futures_core::stream::{Stream, FusedStream}; |
| 2 | use futures_core::task::{Context, Poll}; |
| 3 | use futures_sink::Sink; |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 4 | use pin_project::{pin_project, project}; |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 5 | use core::pin::Pin; |
| 6 | use alloc::collections::VecDeque; |
| 7 | |
/// Sink for the [`buffer`](super::SinkExt::buffer) method.
///
/// Wraps an inner sink and queues items passed to `start_send` in an
/// in-memory `VecDeque`; the queue is drained into the inner sink when the
/// buffer is polled for readiness, flushed, or closed.
///
/// A `capacity` of `0` disables buffering: `poll_ready` and `start_send` are
/// then forwarded directly to the inner sink.
#[pin_project]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Buffer<Si, Item> {
    // The wrapped sink. Structurally pinned so it can be polled via `project()`.
    #[pin]
    sink: Si,
    // Items accepted by `start_send` that have not yet been handed to `sink`.
    buf: VecDeque<Item>,

    // Track capacity separately from the `VecDeque`, which may be rounded up
    capacity: usize,
}
| 20 | |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 21 | impl<Si: Sink<Item>, Item> Buffer<Si, Item> { |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 22 | pub(super) fn new(sink: Si, capacity: usize) -> Self { |
| 23 | Buffer { |
| 24 | sink, |
| 25 | buf: VecDeque::with_capacity(capacity), |
| 26 | capacity, |
| 27 | } |
| 28 | } |
| 29 | |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 30 | delegate_access_inner!(sink, Si, ()); |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 31 | |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 32 | #[project] |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 33 | fn try_empty_buffer( |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 34 | self: Pin<&mut Self>, |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 35 | cx: &mut Context<'_>, |
| 36 | ) -> Poll<Result<(), Si::Error>> { |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 37 | #[project] |
| 38 | let Buffer { mut sink, buf, .. } = self.project(); |
| 39 | ready!(sink.as_mut().poll_ready(cx))?; |
| 40 | while let Some(item) = buf.pop_front() { |
| 41 | sink.as_mut().start_send(item)?; |
| 42 | if !buf.is_empty() { |
| 43 | ready!(sink.as_mut().poll_ready(cx))?; |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 44 | } |
| 45 | } |
| 46 | Poll::Ready(Ok(())) |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | // Forwarding impl of Stream from the underlying sink |
| 51 | impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream { |
| 52 | type Item = S::Item; |
| 53 | |
| 54 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 55 | self.project().sink.poll_next(cx) |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 56 | } |
| 57 | |
| 58 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 59 | self.sink.size_hint() |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream { |
| 64 | fn is_terminated(&self) -> bool { |
| 65 | self.sink.is_terminated() |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { |
| 70 | type Error = Si::Error; |
| 71 | |
| 72 | fn poll_ready( |
| 73 | mut self: Pin<&mut Self>, |
| 74 | cx: &mut Context<'_>, |
| 75 | ) -> Poll<Result<(), Self::Error>> { |
| 76 | if self.capacity == 0 { |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 77 | return self.project().sink.poll_ready(cx); |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 78 | } |
| 79 | |
| 80 | let _ = self.as_mut().try_empty_buffer(cx)?; |
| 81 | |
| 82 | if self.buf.len() >= self.capacity { |
| 83 | Poll::Pending |
| 84 | } else { |
| 85 | Poll::Ready(Ok(())) |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | fn start_send( |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 90 | self: Pin<&mut Self>, |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 91 | item: Item, |
| 92 | ) -> Result<(), Self::Error> { |
| 93 | if self.capacity == 0 { |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 94 | self.project().sink.start_send(item) |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 95 | } else { |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 96 | self.project().buf.push_back(item); |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 97 | Ok(()) |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | #[allow(clippy::debug_assert_with_mut_call)] |
| 102 | fn poll_flush( |
| 103 | mut self: Pin<&mut Self>, |
| 104 | cx: &mut Context<'_>, |
| 105 | ) -> Poll<Result<(), Self::Error>> { |
| 106 | ready!(self.as_mut().try_empty_buffer(cx))?; |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 107 | debug_assert!(self.buf.is_empty()); |
| 108 | self.project().sink.poll_flush(cx) |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 109 | } |
| 110 | |
| 111 | #[allow(clippy::debug_assert_with_mut_call)] |
| 112 | fn poll_close( |
| 113 | mut self: Pin<&mut Self>, |
| 114 | cx: &mut Context<'_>, |
| 115 | ) -> Poll<Result<(), Self::Error>> { |
| 116 | ready!(self.as_mut().try_empty_buffer(cx))?; |
Haibo Huang | 52627c8 | 2020-05-08 19:26:17 -0700 | [diff] [blame^] | 117 | debug_assert!(self.buf.is_empty()); |
| 118 | self.project().sink.poll_close(cx) |
Jason Macnak | c417d3b | 2020-04-06 10:30:28 -0700 | [diff] [blame] | 119 | } |
| 120 | } |