blob: 4ec66a45d1a4cda697a50fff1a2c48f3e835450c [file] [log] [blame]
Jeff Vander Stoepbf372732021-02-18 09:39:46 +01001#![cfg(not(loom))]
2
3//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
4//!
5//!
6//! # Overview
7//!
8//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
9//! to handle an arbitrary sequence of read and write operations. This is useful
10//! for writing unit tests for networking services as using an actual network
11//! type is fairly non deterministic.
12//!
13//! # Usage
14//!
15//! Attempting to write data that the mock isn't expecting will result in a
16//! panic.
17//!
18//! [`AsyncRead`]: tokio::io::AsyncRead
19//! [`AsyncWrite`]: tokio::io::AsyncWrite
20
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tokio::sync::mpsc;
23use tokio::time::{self, Duration, Instant, Sleep};
Elliott Hughes9dcb4412021-04-02 10:43:29 -070024use tokio_stream::wrappers::UnboundedReceiverStream;
Jeff Vander Stoepbf372732021-02-18 09:39:46 +010025
26use futures_core::{ready, Stream};
27use std::collections::VecDeque;
28use std::fmt;
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{self, Poll, Waker};
33use std::{cmp, io};
34
35/// An I/O object that follows a predefined script.
36///
37/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
38/// follows the scenario described by the builder and panics otherwise.
39#[derive(Debug)]
40pub struct Mock {
41 inner: Inner,
42}
43
44/// A handle to send additional actions to the related `Mock`.
45#[derive(Debug)]
46pub struct Handle {
47 tx: mpsc::UnboundedSender<Action>,
48}
49
50/// Builds `Mock` instances.
51#[derive(Debug, Clone, Default)]
52pub struct Builder {
53 // Sequence of actions for the Mock to take
54 actions: VecDeque<Action>,
55}
56
#[derive(Clone, Debug)]
enum Action {
    // Bytes that reads have not yet been given.
    Read(Vec<u8>),
    // Bytes that writes are still expected to supply.
    Write(Vec<u8>),
    // Pause the script for this long.
    Wait(Duration),
    // `io::Error` is not `Clone`, so errors are held in an `Arc` to keep
    // `Action` (and therefore `Builder`) cloneable. The `Option` is emptied
    // with `take()` when the error is handed to the caller, which marks the
    // action as finished.
    ReadError(Option<Arc<io::Error>>),
    WriteError(Option<Arc<io::Error>>),
}
67
68struct Inner {
69 actions: VecDeque<Action>,
70 waiting: Option<Instant>,
71 sleep: Option<Pin<Box<Sleep>>>,
72 read_wait: Option<Waker>,
Elliott Hughes9dcb4412021-04-02 10:43:29 -070073 rx: UnboundedReceiverStream<Action>,
Jeff Vander Stoepbf372732021-02-18 09:39:46 +010074}
75
76impl Builder {
77 /// Return a new, empty `Builder.
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 /// Sequence a `read` operation.
83 ///
84 /// The next operation in the mock's script will be to expect a `read` call
85 /// and return `buf`.
86 pub fn read(&mut self, buf: &[u8]) -> &mut Self {
87 self.actions.push_back(Action::Read(buf.into()));
88 self
89 }
90
91 /// Sequence a `read` operation that produces an error.
92 ///
93 /// The next operation in the mock's script will be to expect a `read` call
94 /// and return `error`.
95 pub fn read_error(&mut self, error: io::Error) -> &mut Self {
96 let error = Some(error.into());
97 self.actions.push_back(Action::ReadError(error));
98 self
99 }
100
101 /// Sequence a `write` operation.
102 ///
103 /// The next operation in the mock's script will be to expect a `write`
104 /// call.
105 pub fn write(&mut self, buf: &[u8]) -> &mut Self {
106 self.actions.push_back(Action::Write(buf.into()));
107 self
108 }
109
110 /// Sequence a `write` operation that produces an error.
111 ///
112 /// The next operation in the mock's script will be to expect a `write`
113 /// call that provides `error`.
114 pub fn write_error(&mut self, error: io::Error) -> &mut Self {
115 let error = Some(error.into());
116 self.actions.push_back(Action::WriteError(error));
117 self
118 }
119
120 /// Sequence a wait.
121 ///
122 /// The next operation in the mock's script will be to wait without doing so
123 /// for `duration` amount of time.
124 pub fn wait(&mut self, duration: Duration) -> &mut Self {
125 let duration = cmp::max(duration, Duration::from_millis(1));
126 self.actions.push_back(Action::Wait(duration));
127 self
128 }
129
130 /// Build a `Mock` value according to the defined script.
131 pub fn build(&mut self) -> Mock {
132 let (mock, _) = self.build_with_handle();
133 mock
134 }
135
136 /// Build a `Mock` value paired with a handle
137 pub fn build_with_handle(&mut self) -> (Mock, Handle) {
138 let (inner, handle) = Inner::new(self.actions.clone());
139
140 let mock = Mock { inner };
141
142 (mock, handle)
143 }
144}
145
146impl Handle {
147 /// Sequence a `read` operation.
148 ///
149 /// The next operation in the mock's script will be to expect a `read` call
150 /// and return `buf`.
151 pub fn read(&mut self, buf: &[u8]) -> &mut Self {
152 self.tx.send(Action::Read(buf.into())).unwrap();
153 self
154 }
155
156 /// Sequence a `read` operation error.
157 ///
158 /// The next operation in the mock's script will be to expect a `read` call
159 /// and return `error`.
160 pub fn read_error(&mut self, error: io::Error) -> &mut Self {
161 let error = Some(error.into());
162 self.tx.send(Action::ReadError(error)).unwrap();
163 self
164 }
165
166 /// Sequence a `write` operation.
167 ///
168 /// The next operation in the mock's script will be to expect a `write`
169 /// call.
170 pub fn write(&mut self, buf: &[u8]) -> &mut Self {
171 self.tx.send(Action::Write(buf.into())).unwrap();
172 self
173 }
174
175 /// Sequence a `write` operation error.
176 ///
177 /// The next operation in the mock's script will be to expect a `write`
178 /// call error.
179 pub fn write_error(&mut self, error: io::Error) -> &mut Self {
180 let error = Some(error.into());
181 self.tx.send(Action::WriteError(error)).unwrap();
182 self
183 }
184}
185
186impl Inner {
187 fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
Elliott Hughes9dcb4412021-04-02 10:43:29 -0700188 let (tx, rx) = mpsc::unbounded_channel();
Jeff Vander Stoepbf372732021-02-18 09:39:46 +0100189
Elliott Hughes9dcb4412021-04-02 10:43:29 -0700190 let rx = UnboundedReceiverStream::new(rx);
Jeff Vander Stoepbf372732021-02-18 09:39:46 +0100191
192 let inner = Inner {
193 actions,
194 sleep: None,
195 read_wait: None,
196 rx,
197 waiting: None,
198 };
199
200 let handle = Handle { tx };
201
202 (inner, handle)
203 }
204
205 fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
206 Pin::new(&mut self.rx).poll_next(cx)
207 }
208
209 fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
210 match self.action() {
211 Some(&mut Action::Read(ref mut data)) => {
212 // Figure out how much to copy
213 let n = cmp::min(dst.remaining(), data.len());
214
215 // Copy the data into the `dst` slice
216 dst.put_slice(&data[..n]);
217
218 // Drain the data from the source
219 data.drain(..n);
220
221 Ok(())
222 }
223 Some(&mut Action::ReadError(ref mut err)) => {
224 // As the
225 let err = err.take().expect("Should have been removed from actions.");
226 let err = Arc::try_unwrap(err).expect("There are no other references.");
227 Err(err)
228 }
229 Some(_) => {
230 // Either waiting or expecting a write
231 Err(io::ErrorKind::WouldBlock.into())
232 }
233 None => Ok(()),
234 }
235 }
236
237 fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
238 let mut ret = 0;
239
240 if self.actions.is_empty() {
241 return Err(io::ErrorKind::BrokenPipe.into());
242 }
243
244 if let Some(&mut Action::Wait(..)) = self.action() {
245 return Err(io::ErrorKind::WouldBlock.into());
246 }
247
248 if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
249 let err = err.take().expect("Should have been removed from actions.");
250 let err = Arc::try_unwrap(err).expect("There are no other references.");
251 return Err(err);
252 }
253
254 for i in 0..self.actions.len() {
255 match self.actions[i] {
256 Action::Write(ref mut expect) => {
257 let n = cmp::min(src.len(), expect.len());
258
259 assert_eq!(&src[..n], &expect[..n]);
260
261 // Drop data that was matched
262 expect.drain(..n);
263 src = &src[n..];
264
265 ret += n;
266
267 if src.is_empty() {
268 return Ok(ret);
269 }
270 }
271 Action::Wait(..) | Action::WriteError(..) => {
272 break;
273 }
274 _ => {}
275 }
276
277 // TODO: remove write
278 }
279
280 Ok(ret)
281 }
282
283 fn remaining_wait(&mut self) -> Option<Duration> {
284 match self.action() {
285 Some(&mut Action::Wait(dur)) => Some(dur),
286 _ => None,
287 }
288 }
289
290 fn action(&mut self) -> Option<&mut Action> {
291 loop {
292 if self.actions.is_empty() {
293 return None;
294 }
295
296 match self.actions[0] {
297 Action::Read(ref mut data) => {
298 if !data.is_empty() {
299 break;
300 }
301 }
302 Action::Write(ref mut data) => {
303 if !data.is_empty() {
304 break;
305 }
306 }
307 Action::Wait(ref mut dur) => {
308 if let Some(until) = self.waiting {
309 let now = Instant::now();
310
311 if now < until {
312 break;
313 }
314 } else {
315 self.waiting = Some(Instant::now() + *dur);
316 break;
317 }
318 }
319 Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
320 if error.is_some() {
321 break;
322 }
323 }
324 }
325
326 let _action = self.actions.pop_front();
327 }
328
329 self.actions.front_mut()
330 }
331}
332
// ===== impl Mock =====
334
335impl Mock {
336 fn maybe_wakeup_reader(&mut self) {
337 match self.inner.action() {
338 Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
339 if let Some(waker) = self.inner.read_wait.take() {
340 waker.wake();
341 }
342 }
343 _ => {}
344 }
345 }
346}
347
348impl AsyncRead for Mock {
349 fn poll_read(
350 mut self: Pin<&mut Self>,
351 cx: &mut task::Context<'_>,
352 buf: &mut ReadBuf<'_>,
353 ) -> Poll<io::Result<()>> {
354 loop {
355 if let Some(ref mut sleep) = self.inner.sleep {
356 ready!(Pin::new(sleep).poll(cx));
357 }
358
359 // If a sleep is set, it has already fired
360 self.inner.sleep = None;
361
362 // Capture 'filled' to monitor if it changed
363 let filled = buf.filled().len();
364
365 match self.inner.read(buf) {
366 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
367 if let Some(rem) = self.inner.remaining_wait() {
368 let until = Instant::now() + rem;
369 self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
370 } else {
371 self.inner.read_wait = Some(cx.waker().clone());
372 return Poll::Pending;
373 }
374 }
375 Ok(()) => {
376 if buf.filled().len() == filled {
377 match ready!(self.inner.poll_action(cx)) {
378 Some(action) => {
379 self.inner.actions.push_back(action);
380 continue;
381 }
382 None => {
383 return Poll::Ready(Ok(()));
384 }
385 }
386 } else {
387 return Poll::Ready(Ok(()));
388 }
389 }
390 Err(e) => return Poll::Ready(Err(e)),
391 }
392 }
393 }
394}
395
396impl AsyncWrite for Mock {
397 fn poll_write(
398 mut self: Pin<&mut Self>,
399 cx: &mut task::Context<'_>,
400 buf: &[u8],
401 ) -> Poll<io::Result<usize>> {
402 loop {
403 if let Some(ref mut sleep) = self.inner.sleep {
404 ready!(Pin::new(sleep).poll(cx));
405 }
406
407 // If a sleep is set, it has already fired
408 self.inner.sleep = None;
409
410 match self.inner.write(buf) {
411 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
412 if let Some(rem) = self.inner.remaining_wait() {
413 let until = Instant::now() + rem;
414 self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
415 } else {
416 panic!("unexpected WouldBlock");
417 }
418 }
419 Ok(0) => {
420 // TODO: Is this correct?
421 if !self.inner.actions.is_empty() {
422 return Poll::Pending;
423 }
424
425 // TODO: Extract
426 match ready!(self.inner.poll_action(cx)) {
427 Some(action) => {
428 self.inner.actions.push_back(action);
429 continue;
430 }
431 None => {
432 panic!("unexpected write");
433 }
434 }
435 }
436 ret => {
437 self.maybe_wakeup_reader();
438 return Poll::Ready(ret);
439 }
440 }
441 }
442 }
443
444 fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
445 Poll::Ready(Ok(()))
446 }
447
448 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
449 Poll::Ready(Ok(()))
450 }
451}
452
453/// Ensures that Mock isn't dropped with data "inside".
454impl Drop for Mock {
455 fn drop(&mut self) {
456 // Avoid double panicking, since makes debugging much harder.
457 if std::thread::panicking() {
458 return;
459 }
460
461 self.inner.actions.iter().for_each(|a| match a {
462 Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
463 Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
464 _ => (),
465 })
466 }
467}
468/*
469/// Returns `true` if called from the context of a futures-rs Task
470fn is_task_ctx() -> bool {
471 use std::panic;
472
473 // Save the existing panic hook
474 let h = panic::take_hook();
475
476 // Install a new one that does nothing
477 panic::set_hook(Box::new(|_| {}));
478
479 // Attempt to call the fn
480 let r = panic::catch_unwind(|| task::current()).is_ok();
481
482 // Re-install the old one
483 panic::set_hook(h);
484
485 // Return the result
486 r
487}
488*/
489
490impl fmt::Debug for Inner {
491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492 write!(f, "Inner {{...}}")
493 }
494}