Ivan Lozano | c0f49eb | 2021-01-25 21:39:17 -0500 | [diff] [blame] | 1 | use crate::Stream; |
| 2 | |
| 3 | use std::borrow::Borrow; |
| 4 | use std::hash::Hash; |
| 5 | use std::pin::Pin; |
| 6 | use std::task::{Context, Poll}; |
| 7 | |
| 8 | /// Combine many streams into one, indexing each source stream with a unique |
| 9 | /// key. |
| 10 | /// |
| 11 | /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source |
| 12 | /// streams into a single merged stream that yields values in the order that |
| 13 | /// they arrive from the source streams. However, `StreamMap` has a lot more |
| 14 | /// flexibility in usage patterns. |
| 15 | /// |
| 16 | /// `StreamMap` can: |
| 17 | /// |
| 18 | /// * Merge an arbitrary number of streams. |
| 19 | /// * Track which source stream the value was received from. |
| 20 | /// * Handle inserting and removing streams from the set of managed streams at |
| 21 | /// any point during iteration. |
| 22 | /// |
| 23 | /// All source streams held by `StreamMap` are indexed using a key. This key is |
| 24 | /// included with the value when a source stream yields a value. The key is also |
| 25 | /// used to remove the stream from the `StreamMap` before the stream has |
| 26 | /// completed streaming. |
| 27 | /// |
| 28 | /// # `Unpin` |
| 29 | /// |
| 30 | /// Because the `StreamMap` API moves streams during runtime, both streams and |
| 31 | /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a |
| 32 | /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to |
| 33 | /// pin the stream in the heap. |
| 34 | /// |
| 35 | /// # Implementation |
| 36 | /// |
| 37 | /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this |
| 38 | /// internal implementation detail will persist in future versions, but it is |
| 39 | /// important to know the runtime implications. In general, `StreamMap` works |
| 40 | /// best with a "smallish" number of streams as all entries are scanned on |
| 41 | /// insert, remove, and polling. In cases where a large number of streams need |
| 42 | /// to be merged, it may be advisable to use tasks sending values on a shared |
| 43 | /// [`mpsc`] channel. |
| 44 | /// |
| 45 | /// [`StreamExt::merge`]: crate::StreamExt::merge |
| 46 | /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html |
| 47 | /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html |
| 48 | /// [`Box::pin`]: std::boxed::Box::pin |
| 49 | /// |
| 50 | /// # Examples |
| 51 | /// |
| 52 | /// Merging two streams, then removing each one after receiving its first value. |
| 53 | /// |
| 54 | /// ``` |
| 55 | /// use tokio_stream::{StreamExt, StreamMap, Stream}; |
| 56 | /// use tokio::sync::mpsc; |
| 57 | /// use std::pin::Pin; |
| 58 | /// |
| 59 | /// #[tokio::main] |
| 60 | /// async fn main() { |
| 61 | /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); |
| 62 | /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); |
| 63 | /// |
| 64 | /// // Convert the channels to a `Stream`. |
| 65 | /// let rx1 = Box::pin(async_stream::stream! { |
| 66 | /// while let Some(item) = rx1.recv().await { |
| 67 | /// yield item; |
| 68 | /// } |
| 69 | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
| 70 | /// |
| 71 | /// let rx2 = Box::pin(async_stream::stream! { |
| 72 | /// while let Some(item) = rx2.recv().await { |
| 73 | /// yield item; |
| 74 | /// } |
| 75 | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
| 76 | /// |
| 77 | /// tokio::spawn(async move { |
| 78 | /// tx1.send(1).await.unwrap(); |
| 79 | /// |
| 80 | /// // This value will never be received. The send may or may not return |
| 81 | /// // `Err` depending on if the remote end closed first or not. |
| 82 | /// let _ = tx1.send(2).await; |
| 83 | /// }); |
| 84 | /// |
| 85 | /// tokio::spawn(async move { |
| 86 | /// tx2.send(3).await.unwrap(); |
| 87 | /// let _ = tx2.send(4).await; |
| 88 | /// }); |
| 89 | /// |
| 90 | /// let mut map = StreamMap::new(); |
| 91 | /// |
| 92 | /// // Insert both streams |
| 93 | /// map.insert("one", rx1); |
| 94 | /// map.insert("two", rx2); |
| 95 | /// |
| 96 | /// // Read twice |
| 97 | /// for _ in 0..2 { |
| 98 | /// let (key, val) = map.next().await.unwrap(); |
| 99 | /// |
| 100 | /// if key == "one" { |
| 101 | /// assert_eq!(val, 1); |
| 102 | /// } else { |
| 103 | /// assert_eq!(val, 3); |
| 104 | /// } |
| 105 | /// |
| 106 | /// // Remove the stream to prevent reading the next value |
| 107 | /// map.remove(key); |
| 108 | /// } |
| 109 | /// } |
| 110 | /// ``` |
| 111 | /// |
| 112 | /// This example models a read-only client to a chat system with channels. The |
| 113 | /// client sends commands to join and leave channels. `StreamMap` is used to |
| 114 | /// manage active channel subscriptions. |
| 115 | /// |
| 116 | /// For simplicity, messages are displayed with `println!`, but they could be |
| 117 | /// sent to the client over a socket. |
| 118 | /// |
| 119 | /// ```no_run |
| 120 | /// use tokio_stream::{Stream, StreamExt, StreamMap}; |
| 121 | /// |
| 122 | /// enum Command { |
| 123 | /// Join(String), |
| 124 | /// Leave(String), |
| 125 | /// } |
| 126 | /// |
| 127 | /// fn commands() -> impl Stream<Item = Command> { |
| 128 | /// // Streams in user commands by parsing `stdin`. |
| 129 | /// # tokio_stream::pending() |
| 130 | /// } |
| 131 | /// |
| 132 | /// // Join a channel, returns a stream of messages received on the channel. |
| 133 | /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { |
| 134 | /// // left as an exercise to the reader |
| 135 | /// # tokio_stream::pending() |
| 136 | /// } |
| 137 | /// |
| 138 | /// #[tokio::main] |
| 139 | /// async fn main() { |
| 140 | /// let mut channels = StreamMap::new(); |
| 141 | /// |
| 142 | /// // Input commands (join / leave channels). |
| 143 | /// let cmds = commands(); |
| 144 | /// tokio::pin!(cmds); |
| 145 | /// |
| 146 | /// loop { |
| 147 | /// tokio::select! { |
| 148 | /// Some(cmd) = cmds.next() => { |
| 149 | /// match cmd { |
| 150 | /// Command::Join(chan) => { |
| 151 | /// // Join the channel and add it to the `channels` |
| 152 | /// // stream map |
| 153 | /// let msgs = join(&chan); |
| 154 | /// channels.insert(chan, msgs); |
| 155 | /// } |
| 156 | /// Command::Leave(chan) => { |
| 157 | /// channels.remove(&chan); |
| 158 | /// } |
| 159 | /// } |
| 160 | /// } |
| 161 | /// Some((chan, msg)) = channels.next() => { |
| 162 | /// // Received a message, display it on stdout with the channel |
| 163 | /// // it originated from. |
| 164 | /// println!("{}: {}", chan, msg); |
| 165 | /// } |
| 166 | /// // Both the `commands` stream and the `channels` stream are |
| 167 | /// // complete. There is no more work to do, so leave the loop. |
| 168 | /// else => break, |
| 169 | /// } |
| 170 | /// } |
| 171 | /// } |
| 172 | /// ``` |
#[derive(Debug)]
pub struct StreamMap<K, V> {
    /// Key/stream pairs currently held by the map.
    ///
    /// Entries are removed with `swap_remove`, so positions in this vector are
    /// not stable across removals.
    entries: Vec<(K, V)>,
}

impl<K, V> StreamMap<K, V> {
    /// An iterator visiting all key-value pairs in arbitrary order.
    ///
    /// The iterator element type is `&'a (K, V)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for (key, stream) in map.iter() {
    ///     println!("({}, {:?})", key, stream);
    /// }
    /// ```
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// An iterator visiting all key-value pairs mutably in arbitrary order.
    ///
    /// The iterator element type is `&'a mut (K, V)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for (key, stream) in map.iter_mut() {
    ///     println!("({}, {:?})", key, stream);
    /// }
    /// ```
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
        self.entries.iter_mut()
    }

    /// Creates an empty `StreamMap`.
    ///
    /// The stream map is initially created with a capacity of `0`, so it will
    /// not allocate until it is first inserted into.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
    /// ```
    pub fn new() -> StreamMap<K, V> {
        StreamMap {
            entries: Vec::new(),
        }
    }

    /// Creates an empty `StreamMap` with the specified capacity.
    ///
    /// The stream map will be able to hold at least `capacity` elements without
    /// reallocating. If `capacity` is 0, the stream map will not allocate.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
    /// ```
    pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
        StreamMap {
            entries: Vec::with_capacity(capacity),
        }
    }

    /// Returns an iterator visiting all keys in arbitrary order.
    ///
    /// The iterator element type is `&'a K`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for key in map.keys() {
    ///     println!("{}", key);
    /// }
    /// ```
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.entries.iter().map(|entry| &entry.0)
    }

    /// An iterator visiting all values in arbitrary order.
    ///
    /// The iterator element type is `&'a V`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for stream in map.values() {
    ///     println!("{:?}", stream);
    /// }
    /// ```
    pub fn values(&self) -> impl Iterator<Item = &V> {
        self.entries.iter().map(|entry| &entry.1)
    }

    /// An iterator visiting all values mutably in arbitrary order.
    ///
    /// The iterator element type is `&'a mut V`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for stream in map.values_mut() {
    ///     println!("{:?}", stream);
    /// }
    /// ```
    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
        self.entries.iter_mut().map(|entry| &mut entry.1)
    }

    /// Returns the number of streams the map can hold without reallocating.
    ///
    /// This number is a lower bound; the `StreamMap` might be able to hold
    /// more, but is guaranteed to be able to hold at least this many.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
    /// assert!(map.capacity() >= 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.entries.capacity()
    }

    /// Returns the number of streams in the map.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// assert_eq!(a.len(), 0);
    /// a.insert(1, pending::<i32>());
    /// assert_eq!(a.len(), 1);
    /// ```
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` if the map contains no elements.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// assert!(a.is_empty());
    /// a.insert(1, pending::<i32>());
    /// assert!(!a.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Clears the map, removing all key-stream pairs. Keeps the allocated
    /// memory for reuse.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// a.insert(1, pending::<i32>());
    /// a.clear();
    /// assert!(a.is_empty());
    /// ```
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Insert a key-stream pair into the map.
    ///
    /// If the map did not have this key present, `None` is returned.
    ///
    /// If the map did have this key present, the new `stream` replaces the old
    /// one and the old stream is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// assert!(map.insert(37, pending::<i32>()).is_none());
    /// assert!(!map.is_empty());
    ///
    /// map.insert(37, pending());
    /// assert!(map.insert(37, pending()).is_some());
    /// ```
    pub fn insert(&mut self, k: K, stream: V) -> Option<V>
    where
        K: Hash + Eq,
    {
        // Evict any existing entry for this key first so that keys stay
        // unique; the evicted stream (if any) is handed back to the caller.
        let previous = self.remove(&k);
        self.entries.push((k, stream));

        previous
    }

    /// Removes a key from the map, returning the stream at the key if the key
    /// was previously in the map.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and
    /// `Eq` on the borrowed form must match those for the key type.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    /// map.insert(1, pending::<i32>());
    /// assert!(map.remove(&1).is_some());
    /// assert!(map.remove(&1).is_none());
    /// ```
    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let idx = self
            .entries
            .iter()
            .position(|entry| entry.0.borrow() == k)?;

        // Entry order carries no meaning, so the O(1) `swap_remove` is fine.
        Some(self.entries.swap_remove(idx).1)
    }

    /// Returns `true` if the map contains a stream for the specified key.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and
    /// `Eq` on the borrowed form must match those for the key type.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    /// map.insert(1, pending::<i32>());
    /// assert_eq!(map.contains_key(&1), true);
    /// assert_eq!(map.contains_key(&2), false);
    /// ```
    pub fn contains_key<Q>(&self, k: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.entries.iter().any(|entry| entry.0.borrow() == k)
    }
}
| 483 | |
| 484 | impl<K, V> StreamMap<K, V> |
| 485 | where |
| 486 | K: Unpin, |
| 487 | V: Stream + Unpin, |
| 488 | { |
| 489 | /// Polls the next value, includes the vec entry index |
| 490 | fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { |
| 491 | use Poll::*; |
| 492 | |
| 493 | let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; |
| 494 | let mut idx = start; |
| 495 | |
| 496 | for _ in 0..self.entries.len() { |
| 497 | let (_, stream) = &mut self.entries[idx]; |
| 498 | |
| 499 | match Pin::new(stream).poll_next(cx) { |
| 500 | Ready(Some(val)) => return Ready(Some((idx, val))), |
| 501 | Ready(None) => { |
| 502 | // Remove the entry |
| 503 | self.entries.swap_remove(idx); |
| 504 | |
| 505 | // Check if this was the last entry, if so the cursor needs |
| 506 | // to wrap |
| 507 | if idx == self.entries.len() { |
| 508 | idx = 0; |
| 509 | } else if idx < start && start <= self.entries.len() { |
| 510 | // The stream being swapped into the current index has |
| 511 | // already been polled, so skip it. |
| 512 | idx = idx.wrapping_add(1) % self.entries.len(); |
| 513 | } |
| 514 | } |
| 515 | Pending => { |
| 516 | idx = idx.wrapping_add(1) % self.entries.len(); |
| 517 | } |
| 518 | } |
| 519 | } |
| 520 | |
| 521 | // If the map is empty, then the stream is complete. |
| 522 | if self.entries.is_empty() { |
| 523 | Ready(None) |
| 524 | } else { |
| 525 | Pending |
| 526 | } |
| 527 | } |
| 528 | } |
| 529 | |
| 530 | impl<K, V> Default for StreamMap<K, V> { |
| 531 | fn default() -> Self { |
| 532 | Self::new() |
| 533 | } |
| 534 | } |
| 535 | |
| 536 | impl<K, V> Stream for StreamMap<K, V> |
| 537 | where |
| 538 | K: Clone + Unpin, |
| 539 | V: Stream + Unpin, |
| 540 | { |
| 541 | type Item = (K, V::Item); |
| 542 | |
| 543 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 544 | if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { |
| 545 | let key = self.entries[idx].0.clone(); |
| 546 | Poll::Ready(Some((key, val))) |
| 547 | } else { |
| 548 | Poll::Ready(None) |
| 549 | } |
| 550 | } |
| 551 | |
| 552 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 553 | let mut ret = (0, Some(0)); |
| 554 | |
| 555 | for (_, stream) in &self.entries { |
| 556 | let hint = stream.size_hint(); |
| 557 | |
| 558 | ret.0 += hint.0; |
| 559 | |
| 560 | match (ret.1, hint.1) { |
| 561 | (Some(a), Some(b)) => ret.1 = Some(a + b), |
| 562 | (Some(_), None) => ret.1 = None, |
| 563 | _ => {} |
| 564 | } |
| 565 | } |
| 566 | |
| 567 | ret |
| 568 | } |
| 569 | } |
| 570 | |
mod rand {
    use std::cell::Cell;

    mod loom {
        #[cfg(not(loom))]
        pub(crate) mod rand {
            use std::collections::hash_map::RandomState;
            use std::hash::{BuildHasher, Hash, Hasher};
            use std::sync::atomic::{AtomicU32, Ordering};

            /// Bumped on every call to `seed` so that successive calls hash
            /// different input.
            static COUNTER: AtomicU32 = AtomicU32::new(1);

            /// Produces a seed by hashing a process-wide counter value with a
            /// freshly created `RandomState` hasher.
            pub(crate) fn seed() -> u64 {
                let mut hasher = RandomState::new().build_hasher();

                // Hash some unique-ish data to generate some new state.
                let ticket = COUNTER.fetch_add(1, Ordering::Relaxed);
                ticket.hash(&mut hasher);

                hasher.finish()
            }
        }

        #[cfg(loom)]
        pub(crate) mod rand {
            /// Fixed seed used when building with `cfg(loom)`.
            pub(crate) fn seed() -> u64 {
                1
            }
        }
    }

    /// Fast random number generator.
    ///
    /// Implements xorshift64+: two 32-bit xorshift sequences added together.
    /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
    /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
    /// This generator passes the SmallCrush suite, part of TestU01 framework:
    /// http://simul.iro.umontreal.ca/testu01/tu01.html
    #[derive(Debug)]
    pub(crate) struct FastRand {
        one: Cell<u32>,
        two: Cell<u32>,
    }

    impl FastRand {
        /// Builds a generator from a 64-bit seed: the high 32 bits initialise
        /// the first state word and the low 32 bits the second.
        pub(crate) fn new(seed: u64) -> FastRand {
            let high = (seed >> 32) as u32;

            // The second state word cannot be zero, so substitute 1.
            let low = match seed as u32 {
                0 => 1,
                bits => bits,
            };

            FastRand {
                one: Cell::new(high),
                two: Cell::new(low),
            }
        }

        /// Returns a pseudo-random value scaled into `0..n` (always `0` when
        /// `n` is `0`).
        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
            // This is similar to fastrand() % n, but faster.
            // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
            let scaled = u64::from(self.fastrand()).wrapping_mul(u64::from(n));
            (scaled >> 32) as u32
        }

        /// Advances the generator state and returns the next 32-bit output.
        fn fastrand(&self) -> u32 {
            let s0 = self.two.get();
            let mut s1 = self.one.get();

            s1 ^= s1 << 17;
            s1 = s1 ^ s0 ^ (s1 >> 7) ^ (s0 >> 16);

            self.one.set(s0);
            self.two.set(s1);

            s0.wrapping_add(s1)
        }
    }

    /// Returns a pseudo-random value scaled into `0..n`, drawn from a
    /// generator stored in thread-local storage. Used by `StreamMap`.
    pub(crate) fn thread_rng_n(n: u32) -> u32 {
        thread_local! {
            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
        }

        THREAD_RNG.with(|rng| rng.fastrand_n(n))
    }
}