blob: a22d039e4023cc01d8d12b6397742552ee08da4e [file] [log] [blame]
Jason Macnak09c36882020-04-01 16:22:56 +00001use futures::channel::oneshot::{self, Sender};
2use futures::executor::block_on;
Haibo Huang0cf3d2c2020-05-08 19:24:52 -07003use futures::future::{FutureExt, poll_fn};
Jason Macnak09c36882020-04-01 16:22:56 +00004use futures::task::{Context, Poll};
5use futures_test::task::panic_waker_ref;
Jason Macnak09c36882020-04-01 16:22:56 +00006use std::sync::mpsc;
7use std::thread;
8
/// Checks `Sender::poll_canceled` before and after the receiver goes away.
///
/// Within a single poll of the driving future, the sender must report
/// pending (on repeated polls) while the receiver is alive, and ready (on
/// repeated polls) once the receiver has been dropped.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Wrapped in an `Option` so the closure below can drop it part-way through.
    let mut receiver_slot = Some(receiver);

    block_on(poll_fn(|cx| {
        // Receiver still alive: polling twice must stay pending.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver_slot.take());

        // Receiver gone: cancellation is reported on every subsequent poll.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24
/// Dropping the receiver must complete a `cancellation()` future that another
/// thread is blocked on.
///
/// The test only finishes if the spawned thread returns from `block_on`, so a
/// missed notification shows up as a hang in `join`.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    let wait_for_cancel = move || {
        block_on(sender.cancellation());
    };
    let waiter = thread::spawn(wait_for_cancel);

    drop(receiver);
    waiter.join().unwrap();
}
35
/// Repeats the "drop receiver, expect cancellation" handshake many times
/// against a single worker thread.
///
/// Each round hands the worker a fresh oneshot sender plus an acknowledgement
/// channel, drops the matching oneshot receiver, and then waits for the worker
/// to acknowledge that it observed the cancellation.
#[test]
fn cancel_lots() {
    const ROUNDS: usize = 20000;

    let (job_tx, job_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
    let worker = thread::spawn(move || {
        // `recv` fails once `job_tx` has been dropped, which ends the loop.
        while let Ok((mut oneshot_tx, ack_tx)) = job_rx.recv() {
            block_on(oneshot_tx.cancellation());
            ack_tx.send(()).unwrap();
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();
        let (ack_tx, ack_rx) = mpsc::channel();
        job_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);
        // Block until the worker has seen this round's cancellation.
        ack_rx.recv().unwrap();
    }
    // Closing the job channel lets the worker fall out of its loop.
    drop(job_tx);

    worker.join().unwrap();
}
57
/// Dropping the receiver after the sender is already gone must not wake the
/// task the sender registered.
///
/// The sender is polled once with a context built from `panic_waker_ref()`
/// (presumably a waker that panics when woken, going by its name), then the
/// sender is dropped before the receiver. The test passes as long as nothing
/// panics.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    let mut cx = Context::from_waker(panic_waker_ref());

    // Receiver is still alive, so this poll is expected to be pending.
    let first_poll = sender.poll_canceled(&mut cx);
    assert_eq!(first_poll, Poll::Pending);

    // Order matters: the sender goes away first, then the receiver.
    drop(sender);
    drop(receiver);
}
66
/// After `Receiver::close`, polling the receiver must yield an error and the
/// sender must observe the cancellation.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Only a ready error is acceptable; a value or a pending poll fails.
        match receiver.poll_unpin(cx) {
            Poll::Ready(Err(_)) => {}
            Poll::Ready(Ok(_)) | Poll::Pending => panic!(),
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });
    block_on(check);
}
80
/// `Receiver::close` called on another thread must complete a pending
/// `cancellation()` future on the sender.
///
/// The helper thread closes the receiver and then blocks on a second channel,
/// keeping the receiver alive until the main thread has returned from
/// `block_on` and sent the release signal.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel::<()>();

    let closer = thread::spawn(move || {
        receiver.close();
        // Hold the receiver until the main thread signals.
        release_rx.recv().unwrap();
    });

    block_on(sender.cancellation());
    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
93
/// `Sender::is_canceled` must be false while the receiver exists and true
/// once it has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    let canceled_while_open = sender.is_canceled();
    assert!(!canceled_while_open);

    drop(receiver);

    let canceled_after_drop = sender.is_canceled();
    assert!(canceled_after_drop);
}
101
/// Races `Sender::send` on a worker thread against `Receiver::close` on the
/// main thread, many times over.
///
/// Neither outcome of a round is asserted: the send result and the received
/// value are both discarded. The test only checks that every round completes
/// and the worker thread exits cleanly.
#[test]
fn cancel_sends() {
    const ROUNDS: usize = 20000;

    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<_>>();
    let worker = thread::spawn(move || {
        // `recv` fails once `handoff_tx` has been dropped, which ends the loop.
        while let Ok(oneshot_tx) = handoff_rx.recv() {
            // The receiver may already be closed; either result is acceptable.
            let _ = oneshot_tx.send(42);
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        oneshot_rx.close();
        // Resolves with either the sent value or a cancellation error.
        let _ = block_on(oneshot_rx);
    }

    drop(handoff_tx);
    worker.join().unwrap();
}
122
123// #[test]
124// fn spawn_sends_items() {
125// let core = local_executor::Core::new();
126// let future = ok::<_, ()>(1);
127// let rx = spawn(future, &core);
128// assert_eq!(core.run(rx).unwrap(), 1);
129// }
130//
131// #[test]
132// fn spawn_kill_dead_stream() {
133// use std::thread;
134// use std::time::Duration;
135// use futures::future::Either;
136// use futures::sync::oneshot;
137//
138// // a future which never returns anything (forever accepting incoming
139// // connections), but dropping it leads to observable side effects
140// // (like closing listening sockets, releasing limited resources,
141// // ...)
142// #[derive(Debug)]
143// struct Dead {
144// // when dropped you should get Err(oneshot::Canceled) on the
145// // receiving end
146// done: oneshot::Sender<()>,
147// }
148// impl Future for Dead {
149// type Item = ();
150// type Error = ();
151//
152// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
153// Ok(Poll::Pending)
154// }
155// }
156//
157// // need to implement a timeout for the test, as it would hang
158// // forever right now
159// let (timeout_tx, timeout_rx) = oneshot::channel();
160// thread::spawn(move || {
161// thread::sleep(Duration::from_millis(1000));
162// let _ = timeout_tx.send(());
163// });
164//
165// let core = local_executor::Core::new();
166// let (done_tx, done_rx) = oneshot::channel();
167// let future = Dead{done: done_tx};
168// let rx = spawn(future, &core);
169// let res = core.run(
170// Ok::<_, ()>(())
171// .into_future()
172// .then(move |_| {
173// // now drop the spawned future: maybe some timeout exceeded,
174// // or some connection on this end was closed by the remote
175// // end.
176// drop(rx);
177// // and wait for the spawned future to release its resources
178// done_rx
179// })
180// .select2(timeout_rx)
181// );
182// match res {
183// Err(Either::A((oneshot::Canceled, _))) => (),
184// Ok(Either::B(((), _))) => {
185// panic!("dead future wasn't canceled (timeout)");
186// },
187// _ => {
188// panic!("dead future wasn't canceled (unexpected result)");
189// },
190// }
191// }
192//
193// #[test]
194// fn spawn_dont_kill_forgot_dead_stream() {
195// use std::thread;
196// use std::time::Duration;
197// use futures::future::Either;
198// use futures::sync::oneshot;
199//
200// // a future which never returns anything (forever accepting incoming
201// // connections), but dropping it leads to observable side effects
202// // (like closing listening sockets, releasing limited resources,
203// // ...)
204// #[derive(Debug)]
205// struct Dead {
206// // when dropped you should get Err(oneshot::Canceled) on the
207// // receiving end
208// done: oneshot::Sender<()>,
209// }
210// impl Future for Dead {
211// type Item = ();
212// type Error = ();
213//
214// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
215// Ok(Poll::Pending)
216// }
217// }
218//
219// // need to implement a timeout for the test, as it would hang
220// // forever right now
221// let (timeout_tx, timeout_rx) = oneshot::channel();
222// thread::spawn(move || {
223// thread::sleep(Duration::from_millis(1000));
224// let _ = timeout_tx.send(());
225// });
226//
227// let core = local_executor::Core::new();
228// let (done_tx, done_rx) = oneshot::channel();
229// let future = Dead{done: done_tx};
230// let rx = spawn(future, &core);
231// let res = core.run(
232// Ok::<_, ()>(())
233// .into_future()
234// .then(move |_| {
235// // forget the spawned future: should keep running, i.e. hit
236// // the timeout below.
237// rx.forget();
238// // and wait for the spawned future to release its resources
239// done_rx
240// })
241// .select2(timeout_rx)
242// );
243// match res {
244// Err(Either::A((oneshot::Canceled, _))) => {
245// panic!("forgotten dead future was canceled");
246// },
247// Ok(Either::B(((), _))) => (), // reached timeout
248// _ => {
249// panic!("forgotten dead future was canceled (unexpected result)");
250// },
251// }
252// }