// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3mod callback;
4mod executor;
5mod promise;
6
7use std::fmt::{self, Debug, Formatter};
8use std::pin::Pin;
9use std::sync::Arc;
10
11use futures::future::Future;
12use futures::task::{Context, Poll, Waker};
13use parking_lot::Mutex;
14
15use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
16use self::executor::SpawnTask;
17use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
18use crate::call::server::RequestContext;
19use crate::call::{BatchContext, Call, MessageReader};
20use crate::cq::CompletionQueue;
21use crate::error::{Error, Result};
22use crate::server::RequestCallContext;
23
24pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
25pub use self::promise::BatchType;
26
/// A handle used to notify a future that its task has finished.
///
/// It carries the task's outcome together with the waker of whoever is
/// currently waiting for that outcome.
pub struct NotifyHandle<T> {
    /// Outcome of the task. `None` until `set_result` stores one, and `None`
    /// again once `CqFuture::poll` has taken it out.
    result: Option<Result<T>>,
    /// Waker registered by a pending poll; `set_result` takes it out and
    /// returns it to its caller.
    waker: Option<Waker>,
    /// Set to `true` when `CqFuture::poll` hands out the result; polling the
    /// future after that panics.
    stale: bool,
}
33
34impl<T> NotifyHandle<T> {
35 fn new() -> NotifyHandle<T> {
36 NotifyHandle {
37 result: None,
38 waker: None,
39 stale: false,
40 }
41 }
42
43 /// Set the result and notify future if necessary.
44 fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
45 self.result = Some(res);
46
47 self.waker.take()
48 }
49}
50
/// A mutex-guarded `NotifyHandle`; this is the state a `CqFuture` holds
/// behind an `Arc`.
type Inner<T> = Mutex<NotifyHandle<T>>;
52
53fn new_inner<T>() -> Arc<Inner<T>> {
54 Arc::new(Mutex::new(NotifyHandle::new()))
55}
56
57/// Get the future status without the need to poll.
58///
59/// If the future is polled successfully, this function will return None.
60/// Not implemented as method as it's only for internal usage.
61pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
62 let guard = f.inner.lock();
63 match guard.result {
64 None => Ok(()),
65 Some(Err(Error::RpcFailure(ref status))) => {
66 Err(Error::RpcFinished(Some(status.to_owned())))
67 }
68 Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
69 }
70}
71
/// A future for a task that is scheduled to a `CompletionQueue`.
pub struct CqFuture<T> {
    /// Shared notification state; `poll` reads the result from it and
    /// registers its waker in it.
    inner: Arc<Inner<T>>,
}
76
77impl<T> CqFuture<T> {
78 fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
79 CqFuture { inner }
80 }
81}
82
83impl<T> Future for CqFuture<T> {
84 type Output = Result<T>;
85
86 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
87 let mut guard = self.inner.lock();
88 if guard.stale {
89 panic!("Resolved future is not supposed to be polled again.");
90 }
91
92 if let Some(res) = guard.result.take() {
93 guard.stale = true;
94 return Poll::Ready(res);
95 }
96
97 // So the task has not been finished yet, add notification hook.
98 if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
99 guard.waker = Some(cx.waker().clone());
100 }
101
102 Poll::Pending
103 }
104}
105
/// Future object for batch jobs; a `CqFuture` whose value type is
/// `Option<MessageReader>`.
pub type BatchFuture = CqFuture<Option<MessageReader>>;
108
/// A result holder for asynchronous execution.
// This enum is going to be passed to FFI, so don't use trait or generic here.
pub enum CallTag {
    /// Promise for a batch job; created together with a `BatchFuture` by
    /// `CallTag::batch_pair`.
    Batch(BatchPromise),
    /// Callback for a request job; created by `CallTag::request`.
    Request(RequestCallback),
    /// Callback for a unary request job; created by `CallTag::unary_request`.
    UnaryRequest(UnaryRequestCallback),
    /// Tag for an abort call issued before the handler is called;
    /// `CallTag::resolve` does nothing for this variant.
    Abort(Abort),
    /// Promise for a shutdown call; created together with a `CqFuture<()>`
    /// by `CallTag::shutdown_pair`.
    Shutdown(ShutdownPromise),
    /// Spawned task; `CallTag::resolve` passes it to `executor::resolve`.
    Spawn(Arc<SpawnTask>),
}
119
120impl CallTag {
121 /// Generate a Future/CallTag pair for batch jobs.
122 pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
123 let inner = new_inner();
124 let batch = BatchPromise::new(ty, inner.clone());
125 (CqFuture::new(inner), CallTag::Batch(batch))
126 }
127
128 /// Generate a CallTag for request job. We don't have an eventloop
129 /// to pull the future, so just the tag is enough.
130 pub fn request(ctx: RequestCallContext) -> CallTag {
131 CallTag::Request(RequestCallback::new(ctx))
132 }
133
134 /// Generate a Future/CallTag pair for shutdown call.
135 pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
136 let inner = new_inner();
137 let shutdown = ShutdownPromise::new(inner.clone());
138 (CqFuture::new(inner), CallTag::Shutdown(shutdown))
139 }
140
141 /// Generate a CallTag for abort call before handler is called.
142 pub fn abort(call: Call) -> CallTag {
143 CallTag::Abort(Abort::new(call))
144 }
145
146 /// Generate a CallTag for unary request job.
147 pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
148 let cb = UnaryRequestCallback::new(ctx, rc);
149 CallTag::UnaryRequest(cb)
150 }
151
152 /// Get the batch context from result holder.
153 pub fn batch_ctx(&self) -> Option<&BatchContext> {
154 match *self {
155 CallTag::Batch(ref prom) => Some(prom.context()),
156 CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
157 CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
158 _ => None,
159 }
160 }
161
162 /// Get the request context from the result holder.
163 pub fn request_ctx(&self) -> Option<&RequestContext> {
164 match *self {
165 CallTag::Request(ref prom) => Some(prom.context()),
166 CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
167 _ => None,
168 }
169 }
170
171 /// Resolve the CallTag with given status.
172 pub fn resolve(self, cq: &CompletionQueue, success: bool) {
173 match self {
174 CallTag::Batch(prom) => prom.resolve(success),
175 CallTag::Request(cb) => cb.resolve(cq, success),
176 CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
177 CallTag::Abort(_) => {}
178 CallTag::Shutdown(prom) => prom.resolve(success),
179 CallTag::Spawn(notify) => self::executor::resolve(notify, success),
180 }
181 }
182}
183
184impl Debug for CallTag {
185 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
186 match *self {
187 CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
188 CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
189 CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
190 CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
191 CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
192 CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
193 }
194 }
195}
196
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{self, TryRecvError};
    use std::thread;

    use super::*;
    use crate::env::Environment;
    use futures::executor::block_on;

    /// A shutdown future stays pending until its tag is resolved, then yields
    /// `Ok` for a successful resolve and `Error::ShutdownFailed` otherwise.
    #[test]
    fn test_resolve() {
        let env = Environment::new(1);

        let (first_fut, first_tag) = CallTag::shutdown_pair();
        let (second_fut, second_tag) = CallTag::shutdown_pair();
        let (tx, rx) = mpsc::channel();

        // The worker blocks on each future in turn and reports its outcome.
        let worker = thread::spawn(move || {
            let first = block_on(first_fut);
            tx.send(first).unwrap();
            let second = block_on(second_fut);
            tx.send(second).unwrap();
        });

        // Nothing has been resolved yet, so nothing may have been sent.
        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
        first_tag.resolve(&env.pick_cq(), true);
        assert!(rx.recv().unwrap().is_ok());

        // The second future is still unresolved at this point.
        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
        second_tag.resolve(&env.pick_cq(), false);
        match rx.recv() {
            Ok(Err(Error::ShutdownFailed)) => {}
            res => panic!("expect shutdown failed, but got {:?}", res),
        }

        worker.join().unwrap();
    }
}
233}