blob: 7625bcc38d314896e4eaaf4cd729bf7ec2259f07 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Nicolas "Pixel" Noble1ff52d52015-03-01 05:24:36 +010034#ifndef GRPCXX_STREAM_H
35#define GRPCXX_STREAM_H
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080036
Craig Tillerc4965752015-02-09 09:51:00 -080037#include <grpc++/channel_interface.h>
Yang Gao968ca532015-02-11 16:23:47 -080038#include <grpc++/client_context.h>
Craig Tillerc4965752015-02-09 09:51:00 -080039#include <grpc++/completion_queue.h>
Craig Tiller01567522015-02-11 21:08:49 -080040#include <grpc++/server_context.h>
Craig Tiller20f4af22015-02-10 09:52:15 -080041#include <grpc++/impl/call.h>
Craig Tiller1c9a2a92015-02-12 14:10:25 -080042#include <grpc++/impl/service_type.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc++/status.h>
44#include <grpc/support/log.h>
45
46namespace grpc {
47
48// Common interface for all client side streaming.
49class ClientStreamingInterface {
50 public:
51 virtual ~ClientStreamingInterface() {}
52
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053 // Wait until the stream finishes, and return the final status. When the
54 // client side declares it has no more message to send, either implicitly or
55 // by calling WritesDone, it needs to make sure there is no more message to
56 // be received from the server, either implicitly or by getting a false from
57 // a Read(). Otherwise, this implicitly cancels the stream.
Craig Tillerc4965752015-02-09 09:51:00 -080058 virtual Status Finish() = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059};
60
// Interface for a source that produces a sequence of R messages.
template <class R>
class ReaderInterface {
 public:
  virtual ~ReaderInterface() {}

  // Blocking read of the next message, parsed into |msg|.
  // Returns true when a message was read. Returns false once no further
  // incoming messages will arrive: either the other side has called
  // WritesDone, or the stream has failed or been cancelled.
  virtual bool Read(R* msg) = 0;
};
73
// Interface for a sink that accepts a sequence of W messages.
template <class W>
class WriterInterface {
 public:
  virtual ~WriterInterface() {}

  // Blocking write of |msg| to the stream.
  // Returns true when the write succeeded and false when the stream has
  // been closed.
  virtual bool Write(const W& msg) = 0;
};
84
85template <class R>
Craig Tillercf133f42015-02-26 14:05:56 -080086class ClientReader GRPC_FINAL : public ClientStreamingInterface,
87 public ReaderInterface<R> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080088 public:
89 // Blocking create a stream and write the first request out.
Craig Tiller573523f2015-02-17 07:38:26 -080090 ClientReader(ChannelInterface* channel, const RpcMethod& method,
Yang Gao7694c352015-03-03 09:48:06 -080091 ClientContext* context, const grpc::protobuf::Message& request)
Yang Gao968ca532015-02-11 16:23:47 -080092 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
Craig Tillerc4965752015-02-09 09:51:00 -080093 CallOpBuffer buf;
Craig Tillera847a8f2015-02-11 21:20:25 -080094 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
Craig Tillerc4965752015-02-09 09:51:00 -080095 buf.AddSendMessage(request);
96 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -080097 call_.PerformOps(&buf);
98 cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080099 }
100
Yang Gaofd7199f2015-02-11 23:14:49 -0800101 // Blocking wait for initial metadata from server. The received metadata
Yang Gaoea222b22015-02-12 09:38:51 -0800102 // can only be accessed after this call returns. Should only be called before
103 // the first read. Calling this method is optional, and if it is not called
104 // the metadata will be available in ClientContext after the first read.
Yang Gaofd7199f2015-02-11 23:14:49 -0800105 void WaitForInitialMetadata() {
Yang Gaoea222b22015-02-12 09:38:51 -0800106 GPR_ASSERT(!context_->initial_metadata_received_);
107
108 CallOpBuffer buf;
Yang Gao2b7f5372015-02-18 00:45:53 -0800109 buf.AddRecvInitialMetadata(context_);
Yang Gaoea222b22015-02-12 09:38:51 -0800110 call_.PerformOps(&buf);
111 GPR_ASSERT(cq_.Pluck(&buf));
Yang Gaofd7199f2015-02-11 23:14:49 -0800112 }
113
Craig Tillercf133f42015-02-26 14:05:56 -0800114 virtual bool Read(R* msg) GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800115 CallOpBuffer buf;
Yang Gaoea222b22015-02-12 09:38:51 -0800116 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800117 buf.AddRecvInitialMetadata(context_);
Yang Gaoea222b22015-02-12 09:38:51 -0800118 }
Yang Gao0c7aafa2015-02-12 22:51:38 -0800119 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800120 call_.PerformOps(&buf);
Yang Gao0c7aafa2015-02-12 22:51:38 -0800121 return cq_.Pluck(&buf) && buf.got_message;
Craig Tillerc4965752015-02-09 09:51:00 -0800122 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800123
Craig Tillercf133f42015-02-26 14:05:56 -0800124 virtual Status Finish() GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800125 CallOpBuffer buf;
126 Status status;
Yang Gao2b7f5372015-02-18 00:45:53 -0800127 buf.AddClientRecvStatus(context_, &status);
Craig Tillerde917062015-02-09 17:15:03 -0800128 call_.PerformOps(&buf);
129 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tillerc4965752015-02-09 09:51:00 -0800130 return status;
131 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800132
133 private:
Yang Gao968ca532015-02-11 16:23:47 -0800134 ClientContext* context_;
Craig Tillerc4965752015-02-09 09:51:00 -0800135 CompletionQueue cq_;
136 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800137};
138
139template <class W>
Craig Tillercf133f42015-02-26 14:05:56 -0800140class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
141 public WriterInterface<W> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800142 public:
143 // Blocking create a stream.
Craig Tiller573523f2015-02-17 07:38:26 -0800144 ClientWriter(ChannelInterface* channel, const RpcMethod& method,
Yang Gao7694c352015-03-03 09:48:06 -0800145 ClientContext* context, grpc::protobuf::Message* response)
Craig Tiller573523f2015-02-17 07:38:26 -0800146 : context_(context),
147 response_(response),
Craig Tiller01567522015-02-11 21:08:49 -0800148 call_(channel->CreateCall(method, context, &cq_)) {
149 CallOpBuffer buf;
150 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
151 call_.PerformOps(&buf);
152 cq_.Pluck(&buf);
153 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154
Craig Tillercf133f42015-02-26 14:05:56 -0800155 virtual bool Write(const W& msg) GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800156 CallOpBuffer buf;
157 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800158 call_.PerformOps(&buf);
159 return cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800160 }
161
Craig Tillerc4965752015-02-09 09:51:00 -0800162 virtual bool WritesDone() {
163 CallOpBuffer buf;
164 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -0800165 call_.PerformOps(&buf);
166 return cq_.Pluck(&buf);
Craig Tillerc4965752015-02-09 09:51:00 -0800167 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800168
169 // Read the final response and wait for the final status.
Craig Tillercf133f42015-02-26 14:05:56 -0800170 virtual Status Finish() GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800171 CallOpBuffer buf;
172 Status status;
Yang Gao0c7aafa2015-02-12 22:51:38 -0800173 buf.AddRecvMessage(response_);
Yang Gao2b7f5372015-02-18 00:45:53 -0800174 buf.AddClientRecvStatus(context_, &status);
Craig Tillerde917062015-02-09 17:15:03 -0800175 call_.PerformOps(&buf);
Yang Gao0c7aafa2015-02-12 22:51:38 -0800176 GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
Craig Tillerc4965752015-02-09 09:51:00 -0800177 return status;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800178 }
179
180 private:
Yang Gao968ca532015-02-11 16:23:47 -0800181 ClientContext* context_;
Yang Gao7694c352015-03-03 09:48:06 -0800182 grpc::protobuf::Message* const response_;
Craig Tillerc4965752015-02-09 09:51:00 -0800183 CompletionQueue cq_;
184 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800185};
186
187// Client-side interface for bi-directional streaming.
188template <class W, class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800189class ClientReaderWriter GRPC_FINAL : public ClientStreamingInterface,
190 public WriterInterface<W>,
191 public ReaderInterface<R> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800192 public:
193 // Blocking create a stream.
Craig Tiller573523f2015-02-17 07:38:26 -0800194 ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
195 ClientContext* context)
Craig Tillera847a8f2015-02-11 21:20:25 -0800196 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
197 CallOpBuffer buf;
198 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
199 call_.PerformOps(&buf);
200 GPR_ASSERT(cq_.Pluck(&buf));
201 }
Craig Tillerc4965752015-02-09 09:51:00 -0800202
Yang Gaofd7199f2015-02-11 23:14:49 -0800203 // Blocking wait for initial metadata from server. The received metadata
Yang Gaoea222b22015-02-12 09:38:51 -0800204 // can only be accessed after this call returns. Should only be called before
205 // the first read. Calling this method is optional, and if it is not called
206 // the metadata will be available in ClientContext after the first read.
Yang Gaofd7199f2015-02-11 23:14:49 -0800207 void WaitForInitialMetadata() {
Yang Gaoea222b22015-02-12 09:38:51 -0800208 GPR_ASSERT(!context_->initial_metadata_received_);
209
210 CallOpBuffer buf;
Yang Gao2b7f5372015-02-18 00:45:53 -0800211 buf.AddRecvInitialMetadata(context_);
Yang Gaoea222b22015-02-12 09:38:51 -0800212 call_.PerformOps(&buf);
213 GPR_ASSERT(cq_.Pluck(&buf));
Yang Gaofd7199f2015-02-11 23:14:49 -0800214 }
215
Craig Tillercf133f42015-02-26 14:05:56 -0800216 virtual bool Read(R* msg) GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800217 CallOpBuffer buf;
Yang Gaoea222b22015-02-12 09:38:51 -0800218 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800219 buf.AddRecvInitialMetadata(context_);
Yang Gaoea222b22015-02-12 09:38:51 -0800220 }
Yang Gao0c7aafa2015-02-12 22:51:38 -0800221 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800222 call_.PerformOps(&buf);
Yang Gao0c7aafa2015-02-12 22:51:38 -0800223 return cq_.Pluck(&buf) && buf.got_message;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800224 }
225
Craig Tillercf133f42015-02-26 14:05:56 -0800226 virtual bool Write(const W& msg) GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800227 CallOpBuffer buf;
228 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800229 call_.PerformOps(&buf);
230 return cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800231 }
232
Craig Tillerc4965752015-02-09 09:51:00 -0800233 virtual bool WritesDone() {
234 CallOpBuffer buf;
235 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -0800236 call_.PerformOps(&buf);
237 return cq_.Pluck(&buf);
Craig Tillerc4965752015-02-09 09:51:00 -0800238 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800239
Craig Tillercf133f42015-02-26 14:05:56 -0800240 virtual Status Finish() GRPC_OVERRIDE {
Craig Tillerc4965752015-02-09 09:51:00 -0800241 CallOpBuffer buf;
242 Status status;
Yang Gao2b7f5372015-02-18 00:45:53 -0800243 buf.AddClientRecvStatus(context_, &status);
Craig Tillerde917062015-02-09 17:15:03 -0800244 call_.PerformOps(&buf);
245 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tillerc4965752015-02-09 09:51:00 -0800246 return status;
247 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800248
249 private:
Yang Gao968ca532015-02-11 16:23:47 -0800250 ClientContext* context_;
Craig Tillerc4965752015-02-09 09:51:00 -0800251 CompletionQueue cq_;
252 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800253};
254
nnoble0c475f02014-12-05 15:37:39 -0800255template <class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800256class ServerReader GRPC_FINAL : public ReaderInterface<R> {
nnoble0c475f02014-12-05 15:37:39 -0800257 public:
Yang Gaob492f062015-02-11 23:43:20 -0800258 ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
259
260 void SendInitialMetadata() {
Yang Gaoea222b22015-02-12 09:38:51 -0800261 GPR_ASSERT(!ctx_->sent_initial_metadata_);
262
263 CallOpBuffer buf;
264 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
265 ctx_->sent_initial_metadata_ = true;
266 call_->PerformOps(&buf);
267 call_->cq()->Pluck(&buf);
Yang Gaob492f062015-02-11 23:43:20 -0800268 }
Yang Gao8a3bbb52015-02-09 14:10:59 -0800269
Craig Tillercf133f42015-02-26 14:05:56 -0800270 virtual bool Read(R* msg) GRPC_OVERRIDE {
Yang Gao8a3bbb52015-02-09 14:10:59 -0800271 CallOpBuffer buf;
Yang Gao0c7aafa2015-02-12 22:51:38 -0800272 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800273 call_->PerformOps(&buf);
Yang Gao0c7aafa2015-02-12 22:51:38 -0800274 return call_->cq()->Pluck(&buf) && buf.got_message;
nnoble0c475f02014-12-05 15:37:39 -0800275 }
276
nnoble0c475f02014-12-05 15:37:39 -0800277 private:
Craig Tiller01567522015-02-11 21:08:49 -0800278 Call* const call_;
279 ServerContext* const ctx_;
nnoble0c475f02014-12-05 15:37:39 -0800280};
281
282template <class W>
Craig Tillercf133f42015-02-26 14:05:56 -0800283class ServerWriter GRPC_FINAL : public WriterInterface<W> {
nnoble0c475f02014-12-05 15:37:39 -0800284 public:
Yang Gaob492f062015-02-11 23:43:20 -0800285 ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
286
287 void SendInitialMetadata() {
Yang Gaoea222b22015-02-12 09:38:51 -0800288 GPR_ASSERT(!ctx_->sent_initial_metadata_);
289
290 CallOpBuffer buf;
291 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
292 ctx_->sent_initial_metadata_ = true;
293 call_->PerformOps(&buf);
294 call_->cq()->Pluck(&buf);
Yang Gaob492f062015-02-11 23:43:20 -0800295 }
nnoble0c475f02014-12-05 15:37:39 -0800296
Craig Tillercf133f42015-02-26 14:05:56 -0800297 virtual bool Write(const W& msg) GRPC_OVERRIDE {
Yang Gao75ec2b12015-02-09 16:03:41 -0800298 CallOpBuffer buf;
Yang Gaoea222b22015-02-12 09:38:51 -0800299 if (!ctx_->sent_initial_metadata_) {
300 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
301 ctx_->sent_initial_metadata_ = true;
302 }
Yang Gao75ec2b12015-02-09 16:03:41 -0800303 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800304 call_->PerformOps(&buf);
305 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800306 }
307
308 private:
Craig Tiller01567522015-02-11 21:08:49 -0800309 Call* const call_;
310 ServerContext* const ctx_;
nnoble0c475f02014-12-05 15:37:39 -0800311};
312
313// Server-side interface for bi-directional streaming.
314template <class W, class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800315class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
316 public ReaderInterface<R> {
nnoble0c475f02014-12-05 15:37:39 -0800317 public:
Yang Gaob492f062015-02-11 23:43:20 -0800318 ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
319
320 void SendInitialMetadata() {
Yang Gaoea222b22015-02-12 09:38:51 -0800321 GPR_ASSERT(!ctx_->sent_initial_metadata_);
322
323 CallOpBuffer buf;
324 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
325 ctx_->sent_initial_metadata_ = true;
326 call_->PerformOps(&buf);
327 call_->cq()->Pluck(&buf);
Yang Gaob492f062015-02-11 23:43:20 -0800328 }
Yang Gao75ec2b12015-02-09 16:03:41 -0800329
Craig Tillercf133f42015-02-26 14:05:56 -0800330 virtual bool Read(R* msg) GRPC_OVERRIDE {
Yang Gao75ec2b12015-02-09 16:03:41 -0800331 CallOpBuffer buf;
Yang Gao0c7aafa2015-02-12 22:51:38 -0800332 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800333 call_->PerformOps(&buf);
Yang Gao0c7aafa2015-02-12 22:51:38 -0800334 return call_->cq()->Pluck(&buf) && buf.got_message;
nnoble0c475f02014-12-05 15:37:39 -0800335 }
336
Craig Tillercf133f42015-02-26 14:05:56 -0800337 virtual bool Write(const W& msg) GRPC_OVERRIDE {
Yang Gao75ec2b12015-02-09 16:03:41 -0800338 CallOpBuffer buf;
Yang Gaoea222b22015-02-12 09:38:51 -0800339 if (!ctx_->sent_initial_metadata_) {
340 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
341 ctx_->sent_initial_metadata_ = true;
342 }
Yang Gao75ec2b12015-02-09 16:03:41 -0800343 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800344 call_->PerformOps(&buf);
345 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800346 }
347
348 private:
Craig Tiller01567522015-02-11 21:08:49 -0800349 Call* const call_;
350 ServerContext* const ctx_;
nnoble0c475f02014-12-05 15:37:39 -0800351};
352
Yang Gao75ec2b12015-02-09 16:03:41 -0800353// Async interfaces
354// Common interface for all client side streaming.
355class ClientAsyncStreamingInterface {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800356 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800357 virtual ~ClientAsyncStreamingInterface() {}
Craig Tiller2dff17d2015-02-09 12:42:23 -0800358
Yang Gao424bc922015-02-12 10:24:39 -0800359 virtual void ReadInitialMetadata(void* tag) = 0;
360
Yang Gao75ec2b12015-02-09 16:03:41 -0800361 virtual void Finish(Status* status, void* tag) = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800362};
Craig Tiller2dff17d2015-02-09 12:42:23 -0800363
// Asynchronous interface for a source that produces a sequence of R
// messages.
template <class R>
class AsyncReaderInterface {
 public:
  virtual ~AsyncReaderInterface() {}

  // Requests a read of the next message into |msg|; |tag| identifies the
  // operation.
  virtual void Read(R* msg, void* tag) = 0;
};
372
// Asynchronous interface for a sink that accepts a sequence of W messages.
template <class W>
class AsyncWriterInterface {
 public:
  virtual ~AsyncWriterInterface() {}

  // Requests a write of |msg|; |tag| identifies the operation.
  virtual void Write(const W& msg, void* tag) = 0;
};
381
382template <class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800383class ClientAsyncReader GRPC_FINAL : public ClientAsyncStreamingInterface,
384 public AsyncReaderInterface<R> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800385 public:
Yang Gao424bc922015-02-12 10:24:39 -0800386 // Create a stream and write the first request out.
Craig Tiller573523f2015-02-17 07:38:26 -0800387 ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
388 const RpcMethod& method, ClientContext* context,
Yang Gao7694c352015-03-03 09:48:06 -0800389 const grpc::protobuf::Message& request, void* tag)
Yang Gao5705fe32015-02-12 14:30:34 -0800390 : context_(context), call_(channel->CreateCall(method, context, cq)) {
Craig Tiller549a74d2015-02-09 22:13:44 -0800391 init_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800392 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
Craig Tiller549a74d2015-02-09 22:13:44 -0800393 init_buf_.AddSendMessage(request);
394 init_buf_.AddClientSendClose();
395 call_.PerformOps(&init_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800396 }
397
Craig Tillercf133f42015-02-26 14:05:56 -0800398 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gao424bc922015-02-12 10:24:39 -0800399 GPR_ASSERT(!context_->initial_metadata_received_);
400
Yang Gao5705fe32015-02-12 14:30:34 -0800401 meta_buf_.Reset(tag);
Yang Gao2b7f5372015-02-18 00:45:53 -0800402 meta_buf_.AddRecvInitialMetadata(context_);
Yang Gao5705fe32015-02-12 14:30:34 -0800403 call_.PerformOps(&meta_buf_);
Yang Gao424bc922015-02-12 10:24:39 -0800404 }
405
Craig Tillercf133f42015-02-26 14:05:56 -0800406 void Read(R* msg, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800407 read_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800408 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800409 read_buf_.AddRecvInitialMetadata(context_);
Yang Gao424bc922015-02-12 10:24:39 -0800410 }
Yang Gao0c7aafa2015-02-12 22:51:38 -0800411 read_buf_.AddRecvMessage(msg);
Craig Tiller549a74d2015-02-09 22:13:44 -0800412 call_.PerformOps(&read_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800413 }
414
Craig Tillercf133f42015-02-26 14:05:56 -0800415 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800416 finish_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800417 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800418 finish_buf_.AddRecvInitialMetadata(context_);
Yang Gao424bc922015-02-12 10:24:39 -0800419 }
Yang Gao2b7f5372015-02-18 00:45:53 -0800420 finish_buf_.AddClientRecvStatus(context_, status);
Craig Tiller549a74d2015-02-09 22:13:44 -0800421 call_.PerformOps(&finish_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800422 }
Craig Tiller2dff17d2015-02-09 12:42:23 -0800423
424 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800425 ClientContext* context_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800426 Call call_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800427 CallOpBuffer init_buf_;
Yang Gao5705fe32015-02-12 14:30:34 -0800428 CallOpBuffer meta_buf_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800429 CallOpBuffer read_buf_;
430 CallOpBuffer finish_buf_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800431};
432
433template <class W>
Craig Tillercf133f42015-02-26 14:05:56 -0800434class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
435 public AsyncWriterInterface<W> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800436 public:
Craig Tiller573523f2015-02-17 07:38:26 -0800437 ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
438 const RpcMethod& method, ClientContext* context,
Yang Gao7694c352015-03-03 09:48:06 -0800439 grpc::protobuf::Message* response, void* tag)
Craig Tiller573523f2015-02-17 07:38:26 -0800440 : context_(context),
441 response_(response),
Yang Gao5705fe32015-02-12 14:30:34 -0800442 call_(channel->CreateCall(method, context, cq)) {
Yang Gao424bc922015-02-12 10:24:39 -0800443 init_buf_.Reset(tag);
444 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
445 call_.PerformOps(&init_buf_);
446 }
Yang Gao75ec2b12015-02-09 16:03:41 -0800447
Craig Tillercf133f42015-02-26 14:05:56 -0800448 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gao424bc922015-02-12 10:24:39 -0800449 GPR_ASSERT(!context_->initial_metadata_received_);
450
Yang Gao5705fe32015-02-12 14:30:34 -0800451 meta_buf_.Reset(tag);
Yang Gao2b7f5372015-02-18 00:45:53 -0800452 meta_buf_.AddRecvInitialMetadata(context_);
Yang Gao5705fe32015-02-12 14:30:34 -0800453 call_.PerformOps(&meta_buf_);
Yang Gao424bc922015-02-12 10:24:39 -0800454 }
455
Craig Tillercf133f42015-02-26 14:05:56 -0800456 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800457 write_buf_.Reset(tag);
458 write_buf_.AddSendMessage(msg);
459 call_.PerformOps(&write_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800460 }
461
Yang Gao068c85b2015-02-12 15:21:24 -0800462 void WritesDone(void* tag) {
Craig Tiller549a74d2015-02-09 22:13:44 -0800463 writes_done_buf_.Reset(tag);
464 writes_done_buf_.AddClientSendClose();
465 call_.PerformOps(&writes_done_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800466 }
467
Craig Tillercf133f42015-02-26 14:05:56 -0800468 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800469 finish_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800470 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800471 finish_buf_.AddRecvInitialMetadata(context_);
Yang Gao424bc922015-02-12 10:24:39 -0800472 }
Yang Gao0c7aafa2015-02-12 22:51:38 -0800473 finish_buf_.AddRecvMessage(response_);
Yang Gao2b7f5372015-02-18 00:45:53 -0800474 finish_buf_.AddClientRecvStatus(context_, status);
Craig Tiller549a74d2015-02-09 22:13:44 -0800475 call_.PerformOps(&finish_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800476 }
477
478 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800479 ClientContext* context_;
Yang Gao7694c352015-03-03 09:48:06 -0800480 grpc::protobuf::Message* const response_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800481 Call call_;
Yang Gao424bc922015-02-12 10:24:39 -0800482 CallOpBuffer init_buf_;
Yang Gao5705fe32015-02-12 14:30:34 -0800483 CallOpBuffer meta_buf_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800484 CallOpBuffer write_buf_;
485 CallOpBuffer writes_done_buf_;
486 CallOpBuffer finish_buf_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800487};
488
489// Client-side interface for bi-directional streaming.
490template <class W, class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800491class ClientAsyncReaderWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
492 public AsyncWriterInterface<W>,
493 public AsyncReaderInterface<R> {
Yang Gao75ec2b12015-02-09 16:03:41 -0800494 public:
Craig Tiller573523f2015-02-17 07:38:26 -0800495 ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
496 const RpcMethod& method, ClientContext* context,
497 void* tag)
Yang Gao068c85b2015-02-12 15:21:24 -0800498 : context_(context), call_(channel->CreateCall(method, context, cq)) {
Yang Gao424bc922015-02-12 10:24:39 -0800499 init_buf_.Reset(tag);
500 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
501 call_.PerformOps(&init_buf_);
502 }
Yang Gao75ec2b12015-02-09 16:03:41 -0800503
Craig Tillercf133f42015-02-26 14:05:56 -0800504 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gao424bc922015-02-12 10:24:39 -0800505 GPR_ASSERT(!context_->initial_metadata_received_);
506
Yang Gao5705fe32015-02-12 14:30:34 -0800507 meta_buf_.Reset(tag);
Yang Gao2b7f5372015-02-18 00:45:53 -0800508 meta_buf_.AddRecvInitialMetadata(context_);
Yang Gao5705fe32015-02-12 14:30:34 -0800509 call_.PerformOps(&meta_buf_);
Yang Gao424bc922015-02-12 10:24:39 -0800510 }
511
Craig Tillercf133f42015-02-26 14:05:56 -0800512 void Read(R* msg, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800513 read_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800514 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800515 read_buf_.AddRecvInitialMetadata(context_);
Yang Gao424bc922015-02-12 10:24:39 -0800516 }
Yang Gao0c7aafa2015-02-12 22:51:38 -0800517 read_buf_.AddRecvMessage(msg);
Craig Tiller549a74d2015-02-09 22:13:44 -0800518 call_.PerformOps(&read_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800519 }
520
Craig Tillercf133f42015-02-26 14:05:56 -0800521 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800522 write_buf_.Reset(tag);
523 write_buf_.AddSendMessage(msg);
524 call_.PerformOps(&write_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800525 }
526
Yang Gao068c85b2015-02-12 15:21:24 -0800527 void WritesDone(void* tag) {
Craig Tiller549a74d2015-02-09 22:13:44 -0800528 writes_done_buf_.Reset(tag);
529 writes_done_buf_.AddClientSendClose();
530 call_.PerformOps(&writes_done_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800531 }
532
Craig Tillercf133f42015-02-26 14:05:56 -0800533 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
Craig Tiller549a74d2015-02-09 22:13:44 -0800534 finish_buf_.Reset(tag);
Yang Gao424bc922015-02-12 10:24:39 -0800535 if (!context_->initial_metadata_received_) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800536 finish_buf_.AddRecvInitialMetadata(context_);
Yang Gao424bc922015-02-12 10:24:39 -0800537 }
Yang Gao2b7f5372015-02-18 00:45:53 -0800538 finish_buf_.AddClientRecvStatus(context_, status);
Craig Tiller549a74d2015-02-09 22:13:44 -0800539 call_.PerformOps(&finish_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800540 }
541
542 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800543 ClientContext* context_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800544 Call call_;
Yang Gao424bc922015-02-12 10:24:39 -0800545 CallOpBuffer init_buf_;
Yang Gao5705fe32015-02-12 14:30:34 -0800546 CallOpBuffer meta_buf_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800547 CallOpBuffer read_buf_;
548 CallOpBuffer write_buf_;
549 CallOpBuffer writes_done_buf_;
550 CallOpBuffer finish_buf_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800551};
552
Yang Gao005f18a2015-02-13 10:22:33 -0800553template <class W, class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800554class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
555 public AsyncReaderInterface<R> {
Yang Gao75ec2b12015-02-09 16:03:41 -0800556 public:
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800557 explicit ServerAsyncReader(ServerContext* ctx)
558 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
Yang Gao75ec2b12015-02-09 16:03:41 -0800559
Craig Tillercf133f42015-02-26 14:05:56 -0800560 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800561 GPR_ASSERT(!ctx_->sent_initial_metadata_);
562
563 meta_buf_.Reset(tag);
564 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
565 ctx_->sent_initial_metadata_ = true;
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800566 call_.PerformOps(&meta_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800567 }
568
Craig Tillercf133f42015-02-26 14:05:56 -0800569 void Read(R* msg, void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800570 read_buf_.Reset(tag);
571 read_buf_.AddRecvMessage(msg);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800572 call_.PerformOps(&read_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800573 }
574
Yang Gao005f18a2015-02-13 10:22:33 -0800575 void Finish(const W& msg, const Status& status, void* tag) {
576 finish_buf_.Reset(tag);
577 if (!ctx_->sent_initial_metadata_) {
578 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
579 ctx_->sent_initial_metadata_ = true;
580 }
581 // The response is dropped if the status is not OK.
582 if (status.IsOk()) {
583 finish_buf_.AddSendMessage(msg);
584 }
Yang Gao005f18a2015-02-13 10:22:33 -0800585 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
586 call_.PerformOps(&finish_buf_);
587 }
588
589 void FinishWithError(const Status& status, void* tag) {
590 GPR_ASSERT(!status.IsOk());
Yang Gaoa38feb92015-02-12 12:05:20 -0800591 finish_buf_.Reset(tag);
592 if (!ctx_->sent_initial_metadata_) {
593 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
594 ctx_->sent_initial_metadata_ = true;
595 }
Yang Gaoa38feb92015-02-12 12:05:20 -0800596 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800597 call_.PerformOps(&finish_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800598 }
599
Yang Gao75ec2b12015-02-09 16:03:41 -0800600 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800601 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800602
603 Call call_;
Yang Gaoa38feb92015-02-12 12:05:20 -0800604 ServerContext* ctx_;
605 CallOpBuffer meta_buf_;
606 CallOpBuffer read_buf_;
607 CallOpBuffer finish_buf_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800608};
609
610template <class W>
Craig Tillercf133f42015-02-26 14:05:56 -0800611class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
612 public AsyncWriterInterface<W> {
Yang Gao75ec2b12015-02-09 16:03:41 -0800613 public:
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800614 explicit ServerAsyncWriter(ServerContext* ctx)
615 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
Yang Gao75ec2b12015-02-09 16:03:41 -0800616
Craig Tillercf133f42015-02-26 14:05:56 -0800617 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800618 GPR_ASSERT(!ctx_->sent_initial_metadata_);
619
620 meta_buf_.Reset(tag);
621 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
622 ctx_->sent_initial_metadata_ = true;
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800623 call_.PerformOps(&meta_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800624 }
625
Craig Tillercf133f42015-02-26 14:05:56 -0800626 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800627 write_buf_.Reset(tag);
628 if (!ctx_->sent_initial_metadata_) {
629 write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
630 ctx_->sent_initial_metadata_ = true;
631 }
632 write_buf_.AddSendMessage(msg);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800633 call_.PerformOps(&write_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800634 }
635
Craig Tiller0220cf12015-02-12 17:39:26 -0800636 void Finish(const Status& status, void* tag) {
Yang Gaoa38feb92015-02-12 12:05:20 -0800637 finish_buf_.Reset(tag);
638 if (!ctx_->sent_initial_metadata_) {
639 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
640 ctx_->sent_initial_metadata_ = true;
641 }
Yang Gaoa38feb92015-02-12 12:05:20 -0800642 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800643 call_.PerformOps(&finish_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800644 }
645
646 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800647 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800648
649 Call call_;
Yang Gaoa38feb92015-02-12 12:05:20 -0800650 ServerContext* ctx_;
651 CallOpBuffer meta_buf_;
652 CallOpBuffer write_buf_;
653 CallOpBuffer finish_buf_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800654};
655
656// Server-side interface for bi-directional streaming.
657template <class W, class R>
Craig Tillercf133f42015-02-26 14:05:56 -0800658class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
659 public AsyncWriterInterface<W>,
660 public AsyncReaderInterface<R> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800661 public:
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800662 explicit ServerAsyncReaderWriter(ServerContext* ctx)
663 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
Yang Gao75ec2b12015-02-09 16:03:41 -0800664
Craig Tillercf133f42015-02-26 14:05:56 -0800665 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800666 GPR_ASSERT(!ctx_->sent_initial_metadata_);
667
668 meta_buf_.Reset(tag);
669 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
670 ctx_->sent_initial_metadata_ = true;
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800671 call_.PerformOps(&meta_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800672 }
673
Craig Tillercf133f42015-02-26 14:05:56 -0800674 virtual void Read(R* msg, void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800675 read_buf_.Reset(tag);
676 read_buf_.AddRecvMessage(msg);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800677 call_.PerformOps(&read_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800678 }
679
Craig Tillercf133f42015-02-26 14:05:56 -0800680 virtual void Write(const W& msg, void* tag) GRPC_OVERRIDE {
Yang Gaoa38feb92015-02-12 12:05:20 -0800681 write_buf_.Reset(tag);
682 if (!ctx_->sent_initial_metadata_) {
683 write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
684 ctx_->sent_initial_metadata_ = true;
685 }
686 write_buf_.AddSendMessage(msg);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800687 call_.PerformOps(&write_buf_);
Yang Gaoa38feb92015-02-12 12:05:20 -0800688 }
689
Craig Tiller0220cf12015-02-12 17:39:26 -0800690 void Finish(const Status& status, void* tag) {
Yang Gaoa38feb92015-02-12 12:05:20 -0800691 finish_buf_.Reset(tag);
692 if (!ctx_->sent_initial_metadata_) {
693 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
694 ctx_->sent_initial_metadata_ = true;
695 }
Yang Gaoa38feb92015-02-12 12:05:20 -0800696 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800697 call_.PerformOps(&finish_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800698 }
699
700 private:
Craig Tillercf133f42015-02-26 14:05:56 -0800701 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
Craig Tillerd4ebeeb2015-02-12 14:57:17 -0800702
703 Call call_;
Yang Gaoa38feb92015-02-12 12:05:20 -0800704 ServerContext* ctx_;
705 CallOpBuffer meta_buf_;
706 CallOpBuffer read_buf_;
707 CallOpBuffer write_buf_;
708 CallOpBuffer finish_buf_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800709};
710
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800711} // namespace grpc
712
Nicolas "Pixel" Noble1ff52d52015-03-01 05:24:36 +0100713#endif // GRPCXX_STREAM_H