Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 1 | use futures::channel::mpsc; |
| 2 | use futures::executor::block_on; |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 3 | use futures::future::Future; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 4 | use futures::sink::SinkExt; |
| 5 | use futures::stream::StreamExt; |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 6 | use futures::task::{Context, Poll}; |
| 7 | use std::pin::Pin; |
| 8 | use std::sync::{Arc, Weak}; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 9 | use std::thread; |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 10 | use std::time::{Duration, Instant}; |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 11 | |
| 12 | #[test] |
| 13 | fn smoke() { |
| 14 | let (mut sender, receiver) = mpsc::channel(1); |
| 15 | |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 16 | let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); |
Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 17 | |
| 18 | // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. |
| 19 | block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); |
| 20 | |
| 21 | t.join().unwrap() |
| 22 | } |
| 23 | |
| 24 | #[test] |
| 25 | fn multiple_senders_disconnect() { |
| 26 | { |
| 27 | let (mut tx1, mut rx) = mpsc::channel(1); |
| 28 | let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); |
| 29 | |
| 30 | // disconnect, dropping and Sink::poll_close should all close this sender but leave the |
| 31 | // channel open for other senders |
| 32 | tx1.disconnect(); |
| 33 | drop(tx2); |
| 34 | block_on(tx3.close()).unwrap(); |
| 35 | |
| 36 | assert!(tx1.is_closed()); |
| 37 | assert!(tx3.is_closed()); |
| 38 | assert!(!tx4.is_closed()); |
| 39 | |
| 40 | block_on(tx4.send(5)).unwrap(); |
| 41 | assert_eq!(block_on(rx.next()), Some(5)); |
| 42 | |
| 43 | // dropping the final sender will close the channel |
| 44 | drop(tx4); |
| 45 | assert_eq!(block_on(rx.next()), None); |
| 46 | } |
| 47 | |
| 48 | { |
| 49 | let (mut tx1, mut rx) = mpsc::unbounded(); |
| 50 | let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); |
| 51 | |
| 52 | // disconnect, dropping and Sink::poll_close should all close this sender but leave the |
| 53 | // channel open for other senders |
| 54 | tx1.disconnect(); |
| 55 | drop(tx2); |
| 56 | block_on(tx3.close()).unwrap(); |
| 57 | |
| 58 | assert!(tx1.is_closed()); |
| 59 | assert!(tx3.is_closed()); |
| 60 | assert!(!tx4.is_closed()); |
| 61 | |
| 62 | block_on(tx4.send(5)).unwrap(); |
| 63 | assert_eq!(block_on(rx.next()), Some(5)); |
| 64 | |
| 65 | // dropping the final sender will close the channel |
| 66 | drop(tx4); |
| 67 | assert_eq!(block_on(rx.next()), None); |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | #[test] |
| 72 | fn multiple_senders_close_channel() { |
| 73 | { |
| 74 | let (mut tx1, mut rx) = mpsc::channel(1); |
| 75 | let mut tx2 = tx1.clone(); |
| 76 | |
| 77 | // close_channel should shut down the whole channel |
| 78 | tx1.close_channel(); |
| 79 | |
| 80 | assert!(tx1.is_closed()); |
| 81 | assert!(tx2.is_closed()); |
| 82 | |
| 83 | let err = block_on(tx2.send(5)).unwrap_err(); |
| 84 | assert!(err.is_disconnected()); |
| 85 | |
| 86 | assert_eq!(block_on(rx.next()), None); |
| 87 | } |
| 88 | |
| 89 | { |
| 90 | let (tx1, mut rx) = mpsc::unbounded(); |
| 91 | let mut tx2 = tx1.clone(); |
| 92 | |
| 93 | // close_channel should shut down the whole channel |
| 94 | tx1.close_channel(); |
| 95 | |
| 96 | assert!(tx1.is_closed()); |
| 97 | assert!(tx2.is_closed()); |
| 98 | |
| 99 | let err = block_on(tx2.send(5)).unwrap_err(); |
| 100 | assert!(err.is_disconnected()); |
| 101 | |
| 102 | assert_eq!(block_on(rx.next()), None); |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | #[test] |
| 107 | fn single_receiver_drop_closes_channel_and_drains() { |
| 108 | { |
| 109 | let ref_count = Arc::new(0); |
| 110 | let weak_ref = Arc::downgrade(&ref_count); |
| 111 | |
| 112 | let (sender, receiver) = mpsc::unbounded(); |
| 113 | sender.unbounded_send(ref_count).expect("failed to send"); |
| 114 | |
| 115 | // Verify that the sent message is still live. |
| 116 | assert!(weak_ref.upgrade().is_some()); |
| 117 | |
| 118 | drop(receiver); |
| 119 | |
| 120 | // The sender should know the channel is closed. |
| 121 | assert!(sender.is_closed()); |
| 122 | |
| 123 | // Verify that the sent message has been dropped. |
| 124 | assert!(weak_ref.upgrade().is_none()); |
| 125 | } |
| 126 | |
| 127 | { |
| 128 | let ref_count = Arc::new(0); |
| 129 | let weak_ref = Arc::downgrade(&ref_count); |
| 130 | |
| 131 | let (mut sender, receiver) = mpsc::channel(1); |
| 132 | sender.try_send(ref_count).expect("failed to send"); |
| 133 | |
| 134 | // Verify that the sent message is still live. |
| 135 | assert!(weak_ref.upgrade().is_some()); |
| 136 | |
| 137 | drop(receiver); |
| 138 | |
| 139 | // The sender should know the channel is closed. |
| 140 | assert!(sender.is_closed()); |
| 141 | |
| 142 | // Verify that the sent message has been dropped. |
| 143 | assert!(weak_ref.upgrade().is_none()); |
| 144 | assert!(sender.is_closed()); |
| 145 | } |
| 146 | } |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 147 | |
| 148 | // Stress test that `try_send()`s occurring concurrently with receiver |
| 149 | // close/drops don't appear as successful sends. |
David LeGare | f7dc9c1 | 2022-03-02 00:20:44 +0000 | [diff] [blame] | 150 | #[cfg_attr(miri, ignore)] // Miri is too slow |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 151 | #[test] |
| 152 | fn stress_try_send_as_receiver_closes() { |
| 153 | const AMT: usize = 10000; |
| 154 | // To provide variable timing characteristics (in the hopes of |
| 155 | // reproducing the collision that leads to a race), we busy-re-poll |
| 156 | // the test MPSC receiver a variable number of times before actually |
| 157 | // stopping. We vary this countdown between 1 and the following |
| 158 | // value. |
| 159 | const MAX_COUNTDOWN: usize = 20; |
| 160 | // When we detect that a successfully sent item is still in the |
| 161 | // queue after a disconnect, we spin for up to 100ms to confirm that |
| 162 | // it is a persistent condition and not a concurrency illusion. |
| 163 | const SPIN_TIMEOUT_S: u64 = 10; |
| 164 | const SPIN_SLEEP_MS: u64 = 10; |
| 165 | struct TestRx { |
| 166 | rx: mpsc::Receiver<Arc<()>>, |
| 167 | // The number of times to query `rx` before dropping it. |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 168 | poll_count: usize, |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 169 | } |
| 170 | struct TestTask { |
| 171 | command_rx: mpsc::Receiver<TestRx>, |
| 172 | test_rx: Option<mpsc::Receiver<Arc<()>>>, |
| 173 | countdown: usize, |
| 174 | } |
| 175 | impl TestTask { |
| 176 | /// Create a new TestTask |
| 177 | fn new() -> (TestTask, mpsc::Sender<TestRx>) { |
| 178 | let (command_tx, command_rx) = mpsc::channel::<TestRx>(0); |
| 179 | ( |
| 180 | TestTask { |
| 181 | command_rx, |
| 182 | test_rx: None, |
| 183 | countdown: 0, // 0 means no countdown is in progress. |
| 184 | }, |
| 185 | command_tx, |
| 186 | ) |
| 187 | } |
| 188 | } |
| 189 | impl Future for TestTask { |
| 190 | type Output = (); |
| 191 | |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 192 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 193 | // Poll the test channel, if one is present. |
| 194 | if let Some(rx) = &mut self.test_rx { |
| 195 | if let Poll::Ready(v) = rx.poll_next_unpin(cx) { |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 196 | let _ = v.expect("test finished unexpectedly!"); |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 197 | } |
| 198 | self.countdown -= 1; |
| 199 | // Busy-poll until the countdown is finished. |
| 200 | cx.waker().wake_by_ref(); |
| 201 | } |
| 202 | // Accept any newly submitted MPSC channels for testing. |
| 203 | match self.command_rx.poll_next_unpin(cx) { |
| 204 | Poll::Ready(Some(TestRx { rx, poll_count })) => { |
| 205 | self.test_rx = Some(rx); |
| 206 | self.countdown = poll_count; |
| 207 | cx.waker().wake_by_ref(); |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 208 | } |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 209 | Poll::Ready(None) => return Poll::Ready(()), |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 210 | Poll::Pending => {} |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 211 | } |
| 212 | if self.countdown == 0 { |
| 213 | // Countdown complete -- drop the Receiver. |
| 214 | self.test_rx = None; |
| 215 | } |
| 216 | Poll::Pending |
| 217 | } |
| 218 | } |
| 219 | let (f, mut cmd_tx) = TestTask::new(); |
| 220 | let bg = thread::spawn(move || block_on(f)); |
| 221 | for i in 0..AMT { |
| 222 | let (mut test_tx, rx) = mpsc::channel(0); |
| 223 | let poll_count = i % MAX_COUNTDOWN; |
| 224 | cmd_tx.try_send(TestRx { rx, poll_count }).unwrap(); |
| 225 | let mut prev_weak: Option<Weak<()>> = None; |
| 226 | let mut attempted_sends = 0; |
| 227 | let mut successful_sends = 0; |
| 228 | loop { |
| 229 | // Create a test item. |
| 230 | let item = Arc::new(()); |
| 231 | let weak = Arc::downgrade(&item); |
| 232 | match test_tx.try_send(item) { |
| 233 | Ok(_) => { |
| 234 | prev_weak = Some(weak); |
| 235 | successful_sends += 1; |
| 236 | } |
| 237 | Err(ref e) if e.is_full() => {} |
| 238 | Err(ref e) if e.is_disconnected() => { |
| 239 | // Test for evidence of the race condition. |
| 240 | if let Some(prev_weak) = prev_weak { |
| 241 | if prev_weak.upgrade().is_some() { |
| 242 | // The previously sent item is still allocated. |
| 243 | // However, there appears to be some aspect of the |
| 244 | // concurrency that can legitimately cause the Arc |
| 245 | // to be momentarily valid. Spin for up to 100ms |
| 246 | // waiting for the previously sent item to be |
| 247 | // dropped. |
| 248 | let t0 = Instant::now(); |
| 249 | let mut spins = 0; |
| 250 | loop { |
| 251 | if prev_weak.upgrade().is_none() { |
| 252 | break; |
| 253 | } |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 254 | assert!( |
| 255 | t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 256 | "item not dropped on iteration {} after \ |
| 257 | {} sends ({} successful). spin=({})", |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 258 | i, |
| 259 | attempted_sends, |
| 260 | successful_sends, |
| 261 | spins |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 262 | ); |
| 263 | spins += 1; |
| 264 | thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); |
| 265 | } |
| 266 | } |
| 267 | } |
| 268 | break; |
| 269 | } |
| 270 | Err(ref e) => panic!("unexpected error: {}", e), |
| 271 | } |
| 272 | attempted_sends += 1; |
| 273 | } |
| 274 | } |
| 275 | drop(cmd_tx); |
Joel Galenson | cc0890a | 2021-05-19 15:09:47 -0700 | [diff] [blame] | 276 | bg.join().expect("background thread join"); |
| 277 | } |
| 278 | |
| 279 | #[test] |
| 280 | fn unbounded_try_next_after_none() { |
| 281 | let (tx, mut rx) = mpsc::unbounded::<String>(); |
| 282 | // Drop the sender, close the channel. |
| 283 | drop(tx); |
| 284 | // Receive the end of channel. |
| 285 | assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); |
| 286 | // None received, check we can call `try_next` again. |
| 287 | assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); |
| 288 | } |
| 289 | |
| 290 | #[test] |
| 291 | fn bounded_try_next_after_none() { |
| 292 | let (tx, mut rx) = mpsc::channel::<String>(17); |
| 293 | // Drop the sender, close the channel. |
| 294 | drop(tx); |
| 295 | // Receive the end of channel. |
| 296 | assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); |
| 297 | // None received, check we can call `try_next` again. |
| 298 | assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); |
Joel Galenson | 7980c2c | 2021-04-01 15:55:17 -0700 | [diff] [blame] | 299 | } |