blob: 631183ea55c63fad92076a5c9a256635662bc346 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#ifndef __GRPCPP_STREAM_H__
35#define __GRPCPP_STREAM_H__
36
Craig Tillerc4965752015-02-09 09:51:00 -080037#include <grpc++/call.h>
38#include <grpc++/channel_interface.h>
39#include <grpc++/completion_queue.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc++/status.h>
41#include <grpc/support/log.h>
42
43namespace grpc {
44
Craig Tiller1d2e2192015-02-09 15:25:21 -080045// DELETE DELETE DELETE
46// DELETE DELETE DELETE
47// DELETE DELETE DELETE
48// DELETE DELETE DELETE
49// DELETE DELETE DELETE
50// DELETE DELETE DELETE
51// DELETE DELETE DELETE
52// DELETE DELETE DELETE
53// DELETE DELETE DELETE
54// DELETE DELETE DELETE
55// DELETE DELETE DELETE
56// DELETE DELETE DELETE
57// DELETE DELETE DELETE
58// DELETE DELETE DELETE
59// DELETE DELETE DELETE
60// DELETE DELETE DELETE
61// DELETE DELETE DELETE
62// DELETE DELETE DELETE
63// DELETE DELETE DELETE
64// DELETE DELETE DELETE
65// DELETE DELETE DELETE
66// DELETE DELETE DELETE
67// DELETE DELETE DELETE
68// DELETE DELETE DELETE
69 class StreamContextInterface {
70 public:
71 template <class T> bool Write(T, bool);
72 template <class T> void Start(T);
73 template <class T> bool Read(T);
74 google::protobuf::Message *request();
75 };
76
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080077// Common interface for all client side streaming.
78class ClientStreamingInterface {
79 public:
80 virtual ~ClientStreamingInterface() {}
81
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080082 // Wait until the stream finishes, and return the final status. When the
83 // client side declares it has no more message to send, either implicitly or
84 // by calling WritesDone, it needs to make sure there is no more message to
85 // be received from the server, either implicitly or by getting a false from
86 // a Read(). Otherwise, this implicitly cancels the stream.
Craig Tillerc4965752015-02-09 09:51:00 -080087 virtual Status Finish() = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080088};
89
90// An interface that yields a sequence of R messages.
91template <class R>
92class ReaderInterface {
93 public:
94 virtual ~ReaderInterface() {}
95
96 // Blocking read a message and parse to msg. Returns true on success.
97 // The method returns false when there will be no more incoming messages,
98 // either because the other side has called WritesDone or the stream has
99 // failed (or been cancelled).
100 virtual bool Read(R* msg) = 0;
101};
102
103// An interface that can be fed a sequence of W messages.
104template <class W>
105class WriterInterface {
106 public:
107 virtual ~WriterInterface() {}
108
109 // Blocking write msg to the stream. Returns true on success.
110 // Returns false when the stream has been closed.
111 virtual bool Write(const W& msg) = 0;
112};
113
114template <class R>
Craig Tillerc4965752015-02-09 09:51:00 -0800115class ClientReader final : public ClientStreamingInterface,
116 public ReaderInterface<R> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800117 public:
118 // Blocking create a stream and write the first request out.
Yang Gao5f4f0c32015-02-09 13:45:28 -0800119 ClientReader(ChannelInterface *channel, const RpcMethod &method,
120 ClientContext *context,
121 const google::protobuf::Message &request)
Craig Tiller50950712015-02-09 10:38:38 -0800122 : call_(channel->CreateCall(method, context, &cq_)) {
Craig Tillerc4965752015-02-09 09:51:00 -0800123 CallOpBuffer buf;
124 buf.AddSendMessage(request);
125 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -0800126 call_.PerformOps(&buf);
127 cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800128 }
129
Yang Gao8a3bbb52015-02-09 14:10:59 -0800130 virtual bool Read(R *msg) override {
Craig Tillerc4965752015-02-09 09:51:00 -0800131 CallOpBuffer buf;
132 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800133 call_.PerformOps(&buf);
134 return cq_.Pluck(&buf);
Craig Tillerc4965752015-02-09 09:51:00 -0800135 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800136
Craig Tillerc4965752015-02-09 09:51:00 -0800137 virtual Status Finish() override {
138 CallOpBuffer buf;
139 Status status;
140 buf.AddClientRecvStatus(&status);
Craig Tillerde917062015-02-09 17:15:03 -0800141 call_.PerformOps(&buf);
142 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tillerc4965752015-02-09 09:51:00 -0800143 return status;
144 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800145
146 private:
Craig Tillerc4965752015-02-09 09:51:00 -0800147 CompletionQueue cq_;
148 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800149};
150
151template <class W>
Craig Tillerc4965752015-02-09 09:51:00 -0800152class ClientWriter final : public ClientStreamingInterface,
153 public WriterInterface<W> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154 public:
155 // Blocking create a stream.
Yang Gao8a3bbb52015-02-09 14:10:59 -0800156 ClientWriter(ChannelInterface *channel, const RpcMethod &method,
157 ClientContext *context,
158 google::protobuf::Message *response)
Craig Tillerc4965752015-02-09 09:51:00 -0800159 : response_(response),
Craig Tiller50950712015-02-09 10:38:38 -0800160 call_(channel->CreateCall(method, context, &cq_)) {}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800161
Yang Gao8a3bbb52015-02-09 14:10:59 -0800162 virtual bool Write(const W& msg) override {
Craig Tillerc4965752015-02-09 09:51:00 -0800163 CallOpBuffer buf;
164 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800165 call_.PerformOps(&buf);
166 return cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800167 }
168
Craig Tillerc4965752015-02-09 09:51:00 -0800169 virtual bool WritesDone() {
170 CallOpBuffer buf;
171 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -0800172 call_.PerformOps(&buf);
173 return cq_.Pluck(&buf);
Craig Tillerc4965752015-02-09 09:51:00 -0800174 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175
176 // Read the final response and wait for the final status.
Craig Tillerc4965752015-02-09 09:51:00 -0800177 virtual Status Finish() override {
178 CallOpBuffer buf;
179 Status status;
Yang Gao75ec2b12015-02-09 16:03:41 -0800180 buf.AddRecvMessage(response_);
Craig Tillerc4965752015-02-09 09:51:00 -0800181 buf.AddClientRecvStatus(&status);
Craig Tillerde917062015-02-09 17:15:03 -0800182 call_.PerformOps(&buf);
183 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tillerc4965752015-02-09 09:51:00 -0800184 return status;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800185 }
186
187 private:
Craig Tillerc4965752015-02-09 09:51:00 -0800188 google::protobuf::Message *const response_;
189 CompletionQueue cq_;
190 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800191};
192
193// Client-side interface for bi-directional streaming.
194template <class W, class R>
Craig Tillerc4965752015-02-09 09:51:00 -0800195class ClientReaderWriter final : public ClientStreamingInterface,
196 public WriterInterface<W>,
197 public ReaderInterface<R> {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800198 public:
199 // Blocking create a stream.
Yang Gao8a3bbb52015-02-09 14:10:59 -0800200 ClientReaderWriter(ChannelInterface *channel,
201 const RpcMethod &method, ClientContext *context)
Craig Tiller50950712015-02-09 10:38:38 -0800202 : call_(channel->CreateCall(method, context, &cq_)) {}
Craig Tillerc4965752015-02-09 09:51:00 -0800203
Yang Gao8a3bbb52015-02-09 14:10:59 -0800204 virtual bool Read(R *msg) override {
Craig Tillerc4965752015-02-09 09:51:00 -0800205 CallOpBuffer buf;
206 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800207 call_.PerformOps(&buf);
208 return cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800209 }
210
Yang Gao8a3bbb52015-02-09 14:10:59 -0800211 virtual bool Write(const W& msg) override {
Craig Tillerc4965752015-02-09 09:51:00 -0800212 CallOpBuffer buf;
213 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800214 call_.PerformOps(&buf);
215 return cq_.Pluck(&buf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800216 }
217
Craig Tillerc4965752015-02-09 09:51:00 -0800218 virtual bool WritesDone() {
219 CallOpBuffer buf;
220 buf.AddClientSendClose();
Craig Tillerde917062015-02-09 17:15:03 -0800221 call_.PerformOps(&buf);
222 return cq_.Pluck(&buf);
Craig Tillerc4965752015-02-09 09:51:00 -0800223 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800224
Craig Tillerc4965752015-02-09 09:51:00 -0800225 virtual Status Finish() override {
226 CallOpBuffer buf;
227 Status status;
228 buf.AddClientRecvStatus(&status);
Craig Tillerde917062015-02-09 17:15:03 -0800229 call_.PerformOps(&buf);
230 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tillerc4965752015-02-09 09:51:00 -0800231 return status;
232 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800233
234 private:
Craig Tillerc4965752015-02-09 09:51:00 -0800235 CompletionQueue cq_;
236 Call call_;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800237};
238
nnoble0c475f02014-12-05 15:37:39 -0800239template <class R>
Yang Gao8a3bbb52015-02-09 14:10:59 -0800240class ServerReader final : public ReaderInterface<R> {
nnoble0c475f02014-12-05 15:37:39 -0800241 public:
Craig Tiller1d2e2192015-02-09 15:25:21 -0800242 explicit ServerReader(Call* call) : call_(call) {}
Yang Gao8a3bbb52015-02-09 14:10:59 -0800243
244 virtual bool Read(R* msg) override {
245 CallOpBuffer buf;
246 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800247 call_->PerformOps(&buf);
248 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800249 }
250
nnoble0c475f02014-12-05 15:37:39 -0800251 private:
Yang Gao8a3bbb52015-02-09 14:10:59 -0800252 Call* call_;
nnoble0c475f02014-12-05 15:37:39 -0800253};
254
255template <class W>
Yang Gao75ec2b12015-02-09 16:03:41 -0800256class ServerWriter final : public WriterInterface<W> {
nnoble0c475f02014-12-05 15:37:39 -0800257 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800258 explicit ServerWriter(Call* call) : call_(call) {}
nnoble0c475f02014-12-05 15:37:39 -0800259
Yang Gao75ec2b12015-02-09 16:03:41 -0800260 virtual bool Write(const W& msg) override {
261 CallOpBuffer buf;
262 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800263 call_->PerformOps(&buf);
264 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800265 }
266
267 private:
Yang Gao75ec2b12015-02-09 16:03:41 -0800268 Call* call_;
nnoble0c475f02014-12-05 15:37:39 -0800269};
270
271// Server-side interface for bi-directional streaming.
272template <class W, class R>
Yang Gao75ec2b12015-02-09 16:03:41 -0800273class ServerReaderWriter final : public WriterInterface<W>,
nnoble0c475f02014-12-05 15:37:39 -0800274 public ReaderInterface<R> {
275 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800276 explicit ServerReaderWriter(Call* call) : call_(call) {}
277
278 virtual bool Read(R* msg) override {
279 CallOpBuffer buf;
280 buf.AddRecvMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800281 call_->PerformOps(&buf);
282 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800283 }
284
Yang Gao75ec2b12015-02-09 16:03:41 -0800285 virtual bool Write(const W& msg) override {
286 CallOpBuffer buf;
287 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800288 call_->PerformOps(&buf);
289 return call_->cq()->Pluck(&buf);
nnoble0c475f02014-12-05 15:37:39 -0800290 }
291
292 private:
nnoble0c475f02014-12-05 15:37:39 -0800293 CompletionQueue* cq_;
294 Call* call_;
nnoble0c475f02014-12-05 15:37:39 -0800295};
296
Yang Gao75ec2b12015-02-09 16:03:41 -0800297// Async interfaces
298// Common interface for all client side streaming.
299class ClientAsyncStreamingInterface {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800300 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800301 virtual ~ClientAsyncStreamingInterface() {}
Craig Tiller2dff17d2015-02-09 12:42:23 -0800302
Yang Gao75ec2b12015-02-09 16:03:41 -0800303 virtual void Finish(Status* status, void* tag) = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304};
Craig Tiller2dff17d2015-02-09 12:42:23 -0800305
Yang Gao75ec2b12015-02-09 16:03:41 -0800306// An interface that yields a sequence of R messages.
307template <class R>
308class AsyncReaderInterface {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800309 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800310 virtual ~AsyncReaderInterface() {}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800311
Yang Gao75ec2b12015-02-09 16:03:41 -0800312 virtual void Read(R* msg, void* tag) = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313};
314
Yang Gao75ec2b12015-02-09 16:03:41 -0800315// An interface that can be fed a sequence of W messages.
Craig Tiller2dff17d2015-02-09 12:42:23 -0800316template <class W>
Yang Gao75ec2b12015-02-09 16:03:41 -0800317class AsyncWriterInterface {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800318 public:
Craig Tiller53191642015-02-09 16:15:23 -0800319 virtual ~AsyncWriterInterface() {}
Craig Tiller2dff17d2015-02-09 12:42:23 -0800320
Yang Gao75ec2b12015-02-09 16:03:41 -0800321 virtual void Write(const W& msg, void* tag) = 0;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800322};
323
324template <class R>
Yang Gao75ec2b12015-02-09 16:03:41 -0800325class ClientAsyncReader final : public ClientAsyncStreamingInterface,
326 public AsyncReaderInterface<R> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800327 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800328 // Blocking create a stream and write the first request out.
329 ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
330 ClientContext *context,
331 const google::protobuf::Message &request, void* tag)
332 : call_(channel->CreateCall(method, context, &cq_)) {
Craig Tiller549a74d2015-02-09 22:13:44 -0800333 init_buf_.Reset(tag);
334 init_buf_.AddSendMessage(request);
335 init_buf_.AddClientSendClose();
336 call_.PerformOps(&init_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800337 }
338
Yang Gao75ec2b12015-02-09 16:03:41 -0800339 virtual void Read(R *msg, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800340 read_buf_.Reset(tag);
341 read_buf_.AddRecvMessage(msg);
342 call_.PerformOps(&read_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800343 }
344
345 virtual void Finish(Status* status, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800346 finish_buf_.Reset(tag);
347 finish_buf_.AddClientRecvStatus(status);
348 call_.PerformOps(&finish_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800349 }
Craig Tiller2dff17d2015-02-09 12:42:23 -0800350
351 private:
Yang Gao75ec2b12015-02-09 16:03:41 -0800352 CompletionQueue cq_;
353 Call call_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800354 CallOpBuffer init_buf_;
355 CallOpBuffer read_buf_;
356 CallOpBuffer finish_buf_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800357};
358
359template <class W>
Craig Tiller53191642015-02-09 16:15:23 -0800360class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
Yang Gao75ec2b12015-02-09 16:03:41 -0800361 public WriterInterface<W> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800362 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800363 // Blocking create a stream.
364 ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
365 ClientContext *context,
366 google::protobuf::Message *response)
367 : response_(response),
368 call_(channel->CreateCall(method, context, &cq_)) {}
369
370 virtual void Write(const W& msg, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800371 write_buf_.Reset(tag);
372 write_buf_.AddSendMessage(msg);
373 call_.PerformOps(&write_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800374 }
375
Craig Tiller549a74d2015-02-09 22:13:44 -0800376 virtual void WritesDone(void* tag) override {
377 writes_done_buf_.Reset(tag);
378 writes_done_buf_.AddClientSendClose();
379 call_.PerformOps(&writes_done_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800380 }
381
382 virtual void Finish(Status* status, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800383 finish_buf_.Reset(tag);
384 finish_buf_.AddRecvMessage(response_);
385 finish_buf_.AddClientRecvStatus(status);
386 call_.PerformOps(&finish_buf_);
Craig Tiller2dff17d2015-02-09 12:42:23 -0800387 }
388
389 private:
Yang Gao75ec2b12015-02-09 16:03:41 -0800390 google::protobuf::Message *const response_;
391 CompletionQueue cq_;
392 Call call_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800393 CallOpBuffer write_buf_;
394 CallOpBuffer writes_done_buf_;
395 CallOpBuffer finish_buf_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800396};
397
398// Client-side interface for bi-directional streaming.
399template <class W, class R>
400class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
401 public AsyncWriterInterface<W>,
402 public AsyncReaderInterface<R> {
403 public:
404 ClientAsyncReaderWriter(ChannelInterface *channel,
405 const RpcMethod &method, ClientContext *context)
406 : call_(channel->CreateCall(method, context, &cq_)) {}
407
408 virtual void Read(R *msg, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800409 read_buf_.Reset(tag);
410 read_buf_.AddRecvMessage(msg);
411 call_.PerformOps(&read_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800412 }
413
414 virtual void Write(const W& msg, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800415 write_buf_.Reset(tag);
416 write_buf_.AddSendMessage(msg);
417 call_.PerformOps(&write_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800418 }
419
Craig Tiller549a74d2015-02-09 22:13:44 -0800420 virtual void WritesDone(void* tag) override {
421 writes_done_buf_.Reset(tag);
422 writes_done_buf_.AddClientSendClose();
423 call_.PerformOps(&writes_done_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800424 }
425
426 virtual void Finish(Status* status, void* tag) override {
Craig Tiller549a74d2015-02-09 22:13:44 -0800427 finish_buf_.Reset(tag);
428 finish_buf_.AddClientRecvStatus(status);
429 call_.PerformOps(&finish_buf_);
Yang Gao75ec2b12015-02-09 16:03:41 -0800430 }
431
432 private:
433 CompletionQueue cq_;
434 Call call_;
Craig Tiller549a74d2015-02-09 22:13:44 -0800435 CallOpBuffer read_buf_;
436 CallOpBuffer write_buf_;
437 CallOpBuffer writes_done_buf_;
438 CallOpBuffer finish_buf_;
Yang Gao75ec2b12015-02-09 16:03:41 -0800439};
440
441// TODO(yangg) Move out of stream.h
442template <class W>
443class ServerAsyncResponseWriter final {
444 public:
445 explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
446
447 virtual void Write(const W& msg, void* tag) override {
448 CallOpBuffer buf;
449 buf.AddSendMessage(msg);
Craig Tillerde917062015-02-09 17:15:03 -0800450 call_->PerformOps(&buf);
Yang Gao75ec2b12015-02-09 16:03:41 -0800451 }
452
453 private:
454 Call* call_;
455};
456
457template <class R>
458class ServerAsyncReader : public AsyncReaderInterface<R> {
459 public:
460 explicit ServerAsyncReader(Call* call) : call_(call) {}
461
462 virtual void Read(R* msg, void* tag) {
463 // TODO
464 }
465
466 private:
467 Call* call_;
468};
469
470template <class W>
471class ServerAsyncWriter : public AsyncWriterInterface<W> {
472 public:
473 explicit ServerAsyncWriter(Call* call) : call_(call) {}
474
475 virtual void Write(const W& msg, void* tag) {
476 // TODO
477 }
478
479 private:
480 Call* call_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800481};
482
483// Server-side interface for bi-directional streaming.
484template <class W, class R>
Yang Gao75ec2b12015-02-09 16:03:41 -0800485class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
486 public AsyncReaderInterface<R> {
Craig Tiller2dff17d2015-02-09 12:42:23 -0800487 public:
Yang Gao75ec2b12015-02-09 16:03:41 -0800488 explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
489
490 virtual void Read(R* msg, void* tag) {
491 // TODO
Craig Tiller2dff17d2015-02-09 12:42:23 -0800492 }
493
Yang Gao75ec2b12015-02-09 16:03:41 -0800494 virtual void Write(const W& msg, void* tag) {
495 // TODO
Craig Tiller2dff17d2015-02-09 12:42:23 -0800496 }
497
498 private:
Yang Gao75ec2b12015-02-09 16:03:41 -0800499 Call* call_;
Craig Tiller2dff17d2015-02-09 12:42:23 -0800500};
501
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800502} // namespace grpc
503
504#endif // __GRPCPP_STREAM_H__