blob: 80a521ee17ae5a41dad2d7743bd318d0a439426e [file] [log] [blame]
Ivan Lozanoc0f49eb2021-01-25 21:39:17 -05001use crate::Stream;
2
3use std::borrow::Borrow;
4use std::hash::Hash;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
/// Combine many streams into one, indexing each source stream with a unique
/// key.
///
/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
/// streams into a single merged stream that yields values in the order that
/// they arrive from the source streams. However, `StreamMap` has a lot more
/// flexibility in usage patterns.
///
/// `StreamMap` can:
///
/// * Merge an arbitrary number of streams.
/// * Track which source stream the value was received from.
/// * Handle inserting and removing streams from the set of managed streams at
///   any point during iteration.
///
/// All source streams held by `StreamMap` are indexed using a key. This key is
/// included with the value when a source stream yields a value. The key is also
/// used to remove the stream from the `StreamMap` before the stream has
/// completed streaming.
///
/// As a [`Stream`], a `StreamMap<K, V>` yields `(K, V::Item)` pairs. This
/// requires `K: Clone + Unpin` (the key is cloned into every yielded pair) and
/// `V: Stream + Unpin`. A source stream that completes is removed from the map
/// automatically, and polling an empty `StreamMap` returns `None`.
///
/// # `Unpin`
///
/// Because the `StreamMap` API moves streams during runtime, both streams and
/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
/// `StreamMap`, use [`pin!`] to pin the stream on the stack or [`Box::pin`] to
/// pin the stream on the heap.
///
/// # Implementation
///
/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
/// internal implementation detail will persist in future versions, but it is
/// important to know the runtime implications. In general, `StreamMap` works
/// best with a "smallish" number of streams as all entries are scanned on
/// insert, remove, and polling. In cases where a large number of streams need
/// to be merged, it may be advisable to use tasks sending values on a shared
/// [`mpsc`] channel.
///
/// [`StreamExt::merge`]: crate::StreamExt::merge
/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
/// [`Box::pin`]: std::boxed::Box::pin
///
/// # Examples
///
/// Merge two streams, then remove them after receiving the first value.
///
/// ```
/// use tokio_stream::{StreamExt, StreamMap, Stream};
/// use tokio::sync::mpsc;
/// use std::pin::Pin;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
///
///     // Convert the channels to a `Stream`.
///     let rx1 = Box::pin(async_stream::stream! {
///           while let Some(item) = rx1.recv().await {
///               yield item;
///           }
///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
///
///     let rx2 = Box::pin(async_stream::stream! {
///           while let Some(item) = rx2.recv().await {
///               yield item;
///           }
///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
///
///     tokio::spawn(async move {
///         tx1.send(1).await.unwrap();
///
///         // This value will never be received. The send may or may not return
///         // `Err` depending on if the remote end closed first or not.
///         let _ = tx1.send(2).await;
///     });
///
///     tokio::spawn(async move {
///         tx2.send(3).await.unwrap();
///         let _ = tx2.send(4).await;
///     });
///
///     let mut map = StreamMap::new();
///
///     // Insert both streams
///     map.insert("one", rx1);
///     map.insert("two", rx2);
///
///     // Read twice
///     for _ in 0..2 {
///         let (key, val) = map.next().await.unwrap();
///
///         if key == "one" {
///             assert_eq!(val, 1);
///         } else {
///             assert_eq!(val, 3);
///         }
///
///         // Remove the stream to prevent reading the next value
///         map.remove(key);
///     }
/// }
/// ```
///
/// This example models a read-only client to a chat system with channels. The
/// client sends commands to join and leave channels. `StreamMap` is used to
/// manage active channel subscriptions.
///
/// For simplicity, messages are displayed with `println!`, but they could be
/// sent to the client over a socket.
///
/// ```no_run
/// use tokio_stream::{Stream, StreamExt, StreamMap};
///
/// enum Command {
///     Join(String),
///     Leave(String),
/// }
///
/// fn commands() -> impl Stream<Item = Command> {
///     // Streams in user commands by parsing `stdin`.
/// # tokio_stream::pending()
/// }
///
/// // Join a channel, returns a stream of messages received on the channel.
/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
///     // left as an exercise to the reader
/// # tokio_stream::pending()
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let mut channels = StreamMap::new();
///
///     // Input commands (join / leave channels).
///     let cmds = commands();
///     tokio::pin!(cmds);
///
///     loop {
///         tokio::select! {
///             Some(cmd) = cmds.next() => {
///                 match cmd {
///                     Command::Join(chan) => {
///                         // Join the channel and add it to the `channels`
///                         // stream map
///                         let msgs = join(&chan);
///                         channels.insert(chan, msgs);
///                     }
///                     Command::Leave(chan) => {
///                         channels.remove(&chan);
///                     }
///                 }
///             }
///             Some((chan, msg)) = channels.next() => {
///                 // Received a message, display it on stdout with the channel
///                 // it originated from.
///                 println!("{}: {}", chan, msg);
///             }
///             // Both the `commands` stream and the `channels` stream are
///             // complete. There is no more work to do, so leave the loop.
///             else => break,
///         }
///     }
/// }
/// ```
#[derive(Debug)]
pub struct StreamMap<K, V> {
    /// Streams stored in the map, each paired with its key. Lookups scan this
    /// vector linearly; iteration order is unspecified.
    entries: Vec<(K, V)>,
}
178
179impl<K, V> StreamMap<K, V> {
180 /// An iterator visiting all key-value pairs in arbitrary order.
181 ///
182 /// The iterator element type is &'a (K, V).
183 ///
184 /// # Examples
185 ///
186 /// ```
187 /// use tokio_stream::{StreamMap, pending};
188 ///
189 /// let mut map = StreamMap::new();
190 ///
191 /// map.insert("a", pending::<i32>());
192 /// map.insert("b", pending());
193 /// map.insert("c", pending());
194 ///
195 /// for (key, stream) in map.iter() {
196 /// println!("({}, {:?})", key, stream);
197 /// }
198 /// ```
199 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
200 self.entries.iter()
201 }
202
203 /// An iterator visiting all key-value pairs mutably in arbitrary order.
204 ///
205 /// The iterator element type is &'a mut (K, V).
206 ///
207 /// # Examples
208 ///
209 /// ```
210 /// use tokio_stream::{StreamMap, pending};
211 ///
212 /// let mut map = StreamMap::new();
213 ///
214 /// map.insert("a", pending::<i32>());
215 /// map.insert("b", pending());
216 /// map.insert("c", pending());
217 ///
218 /// for (key, stream) in map.iter_mut() {
219 /// println!("({}, {:?})", key, stream);
220 /// }
221 /// ```
222 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
223 self.entries.iter_mut()
224 }
225
226 /// Creates an empty `StreamMap`.
227 ///
228 /// The stream map is initially created with a capacity of `0`, so it will
229 /// not allocate until it is first inserted into.
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// use tokio_stream::{StreamMap, Pending};
235 ///
236 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
237 /// ```
238 pub fn new() -> StreamMap<K, V> {
239 StreamMap { entries: vec![] }
240 }
241
242 /// Creates an empty `StreamMap` with the specified capacity.
243 ///
244 /// The stream map will be able to hold at least `capacity` elements without
245 /// reallocating. If `capacity` is 0, the stream map will not allocate.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use tokio_stream::{StreamMap, Pending};
251 ///
252 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
253 /// ```
254 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
255 StreamMap {
256 entries: Vec::with_capacity(capacity),
257 }
258 }
259
260 /// Returns an iterator visiting all keys in arbitrary order.
261 ///
262 /// The iterator element type is &'a K.
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use tokio_stream::{StreamMap, pending};
268 ///
269 /// let mut map = StreamMap::new();
270 ///
271 /// map.insert("a", pending::<i32>());
272 /// map.insert("b", pending());
273 /// map.insert("c", pending());
274 ///
275 /// for key in map.keys() {
276 /// println!("{}", key);
277 /// }
278 /// ```
279 pub fn keys(&self) -> impl Iterator<Item = &K> {
280 self.iter().map(|(k, _)| k)
281 }
282
283 /// An iterator visiting all values in arbitrary order.
284 ///
285 /// The iterator element type is &'a V.
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// use tokio_stream::{StreamMap, pending};
291 ///
292 /// let mut map = StreamMap::new();
293 ///
294 /// map.insert("a", pending::<i32>());
295 /// map.insert("b", pending());
296 /// map.insert("c", pending());
297 ///
298 /// for stream in map.values() {
299 /// println!("{:?}", stream);
300 /// }
301 /// ```
302 pub fn values(&self) -> impl Iterator<Item = &V> {
303 self.iter().map(|(_, v)| v)
304 }
305
306 /// An iterator visiting all values mutably in arbitrary order.
307 ///
308 /// The iterator element type is &'a mut V.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use tokio_stream::{StreamMap, pending};
314 ///
315 /// let mut map = StreamMap::new();
316 ///
317 /// map.insert("a", pending::<i32>());
318 /// map.insert("b", pending());
319 /// map.insert("c", pending());
320 ///
321 /// for stream in map.values_mut() {
322 /// println!("{:?}", stream);
323 /// }
324 /// ```
325 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
326 self.iter_mut().map(|(_, v)| v)
327 }
328
329 /// Returns the number of streams the map can hold without reallocating.
330 ///
331 /// This number is a lower bound; the `StreamMap` might be able to hold
332 /// more, but is guaranteed to be able to hold at least this many.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// use tokio_stream::{StreamMap, Pending};
338 ///
339 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
340 /// assert!(map.capacity() >= 100);
341 /// ```
342 pub fn capacity(&self) -> usize {
343 self.entries.capacity()
344 }
345
346 /// Returns the number of streams in the map.
347 ///
348 /// # Examples
349 ///
350 /// ```
351 /// use tokio_stream::{StreamMap, pending};
352 ///
353 /// let mut a = StreamMap::new();
354 /// assert_eq!(a.len(), 0);
355 /// a.insert(1, pending::<i32>());
356 /// assert_eq!(a.len(), 1);
357 /// ```
358 pub fn len(&self) -> usize {
359 self.entries.len()
360 }
361
362 /// Returns `true` if the map contains no elements.
363 ///
364 /// # Examples
365 ///
366 /// ```
Joel Galensonbd64f252021-08-09 10:50:19 -0700367 /// use tokio_stream::{StreamMap, pending};
Ivan Lozanoc0f49eb2021-01-25 21:39:17 -0500368 ///
Joel Galensonbd64f252021-08-09 10:50:19 -0700369 /// let mut a = StreamMap::new();
Ivan Lozanoc0f49eb2021-01-25 21:39:17 -0500370 /// assert!(a.is_empty());
Joel Galensonbd64f252021-08-09 10:50:19 -0700371 /// a.insert(1, pending::<i32>());
Ivan Lozanoc0f49eb2021-01-25 21:39:17 -0500372 /// assert!(!a.is_empty());
373 /// ```
374 pub fn is_empty(&self) -> bool {
375 self.entries.is_empty()
376 }
377
378 /// Clears the map, removing all key-stream pairs. Keeps the allocated
379 /// memory for reuse.
380 ///
381 /// # Examples
382 ///
383 /// ```
384 /// use tokio_stream::{StreamMap, pending};
385 ///
386 /// let mut a = StreamMap::new();
387 /// a.insert(1, pending::<i32>());
388 /// a.clear();
389 /// assert!(a.is_empty());
390 /// ```
391 pub fn clear(&mut self) {
392 self.entries.clear();
393 }
394
395 /// Insert a key-stream pair into the map.
396 ///
397 /// If the map did not have this key present, `None` is returned.
398 ///
399 /// If the map did have this key present, the new `stream` replaces the old
400 /// one and the old stream is returned.
401 ///
402 /// # Examples
403 ///
404 /// ```
405 /// use tokio_stream::{StreamMap, pending};
406 ///
407 /// let mut map = StreamMap::new();
408 ///
409 /// assert!(map.insert(37, pending::<i32>()).is_none());
410 /// assert!(!map.is_empty());
411 ///
412 /// map.insert(37, pending());
413 /// assert!(map.insert(37, pending()).is_some());
414 /// ```
415 pub fn insert(&mut self, k: K, stream: V) -> Option<V>
416 where
417 K: Hash + Eq,
418 {
419 let ret = self.remove(&k);
420 self.entries.push((k, stream));
421
422 ret
423 }
424
425 /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
426 ///
427 /// The key may be any borrowed form of the map's key type, but `Hash` and
428 /// `Eq` on the borrowed form must match those for the key type.
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// use tokio_stream::{StreamMap, pending};
434 ///
435 /// let mut map = StreamMap::new();
436 /// map.insert(1, pending::<i32>());
437 /// assert!(map.remove(&1).is_some());
438 /// assert!(map.remove(&1).is_none());
439 /// ```
440 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
441 where
442 K: Borrow<Q>,
443 Q: Hash + Eq,
444 {
445 for i in 0..self.entries.len() {
446 if self.entries[i].0.borrow() == k {
447 return Some(self.entries.swap_remove(i).1);
448 }
449 }
450
451 None
452 }
453
454 /// Returns `true` if the map contains a stream for the specified key.
455 ///
456 /// The key may be any borrowed form of the map's key type, but `Hash` and
457 /// `Eq` on the borrowed form must match those for the key type.
458 ///
459 /// # Examples
460 ///
461 /// ```
462 /// use tokio_stream::{StreamMap, pending};
463 ///
464 /// let mut map = StreamMap::new();
465 /// map.insert(1, pending::<i32>());
466 /// assert_eq!(map.contains_key(&1), true);
467 /// assert_eq!(map.contains_key(&2), false);
468 /// ```
469 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
470 where
471 K: Borrow<Q>,
472 Q: Hash + Eq,
473 {
474 for i in 0..self.entries.len() {
475 if self.entries[i].0.borrow() == k {
476 return true;
477 }
478 }
479
480 false
481 }
482}
483
impl<K, V> StreamMap<K, V>
where
    K: Unpin,
    V: Stream + Unpin,
{
    /// Polls the source streams and returns the first value produced, paired
    /// with the index in `entries` of the stream that produced it.
    ///
    /// Polling begins at a pseudo-random index (from `rand::thread_rng_n`) and
    /// proceeds round-robin, wrapping at the end of `entries`. The loop runs
    /// once per entry present at the start of the call. The skip logic below
    /// is intended so that each of those streams is polled at most once.
    ///
    /// A stream that returns `Ready(None)` is removed with `swap_remove`, which
    /// moves the last entry into the vacated slot.
    ///
    /// Returns:
    /// - `Ready(Some((idx, item)))` as soon as a stream yields an item. Nothing
    ///   is removed after that poll, so `idx` indexes `entries` on return.
    /// - `Ready(None)` if no item was produced and the map is now empty.
    /// - `Pending` if no item was produced and streams remain.
    ///
    /// NOTE(review): the random start is presumably for fairness, so that
    /// entries near the front are not always favored. Confirm with history.
    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
        use Poll::*;

        // `thread_rng_n(len)` yields a value in `0..len` (0 for an empty map).
        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
        let mut idx = start;

        // The iteration count is fixed from the length at entry. Each removal
        // below shrinks `entries` but still consumes one iteration.
        for _ in 0..self.entries.len() {
            let (_, stream) = &mut self.entries[idx];

            match Pin::new(stream).poll_next(cx) {
                Ready(Some(val)) => return Ready(Some((idx, val))),
                Ready(None) => {
                    // Remove the entry
                    self.entries.swap_remove(idx);

                    // Check if this was the last entry, if so the cursor needs
                    // to wrap
                    if idx == self.entries.len() {
                        idx = 0;
                    } else if idx < start && start <= self.entries.len() {
                        // The stream being swapped into the current index has
                        // already been polled, so skip it.
                        //
                        // `idx < start` means the cursor has already wrapped,
                        // so the indices >= `start` were visited this call.
                        // The swapped-in stream came from index
                        // `entries.len()` (after removal), which is >= `start`.
                        idx = idx.wrapping_add(1) % self.entries.len();
                    }
                    // Otherwise the swapped-in stream has not been polled yet,
                    // so `idx` is left unchanged and it is polled next.
                }
                Pending => {
                    idx = idx.wrapping_add(1) % self.entries.len();
                }
            }
        }

        // If the map is empty, then the stream is complete.
        if self.entries.is_empty() {
            Ready(None)
        } else {
            Pending
        }
    }
}
529
530impl<K, V> Default for StreamMap<K, V> {
531 fn default() -> Self {
532 Self::new()
533 }
534}
535
536impl<K, V> Stream for StreamMap<K, V>
537where
538 K: Clone + Unpin,
539 V: Stream + Unpin,
540{
541 type Item = (K, V::Item);
542
543 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
544 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
545 let key = self.entries[idx].0.clone();
546 Poll::Ready(Some((key, val)))
547 } else {
548 Poll::Ready(None)
549 }
550 }
551
552 fn size_hint(&self) -> (usize, Option<usize>) {
553 let mut ret = (0, Some(0));
554
555 for (_, stream) in &self.entries {
556 let hint = stream.size_hint();
557
558 ret.0 += hint.0;
559
560 match (ret.1, hint.1) {
561 (Some(a), Some(b)) => ret.1 = Some(a + b),
562 (Some(_), None) => ret.1 = None,
563 _ => {}
564 }
565 }
566
567 ret
568 }
569}
570
David LeGare9efa0492022-03-02 16:21:06 +0000571impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
572where
573 K: Hash + Eq,
574{
575 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
576 let iterator = iter.into_iter();
577 let (lower_bound, _) = iterator.size_hint();
578 let mut stream_map = Self::with_capacity(lower_bound);
579
580 for (key, value) in iterator {
581 stream_map.insert(key, value);
582 }
583
584 stream_map
585 }
586}
587
mod rand {
    use std::cell::Cell;

    /// Seed sources for the generator below. With `cfg(loom)` the seed is a
    /// constant; otherwise it is derived from a fresh `RandomState` hasher.
    mod loom {
        #[cfg(not(loom))]
        pub(crate) mod rand {
            use std::collections::hash_map::RandomState;
            use std::hash::{BuildHasher, Hash, Hasher};
            use std::sync::atomic::AtomicU32;
            use std::sync::atomic::Ordering::Relaxed;

            /// Process-wide counter; each call hashes a distinct value of it.
            static COUNTER: AtomicU32 = AtomicU32::new(1);

            /// Produces a 64-bit seed by hashing a unique-ish counter value
            /// with a newly created `RandomState` hasher.
            pub(crate) fn seed() -> u64 {
                let mut hasher = RandomState::new().build_hasher();

                let ticket = COUNTER.fetch_add(1, Relaxed);
                ticket.hash(&mut hasher);

                hasher.finish()
            }
        }

        #[cfg(loom)]
        pub(crate) mod rand {
            /// Constant seed used when building under loom.
            pub(crate) fn seed() -> u64 {
                1
            }
        }
    }

    /// Fast, non-cryptographic pseudo-random number generator.
    ///
    /// Implements xorshift64+: two 32-bit xorshift state words whose outputs
    /// are added together. The shift triplet `[17,7,16]` was calculated as
    /// indicated in Marsaglia's Xorshift paper:
    /// <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
    /// This generator passes the SmallCrush suite, part of the TestU01
    /// framework: <http://simul.iro.umontreal.ca/testu01/tu01.html>
    #[derive(Debug)]
    pub(crate) struct FastRand {
        one: Cell<u32>,
        two: Cell<u32>,
    }

    impl FastRand {
        /// Creates a generator from `seed`. The high 32 bits seed `one`, and
        /// the low 32 bits seed `two`, which is bumped to 1 when it would be 0.
        pub(crate) fn new(seed: u64) -> FastRand {
            let high = (seed >> 32) as u32;
            // The second state word must never be zero.
            let low = (seed as u32).max(1);

            FastRand {
                one: Cell::new(high),
                two: Cell::new(low),
            }
        }

        /// Returns a pseudo-random value in `0..n` (always `0` when `n == 0`).
        ///
        /// Uses a multiply-and-shift reduction instead of `% n`; see
        /// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
            let wide = u64::from(self.fastrand()).wrapping_mul(u64::from(n));
            (wide >> 32) as u32
        }

        /// Advances the state and returns the next raw 32-bit output.
        fn fastrand(&self) -> u32 {
            let s0 = self.two.get();
            let mut s1 = self.one.get();

            s1 ^= s1 << 17;
            s1 ^= s0 ^ (s1 >> 7) ^ (s0 >> 16);

            self.one.set(s0);
            self.two.set(s1);

            s0.wrapping_add(s1)
        }
    }

    /// Returns a pseudo-random value in `0..n` from a per-thread `FastRand`
    /// that is seeded on first use in each thread. `StreamMap` uses it to
    /// pick the index at which polling starts.
    pub(crate) fn thread_rng_n(n: u32) -> u32 {
        thread_local! {
            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
        }

        THREAD_RNG.with(|rng| rng.fastrand_n(n))
    }
}