blob: 4d6e1798e1a55038dcf723f99d04fa07b48154e8 [file] [log] [blame]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001//! Tests copied from `std::sync::mpsc`.
2//!
3//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4//! modified to work with `crossbeam-channel` instead.
5//!
6//! Minor tweaks were needed to make the tests compile:
7//!
8//! - Replace `box` syntax with `Box::new`.
9//! - Replace all uses of `Select` with `select!`.
10//! - Change the imports.
11//! - Join all spawned threads.
12//! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13//!
14//! Source:
15//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16//!
17//! Copyright & License:
18//! - Copyright 2013-2014 The Rust Project Developers
19//! - Apache License, Version 2.0 or MIT license, at your option
20//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21//! - https://www.rust-lang.org/en-US/legal.html
22
David LeGare54fc8482022-03-01 18:58:39 +000023#![allow(
24 clippy::drop_copy,
25 clippy::match_single_binding,
26 clippy::redundant_clone
27)]
28
Jakub Kotur2b588ff2020-12-21 17:28:14 +010029use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
30use std::sync::mpsc::{SendError, TrySendError};
31use std::thread::JoinHandle;
32use std::time::Duration;
33
34use crossbeam_channel as cc;
35
36pub struct Sender<T> {
37 pub inner: cc::Sender<T>,
38}
39
40impl<T> Sender<T> {
41 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
42 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
43 }
44}
45
46impl<T> Clone for Sender<T> {
47 fn clone(&self) -> Sender<T> {
48 Sender {
49 inner: self.inner.clone(),
50 }
51 }
52}
53
54pub struct SyncSender<T> {
55 pub inner: cc::Sender<T>,
56}
57
58impl<T> SyncSender<T> {
59 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
60 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
61 }
62
63 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
64 self.inner.try_send(t).map_err(|err| match err {
65 cc::TrySendError::Full(m) => TrySendError::Full(m),
66 cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
67 })
68 }
69}
70
71impl<T> Clone for SyncSender<T> {
72 fn clone(&self) -> SyncSender<T> {
73 SyncSender {
74 inner: self.inner.clone(),
75 }
76 }
77}
78
79pub struct Receiver<T> {
80 pub inner: cc::Receiver<T>,
81}
82
83impl<T> Receiver<T> {
84 pub fn try_recv(&self) -> Result<T, TryRecvError> {
85 self.inner.try_recv().map_err(|err| match err {
86 cc::TryRecvError::Empty => TryRecvError::Empty,
87 cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
88 })
89 }
90
91 pub fn recv(&self) -> Result<T, RecvError> {
92 self.inner.recv().map_err(|_| RecvError)
93 }
94
95 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
96 self.inner.recv_timeout(timeout).map_err(|err| match err {
97 cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
98 cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
99 })
100 }
101
102 pub fn iter(&self) -> Iter<T> {
103 Iter { inner: self }
104 }
105
106 pub fn try_iter(&self) -> TryIter<T> {
107 TryIter { inner: self }
108 }
109}
110
111impl<'a, T> IntoIterator for &'a Receiver<T> {
112 type Item = T;
113 type IntoIter = Iter<'a, T>;
114
115 fn into_iter(self) -> Iter<'a, T> {
116 self.iter()
117 }
118}
119
120impl<T> IntoIterator for Receiver<T> {
121 type Item = T;
122 type IntoIter = IntoIter<T>;
123
124 fn into_iter(self) -> IntoIter<T> {
125 IntoIter { inner: self }
126 }
127}
128
129pub struct TryIter<'a, T: 'a> {
130 inner: &'a Receiver<T>,
131}
132
133impl<'a, T> Iterator for TryIter<'a, T> {
134 type Item = T;
135
136 fn next(&mut self) -> Option<T> {
137 self.inner.try_recv().ok()
138 }
139}
140
141pub struct Iter<'a, T: 'a> {
142 inner: &'a Receiver<T>,
143}
144
145impl<'a, T> Iterator for Iter<'a, T> {
146 type Item = T;
147
148 fn next(&mut self) -> Option<T> {
149 self.inner.recv().ok()
150 }
151}
152
153pub struct IntoIter<T> {
154 inner: Receiver<T>,
155}
156
157impl<T> Iterator for IntoIter<T> {
158 type Item = T;
159
160 fn next(&mut self) -> Option<T> {
161 self.inner.recv().ok()
162 }
163}
164
165pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
166 let (s, r) = cc::unbounded();
167 let s = Sender { inner: s };
168 let r = Receiver { inner: r };
169 (s, r)
170}
171
172pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
173 let (s, r) = cc::bounded(bound);
174 let s = SyncSender { inner: s };
175 let r = Receiver { inner: r };
176 (s, r)
177}
178
179macro_rules! select {
180 (
181 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
182 ) => ({
183 cc::crossbeam_channel_internal! {
184 $(
David LeGare54fc8482022-03-01 18:58:39 +0000185 $meth(($rx).inner) -> res => {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100186 let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
187 $code
188 }
189 )+
190 }
191 })
192}
193
194// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
195mod channel_tests {
196 use super::*;
197
198 use std::env;
199 use std::thread;
200 use std::time::{Duration, Instant};
201
202 pub fn stress_factor() -> usize {
203 match env::var("RUST_TEST_STRESS") {
204 Ok(val) => val.parse().unwrap(),
205 Err(..) => 1,
206 }
207 }
208
209 #[test]
210 fn smoke() {
211 let (tx, rx) = channel::<i32>();
212 tx.send(1).unwrap();
213 assert_eq!(rx.recv().unwrap(), 1);
214 }
215
216 #[test]
217 fn drop_full() {
218 let (tx, _rx) = channel::<Box<isize>>();
219 tx.send(Box::new(1)).unwrap();
220 }
221
222 #[test]
223 fn drop_full_shared() {
224 let (tx, _rx) = channel::<Box<isize>>();
225 drop(tx.clone());
226 drop(tx.clone());
227 tx.send(Box::new(1)).unwrap();
228 }
229
230 #[test]
231 fn smoke_shared() {
232 let (tx, rx) = channel::<i32>();
233 tx.send(1).unwrap();
234 assert_eq!(rx.recv().unwrap(), 1);
235 let tx = tx.clone();
236 tx.send(1).unwrap();
237 assert_eq!(rx.recv().unwrap(), 1);
238 }
239
240 #[test]
241 fn smoke_threads() {
242 let (tx, rx) = channel::<i32>();
243 let t = thread::spawn(move || {
244 tx.send(1).unwrap();
245 });
246 assert_eq!(rx.recv().unwrap(), 1);
247 t.join().unwrap();
248 }
249
250 #[test]
251 fn smoke_port_gone() {
252 let (tx, rx) = channel::<i32>();
253 drop(rx);
254 assert!(tx.send(1).is_err());
255 }
256
257 #[test]
258 fn smoke_shared_port_gone() {
259 let (tx, rx) = channel::<i32>();
260 drop(rx);
261 assert!(tx.send(1).is_err())
262 }
263
264 #[test]
265 fn smoke_shared_port_gone2() {
266 let (tx, rx) = channel::<i32>();
267 drop(rx);
268 let tx2 = tx.clone();
269 drop(tx);
270 assert!(tx2.send(1).is_err());
271 }
272
273 #[test]
274 fn port_gone_concurrent() {
275 let (tx, rx) = channel::<i32>();
276 let t = thread::spawn(move || {
277 rx.recv().unwrap();
278 });
279 while tx.send(1).is_ok() {}
280 t.join().unwrap();
281 }
282
283 #[test]
284 fn port_gone_concurrent_shared() {
285 let (tx, rx) = channel::<i32>();
286 let tx2 = tx.clone();
287 let t = thread::spawn(move || {
288 rx.recv().unwrap();
289 });
290 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
291 t.join().unwrap();
292 }
293
294 #[test]
295 fn smoke_chan_gone() {
296 let (tx, rx) = channel::<i32>();
297 drop(tx);
298 assert!(rx.recv().is_err());
299 }
300
301 #[test]
302 fn smoke_chan_gone_shared() {
303 let (tx, rx) = channel::<()>();
304 let tx2 = tx.clone();
305 drop(tx);
306 drop(tx2);
307 assert!(rx.recv().is_err());
308 }
309
310 #[test]
311 fn chan_gone_concurrent() {
312 let (tx, rx) = channel::<i32>();
313 let t = thread::spawn(move || {
314 tx.send(1).unwrap();
315 tx.send(1).unwrap();
316 });
317 while rx.recv().is_ok() {}
318 t.join().unwrap();
319 }
320
321 #[test]
322 fn stress() {
David LeGare54fc8482022-03-01 18:58:39 +0000323 #[cfg(miri)]
324 const COUNT: usize = 500;
325 #[cfg(not(miri))]
326 const COUNT: usize = 10000;
327
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100328 let (tx, rx) = channel::<i32>();
329 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +0000330 for _ in 0..COUNT {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100331 tx.send(1).unwrap();
332 }
333 });
David LeGare54fc8482022-03-01 18:58:39 +0000334 for _ in 0..COUNT {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100335 assert_eq!(rx.recv().unwrap(), 1);
336 }
337 t.join().ok().unwrap();
338 }
339
340 #[test]
341 fn stress_shared() {
David LeGare54fc8482022-03-01 18:58:39 +0000342 #[cfg(miri)]
343 const AMT: u32 = 500;
344 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100345 const AMT: u32 = 10000;
346 const NTHREADS: u32 = 8;
347 let (tx, rx) = channel::<i32>();
348
349 let t = thread::spawn(move || {
350 for _ in 0..AMT * NTHREADS {
351 assert_eq!(rx.recv().unwrap(), 1);
352 }
David LeGare54fc8482022-03-01 18:58:39 +0000353 assert!(rx.try_recv().is_err());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100354 });
355
356 let mut ts = Vec::with_capacity(NTHREADS as usize);
357 for _ in 0..NTHREADS {
358 let tx = tx.clone();
359 let t = thread::spawn(move || {
360 for _ in 0..AMT {
361 tx.send(1).unwrap();
362 }
363 });
364 ts.push(t);
365 }
366 drop(tx);
367 t.join().ok().unwrap();
368 for t in ts {
369 t.join().unwrap();
370 }
371 }
372
373 #[test]
374 fn send_from_outside_runtime() {
375 let (tx1, rx1) = channel::<()>();
376 let (tx2, rx2) = channel::<i32>();
377 let t1 = thread::spawn(move || {
378 tx1.send(()).unwrap();
379 for _ in 0..40 {
380 assert_eq!(rx2.recv().unwrap(), 1);
381 }
382 });
383 rx1.recv().unwrap();
384 let t2 = thread::spawn(move || {
385 for _ in 0..40 {
386 tx2.send(1).unwrap();
387 }
388 });
389 t1.join().ok().unwrap();
390 t2.join().ok().unwrap();
391 }
392
393 #[test]
394 fn recv_from_outside_runtime() {
395 let (tx, rx) = channel::<i32>();
396 let t = thread::spawn(move || {
397 for _ in 0..40 {
398 assert_eq!(rx.recv().unwrap(), 1);
399 }
400 });
401 for _ in 0..40 {
402 tx.send(1).unwrap();
403 }
404 t.join().ok().unwrap();
405 }
406
407 #[test]
408 fn no_runtime() {
409 let (tx1, rx1) = channel::<i32>();
410 let (tx2, rx2) = channel::<i32>();
411 let t1 = thread::spawn(move || {
412 assert_eq!(rx1.recv().unwrap(), 1);
413 tx2.send(2).unwrap();
414 });
415 let t2 = thread::spawn(move || {
416 tx1.send(1).unwrap();
417 assert_eq!(rx2.recv().unwrap(), 2);
418 });
419 t1.join().ok().unwrap();
420 t2.join().ok().unwrap();
421 }
422
423 #[test]
424 fn oneshot_single_thread_close_port_first() {
425 // Simple test of closing without sending
426 let (_tx, rx) = channel::<i32>();
427 drop(rx);
428 }
429
430 #[test]
431 fn oneshot_single_thread_close_chan_first() {
432 // Simple test of closing without sending
433 let (tx, _rx) = channel::<i32>();
434 drop(tx);
435 }
436
437 #[test]
438 fn oneshot_single_thread_send_port_close() {
439 // Testing that the sender cleans up the payload if receiver is closed
440 let (tx, rx) = channel::<Box<i32>>();
441 drop(rx);
442 assert!(tx.send(Box::new(0)).is_err());
443 }
444
445 #[test]
446 fn oneshot_single_thread_recv_chan_close() {
447 let (tx, rx) = channel::<i32>();
448 drop(tx);
449 assert_eq!(rx.recv(), Err(RecvError));
450 }
451
452 #[test]
453 fn oneshot_single_thread_send_then_recv() {
454 let (tx, rx) = channel::<Box<i32>>();
455 tx.send(Box::new(10)).unwrap();
456 assert!(*rx.recv().unwrap() == 10);
457 }
458
459 #[test]
460 fn oneshot_single_thread_try_send_open() {
461 let (tx, rx) = channel::<i32>();
462 assert!(tx.send(10).is_ok());
463 assert!(rx.recv().unwrap() == 10);
464 }
465
466 #[test]
467 fn oneshot_single_thread_try_send_closed() {
468 let (tx, rx) = channel::<i32>();
469 drop(rx);
470 assert!(tx.send(10).is_err());
471 }
472
473 #[test]
474 fn oneshot_single_thread_try_recv_open() {
475 let (tx, rx) = channel::<i32>();
476 tx.send(10).unwrap();
477 assert!(rx.recv() == Ok(10));
478 }
479
480 #[test]
481 fn oneshot_single_thread_try_recv_closed() {
482 let (tx, rx) = channel::<i32>();
483 drop(tx);
484 assert!(rx.recv().is_err());
485 }
486
487 #[test]
488 fn oneshot_single_thread_peek_data() {
489 let (tx, rx) = channel::<i32>();
490 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
491 tx.send(10).unwrap();
492 assert_eq!(rx.try_recv(), Ok(10));
493 }
494
495 #[test]
496 fn oneshot_single_thread_peek_close() {
497 let (tx, rx) = channel::<i32>();
498 drop(tx);
499 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
500 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
501 }
502
503 #[test]
504 fn oneshot_single_thread_peek_open() {
505 let (_tx, rx) = channel::<i32>();
506 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
507 }
508
509 #[test]
510 fn oneshot_multi_task_recv_then_send() {
511 let (tx, rx) = channel::<Box<i32>>();
512 let t = thread::spawn(move || {
513 assert!(*rx.recv().unwrap() == 10);
514 });
515
516 tx.send(Box::new(10)).unwrap();
517 t.join().unwrap();
518 }
519
520 #[test]
521 fn oneshot_multi_task_recv_then_close() {
522 let (tx, rx) = channel::<Box<i32>>();
523 let t = thread::spawn(move || {
524 drop(tx);
525 });
526 thread::spawn(move || {
527 assert_eq!(rx.recv(), Err(RecvError));
528 })
529 .join()
530 .unwrap();
531 t.join().unwrap();
532 }
533
534 #[test]
535 fn oneshot_multi_thread_close_stress() {
536 let stress_factor = stress_factor();
537 let mut ts = Vec::with_capacity(stress_factor);
538 for _ in 0..stress_factor {
539 let (tx, rx) = channel::<i32>();
540 let t = thread::spawn(move || {
541 drop(rx);
542 });
543 ts.push(t);
544 drop(tx);
545 }
546 for t in ts {
547 t.join().unwrap();
548 }
549 }
550
551 #[test]
552 fn oneshot_multi_thread_send_close_stress() {
553 let stress_factor = stress_factor();
554 let mut ts = Vec::with_capacity(2 * stress_factor);
555 for _ in 0..stress_factor {
556 let (tx, rx) = channel::<i32>();
557 let t = thread::spawn(move || {
558 drop(rx);
559 });
560 ts.push(t);
561 thread::spawn(move || {
562 let _ = tx.send(1);
563 })
564 .join()
565 .unwrap();
566 }
567 for t in ts {
568 t.join().unwrap();
569 }
570 }
571
572 #[test]
573 fn oneshot_multi_thread_recv_close_stress() {
574 let stress_factor = stress_factor();
575 let mut ts = Vec::with_capacity(2 * stress_factor);
576 for _ in 0..stress_factor {
577 let (tx, rx) = channel::<i32>();
578 let t = thread::spawn(move || {
579 thread::spawn(move || {
580 assert_eq!(rx.recv(), Err(RecvError));
581 })
582 .join()
583 .unwrap();
584 });
585 ts.push(t);
586 let t2 = thread::spawn(move || {
587 let t = thread::spawn(move || {
588 drop(tx);
589 });
590 t.join().unwrap();
591 });
592 ts.push(t2);
593 }
594 for t in ts {
595 t.join().unwrap();
596 }
597 }
598
599 #[test]
600 fn oneshot_multi_thread_send_recv_stress() {
601 let stress_factor = stress_factor();
602 let mut ts = Vec::with_capacity(stress_factor);
603 for _ in 0..stress_factor {
604 let (tx, rx) = channel::<Box<isize>>();
605 let t = thread::spawn(move || {
606 tx.send(Box::new(10)).unwrap();
607 });
608 ts.push(t);
609 assert!(*rx.recv().unwrap() == 10);
610 }
611 for t in ts {
612 t.join().unwrap();
613 }
614 }
615
616 #[test]
617 fn stream_send_recv_stress() {
618 let stress_factor = stress_factor();
619 let mut ts = Vec::with_capacity(2 * stress_factor);
620 for _ in 0..stress_factor {
621 let (tx, rx) = channel();
622
623 if let Some(t) = send(tx, 0) {
624 ts.push(t);
625 }
626 if let Some(t2) = recv(rx, 0) {
627 ts.push(t2);
628 }
629
630 fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
631 if i == 10 {
632 return None;
633 }
634
635 Some(thread::spawn(move || {
636 tx.send(Box::new(i)).unwrap();
637 send(tx, i + 1);
638 }))
639 }
640
641 fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
642 if i == 10 {
643 return None;
644 }
645
646 Some(thread::spawn(move || {
647 assert!(*rx.recv().unwrap() == i);
648 recv(rx, i + 1);
649 }))
650 }
651 }
652 for t in ts {
653 t.join().unwrap();
654 }
655 }
656
657 #[test]
658 fn oneshot_single_thread_recv_timeout() {
659 let (tx, rx) = channel();
660 tx.send(()).unwrap();
661 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
662 assert_eq!(
663 rx.recv_timeout(Duration::from_millis(1)),
664 Err(RecvTimeoutError::Timeout)
665 );
666 tx.send(()).unwrap();
667 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
668 }
669
670 #[test]
671 fn stress_recv_timeout_two_threads() {
672 let (tx, rx) = channel();
673 let stress = stress_factor() + 100;
674 let timeout = Duration::from_millis(100);
675
676 let t = thread::spawn(move || {
677 for i in 0..stress {
678 if i % 2 == 0 {
679 thread::sleep(timeout * 2);
680 }
681 tx.send(1usize).unwrap();
682 }
683 });
684
685 let mut recv_count = 0;
686 loop {
687 match rx.recv_timeout(timeout) {
688 Ok(n) => {
689 assert_eq!(n, 1usize);
690 recv_count += 1;
691 }
692 Err(RecvTimeoutError::Timeout) => continue,
693 Err(RecvTimeoutError::Disconnected) => break,
694 }
695 }
696
697 assert_eq!(recv_count, stress);
698 t.join().unwrap()
699 }
700
701 #[test]
702 fn recv_timeout_upgrade() {
703 let (tx, rx) = channel::<()>();
704 let timeout = Duration::from_millis(1);
705 let _tx_clone = tx.clone();
706
707 let start = Instant::now();
708 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
709 assert!(Instant::now() >= start + timeout);
710 }
711
712 #[test]
713 fn stress_recv_timeout_shared() {
714 let (tx, rx) = channel();
715 let stress = stress_factor() + 100;
716
717 let mut ts = Vec::with_capacity(stress);
718 for i in 0..stress {
719 let tx = tx.clone();
720 let t = thread::spawn(move || {
721 thread::sleep(Duration::from_millis(i as u64 * 10));
722 tx.send(1usize).unwrap();
723 });
724 ts.push(t);
725 }
726
727 drop(tx);
728
729 let mut recv_count = 0;
730 loop {
731 match rx.recv_timeout(Duration::from_millis(10)) {
732 Ok(n) => {
733 assert_eq!(n, 1usize);
734 recv_count += 1;
735 }
736 Err(RecvTimeoutError::Timeout) => continue,
737 Err(RecvTimeoutError::Disconnected) => break,
738 }
739 }
740
741 assert_eq!(recv_count, stress);
742 for t in ts {
743 t.join().unwrap();
744 }
745 }
746
747 #[test]
748 fn recv_a_lot() {
David LeGare54fc8482022-03-01 18:58:39 +0000749 #[cfg(miri)]
750 const N: usize = 100;
751 #[cfg(not(miri))]
752 const N: usize = 10000;
753
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100754 // Regression test that we don't run out of stack in scheduler context
755 let (tx, rx) = channel();
David LeGare54fc8482022-03-01 18:58:39 +0000756 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100757 tx.send(()).unwrap();
758 }
David LeGare54fc8482022-03-01 18:58:39 +0000759 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100760 rx.recv().unwrap();
761 }
762 }
763
764 #[test]
765 fn shared_recv_timeout() {
766 let (tx, rx) = channel();
767 let total = 5;
768 let mut ts = Vec::with_capacity(total);
769 for _ in 0..total {
770 let tx = tx.clone();
771 let t = thread::spawn(move || {
772 tx.send(()).unwrap();
773 });
774 ts.push(t);
775 }
776
777 for _ in 0..total {
778 rx.recv().unwrap();
779 }
780
781 assert_eq!(
782 rx.recv_timeout(Duration::from_millis(1)),
783 Err(RecvTimeoutError::Timeout)
784 );
785 tx.send(()).unwrap();
786 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
787 for t in ts {
788 t.join().unwrap();
789 }
790 }
791
792 #[test]
793 fn shared_chan_stress() {
794 let (tx, rx) = channel();
795 let total = stress_factor() + 100;
796 let mut ts = Vec::with_capacity(total);
797 for _ in 0..total {
798 let tx = tx.clone();
799 let t = thread::spawn(move || {
800 tx.send(()).unwrap();
801 });
802 ts.push(t);
803 }
804
805 for _ in 0..total {
806 rx.recv().unwrap();
807 }
808 for t in ts {
809 t.join().unwrap();
810 }
811 }
812
813 #[test]
814 fn test_nested_recv_iter() {
815 let (tx, rx) = channel::<i32>();
816 let (total_tx, total_rx) = channel::<i32>();
817
818 let t = thread::spawn(move || {
819 let mut acc = 0;
820 for x in rx.iter() {
821 acc += x;
822 }
823 total_tx.send(acc).unwrap();
824 });
825
826 tx.send(3).unwrap();
827 tx.send(1).unwrap();
828 tx.send(2).unwrap();
829 drop(tx);
830 assert_eq!(total_rx.recv().unwrap(), 6);
831 t.join().unwrap();
832 }
833
834 #[test]
835 fn test_recv_iter_break() {
836 let (tx, rx) = channel::<i32>();
837 let (count_tx, count_rx) = channel();
838
839 let t = thread::spawn(move || {
840 let mut count = 0;
841 for x in rx.iter() {
842 if count >= 3 {
843 break;
844 } else {
845 count += x;
846 }
847 }
848 count_tx.send(count).unwrap();
849 });
850
851 tx.send(2).unwrap();
852 tx.send(2).unwrap();
853 tx.send(2).unwrap();
854 let _ = tx.send(2);
855 drop(tx);
856 assert_eq!(count_rx.recv().unwrap(), 4);
857 t.join().unwrap();
858 }
859
860 #[test]
861 fn test_recv_try_iter() {
862 let (request_tx, request_rx) = channel();
863 let (response_tx, response_rx) = channel();
864
865 // Request `x`s until we have `6`.
866 let t = thread::spawn(move || {
867 let mut count = 0;
868 loop {
869 for x in response_rx.try_iter() {
870 count += x;
871 if count == 6 {
872 return count;
873 }
874 }
875 request_tx.send(()).unwrap();
876 }
877 });
878
879 for _ in request_rx.iter() {
880 if response_tx.send(2).is_err() {
881 break;
882 }
883 }
884
885 assert_eq!(t.join().unwrap(), 6);
886 }
887
888 #[test]
889 fn test_recv_into_iter_owned() {
890 let mut iter = {
891 let (tx, rx) = channel::<i32>();
892 tx.send(1).unwrap();
893 tx.send(2).unwrap();
894
895 rx.into_iter()
896 };
897 assert_eq!(iter.next().unwrap(), 1);
898 assert_eq!(iter.next().unwrap(), 2);
David LeGare54fc8482022-03-01 18:58:39 +0000899 assert!(iter.next().is_none());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100900 }
901
902 #[test]
903 fn test_recv_into_iter_borrowed() {
904 let (tx, rx) = channel::<i32>();
905 tx.send(1).unwrap();
906 tx.send(2).unwrap();
907 drop(tx);
908 let mut iter = (&rx).into_iter();
909 assert_eq!(iter.next().unwrap(), 1);
910 assert_eq!(iter.next().unwrap(), 2);
David LeGare54fc8482022-03-01 18:58:39 +0000911 assert!(iter.next().is_none());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100912 }
913
914 #[test]
915 fn try_recv_states() {
916 let (tx1, rx1) = channel::<i32>();
917 let (tx2, rx2) = channel::<()>();
918 let (tx3, rx3) = channel::<()>();
919 let t = thread::spawn(move || {
920 rx2.recv().unwrap();
921 tx1.send(1).unwrap();
922 tx3.send(()).unwrap();
923 rx2.recv().unwrap();
924 drop(tx1);
925 tx3.send(()).unwrap();
926 });
927
928 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
929 tx2.send(()).unwrap();
930 rx3.recv().unwrap();
931 assert_eq!(rx1.try_recv(), Ok(1));
932 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
933 tx2.send(()).unwrap();
934 rx3.recv().unwrap();
935 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
936 t.join().unwrap();
937 }
938
939 // This bug used to end up in a livelock inside of the Receiver destructor
940 // because the internal state of the Shared packet was corrupted
941 #[test]
942 fn destroy_upgraded_shared_port_when_sender_still_active() {
943 let (tx, rx) = channel();
944 let (tx2, rx2) = channel();
945 let t = thread::spawn(move || {
946 rx.recv().unwrap(); // wait on a oneshot
947 drop(rx); // destroy a shared
948 tx2.send(()).unwrap();
949 });
950 // make sure the other thread has gone to sleep
951 for _ in 0..5000 {
952 thread::yield_now();
953 }
954
955 // upgrade to a shared chan and send a message
956 let tx2 = tx.clone();
957 drop(tx);
958 tx2.send(()).unwrap();
959
960 // wait for the child thread to exit before we exit
961 rx2.recv().unwrap();
962 t.join().unwrap();
963 }
964
965 #[test]
966 fn issue_32114() {
967 let (tx, _) = channel();
968 let _ = tx.send(123);
969 assert_eq!(tx.send(123), Err(SendError(123)));
970 }
971}
972
973// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
974mod sync_channel_tests {
975 use super::*;
976
977 use std::env;
978 use std::thread;
979 use std::time::Duration;
980
981 pub fn stress_factor() -> usize {
982 match env::var("RUST_TEST_STRESS") {
983 Ok(val) => val.parse().unwrap(),
984 Err(..) => 1,
985 }
986 }
987
988 #[test]
989 fn smoke() {
990 let (tx, rx) = sync_channel::<i32>(1);
991 tx.send(1).unwrap();
992 assert_eq!(rx.recv().unwrap(), 1);
993 }
994
995 #[test]
996 fn drop_full() {
997 let (tx, _rx) = sync_channel::<Box<isize>>(1);
998 tx.send(Box::new(1)).unwrap();
999 }
1000
1001 #[test]
1002 fn smoke_shared() {
1003 let (tx, rx) = sync_channel::<i32>(1);
1004 tx.send(1).unwrap();
1005 assert_eq!(rx.recv().unwrap(), 1);
1006 let tx = tx.clone();
1007 tx.send(1).unwrap();
1008 assert_eq!(rx.recv().unwrap(), 1);
1009 }
1010
1011 #[test]
1012 fn recv_timeout() {
1013 let (tx, rx) = sync_channel::<i32>(1);
1014 assert_eq!(
1015 rx.recv_timeout(Duration::from_millis(1)),
1016 Err(RecvTimeoutError::Timeout)
1017 );
1018 tx.send(1).unwrap();
1019 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1020 }
1021
1022 #[test]
1023 fn smoke_threads() {
1024 let (tx, rx) = sync_channel::<i32>(0);
1025 let t = thread::spawn(move || {
1026 tx.send(1).unwrap();
1027 });
1028 assert_eq!(rx.recv().unwrap(), 1);
1029 t.join().unwrap();
1030 }
1031
1032 #[test]
1033 fn smoke_port_gone() {
1034 let (tx, rx) = sync_channel::<i32>(0);
1035 drop(rx);
1036 assert!(tx.send(1).is_err());
1037 }
1038
1039 #[test]
1040 fn smoke_shared_port_gone2() {
1041 let (tx, rx) = sync_channel::<i32>(0);
1042 drop(rx);
1043 let tx2 = tx.clone();
1044 drop(tx);
1045 assert!(tx2.send(1).is_err());
1046 }
1047
1048 #[test]
1049 fn port_gone_concurrent() {
1050 let (tx, rx) = sync_channel::<i32>(0);
1051 let t = thread::spawn(move || {
1052 rx.recv().unwrap();
1053 });
1054 while tx.send(1).is_ok() {}
1055 t.join().unwrap();
1056 }
1057
1058 #[test]
1059 fn port_gone_concurrent_shared() {
1060 let (tx, rx) = sync_channel::<i32>(0);
1061 let tx2 = tx.clone();
1062 let t = thread::spawn(move || {
1063 rx.recv().unwrap();
1064 });
1065 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1066 t.join().unwrap();
1067 }
1068
1069 #[test]
1070 fn smoke_chan_gone() {
1071 let (tx, rx) = sync_channel::<i32>(0);
1072 drop(tx);
1073 assert!(rx.recv().is_err());
1074 }
1075
1076 #[test]
1077 fn smoke_chan_gone_shared() {
1078 let (tx, rx) = sync_channel::<()>(0);
1079 let tx2 = tx.clone();
1080 drop(tx);
1081 drop(tx2);
1082 assert!(rx.recv().is_err());
1083 }
1084
1085 #[test]
1086 fn chan_gone_concurrent() {
1087 let (tx, rx) = sync_channel::<i32>(0);
1088 let t = thread::spawn(move || {
1089 tx.send(1).unwrap();
1090 tx.send(1).unwrap();
1091 });
1092 while rx.recv().is_ok() {}
1093 t.join().unwrap();
1094 }
1095
1096 #[test]
1097 fn stress() {
David LeGare54fc8482022-03-01 18:58:39 +00001098 #[cfg(miri)]
1099 const N: usize = 100;
1100 #[cfg(not(miri))]
1101 const N: usize = 10000;
1102
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001103 let (tx, rx) = sync_channel::<i32>(0);
1104 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +00001105 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001106 tx.send(1).unwrap();
1107 }
1108 });
David LeGare54fc8482022-03-01 18:58:39 +00001109 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001110 assert_eq!(rx.recv().unwrap(), 1);
1111 }
1112 t.join().unwrap();
1113 }
1114
1115 #[test]
1116 fn stress_recv_timeout_two_threads() {
David LeGare54fc8482022-03-01 18:58:39 +00001117 #[cfg(miri)]
1118 const N: usize = 100;
1119 #[cfg(not(miri))]
1120 const N: usize = 10000;
1121
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001122 let (tx, rx) = sync_channel::<i32>(0);
1123
1124 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +00001125 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001126 tx.send(1).unwrap();
1127 }
1128 });
1129
1130 let mut recv_count = 0;
1131 loop {
1132 match rx.recv_timeout(Duration::from_millis(1)) {
1133 Ok(v) => {
1134 assert_eq!(v, 1);
1135 recv_count += 1;
1136 }
1137 Err(RecvTimeoutError::Timeout) => continue,
1138 Err(RecvTimeoutError::Disconnected) => break,
1139 }
1140 }
1141
David LeGare54fc8482022-03-01 18:58:39 +00001142 assert_eq!(recv_count, N);
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001143 t.join().unwrap();
1144 }
1145
1146 #[test]
1147 fn stress_recv_timeout_shared() {
David LeGare54fc8482022-03-01 18:58:39 +00001148 #[cfg(miri)]
1149 const AMT: u32 = 100;
1150 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001151 const AMT: u32 = 1000;
1152 const NTHREADS: u32 = 8;
1153 let (tx, rx) = sync_channel::<i32>(0);
1154 let (dtx, drx) = sync_channel::<()>(0);
1155
1156 let t = thread::spawn(move || {
1157 let mut recv_count = 0;
1158 loop {
1159 match rx.recv_timeout(Duration::from_millis(10)) {
1160 Ok(v) => {
1161 assert_eq!(v, 1);
1162 recv_count += 1;
1163 }
1164 Err(RecvTimeoutError::Timeout) => continue,
1165 Err(RecvTimeoutError::Disconnected) => break,
1166 }
1167 }
1168
1169 assert_eq!(recv_count, AMT * NTHREADS);
1170 assert!(rx.try_recv().is_err());
1171
1172 dtx.send(()).unwrap();
1173 });
1174
1175 let mut ts = Vec::with_capacity(NTHREADS as usize);
1176 for _ in 0..NTHREADS {
1177 let tx = tx.clone();
1178 let t = thread::spawn(move || {
1179 for _ in 0..AMT {
1180 tx.send(1).unwrap();
1181 }
1182 });
1183 ts.push(t);
1184 }
1185
1186 drop(tx);
1187
1188 drx.recv().unwrap();
1189 for t in ts {
1190 t.join().unwrap();
1191 }
1192 t.join().unwrap();
1193 }
1194
1195 #[test]
1196 fn stress_shared() {
David LeGare54fc8482022-03-01 18:58:39 +00001197 #[cfg(miri)]
1198 const AMT: u32 = 100;
1199 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001200 const AMT: u32 = 1000;
1201 const NTHREADS: u32 = 8;
1202 let (tx, rx) = sync_channel::<i32>(0);
1203 let (dtx, drx) = sync_channel::<()>(0);
1204
1205 let t = thread::spawn(move || {
1206 for _ in 0..AMT * NTHREADS {
1207 assert_eq!(rx.recv().unwrap(), 1);
1208 }
David LeGare54fc8482022-03-01 18:58:39 +00001209 assert!(rx.try_recv().is_err());
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001210 dtx.send(()).unwrap();
1211 });
1212
1213 let mut ts = Vec::with_capacity(NTHREADS as usize);
1214 for _ in 0..NTHREADS {
1215 let tx = tx.clone();
1216 let t = thread::spawn(move || {
1217 for _ in 0..AMT {
1218 tx.send(1).unwrap();
1219 }
1220 });
1221 ts.push(t);
1222 }
1223 drop(tx);
1224 drx.recv().unwrap();
1225 for t in ts {
1226 t.join().unwrap();
1227 }
1228 t.join().unwrap();
1229 }
1230
1231 #[test]
1232 fn oneshot_single_thread_close_port_first() {
1233 // Simple test of closing without sending
1234 let (_tx, rx) = sync_channel::<i32>(0);
1235 drop(rx);
1236 }
1237
1238 #[test]
1239 fn oneshot_single_thread_close_chan_first() {
1240 // Simple test of closing without sending
1241 let (tx, _rx) = sync_channel::<i32>(0);
1242 drop(tx);
1243 }
1244
1245 #[test]
1246 fn oneshot_single_thread_send_port_close() {
1247 // Testing that the sender cleans up the payload if receiver is closed
1248 let (tx, rx) = sync_channel::<Box<i32>>(0);
1249 drop(rx);
1250 assert!(tx.send(Box::new(0)).is_err());
1251 }
1252
1253 #[test]
1254 fn oneshot_single_thread_recv_chan_close() {
1255 let (tx, rx) = sync_channel::<i32>(0);
1256 drop(tx);
1257 assert_eq!(rx.recv(), Err(RecvError));
1258 }
1259
1260 #[test]
1261 fn oneshot_single_thread_send_then_recv() {
1262 let (tx, rx) = sync_channel::<Box<i32>>(1);
1263 tx.send(Box::new(10)).unwrap();
1264 assert!(*rx.recv().unwrap() == 10);
1265 }
1266
1267 #[test]
1268 fn oneshot_single_thread_try_send_open() {
1269 let (tx, rx) = sync_channel::<i32>(1);
1270 assert_eq!(tx.try_send(10), Ok(()));
1271 assert!(rx.recv().unwrap() == 10);
1272 }
1273
1274 #[test]
1275 fn oneshot_single_thread_try_send_closed() {
1276 let (tx, rx) = sync_channel::<i32>(0);
1277 drop(rx);
1278 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1279 }
1280
1281 #[test]
1282 fn oneshot_single_thread_try_send_closed2() {
1283 let (tx, _rx) = sync_channel::<i32>(0);
1284 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1285 }
1286
1287 #[test]
1288 fn oneshot_single_thread_try_recv_open() {
1289 let (tx, rx) = sync_channel::<i32>(1);
1290 tx.send(10).unwrap();
1291 assert!(rx.recv() == Ok(10));
1292 }
1293
1294 #[test]
1295 fn oneshot_single_thread_try_recv_closed() {
1296 let (tx, rx) = sync_channel::<i32>(0);
1297 drop(tx);
1298 assert!(rx.recv().is_err());
1299 }
1300
1301 #[test]
1302 fn oneshot_single_thread_try_recv_closed_with_data() {
1303 let (tx, rx) = sync_channel::<i32>(1);
1304 tx.send(10).unwrap();
1305 drop(tx);
1306 assert_eq!(rx.try_recv(), Ok(10));
1307 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1308 }
1309
1310 #[test]
1311 fn oneshot_single_thread_peek_data() {
1312 let (tx, rx) = sync_channel::<i32>(1);
1313 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1314 tx.send(10).unwrap();
1315 assert_eq!(rx.try_recv(), Ok(10));
1316 }
1317
1318 #[test]
1319 fn oneshot_single_thread_peek_close() {
1320 let (tx, rx) = sync_channel::<i32>(0);
1321 drop(tx);
1322 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1323 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1324 }
1325
1326 #[test]
1327 fn oneshot_single_thread_peek_open() {
1328 let (_tx, rx) = sync_channel::<i32>(0);
1329 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1330 }
1331
1332 #[test]
1333 fn oneshot_multi_task_recv_then_send() {
1334 let (tx, rx) = sync_channel::<Box<i32>>(0);
1335 let t = thread::spawn(move || {
1336 assert!(*rx.recv().unwrap() == 10);
1337 });
1338
1339 tx.send(Box::new(10)).unwrap();
1340 t.join().unwrap();
1341 }
1342
1343 #[test]
1344 fn oneshot_multi_task_recv_then_close() {
1345 let (tx, rx) = sync_channel::<Box<i32>>(0);
1346 let t = thread::spawn(move || {
1347 drop(tx);
1348 });
1349 thread::spawn(move || {
1350 assert_eq!(rx.recv(), Err(RecvError));
1351 })
1352 .join()
1353 .unwrap();
1354 t.join().unwrap();
1355 }
1356
1357 #[test]
1358 fn oneshot_multi_thread_close_stress() {
1359 let stress_factor = stress_factor();
1360 let mut ts = Vec::with_capacity(stress_factor);
1361 for _ in 0..stress_factor {
1362 let (tx, rx) = sync_channel::<i32>(0);
1363 let t = thread::spawn(move || {
1364 drop(rx);
1365 });
1366 ts.push(t);
1367 drop(tx);
1368 }
1369 for t in ts {
1370 t.join().unwrap();
1371 }
1372 }
1373
1374 #[test]
1375 fn oneshot_multi_thread_send_close_stress() {
1376 let stress_factor = stress_factor();
1377 let mut ts = Vec::with_capacity(stress_factor);
1378 for _ in 0..stress_factor {
1379 let (tx, rx) = sync_channel::<i32>(0);
1380 let t = thread::spawn(move || {
1381 drop(rx);
1382 });
1383 ts.push(t);
1384 thread::spawn(move || {
1385 let _ = tx.send(1);
1386 })
1387 .join()
1388 .unwrap();
1389 }
1390 for t in ts {
1391 t.join().unwrap();
1392 }
1393 }
1394
1395 #[test]
1396 fn oneshot_multi_thread_recv_close_stress() {
1397 let stress_factor = stress_factor();
1398 let mut ts = Vec::with_capacity(2 * stress_factor);
1399 for _ in 0..stress_factor {
1400 let (tx, rx) = sync_channel::<i32>(0);
1401 let t = thread::spawn(move || {
1402 thread::spawn(move || {
1403 assert_eq!(rx.recv(), Err(RecvError));
1404 })
1405 .join()
1406 .unwrap();
1407 });
1408 ts.push(t);
1409 let t2 = thread::spawn(move || {
1410 thread::spawn(move || {
1411 drop(tx);
1412 });
1413 });
1414 ts.push(t2);
1415 }
1416 for t in ts {
1417 t.join().unwrap();
1418 }
1419 }
1420
1421 #[test]
1422 fn oneshot_multi_thread_send_recv_stress() {
1423 let stress_factor = stress_factor();
1424 let mut ts = Vec::with_capacity(stress_factor);
1425 for _ in 0..stress_factor {
1426 let (tx, rx) = sync_channel::<Box<i32>>(0);
1427 let t = thread::spawn(move || {
1428 tx.send(Box::new(10)).unwrap();
1429 });
1430 ts.push(t);
1431 assert!(*rx.recv().unwrap() == 10);
1432 }
1433 for t in ts {
1434 t.join().unwrap();
1435 }
1436 }
1437
1438 #[test]
1439 fn stream_send_recv_stress() {
1440 let stress_factor = stress_factor();
1441 let mut ts = Vec::with_capacity(2 * stress_factor);
1442 for _ in 0..stress_factor {
1443 let (tx, rx) = sync_channel::<Box<i32>>(0);
1444
1445 if let Some(t) = send(tx, 0) {
1446 ts.push(t);
1447 }
1448 if let Some(t) = recv(rx, 0) {
1449 ts.push(t);
1450 }
1451
1452 fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1453 if i == 10 {
1454 return None;
1455 }
1456
1457 Some(thread::spawn(move || {
1458 tx.send(Box::new(i)).unwrap();
1459 send(tx, i + 1);
1460 }))
1461 }
1462
1463 fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1464 if i == 10 {
1465 return None;
1466 }
1467
1468 Some(thread::spawn(move || {
1469 assert!(*rx.recv().unwrap() == i);
1470 recv(rx, i + 1);
1471 }))
1472 }
1473 }
1474 for t in ts {
1475 t.join().unwrap();
1476 }
1477 }
1478
1479 #[test]
1480 fn recv_a_lot() {
David LeGare54fc8482022-03-01 18:58:39 +00001481 #[cfg(miri)]
1482 const N: usize = 100;
1483 #[cfg(not(miri))]
1484 const N: usize = 10000;
1485
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001486 // Regression test that we don't run out of stack in scheduler context
David LeGare54fc8482022-03-01 18:58:39 +00001487 let (tx, rx) = sync_channel(N);
1488 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001489 tx.send(()).unwrap();
1490 }
David LeGare54fc8482022-03-01 18:58:39 +00001491 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001492 rx.recv().unwrap();
1493 }
1494 }
1495
1496 #[test]
1497 fn shared_chan_stress() {
1498 let (tx, rx) = sync_channel(0);
1499 let total = stress_factor() + 100;
1500 let mut ts = Vec::with_capacity(total);
1501 for _ in 0..total {
1502 let tx = tx.clone();
1503 let t = thread::spawn(move || {
1504 tx.send(()).unwrap();
1505 });
1506 ts.push(t);
1507 }
1508
1509 for _ in 0..total {
1510 rx.recv().unwrap();
1511 }
1512 for t in ts {
1513 t.join().unwrap();
1514 }
1515 }
1516
1517 #[test]
1518 fn test_nested_recv_iter() {
1519 let (tx, rx) = sync_channel::<i32>(0);
1520 let (total_tx, total_rx) = sync_channel::<i32>(0);
1521
1522 let t = thread::spawn(move || {
1523 let mut acc = 0;
1524 for x in rx.iter() {
1525 acc += x;
1526 }
1527 total_tx.send(acc).unwrap();
1528 });
1529
1530 tx.send(3).unwrap();
1531 tx.send(1).unwrap();
1532 tx.send(2).unwrap();
1533 drop(tx);
1534 assert_eq!(total_rx.recv().unwrap(), 6);
1535 t.join().unwrap();
1536 }
1537
1538 #[test]
1539 fn test_recv_iter_break() {
1540 let (tx, rx) = sync_channel::<i32>(0);
1541 let (count_tx, count_rx) = sync_channel(0);
1542
1543 let t = thread::spawn(move || {
1544 let mut count = 0;
1545 for x in rx.iter() {
1546 if count >= 3 {
1547 break;
1548 } else {
1549 count += x;
1550 }
1551 }
1552 count_tx.send(count).unwrap();
1553 });
1554
1555 tx.send(2).unwrap();
1556 tx.send(2).unwrap();
1557 tx.send(2).unwrap();
1558 let _ = tx.try_send(2);
1559 drop(tx);
1560 assert_eq!(count_rx.recv().unwrap(), 4);
1561 t.join().unwrap();
1562 }
1563
1564 #[test]
1565 fn try_recv_states() {
1566 let (tx1, rx1) = sync_channel::<i32>(1);
1567 let (tx2, rx2) = sync_channel::<()>(1);
1568 let (tx3, rx3) = sync_channel::<()>(1);
1569 let t = thread::spawn(move || {
1570 rx2.recv().unwrap();
1571 tx1.send(1).unwrap();
1572 tx3.send(()).unwrap();
1573 rx2.recv().unwrap();
1574 drop(tx1);
1575 tx3.send(()).unwrap();
1576 });
1577
1578 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1579 tx2.send(()).unwrap();
1580 rx3.recv().unwrap();
1581 assert_eq!(rx1.try_recv(), Ok(1));
1582 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1583 tx2.send(()).unwrap();
1584 rx3.recv().unwrap();
1585 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1586 t.join().unwrap();
1587 }
1588
1589 // This bug used to end up in a livelock inside of the Receiver destructor
1590 // because the internal state of the Shared packet was corrupted
1591 #[test]
1592 fn destroy_upgraded_shared_port_when_sender_still_active() {
1593 let (tx, rx) = sync_channel::<()>(0);
1594 let (tx2, rx2) = sync_channel::<()>(0);
1595 let t = thread::spawn(move || {
1596 rx.recv().unwrap(); // wait on a oneshot
1597 drop(rx); // destroy a shared
1598 tx2.send(()).unwrap();
1599 });
1600 // make sure the other thread has gone to sleep
1601 for _ in 0..5000 {
1602 thread::yield_now();
1603 }
1604
1605 // upgrade to a shared chan and send a message
1606 let tx2 = tx.clone();
1607 drop(tx);
1608 tx2.send(()).unwrap();
1609
1610 // wait for the child thread to exit before we exit
1611 rx2.recv().unwrap();
1612 t.join().unwrap();
1613 }
1614
1615 #[test]
1616 fn send1() {
1617 let (tx, rx) = sync_channel::<i32>(0);
1618 let t = thread::spawn(move || {
1619 rx.recv().unwrap();
1620 });
1621 assert_eq!(tx.send(1), Ok(()));
1622 t.join().unwrap();
1623 }
1624
1625 #[test]
1626 fn send2() {
1627 let (tx, rx) = sync_channel::<i32>(0);
1628 let t = thread::spawn(move || {
1629 drop(rx);
1630 });
1631 assert!(tx.send(1).is_err());
1632 t.join().unwrap();
1633 }
1634
1635 #[test]
1636 fn send3() {
1637 let (tx, rx) = sync_channel::<i32>(1);
1638 assert_eq!(tx.send(1), Ok(()));
1639 let t = thread::spawn(move || {
1640 drop(rx);
1641 });
1642 assert!(tx.send(1).is_err());
1643 t.join().unwrap();
1644 }
1645
1646 #[test]
1647 fn send4() {
1648 let (tx, rx) = sync_channel::<i32>(0);
1649 let tx2 = tx.clone();
1650 let (done, donerx) = channel();
1651 let done2 = done.clone();
1652 let t = thread::spawn(move || {
1653 assert!(tx.send(1).is_err());
1654 done.send(()).unwrap();
1655 });
1656 let t2 = thread::spawn(move || {
1657 assert!(tx2.send(2).is_err());
1658 done2.send(()).unwrap();
1659 });
1660 drop(rx);
1661 donerx.recv().unwrap();
1662 donerx.recv().unwrap();
1663 t.join().unwrap();
1664 t2.join().unwrap();
1665 }
1666
1667 #[test]
1668 fn try_send1() {
1669 let (tx, _rx) = sync_channel::<i32>(0);
1670 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1671 }
1672
1673 #[test]
1674 fn try_send2() {
1675 let (tx, _rx) = sync_channel::<i32>(1);
1676 assert_eq!(tx.try_send(1), Ok(()));
1677 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1678 }
1679
1680 #[test]
1681 fn try_send3() {
1682 let (tx, rx) = sync_channel::<i32>(1);
1683 assert_eq!(tx.try_send(1), Ok(()));
1684 drop(rx);
1685 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1686 }
1687
1688 #[test]
1689 fn issue_15761() {
1690 fn repro() {
1691 let (tx1, rx1) = sync_channel::<()>(3);
1692 let (tx2, rx2) = sync_channel::<()>(3);
1693
1694 let _t = thread::spawn(move || {
1695 rx1.recv().unwrap();
1696 tx2.try_send(()).unwrap();
1697 });
1698
1699 tx1.try_send(()).unwrap();
1700 rx2.recv().unwrap();
1701 }
1702
1703 for _ in 0..100 {
1704 repro()
1705 }
1706 }
1707}
1708
1709// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1710mod select_tests {
1711 use super::*;
1712
1713 use std::thread;
1714
1715 #[test]
1716 fn smoke() {
1717 let (tx1, rx1) = channel::<i32>();
1718 let (tx2, rx2) = channel::<i32>();
1719 tx1.send(1).unwrap();
1720 select! {
1721 foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1722 _bar = rx2.recv() => panic!()
1723 }
1724 tx2.send(2).unwrap();
1725 select! {
1726 _foo = rx1.recv() => panic!(),
1727 bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1728 }
1729 drop(tx1);
1730 select! {
1731 foo = rx1.recv() => assert!(foo.is_err()),
1732 _bar = rx2.recv() => panic!()
1733 }
1734 drop(tx2);
1735 select! {
1736 bar = rx2.recv() => assert!(bar.is_err())
1737 }
1738 }
1739
1740 #[test]
1741 fn smoke2() {
1742 let (_tx1, rx1) = channel::<i32>();
1743 let (_tx2, rx2) = channel::<i32>();
1744 let (_tx3, rx3) = channel::<i32>();
1745 let (_tx4, rx4) = channel::<i32>();
1746 let (tx5, rx5) = channel::<i32>();
1747 tx5.send(4).unwrap();
1748 select! {
1749 _foo = rx1.recv() => panic!("1"),
1750 _foo = rx2.recv() => panic!("2"),
1751 _foo = rx3.recv() => panic!("3"),
1752 _foo = rx4.recv() => panic!("4"),
1753 foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1754 }
1755 }
1756
1757 #[test]
1758 fn closed() {
1759 let (_tx1, rx1) = channel::<i32>();
1760 let (tx2, rx2) = channel::<i32>();
1761 drop(tx2);
1762
1763 select! {
1764 _a1 = rx1.recv() => panic!(),
1765 a2 = rx2.recv() => assert!(a2.is_err())
1766 }
1767 }
1768
1769 #[test]
1770 fn unblocks() {
1771 let (tx1, rx1) = channel::<i32>();
1772 let (_tx2, rx2) = channel::<i32>();
1773 let (tx3, rx3) = channel::<i32>();
1774
1775 let t = thread::spawn(move || {
1776 for _ in 0..20 {
1777 thread::yield_now();
1778 }
1779 tx1.send(1).unwrap();
1780 rx3.recv().unwrap();
1781 for _ in 0..20 {
1782 thread::yield_now();
1783 }
1784 });
1785
1786 select! {
1787 a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1788 _b = rx2.recv() => panic!()
1789 }
1790 tx3.send(1).unwrap();
1791 select! {
1792 a = rx1.recv() => assert!(a.is_err()),
1793 _b = rx2.recv() => panic!()
1794 }
1795 t.join().unwrap();
1796 }
1797
1798 #[test]
1799 fn both_ready() {
1800 let (tx1, rx1) = channel::<i32>();
1801 let (tx2, rx2) = channel::<i32>();
1802 let (tx3, rx3) = channel::<()>();
1803
1804 let t = thread::spawn(move || {
1805 for _ in 0..20 {
1806 thread::yield_now();
1807 }
1808 tx1.send(1).unwrap();
1809 tx2.send(2).unwrap();
1810 rx3.recv().unwrap();
1811 });
1812
1813 select! {
1814 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1815 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1816 }
1817 select! {
1818 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1819 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1820 }
1821 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1822 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1823 tx3.send(()).unwrap();
1824 t.join().unwrap();
1825 }
1826
1827 #[test]
1828 fn stress() {
David LeGare54fc8482022-03-01 18:58:39 +00001829 #[cfg(miri)]
1830 const AMT: i32 = 100;
1831 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001832 const AMT: i32 = 10000;
David LeGare54fc8482022-03-01 18:58:39 +00001833
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001834 let (tx1, rx1) = channel::<i32>();
1835 let (tx2, rx2) = channel::<i32>();
1836 let (tx3, rx3) = channel::<()>();
1837
1838 let t = thread::spawn(move || {
1839 for i in 0..AMT {
1840 if i % 2 == 0 {
1841 tx1.send(i).unwrap();
1842 } else {
1843 tx2.send(i).unwrap();
1844 }
1845 rx3.recv().unwrap();
1846 }
1847 });
1848
1849 for i in 0..AMT {
1850 select! {
1851 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1852 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1853 }
1854 tx3.send(()).unwrap();
1855 }
1856 t.join().unwrap();
1857 }
1858
1859 #[allow(unused_must_use)]
1860 #[test]
1861 fn cloning() {
1862 let (tx1, rx1) = channel::<i32>();
1863 let (_tx2, rx2) = channel::<i32>();
1864 let (tx3, rx3) = channel::<()>();
1865
1866 let t = thread::spawn(move || {
1867 rx3.recv().unwrap();
1868 tx1.clone();
1869 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1870 tx1.send(2).unwrap();
1871 rx3.recv().unwrap();
1872 });
1873
1874 tx3.send(()).unwrap();
1875 select! {
1876 _i1 = rx1.recv() => {},
1877 _i2 = rx2.recv() => panic!()
1878 }
1879 tx3.send(()).unwrap();
1880 t.join().unwrap();
1881 }
1882
1883 #[allow(unused_must_use)]
1884 #[test]
1885 fn cloning2() {
1886 let (tx1, rx1) = channel::<i32>();
1887 let (_tx2, rx2) = channel::<i32>();
1888 let (tx3, rx3) = channel::<()>();
1889
1890 let t = thread::spawn(move || {
1891 rx3.recv().unwrap();
1892 tx1.clone();
1893 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1894 tx1.send(2).unwrap();
1895 rx3.recv().unwrap();
1896 });
1897
1898 tx3.send(()).unwrap();
1899 select! {
1900 _i1 = rx1.recv() => {},
1901 _i2 = rx2.recv() => panic!()
1902 }
1903 tx3.send(()).unwrap();
1904 t.join().unwrap();
1905 }
1906
1907 #[test]
1908 fn cloning3() {
1909 let (tx1, rx1) = channel::<()>();
1910 let (tx2, rx2) = channel::<()>();
1911 let (tx3, rx3) = channel::<()>();
1912 let t = thread::spawn(move || {
1913 select! {
1914 _ = rx1.recv() => panic!(),
1915 _ = rx2.recv() => {}
1916 }
1917 tx3.send(()).unwrap();
1918 });
1919
1920 for _ in 0..1000 {
1921 thread::yield_now();
1922 }
1923 drop(tx1.clone());
1924 tx2.send(()).unwrap();
1925 rx3.recv().unwrap();
1926 t.join().unwrap();
1927 }
1928
1929 #[test]
1930 fn preflight1() {
1931 let (tx, rx) = channel();
1932 tx.send(()).unwrap();
1933 select! {
1934 _n = rx.recv() => {}
1935 }
1936 }
1937
1938 #[test]
1939 fn preflight2() {
1940 let (tx, rx) = channel();
1941 tx.send(()).unwrap();
1942 tx.send(()).unwrap();
1943 select! {
1944 _n = rx.recv() => {}
1945 }
1946 }
1947
1948 #[test]
1949 fn preflight3() {
1950 let (tx, rx) = channel();
1951 drop(tx.clone());
1952 tx.send(()).unwrap();
1953 select! {
1954 _n = rx.recv() => {}
1955 }
1956 }
1957
1958 #[test]
1959 fn preflight4() {
1960 let (tx, rx) = channel();
1961 tx.send(()).unwrap();
1962 select! {
1963 _ = rx.recv() => {}
1964 }
1965 }
1966
1967 #[test]
1968 fn preflight5() {
1969 let (tx, rx) = channel();
1970 tx.send(()).unwrap();
1971 tx.send(()).unwrap();
1972 select! {
1973 _ = rx.recv() => {}
1974 }
1975 }
1976
1977 #[test]
1978 fn preflight6() {
1979 let (tx, rx) = channel();
1980 drop(tx.clone());
1981 tx.send(()).unwrap();
1982 select! {
1983 _ = rx.recv() => {}
1984 }
1985 }
1986
1987 #[test]
1988 fn preflight7() {
1989 let (tx, rx) = channel::<()>();
1990 drop(tx);
1991 select! {
1992 _ = rx.recv() => {}
1993 }
1994 }
1995
1996 #[test]
1997 fn preflight8() {
1998 let (tx, rx) = channel();
1999 tx.send(()).unwrap();
2000 drop(tx);
2001 rx.recv().unwrap();
2002 select! {
2003 _ = rx.recv() => {}
2004 }
2005 }
2006
2007 #[test]
2008 fn preflight9() {
2009 let (tx, rx) = channel();
2010 drop(tx.clone());
2011 tx.send(()).unwrap();
2012 drop(tx);
2013 rx.recv().unwrap();
2014 select! {
2015 _ = rx.recv() => {}
2016 }
2017 }
2018
2019 #[test]
2020 fn oneshot_data_waiting() {
2021 let (tx1, rx1) = channel();
2022 let (tx2, rx2) = channel();
2023 let t = thread::spawn(move || {
2024 select! {
2025 _n = rx1.recv() => {}
2026 }
2027 tx2.send(()).unwrap();
2028 });
2029
2030 for _ in 0..100 {
2031 thread::yield_now()
2032 }
2033 tx1.send(()).unwrap();
2034 rx2.recv().unwrap();
2035 t.join().unwrap();
2036 }
2037
2038 #[test]
2039 fn stream_data_waiting() {
2040 let (tx1, rx1) = channel();
2041 let (tx2, rx2) = channel();
2042 tx1.send(()).unwrap();
2043 tx1.send(()).unwrap();
2044 rx1.recv().unwrap();
2045 rx1.recv().unwrap();
2046 let t = thread::spawn(move || {
2047 select! {
2048 _n = rx1.recv() => {}
2049 }
2050 tx2.send(()).unwrap();
2051 });
2052
2053 for _ in 0..100 {
2054 thread::yield_now()
2055 }
2056 tx1.send(()).unwrap();
2057 rx2.recv().unwrap();
2058 t.join().unwrap();
2059 }
2060
2061 #[test]
2062 fn shared_data_waiting() {
2063 let (tx1, rx1) = channel();
2064 let (tx2, rx2) = channel();
2065 drop(tx1.clone());
2066 tx1.send(()).unwrap();
2067 rx1.recv().unwrap();
2068 let t = thread::spawn(move || {
2069 select! {
2070 _n = rx1.recv() => {}
2071 }
2072 tx2.send(()).unwrap();
2073 });
2074
2075 for _ in 0..100 {
2076 thread::yield_now()
2077 }
2078 tx1.send(()).unwrap();
2079 rx2.recv().unwrap();
2080 t.join().unwrap();
2081 }
2082
2083 #[test]
2084 fn sync1() {
2085 let (tx, rx) = sync_channel::<i32>(1);
2086 tx.send(1).unwrap();
2087 select! {
2088 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2089 }
2090 }
2091
2092 #[test]
2093 fn sync2() {
2094 let (tx, rx) = sync_channel::<i32>(0);
2095 let t = thread::spawn(move || {
2096 for _ in 0..100 {
2097 thread::yield_now()
2098 }
2099 tx.send(1).unwrap();
2100 });
2101 select! {
2102 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2103 }
2104 t.join().unwrap();
2105 }
2106
2107 #[test]
2108 fn sync3() {
2109 let (tx1, rx1) = sync_channel::<i32>(0);
2110 let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2111 let t = thread::spawn(move || {
2112 tx1.send(1).unwrap();
2113 });
2114 let t2 = thread::spawn(move || {
2115 tx2.send(2).unwrap();
2116 });
2117 select! {
2118 n = rx1.recv() => {
2119 let n = n.unwrap();
2120 assert_eq!(n, 1);
2121 assert_eq!(rx2.recv().unwrap(), 2);
2122 },
2123 n = rx2.recv() => {
2124 let n = n.unwrap();
2125 assert_eq!(n, 2);
2126 assert_eq!(rx1.recv().unwrap(), 1);
2127 }
2128 }
2129 t.join().unwrap();
2130 t2.join().unwrap();
2131 }
2132}