blob: 58d6d6b48d8380c529f1afb04375b7b1635a951b [file] [log] [blame]
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ptr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread::{self, ThreadId};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
use crate::task::UnfinishedWork;
pub use crate::grpc_sys::grpc_completion_type as EventType;
pub use crate::grpc_sys::grpc_event as Event;
/// `CompletionQueueHandle` enable notification of the completion of asynchronous actions.
pub struct CompletionQueueHandle {
cq: *mut grpc_completion_queue,
// When `ref_cnt` < 0, a shutdown is pending, completion queue should not
// accept requests anymore; when `ref_cnt` == 0, completion queue should
// be shutdown; When `ref_cnt` > 0, completion queue can accept requests
// and should not be shutdown.
ref_cnt: AtomicIsize,
}
unsafe impl Sync for CompletionQueueHandle {}
unsafe impl Send for CompletionQueueHandle {}
impl CompletionQueueHandle {
pub fn new() -> CompletionQueueHandle {
CompletionQueueHandle {
cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
ref_cnt: AtomicIsize::new(1),
}
}
fn add_ref(&self) -> Result<()> {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
loop {
if cnt <= 0 {
// `shutdown` has been called, reject any requests.
return Err(Error::QueueShutdown);
}
let new_cnt = cnt + 1;
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => return Ok(()),
Err(c) => cnt = c,
}
}
}
fn unref(&self) {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
// If `shutdown` is not called, `cnt` > 0, so minus 1 to unref.
// If `shutdown` is called, `cnt` < 0, so plus 1 to unref.
let new_cnt = cnt - cnt.signum();
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break new_cnt == 0,
Err(c) => cnt = c,
}
};
if shutdown {
unsafe {
grpc_sys::grpc_completion_queue_shutdown(self.cq);
}
}
}
fn shutdown(&self) {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
if cnt <= 0 {
// `shutdown` is called, skipped.
return;
}
// Make cnt negative to indicate that `shutdown` has been called.
// Because `cnt` is initialized to 1, so minus 1 to make it reach
// toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`.
let new_cnt = -cnt + 1;
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break new_cnt == 0,
Err(c) => cnt = c,
}
};
if shutdown {
unsafe {
grpc_sys::grpc_completion_queue_shutdown(self.cq);
}
}
}
}
impl Drop for CompletionQueueHandle {
fn drop(&mut self) {
unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) }
}
}
pub struct CompletionQueueRef<'a> {
queue: &'a CompletionQueue,
}
impl<'a> CompletionQueueRef<'a> {
pub fn as_ptr(&self) -> *mut grpc_completion_queue {
self.queue.handle.cq
}
}
impl<'a> Drop for CompletionQueueRef<'a> {
fn drop(&mut self) {
self.queue.handle.unref();
}
}
/// `WorkQueue` stores the unfinished work of a completion queue.
///
/// Every completion queue has a work queue, and every work queue belongs
/// to exact one completion queue. `WorkQueue` is a short path for future
/// notifications. When a future is ready to be polled, there are two way
/// to notify it.
/// 1. If it's in the same thread where the future is spawned, the future
/// will be pushed into `WorkQueue` and be polled when current call tag
/// is handled;
/// 2. If not, the future will be wrapped as a call tag and pushed into
/// completion queue and finally popped at the call to `grpc_completion_queue_next`.
pub struct WorkQueue {
id: ThreadId,
pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
}
unsafe impl Sync for WorkQueue {}
unsafe impl Send for WorkQueue {}
const QUEUE_CAPACITY: usize = 4096;
impl WorkQueue {
pub fn new() -> WorkQueue {
WorkQueue {
id: std::thread::current().id(),
pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
}
}
/// Pushes an unfinished work into the inner queue.
///
/// If the method is not called from the same thread where it's created,
/// the work will returned and no work is pushed.
pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
if self.id == thread::current().id() {
unsafe { &mut *self.pending_work.get() }.push_back(work);
None
} else {
Some(work)
}
}
/// Pops one unfinished work.
///
/// It should only be called from the same thread where the queue is created.
/// Otherwise it leads to undefined behavior.
pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
let queue = &mut *self.pending_work.get();
if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
queue.shrink_to_fit();
}
{ &mut *self.pending_work.get() }.pop_back()
}
}
#[derive(Clone)]
pub struct CompletionQueue {
handle: Arc<CompletionQueueHandle>,
pub(crate) worker: Arc<WorkQueue>,
}
impl CompletionQueue {
pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
CompletionQueue { handle, worker }
}
/// Blocks until an event is available, the completion queue is being shut down.
pub fn next(&self) -> Event {
unsafe {
let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut())
}
}
pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
self.handle.add_ref()?;
Ok(CompletionQueueRef { queue: self })
}
/// Begin destruction of a completion queue.
///
/// Once all possible events are drained then `next()` will start to produce
/// `Event::QueueShutdown` events only.
pub fn shutdown(&self) {
self.handle.shutdown()
}
pub fn worker_id(&self) -> ThreadId {
self.worker.id
}
}