blob: 71ec654bfb3640447e7337d92b420df68c47424e [file] [log] [blame]
David LeGare47ef9cb2022-03-02 00:19:32 +00001use std::iter;
2use std::sync::Arc;
3
Joel Galenson22597362021-05-19 15:06:18 -07004use futures::channel::mpsc;
5use futures::executor::block_on;
6use futures::future::{self, Future};
David LeGare47ef9cb2022-03-02 00:19:32 +00007use futures::lock::Mutex;
Joel Galenson22597362021-05-19 15:06:18 -07008use futures::sink::SinkExt;
9use futures::stream::{self, StreamExt};
10use futures::task::Poll;
David LeGare47ef9cb2022-03-02 00:19:32 +000011use futures::{ready, FutureExt};
Joel Galenson22597362021-05-19 15:06:18 -070012use futures_test::task::noop_context;
13
#[test]
fn select() {
    // Merges two in-memory streams with `stream::select` and checks the exact
    // order in which the items come out.
    fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
        let merged = stream::select(stream::iter(a), stream::iter(b));
        let collected: Vec<u32> = block_on(merged.collect());
        assert_eq!(collected, expected);
    }

    // Equal lengths, left side longer, right side longer.
    let cases = [
        (vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]),
        (vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]),
        (vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]),
    ];
    for (a, b, expected) in cases {
        select_and_compare(a, b, expected);
    }
}
27
#[test]
fn flat_map() {
    // Three inner streams are flattened in order; only even values pass the filter.
    let nested = stream::iter(vec![
        stream::iter(0..=4u8),
        stream::iter(6..=10),
        stream::iter(0..=2),
    ]);

    let evens = nested
        .flat_map(|inner| inner.filter(|value| futures::future::ready(value % 2 == 0)))
        .collect::<Vec<_>>();

    assert_eq!(block_on(evens), vec![0, 2, 4, 6, 8, 10, 0, 2]);
}
40
#[test]
fn scan() {
    // The running limit starts at 1 and is bumped before every comparison, so the
    // stream ends at the first item that is not below it (item 6 against limit 6).
    let bounded = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]).scan(1, |limit, item| {
        *limit += 1;
        let kept = if item < *limit { Some(item) } else { None };
        futures::future::ready(kept)
    });

    let values: Vec<u8> = block_on(bounded.collect());
    assert_eq!(values, vec![1u8, 2, 3, 4]);
}
Haibo Huang67f2b762020-05-08 19:24:38 -070055
#[test]
fn flatten_unordered() {
    use futures::executor::block_on;
    use futures::stream::*;
    use futures::task::*;
    use std::convert::identity;
    use std::pin::Pin;
    use std::thread;
    use std::time::Duration;

    // Inner test stream. Polls come in pairs: the first poll of a pair arranges a
    // wake-up and reports `Pending`, the second pops one element off the end of `data`.
    struct DataStream {
        data: Vec<u8>,
        polled: bool,
        wake_immediately: bool,
    }

    impl Stream for DataStream {
        type Item = u8;

        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
            // Second poll of a pair: yield the next element, or `None` once drained.
            if self.polled {
                self.polled = false;
                return Poll::Ready(self.data.pop());
            }

            // First poll of a pair: wake right away, or from a helper thread after a
            // delay derived from the first element of `data`.
            if self.wake_immediately {
                ctx.waker().wake_by_ref();
            } else {
                let delay = Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
                let waker = ctx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(delay);
                    waker.wake_by_ref();
                });
            }
            // The flag is only set after the wake-up has been arranged.
            self.polled = true;
            Poll::Pending
        }
    }

    // Outer test stream. It never ends on its own and uses the same two-poll rhythm
    // as `DataStream`; every produced `DataStream` holds six consecutive values
    // starting at `base * 6`.
    struct Interchanger {
        polled: bool,
        base: u8,
        wake_immediately: bool,
    }

    impl Stream for Interchanger {
        type Item = DataStream;

        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
            if self.polled {
                // Stored in descending order so that `pop` hands the values out ascending.
                let data: Vec<_> = (0..6).rev().map(|offset| offset + self.base * 6).collect();
                self.base += 1;
                self.polled = false;
                // `base` has already been incremented when the child's wake mode is chosen.
                let wake_immediately = self.wake_immediately && self.base % 2 == 0;
                return Poll::Ready(Some(DataStream { polled: false, data, wake_immediately }));
            }

            // The flag is set before the wake-up is arranged.
            self.polled = true;
            if self.wake_immediately {
                ctx.waker().wake_by_ref();
            } else {
                let delay = Duration::from_millis(self.base as u64);
                let waker = ctx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(delay);
                    waker.wake_by_ref();
                });
            }
            Poll::Pending
        }
    }

    // A fresh outer stream starting at base 0.
    fn interchanger(wake_immediately: bool) -> Interchanger {
        Interchanger { polled: false, base: 0, wake_immediately }
    }

    // Sorts the collected items so that assertions do not depend on arrival order.
    fn sorted(mut items: Vec<u8>) -> Vec<u8> {
        items.sort_unstable();
        items
    }

    // Everything ten `DataStream`s produce together: 0 through 59.
    fn all_values() -> Vec<u8> {
        (0..60).collect()
    }

    // basic behaviour
    {
        block_on(async {
            let nested = stream::iter(vec![
                stream::iter(0..=4u8),
                stream::iter(6..=10),
                stream::iter(10..=12),
            ]);

            let flattened: Vec<_> = nested.flatten_unordered(3).collect().await;

            assert_eq!(flattened, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
        });

        block_on(async {
            let nested = stream::iter(vec![
                stream::iter(0..=4u8),
                stream::iter(6..=10),
                stream::iter(0..=2),
            ]);

            let evens = nested
                .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
                .collect::<Vec<_>>()
                .await;

            assert_eq!(sorted(evens), vec![0, 0, 2, 2, 4, 6, 8, 10]);
        });
    }

    // wake up immediately, then wake up after delay
    for wake_immediately in [true, false] {
        block_on(async {
            let flattened = interchanger(wake_immediately)
                .take(10)
                .map(|s| s.map(identity))
                .flatten_unordered(10)
                .collect::<Vec<_>>()
                .await;

            assert_eq!(sorted(flattened), all_values());
        });

        block_on(async {
            let flat_mapped = interchanger(wake_immediately)
                .take(10)
                .flat_map_unordered(10, |s| s.map(identity))
                .collect::<Vec<_>>()
                .await;

            assert_eq!(sorted(flat_mapped), all_values());
        });
    }

    // wake up after delay, both combinators driven concurrently
    block_on(async {
        let (via_flat_map, via_flatten) = futures_util::join!(
            interchanger(false)
                .take(10)
                .flat_map_unordered(10, |s| s.map(identity))
                .collect::<Vec<_>>(),
            interchanger(false)
                .take(10)
                .map(|s| s.map(identity))
                .flatten_unordered(10)
                .collect::<Vec<_>>()
        );

        let via_flat_map = sorted(via_flat_map);
        let via_flatten = sorted(via_flatten);

        assert_eq!(via_flat_map, via_flatten);
        assert_eq!(via_flat_map, all_values());
    });

    // waker panics
    {
        struct PanicWaker;

        impl ArcWake for PanicWaker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {
                panic!("WAKE UP");
            }
        }

        let shared = Arc::new(Mutex::new(
            interchanger(true).take(10).flat_map_unordered(10, |s| s.map(identity)),
        ));

        // Poll the shared stream once on another thread with a waker that panics;
        // that thread is expected to die with a panic.
        let worker = std::thread::spawn({
            let shared = shared.clone();
            move || {
                let mut st = poll_fn(|cx| {
                    let mut guard = ready!(shared.lock().poll_unpin(cx));

                    let panic_waker = waker(Arc::new(PanicWaker));
                    let mut panic_cx = Context::from_waker(&panic_waker);
                    let _ = ready!(guard.poll_next_unpin(&mut panic_cx));

                    Poll::Ready(Some(()))
                });

                block_on(st.next())
            }
        });
        worker.join().unwrap_err();

        // The stream must still deliver every value afterwards.
        block_on(async move {
            let values: Vec<_> = shared.lock().await.by_ref().collect().await;

            assert_eq!(sorted(values), all_values());
        });
    }

    // stream panics
    {
        // The first inner stream panics as soon as it is polled; ten regular ones follow.
        let poisoned = once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>();
        let nested = stream::iter(iter::once(poisoned))
            .chain(interchanger(true).map(|inner| inner.right_stream()).take(10));

        let shared = Arc::new(Mutex::new(nested.flatten_unordered(10)));

        let worker = std::thread::spawn({
            let shared = shared.clone();
            move || {
                let mut st = poll_fn(|cx| {
                    let mut guard = ready!(shared.lock().poll_unpin(cx));
                    let item = ready!(guard.poll_next_unpin(cx));

                    Poll::Ready(item)
                });

                block_on(st.next())
            }
        });
        worker.join().unwrap_err();

        // After the panic the remaining inner streams must still be fully drained.
        block_on(async move {
            let values: Vec<_> = shared.lock().await.by_ref().collect().await;

            assert_eq!(sorted(values), all_values());
        });
    }
}
321
#[test]
fn take_until() {
    // Builds a future that reports `Pending` for its first `stop_on` polls and
    // `Ready` on every poll after that.
    fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
        let mut polls = 0;
        future::poll_fn(move |_cx| {
            polls += 1;
            if polls > stop_on {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
    }

    block_on(async {
        // Verify stopping works:
        let stopped = stream::iter(1u32..=10).take_until(make_stop_fut(5));
        let last = stopped.fold(0, |_, item| async move { item }).await;
        assert_eq!(last, 5);

        // Verify take_future() works: once the stop future is taken out,
        // the stream runs through to its last item.
        let mut unstopped = stream::iter(1..=10).take_until(make_stop_fut(5));

        assert_eq!(unstopped.next().await, Some(1));
        assert_eq!(unstopped.next().await, Some(2));

        unstopped.take_future();

        let last = unstopped.fold(0, |_, item| async move { item }).await;
        assert_eq!(last, 10);

        // Verify take_future() returns None if stream is stopped:
        let mut halted = stream::iter(1u32..=10).take_until(make_stop_fut(1));
        assert_eq!(halted.next().await, Some(1));
        assert_eq!(halted.next().await, None);
        assert!(halted.take_future().is_none());

        // Verify TakeUntil is fused: polling past the end keeps yielding `None`.
        // The inner stream panics if it is ever polled a third time.
        let mut polls = 0;
        let inner = stream::poll_fn(move |_cx| {
            polls += 1;
            match polls {
                1 => Poll::Ready(Some(1)),
                2 => Poll::Ready(None),
                _ => panic!("TakeUntil not fused"),
            }
        });

        let mut fused = inner.take_until(make_stop_fut(1));
        assert_eq!(fused.next().await, Some(1));
        assert_eq!(fused.next().await, None);
        assert_eq!(fused.next().await, None);
    });
}
385
#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    // Only the receiving half is needed; the sender is dropped with the temporary tuple.
    let receiver = mpsc::channel::<()>(1).1;

    // The test expects a panic when `chunks` is given a capacity of zero.
    let _ = receiver.chunks(0);
}
393
#[test]
#[should_panic]
fn ready_chunks_panic_on_cap_zero() {
    // Only the receiving half is needed; the sender is dropped with the temporary tuple.
    let receiver = mpsc::channel::<()>(1).1;

    // The test expects a panic when `ready_chunks` is given a capacity of zero.
    let _ = receiver.ready_chunks(0);
}
401
#[test]
fn ready_chunks() {
    let (mut sender, receiver) = mpsc::channel::<i32>(16);
    let mut chunked = receiver.ready_chunks(2);

    // Nothing has been sent yet, so the first poll must not produce a chunk.
    let mut cx = noop_context();
    assert!(chunked.next().poll_unpin(&mut cx).is_pending());

    block_on(async {
        sender.send(1).await.unwrap();
        assert_eq!(chunked.next().await.unwrap(), vec![1]);

        // Three queued items with a chunk capacity of two come out as [2, 3] and then [4].
        for value in 2..=4 {
            sender.send(value).await.unwrap();
        }
        assert_eq!(chunked.next().await.unwrap(), vec![2, 3]);
        assert_eq!(chunked.next().await.unwrap(), vec![4]);
    });
}