blob: c9f550897335b45bc3f39b5827c446a8ccdc662a [file] [log] [blame]
Jason Macnak09c36882020-04-01 16:22:56 +00001use futures::channel::oneshot::{self, Sender};
2use futures::executor::block_on;
Joel Galensoncc0890a2021-05-19 15:09:47 -07003use futures::future::{poll_fn, FutureExt};
Jason Macnak09c36882020-04-01 16:22:56 +00004use futures::task::{Context, Poll};
5use futures_test::task::panic_waker_ref;
Jason Macnak09c36882020-04-01 16:22:56 +00006use std::sync::mpsc;
7use std::thread;
8
9#[test]
10fn smoke_poll() {
11 let (mut tx, rx) = oneshot::channel::<u32>();
12 let mut rx = Some(rx);
13 let f = poll_fn(|cx| {
14 assert!(tx.poll_canceled(cx).is_pending());
15 assert!(tx.poll_canceled(cx).is_pending());
16 drop(rx.take());
17 assert!(tx.poll_canceled(cx).is_ready());
18 assert!(tx.poll_canceled(cx).is_ready());
19 Poll::Ready(())
20 });
21
22 block_on(f);
23}
24
25#[test]
26fn cancel_notifies() {
Haibo Huang0cf3d2c2020-05-08 19:24:52 -070027 let (mut tx, rx) = oneshot::channel::<u32>();
Jason Macnak09c36882020-04-01 16:22:56 +000028
Haibo Huang0cf3d2c2020-05-08 19:24:52 -070029 let t = thread::spawn(move || {
30 block_on(tx.cancellation());
Jason Macnak09c36882020-04-01 16:22:56 +000031 });
32 drop(rx);
33 t.join().unwrap();
34}
35
Jason Macnak09c36882020-04-01 16:22:56 +000036#[test]
37fn cancel_lots() {
David LeGaref7dc9c12022-03-02 00:20:44 +000038 #[cfg(miri)]
39 const N: usize = 100;
40 #[cfg(not(miri))]
41 const N: usize = 20000;
42
Jason Macnak09c36882020-04-01 16:22:56 +000043 let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
44 let t = thread::spawn(move || {
Haibo Huang0cf3d2c2020-05-08 19:24:52 -070045 for (mut tx, tx2) in rx {
46 block_on(tx.cancellation());
Jason Macnak09c36882020-04-01 16:22:56 +000047 tx2.send(()).unwrap();
48 }
49 });
50
David LeGaref7dc9c12022-03-02 00:20:44 +000051 for _ in 0..N {
Jason Macnak09c36882020-04-01 16:22:56 +000052 let (otx, orx) = oneshot::channel::<u32>();
53 let (tx2, rx2) = mpsc::channel();
54 tx.send((otx, tx2)).unwrap();
55 drop(orx);
56 rx2.recv().unwrap();
57 }
58 drop(tx);
59
60 t.join().unwrap();
61}
62
63#[test]
64fn cancel_after_sender_drop_doesnt_notify() {
65 let (mut tx, rx) = oneshot::channel::<u32>();
66 let mut cx = Context::from_waker(panic_waker_ref());
67 assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
68 drop(tx);
69 drop(rx);
70}
71
72#[test]
73fn close() {
74 let (mut tx, mut rx) = oneshot::channel::<u32>();
75 rx.close();
76 block_on(poll_fn(|cx| {
77 match rx.poll_unpin(cx) {
Joel Galensoncc0890a2021-05-19 15:09:47 -070078 Poll::Ready(Err(_)) => {}
Jason Macnak09c36882020-04-01 16:22:56 +000079 _ => panic!(),
80 };
81 assert!(tx.poll_canceled(cx).is_ready());
82 Poll::Ready(())
83 }));
84}
85
86#[test]
87fn close_wakes() {
Haibo Huang0cf3d2c2020-05-08 19:24:52 -070088 let (mut tx, mut rx) = oneshot::channel::<u32>();
Jason Macnak09c36882020-04-01 16:22:56 +000089 let (tx2, rx2) = mpsc::channel();
90 let t = thread::spawn(move || {
91 rx.close();
92 rx2.recv().unwrap();
93 });
Haibo Huang0cf3d2c2020-05-08 19:24:52 -070094 block_on(tx.cancellation());
Jason Macnak09c36882020-04-01 16:22:56 +000095 tx2.send(()).unwrap();
96 t.join().unwrap();
97}
98
99#[test]
100fn is_canceled() {
101 let (tx, rx) = oneshot::channel::<u32>();
102 assert!(!tx.is_canceled());
103 drop(rx);
104 assert!(tx.is_canceled());
105}
106
107#[test]
108fn cancel_sends() {
David LeGaref7dc9c12022-03-02 00:20:44 +0000109 #[cfg(miri)]
110 const N: usize = 100;
111 #[cfg(not(miri))]
112 const N: usize = 20000;
113
Jason Macnak09c36882020-04-01 16:22:56 +0000114 let (tx, rx) = mpsc::channel::<Sender<_>>();
115 let t = thread::spawn(move || {
116 for otx in rx {
117 let _ = otx.send(42);
118 }
119 });
120
David LeGaref7dc9c12022-03-02 00:20:44 +0000121 for _ in 0..N {
Jason Macnak09c36882020-04-01 16:22:56 +0000122 let (otx, mut orx) = oneshot::channel::<u32>();
123 tx.send(otx).unwrap();
124
125 orx.close();
126 let _ = block_on(orx);
127 }
128
129 drop(tx);
130 t.join().unwrap();
131}
132
133// #[test]
134// fn spawn_sends_items() {
135// let core = local_executor::Core::new();
136// let future = ok::<_, ()>(1);
137// let rx = spawn(future, &core);
138// assert_eq!(core.run(rx).unwrap(), 1);
139// }
140//
141// #[test]
142// fn spawn_kill_dead_stream() {
143// use std::thread;
144// use std::time::Duration;
145// use futures::future::Either;
146// use futures::sync::oneshot;
147//
148// // a future which never returns anything (forever accepting incoming
149// // connections), but dropping it leads to observable side effects
150// // (like closing listening sockets, releasing limited resources,
151// // ...)
152// #[derive(Debug)]
153// struct Dead {
154// // when dropped you should get Err(oneshot::Canceled) on the
155// // receiving end
156// done: oneshot::Sender<()>,
157// }
158// impl Future for Dead {
159// type Item = ();
160// type Error = ();
161//
162// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
163// Ok(Poll::Pending)
164// }
165// }
166//
167// // need to implement a timeout for the test, as it would hang
168// // forever right now
169// let (timeout_tx, timeout_rx) = oneshot::channel();
170// thread::spawn(move || {
171// thread::sleep(Duration::from_millis(1000));
172// let _ = timeout_tx.send(());
173// });
174//
175// let core = local_executor::Core::new();
176// let (done_tx, done_rx) = oneshot::channel();
177// let future = Dead{done: done_tx};
178// let rx = spawn(future, &core);
179// let res = core.run(
180// Ok::<_, ()>(())
181// .into_future()
182// .then(move |_| {
183// // now drop the spawned future: maybe some timeout exceeded,
184// // or some connection on this end was closed by the remote
185// // end.
186// drop(rx);
187// // and wait for the spawned future to release its resources
188// done_rx
189// })
190// .select2(timeout_rx)
191// );
192// match res {
193// Err(Either::A((oneshot::Canceled, _))) => (),
194// Ok(Either::B(((), _))) => {
195// panic!("dead future wasn't canceled (timeout)");
196// },
197// _ => {
198// panic!("dead future wasn't canceled (unexpected result)");
199// },
200// }
201// }
202//
203// #[test]
204// fn spawn_dont_kill_forgot_dead_stream() {
205// use std::thread;
206// use std::time::Duration;
207// use futures::future::Either;
208// use futures::sync::oneshot;
209//
210// // a future which never returns anything (forever accepting incoming
211// // connections), but dropping it leads to observable side effects
212// // (like closing listening sockets, releasing limited resources,
213// // ...)
214// #[derive(Debug)]
215// struct Dead {
216// // when dropped you should get Err(oneshot::Canceled) on the
217// // receiving end
218// done: oneshot::Sender<()>,
219// }
220// impl Future for Dead {
221// type Item = ();
222// type Error = ();
223//
224// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
225// Ok(Poll::Pending)
226// }
227// }
228//
229// // need to implement a timeout for the test, as it would hang
230// // forever right now
231// let (timeout_tx, timeout_rx) = oneshot::channel();
232// thread::spawn(move || {
233// thread::sleep(Duration::from_millis(1000));
234// let _ = timeout_tx.send(());
235// });
236//
237// let core = local_executor::Core::new();
238// let (done_tx, done_rx) = oneshot::channel();
239// let future = Dead{done: done_tx};
240// let rx = spawn(future, &core);
241// let res = core.run(
242// Ok::<_, ()>(())
243// .into_future()
244// .then(move |_| {
245// // forget the spawned future: should keep running, i.e. hit
246// // the timeout below.
247// rx.forget();
248// // and wait for the spawned future to release its resources
249// done_rx
250// })
251// .select2(timeout_rx)
252// );
253// match res {
254// Err(Either::A((oneshot::Canceled, _))) => {
255// panic!("forgotten dead future was canceled");
256// },
257// Ok(Either::B(((), _))) => (), // reached timeout
258// _ => {
259// panic!("forgotten dead future was canceled (unexpected result)");
260// },
261// }
262// }