Jason Macnak | 09c3688 | 2020-04-01 16:22:56 +0000 | [diff] [blame] | 1 | #![feature(test)] |
| 2 | |
| 3 | extern crate test; |
| 4 | use crate::test::Bencher; |
| 5 | |
| 6 | use { |
| 7 | futures::{ |
| 8 | channel::mpsc::{self, Sender, UnboundedSender}, |
| 9 | ready, |
| 10 | stream::{Stream, StreamExt}, |
| 11 | sink::Sink, |
| 12 | task::{Context, Poll}, |
| 13 | }, |
| 14 | futures_test::task::noop_context, |
| 15 | std::pin::Pin, |
| 16 | }; |
| 17 | |
| 18 | /// Single producer, single consumer |
| 19 | #[bench] |
| 20 | fn unbounded_1_tx(b: &mut Bencher) { |
| 21 | let mut cx = noop_context(); |
| 22 | b.iter(|| { |
| 23 | let (tx, mut rx) = mpsc::unbounded(); |
| 24 | |
| 25 | // 1000 iterations to avoid measuring overhead of initialization |
| 26 | // Result should be divided by 1000 |
| 27 | for i in 0..1000 { |
| 28 | |
| 29 | // Poll, not ready, park |
| 30 | assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); |
| 31 | |
| 32 | UnboundedSender::unbounded_send(&tx, i).unwrap(); |
| 33 | |
| 34 | // Now poll ready |
| 35 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
| 36 | } |
| 37 | }) |
| 38 | } |
| 39 | |
| 40 | /// 100 producers, single consumer |
| 41 | #[bench] |
| 42 | fn unbounded_100_tx(b: &mut Bencher) { |
| 43 | let mut cx = noop_context(); |
| 44 | b.iter(|| { |
| 45 | let (tx, mut rx) = mpsc::unbounded(); |
| 46 | |
| 47 | let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); |
| 48 | |
| 49 | // 1000 send/recv operations total, result should be divided by 1000 |
| 50 | for _ in 0..10 { |
| 51 | for (i, x) in tx.iter().enumerate() { |
| 52 | assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); |
| 53 | |
| 54 | UnboundedSender::unbounded_send(x, i).unwrap(); |
| 55 | |
| 56 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
| 57 | } |
| 58 | } |
| 59 | }) |
| 60 | } |
| 61 | |
| 62 | #[bench] |
| 63 | fn unbounded_uncontended(b: &mut Bencher) { |
| 64 | let mut cx = noop_context(); |
| 65 | b.iter(|| { |
| 66 | let (tx, mut rx) = mpsc::unbounded(); |
| 67 | |
| 68 | for i in 0..1000 { |
| 69 | UnboundedSender::unbounded_send(&tx, i).expect("send"); |
| 70 | // No need to create a task, because poll is not going to park. |
| 71 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
| 72 | } |
| 73 | }) |
| 74 | } |
| 75 | |
| 76 | |
| 77 | /// A Stream that continuously sends incrementing number of the queue |
| 78 | struct TestSender { |
| 79 | tx: Sender<u32>, |
| 80 | last: u32, // Last number sent |
| 81 | } |
| 82 | |
| 83 | // Could be a Future, it doesn't matter |
| 84 | impl Stream for TestSender { |
| 85 | type Item = u32; |
| 86 | |
| 87 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) |
| 88 | -> Poll<Option<Self::Item>> |
| 89 | { |
| 90 | let this = &mut *self; |
| 91 | let mut tx = Pin::new(&mut this.tx); |
| 92 | |
| 93 | ready!(tx.as_mut().poll_ready(cx)).unwrap(); |
| 94 | tx.as_mut().start_send(this.last + 1).unwrap(); |
| 95 | this.last += 1; |
| 96 | assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx)); |
| 97 | Poll::Ready(Some(this.last)) |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | /// Single producers, single consumer |
| 102 | #[bench] |
| 103 | fn bounded_1_tx(b: &mut Bencher) { |
| 104 | let mut cx = noop_context(); |
| 105 | b.iter(|| { |
| 106 | let (tx, mut rx) = mpsc::channel(0); |
| 107 | |
| 108 | let mut tx = TestSender { tx, last: 0 }; |
| 109 | |
| 110 | for i in 0..1000 { |
| 111 | assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx)); |
| 112 | assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx)); |
| 113 | assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); |
| 114 | } |
| 115 | }) |
| 116 | } |
| 117 | |
| 118 | /// 100 producers, single consumer |
| 119 | #[bench] |
| 120 | fn bounded_100_tx(b: &mut Bencher) { |
| 121 | let mut cx = noop_context(); |
| 122 | b.iter(|| { |
| 123 | // Each sender can send one item after specified capacity |
| 124 | let (tx, mut rx) = mpsc::channel(0); |
| 125 | |
| 126 | let mut tx: Vec<_> = (0..100).map(|_| { |
| 127 | TestSender { |
| 128 | tx: tx.clone(), |
| 129 | last: 0 |
| 130 | } |
| 131 | }).collect(); |
| 132 | |
| 133 | for i in 0..10 { |
| 134 | for x in &mut tx { |
| 135 | // Send an item |
| 136 | assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx)); |
| 137 | // Then block |
| 138 | assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx)); |
| 139 | // Recv the item |
| 140 | assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); |
| 141 | } |
| 142 | } |
| 143 | }) |
| 144 | } |