Import grpcio 0.6.0
Add metadata files using the following command:
get_rust_pkg.py --add3prf -v grpcio-0.6.0 -o grpcio
Test: none
Change-Id: I53cc0feb5c9d24eacb62331b968cab4ec85f60a6
diff --git a/src/task/callback.rs b/src/task/callback.rs
new file mode 100644
index 0000000..2675469
--- /dev/null
+++ b/src/task/callback.rs
@@ -0,0 +1,84 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::call::server::{RequestContext, UnaryRequestContext};
+use crate::call::{BatchContext, Call};
+use crate::cq::CompletionQueue;
+use crate::server::{self, RequestCallContext};
+
+pub struct Request {
+ ctx: RequestContext,
+}
+
+impl Request {
+ pub fn new(rc: RequestCallContext) -> Request {
+ let ctx = RequestContext::new(rc);
+ Request { ctx }
+ }
+
+ pub fn context(&self) -> &RequestContext {
+ &self.ctx
+ }
+
+ pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+ let mut rc = self.ctx.take_request_call_context().unwrap();
+ if !success {
+ server::request_call(rc, cq);
+ return;
+ }
+
+ match self.ctx.handle_stream_req(cq, &mut rc) {
+ Ok(_) => server::request_call(rc, cq),
+ Err(ctx) => ctx.handle_unary_req(rc, cq),
+ }
+ }
+}
+
+pub struct UnaryRequest {
+ ctx: UnaryRequestContext,
+}
+
+impl UnaryRequest {
+ pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequest {
+ let ctx = UnaryRequestContext::new(ctx, rc);
+ UnaryRequest { ctx }
+ }
+
+ pub fn batch_ctx(&self) -> &BatchContext {
+ self.ctx.batch_ctx()
+ }
+
+ pub fn request_ctx(&self) -> &RequestContext {
+ self.ctx.request_ctx()
+ }
+
+ pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+ let mut rc = self.ctx.take_request_call_context().unwrap();
+ if !success {
+ server::request_call(rc, cq);
+ return;
+ }
+
+ let reader = self.ctx.batch_ctx_mut().recv_message();
+ self.ctx.handle(&mut rc, cq, reader);
+ server::request_call(rc, cq);
+ }
+}
+
+/// A callback that waits for the status of an aborted RPC call to be sent.
+pub struct Abort {
+ ctx: BatchContext,
+ _call: Call,
+}
+
+impl Abort {
+ pub fn new(call: Call) -> Abort {
+ Abort {
+ ctx: BatchContext::new(),
+ _call: call,
+ }
+ }
+
+ pub fn batch_ctx(&self) -> &BatchContext {
+ &self.ctx
+ }
+}
diff --git a/src/task/executor.rs b/src/task/executor.rs
new file mode 100644
index 0000000..4a13905
--- /dev/null
+++ b/src/task/executor.rs
@@ -0,0 +1,256 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! gRPC C Core binds a call to a completion queue, and all readiness related
+//! to that call is forwarded to the completion queue. This module takes
+//! advantage of that mechanism, using a `Kicker` to wake up the completion queue.
+//!
+//! To minimize context switches, it's better to bind a future to the same
+//! completion queue as its inner call. Hence the method `Executor::spawn` is provided.
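+//!
+//! A minimal sketch of the intended crate-internal usage (not compiled; the
+//! `call` and `cq` values here are illustrative placeholders):
+//!
+//! ```ignore
+//! let kicker = Kicker::from_call(call);
+//! Executor::new(&cq).spawn(
+//!     async move {
+//!         // Drive some call-related work to completion on the cq poll thread.
+//!     },
+//!     kicker,
+//! );
+//! ```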
+
+use std::cell::UnsafeCell;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{waker_ref, ArcWake, Context, Poll};
+
+use super::CallTag;
+use crate::call::Call;
+use crate::cq::{CompletionQueue, WorkQueue};
+use crate::error::{Error, Result};
+use crate::grpc_sys::{self, grpc_call_error};
+
+/// A handle to a spawned future.
+/// The inner future is expected to be polled on the same thread as its cq.
+type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
+
+/// `Kicker` wakes up the completion queue that the inner call binds to.
+pub(crate) struct Kicker {
+ call: Call,
+}
+
+impl Kicker {
+ pub fn from_call(call: Call) -> Kicker {
+ Kicker { call }
+ }
+
+ /// Wakes up its completion queue.
+ ///
+ /// `tag` will be popped by `grpc_completion_queue_next` in the future.
+ pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
+ let _ref = self.call.cq.borrow()?;
+ unsafe {
+ let ptr = Box::into_raw(tag);
+ let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
+ if status == grpc_call_error::GRPC_CALL_OK {
+ Ok(())
+ } else {
+ Err(Error::CallFailure(status))
+ }
+ }
+ }
+}
+
+unsafe impl Sync for Kicker {}
+
+impl Clone for Kicker {
+ fn clone(&self) -> Kicker {
+ // Bump call's reference count.
+ let call = unsafe {
+ grpc_sys::grpc_call_ref(self.call.call);
+ self.call.call
+ };
+ let cq = self.call.cq.clone();
+ Kicker {
+ call: Call { call, cq },
+ }
+ }
+}
+
+/// When a future is scheduled, it starts as IDLE. When it's ready to be polled,
+/// it is notified via `task.wake()` and marked as NOTIFIED. When the executor
+/// begins to poll the future, it's marked as POLLING. When the executor finishes
+/// polling, the future is either ready or not ready. In the former case it's
+/// marked as COMPLETED; in the latter, it's marked as IDLE again.
+///
+/// Note that it's possible for the future to be notified during polling, in which
+/// case the executor should poll it again once the current poll finishes, unless
+/// the poll returned ready.
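+///
+/// A rough summary of the intended transitions:
+///
+/// ```text
+/// IDLE     --wake()----------------> NOTIFIED
+/// IDLE     --executor starts poll--> POLLING
+/// NOTIFIED --executor starts poll--> POLLING
+/// POLLING  --wake() during poll----> NOTIFIED  (polled again afterwards)
+/// POLLING  --poll returns Pending--> IDLE
+/// POLLING  --poll returns Ready----> COMPLETED
+/// ```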
+const NOTIFIED: u8 = 1;
+const IDLE: u8 = 2;
+const POLLING: u8 = 3;
+const COMPLETED: u8 = 4;
+
+/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
+pub struct SpawnTask {
+ handle: UnsafeCell<Option<SpawnHandle>>,
+ state: AtomicU8,
+ kicker: Kicker,
+ queue: Arc<WorkQueue>,
+}
+
+/// Access to `SpawnTask` is guarded by the `state` field, which guarantees Sync.
+///
+/// Sync is required by `ArcWake`.
+unsafe impl Sync for SpawnTask {}
+
+impl SpawnTask {
+ fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
+ SpawnTask {
+ handle: UnsafeCell::new(Some(s)),
+ state: AtomicU8::new(IDLE),
+ kicker,
+ queue,
+ }
+ }
+
+ /// Marks the state of this task as NOTIFIED.
+ ///
+ /// Returns true if the task was IDLE and needs to be scheduled.
+ fn mark_notified(&self) -> bool {
+ loop {
+ match self.state.compare_exchange_weak(
+ IDLE,
+ NOTIFIED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => return true,
+ Err(POLLING) => match self.state.compare_exchange_weak(
+ POLLING,
+ NOTIFIED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Err(IDLE) | Err(POLLING) => continue,
+ // If it succeeds, the executor will poll the future again; if it
+ // fails, the future has already been resolved or notified. In both
+ // cases there is no need to notify it again, hence return false.
+ _ => return false,
+ },
+ Err(IDLE) => continue,
+ _ => return false,
+ }
+ }
+ }
+}
+
+pub fn resolve(task: Arc<SpawnTask>, success: bool) {
+ // It should never be canceled for now.
+ assert!(success);
+ poll(task, true);
+}
+
+/// A custom Waker.
+///
+/// It pushes the inner future onto the work queue if it's notified on the
+/// same thread as the inner cq; otherwise it kicks the completion queue.
+impl ArcWake for SpawnTask {
+ fn wake_by_ref(task: &Arc<Self>) {
+ if !task.mark_notified() {
+ return;
+ }
+
+ // Polling the future immediately can lead to deadlock, so we need to
+ // defer the work instead.
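+ // `push_work` queues the work only when it is called on the thread that
+ // polls the inner cq (see the doc comment on this impl); otherwise it
+ // hands the work back, and we kick the cq so the deferred work gets
+ // picked up.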
+ if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
+ match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
+ // If the queue is shut down, then the tag will be notified
+ // eventually. So just skip here.
+ Err(Error::QueueShutdown) => (),
+ Err(e) => panic!("unexpected error when kicking the completion queue: {:?}", e),
+ _ => (),
+ }
+ }
+ }
+}
+
+/// Work that should be deferred and handled later.
+///
+/// Sometimes a piece of work can't be done immediately, as it might lead
+/// to a resource conflict such as a deadlock. Such work is pushed into a
+/// queue and handled when the current work is done.
+pub struct UnfinishedWork(Arc<SpawnTask>);
+
+impl UnfinishedWork {
+ pub fn finish(self) {
+ resolve(self.0, true);
+ }
+}
+
+/// Poll the future.
+///
+/// `woken` indicates whether the task is polled because it was woken up,
+/// as opposed to being polled for the first time.
+fn poll(task: Arc<SpawnTask>, woken: bool) {
+ let mut init_state = if woken { NOTIFIED } else { IDLE };
+ // TODO: maybe we need to break the loop to avoid starvation.
+ loop {
+ match task
+ .state
+ .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
+ {
+ Ok(_) => {}
+ Err(COMPLETED) => return,
+ Err(s) => panic!("unexpected state {}", s),
+ }
+
+ let waker = waker_ref(&task);
+ let mut cx = Context::from_waker(&waker);
+
+ // L208 "lock"s state, hence it's safe to get a mutable reference.
+ match unsafe { &mut *task.handle.get() }
+ .as_mut()
+ .unwrap()
+ .as_mut()
+ .poll(&mut cx)
+ {
+ Poll::Ready(()) => {
+ task.state.store(COMPLETED, Ordering::Release);
+ unsafe { &mut *task.handle.get() }.take();
+ }
+ _ => {
+ match task.state.compare_exchange(
+ POLLING,
+ IDLE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => return,
+ Err(NOTIFIED) => {
+ init_state = NOTIFIED;
+ }
+ Err(s) => panic!("unexpected state {}", s),
+ }
+ }
+ }
+ }
+}
+
+/// An executor that drives a future in the gRPC poll thread, which
+/// can reduce thread context switching.
+pub(crate) struct Executor<'a> {
+ cq: &'a CompletionQueue,
+}
+
+impl<'a> Executor<'a> {
+ pub fn new(cq: &CompletionQueue) -> Executor<'_> {
+ Executor { cq }
+ }
+
+ pub fn cq(&self) -> &CompletionQueue {
+ self.cq
+ }
+
+ /// Spawn the future into the inner poll loop.
+ ///
+ /// If you want to trace the future, you may need to create a sender/receiver
+ /// pair by yourself.
+ pub fn spawn<F>(&self, f: F, kicker: Kicker)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let s = Box::pin(f);
+ let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
+ poll(notify, false)
+ }
+}
diff --git a/src/task/mod.rs b/src/task/mod.rs
new file mode 100644
index 0000000..f151d0e
--- /dev/null
+++ b/src/task/mod.rs
@@ -0,0 +1,233 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod callback;
+mod executor;
+mod promise;
+
+use std::fmt::{self, Debug, Formatter};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{Context, Poll, Waker};
+use parking_lot::Mutex;
+
+use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
+use self::executor::SpawnTask;
+use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
+use crate::call::server::RequestContext;
+use crate::call::{BatchContext, Call, MessageReader};
+use crate::cq::CompletionQueue;
+use crate::error::{Error, Result};
+use crate::server::RequestCallContext;
+
+pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
+pub use self::promise::BatchType;
+
+/// A handle that is used to notify the future that the task has finished.
+pub struct NotifyHandle<T> {
+ result: Option<Result<T>>,
+ waker: Option<Waker>,
+ stale: bool,
+}
+
+impl<T> NotifyHandle<T> {
+ fn new() -> NotifyHandle<T> {
+ NotifyHandle {
+ result: None,
+ waker: None,
+ stale: false,
+ }
+ }
+
+ /// Set the result and return the waker so the future can be notified, if necessary.
+ fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
+ self.result = Some(res);
+
+ self.waker.take()
+ }
+}
+
+type Inner<T> = Mutex<NotifyHandle<T>>;
+
+fn new_inner<T>() -> Arc<Inner<T>> {
+ Arc::new(Mutex::new(NotifyHandle::new()))
+}
+
+/// Get the future status without the need to poll.
+///
+/// Returns Ok(()) if the future has not been resolved yet; otherwise returns
+/// an error describing how it finished. Not implemented as a method as it's
+/// only for internal usage.
+pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
+ let guard = f.inner.lock();
+ match guard.result {
+ None => Ok(()),
+ Some(Err(Error::RpcFailure(ref status))) => {
+ Err(Error::RpcFinished(Some(status.to_owned())))
+ }
+ Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
+ }
+}
+
+/// A future object for a task that is scheduled to a `CompletionQueue`.
+pub struct CqFuture<T> {
+ inner: Arc<Inner<T>>,
+}
+
+impl<T> CqFuture<T> {
+ fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
+ CqFuture { inner }
+ }
+}
+
+impl<T> Future for CqFuture<T> {
+ type Output = Result<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let mut guard = self.inner.lock();
+ if guard.stale {
+ panic!("Resolved future is not supposed to be polled again.");
+ }
+
+ if let Some(res) = guard.result.take() {
+ guard.stale = true;
+ return Poll::Ready(res);
+ }
+
+ // The task has not finished yet, so register a notification hook.
+ if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
+ guard.waker = Some(cx.waker().clone());
+ }
+
+ Poll::Pending
+ }
+}
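+
+// A minimal sketch of how a promise resolves a `CqFuture` (the variable names
+// and the `u64` payload are illustrative only):
+//
+//     let inner = new_inner::<u64>();
+//     let f = CqFuture::new(inner.clone());
+//     // ... later, typically when the completion queue pops the matching tag:
+//     if let Some(waker) = inner.lock().set_result(Ok(42)) {
+//         waker.wake();
+//     }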
+
+/// Future object for batch jobs.
+pub type BatchFuture = CqFuture<Option<MessageReader>>;
+
+/// A result holder for asynchronous execution.
+// This enum is going to be passed across the FFI boundary, so don't use traits or generics here.
+pub enum CallTag {
+ Batch(BatchPromise),
+ Request(RequestCallback),
+ UnaryRequest(UnaryRequestCallback),
+ Abort(Abort),
+ Shutdown(ShutdownPromise),
+ Spawn(Arc<SpawnTask>),
+}
+
+impl CallTag {
+ /// Generate a Future/CallTag pair for batch jobs.
+ pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
+ let inner = new_inner();
+ let batch = BatchPromise::new(ty, inner.clone());
+ (CqFuture::new(inner), CallTag::Batch(batch))
+ }
+
+ /// Generate a CallTag for a request job. We don't have an event loop
+ /// to poll the future, so just the tag is enough.
+ pub fn request(ctx: RequestCallContext) -> CallTag {
+ CallTag::Request(RequestCallback::new(ctx))
+ }
+
+ /// Generate a Future/CallTag pair for a shutdown call.
+ pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
+ let inner = new_inner();
+ let shutdown = ShutdownPromise::new(inner.clone());
+ (CqFuture::new(inner), CallTag::Shutdown(shutdown))
+ }
+
+ /// Generate a CallTag for an abort call before the handler is called.
+ pub fn abort(call: Call) -> CallTag {
+ CallTag::Abort(Abort::new(call))
+ }
+
+ /// Generate a CallTag for a unary request job.
+ pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
+ let cb = UnaryRequestCallback::new(ctx, rc);
+ CallTag::UnaryRequest(cb)
+ }
+
+ /// Get the batch context from the result holder.
+ pub fn batch_ctx(&self) -> Option<&BatchContext> {
+ match *self {
+ CallTag::Batch(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
+ CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Get the request context from the result holder.
+ pub fn request_ctx(&self) -> Option<&RequestContext> {
+ match *self {
+ CallTag::Request(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Resolve the CallTag with the given status.
+ pub fn resolve(self, cq: &CompletionQueue, success: bool) {
+ match self {
+ CallTag::Batch(prom) => prom.resolve(success),
+ CallTag::Request(cb) => cb.resolve(cq, success),
+ CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
+ CallTag::Abort(_) => {}
+ CallTag::Shutdown(prom) => prom.resolve(success),
+ CallTag::Spawn(notify) => self::executor::resolve(notify, success),
+ }
+ }
+}
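+
+// In normal operation a `CallTag` is boxed and handed to gRPC C core as a raw
+// pointer (see `Kicker::kick` in executor.rs for one example); once the
+// completion queue reports the corresponding event, the tag is resolved with
+// the event's success flag, as exercised by `test_resolve` below.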
+
+impl Debug for CallTag {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ match *self {
+ CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
+ CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
+ CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
+ CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
+ CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
+ CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::mpsc::*;
+ use std::sync::*;
+ use std::thread;
+
+ use super::*;
+ use crate::env::Environment;
+ use futures::executor::block_on;
+
+ #[test]
+ fn test_resolve() {
+ let env = Environment::new(1);
+
+ let (cq_f1, tag1) = CallTag::shutdown_pair();
+ let (cq_f2, tag2) = CallTag::shutdown_pair();
+ let (tx, rx) = mpsc::channel();
+
+ let handler = thread::spawn(move || {
+ tx.send(block_on(cq_f1)).unwrap();
+ tx.send(block_on(cq_f2)).unwrap();
+ });
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag1.resolve(&env.pick_cq(), true);
+ assert!(rx.recv().unwrap().is_ok());
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag2.resolve(&env.pick_cq(), false);
+ match rx.recv() {
+ Ok(Err(Error::ShutdownFailed)) => {}
+ res => panic!("expect shutdown failed, but got {:?}", res),
+ }
+
+ handler.join().unwrap();
+ }
+}
diff --git a/src/task/promise.rs b/src/task/promise.rs
new file mode 100644
index 0000000..02e9419
--- /dev/null
+++ b/src/task/promise.rs
@@ -0,0 +1,128 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
+use super::Inner;
+use crate::call::{BatchContext, MessageReader, RpcStatusCode};
+use crate::error::Error;
+
+/// Batch job type.
+#[derive(PartialEq, Debug)]
+pub enum BatchType {
+ /// Finish without reading any message.
+ Finish,
+ /// Extract one message when finish.
+ Read,
+ /// Check the rpc code and then extract one message.
+ CheckRead,
+}
+
+/// A promise used to resolve batch jobs.
+pub struct Batch {
+ ty: BatchType,
+ ctx: BatchContext,
+ inner: Arc<Inner<Option<MessageReader>>>,
+}
+
+impl Batch {
+ pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
+ Batch {
+ ty,
+ ctx: BatchContext::new(),
+ inner,
+ }
+ }
+
+ pub fn context(&self) -> &BatchContext {
+ &self.ctx
+ }
+
+ fn read_one_msg(&mut self, success: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if success {
+ guard.set_result(Ok(self.ctx.recv_message()))
+ } else {
+ // Rely on the C core to handle the failed read (e.g. to deliver an
+ // appropriate status code on the client side).
+ guard.set_result(Ok(None))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ fn finish_response(&mut self, succeed: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if succeed {
+ let status = self.ctx.rpc_status();
+ if status.status == RpcStatusCode::OK {
+ guard.set_result(Ok(None))
+ } else {
+ guard.set_result(Err(Error::RpcFailure(status)))
+ }
+ } else {
+ guard.set_result(Err(Error::RemoteStopped))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ fn handle_unary_response(&mut self) {
+ let task = {
+ let mut guard = self.inner.lock();
+ let status = self.ctx.rpc_status();
+ if status.status == RpcStatusCode::OK {
+ guard.set_result(Ok(self.ctx.recv_message()))
+ } else {
+ guard.set_result(Err(Error::RpcFailure(status)))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ pub fn resolve(mut self, success: bool) {
+ match self.ty {
+ BatchType::CheckRead => {
+ assert!(success);
+ self.handle_unary_response();
+ }
+ BatchType::Finish => {
+ self.finish_response(success);
+ }
+ BatchType::Read => {
+ self.read_one_msg(success);
+ }
+ }
+ }
+}
+
+impl Debug for Batch {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "Batch [{:?}]", self.ty)
+ }
+}
+
+/// A promise used to resolve the result of an async shutdown.
+pub struct Shutdown {
+ inner: Arc<Inner<()>>,
+}
+
+impl Shutdown {
+ pub fn new(inner: Arc<Inner<()>>) -> Shutdown {
+ Shutdown { inner }
+ }
+
+ pub fn resolve(self, success: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if success {
+ guard.set_result(Ok(()))
+ } else {
+ guard.set_result(Err(Error::ShutdownFailed))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+}