Import grpcio 0.6.0

And add metadata files using the following command:
get_rust_pkg.py --add3prf -v grpcio-0.6.0 -o grpcio

Test: none
Change-Id: I53cc0feb5c9d24eacb62331b968cab4ec85f60a6
diff --git a/src/task/callback.rs b/src/task/callback.rs
new file mode 100644
index 0000000..2675469
--- /dev/null
+++ b/src/task/callback.rs
@@ -0,0 +1,84 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::call::server::{RequestContext, UnaryRequestContext};
+use crate::call::{BatchContext, Call};
+use crate::cq::CompletionQueue;
+use crate::server::{self, RequestCallContext};
+
+pub struct Request {
+    ctx: RequestContext,
+}
+
+impl Request {
+    pub fn new(rc: RequestCallContext) -> Request {
+        let ctx = RequestContext::new(rc);
+        Request { ctx }
+    }
+
+    pub fn context(&self) -> &RequestContext {
+        &self.ctx
+    }
+
+    pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+        let mut rc = self.ctx.take_request_call_context().unwrap();
+        if !success {
+            server::request_call(rc, cq);
+            return;
+        }
+
+        match self.ctx.handle_stream_req(cq, &mut rc) {
+            Ok(_) => server::request_call(rc, cq),
+            Err(ctx) => ctx.handle_unary_req(rc, cq),
+        }
+    }
+}
+
+pub struct UnaryRequest {
+    ctx: UnaryRequestContext,
+}
+
+impl UnaryRequest {
+    pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequest {
+        let ctx = UnaryRequestContext::new(ctx, rc);
+        UnaryRequest { ctx }
+    }
+
+    pub fn batch_ctx(&self) -> &BatchContext {
+        self.ctx.batch_ctx()
+    }
+
+    pub fn request_ctx(&self) -> &RequestContext {
+        self.ctx.request_ctx()
+    }
+
+    pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+        let mut rc = self.ctx.take_request_call_context().unwrap();
+        if !success {
+            server::request_call(rc, cq);
+            return;
+        }
+
+        let reader = self.ctx.batch_ctx_mut().recv_message();
+        self.ctx.handle(&mut rc, cq, reader);
+        server::request_call(rc, cq);
+    }
+}
+
+/// A callback to wait for status for the aborted rpc call to be sent.
+pub struct Abort {
+    ctx: BatchContext,
+    _call: Call,
+}
+
+impl Abort {
+    pub fn new(call: Call) -> Abort {
+        Abort {
+            ctx: BatchContext::new(),
+            _call: call,
+        }
+    }
+
+    pub fn batch_ctx(&self) -> &BatchContext {
+        &self.ctx
+    }
+}
diff --git a/src/task/executor.rs b/src/task/executor.rs
new file mode 100644
index 0000000..4a13905
--- /dev/null
+++ b/src/task/executor.rs
@@ -0,0 +1,256 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! gRPC C Core binds a call to a completion queue, and all the related readiness
+//! notifications are forwarded to that completion queue. This module builds on that
+//! mechanism and uses `Kicker` to wake up the completion queue.
+//!
+//! To minimize context switches, it's better to bind the future to the
+//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.
+
+use std::cell::UnsafeCell;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{waker_ref, ArcWake, Context, Poll};
+
+use super::CallTag;
+use crate::call::Call;
+use crate::cq::{CompletionQueue, WorkQueue};
+use crate::error::{Error, Result};
+use crate::grpc_sys::{self, grpc_call_error};
+
+/// A handle to a `Spawn`.
+/// Inner future is expected to be polled in the same thread as cq.
+type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
+
+/// `Kicker` wakes up the completion queue that the inner call binds to.
+pub(crate) struct Kicker {
+    call: Call,
+}
+
+impl Kicker {
+    pub fn from_call(call: Call) -> Kicker {
+        Kicker { call }
+    }
+
+    /// Wakes up its completion queue.
+    ///
+    /// `tag` will be popped by `grpc_completion_queue_next` in the future.
+    pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
+        let _ref = self.call.cq.borrow()?;
+        unsafe {
+            let ptr = Box::into_raw(tag);
+            let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
+            if status == grpc_call_error::GRPC_CALL_OK {
+                Ok(())
+            } else {
+                Err(Error::CallFailure(status))
+            }
+        }
+    }
+}
+
+unsafe impl Sync for Kicker {}
+
+impl Clone for Kicker {
+    fn clone(&self) -> Kicker {
+        // Bump call's reference count.
+        let call = unsafe {
+            grpc_sys::grpc_call_ref(self.call.call);
+            self.call.call
+        };
+        let cq = self.call.cq.clone();
+        Kicker {
+            call: Call { call, cq },
+        }
+    }
+}
+
+/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
+/// it will be notified via task.wake(), and marked as NOTIFIED. When the executor
+/// begins to poll the future, it's marked as POLLING. When the executor finishes
+/// polling, the future can either be ready or not ready. In the former case, it's
+/// marked as COMPLETED. In the latter case, it's marked as IDLE again.
+///
+/// Note it's possible the future is notified during polling, in which case the executor
+/// should poll it again once the current poll finishes, unless that poll returned ready.
+const NOTIFIED: u8 = 1;
+const IDLE: u8 = 2;
+const POLLING: u8 = 3;
+const COMPLETED: u8 = 4;
+
+/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
+pub struct SpawnTask {
+    handle: UnsafeCell<Option<SpawnHandle>>,
+    state: AtomicU8,
+    kicker: Kicker,
+    queue: Arc<WorkQueue>,
+}
+
+/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
+///
+/// Sync is required by `ArcWake`.
+unsafe impl Sync for SpawnTask {}
+
+impl SpawnTask {
+    fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
+        SpawnTask {
+            handle: UnsafeCell::new(Some(s)),
+            state: AtomicU8::new(IDLE),
+            kicker,
+            queue,
+        }
+    }
+
+    /// Marks the state of this task to NOTIFIED.
+    ///
+    /// Returns true if the task was IDLE and hence needs to be scheduled.
+    fn mark_notified(&self) -> bool {
+        loop {
+            match self.state.compare_exchange_weak(
+                IDLE,
+                NOTIFIED,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => return true,
+                Err(POLLING) => match self.state.compare_exchange_weak(
+                    POLLING,
+                    NOTIFIED,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Err(IDLE) | Err(POLLING) => continue,
+                    // If it succeeds, the executor will poll the future again once the
+                    // current poll ends; otherwise the task is already NOTIFIED or
+                    // COMPLETED. Either way nothing needs scheduling, hence return false.
+                    _ => return false,
+                },
+                Err(IDLE) => continue,
+                _ => return false,
+            }
+        }
+    }
+}
+
+pub fn resolve(task: Arc<SpawnTask>, success: bool) {
+    // A spawn tag is expected to always resolve with `success == true` for now.
+    assert!(success);
+    poll(task, true);
+}
+
+/// A custom Waker.
+///
+/// A newly notified task is handed to the work queue; if `push_work` gives it
+/// back, the completion queue is kicked with a `Spawn` tag carrying the task.
+impl ArcWake for SpawnTask {
+    fn wake_by_ref(task: &Arc<Self>) {
+        if !task.mark_notified() {
+            return;
+        }
+
+        // Polling the future immediately can lead to deadlock, so the work is
+        // deferred instead.
+        if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
+            match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
+                // If the queue is shutdown, then the tag will be notified
+                // eventually. So just skip here.
+                Err(Error::QueueShutdown) => (),
+                Err(e) => panic!("unexpected error when kicking completion queue: {:?}", e),
+                _ => (),
+            }
+        }
+    }
+}
+
+/// Work that should be deferred to be handled.
+///
+/// Sometimes work can't be done immediately as it might lead
+/// to a resource conflict, a deadlock for example. Such work is
+/// pushed into a queue and handled when the current work is done.
+pub struct UnfinishedWork(Arc<SpawnTask>);
+
+impl UnfinishedWork {
+    pub fn finish(self) {
+        resolve(self.0, true);
+    }
+}
+
+/// Polls the future held by `task`.
+///
+/// `woken` tells whether the task was notified (state NOTIFIED) or is freshly spawned (state IDLE).
+fn poll(task: Arc<SpawnTask>, woken: bool) {
+    let mut init_state = if woken { NOTIFIED } else { IDLE };
+    // TODO: maybe we need to break the loop to avoid starvation.
+    loop {
+        match task
+            .state
+            .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
+        {
+            Ok(_) => {}
+            Err(COMPLETED) => return,
+            Err(s) => panic!("unexpected state {}", s),
+        }
+
+        let waker = waker_ref(&task);
+        let mut cx = Context::from_waker(&waker);
+
+        // The compare_exchange to POLLING above "lock"s state, hence it's safe to get a mutable reference.
+        match unsafe { &mut *task.handle.get() }
+            .as_mut()
+            .unwrap()
+            .as_mut()
+            .poll(&mut cx)
+        {
+            Poll::Ready(()) => {
+                task.state.store(COMPLETED, Ordering::Release);
+                unsafe { &mut *task.handle.get() }.take();
+            }
+            _ => {
+                match task.state.compare_exchange(
+                    POLLING,
+                    IDLE,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => return,
+                    Err(NOTIFIED) => {
+                        init_state = NOTIFIED;
+                    }
+                    Err(s) => panic!("unexpected state {}", s),
+                }
+            }
+        }
+    }
+}
+
+/// An executor that drives a future in the gRPC poll thread, which
+/// can reduce thread context switching.
+pub(crate) struct Executor<'a> {
+    cq: &'a CompletionQueue,
+}
+
+impl<'a> Executor<'a> {
+    pub fn new(cq: &CompletionQueue) -> Executor<'_> {
+        Executor { cq }
+    }
+
+    pub fn cq(&self) -> &CompletionQueue {
+        self.cq
+    }
+
+    /// Spawn the future into inner poll loop.
+    ///
+    /// If you want to trace the future, you may need to create a sender/receiver
+    /// pair by yourself.
+    pub fn spawn<F>(&self, f: F, kicker: Kicker)
+    where
+        F: Future<Output = ()> + Send + 'static,
+    {
+        let s = Box::pin(f);
+        let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
+        poll(notify, false)
+    }
+}
diff --git a/src/task/mod.rs b/src/task/mod.rs
new file mode 100644
index 0000000..f151d0e
--- /dev/null
+++ b/src/task/mod.rs
@@ -0,0 +1,233 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod callback;
+mod executor;
+mod promise;
+
+use std::fmt::{self, Debug, Formatter};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{Context, Poll, Waker};
+use parking_lot::Mutex;
+
+use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
+use self::executor::SpawnTask;
+use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
+use crate::call::server::RequestContext;
+use crate::call::{BatchContext, Call, MessageReader};
+use crate::cq::CompletionQueue;
+use crate::error::{Error, Result};
+use crate::server::RequestCallContext;
+
+pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
+pub use self::promise::BatchType;
+
+/// A handle that is used to notify future that the task finishes.
+pub struct NotifyHandle<T> {
+    result: Option<Result<T>>,
+    waker: Option<Waker>,
+    stale: bool,
+}
+
+impl<T> NotifyHandle<T> {
+    fn new() -> NotifyHandle<T> {
+        NotifyHandle {
+            result: None,
+            waker: None,
+            stale: false,
+        }
+    }
+
+    /// Set the result and notify future if necessary.
+    fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
+        self.result = Some(res);
+
+        self.waker.take()
+    }
+}
+
+type Inner<T> = Mutex<NotifyHandle<T>>;
+
+fn new_inner<T>() -> Arc<Inner<T>> {
+    Arc::new(Mutex::new(NotifyHandle::new()))
+}
+
+/// Get the future status without the need to poll.
+///
+/// Returns `Ok(())` while no result is stored, `Err(Error::RpcFinished(..))` otherwise.
+/// Not implemented as method as it's only for internal usage.
+pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
+    let guard = f.inner.lock();
+    match guard.result {
+        None => Ok(()),
+        Some(Err(Error::RpcFailure(ref status))) => {
+            Err(Error::RpcFinished(Some(status.to_owned())))
+        }
+        Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
+    }
+}
+
+/// A future object for task that is scheduled to `CompletionQueue`.
+pub struct CqFuture<T> {
+    inner: Arc<Inner<T>>,
+}
+
+impl<T> CqFuture<T> {
+    fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
+        CqFuture { inner }
+    }
+}
+
+impl<T> Future for CqFuture<T> {
+    type Output = Result<T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let mut guard = self.inner.lock();
+        if guard.stale {
+            panic!("Resolved future is not supposed to be polled again.");
+        }
+
+        if let Some(res) = guard.result.take() {
+            guard.stale = true;
+            return Poll::Ready(res);
+        }
+
+        // So the task has not been finished yet, add notification hook.
+        if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
+            guard.waker = Some(cx.waker().clone());
+        }
+
+        Poll::Pending
+    }
+}
+
+/// Future object for batch jobs.
+pub type BatchFuture = CqFuture<Option<MessageReader>>;
+
+/// A result holder for asynchronous execution.
+// This enum is going to be passed to FFI, so don't use trait or generic here.
+pub enum CallTag {
+    Batch(BatchPromise),
+    Request(RequestCallback),
+    UnaryRequest(UnaryRequestCallback),
+    Abort(Abort),
+    Shutdown(ShutdownPromise),
+    Spawn(Arc<SpawnTask>),
+}
+
+impl CallTag {
+    /// Generate a Future/CallTag pair for batch jobs.
+    pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
+        let inner = new_inner();
+        let batch = BatchPromise::new(ty, inner.clone());
+        (CqFuture::new(inner), CallTag::Batch(batch))
+    }
+
+    /// Generate a CallTag for request job. We don't have an eventloop
+    /// to poll the future, so just the tag is enough.
+    pub fn request(ctx: RequestCallContext) -> CallTag {
+        CallTag::Request(RequestCallback::new(ctx))
+    }
+
+    /// Generate a Future/CallTag pair for shutdown call.
+    pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
+        let inner = new_inner();
+        let shutdown = ShutdownPromise::new(inner.clone());
+        (CqFuture::new(inner), CallTag::Shutdown(shutdown))
+    }
+
+    /// Generate a CallTag for abort call before handler is called.
+    pub fn abort(call: Call) -> CallTag {
+        CallTag::Abort(Abort::new(call))
+    }
+
+    /// Generate a CallTag for unary request job.
+    pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
+        let cb = UnaryRequestCallback::new(ctx, rc);
+        CallTag::UnaryRequest(cb)
+    }
+
+    /// Get the batch context from result holder.
+    pub fn batch_ctx(&self) -> Option<&BatchContext> {
+        match *self {
+            CallTag::Batch(ref prom) => Some(prom.context()),
+            CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
+            CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
+            _ => None,
+        }
+    }
+
+    /// Get the request context from the result holder.
+    pub fn request_ctx(&self) -> Option<&RequestContext> {
+        match *self {
+            CallTag::Request(ref prom) => Some(prom.context()),
+            CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
+            _ => None,
+        }
+    }
+
+    /// Resolve the CallTag with given status.
+    pub fn resolve(self, cq: &CompletionQueue, success: bool) {
+        match self {
+            CallTag::Batch(prom) => prom.resolve(success),
+            CallTag::Request(cb) => cb.resolve(cq, success),
+            CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
+            CallTag::Abort(_) => {}
+            CallTag::Shutdown(prom) => prom.resolve(success),
+            CallTag::Spawn(notify) => self::executor::resolve(notify, success),
+        }
+    }
+}
+
+impl Debug for CallTag {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match *self {
+            CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
+            CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
+            CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
+            CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
+            CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
+            CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::mpsc::*;
+    use std::sync::*;
+    use std::thread;
+
+    use super::*;
+    use crate::env::Environment;
+    use futures::executor::block_on;
+
+    #[test]
+    fn test_resolve() {
+        let env = Environment::new(1);
+
+        let (cq_f1, tag1) = CallTag::shutdown_pair();
+        let (cq_f2, tag2) = CallTag::shutdown_pair();
+        let (tx, rx) = mpsc::channel();
+
+        let handler = thread::spawn(move || {
+            tx.send(block_on(cq_f1)).unwrap();
+            tx.send(block_on(cq_f2)).unwrap();
+        });
+
+        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+        tag1.resolve(&env.pick_cq(), true);
+        assert!(rx.recv().unwrap().is_ok());
+
+        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+        tag2.resolve(&env.pick_cq(), false);
+        match rx.recv() {
+            Ok(Err(Error::ShutdownFailed)) => {}
+            res => panic!("expect shutdown failed, but got {:?}", res),
+        }
+
+        handler.join().unwrap();
+    }
+}
diff --git a/src/task/promise.rs b/src/task/promise.rs
new file mode 100644
index 0000000..02e9419
--- /dev/null
+++ b/src/task/promise.rs
@@ -0,0 +1,128 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
+use super::Inner;
+use crate::call::{BatchContext, MessageReader, RpcStatusCode};
+use crate::error::Error;
+
+/// Batch job type.
+#[derive(PartialEq, Debug)]
+pub enum BatchType {
+    /// Finish without reading any message.
+    Finish,
+    /// Extract one message when finish.
+    Read,
+    /// Check the rpc code and then extract one message.
+    CheckRead,
+}
+
+/// A promise used to resolve batch jobs.
+pub struct Batch {
+    ty: BatchType,
+    ctx: BatchContext,
+    inner: Arc<Inner<Option<MessageReader>>>,
+}
+
+impl Batch {
+    pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
+        Batch {
+            ty,
+            ctx: BatchContext::new(),
+            inner,
+        }
+    }
+
+    pub fn context(&self) -> &BatchContext {
+        &self.ctx
+    }
+
+    fn read_one_msg(&mut self, success: bool) {
+        let task = {
+            let mut guard = self.inner.lock();
+            if success {
+                guard.set_result(Ok(self.ctx.recv_message()))
+            } else {
+                // rely on C core to handle the failed read (e.g. deliver appropriate
+                // status code on the client side).
+                guard.set_result(Ok(None))
+            }
+        };
+        task.map(|t| t.wake());
+    }
+
+    fn finish_response(&mut self, succeed: bool) {
+        let task = {
+            let mut guard = self.inner.lock();
+            if succeed {
+                let status = self.ctx.rpc_status();
+                if status.status == RpcStatusCode::OK {
+                    guard.set_result(Ok(None))
+                } else {
+                    guard.set_result(Err(Error::RpcFailure(status)))
+                }
+            } else {
+                guard.set_result(Err(Error::RemoteStopped))
+            }
+        };
+        task.map(|t| t.wake());
+    }
+
+    fn handle_unary_response(&mut self) {
+        let task = {
+            let mut guard = self.inner.lock();
+            let status = self.ctx.rpc_status();
+            if status.status == RpcStatusCode::OK {
+                guard.set_result(Ok(self.ctx.recv_message()))
+            } else {
+                guard.set_result(Err(Error::RpcFailure(status)))
+            }
+        };
+        task.map(|t| t.wake());
+    }
+
+    pub fn resolve(mut self, success: bool) {
+        match self.ty {
+            BatchType::CheckRead => {
+                assert!(success);
+                self.handle_unary_response();
+            }
+            BatchType::Finish => {
+                self.finish_response(success);
+            }
+            BatchType::Read => {
+                self.read_one_msg(success);
+            }
+        }
+    }
+}
+
+impl Debug for Batch {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "Batch [{:?}]", self.ty)
+    }
+}
+
+/// A promise used to resolve async shutdown result.
+pub struct Shutdown {
+    inner: Arc<Inner<()>>,
+}
+
+impl Shutdown {
+    pub fn new(inner: Arc<Inner<()>>) -> Shutdown {
+        Shutdown { inner }
+    }
+
+    pub fn resolve(self, success: bool) {
+        let task = {
+            let mut guard = self.inner.lock();
+            if success {
+                guard.set_result(Ok(()))
+            } else {
+                guard.set_result(Err(Error::ShutdownFailed))
+            }
+        };
+        task.map(|t| t.wake());
+    }
+}