#![feature(test)]

extern crate test;
use crate::test::Bencher;

use {
    futures::{
        channel::mpsc::{self, Sender, UnboundedSender},
        ready,
        stream::{Stream, StreamExt},
        sink::Sink,
        task::{Context, Poll},
    },
    futures_test::task::noop_context,
    std::pin::Pin,
};

/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        // 1000 iterations to avoid measuring overhead of initialization
        // Result should be divided by 1000
        for i in 0..1000 {

            // Poll, not ready, park
            assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

            UnboundedSender::unbounded_send(&tx, i).unwrap();

            // Now poll ready
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();

        // 1000 send/recv operations total, result should be divided by 1000
        for _ in 0..10 {
            for (i, x) in tx.iter().enumerate() {
                assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

                UnboundedSender::unbounded_send(x, i).unwrap();

                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}

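/// Single producer, single consumer, without contention: each item is sent
/// before the receiver polls, so the receiver never has to park.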
#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        for i in 0..1000 {
            UnboundedSender::unbounded_send(&tx, i).expect("send");
            // No need to create a task, because poll is not going to park.
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// A `Stream` that continuously sends incrementing numbers into its channel
struct TestSender {
    tx: Sender<u32>,
    last: u32, // Last number sent
}

// Could just as well be a Future; it doesn't matter for the benchmark
impl Stream for TestSender {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>
    {
        let this = &mut *self;
        let mut tx = Pin::new(&mut this.tx);

        ready!(tx.as_mut().poll_ready(cx)).unwrap();
        tx.as_mut().start_send(this.last + 1).unwrap();
        this.last += 1;
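        // The item just sent occupies this sender's only guaranteed slot (the
        // benches below create the channel with capacity 0), so the flush stays
        // pending until the receiver takes the item.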
        assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
        Poll::Ready(Some(this.last))
    }
}

/// Single producer, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx = TestSender { tx, last: 0 };

        for i in 0..1000 {
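            // Send one item, observe the sender block at capacity, then
            // receive the item so the next iteration can send again.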
            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn bounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        // Each sender gets one guaranteed slot on top of the specified
        // capacity (0), so every sender can buffer exactly one item.
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx: Vec<_> = (0..100).map(|_| {
            TestSender {
                tx: tx.clone(),
                last: 0
            }
        }).collect();

        for i in 0..10 {
            for x in &mut tx {
                // Send an item
                assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
                // Then block
                assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
                // Recv the item
                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}