blob: 02e94192606c4263d3b7ea1a67ca0e336b1f01eb [file] [log] [blame]
Jeff Vander Stoep761577d2020-10-14 15:21:00 +02001// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::fmt::{self, Debug, Formatter};
4use std::sync::Arc;
5
6use super::Inner;
7use crate::call::{BatchContext, MessageReader, RpcStatusCode};
8use crate::error::Error;
9
10/// Batch job type.
11#[derive(PartialEq, Debug)]
12pub enum BatchType {
13 /// Finish without reading any message.
14 Finish,
15 /// Extract one message when finish.
16 Read,
17 /// Check the rpc code and then extract one message.
18 CheckRead,
19}
20
21/// A promise used to resolve batch jobs.
22pub struct Batch {
23 ty: BatchType,
24 ctx: BatchContext,
25 inner: Arc<Inner<Option<MessageReader>>>,
26}
27
28impl Batch {
29 pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
30 Batch {
31 ty,
32 ctx: BatchContext::new(),
33 inner,
34 }
35 }
36
37 pub fn context(&self) -> &BatchContext {
38 &self.ctx
39 }
40
41 fn read_one_msg(&mut self, success: bool) {
42 let task = {
43 let mut guard = self.inner.lock();
44 if success {
45 guard.set_result(Ok(self.ctx.recv_message()))
46 } else {
47 // rely on C core to handle the failed read (e.g. deliver approriate
48 // statusCode on the clientside).
49 guard.set_result(Ok(None))
50 }
51 };
52 task.map(|t| t.wake());
53 }
54
55 fn finish_response(&mut self, succeed: bool) {
56 let task = {
57 let mut guard = self.inner.lock();
58 if succeed {
59 let status = self.ctx.rpc_status();
60 if status.status == RpcStatusCode::OK {
61 guard.set_result(Ok(None))
62 } else {
63 guard.set_result(Err(Error::RpcFailure(status)))
64 }
65 } else {
66 guard.set_result(Err(Error::RemoteStopped))
67 }
68 };
69 task.map(|t| t.wake());
70 }
71
72 fn handle_unary_response(&mut self) {
73 let task = {
74 let mut guard = self.inner.lock();
75 let status = self.ctx.rpc_status();
76 if status.status == RpcStatusCode::OK {
77 guard.set_result(Ok(self.ctx.recv_message()))
78 } else {
79 guard.set_result(Err(Error::RpcFailure(status)))
80 }
81 };
82 task.map(|t| t.wake());
83 }
84
85 pub fn resolve(mut self, success: bool) {
86 match self.ty {
87 BatchType::CheckRead => {
88 assert!(success);
89 self.handle_unary_response();
90 }
91 BatchType::Finish => {
92 self.finish_response(success);
93 }
94 BatchType::Read => {
95 self.read_one_msg(success);
96 }
97 }
98 }
99}
100
101impl Debug for Batch {
102 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
103 write!(f, "Batch [{:?}]", self.ty)
104 }
105}
106
107/// A promise used to resolve async shutdown result.
108pub struct Shutdown {
109 inner: Arc<Inner<()>>,
110}
111
112impl Shutdown {
113 pub fn new(inner: Arc<Inner<()>>) -> Shutdown {
114 Shutdown { inner }
115 }
116
117 pub fn resolve(self, success: bool) {
118 let task = {
119 let mut guard = self.inner.lock();
120 if success {
121 guard.set_result(Ok(()))
122 } else {
123 guard.set_result(Err(Error::ShutdownFailed))
124 }
125 };
126 task.map(|t| t.wake());
127 }
128}