blob: 8875d6d28eb8c9d94968a1976432d4394716ab83 [file] [log] [blame]
Jeff Vander Stoep761577d2020-10-14 15:21:00 +02001// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::ffi::CStr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::{result, slice};
7
8use crate::grpc_sys::{
9 self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
10};
11use futures::future::Future;
12use futures::ready;
13use futures::sink::Sink;
14use futures::stream::Stream;
15use futures::task::{Context, Poll};
16use parking_lot::Mutex;
17
18use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
19use crate::auth_context::AuthContext;
20use crate::call::{
21 BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
22};
23use crate::codec::{DeserializeFn, SerializeFn};
24use crate::cq::CompletionQueue;
25use crate::error::{Error, Result};
26use crate::metadata::Metadata;
27use crate::server::{BoxHandler, RequestCallContext};
28use crate::task::{BatchFuture, CallTag, Executor, Kicker};
29
30pub struct Deadline {
31 spec: gpr_timespec,
32}
33
34impl Deadline {
35 fn new(spec: gpr_timespec) -> Deadline {
36 let realtime_spec =
37 unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };
38
39 Deadline {
40 spec: realtime_spec,
41 }
42 }
43
44 pub fn exceeded(&self) -> bool {
45 unsafe {
46 let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
47 grpc_sys::gpr_time_cmp(now, self.spec) >= 0
48 }
49 }
50}
51
52/// Context for accepting a request.
53pub struct RequestContext {
54 ctx: *mut grpcwrap_request_call_context,
55 request_call: Option<RequestCallContext>,
56}
57
58impl RequestContext {
59 pub fn new(rc: RequestCallContext) -> RequestContext {
60 let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };
61
62 RequestContext {
63 ctx,
64 request_call: Some(rc),
65 }
66 }
67
68 /// Try to accept a client side streaming request.
69 ///
70 /// Return error if the request is a client side unary request.
71 pub fn handle_stream_req(
72 self,
73 cq: &CompletionQueue,
74 rc: &mut RequestCallContext,
75 ) -> result::Result<(), Self> {
76 let handler = unsafe { rc.get_handler(self.method()) };
77 match handler {
78 Some(handler) => match handler.method_type() {
79 MethodType::Unary | MethodType::ServerStreaming => Err(self),
80 _ => {
81 execute(self, cq, None, handler);
82 Ok(())
83 }
84 },
85 None => {
86 execute_unimplemented(self, cq.clone());
87 Ok(())
88 }
89 }
90 }
91
92 /// Accept a client side unary request.
93 ///
94 /// This method should be called after `handle_stream_req`. When handling
95 /// client side unary request, handler will only be called after the unary
96 /// request is received.
97 pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
98 // fetch message before calling callback.
99 let tag = Box::new(CallTag::unary_request(self, rc));
100 let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
101 let request_ctx = tag.request_ctx().unwrap().as_ptr();
102 let tag_ptr = Box::into_raw(tag);
103 unsafe {
104 let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
105 let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
106 if code != grpc_call_error::GRPC_CALL_OK {
107 Box::from_raw(tag_ptr);
108 // it should not failed.
109 panic!("try to receive message fail: {:?}", code);
110 }
111 }
112 }
113
114 pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
115 self.request_call.take()
116 }
117
118 pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
119 self.ctx
120 }
121
122 fn call(&self, cq: CompletionQueue) -> Call {
123 unsafe {
124 // It is okay to use a mutable pointer on a immutable reference, `self`,
125 // because grpcwrap_request_call_context_ref_call is thread-safe.
126 let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
127 assert!(!call.is_null());
128 Call::from_raw(call, cq)
129 }
130 }
131
132 pub fn method(&self) -> &[u8] {
133 let mut len = 0;
134 let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };
135
136 unsafe { slice::from_raw_parts(method as _, len) }
137 }
138
139 fn host(&self) -> &[u8] {
140 let mut len = 0;
141 let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };
142
143 unsafe { slice::from_raw_parts(host as _, len) }
144 }
145
146 fn deadline(&self) -> Deadline {
147 let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };
148
149 Deadline::new(t)
150 }
151
152 fn metadata(&self) -> &Metadata {
153 unsafe {
154 let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
155 let arr_ptr: *const Metadata = ptr as _;
156 &*arr_ptr
157 }
158 }
159
160 fn peer(&self) -> String {
161 unsafe {
162 // RequestContext always holds a reference of the call.
163 let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
164 let p = grpc_sys::grpc_call_get_peer(call);
165 let peer = CStr::from_ptr(p)
166 .to_str()
167 .expect("valid UTF-8 data")
168 .to_owned();
169 grpc_sys::gpr_free(p as _);
170 peer
171 }
172 }
173
174 /// If the server binds in non-secure mode, this will return None
175 fn auth_context(&self) -> Option<AuthContext> {
176 unsafe {
177 let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
178 AuthContext::from_call_ptr(call)
179 }
180 }
181}
182
183impl Drop for RequestContext {
184 fn drop(&mut self) {
185 unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
186 }
187}
188
189/// A context for handling client side unary request.
190pub struct UnaryRequestContext {
191 request: RequestContext,
192 request_call: Option<RequestCallContext>,
193 batch: BatchContext,
194}
195
196impl UnaryRequestContext {
197 pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
198 UnaryRequestContext {
199 request: ctx,
200 request_call: Some(rc),
201 batch: BatchContext::new(),
202 }
203 }
204
205 pub fn batch_ctx(&self) -> &BatchContext {
206 &self.batch
207 }
208
209 pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
210 &mut self.batch
211 }
212
213 pub fn request_ctx(&self) -> &RequestContext {
214 &self.request
215 }
216
217 pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
218 self.request_call.take()
219 }
220
221 pub fn handle(
222 self,
223 rc: &mut RequestCallContext,
224 cq: &CompletionQueue,
225 reader: Option<MessageReader>,
226 ) {
227 let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
228 if reader.is_some() {
229 return execute(self.request, cq, reader, handler);
230 }
231
232 let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned()));
233 self.request.call(cq.clone()).abort(&status)
234 }
235}
236
237/// A stream for client a streaming call and a duplex streaming call.
238///
239/// The corresponding RPC will be canceled if the stream did not
240/// finish before dropping.
241#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
242pub struct RequestStream<T> {
243 call: Arc<Mutex<ShareCall>>,
244 base: StreamingBase,
245 de: DeserializeFn<T>,
246}
247
248impl<T> RequestStream<T> {
249 fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
250 RequestStream {
251 call,
252 base: StreamingBase::new(None),
253 de,
254 }
255 }
256}
257
258impl<T> Stream for RequestStream<T> {
259 type Item = Result<T>;
260
261 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
262 {
263 let mut call = self.call.lock();
264 call.check_alive()?;
265 }
266
267 let t = &mut *self;
268 match ready!(t.base.poll(cx, &mut t.call, false)?) {
269 None => Poll::Ready(None),
270 Some(data) => Poll::Ready(Some((t.de)(data))),
271 }
272 }
273}
274
275impl<T> Drop for RequestStream<T> {
276 /// The corresponding RPC will be canceled if the stream did not
277 /// finish before dropping.
278 fn drop(&mut self) {
279 self.base.on_drop(&mut self.call);
280 }
281}
282
283/// A helper macro used to implement server side unary sink.
284/// Not using generic here because we don't need to expose
285/// `CallHolder` or `Call` to caller.
286// TODO: Use type alias to be friendly for documentation.
287macro_rules! impl_unary_sink {
288 ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
289 pub struct $rt {
290 call: $holder,
291 cq_f: Option<BatchFuture>,
292 err: Option<Error>,
293 }
294
295 impl Future for $rt {
296 type Output = Result<()>;
297
298 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
299 if let Some(e) = self.err.take() {
300 return Poll::Ready(Err(e));
301 }
302
303 if self.cq_f.is_some() {
304 ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
305 self.cq_f.take();
306 }
307
308 ready!(self.call.call(|c| c.poll_finish(cx))?);
309 Poll::Ready(Ok(()))
310 }
311 }
312
313 $(#[$attr])*
314 pub struct $t<T> {
315 call: Option<$holder>,
316 write_flags: u32,
317 ser: SerializeFn<T>,
318 }
319
320 impl<T> $t<T> {
321 fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
322 $t {
323 call: Some(call),
324 write_flags: 0,
325 ser: ser,
326 }
327 }
328
329 pub fn success(self, t: T) -> $rt {
330 self.complete(RpcStatus::ok(), Some(t))
331 }
332
333 pub fn fail(self, status: RpcStatus) -> $rt {
334 self.complete(status, None)
335 }
336
337 fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
338 let data = t.as_ref().map(|t| {
339 let mut buf = vec![];
340 (self.ser)(t, &mut buf);
341 buf
342 });
343
344 let write_flags = self.write_flags;
345 let res = self.call.as_mut().unwrap().call(|c| {
346 c.call
347 .start_send_status_from_server(&status, true, &data, write_flags)
348 });
349
350 let (cq_f, err) = match res {
351 Ok(f) => (Some(f), None),
352 Err(e) => (None, Some(e)),
353 };
354
355 $rt {
356 call: self.call.take().unwrap(),
357 cq_f: cq_f,
358 err: err,
359 }
360 }
361 }
362
363 impl<T> Drop for $t<T> {
364 /// The corresponding RPC will be canceled if the sink did not
365 /// send a response before dropping.
366 fn drop(&mut self) {
367 self.call
368 .as_mut()
369 .map(|call| call.call(|c| c.call.cancel()));
370 }
371 }
372 };
373}
374
375impl_unary_sink!(
376 /// A sink for unary call.
377 ///
378 /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
379 ///
380 /// [`success`]: #method.success
381 /// [`fail`]: #method.fail
382 #[must_use = "if unused the sink may immediately cancel the RPC"]
383 UnarySink,
384 UnarySinkResult,
385 ShareCall
386);
387impl_unary_sink!(
388 /// A sink for client streaming call.
389 ///
390 /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
391 ///
392 /// [`success`]: #method.success
393 /// [`fail`]: #method.fail
394 #[must_use = "if unused the sink may immediately cancel the RPC"]
395 ClientStreamingSink,
396 ClientStreamingSinkResult,
397 Arc<Mutex<ShareCall>>
398);
399
400// A macro helper to implement server side streaming sink.
401macro_rules! impl_stream_sink {
402 ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
403 $(#[$attr])*
404 pub struct $t<T> {
405 call: Option<$holder>,
406 base: SinkBase,
407 flush_f: Option<BatchFuture>,
408 status: RpcStatus,
409 flushed: bool,
410 closed: bool,
411 ser: SerializeFn<T>,
412 }
413
414 impl<T> $t<T> {
415 fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
416 $t {
417 call: Some(call),
418 base: SinkBase::new(true),
419 flush_f: None,
420 status: RpcStatus::ok(),
421 flushed: false,
422 closed: false,
423 ser: ser,
424 }
425 }
426
427 pub fn set_status(&mut self, status: RpcStatus) {
428 assert!(self.flush_f.is_none());
429 self.status = status;
430 }
431
432 pub fn fail(mut self, status: RpcStatus) -> $ft {
433 assert!(self.flush_f.is_none());
434 let send_metadata = self.base.send_metadata;
435 let res = self.call.as_mut().unwrap().call(|c| {
436 c.call
437 .start_send_status_from_server(&status, send_metadata, &None, 0)
438 });
439
440 let (fail_f, err) = match res {
441 Ok(f) => (Some(f), None),
442 Err(e) => (None, Some(e)),
443 };
444
445 $ft {
446 call: self.call.take().unwrap(),
447 fail_f: fail_f,
448 err: err,
449 }
450 }
451 }
452
453 impl<T> Drop for $t<T> {
454 /// The corresponding RPC will be canceled if the sink did not call
455 /// [`close`] or [`fail`] before dropping.
456 ///
457 /// [`close`]: #method.close
458 /// [`fail`]: #method.fail
459 fn drop(&mut self) {
460 // We did not close it explicitly and it was not dropped in the `fail`.
461 if !self.closed && self.call.is_some() {
462 let mut call = self.call.take().unwrap();
463 call.call(|c| c.call.cancel());
464 }
465 }
466 }
467
468 impl<T> Sink<(T, WriteFlags)> for $t<T> {
469 type Error = Error;
470
471 #[inline]
472 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
473 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
474 return Poll::Ready(Err(Error::RemoteStopped));
475 }
476 Pin::new(&mut self.base).poll_ready(cx)
477 }
478
479 #[inline]
480 fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
481 let t = &mut *self;
482 t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
483 }
484
485 #[inline]
486 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
487 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
488 return Poll::Ready(Err(Error::RemoteStopped));
489 }
490 Pin::new(&mut self.base).poll_ready(cx)
491 }
492
493 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
494 if self.flush_f.is_none() {
495 ready!(Pin::new(&mut self.base).poll_ready(cx)?);
496
497 let send_metadata = self.base.send_metadata;
498 let t = &mut *self;
499 let status = &t.status;
500 let flush_f = t.call.as_mut().unwrap().call(|c| {
501 c.call
502 .start_send_status_from_server(status, send_metadata, &None, 0)
503 })?;
504 t.flush_f = Some(flush_f);
505 }
506
507 if !self.flushed {
508 ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
509 self.flushed = true;
510 }
511
512 ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
513 self.closed = true;
514 Poll::Ready(Ok(()))
515 }
516 }
517
518 #[must_use = "if unused the sink failure may immediately cancel the RPC"]
519 pub struct $ft {
520 call: $holder,
521 fail_f: Option<BatchFuture>,
522 err: Option<Error>,
523 }
524
525 impl Future for $ft {
526 type Output = Result<()>;
527
528 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
529 if let Some(e) = self.err.take() {
530 return Poll::Ready(Err(e));
531 }
532
533 let readiness = self.call.call(|c| {
534 if c.finished {
535 return Poll::Ready(Ok(()));
536 }
537
538 c.poll_finish(cx).map(|r| r.map(|_| ()))
539 })?;
540
541 if let Some(ref mut f) = self.fail_f {
542 ready!(Pin::new(f).poll(cx)?);
543 }
544
545 self.fail_f.take();
546 readiness.map(Ok)
547 }
548 }
549 };
550}
551
552impl_stream_sink!(
553 /// A sink for server streaming call.
554 ///
555 /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
556 ///
557 /// [`close`]: #method.close
558 /// [`fail`]: #method.fail
559 #[must_use = "if unused the sink may immediately cancel the RPC"]
560 ServerStreamingSink,
561 ServerStreamingSinkFailure,
562 ShareCall
563);
564impl_stream_sink!(
565 /// A sink for duplex streaming call.
566 ///
567 /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
568 ///
569 /// [`close`]: #method.close
570 /// [`fail`]: #method.fail
571 #[must_use = "if unused the sink may immediately cancel the RPC"]
572 DuplexSink,
573 DuplexSinkFailure,
574 Arc<Mutex<ShareCall>>
575);
576
577/// A context for rpc handling.
578pub struct RpcContext<'a> {
579 ctx: RequestContext,
580 executor: Executor<'a>,
581 deadline: Deadline,
582}
583
584impl<'a> RpcContext<'a> {
585 fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
586 RpcContext {
587 deadline: ctx.deadline(),
588 ctx,
589 executor: Executor::new(cq),
590 }
591 }
592
593 fn kicker(&self) -> Kicker {
594 let call = self.call();
595 Kicker::from_call(call)
596 }
597
598 pub(crate) fn call(&self) -> Call {
599 self.ctx.call(self.executor.cq().clone())
600 }
601
602 pub fn method(&self) -> &[u8] {
603 self.ctx.method()
604 }
605
606 pub fn host(&self) -> &[u8] {
607 self.ctx.host()
608 }
609
610 pub fn deadline(&self) -> &Deadline {
611 &self.deadline
612 }
613
614 /// Get the initial metadata sent by client.
615 pub fn request_headers(&self) -> &Metadata {
616 self.ctx.metadata()
617 }
618
619 pub fn peer(&self) -> String {
620 self.ctx.peer()
621 }
622
623 /// Wrapper around the gRPC Core AuthContext
624 ///
625 /// If the server binds in non-secure mode, this will return None
626 pub fn auth_context(&self) -> Option<AuthContext> {
627 self.ctx.auth_context()
628 }
629
630 /// Spawn the future into current gRPC poll thread.
631 ///
632 /// This can reduce a lot of context switching, but please make
633 /// sure there is no heavy work in the future.
634 pub fn spawn<F>(&self, f: F)
635 where
636 F: Future<Output = ()> + Send + 'static,
637 {
638 self.executor.spawn(f, self.kicker())
639 }
640}
641
642// Following four helper functions are used to create a callback closure.
643
644macro_rules! accept_call {
645 ($call:expr) => {
646 match $call.start_server_side() {
647 Err(Error::QueueShutdown) => return,
648 Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
649 Ok(f) => f,
650 }
651 };
652}
653
654// Helper function to call a unary handler.
655pub fn execute_unary<P, Q, F>(
656 ctx: RpcContext<'_>,
657 ser: SerializeFn<Q>,
658 de: DeserializeFn<P>,
659 payload: MessageReader,
660 f: &mut F,
661) where
662 F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
663{
664 let mut call = ctx.call();
665 let close_f = accept_call!(call);
666 let request = match de(payload) {
667 Ok(f) => f,
668 Err(e) => {
669 let status = RpcStatus::new(
670 RpcStatusCode::INTERNAL,
671 Some(format!("Failed to deserialize response message: {:?}", e)),
672 );
673 call.abort(&status);
674 return;
675 }
676 };
677 let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
678 f(ctx, request, sink)
679}
680
681// Helper function to call client streaming handler.
682pub fn execute_client_streaming<P, Q, F>(
683 ctx: RpcContext<'_>,
684 ser: SerializeFn<Q>,
685 de: DeserializeFn<P>,
686 f: &mut F,
687) where
688 F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
689{
690 let mut call = ctx.call();
691 let close_f = accept_call!(call);
692 let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
693
694 let req_s = RequestStream::new(call.clone(), de);
695 let sink = ClientStreamingSink::new(call, ser);
696 f(ctx, req_s, sink)
697}
698
699// Helper function to call server streaming handler.
700pub fn execute_server_streaming<P, Q, F>(
701 ctx: RpcContext<'_>,
702 ser: SerializeFn<Q>,
703 de: DeserializeFn<P>,
704 payload: MessageReader,
705 f: &mut F,
706) where
707 F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
708{
709 let mut call = ctx.call();
710 let close_f = accept_call!(call);
711
712 let request = match de(payload) {
713 Ok(t) => t,
714 Err(e) => {
715 let status = RpcStatus::new(
716 RpcStatusCode::INTERNAL,
717 Some(format!("Failed to deserialize response message: {:?}", e)),
718 );
719 call.abort(&status);
720 return;
721 }
722 };
723
724 let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
725 f(ctx, request, sink)
726}
727
728// Helper function to call duplex streaming handler.
729pub fn execute_duplex_streaming<P, Q, F>(
730 ctx: RpcContext<'_>,
731 ser: SerializeFn<Q>,
732 de: DeserializeFn<P>,
733 f: &mut F,
734) where
735 F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
736{
737 let mut call = ctx.call();
738 let close_f = accept_call!(call);
739 let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
740
741 let req_s = RequestStream::new(call.clone(), de);
742 let sink = DuplexSink::new(call, ser);
743 f(ctx, req_s, sink)
744}
745
746// A helper function used to handle all undefined rpc calls.
747pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
748 // Suppress needless-pass-by-value.
749 let ctx = ctx;
750 let mut call = ctx.call(cq);
751 accept_call!(call);
752 call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None))
753}
754
755// Helper function to call handler.
756//
757// Invoked after a request is ready to be handled.
758fn execute(
759 ctx: RequestContext,
760 cq: &CompletionQueue,
761 payload: Option<MessageReader>,
762 f: &mut BoxHandler,
763) {
764 let rpc_ctx = RpcContext::new(ctx, cq);
765 f.handle(rpc_ctx, payload)
766}