Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 1 | use futures::channel::mpsc; |
| 2 | use futures::executor::block_on; |
| 3 | use futures::future::poll_fn; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 4 | use futures::sink::SinkExt; |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 5 | use futures::stream::StreamExt; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 7 | use std::thread; |
| 8 | |
/// A producer thread pushes a descending sequence through a channel created
/// with `mpsc::channel(1)`; the receiver must observe exactly those values,
/// in the order they were sent, and nothing else.
#[test]
fn sequence() {
    let count = 20;
    let (sender, receiver) = mpsc::channel(1);

    // The producer runs on its own thread while this thread drains the
    // receiving half of the channel.
    let producer = thread::spawn(move || block_on(send_sequence(count, sender)));

    // `collect` resolves once the stream ends, so `received` holds every item
    // the receiver yielded.
    let received: Vec<_> = block_on(receiver.collect());
    let expected: Vec<u32> = (1..=count).rev().collect();
    assert_eq!(received, expected);

    producer.join().unwrap();
}
| 24 | |
| 25 | async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { |
| 26 | for x in 0..n { |
| 27 | sender.send(n - x).await.unwrap(); |
| 28 | } |
| 29 | } |
| 30 | |
/// After the only sender has been dropped, polling the receiver is expected
/// to yield `None`.
#[test]
fn drop_sender() {
    let (sender, mut receiver) = mpsc::channel::<u32>(1);
    drop(sender);

    // Wrap a single `poll_next_unpin` call in a future so `block_on` can drive it.
    let polled = block_on(poll_fn(|cx| receiver.poll_next_unpin(cx)));
    assert_eq!(polled, None);
}
| 38 | |
/// A send made while the receiver is alive must succeed; a send made after
/// the receiver has been dropped must report an error.
#[test]
fn drop_rx() {
    let (mut sender, receiver) = mpsc::channel::<u32>(1);

    // First send happens while the receiving half still exists.
    block_on(sender.send(1)).unwrap();

    drop(receiver);

    let send_after_drop = block_on(sender.send(1));
    assert!(send_after_drop.is_err());
}
| 46 | |
/// Checks when messages sent through the channel get dropped, using a
/// message type whose `Drop` impl bumps a counter.
///
/// Expected counts: 0 after a successful send, 1 once the receiver has been
/// dropped, and 2 after a further send has failed.
#[test]
fn drop_order() {
    // Number of `Tracked` values dropped so far.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    /// Message type that records each of its drops in `DROPS`.
    struct Tracked;

    impl Drop for Tracked {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let drops_so_far = || DROPS.load(Ordering::SeqCst);

    let (mut sender, receiver) = mpsc::channel(1);

    // The sent message has not been dropped yet.
    block_on(sender.send(Tracked)).unwrap();
    assert_eq!(drops_so_far(), 0);

    // Dropping the receiver is expected to drop the message sent above.
    drop(receiver);
    assert_eq!(drops_so_far(), 1);

    // A send after the receiver is gone fails, and its message is dropped too.
    assert!(block_on(sender.send(Tracked)).is_err());
    assert_eq!(drops_so_far(), 2);
}