io_uring: Make UringContext Sync
Rather than requiring callers to wrap the entire UringContext in a big
lock, make UringContext Sync by internally using fine-grained locks:
* The submit queue is wrapped with a Mutex so that threads may still add
operations to the submit queue even when a thread is blocked
inside an io_uring_enter call.
* The completion queue internally uses a Mutex around the pending op
data and completed count so that two threads don't end up trying to
remove the same operation from the completed queue.
With this we can enable the await_uring_from_poll test. This test
uncovered missing wakeups in the case where a uring operation is added
from one thread while the main uring thread is blocked inside an
io_uring_enter syscall. To deal with this, we call submit() whenever the
pending operation is polled and not yet submitted.
BUG=none
TEST=unit tests
Change-Id: I4c7dec65c03b7b10014f4a84d2cd16fe8758ea72
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2643842
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Chirantan Ekbote <chirantan@chromium.org>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
diff --git a/Cargo.lock b/Cargo.lock
index 9488878..7ec0f66 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -506,6 +506,7 @@
dependencies = [
"data_model",
"libc",
+ "sync",
"sys_util",
"syscall_defines",
]
diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs
index 40291cf..b74880a 100644
--- a/cros_async/src/io_ext.rs
+++ b/cros_async/src/io_ext.rs
@@ -170,9 +170,7 @@
}
}
- // TODO: Enable this test once UringContext is Sync.
#[test]
- #[ignore]
fn await_uring_from_poll() {
// Start a uring operation and then await the result from an FdExecutor.
async fn go(source: UringSource<File>) {
diff --git a/cros_async/src/uring_executor.rs b/cros_async/src/uring_executor.rs
index 69acc3b..ac6db5d 100644
--- a/cros_async/src/uring_executor.rs
+++ b/cros_async/src/uring_executor.rs
@@ -161,6 +161,7 @@
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
+ submitted: false,
})
}
@@ -176,6 +177,7 @@
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
+ submitted: false,
})
}
@@ -186,6 +188,7 @@
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
+ submitted: false,
})
}
@@ -196,6 +199,7 @@
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
+ submitted: false,
})
}
@@ -208,6 +212,7 @@
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
+ submitted: false,
})
}
}
@@ -282,6 +287,7 @@
let op = PendingOperation {
waker_token: Some(token),
ex: raw.clone(),
+ submitted: false,
};
match op.await {
@@ -331,7 +337,7 @@
struct RawExecutor {
queue: RunnableQueue,
- ctx: Mutex<URingContext>,
+ ctx: URingContext,
ring: Mutex<Ring>,
thread_id: Mutex<Option<ThreadId>>,
state: AtomicI32,
@@ -340,10 +346,9 @@
impl RawExecutor {
fn new(notify: EventFd) -> Result<RawExecutor> {
- let ctx = URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?;
Ok(RawExecutor {
queue: RunnableQueue::new(),
- ctx: Mutex::new(ctx),
+ ctx: URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?,
ring: Mutex::new(Ring {
ops: Slab::with_capacity(NUM_ENTRIES),
registered_sources: Slab::with_capacity(NUM_ENTRIES),
@@ -426,15 +431,13 @@
continue;
}
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
- let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
- let events = ctx.wait().map_err(Error::URingEnter)?;
+ let events = self.ctx.wait().map_err(Error::URingEnter)?;
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
// writing to the eventfd.
self.state.store(PROCESSING, Ordering::Release);
+
+ let mut ring = self.ring.lock();
for (raw_token, result) in events {
// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
// something that we originally gave to the kernel and that was created from a
@@ -530,11 +533,7 @@
source: &RegisteredSource,
events: &sys_util::WatchingEvents,
) -> Result<WakerToken> {
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
-
let src = ring
.registered_sources
.get(source.tag)
@@ -542,7 +541,8 @@
.ok_or(Error::InvalidSource)?;
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
- ctx.add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
+ self.ctx
+ .add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
@@ -551,6 +551,7 @@
waker: None,
canceled: false,
}));
+
Ok(WakerToken(next_op_token))
}
@@ -561,11 +562,7 @@
len: u64,
mode: u32,
) -> Result<WakerToken> {
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
-
let src = ring
.registered_sources
.get(source.tag)
@@ -573,38 +570,14 @@
.ok_or(Error::InvalidSource)?;
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
- ctx.add_fallocate(
- src.as_raw_fd(),
- offset,
- len,
- mode,
- usize_to_u64(next_op_token),
- )
- .map_err(Error::SubmittingOp)?;
-
- entry.insert(OpStatus::Pending(OpData {
- _file: src,
- _mem: None,
- waker: None,
- canceled: false,
- }));
- Ok(WakerToken(next_op_token))
- }
-
- fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
- let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
-
- let src = ring
- .registered_sources
- .get(source.tag)
- .map(Arc::clone)
- .ok_or(Error::InvalidSource)?;
- let entry = ring.ops.vacant_entry();
- let next_op_token = entry.key();
- ctx.add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
+ self.ctx
+ .add_fallocate(
+ src.as_raw_fd(),
+ offset,
+ len,
+ mode,
+ usize_to_u64(next_op_token),
+ )
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
@@ -613,6 +586,30 @@
waker: None,
canceled: false,
}));
+
+ Ok(WakerToken(next_op_token))
+ }
+
+ fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
+ let mut ring = self.ring.lock();
+ let src = ring
+ .registered_sources
+ .get(source.tag)
+ .map(Arc::clone)
+ .ok_or(Error::InvalidSource)?;
+ let entry = ring.ops.vacant_entry();
+ let next_op_token = entry.key();
+ self.ctx
+ .add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
+ .map_err(Error::SubmittingOp)?;
+
+ entry.insert(OpStatus::Pending(OpData {
+ _file: src,
+ _mem: None,
+ waker: None,
+ canceled: false,
+ }));
+
Ok(WakerToken(next_op_token))
}
@@ -630,11 +627,7 @@
return Err(Error::InvalidOffset);
}
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
-
let src = ring
.registered_sources
.get(source.tag)
@@ -655,7 +648,8 @@
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_read` unit test.
- ctx.add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
+ self.ctx
+ .add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
}
@@ -683,11 +677,7 @@
return Err(Error::InvalidOffset);
}
- // We need to make sure we always acquire the ring lock before the ctx lock. TODO: drop
- // the ctx lock once UringContext is Sync.
let mut ring = self.ring.lock();
- let mut ctx = self.ctx.lock();
-
let src = ring
.registered_sources
.get(source.tag)
@@ -708,7 +698,8 @@
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_write` unit test.
- ctx.add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
+ self.ctx
+ .add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
}
@@ -885,6 +876,7 @@
pub struct PendingOperation {
waker_token: Option<WakerToken>,
ex: Weak<RawExecutor>,
+ submitted: bool,
}
impl Future for PendingOperation {
@@ -900,6 +892,17 @@
self.waker_token = None;
Poll::Ready(result.map_err(Error::Io))
} else {
+ // If we haven't submitted the operation yet, do it now.
+ if !self.submitted {
+ match ex.ctx.submit() {
+ Ok(()) => self.submitted = true,
+ // If the kernel ring is full then wait until some ops are removed from the
+ // completion queue. This op should get submitted the next time the executor
+ // calls UringContext::wait.
+ Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
+ Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
+ }
+ }
Poll::Pending
}
} else {
@@ -929,21 +932,20 @@
use super::*;
use crate::uring_mem::{BackingMemory, MemRegion, VecIoWrapper};
- async fn pending() {
- Pending { is_ready: false }.await
+ // A future that returns ready when the uring queue is empty.
+ struct UringQueueEmpty<'a> {
+ ex: &'a URingExecutor,
}
- struct Pending {
- is_ready: bool,
- }
-
- impl Future for Pending {
+ impl<'a> Future for UringQueueEmpty<'a> {
type Output = ();
- fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
- if self.is_ready {
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
+ // When there is only one operation in the uring queue we know it must be empty because
+ // that 1 operation comes from `notify_task`.
+ if self.ex.raw.ring.lock().ops.len() == 1 {
Poll::Ready(())
} else {
- self.is_ready = true;
Poll::Pending
}
}
@@ -983,8 +985,8 @@
// Finishing the operation should put the Arc count back to 1.
// write to the pipe to wake the read pipe and then wait for the uring result in the
// executor.
- tx.write(&[0u8; 8]).expect("write failed");
- ex.run_until(pending())
+ tx.write_all(&[0u8; 8]).expect("write failed");
+ ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for read pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
}
@@ -1024,9 +1026,9 @@
// write to the pipe to wake the read pipe and then wait for the uring result in the
// executor.
let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
- rx.read(&mut buf).expect("read to empty failed");
- ex.run_until(pending())
- .expect("Failed to wait for read pipe ready");
+ rx.read_exact(&mut buf).expect("read to empty failed");
+ ex.run_until(UringQueueEmpty { ex: &ex })
+ .expect("Failed to wait for write pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
}
diff --git a/io_uring/Cargo.toml b/io_uring/Cargo.toml
index e1c44f9..cbe5aa1 100644
--- a/io_uring/Cargo.toml
+++ b/io_uring/Cargo.toml
@@ -8,6 +8,7 @@
data_model = { path = "../data_model" }
libc = "*"
syscall_defines = { path = "../syscall_defines" }
+sync = { path = "../sync" }
sys_util = { path = "../sys_util" }
[dev-dependencies]
diff --git a/io_uring/src/uring.rs b/io_uring/src/uring.rs
index 4e91161..6586a71 100644
--- a/io_uring/src/uring.rs
+++ b/io_uring/src/uring.rs
@@ -11,9 +11,10 @@
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
-use std::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
+use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use data_model::IoBufMut;
+use sync::Mutex;
use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents};
use crate::bindings::*;
@@ -61,9 +62,105 @@
/// Basic statistics about the operations that have been submitted to the uring.
#[derive(Default)]
pub struct URingStats {
- total_enter_calls: u64, // Number of times the uring has been entered.
- total_ops: u64, // Total ops submitted to io_uring.
- total_complete: u64, // Total ops completed by io_uring.
+ total_enter_calls: AtomicU64, // Number of times the uring has been entered.
+ total_ops: AtomicU64, // Total ops submitted to io_uring.
+ total_complete: AtomicU64, // Total ops completed by io_uring.
+}
+
+struct SubmitQueue {
+ submit_ring: SubmitQueueState,
+ submit_queue_entries: SubmitQueueEntries,
+ io_vecs: Pin<Box<[IoBufMut<'static>]>>,
+ submitting: usize, // The number of ops in the process of being submitted.
+ added: usize, // The number of ops added since the last call to `io_uring_enter`.
+ num_sqes: usize, // The total number of sqes allocated in shared memory.
+}
+
+impl SubmitQueue {
+ // Call `f` with the next available sqe or return an error if none are available.
+ // After `f` returns, the sqe is appended to the kernel's queue.
+ fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
+ where
+ F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
+ {
+ if self.added == self.num_sqes {
+ return Err(Error::NoSpace);
+ }
+
+ // Find the next free submission entry in the submit ring and fill it with an iovec.
+ // The below raw pointer derefs are safe because the memory the pointers use lives as long
+ // as the mmap in self.
+ let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
+ let next_tail = tail.wrapping_add(1);
+ if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
+ return Err(Error::NoSpace);
+ }
+ // `tail` is the next sqe to use.
+ let index = (tail & self.submit_ring.ring_mask) as usize;
+ let sqe = self.submit_queue_entries.get_mut(index).unwrap();
+
+ f(sqe, self.io_vecs[index].as_mut());
+
+ // Tells the kernel to use the new index when processing the entry at that index.
+ self.submit_ring.set_array_entry(index, index as u32);
+ // Ensure the above writes to sqe are seen before the tail is updated.
+ // set_tail uses Release ordering when storing to the ring.
+ self.submit_ring.pointers.set_tail(next_tail);
+
+ self.added += 1;
+
+ Ok(())
+ }
+
+ // Returns the number of entries that have been added to this SubmitQueue since the last time
+ // `prepare_submit` was called.
+ fn prepare_submit(&mut self) -> usize {
+ let out = self.added - self.submitting;
+ self.submitting = self.added;
+
+ out
+ }
+
+ // Indicates that we failed to submit `count` entries to the kernel and that they should be
+ // retried.
+ fn fail_submit(&mut self, count: usize) {
+ debug_assert!(count <= self.submitting);
+ self.submitting -= count;
+ }
+
+ // Indicates that `count` entries have been submitted to the kernel and so the space may be
+ // reused for new entries.
+ fn complete_submit(&mut self, count: usize) {
+ debug_assert!(count <= self.submitting);
+ self.submitting -= count;
+ self.added -= count;
+ }
+
+ unsafe fn add_rw_op(
+ &mut self,
+ ptr: *const u8,
+ len: usize,
+ fd: RawFd,
+ offset: u64,
+ user_data: UserData,
+ op: u8,
+ ) -> Result<()> {
+ self.prep_next_sqe(|sqe, iovec| {
+ iovec.iov_base = ptr as *const libc::c_void as *mut _;
+ iovec.iov_len = len;
+ sqe.opcode = op;
+ sqe.addr = iovec as *const _ as *const libc::c_void as u64;
+ sqe.len = 1;
+ sqe.__bindgen_anon_1.off = offset;
+ sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+ sqe.ioprio = 0;
+ sqe.user_data = user_data;
+ sqe.flags = 0;
+ sqe.fd = fd;
+ })?;
+
+ Ok(())
+ }
}
/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
@@ -81,7 +178,7 @@
/// # use sys_util::WatchingEvents;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
-/// let mut uring = URingContext::new(16).unwrap();
+/// let uring = URingContext::new(16).unwrap();
/// uring
/// .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
/// .unwrap();
@@ -92,13 +189,9 @@
/// ```
pub struct URingContext {
ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
- submit_ring: SubmitQueueState,
- submit_queue_entries: SubmitQueueEntries,
+ submit_ring: Mutex<SubmitQueue>,
complete_ring: CompleteQueueState,
- io_vecs: Pin<Box<[IoBufMut<'static>]>>,
- in_flight: usize, // The number of pending operations.
- added: usize, // The number of ops added since the last call to `io_uring_enter`.
- num_sqes: usize, // The total number of sqes allocated in shared memory.
+ in_flight: AtomicUsize, // The number of pending operations.
stats: URingStats,
}
@@ -161,79 +254,21 @@
Ok(URingContext {
ring_file,
- submit_ring,
- submit_queue_entries,
+ submit_ring: Mutex::new(SubmitQueue {
+ submit_ring,
+ submit_queue_entries,
+ io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
+ submitting: 0,
+ added: 0,
+ num_sqes: ring_params.sq_entries as usize,
+ }),
complete_ring,
- io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
- added: 0,
- num_sqes: ring_params.sq_entries as usize,
- in_flight: 0,
+ in_flight: AtomicUsize::new(0),
stats: Default::default(),
})
}
}
- // Call `f` with the next available sqe or return an error if none are available.
- // After `f` returns, the sqe is appended to the kernel's queue.
- fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
- where
- F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
- {
- if self.added == self.num_sqes {
- return Err(Error::NoSpace);
- }
-
- // Find the next free submission entry in the submit ring and fill it with an iovec.
- // The below raw pointer derefs are safe because the memory the pointers use lives as long
- // as the mmap in self.
- let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
- let next_tail = tail.wrapping_add(1);
- if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
- return Err(Error::NoSpace);
- }
- // `tail` is the next sqe to use.
- let index = (tail & self.submit_ring.ring_mask) as usize;
- let sqe = self.submit_queue_entries.get_mut(index).unwrap();
-
- f(sqe, self.io_vecs[index].as_mut());
-
- // Tells the kernel to use the new index when processing the entry at that index.
- self.submit_ring.set_array_entry(index, index as u32);
- // Ensure the above writes to sqe are seen before the tail is updated.
- // set_tail uses Release ordering when storing to the ring.
- self.submit_ring.pointers.set_tail(next_tail);
-
- self.added += 1;
-
- Ok(())
- }
-
- unsafe fn add_rw_op(
- &mut self,
- ptr: *const u8,
- len: usize,
- fd: RawFd,
- offset: u64,
- user_data: UserData,
- op: u8,
- ) -> Result<()> {
- self.prep_next_sqe(|sqe, iovec| {
- iovec.iov_base = ptr as *const libc::c_void as *mut _;
- iovec.iov_len = len;
- sqe.opcode = op;
- sqe.addr = iovec as *const _ as *const libc::c_void as u64;
- sqe.len = 1;
- sqe.__bindgen_anon_1.off = offset;
- sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
- sqe.ioprio = 0;
- sqe.user_data = user_data;
- sqe.flags = 0;
- sqe.fd = fd;
- })?;
-
- Ok(())
- }
-
/// Asynchronously writes to `fd` from the address given in `ptr`.
/// # Safety
/// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
@@ -242,14 +277,16 @@
/// be other references to the data pointed to by `ptr` until the operation completes. Ensure
/// that the fd remains open until the op completes as well.
pub unsafe fn add_write(
- &mut self,
+ &self,
ptr: *const u8,
len: usize,
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
- self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
+ self.submit_ring
+ .lock()
+ .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
}
/// Asynchronously reads from `fd` to the address given in `ptr`.
@@ -261,20 +298,22 @@
/// pointed to by `ptr` until the operation completes. Ensure that the fd remains open until
/// the op completes as well.
pub unsafe fn add_read(
- &mut self,
+ &self,
ptr: *mut u8,
len: usize,
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
- self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
+ self.submit_ring
+ .lock()
+ .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
}
/// See 'writev' but accepts an iterator instead of a vector if there isn't already a vector in
/// existence.
pub unsafe fn add_writev_iter<I>(
- &mut self,
+ &self,
iovecs: I,
fd: RawFd,
offset: u64,
@@ -308,13 +347,13 @@
/// the operation completes. Ensure that the fd remains open until the op completes as well.
/// The iovecs reference must be kept alive until the op returns.
pub unsafe fn add_writev(
- &mut self,
+ &self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
- self.prep_next_sqe(|sqe, _iovec| {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_WRITEV as u8;
sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
sqe.len = iovecs.len() as u32;
@@ -332,7 +371,7 @@
/// See 'readv' but accepts an iterator instead of a vector if there isn't already a vector in
/// existence.
pub unsafe fn add_readv_iter<I>(
- &mut self,
+ &self,
iovecs: I,
fd: RawFd,
offset: u64,
@@ -366,13 +405,13 @@
/// operation completes. Ensure that the fd remains open until the op completes as well.
/// The iovecs reference must be kept alive until the op returns.
pub unsafe fn add_readv(
- &mut self,
+ &self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: u64,
user_data: UserData,
) -> Result<()> {
- self.prep_next_sqe(|sqe, _iovec| {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_READV as u8;
sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
sqe.len = iovecs.len() as u32;
@@ -387,10 +426,28 @@
Ok(())
}
+ /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
+ /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
+ pub fn add_nop(&self, user_data: UserData) -> Result<()> {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
+ sqe.opcode = IORING_OP_NOP as u8;
+ sqe.fd = -1;
+ sqe.user_data = user_data;
+
+ sqe.addr = 0;
+ sqe.len = 0;
+ sqe.__bindgen_anon_1.off = 0;
+ sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+ sqe.__bindgen_anon_2.rw_flags = 0;
+ sqe.ioprio = 0;
+ sqe.flags = 0;
+ })
+ }
+
/// Syncs all completed operations, the ordering with in-flight async ops is not
/// defined.
- pub fn add_fsync(&mut self, fd: RawFd, user_data: UserData) -> Result<()> {
- self.prep_next_sqe(|sqe, _iovec| {
+ pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_FSYNC as u8;
sqe.fd = fd;
sqe.user_data = user_data;
@@ -407,7 +464,7 @@
/// See the usage of `fallocate`, this asynchronously performs the same operations.
pub fn add_fallocate(
- &mut self,
+ &self,
fd: RawFd,
offset: u64,
len: u64,
@@ -416,7 +473,7 @@
) -> Result<()> {
// Note that len for fallocate in passed in the addr field of the sqe and the mode uses the
// len field.
- self.prep_next_sqe(|sqe, _iovec| {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_FALLOCATE as u8;
sqe.fd = fd;
@@ -438,12 +495,12 @@
/// Note that io_uring is always a one shot poll. After the fd is returned, it must be re-added
/// to get future events.
pub fn add_poll_fd(
- &mut self,
+ &self,
fd: RawFd,
events: &WatchingEvents,
user_data: UserData,
) -> Result<()> {
- self.prep_next_sqe(|sqe, _iovec| {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_POLL_ADD as u8;
sqe.fd = fd;
sqe.user_data = user_data;
@@ -460,12 +517,12 @@
/// Removes an FD that was previously added with `add_poll_fd`.
pub fn remove_poll_fd(
- &mut self,
+ &self,
fd: RawFd,
events: &WatchingEvents,
user_data: UserData,
) -> Result<()> {
- self.prep_next_sqe(|sqe, _iovec| {
+ self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
sqe.opcode = IORING_OP_POLL_REMOVE as u8;
sqe.fd = fd;
sqe.user_data = user_data;
@@ -480,63 +537,87 @@
})
}
- /// Sends operations added with the `add_*` functions to the kernel.
- pub fn submit(&mut self) -> Result<()> {
- self.in_flight += self.added;
- self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
- if self.added > 0 {
- self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
- unsafe {
- // Safe because the only memory modified is in the completion queue.
- io_uring_enter(self.ring_file.as_raw_fd(), self.added as u64, 0, 0)
- .map_err(Error::RingEnter)?;
+ // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
+ // waiting for `wait_nr` operations to complete.
+ fn enter(&self, wait_nr: u64) -> Result<()> {
+ let completed = self.complete_ring.num_completed();
+ self.stats
+ .total_complete
+ .fetch_add(completed as u64, Ordering::Relaxed);
+ self.in_flight.fetch_sub(completed, Ordering::Relaxed);
+
+ let added = self.submit_ring.lock().prepare_submit();
+ if added == 0 && wait_nr == 0 {
+ return Ok(());
+ }
+
+ self.stats.total_enter_calls.fetch_add(1, Ordering::Relaxed);
+ let flags = if wait_nr > 0 {
+ IORING_ENTER_GETEVENTS
+ } else {
+ 0
+ };
+ let res = unsafe {
+ // Safe because the only memory modified is in the completion queue.
+ io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags)
+ };
+
+ match res {
+ Ok(_) => {
+ self.submit_ring.lock().complete_submit(added);
+ self.stats
+ .total_ops
+ .fetch_add(added as u64, Ordering::Relaxed);
+
+ // Release store synchronizes with acquire load above.
+ self.in_flight.fetch_add(added, Ordering::Release);
+ }
+ Err(e) => {
+ self.submit_ring.lock().fail_submit(added);
+
+ if wait_nr == 0 || e != libc::EBUSY {
+ return Err(Error::RingEnter(e));
+ }
+
+ // An ebusy return means that some completed events must be processed before
+ // submitting more, wait for some to finish without pushing the new sqes in
+ // that case.
+ unsafe {
+ io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags)
+ .map_err(Error::RingEnter)?;
+ }
}
}
- self.added = 0;
Ok(())
}
+ /// Sends operations added with the `add_*` functions to the kernel.
+ pub fn submit(&self) -> Result<()> {
+ self.enter(0)
+ }
+
/// Sends operations added with the `add_*` functions to the kernel and return an iterator to any
/// completed operations. `wait` blocks until at least one completion is ready. If called
/// without any new events added, this simply waits for any existing events to complete and
/// returns as soon an one or more is ready.
- pub fn wait(&mut self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
- let completed = self.complete_ring.num_completed();
- self.stats.total_complete = self.stats.total_complete.wrapping_add(completed as u64);
- self.in_flight -= completed;
- self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
- if self.in_flight > 0 || self.added > 0 {
- unsafe {
- self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
- // Safe because the only memory modified is in the completion queue.
- let ret = io_uring_enter(
- self.ring_file.as_raw_fd(),
- self.added as u64,
- 1,
- IORING_ENTER_GETEVENTS,
- );
- match ret {
- Ok(_) => {
- self.in_flight += self.added;
- self.added = 0;
- }
- Err(e) => {
- if e != libc::EBUSY {
- return Err(Error::RingEnter(e));
- }
- // An ebusy return means that some completed events must be processed before
- // submitting more, wait for some to finish without pushing the new sqes in
- // that case.
- io_uring_enter(self.ring_file.as_raw_fd(), 0, 1, IORING_ENTER_GETEVENTS)
- .map_err(Error::RingEnter)?;
- }
- }
- }
- }
+ pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
+ // We only want to wait for events if there aren't already events in the completion queue.
+ let wait_nr = if self.complete_ring.num_ready() > 0 {
+ 0
+ } else {
+ 1
+ };
// The CompletionQueue will iterate all completed ops.
- Ok(&mut self.complete_ring)
+ match self.enter(wait_nr) {
+ Ok(()) => Ok(&self.complete_ring),
+ // If we cannot submit any more entries then we need to pull stuff out of the completion
+ // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
+ // we know there are already entries in the completion queue.
+ Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
+ Err(e) => Err(e),
+ }
}
}
@@ -605,15 +686,20 @@
}
}
+#[derive(Default)]
+struct CompleteQueueData {
+ completed: usize,
+ //For ops that pass in arrays of iovecs, they need to be valid for the duration of the
+ //operation because the kernel might read them at any time.
+ pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
+}
+
struct CompleteQueueState {
mmap: MemoryMapping,
pointers: QueuePointers,
ring_mask: u32,
cqes_offset: u32,
- completed: usize,
- //For ops that pass in arrays of iovecs, they need to be valid for the duration of the
- //operation because the kernel might read them at any time.
- pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
+ data: Mutex<CompleteQueueData>,
}
impl CompleteQueueState {
@@ -630,13 +716,12 @@
pointers: QueuePointers { head, tail },
ring_mask,
cqes_offset: params.cq_off.cqes,
- completed: 0,
- pending_op_addrs: BTreeMap::new(),
+ data: Default::default(),
}
}
- fn add_op_data(&mut self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
- self.pending_op_addrs.insert(user_data, addrs);
+ fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
+ self.data.lock().pending_op_addrs.insert(user_data, addrs);
}
fn get_cqe(&self, head: u32) -> &io_uring_cqe {
@@ -652,16 +737,23 @@
}
}
- fn num_completed(&mut self) -> usize {
- std::mem::replace(&mut self.completed, 0)
+ fn num_ready(&self) -> u32 {
+ let tail = self.pointers.tail(Ordering::Acquire);
+ let head = self.pointers.head(Ordering::Relaxed);
+
+ tail.saturating_sub(head)
}
-}
-// Return the completed ops with their result.
-impl Iterator for CompleteQueueState {
- type Item = (UserData, std::io::Result<u32>);
+ fn num_completed(&self) -> usize {
+ let mut data = self.data.lock();
+ ::std::mem::replace(&mut data.completed, 0)
+ }
- fn next(&mut self) -> Option<Self::Item> {
+ fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
+ // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
+ // from the queue.
+ let mut data = self.data.lock();
+
// Safe because the pointers to the atomics are valid and the cqe must be in range
// because the kernel provided mask is applied to the index.
let head = self.pointers.head(Ordering::Relaxed);
@@ -671,14 +763,14 @@
return None;
}
- self.completed += 1;
+ data.completed += 1;
let cqe = self.get_cqe(head);
let user_data = cqe.user_data;
let res = cqe.res;
// free the addrs saved for this op.
- let _ = self.pending_op_addrs.remove(&user_data);
+ let _ = data.pending_op_addrs.remove(&user_data);
// Store the new head and ensure the reads above complete before the kernel sees the
// update to head, `set_head` uses `Release` ordering
@@ -693,14 +785,24 @@
}
}
+// Return the completed ops with their result.
+impl<'c> Iterator for &'c CompleteQueueState {
+ type Item = (UserData, std::io::Result<u32>);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.pop_front()
+ }
+}
+
struct QueuePointers {
head: *const AtomicU32,
tail: *const AtomicU32,
}
-// Rust pointers don't implement Send but in this case both fields are atomics and so it's safe to
-// send the pointers between threads.
+// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
+// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
+unsafe impl Sync for QueuePointers {}
impl QueuePointers {
// Loads the tail pointer atomically with the given ordering.
@@ -738,13 +840,19 @@
#[cfg(test)]
mod tests {
+ use std::collections::BTreeSet;
use std::fs::OpenOptions;
use std::io::{IoSlice, IoSliceMut};
use std::io::{Read, Seek, SeekFrom, Write};
+ use std::mem;
use std::path::{Path, PathBuf};
+ use std::sync::mpsc::channel;
+ use std::sync::{Arc, Barrier};
+ use std::thread;
use std::time::Duration;
- use sys_util::PollContext;
+ use sync::{Condvar, Mutex};
+ use sys_util::{pipe, PollContext};
use tempfile::{tempfile, TempDir};
use super::*;
@@ -756,7 +864,7 @@
}
fn check_one_read(
- uring: &mut URingContext,
+ uring: &URingContext,
buf: &mut [u8],
fd: RawFd,
offset: u64,
@@ -774,7 +882,7 @@
}
fn check_one_readv(
- uring: &mut URingContext,
+ uring: &URingContext,
buf: &mut [u8],
fd: RawFd,
offset: u64,
@@ -809,7 +917,7 @@
const QUEUE_SIZE: usize = 10;
const BUF_SIZE: usize = 0x1000;
- let mut uring = URingContext::new(QUEUE_SIZE).unwrap();
+ let uring = URingContext::new(QUEUE_SIZE).unwrap();
let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);
@@ -840,7 +948,7 @@
fn read_readv() {
let queue_size = 128;
- let mut uring = URingContext::new(queue_size).unwrap();
+ let uring = URingContext::new(queue_size).unwrap();
let mut buf = [0u8; 0x1000];
let f = create_test_file(0x1000 * 2);
@@ -848,20 +956,8 @@
// double the quue depth of buffers.
for i in 0..queue_size * 2 {
let index = i as u64;
- check_one_read(
- &mut uring,
- &mut buf,
- f.as_raw_fd(),
- (index % 2) * 0x1000,
- index,
- );
- check_one_readv(
- &mut uring,
- &mut buf,
- f.as_raw_fd(),
- (index % 2) * 0x1000,
- index,
- );
+ check_one_read(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
+ check_one_readv(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
}
}
@@ -870,7 +966,7 @@
let queue_size = 128;
const BUF_SIZE: usize = 0x2000;
- let mut uring = URingContext::new(queue_size).unwrap();
+ let uring = URingContext::new(queue_size).unwrap();
let mut buf = [0u8; BUF_SIZE];
let mut buf2 = [0u8; BUF_SIZE];
let mut buf3 = [0u8; BUF_SIZE];
@@ -900,7 +996,7 @@
#[test]
fn write_one_block() {
- let mut uring = URingContext::new(16).unwrap();
+ let uring = URingContext::new(16).unwrap();
let mut buf = [0u8; 4096];
let mut f = create_test_file(0);
f.write(&buf).unwrap();
@@ -919,7 +1015,7 @@
#[test]
fn write_one_submit_poll() {
- let mut uring = URingContext::new(16).unwrap();
+ let uring = URingContext::new(16).unwrap();
let mut buf = [0u8; 4096];
let mut f = create_test_file(0);
f.write(&buf).unwrap();
@@ -954,7 +1050,7 @@
const BUF_SIZE: usize = 0x2000;
const OFFSET: u64 = 0x2000;
- let mut uring = URingContext::new(queue_size).unwrap();
+ let uring = URingContext::new(queue_size).unwrap();
let buf = [0xaau8; BUF_SIZE];
let buf2 = [0xffu8; BUF_SIZE];
let buf3 = [0x55u8; BUF_SIZE];
@@ -1013,7 +1109,7 @@
.open(&file_path)
.unwrap();
- let mut uring = URingContext::new(16).unwrap();
+ let uring = URingContext::new(16).unwrap();
uring
.add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
.unwrap();
@@ -1085,7 +1181,7 @@
#[test]
fn dev_zero_readable() {
let f = File::open(Path::new("/dev/zero")).unwrap();
- let mut uring = URingContext::new(16).unwrap();
+ let uring = URingContext::new(16).unwrap();
uring
.add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
.unwrap();
@@ -1098,7 +1194,7 @@
fn queue_many_ebusy_retry() {
let num_entries = 16;
let f = File::open(Path::new("/dev/zero")).unwrap();
- let mut uring = URingContext::new(num_entries).unwrap();
+ let uring = URingContext::new(num_entries).unwrap();
// Fill the sumbit ring.
for sqe_batch in 0..3 {
for i in 0..num_entries {
@@ -1136,4 +1232,310 @@
}
assert!(results.next().is_none());
}
+
+ #[test]
+ fn wake_with_nop() {
+ const PIPE_READ: UserData = 0;
+ const NOP: UserData = 1;
+ const BUF_DATA: [u8; 16] = [0xf4; 16];
+
+ let uring = URingContext::new(4).map(Arc::new).unwrap();
+ let (pipe_out, mut pipe_in) = pipe(true).unwrap();
+ let (tx, rx) = channel();
+
+ let uring2 = uring.clone();
+ let wait_thread = thread::spawn(move || {
+ let mut buf = [0u8; BUF_DATA.len()];
+ unsafe {
+ uring2
+ .add_read(buf.as_mut_ptr(), buf.len(), pipe_out.as_raw_fd(), 0, 0)
+ .unwrap();
+ }
+
+ // This is still a bit racy as the other thread may end up adding the NOP before we make
+ // the syscall but I'm not aware of a mechanism that will notify the other thread
+ // exactly when we make the syscall.
+ tx.send(()).unwrap();
+ let mut events = uring2.wait().unwrap();
+ let (user_data, result) = events.next().unwrap();
+ assert_eq!(user_data, NOP);
+ assert_eq!(result.unwrap(), 0);
+
+ tx.send(()).unwrap();
+ let mut events = uring2.wait().unwrap();
+ let (user_data, result) = events.next().unwrap();
+ assert_eq!(user_data, PIPE_READ);
+ assert_eq!(result.unwrap(), buf.len() as u32);
+ assert_eq!(&buf, &BUF_DATA);
+ });
+
+ // Wait until the other thread is about to make the syscall.
+ rx.recv_timeout(Duration::from_secs(10)).unwrap();
+
+ // Now add a NOP operation. This should wake up the other thread even though it cannot yet
+ // read from the pipe.
+ uring.add_nop(NOP).unwrap();
+ uring.submit().unwrap();
+
+ // Wait for the other thread to process the NOP result.
+ rx.recv_timeout(Duration::from_secs(10)).unwrap();
+
+ // Now write to the pipe to finish the uring read.
+ pipe_in.write_all(&BUF_DATA).unwrap();
+
+ wait_thread.join().unwrap();
+ }
+
+ #[test]
+ fn complete_from_any_thread() {
+ let num_entries = 16;
+ let uring = URingContext::new(num_entries).map(Arc::new).unwrap();
+
+ // Fill the sumbit ring.
+ for sqe_batch in 0..3 {
+ for i in 0..num_entries {
+ uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
+ }
+ uring.submit().unwrap();
+ }
+
+ // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see a
+ // duplicate.
+ const NUM_THREADS: usize = 7;
+ let completed = Arc::new(Mutex::new(BTreeSet::new()));
+ let cv = Arc::new(Condvar::new());
+ let barrier = Arc::new(Barrier::new(NUM_THREADS));
+
+ let mut threads = Vec::with_capacity(NUM_THREADS);
+ for _ in 0..NUM_THREADS {
+ let uring = uring.clone();
+ let completed = completed.clone();
+ let barrier = barrier.clone();
+ let cv = cv.clone();
+ threads.push(thread::spawn(move || {
+ barrier.wait();
+
+ 'wait: while completed.lock().len() < num_entries * 3 {
+ for (user_data, result) in uring.wait().unwrap() {
+ assert_eq!(result.unwrap(), 0);
+
+ let mut completed = completed.lock();
+ assert!(completed.insert(user_data));
+ if completed.len() >= num_entries * 3 {
+ break 'wait;
+ }
+ }
+ }
+
+ cv.notify_one();
+ }));
+ }
+
+ // Wait until all the operations have completed.
+ let mut c = completed.lock();
+ while c.len() < num_entries * 3 {
+ c = cv.wait(c);
+ }
+ mem::drop(c);
+
+ // Now add NOPs to wake up any threads blocked on the syscall.
+ for i in 0..NUM_THREADS {
+ uring.add_nop((num_entries * 3 + i) as UserData).unwrap();
+ }
+ uring.submit().unwrap();
+
+ for t in threads {
+ t.join().unwrap();
+ }
+ }
+
+ #[test]
+ fn submit_from_any_thread() {
+ const NUM_THREADS: usize = 7;
+ const ITERATIONS: usize = 113;
+ const NUM_ENTRIES: usize = 16;
+
+ fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
+ let mut in_flight = in_flight.lock();
+ while *in_flight > NUM_ENTRIES as isize {
+ in_flight = cv.wait(in_flight);
+ }
+ }
+
+ let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
+ let in_flight = Arc::new(Mutex::new(0));
+ let cv = Arc::new(Condvar::new());
+
+ let mut threads = Vec::with_capacity(NUM_THREADS);
+ for idx in 0..NUM_THREADS {
+ let uring = uring.clone();
+ let in_flight = in_flight.clone();
+ let cv = cv.clone();
+ threads.push(thread::spawn(move || {
+ for iter in 0..ITERATIONS {
+ loop {
+ match uring.add_nop(((idx * NUM_THREADS) + iter) as UserData) {
+ Ok(()) => *in_flight.lock() += 1,
+ Err(Error::NoSpace) => {
+ wait_for_completion_thread(&in_flight, &cv);
+ continue;
+ }
+ Err(e) => panic!("Failed to add nop: {}", e),
+ }
+
+ // We don't need to wait for the completion queue if the submit fails with
+ // EBUSY because we already added the operation to the submit queue. It will
+ // get added eventually.
+ match uring.submit() {
+ Ok(()) => break,
+ Err(Error::RingEnter(libc::EBUSY)) => break,
+ Err(e) => panic!("Failed to submit ops: {}", e),
+ }
+ }
+ }
+ }));
+ }
+
+ let mut completed = 0;
+ while completed < NUM_THREADS * ITERATIONS {
+ for (_, res) in uring.wait().unwrap() {
+ assert_eq!(res.unwrap(), 0);
+ completed += 1;
+
+ let mut in_flight = in_flight.lock();
+ *in_flight -= 1;
+ let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
+ mem::drop(in_flight);
+
+ if notify_submitters {
+ cv.notify_all();
+ }
+
+ if completed >= NUM_THREADS * ITERATIONS {
+ break;
+ }
+ }
+ }
+
+ for t in threads {
+ t.join().unwrap();
+ }
+
+ // Make sure we didn't submit more entries than expected.
+ assert_eq!(*in_flight.lock(), 0);
+ assert_eq!(uring.submit_ring.lock().added, 0);
+ assert_eq!(uring.complete_ring.num_ready(), 0);
+ assert_eq!(
+ uring.stats.total_ops.load(Ordering::Relaxed),
+ (NUM_THREADS * ITERATIONS) as u64
+ );
+ }
+
+ #[test]
+ fn multi_thread_submit_and_complete() {
+ const NUM_SUBMITTERS: usize = 7;
+ const NUM_COMPLETERS: usize = 3;
+ const ITERATIONS: usize = 113;
+ const NUM_ENTRIES: usize = 16;
+
+ fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
+ let mut in_flight = in_flight.lock();
+ while *in_flight > NUM_ENTRIES as isize {
+ in_flight = cv.wait(in_flight);
+ }
+ }
+
+ let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
+ let in_flight = Arc::new(Mutex::new(0));
+ let cv = Arc::new(Condvar::new());
+
+ let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
+ for idx in 0..NUM_SUBMITTERS {
+ let uring = uring.clone();
+ let in_flight = in_flight.clone();
+ let cv = cv.clone();
+ threads.push(thread::spawn(move || {
+ for iter in 0..ITERATIONS {
+ loop {
+ match uring.add_nop(((idx * NUM_SUBMITTERS) + iter) as UserData) {
+ Ok(()) => *in_flight.lock() += 1,
+ Err(Error::NoSpace) => {
+ wait_for_completion_thread(&in_flight, &cv);
+ continue;
+ }
+ Err(e) => panic!("Failed to add nop: {}", e),
+ }
+
+ // We don't need to wait for the completion queue if the submit fails with
+ // EBUSY because we already added the operation to the submit queue. It will
+ // get added eventually.
+ match uring.submit() {
+ Ok(()) => break,
+ Err(Error::RingEnter(libc::EBUSY)) => break,
+ Err(e) => panic!("Failed to submit ops: {}", e),
+ }
+ }
+ }
+ }));
+ }
+
+ let completed = Arc::new(AtomicUsize::new(0));
+ for _ in 0..NUM_COMPLETERS {
+ let uring = uring.clone();
+ let in_flight = in_flight.clone();
+ let cv = cv.clone();
+ let completed = completed.clone();
+ threads.push(thread::spawn(move || {
+ while completed.load(Ordering::Relaxed) < NUM_SUBMITTERS * ITERATIONS {
+ for (_, res) in uring.wait().unwrap() {
+ assert_eq!(res.unwrap(), 0);
+ completed.fetch_add(1, Ordering::Relaxed);
+
+ let mut in_flight = in_flight.lock();
+ *in_flight -= 1;
+ let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
+ mem::drop(in_flight);
+
+ if notify_submitters {
+ cv.notify_all();
+ }
+
+ if completed.load(Ordering::Relaxed) >= NUM_SUBMITTERS * ITERATIONS {
+ break;
+ }
+ }
+ }
+ }));
+ }
+
+ for t in threads.drain(..NUM_SUBMITTERS) {
+ t.join().unwrap();
+ }
+
+ // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
+ // syscall.
+ for i in 0..NUM_COMPLETERS {
+ uring
+ .add_nop((NUM_SUBMITTERS * ITERATIONS + i) as UserData)
+ .unwrap();
+ }
+ uring.submit().unwrap();
+
+ for t in threads {
+ t.join().unwrap();
+ }
+
+ // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
+ // wake up the completer threads may still be in the completion ring.
+ assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
+ assert_eq!(
+ in_flight.lock().abs() as u32 + uring.complete_ring.num_ready(),
+ NUM_COMPLETERS as u32
+ );
+ assert_eq!(uring.submit_ring.lock().added, 0);
+ assert_eq!(
+ uring.stats.total_ops.load(Ordering::Relaxed),
+ (NUM_SUBMITTERS * ITERATIONS + NUM_COMPLETERS) as u64
+ );
+ }
}