Jeff Vander Stoep | 761577d | 2020-10-14 15:21:00 +0200 | [diff] [blame^] | 1 | // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. |
| 2 | |
| 3 | mod callback; |
| 4 | mod executor; |
| 5 | mod promise; |
| 6 | |
| 7 | use std::fmt::{self, Debug, Formatter}; |
| 8 | use std::pin::Pin; |
| 9 | use std::sync::Arc; |
| 10 | |
| 11 | use futures::future::Future; |
| 12 | use futures::task::{Context, Poll, Waker}; |
| 13 | use parking_lot::Mutex; |
| 14 | |
| 15 | use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback}; |
| 16 | use self::executor::SpawnTask; |
| 17 | use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise}; |
| 18 | use crate::call::server::RequestContext; |
| 19 | use crate::call::{BatchContext, Call, MessageReader}; |
| 20 | use crate::cq::CompletionQueue; |
| 21 | use crate::error::{Error, Result}; |
| 22 | use crate::server::RequestCallContext; |
| 23 | |
| 24 | pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork}; |
| 25 | pub use self::promise::BatchType; |
| 26 | |
| 27 | /// A handle that is used to notify future that the task finishes. |
| 28 | pub struct NotifyHandle<T> { |
| 29 | result: Option<Result<T>>, |
| 30 | waker: Option<Waker>, |
| 31 | stale: bool, |
| 32 | } |
| 33 | |
| 34 | impl<T> NotifyHandle<T> { |
| 35 | fn new() -> NotifyHandle<T> { |
| 36 | NotifyHandle { |
| 37 | result: None, |
| 38 | waker: None, |
| 39 | stale: false, |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | /// Set the result and notify future if necessary. |
| 44 | fn set_result(&mut self, res: Result<T>) -> Option<Waker> { |
| 45 | self.result = Some(res); |
| 46 | |
| 47 | self.waker.take() |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | type Inner<T> = Mutex<NotifyHandle<T>>; |
| 52 | |
| 53 | fn new_inner<T>() -> Arc<Inner<T>> { |
| 54 | Arc::new(Mutex::new(NotifyHandle::new())) |
| 55 | } |
| 56 | |
| 57 | /// Get the future status without the need to poll. |
| 58 | /// |
| 59 | /// If the future is polled successfully, this function will return None. |
| 60 | /// Not implemented as method as it's only for internal usage. |
| 61 | pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> { |
| 62 | let guard = f.inner.lock(); |
| 63 | match guard.result { |
| 64 | None => Ok(()), |
| 65 | Some(Err(Error::RpcFailure(ref status))) => { |
| 66 | Err(Error::RpcFinished(Some(status.to_owned()))) |
| 67 | } |
| 68 | Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)), |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | /// A future object for task that is scheduled to `CompletionQueue`. |
| 73 | pub struct CqFuture<T> { |
| 74 | inner: Arc<Inner<T>>, |
| 75 | } |
| 76 | |
| 77 | impl<T> CqFuture<T> { |
| 78 | fn new(inner: Arc<Inner<T>>) -> CqFuture<T> { |
| 79 | CqFuture { inner } |
| 80 | } |
| 81 | } |
| 82 | |
| 83 | impl<T> Future for CqFuture<T> { |
| 84 | type Output = Result<T>; |
| 85 | |
| 86 | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { |
| 87 | let mut guard = self.inner.lock(); |
| 88 | if guard.stale { |
| 89 | panic!("Resolved future is not supposed to be polled again."); |
| 90 | } |
| 91 | |
| 92 | if let Some(res) = guard.result.take() { |
| 93 | guard.stale = true; |
| 94 | return Poll::Ready(res); |
| 95 | } |
| 96 | |
| 97 | // So the task has not been finished yet, add notification hook. |
| 98 | if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) { |
| 99 | guard.waker = Some(cx.waker().clone()); |
| 100 | } |
| 101 | |
| 102 | Poll::Pending |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | /// Future object for batch jobs. |
| 107 | pub type BatchFuture = CqFuture<Option<MessageReader>>; |
| 108 | |
| 109 | /// A result holder for asynchronous execution. |
| 110 | // This enum is going to be passed to FFI, so don't use trait or generic here. |
| 111 | pub enum CallTag { |
| 112 | Batch(BatchPromise), |
| 113 | Request(RequestCallback), |
| 114 | UnaryRequest(UnaryRequestCallback), |
| 115 | Abort(Abort), |
| 116 | Shutdown(ShutdownPromise), |
| 117 | Spawn(Arc<SpawnTask>), |
| 118 | } |
| 119 | |
| 120 | impl CallTag { |
| 121 | /// Generate a Future/CallTag pair for batch jobs. |
| 122 | pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) { |
| 123 | let inner = new_inner(); |
| 124 | let batch = BatchPromise::new(ty, inner.clone()); |
| 125 | (CqFuture::new(inner), CallTag::Batch(batch)) |
| 126 | } |
| 127 | |
| 128 | /// Generate a CallTag for request job. We don't have an eventloop |
| 129 | /// to pull the future, so just the tag is enough. |
| 130 | pub fn request(ctx: RequestCallContext) -> CallTag { |
| 131 | CallTag::Request(RequestCallback::new(ctx)) |
| 132 | } |
| 133 | |
| 134 | /// Generate a Future/CallTag pair for shutdown call. |
| 135 | pub fn shutdown_pair() -> (CqFuture<()>, CallTag) { |
| 136 | let inner = new_inner(); |
| 137 | let shutdown = ShutdownPromise::new(inner.clone()); |
| 138 | (CqFuture::new(inner), CallTag::Shutdown(shutdown)) |
| 139 | } |
| 140 | |
| 141 | /// Generate a CallTag for abort call before handler is called. |
| 142 | pub fn abort(call: Call) -> CallTag { |
| 143 | CallTag::Abort(Abort::new(call)) |
| 144 | } |
| 145 | |
| 146 | /// Generate a CallTag for unary request job. |
| 147 | pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag { |
| 148 | let cb = UnaryRequestCallback::new(ctx, rc); |
| 149 | CallTag::UnaryRequest(cb) |
| 150 | } |
| 151 | |
| 152 | /// Get the batch context from result holder. |
| 153 | pub fn batch_ctx(&self) -> Option<&BatchContext> { |
| 154 | match *self { |
| 155 | CallTag::Batch(ref prom) => Some(prom.context()), |
| 156 | CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()), |
| 157 | CallTag::Abort(ref cb) => Some(cb.batch_ctx()), |
| 158 | _ => None, |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | /// Get the request context from the result holder. |
| 163 | pub fn request_ctx(&self) -> Option<&RequestContext> { |
| 164 | match *self { |
| 165 | CallTag::Request(ref prom) => Some(prom.context()), |
| 166 | CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()), |
| 167 | _ => None, |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | /// Resolve the CallTag with given status. |
| 172 | pub fn resolve(self, cq: &CompletionQueue, success: bool) { |
| 173 | match self { |
| 174 | CallTag::Batch(prom) => prom.resolve(success), |
| 175 | CallTag::Request(cb) => cb.resolve(cq, success), |
| 176 | CallTag::UnaryRequest(cb) => cb.resolve(cq, success), |
| 177 | CallTag::Abort(_) => {} |
| 178 | CallTag::Shutdown(prom) => prom.resolve(success), |
| 179 | CallTag::Spawn(notify) => self::executor::resolve(notify, success), |
| 180 | } |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | impl Debug for CallTag { |
| 185 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
| 186 | match *self { |
| 187 | CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx), |
| 188 | CallTag::Request(_) => write!(f, "CallTag::Request(..)"), |
| 189 | CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"), |
| 190 | CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"), |
| 191 | CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"), |
| 192 | CallTag::Spawn(_) => write!(f, "CallTag::Spawn"), |
| 193 | } |
| 194 | } |
| 195 | } |
| 196 | |
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{self, TryRecvError};
    use std::thread;

    use futures::executor::block_on;

    use super::*;
    use crate::env::Environment;

    /// Resolving a shutdown tag must unblock the matching future: a
    /// successful resolve yields `Ok`, a failed one `Error::ShutdownFailed`.
    #[test]
    fn test_resolve() {
        let env = Environment::new(1);

        let (first_future, first_tag) = CallTag::shutdown_pair();
        let (second_future, second_tag) = CallTag::shutdown_pair();
        let (sender, receiver) = mpsc::channel();

        // The worker blocks on each future in turn and reports its outcome.
        let worker = thread::spawn(move || {
            sender.send(block_on(first_future)).unwrap();
            sender.send(block_on(second_future)).unwrap();
        });

        // Nothing has been resolved yet, so nothing can have been sent.
        assert_eq!(receiver.try_recv().unwrap_err(), TryRecvError::Empty);
        first_tag.resolve(&env.pick_cq(), true);
        assert!(receiver.recv().unwrap().is_ok());

        // The second future is still pending until its tag is resolved.
        assert_eq!(receiver.try_recv().unwrap_err(), TryRecvError::Empty);
        second_tag.resolve(&env.pick_cq(), false);
        match receiver.recv() {
            Ok(Err(Error::ShutdownFailed)) => {}
            res => panic!("expect shutdown failed, but got {:?}", res),
        }

        worker.join().unwrap();
    }
}