| Jeff Vander Stoep | 761577d | 2020-10-14 15:21:00 +0200 | [diff] [blame] | 1 | // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. | 
 | 2 |  | 
 | 3 | use std::ffi::CStr; | 
 | 4 | use std::pin::Pin; | 
 | 5 | use std::sync::Arc; | 
 | 6 | use std::{result, slice}; | 
 | 7 |  | 
 | 8 | use crate::grpc_sys::{ | 
 | 9 |     self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context, | 
 | 10 | }; | 
 | 11 | use futures::future::Future; | 
 | 12 | use futures::ready; | 
 | 13 | use futures::sink::Sink; | 
 | 14 | use futures::stream::Stream; | 
 | 15 | use futures::task::{Context, Poll}; | 
 | 16 | use parking_lot::Mutex; | 
 | 17 |  | 
 | 18 | use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags}; | 
 | 19 | use crate::auth_context::AuthContext; | 
 | 20 | use crate::call::{ | 
 | 21 |     BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase, | 
 | 22 | }; | 
 | 23 | use crate::codec::{DeserializeFn, SerializeFn}; | 
 | 24 | use crate::cq::CompletionQueue; | 
 | 25 | use crate::error::{Error, Result}; | 
 | 26 | use crate::metadata::Metadata; | 
 | 27 | use crate::server::{BoxHandler, RequestCallContext}; | 
 | 28 | use crate::task::{BatchFuture, CallTag, Executor, Kicker}; | 
 | 29 |  | 
 | 30 | pub struct Deadline { | 
 | 31 |     spec: gpr_timespec, | 
 | 32 | } | 
 | 33 |  | 
 | 34 | impl Deadline { | 
 | 35 |     fn new(spec: gpr_timespec) -> Deadline { | 
 | 36 |         let realtime_spec = | 
 | 37 |             unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) }; | 
 | 38 |  | 
 | 39 |         Deadline { | 
 | 40 |             spec: realtime_spec, | 
 | 41 |         } | 
 | 42 |     } | 
 | 43 |  | 
 | 44 |     pub fn exceeded(&self) -> bool { | 
 | 45 |         unsafe { | 
 | 46 |             let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME); | 
 | 47 |             grpc_sys::gpr_time_cmp(now, self.spec) >= 0 | 
 | 48 |         } | 
 | 49 |     } | 
 | 50 | } | 
 | 51 |  | 
 | 52 | /// Context for accepting a request. | 
 | 53 | pub struct RequestContext { | 
 | 54 |     ctx: *mut grpcwrap_request_call_context, | 
 | 55 |     request_call: Option<RequestCallContext>, | 
 | 56 | } | 
 | 57 |  | 
 | 58 | impl RequestContext { | 
 | 59 |     pub fn new(rc: RequestCallContext) -> RequestContext { | 
 | 60 |         let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() }; | 
 | 61 |  | 
 | 62 |         RequestContext { | 
 | 63 |             ctx, | 
 | 64 |             request_call: Some(rc), | 
 | 65 |         } | 
 | 66 |     } | 
 | 67 |  | 
 | 68 |     /// Try to accept a client side streaming request. | 
 | 69 |     /// | 
 | 70 |     /// Return error if the request is a client side unary request. | 
 | 71 |     pub fn handle_stream_req( | 
 | 72 |         self, | 
 | 73 |         cq: &CompletionQueue, | 
 | 74 |         rc: &mut RequestCallContext, | 
 | 75 |     ) -> result::Result<(), Self> { | 
 | 76 |         let handler = unsafe { rc.get_handler(self.method()) }; | 
 | 77 |         match handler { | 
 | 78 |             Some(handler) => match handler.method_type() { | 
 | 79 |                 MethodType::Unary | MethodType::ServerStreaming => Err(self), | 
 | 80 |                 _ => { | 
 | 81 |                     execute(self, cq, None, handler); | 
 | 82 |                     Ok(()) | 
 | 83 |                 } | 
 | 84 |             }, | 
 | 85 |             None => { | 
 | 86 |                 execute_unimplemented(self, cq.clone()); | 
 | 87 |                 Ok(()) | 
 | 88 |             } | 
 | 89 |         } | 
 | 90 |     } | 
 | 91 |  | 
 | 92 |     /// Accept a client side unary request. | 
 | 93 |     /// | 
 | 94 |     /// This method should be called after `handle_stream_req`. When handling | 
 | 95 |     /// client side unary request, handler will only be called after the unary | 
 | 96 |     /// request is received. | 
 | 97 |     pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) { | 
 | 98 |         // fetch message before calling callback. | 
 | 99 |         let tag = Box::new(CallTag::unary_request(self, rc)); | 
 | 100 |         let batch_ctx = tag.batch_ctx().unwrap().as_ptr(); | 
 | 101 |         let request_ctx = tag.request_ctx().unwrap().as_ptr(); | 
 | 102 |         let tag_ptr = Box::into_raw(tag); | 
 | 103 |         unsafe { | 
 | 104 |             let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx); | 
 | 105 |             let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _); | 
 | 106 |             if code != grpc_call_error::GRPC_CALL_OK { | 
 | 107 |                 Box::from_raw(tag_ptr); | 
 | 108 |                 // it should not failed. | 
 | 109 |                 panic!("try to receive message fail: {:?}", code); | 
 | 110 |             } | 
 | 111 |         } | 
 | 112 |     } | 
 | 113 |  | 
 | 114 |     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> { | 
 | 115 |         self.request_call.take() | 
 | 116 |     } | 
 | 117 |  | 
 | 118 |     pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context { | 
 | 119 |         self.ctx | 
 | 120 |     } | 
 | 121 |  | 
 | 122 |     fn call(&self, cq: CompletionQueue) -> Call { | 
 | 123 |         unsafe { | 
 | 124 |             // It is okay to use a mutable pointer on a immutable reference, `self`, | 
 | 125 |             // because grpcwrap_request_call_context_ref_call is thread-safe. | 
 | 126 |             let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx); | 
 | 127 |             assert!(!call.is_null()); | 
 | 128 |             Call::from_raw(call, cq) | 
 | 129 |         } | 
 | 130 |     } | 
 | 131 |  | 
 | 132 |     pub fn method(&self) -> &[u8] { | 
 | 133 |         let mut len = 0; | 
 | 134 |         let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) }; | 
 | 135 |  | 
 | 136 |         unsafe { slice::from_raw_parts(method as _, len) } | 
 | 137 |     } | 
 | 138 |  | 
 | 139 |     fn host(&self) -> &[u8] { | 
 | 140 |         let mut len = 0; | 
 | 141 |         let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) }; | 
 | 142 |  | 
 | 143 |         unsafe { slice::from_raw_parts(host as _, len) } | 
 | 144 |     } | 
 | 145 |  | 
 | 146 |     fn deadline(&self) -> Deadline { | 
 | 147 |         let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) }; | 
 | 148 |  | 
 | 149 |         Deadline::new(t) | 
 | 150 |     } | 
 | 151 |  | 
 | 152 |     fn metadata(&self) -> &Metadata { | 
 | 153 |         unsafe { | 
 | 154 |             let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx); | 
 | 155 |             let arr_ptr: *const Metadata = ptr as _; | 
 | 156 |             &*arr_ptr | 
 | 157 |         } | 
 | 158 |     } | 
 | 159 |  | 
 | 160 |     fn peer(&self) -> String { | 
 | 161 |         unsafe { | 
 | 162 |             // RequestContext always holds a reference of the call. | 
 | 163 |             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx); | 
 | 164 |             let p = grpc_sys::grpc_call_get_peer(call); | 
 | 165 |             let peer = CStr::from_ptr(p) | 
 | 166 |                 .to_str() | 
 | 167 |                 .expect("valid UTF-8 data") | 
 | 168 |                 .to_owned(); | 
 | 169 |             grpc_sys::gpr_free(p as _); | 
 | 170 |             peer | 
 | 171 |         } | 
 | 172 |     } | 
 | 173 |  | 
 | 174 |     /// If the server binds in non-secure mode, this will return None | 
 | 175 |     fn auth_context(&self) -> Option<AuthContext> { | 
 | 176 |         unsafe { | 
 | 177 |             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx); | 
 | 178 |             AuthContext::from_call_ptr(call) | 
 | 179 |         } | 
 | 180 |     } | 
 | 181 | } | 
 | 182 |  | 
 | 183 | impl Drop for RequestContext { | 
 | 184 |     fn drop(&mut self) { | 
 | 185 |         unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) } | 
 | 186 |     } | 
 | 187 | } | 
 | 188 |  | 
 | 189 | /// A context for handling client side unary request. | 
 | 190 | pub struct UnaryRequestContext { | 
 | 191 |     request: RequestContext, | 
 | 192 |     request_call: Option<RequestCallContext>, | 
 | 193 |     batch: BatchContext, | 
 | 194 | } | 
 | 195 |  | 
 | 196 | impl UnaryRequestContext { | 
 | 197 |     pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext { | 
 | 198 |         UnaryRequestContext { | 
 | 199 |             request: ctx, | 
 | 200 |             request_call: Some(rc), | 
 | 201 |             batch: BatchContext::new(), | 
 | 202 |         } | 
 | 203 |     } | 
 | 204 |  | 
 | 205 |     pub fn batch_ctx(&self) -> &BatchContext { | 
 | 206 |         &self.batch | 
 | 207 |     } | 
 | 208 |  | 
 | 209 |     pub fn batch_ctx_mut(&mut self) -> &mut BatchContext { | 
 | 210 |         &mut self.batch | 
 | 211 |     } | 
 | 212 |  | 
 | 213 |     pub fn request_ctx(&self) -> &RequestContext { | 
 | 214 |         &self.request | 
 | 215 |     } | 
 | 216 |  | 
 | 217 |     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> { | 
 | 218 |         self.request_call.take() | 
 | 219 |     } | 
 | 220 |  | 
 | 221 |     pub fn handle( | 
 | 222 |         self, | 
 | 223 |         rc: &mut RequestCallContext, | 
 | 224 |         cq: &CompletionQueue, | 
 | 225 |         reader: Option<MessageReader>, | 
 | 226 |     ) { | 
 | 227 |         let handler = unsafe { rc.get_handler(self.request.method()).unwrap() }; | 
 | 228 |         if reader.is_some() { | 
 | 229 |             return execute(self.request, cq, reader, handler); | 
 | 230 |         } | 
 | 231 |  | 
 | 232 |         let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned())); | 
 | 233 |         self.request.call(cq.clone()).abort(&status) | 
 | 234 |     } | 
 | 235 | } | 
 | 236 |  | 
 | 237 | /// A stream for client a streaming call and a duplex streaming call. | 
 | 238 | /// | 
 | 239 | /// The corresponding RPC will be canceled if the stream did not | 
 | 240 | /// finish before dropping. | 
 | 241 | #[must_use = "if unused the RequestStream may immediately cancel the RPC"] | 
 | 242 | pub struct RequestStream<T> { | 
 | 243 |     call: Arc<Mutex<ShareCall>>, | 
 | 244 |     base: StreamingBase, | 
 | 245 |     de: DeserializeFn<T>, | 
 | 246 | } | 
 | 247 |  | 
 | 248 | impl<T> RequestStream<T> { | 
 | 249 |     fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> { | 
 | 250 |         RequestStream { | 
 | 251 |             call, | 
 | 252 |             base: StreamingBase::new(None), | 
 | 253 |             de, | 
 | 254 |         } | 
 | 255 |     } | 
 | 256 | } | 
 | 257 |  | 
 | 258 | impl<T> Stream for RequestStream<T> { | 
 | 259 |     type Item = Result<T>; | 
 | 260 |  | 
 | 261 |     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> { | 
 | 262 |         { | 
 | 263 |             let mut call = self.call.lock(); | 
 | 264 |             call.check_alive()?; | 
 | 265 |         } | 
 | 266 |  | 
 | 267 |         let t = &mut *self; | 
 | 268 |         match ready!(t.base.poll(cx, &mut t.call, false)?) { | 
 | 269 |             None => Poll::Ready(None), | 
 | 270 |             Some(data) => Poll::Ready(Some((t.de)(data))), | 
 | 271 |         } | 
 | 272 |     } | 
 | 273 | } | 
 | 274 |  | 
 | 275 | impl<T> Drop for RequestStream<T> { | 
 | 276 |     /// The corresponding RPC will be canceled if the stream did not | 
 | 277 |     /// finish before dropping. | 
 | 278 |     fn drop(&mut self) { | 
 | 279 |         self.base.on_drop(&mut self.call); | 
 | 280 |     } | 
 | 281 | } | 
 | 282 |  | 
 | 283 | /// A helper macro used to implement server side unary sink. | 
 | 284 | /// Not using generic here because we don't need to expose | 
 | 285 | /// `CallHolder` or `Call` to caller. | 
 | 286 | // TODO: Use type alias to be friendly for documentation. | 
 | 287 | macro_rules! impl_unary_sink { | 
 | 288 |     ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => { | 
 | 289 |         pub struct $rt { | 
 | 290 |             call: $holder, | 
 | 291 |             cq_f: Option<BatchFuture>, | 
 | 292 |             err: Option<Error>, | 
 | 293 |         } | 
 | 294 |  | 
 | 295 |         impl Future for $rt { | 
 | 296 |             type Output = Result<()>; | 
 | 297 |  | 
 | 298 |             fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { | 
 | 299 |                 if let Some(e) = self.err.take() { | 
 | 300 |                     return Poll::Ready(Err(e)); | 
 | 301 |                 } | 
 | 302 |  | 
 | 303 |                 if self.cq_f.is_some() { | 
 | 304 |                     ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?); | 
 | 305 |                     self.cq_f.take(); | 
 | 306 |                 } | 
 | 307 |  | 
 | 308 |                 ready!(self.call.call(|c| c.poll_finish(cx))?); | 
 | 309 |                 Poll::Ready(Ok(())) | 
 | 310 |             } | 
 | 311 |         } | 
 | 312 |  | 
 | 313 |         $(#[$attr])* | 
 | 314 |         pub struct $t<T> { | 
 | 315 |             call: Option<$holder>, | 
 | 316 |             write_flags: u32, | 
 | 317 |             ser: SerializeFn<T>, | 
 | 318 |         } | 
 | 319 |  | 
 | 320 |         impl<T> $t<T> { | 
 | 321 |             fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> { | 
 | 322 |                 $t { | 
 | 323 |                     call: Some(call), | 
 | 324 |                     write_flags: 0, | 
 | 325 |                     ser: ser, | 
 | 326 |                 } | 
 | 327 |             } | 
 | 328 |  | 
 | 329 |             pub fn success(self, t: T) -> $rt { | 
 | 330 |                 self.complete(RpcStatus::ok(), Some(t)) | 
 | 331 |             } | 
 | 332 |  | 
 | 333 |             pub fn fail(self, status: RpcStatus) -> $rt { | 
 | 334 |                 self.complete(status, None) | 
 | 335 |             } | 
 | 336 |  | 
 | 337 |             fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt { | 
 | 338 |                 let data = t.as_ref().map(|t| { | 
 | 339 |                     let mut buf = vec![]; | 
 | 340 |                     (self.ser)(t, &mut buf); | 
 | 341 |                     buf | 
 | 342 |                 }); | 
 | 343 |  | 
 | 344 |                 let write_flags = self.write_flags; | 
 | 345 |                 let res = self.call.as_mut().unwrap().call(|c| { | 
 | 346 |                     c.call | 
 | 347 |                         .start_send_status_from_server(&status, true, &data, write_flags) | 
 | 348 |                 }); | 
 | 349 |  | 
 | 350 |                 let (cq_f, err) = match res { | 
 | 351 |                     Ok(f) => (Some(f), None), | 
 | 352 |                     Err(e) => (None, Some(e)), | 
 | 353 |                 }; | 
 | 354 |  | 
 | 355 |                 $rt { | 
 | 356 |                     call: self.call.take().unwrap(), | 
 | 357 |                     cq_f: cq_f, | 
 | 358 |                     err: err, | 
 | 359 |                 } | 
 | 360 |             } | 
 | 361 |         } | 
 | 362 |  | 
 | 363 |         impl<T> Drop for $t<T> { | 
 | 364 |             /// The corresponding RPC will be canceled if the sink did not | 
 | 365 |             /// send a response before dropping. | 
 | 366 |             fn drop(&mut self) { | 
 | 367 |                 self.call | 
 | 368 |                     .as_mut() | 
 | 369 |                     .map(|call| call.call(|c| c.call.cancel())); | 
 | 370 |             } | 
 | 371 |         } | 
 | 372 |     }; | 
 | 373 | } | 
 | 374 |  | 
 | 375 | impl_unary_sink!( | 
 | 376 |     /// A sink for unary call. | 
 | 377 |     /// | 
 | 378 |     /// To close the sink properly, you should call [`success`] or [`fail`] before dropping. | 
 | 379 |     /// | 
 | 380 |     /// [`success`]: #method.success | 
 | 381 |     /// [`fail`]: #method.fail | 
 | 382 |     #[must_use = "if unused the sink may immediately cancel the RPC"] | 
 | 383 |     UnarySink, | 
 | 384 |     UnarySinkResult, | 
 | 385 |     ShareCall | 
 | 386 | ); | 
 | 387 | impl_unary_sink!( | 
 | 388 |     /// A sink for client streaming call. | 
 | 389 |     /// | 
 | 390 |     /// To close the sink properly, you should call [`success`] or [`fail`] before dropping. | 
 | 391 |     /// | 
 | 392 |     /// [`success`]: #method.success | 
 | 393 |     /// [`fail`]: #method.fail | 
 | 394 |     #[must_use = "if unused the sink may immediately cancel the RPC"] | 
 | 395 |     ClientStreamingSink, | 
 | 396 |     ClientStreamingSinkResult, | 
 | 397 |     Arc<Mutex<ShareCall>> | 
 | 398 | ); | 
 | 399 |  | 
 | 400 | // A macro helper to implement server side streaming sink. | 
 | 401 | macro_rules! impl_stream_sink { | 
 | 402 |     ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => { | 
 | 403 |         $(#[$attr])* | 
 | 404 |         pub struct $t<T> { | 
 | 405 |             call: Option<$holder>, | 
 | 406 |             base: SinkBase, | 
 | 407 |             flush_f: Option<BatchFuture>, | 
 | 408 |             status: RpcStatus, | 
 | 409 |             flushed: bool, | 
 | 410 |             closed: bool, | 
 | 411 |             ser: SerializeFn<T>, | 
 | 412 |         } | 
 | 413 |  | 
 | 414 |         impl<T> $t<T> { | 
 | 415 |             fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> { | 
 | 416 |                 $t { | 
 | 417 |                     call: Some(call), | 
 | 418 |                     base: SinkBase::new(true), | 
 | 419 |                     flush_f: None, | 
 | 420 |                     status: RpcStatus::ok(), | 
 | 421 |                     flushed: false, | 
 | 422 |                     closed: false, | 
 | 423 |                     ser: ser, | 
 | 424 |                 } | 
 | 425 |             } | 
 | 426 |  | 
 | 427 |             pub fn set_status(&mut self, status: RpcStatus) { | 
 | 428 |                 assert!(self.flush_f.is_none()); | 
 | 429 |                 self.status = status; | 
 | 430 |             } | 
 | 431 |  | 
 | 432 |             pub fn fail(mut self, status: RpcStatus) -> $ft { | 
 | 433 |                 assert!(self.flush_f.is_none()); | 
 | 434 |                 let send_metadata = self.base.send_metadata; | 
 | 435 |                 let res = self.call.as_mut().unwrap().call(|c| { | 
 | 436 |                     c.call | 
 | 437 |                         .start_send_status_from_server(&status, send_metadata, &None, 0) | 
 | 438 |                 }); | 
 | 439 |  | 
 | 440 |                 let (fail_f, err) = match res { | 
 | 441 |                     Ok(f) => (Some(f), None), | 
 | 442 |                     Err(e) => (None, Some(e)), | 
 | 443 |                 }; | 
 | 444 |  | 
 | 445 |                 $ft { | 
 | 446 |                     call: self.call.take().unwrap(), | 
 | 447 |                     fail_f: fail_f, | 
 | 448 |                     err: err, | 
 | 449 |                 } | 
 | 450 |             } | 
 | 451 |         } | 
 | 452 |  | 
 | 453 |         impl<T> Drop for $t<T> { | 
 | 454 |             /// The corresponding RPC will be canceled if the sink did not call | 
 | 455 |             /// [`close`] or [`fail`] before dropping. | 
 | 456 |             /// | 
 | 457 |             /// [`close`]: #method.close | 
 | 458 |             /// [`fail`]: #method.fail | 
 | 459 |             fn drop(&mut self) { | 
 | 460 |                 // We did not close it explicitly and it was not dropped in the `fail`. | 
 | 461 |                 if !self.closed && self.call.is_some() { | 
 | 462 |                     let mut call = self.call.take().unwrap(); | 
 | 463 |                     call.call(|c| c.call.cancel()); | 
 | 464 |                 } | 
 | 465 |             } | 
 | 466 |         } | 
 | 467 |  | 
 | 468 |         impl<T> Sink<(T, WriteFlags)> for $t<T> { | 
 | 469 |             type Error = Error; | 
 | 470 |  | 
 | 471 |             #[inline] | 
 | 472 |             fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { | 
 | 473 |                 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? { | 
 | 474 |                     return Poll::Ready(Err(Error::RemoteStopped)); | 
 | 475 |                 } | 
 | 476 |                 Pin::new(&mut self.base).poll_ready(cx) | 
 | 477 |             } | 
 | 478 |  | 
 | 479 |             #[inline] | 
 | 480 |             fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> { | 
 | 481 |                 let t = &mut *self; | 
 | 482 |                 t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser) | 
 | 483 |             } | 
 | 484 |  | 
 | 485 |             #[inline] | 
 | 486 |             fn poll_flush(mut self: Pin<&mut Self>,  cx: &mut Context) -> Poll<Result<()>> { | 
 | 487 |                 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? { | 
 | 488 |                     return Poll::Ready(Err(Error::RemoteStopped)); | 
 | 489 |                 } | 
 | 490 |                 Pin::new(&mut self.base).poll_ready(cx) | 
 | 491 |             } | 
 | 492 |  | 
 | 493 |             fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { | 
 | 494 |                 if self.flush_f.is_none() { | 
 | 495 |                     ready!(Pin::new(&mut self.base).poll_ready(cx)?); | 
 | 496 |  | 
 | 497 |                     let send_metadata = self.base.send_metadata; | 
 | 498 |                     let t = &mut *self; | 
 | 499 |                     let status = &t.status; | 
 | 500 |                     let flush_f = t.call.as_mut().unwrap().call(|c| { | 
 | 501 |                         c.call | 
 | 502 |                             .start_send_status_from_server(status, send_metadata, &None, 0) | 
 | 503 |                     })?; | 
 | 504 |                     t.flush_f = Some(flush_f); | 
 | 505 |                 } | 
 | 506 |  | 
 | 507 |                 if !self.flushed { | 
 | 508 |                     ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?); | 
 | 509 |                     self.flushed = true; | 
 | 510 |                 } | 
 | 511 |  | 
 | 512 |                 ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?); | 
 | 513 |                 self.closed = true; | 
 | 514 |                 Poll::Ready(Ok(())) | 
 | 515 |             } | 
 | 516 |         } | 
 | 517 |  | 
 | 518 |         #[must_use = "if unused the sink failure may immediately cancel the RPC"] | 
 | 519 |         pub struct $ft { | 
 | 520 |             call: $holder, | 
 | 521 |             fail_f: Option<BatchFuture>, | 
 | 522 |             err: Option<Error>, | 
 | 523 |         } | 
 | 524 |  | 
 | 525 |         impl Future for $ft { | 
 | 526 |             type Output = Result<()>; | 
 | 527 |  | 
 | 528 |             fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { | 
 | 529 |                 if let Some(e) = self.err.take() { | 
 | 530 |                     return Poll::Ready(Err(e)); | 
 | 531 |                 } | 
 | 532 |  | 
 | 533 |                 let readiness = self.call.call(|c| { | 
 | 534 |                     if c.finished { | 
 | 535 |                         return Poll::Ready(Ok(())); | 
 | 536 |                     } | 
 | 537 |  | 
 | 538 |                     c.poll_finish(cx).map(|r| r.map(|_| ())) | 
 | 539 |                 })?; | 
 | 540 |  | 
 | 541 |                 if let Some(ref mut f) = self.fail_f { | 
 | 542 |                     ready!(Pin::new(f).poll(cx)?); | 
 | 543 |                 } | 
 | 544 |  | 
 | 545 |                 self.fail_f.take(); | 
 | 546 |                 readiness.map(Ok) | 
 | 547 |             } | 
 | 548 |         } | 
 | 549 |     }; | 
 | 550 | } | 
 | 551 |  | 
 | 552 | impl_stream_sink!( | 
 | 553 |     /// A sink for server streaming call. | 
 | 554 |     /// | 
 | 555 |     /// To close the sink properly, you should call [`close`] or [`fail`] before dropping. | 
 | 556 |     /// | 
 | 557 |     /// [`close`]: #method.close | 
 | 558 |     /// [`fail`]: #method.fail | 
 | 559 |     #[must_use = "if unused the sink may immediately cancel the RPC"] | 
 | 560 |     ServerStreamingSink, | 
 | 561 |     ServerStreamingSinkFailure, | 
 | 562 |     ShareCall | 
 | 563 | ); | 
 | 564 | impl_stream_sink!( | 
 | 565 |     /// A sink for duplex streaming call. | 
 | 566 |     /// | 
 | 567 |     /// To close the sink properly, you should call [`close`] or [`fail`] before dropping. | 
 | 568 |     /// | 
 | 569 |     /// [`close`]: #method.close | 
 | 570 |     /// [`fail`]: #method.fail | 
 | 571 |     #[must_use = "if unused the sink may immediately cancel the RPC"] | 
 | 572 |     DuplexSink, | 
 | 573 |     DuplexSinkFailure, | 
 | 574 |     Arc<Mutex<ShareCall>> | 
 | 575 | ); | 
 | 576 |  | 
 | 577 | /// A context for rpc handling. | 
 | 578 | pub struct RpcContext<'a> { | 
 | 579 |     ctx: RequestContext, | 
 | 580 |     executor: Executor<'a>, | 
 | 581 |     deadline: Deadline, | 
 | 582 | } | 
 | 583 |  | 
 | 584 | impl<'a> RpcContext<'a> { | 
 | 585 |     fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> { | 
 | 586 |         RpcContext { | 
 | 587 |             deadline: ctx.deadline(), | 
 | 588 |             ctx, | 
 | 589 |             executor: Executor::new(cq), | 
 | 590 |         } | 
 | 591 |     } | 
 | 592 |  | 
 | 593 |     fn kicker(&self) -> Kicker { | 
 | 594 |         let call = self.call(); | 
 | 595 |         Kicker::from_call(call) | 
 | 596 |     } | 
 | 597 |  | 
 | 598 |     pub(crate) fn call(&self) -> Call { | 
 | 599 |         self.ctx.call(self.executor.cq().clone()) | 
 | 600 |     } | 
 | 601 |  | 
 | 602 |     pub fn method(&self) -> &[u8] { | 
 | 603 |         self.ctx.method() | 
 | 604 |     } | 
 | 605 |  | 
 | 606 |     pub fn host(&self) -> &[u8] { | 
 | 607 |         self.ctx.host() | 
 | 608 |     } | 
 | 609 |  | 
 | 610 |     pub fn deadline(&self) -> &Deadline { | 
 | 611 |         &self.deadline | 
 | 612 |     } | 
 | 613 |  | 
 | 614 |     /// Get the initial metadata sent by client. | 
 | 615 |     pub fn request_headers(&self) -> &Metadata { | 
 | 616 |         self.ctx.metadata() | 
 | 617 |     } | 
 | 618 |  | 
 | 619 |     pub fn peer(&self) -> String { | 
 | 620 |         self.ctx.peer() | 
 | 621 |     } | 
 | 622 |  | 
 | 623 |     /// Wrapper around the gRPC Core AuthContext | 
 | 624 |     /// | 
 | 625 |     /// If the server binds in non-secure mode, this will return None | 
 | 626 |     pub fn auth_context(&self) -> Option<AuthContext> { | 
 | 627 |         self.ctx.auth_context() | 
 | 628 |     } | 
 | 629 |  | 
 | 630 |     /// Spawn the future into current gRPC poll thread. | 
 | 631 |     /// | 
 | 632 |     /// This can reduce a lot of context switching, but please make | 
 | 633 |     /// sure there is no heavy work in the future. | 
 | 634 |     pub fn spawn<F>(&self, f: F) | 
 | 635 |     where | 
 | 636 |         F: Future<Output = ()> + Send + 'static, | 
 | 637 |     { | 
 | 638 |         self.executor.spawn(f, self.kicker()) | 
 | 639 |     } | 
 | 640 | } | 
 | 641 |  | 
 | 642 | // Following four helper functions are used to create a callback closure. | 
 | 643 |  | 
 | 644 | macro_rules! accept_call { | 
 | 645 |     ($call:expr) => { | 
 | 646 |         match $call.start_server_side() { | 
 | 647 |             Err(Error::QueueShutdown) => return, | 
 | 648 |             Err(e) => panic!("unexpected error when trying to accept request: {:?}", e), | 
 | 649 |             Ok(f) => f, | 
 | 650 |         } | 
 | 651 |     }; | 
 | 652 | } | 
 | 653 |  | 
 | 654 | // Helper function to call a unary handler. | 
 | 655 | pub fn execute_unary<P, Q, F>( | 
 | 656 |     ctx: RpcContext<'_>, | 
 | 657 |     ser: SerializeFn<Q>, | 
 | 658 |     de: DeserializeFn<P>, | 
 | 659 |     payload: MessageReader, | 
 | 660 |     f: &mut F, | 
 | 661 | ) where | 
 | 662 |     F: FnMut(RpcContext<'_>, P, UnarySink<Q>), | 
 | 663 | { | 
 | 664 |     let mut call = ctx.call(); | 
 | 665 |     let close_f = accept_call!(call); | 
 | 666 |     let request = match de(payload) { | 
 | 667 |         Ok(f) => f, | 
 | 668 |         Err(e) => { | 
 | 669 |             let status = RpcStatus::new( | 
 | 670 |                 RpcStatusCode::INTERNAL, | 
 | 671 |                 Some(format!("Failed to deserialize response message: {:?}", e)), | 
 | 672 |             ); | 
 | 673 |             call.abort(&status); | 
 | 674 |             return; | 
 | 675 |         } | 
 | 676 |     }; | 
 | 677 |     let sink = UnarySink::new(ShareCall::new(call, close_f), ser); | 
 | 678 |     f(ctx, request, sink) | 
 | 679 | } | 
 | 680 |  | 
 | 681 | // Helper function to call client streaming handler. | 
 | 682 | pub fn execute_client_streaming<P, Q, F>( | 
 | 683 |     ctx: RpcContext<'_>, | 
 | 684 |     ser: SerializeFn<Q>, | 
 | 685 |     de: DeserializeFn<P>, | 
 | 686 |     f: &mut F, | 
 | 687 | ) where | 
 | 688 |     F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>), | 
 | 689 | { | 
 | 690 |     let mut call = ctx.call(); | 
 | 691 |     let close_f = accept_call!(call); | 
 | 692 |     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f))); | 
 | 693 |  | 
 | 694 |     let req_s = RequestStream::new(call.clone(), de); | 
 | 695 |     let sink = ClientStreamingSink::new(call, ser); | 
 | 696 |     f(ctx, req_s, sink) | 
 | 697 | } | 
 | 698 |  | 
 | 699 | // Helper function to call server streaming handler. | 
 | 700 | pub fn execute_server_streaming<P, Q, F>( | 
 | 701 |     ctx: RpcContext<'_>, | 
 | 702 |     ser: SerializeFn<Q>, | 
 | 703 |     de: DeserializeFn<P>, | 
 | 704 |     payload: MessageReader, | 
 | 705 |     f: &mut F, | 
 | 706 | ) where | 
 | 707 |     F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>), | 
 | 708 | { | 
 | 709 |     let mut call = ctx.call(); | 
 | 710 |     let close_f = accept_call!(call); | 
 | 711 |  | 
 | 712 |     let request = match de(payload) { | 
 | 713 |         Ok(t) => t, | 
 | 714 |         Err(e) => { | 
 | 715 |             let status = RpcStatus::new( | 
 | 716 |                 RpcStatusCode::INTERNAL, | 
 | 717 |                 Some(format!("Failed to deserialize response message: {:?}", e)), | 
 | 718 |             ); | 
 | 719 |             call.abort(&status); | 
 | 720 |             return; | 
 | 721 |         } | 
 | 722 |     }; | 
 | 723 |  | 
 | 724 |     let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser); | 
 | 725 |     f(ctx, request, sink) | 
 | 726 | } | 
 | 727 |  | 
 | 728 | // Helper function to call duplex streaming handler. | 
 | 729 | pub fn execute_duplex_streaming<P, Q, F>( | 
 | 730 |     ctx: RpcContext<'_>, | 
 | 731 |     ser: SerializeFn<Q>, | 
 | 732 |     de: DeserializeFn<P>, | 
 | 733 |     f: &mut F, | 
 | 734 | ) where | 
 | 735 |     F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>), | 
 | 736 | { | 
 | 737 |     let mut call = ctx.call(); | 
 | 738 |     let close_f = accept_call!(call); | 
 | 739 |     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f))); | 
 | 740 |  | 
 | 741 |     let req_s = RequestStream::new(call.clone(), de); | 
 | 742 |     let sink = DuplexSink::new(call, ser); | 
 | 743 |     f(ctx, req_s, sink) | 
 | 744 | } | 
 | 745 |  | 
 | 746 | // A helper function used to handle all undefined rpc calls. | 
 | 747 | pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) { | 
 | 748 |     // Suppress needless-pass-by-value. | 
 | 749 |     let ctx = ctx; | 
 | 750 |     let mut call = ctx.call(cq); | 
 | 751 |     accept_call!(call); | 
 | 752 |     call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None)) | 
 | 753 | } | 
 | 754 |  | 
 | 755 | // Helper function to call handler. | 
 | 756 | // | 
 | 757 | // Invoked after a request is ready to be handled. | 
 | 758 | fn execute( | 
 | 759 |     ctx: RequestContext, | 
 | 760 |     cq: &CompletionQueue, | 
 | 761 |     payload: Option<MessageReader>, | 
 | 762 |     f: &mut BoxHandler, | 
 | 763 | ) { | 
 | 764 |     let rpc_ctx = RpcContext::new(ctx, cq); | 
 | 765 |     f.handle(rpc_ctx, payload) | 
 | 766 | } |