| Jeff Vander Stoep | 761577d | 2020-10-14 15:21:00 +0200 | [diff] [blame^] | 1 | // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. |
| 2 | |
| 3 | use std::ffi::CStr; |
| 4 | use std::pin::Pin; |
| 5 | use std::sync::Arc; |
| 6 | use std::{result, slice}; |
| 7 | |
| 8 | use crate::grpc_sys::{ |
| 9 | self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context, |
| 10 | }; |
| 11 | use futures::future::Future; |
| 12 | use futures::ready; |
| 13 | use futures::sink::Sink; |
| 14 | use futures::stream::Stream; |
| 15 | use futures::task::{Context, Poll}; |
| 16 | use parking_lot::Mutex; |
| 17 | |
| 18 | use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags}; |
| 19 | use crate::auth_context::AuthContext; |
| 20 | use crate::call::{ |
| 21 | BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase, |
| 22 | }; |
| 23 | use crate::codec::{DeserializeFn, SerializeFn}; |
| 24 | use crate::cq::CompletionQueue; |
| 25 | use crate::error::{Error, Result}; |
| 26 | use crate::metadata::Metadata; |
| 27 | use crate::server::{BoxHandler, RequestCallContext}; |
| 28 | use crate::task::{BatchFuture, CallTag, Executor, Kicker}; |
| 29 | |
| 30 | pub struct Deadline { |
| 31 | spec: gpr_timespec, |
| 32 | } |
| 33 | |
| 34 | impl Deadline { |
| 35 | fn new(spec: gpr_timespec) -> Deadline { |
| 36 | let realtime_spec = |
| 37 | unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) }; |
| 38 | |
| 39 | Deadline { |
| 40 | spec: realtime_spec, |
| 41 | } |
| 42 | } |
| 43 | |
| 44 | pub fn exceeded(&self) -> bool { |
| 45 | unsafe { |
| 46 | let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME); |
| 47 | grpc_sys::gpr_time_cmp(now, self.spec) >= 0 |
| 48 | } |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | /// Context for accepting a request. |
| 53 | pub struct RequestContext { |
| 54 | ctx: *mut grpcwrap_request_call_context, |
| 55 | request_call: Option<RequestCallContext>, |
| 56 | } |
| 57 | |
| 58 | impl RequestContext { |
| 59 | pub fn new(rc: RequestCallContext) -> RequestContext { |
| 60 | let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() }; |
| 61 | |
| 62 | RequestContext { |
| 63 | ctx, |
| 64 | request_call: Some(rc), |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | /// Try to accept a client side streaming request. |
| 69 | /// |
| 70 | /// Return error if the request is a client side unary request. |
| 71 | pub fn handle_stream_req( |
| 72 | self, |
| 73 | cq: &CompletionQueue, |
| 74 | rc: &mut RequestCallContext, |
| 75 | ) -> result::Result<(), Self> { |
| 76 | let handler = unsafe { rc.get_handler(self.method()) }; |
| 77 | match handler { |
| 78 | Some(handler) => match handler.method_type() { |
| 79 | MethodType::Unary | MethodType::ServerStreaming => Err(self), |
| 80 | _ => { |
| 81 | execute(self, cq, None, handler); |
| 82 | Ok(()) |
| 83 | } |
| 84 | }, |
| 85 | None => { |
| 86 | execute_unimplemented(self, cq.clone()); |
| 87 | Ok(()) |
| 88 | } |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | /// Accept a client side unary request. |
| 93 | /// |
| 94 | /// This method should be called after `handle_stream_req`. When handling |
| 95 | /// client side unary request, handler will only be called after the unary |
| 96 | /// request is received. |
| 97 | pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) { |
| 98 | // fetch message before calling callback. |
| 99 | let tag = Box::new(CallTag::unary_request(self, rc)); |
| 100 | let batch_ctx = tag.batch_ctx().unwrap().as_ptr(); |
| 101 | let request_ctx = tag.request_ctx().unwrap().as_ptr(); |
| 102 | let tag_ptr = Box::into_raw(tag); |
| 103 | unsafe { |
| 104 | let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx); |
| 105 | let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _); |
| 106 | if code != grpc_call_error::GRPC_CALL_OK { |
| 107 | Box::from_raw(tag_ptr); |
| 108 | // it should not failed. |
| 109 | panic!("try to receive message fail: {:?}", code); |
| 110 | } |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> { |
| 115 | self.request_call.take() |
| 116 | } |
| 117 | |
| 118 | pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context { |
| 119 | self.ctx |
| 120 | } |
| 121 | |
| 122 | fn call(&self, cq: CompletionQueue) -> Call { |
| 123 | unsafe { |
| 124 | // It is okay to use a mutable pointer on a immutable reference, `self`, |
| 125 | // because grpcwrap_request_call_context_ref_call is thread-safe. |
| 126 | let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx); |
| 127 | assert!(!call.is_null()); |
| 128 | Call::from_raw(call, cq) |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | pub fn method(&self) -> &[u8] { |
| 133 | let mut len = 0; |
| 134 | let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) }; |
| 135 | |
| 136 | unsafe { slice::from_raw_parts(method as _, len) } |
| 137 | } |
| 138 | |
| 139 | fn host(&self) -> &[u8] { |
| 140 | let mut len = 0; |
| 141 | let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) }; |
| 142 | |
| 143 | unsafe { slice::from_raw_parts(host as _, len) } |
| 144 | } |
| 145 | |
| 146 | fn deadline(&self) -> Deadline { |
| 147 | let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) }; |
| 148 | |
| 149 | Deadline::new(t) |
| 150 | } |
| 151 | |
| 152 | fn metadata(&self) -> &Metadata { |
| 153 | unsafe { |
| 154 | let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx); |
| 155 | let arr_ptr: *const Metadata = ptr as _; |
| 156 | &*arr_ptr |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | fn peer(&self) -> String { |
| 161 | unsafe { |
| 162 | // RequestContext always holds a reference of the call. |
| 163 | let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx); |
| 164 | let p = grpc_sys::grpc_call_get_peer(call); |
| 165 | let peer = CStr::from_ptr(p) |
| 166 | .to_str() |
| 167 | .expect("valid UTF-8 data") |
| 168 | .to_owned(); |
| 169 | grpc_sys::gpr_free(p as _); |
| 170 | peer |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | /// If the server binds in non-secure mode, this will return None |
| 175 | fn auth_context(&self) -> Option<AuthContext> { |
| 176 | unsafe { |
| 177 | let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx); |
| 178 | AuthContext::from_call_ptr(call) |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | |
impl Drop for RequestContext {
    /// Destroys the native `grpcwrap_request_call_context` referenced by `self.ctx`.
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
    }
}
| 188 | |
| 189 | /// A context for handling client side unary request. |
| 190 | pub struct UnaryRequestContext { |
| 191 | request: RequestContext, |
| 192 | request_call: Option<RequestCallContext>, |
| 193 | batch: BatchContext, |
| 194 | } |
| 195 | |
| 196 | impl UnaryRequestContext { |
| 197 | pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext { |
| 198 | UnaryRequestContext { |
| 199 | request: ctx, |
| 200 | request_call: Some(rc), |
| 201 | batch: BatchContext::new(), |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | pub fn batch_ctx(&self) -> &BatchContext { |
| 206 | &self.batch |
| 207 | } |
| 208 | |
| 209 | pub fn batch_ctx_mut(&mut self) -> &mut BatchContext { |
| 210 | &mut self.batch |
| 211 | } |
| 212 | |
| 213 | pub fn request_ctx(&self) -> &RequestContext { |
| 214 | &self.request |
| 215 | } |
| 216 | |
| 217 | pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> { |
| 218 | self.request_call.take() |
| 219 | } |
| 220 | |
| 221 | pub fn handle( |
| 222 | self, |
| 223 | rc: &mut RequestCallContext, |
| 224 | cq: &CompletionQueue, |
| 225 | reader: Option<MessageReader>, |
| 226 | ) { |
| 227 | let handler = unsafe { rc.get_handler(self.request.method()).unwrap() }; |
| 228 | if reader.is_some() { |
| 229 | return execute(self.request, cq, reader, handler); |
| 230 | } |
| 231 | |
| 232 | let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned())); |
| 233 | self.request.call(cq.clone()).abort(&status) |
| 234 | } |
| 235 | } |
| 236 | |
| 237 | /// A stream for client a streaming call and a duplex streaming call. |
| 238 | /// |
| 239 | /// The corresponding RPC will be canceled if the stream did not |
| 240 | /// finish before dropping. |
| 241 | #[must_use = "if unused the RequestStream may immediately cancel the RPC"] |
| 242 | pub struct RequestStream<T> { |
| 243 | call: Arc<Mutex<ShareCall>>, |
| 244 | base: StreamingBase, |
| 245 | de: DeserializeFn<T>, |
| 246 | } |
| 247 | |
| 248 | impl<T> RequestStream<T> { |
| 249 | fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> { |
| 250 | RequestStream { |
| 251 | call, |
| 252 | base: StreamingBase::new(None), |
| 253 | de, |
| 254 | } |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | impl<T> Stream for RequestStream<T> { |
| 259 | type Item = Result<T>; |
| 260 | |
| 261 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> { |
| 262 | { |
| 263 | let mut call = self.call.lock(); |
| 264 | call.check_alive()?; |
| 265 | } |
| 266 | |
| 267 | let t = &mut *self; |
| 268 | match ready!(t.base.poll(cx, &mut t.call, false)?) { |
| 269 | None => Poll::Ready(None), |
| 270 | Some(data) => Poll::Ready(Some((t.de)(data))), |
| 271 | } |
| 272 | } |
| 273 | } |
| 274 | |
impl<T> Drop for RequestStream<T> {
    /// The corresponding RPC will be canceled if the stream did not
    /// finish before dropping.
    fn drop(&mut self) {
        // Clean-up is delegated to `StreamingBase::on_drop`, which is given the shared call.
        self.base.on_drop(&mut self.call);
    }
}
| 282 | |
/// A helper macro used to implement server side unary sink.
/// Not using generic here because we don't need to expose
/// `CallHolder` or `Call` to caller.
///
/// `$t` is the sink type, `$rt` the future returned once a response or
/// status has been submitted, and `$holder` the call holder type.
// TODO: Use type alias to be friendly for documentation.
macro_rules! impl_unary_sink {
    ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
        pub struct $rt {
            call: $holder,
            cq_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $rt {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // An error recorded while starting the batch is reported first.
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                // Wait for the status batch, then forget it so it is not polled again.
                if let Some(batch) = self.cq_f.as_mut() {
                    ready!(Pin::new(batch).poll(cx)?);
                    self.cq_f = None;
                }

                // Finally wait for the call itself to finish.
                ready!(self.call.call(|c| c.poll_finish(cx))?);
                Poll::Ready(Ok(()))
            }
        }

        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            write_flags: u32,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    write_flags: 0,
                    ser,
                }
            }

            /// Sends `t` as the response together with an OK status.
            pub fn success(self, t: T) -> $rt {
                self.complete(RpcStatus::ok(), Some(t))
            }

            /// Finishes the call with `status` and no response message.
            pub fn fail(self, status: RpcStatus) -> $rt {
                self.complete(status, None)
            }

            fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
                // Serialize the response, if there is one.
                let payload = match t.as_ref() {
                    Some(msg) => {
                        let mut buf = vec![];
                        (self.ser)(msg, &mut buf);
                        Some(buf)
                    }
                    None => None,
                };

                let flags = self.write_flags;
                let started = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, true, &payload, flags)
                });

                let (cq_f, err) = match started {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                // Taking the holder out leaves `None`, so `Drop` will not cancel the call.
                $rt {
                    call: self.call.take().unwrap(),
                    cq_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not
            /// send a response before dropping.
            fn drop(&mut self) {
                if let Some(call) = self.call.as_mut() {
                    call.call(|c| c.call.cancel());
                }
            }
        }
    };
}
| 374 | |
// Generates `UnarySink<T>` and its completion future `UnarySinkResult`,
// holding the call directly as a `ShareCall`.
impl_unary_sink!(
    /// A sink for a unary call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    /// Dropping the sink without calling either cancels the RPC.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    UnarySink,
    UnarySinkResult,
    ShareCall
);
// Generates `ClientStreamingSink<T>` and its completion future
// `ClientStreamingSinkResult`, holding the call as `Arc<Mutex<ShareCall>>`.
impl_unary_sink!(
    /// A sink for a client streaming call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    /// Dropping the sink without calling either cancels the RPC.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ClientStreamingSink,
    ClientStreamingSinkResult,
    Arc<Mutex<ShareCall>>
);
| 399 | |
// A macro helper to implement server side streaming sink.
//
// `$t` is the generated sink type, `$ft` the future returned by `$t::fail`,
// and `$holder` the type that holds the call.
macro_rules! impl_stream_sink {
    ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
        $(#[$attr])*
        pub struct $t<T> {
            // `None` once `fail` has moved the holder into `$ft` or `drop` has taken it.
            call: Option<$holder>,
            base: SinkBase,
            // Future of the final status batch; set by the first `poll_close`.
            flush_f: Option<BatchFuture>,
            // Status sent by `poll_close`; OK unless changed through `set_status`.
            status: RpcStatus,
            // Set once `flush_f` has resolved.
            flushed: bool,
            // Set once `poll_close` has completed; stops `drop` from cancelling the call.
            closed: bool,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    // NOTE(review): `true` presumably initialises `send_metadata` - confirm in `SinkBase`.
                    base: SinkBase::new(true),
                    flush_f: None,
                    status: RpcStatus::ok(),
                    flushed: false,
                    closed: false,
                    ser: ser,
                }
            }

            /// Sets the status that will be sent when the sink is closed.
            ///
            /// # Panics
            ///
            /// Panics if the closing status batch has already been started.
            pub fn set_status(&mut self, status: RpcStatus) {
                assert!(self.flush_f.is_none());
                self.status = status;
            }

            /// Finishes the call with `status` right away and returns a future
            /// that resolves once the status is sent and the call has finished.
            ///
            /// # Panics
            ///
            /// Panics if the closing status batch has already been started.
            pub fn fail(mut self, status: RpcStatus) -> $ft {
                assert!(self.flush_f.is_none());
                let send_metadata = self.base.send_metadata;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, send_metadata, &None, 0)
                });

                // A start-up failure is stored and reported by the first poll of `$ft`.
                let (fail_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                // Taking the holder leaves `None` behind, so `drop` will not cancel the call.
                $ft {
                    call: self.call.take().unwrap(),
                    fail_f: fail_f,
                    err: err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not call
            /// [`close`] or [`fail`] before dropping.
            ///
            /// [`close`]: #method.close
            /// [`fail`]: #method.fail
            fn drop(&mut self) {
                // We did not close it explicitly and it was not dropped in the `fail`.
                if !self.closed && self.call.is_some() {
                    let mut call = self.call.take().unwrap();
                    call.call(|c| c.call.cancel());
                }
            }
        }

        impl<T> Sink<(T, WriteFlags)> for $t<T> {
            type Error = Error;

            #[inline]
            fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // If the call has already finished, report `RemoteStopped`.
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            #[inline]
            fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
                // Reborrow so that `base`, `call` and `ser` can be borrowed separately.
                let t = &mut *self;
                t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
            }

            #[inline]
            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // Same checks as `poll_ready`: finished call first, then the base sink.
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // Stage 1: once the base sink is ready, start sending the final status.
                if self.flush_f.is_none() {
                    ready!(Pin::new(&mut self.base).poll_ready(cx)?);

                    let send_metadata = self.base.send_metadata;
                    let t = &mut *self;
                    let status = &t.status;
                    let flush_f = t.call.as_mut().unwrap().call(|c| {
                        c.call
                            .start_send_status_from_server(status, send_metadata, &None, 0)
                    })?;
                    t.flush_f = Some(flush_f);
                }

                // Stage 2: wait for the status batch to complete.
                if !self.flushed {
                    ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
                    self.flushed = true;
                }

                // Stage 3: wait for the call to finish, then mark the sink closed.
                ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
                self.closed = true;
                Poll::Ready(Ok(()))
            }
        }

        #[must_use = "if unused the sink failure may immediately cancel the RPC"]
        pub struct $ft {
            call: $holder,
            fail_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $ft {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // An error recorded by `fail` is reported first.
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                // Poll the call's finish state; a call already marked finished counts as ready.
                let readiness = self.call.call(|c| {
                    if c.finished {
                        return Poll::Ready(Ok(()));
                    }

                    c.poll_finish(cx).map(|r| r.map(|_| ()))
                })?;

                // Then wait for the status batch; pending here returns early.
                if let Some(ref mut f) = self.fail_f {
                    ready!(Pin::new(f).poll(cx)?);
                }

                // The batch is done (or was never started): drop it and report the
                // finish readiness, which may still be pending.
                self.fail_f.take();
                readiness.map(Ok)
            }
        }
    };
}
| 551 | |
// Generates `ServerStreamingSink<T>` and its failure future
// `ServerStreamingSinkFailure`, holding the call directly as a `ShareCall`.
impl_stream_sink!(
    /// A sink for a server streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ServerStreamingSink,
    ServerStreamingSinkFailure,
    ShareCall
);
// Generates `DuplexSink<T>` and its failure future `DuplexSinkFailure`,
// holding the call as `Arc<Mutex<ShareCall>>`.
impl_stream_sink!(
    /// A sink for a duplex streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    DuplexSink,
    DuplexSinkFailure,
    Arc<Mutex<ShareCall>>
);
| 576 | |
| 577 | /// A context for rpc handling. |
| 578 | pub struct RpcContext<'a> { |
| 579 | ctx: RequestContext, |
| 580 | executor: Executor<'a>, |
| 581 | deadline: Deadline, |
| 582 | } |
| 583 | |
| 584 | impl<'a> RpcContext<'a> { |
| 585 | fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> { |
| 586 | RpcContext { |
| 587 | deadline: ctx.deadline(), |
| 588 | ctx, |
| 589 | executor: Executor::new(cq), |
| 590 | } |
| 591 | } |
| 592 | |
| 593 | fn kicker(&self) -> Kicker { |
| 594 | let call = self.call(); |
| 595 | Kicker::from_call(call) |
| 596 | } |
| 597 | |
| 598 | pub(crate) fn call(&self) -> Call { |
| 599 | self.ctx.call(self.executor.cq().clone()) |
| 600 | } |
| 601 | |
| 602 | pub fn method(&self) -> &[u8] { |
| 603 | self.ctx.method() |
| 604 | } |
| 605 | |
| 606 | pub fn host(&self) -> &[u8] { |
| 607 | self.ctx.host() |
| 608 | } |
| 609 | |
| 610 | pub fn deadline(&self) -> &Deadline { |
| 611 | &self.deadline |
| 612 | } |
| 613 | |
| 614 | /// Get the initial metadata sent by client. |
| 615 | pub fn request_headers(&self) -> &Metadata { |
| 616 | self.ctx.metadata() |
| 617 | } |
| 618 | |
| 619 | pub fn peer(&self) -> String { |
| 620 | self.ctx.peer() |
| 621 | } |
| 622 | |
| 623 | /// Wrapper around the gRPC Core AuthContext |
| 624 | /// |
| 625 | /// If the server binds in non-secure mode, this will return None |
| 626 | pub fn auth_context(&self) -> Option<AuthContext> { |
| 627 | self.ctx.auth_context() |
| 628 | } |
| 629 | |
| 630 | /// Spawn the future into current gRPC poll thread. |
| 631 | /// |
| 632 | /// This can reduce a lot of context switching, but please make |
| 633 | /// sure there is no heavy work in the future. |
| 634 | pub fn spawn<F>(&self, f: F) |
| 635 | where |
| 636 | F: Future<Output = ()> + Send + 'static, |
| 637 | { |
| 638 | self.executor.spawn(f, self.kicker()) |
| 639 | } |
| 640 | } |
| 641 | |
// The following four helper functions are used to create a callback closure.
| 643 | |
// Starts the server side of `$call` and evaluates to the returned future.
// On `Error::QueueShutdown` it returns from the enclosing function; any other
// error panics.
macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Ok(close_f) => close_f,
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
        }
    };
}
| 653 | |
| 654 | // Helper function to call a unary handler. |
| 655 | pub fn execute_unary<P, Q, F>( |
| 656 | ctx: RpcContext<'_>, |
| 657 | ser: SerializeFn<Q>, |
| 658 | de: DeserializeFn<P>, |
| 659 | payload: MessageReader, |
| 660 | f: &mut F, |
| 661 | ) where |
| 662 | F: FnMut(RpcContext<'_>, P, UnarySink<Q>), |
| 663 | { |
| 664 | let mut call = ctx.call(); |
| 665 | let close_f = accept_call!(call); |
| 666 | let request = match de(payload) { |
| 667 | Ok(f) => f, |
| 668 | Err(e) => { |
| 669 | let status = RpcStatus::new( |
| 670 | RpcStatusCode::INTERNAL, |
| 671 | Some(format!("Failed to deserialize response message: {:?}", e)), |
| 672 | ); |
| 673 | call.abort(&status); |
| 674 | return; |
| 675 | } |
| 676 | }; |
| 677 | let sink = UnarySink::new(ShareCall::new(call, close_f), ser); |
| 678 | f(ctx, request, sink) |
| 679 | } |
| 680 | |
| 681 | // Helper function to call client streaming handler. |
| 682 | pub fn execute_client_streaming<P, Q, F>( |
| 683 | ctx: RpcContext<'_>, |
| 684 | ser: SerializeFn<Q>, |
| 685 | de: DeserializeFn<P>, |
| 686 | f: &mut F, |
| 687 | ) where |
| 688 | F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>), |
| 689 | { |
| 690 | let mut call = ctx.call(); |
| 691 | let close_f = accept_call!(call); |
| 692 | let call = Arc::new(Mutex::new(ShareCall::new(call, close_f))); |
| 693 | |
| 694 | let req_s = RequestStream::new(call.clone(), de); |
| 695 | let sink = ClientStreamingSink::new(call, ser); |
| 696 | f(ctx, req_s, sink) |
| 697 | } |
| 698 | |
| 699 | // Helper function to call server streaming handler. |
| 700 | pub fn execute_server_streaming<P, Q, F>( |
| 701 | ctx: RpcContext<'_>, |
| 702 | ser: SerializeFn<Q>, |
| 703 | de: DeserializeFn<P>, |
| 704 | payload: MessageReader, |
| 705 | f: &mut F, |
| 706 | ) where |
| 707 | F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>), |
| 708 | { |
| 709 | let mut call = ctx.call(); |
| 710 | let close_f = accept_call!(call); |
| 711 | |
| 712 | let request = match de(payload) { |
| 713 | Ok(t) => t, |
| 714 | Err(e) => { |
| 715 | let status = RpcStatus::new( |
| 716 | RpcStatusCode::INTERNAL, |
| 717 | Some(format!("Failed to deserialize response message: {:?}", e)), |
| 718 | ); |
| 719 | call.abort(&status); |
| 720 | return; |
| 721 | } |
| 722 | }; |
| 723 | |
| 724 | let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser); |
| 725 | f(ctx, request, sink) |
| 726 | } |
| 727 | |
| 728 | // Helper function to call duplex streaming handler. |
| 729 | pub fn execute_duplex_streaming<P, Q, F>( |
| 730 | ctx: RpcContext<'_>, |
| 731 | ser: SerializeFn<Q>, |
| 732 | de: DeserializeFn<P>, |
| 733 | f: &mut F, |
| 734 | ) where |
| 735 | F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>), |
| 736 | { |
| 737 | let mut call = ctx.call(); |
| 738 | let close_f = accept_call!(call); |
| 739 | let call = Arc::new(Mutex::new(ShareCall::new(call, close_f))); |
| 740 | |
| 741 | let req_s = RequestStream::new(call.clone(), de); |
| 742 | let sink = DuplexSink::new(call, ser); |
| 743 | f(ctx, req_s, sink) |
| 744 | } |
| 745 | |
| 746 | // A helper function used to handle all undefined rpc calls. |
| 747 | pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) { |
| 748 | // Suppress needless-pass-by-value. |
| 749 | let ctx = ctx; |
| 750 | let mut call = ctx.call(cq); |
| 751 | accept_call!(call); |
| 752 | call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None)) |
| 753 | } |
| 754 | |
| 755 | // Helper function to call handler. |
| 756 | // |
| 757 | // Invoked after a request is ready to be handled. |
| 758 | fn execute( |
| 759 | ctx: RequestContext, |
| 760 | cq: &CompletionQueue, |
| 761 | payload: Option<MessageReader>, |
| 762 | f: &mut BoxHandler, |
| 763 | ) { |
| 764 | let rpc_ctx = RpcContext::new(ctx, cq); |
| 765 | f.handle(rpc_ctx, payload) |
| 766 | } |