/*
 *
 * Copyright 2015-2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

34#ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35#define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36
37#include <grpc++/impl/codegen/call.h>
38#include <grpc++/impl/codegen/channel_interface.h>
39#include <grpc++/impl/codegen/client_context.h>
40#include <grpc++/impl/codegen/completion_queue.h>
David Garcia Quintas6848c4e2016-03-07 17:10:57 -080041#include <grpc++/impl/codegen/core_codegen_interface.h>
David Garcia Quintas6bd7b972016-01-27 19:21:12 -080042#include <grpc++/impl/codegen/server_context.h>
43#include <grpc++/impl/codegen/service_type.h>
44#include <grpc++/impl/codegen/status.h>
45#include <grpc/impl/codegen/log.h>
46
47namespace grpc {
48
49/// Common interface for all synchronous client side streaming.
50class ClientStreamingInterface {
51 public:
52 virtual ~ClientStreamingInterface() {}
53
54 /// Wait until the stream finishes, and return the final status. When the
55 /// client side declares it has no more message to send, either implicitly or
56 /// by calling \a WritesDone(), it needs to make sure there is no more message
57 /// to be received from the server, either implicitly or by getting a false
58 /// from a \a Read().
59 ///
60 /// This function will return either:
61 /// - when all incoming messages have been read and the server has returned
62 /// status.
63 /// - OR when the server has returned a non-OK status.
64 virtual Status Finish() = 0;
65};
66
/// An interface that yields a sequence of messages of type \a R.
template <class R>
class ReaderInterface {
 public:
  virtual ~ReaderInterface() {}

  /// Blocking read of a message, parsed into \a msg.
  ///
  /// \param[out] msg The read message.
  ///
  /// \return \a true on success; \a false when there will be no more incoming
  /// messages, either because the other side has called \a WritesDone() or
  /// the stream has failed (or been cancelled).
  virtual bool Read(R* msg) = 0;
};

83/// An interface that can be fed a sequence of messages of type \a W.
84template <class W>
85class WriterInterface {
86 public:
87 virtual ~WriterInterface() {}
88
89 /// Blocking write \a msg to the stream with options.
90 ///
91 /// \param msg The message to be written to the stream.
92 /// \param options Options affecting the write operation.
93 ///
94 /// \return \a true on success, \a false when the stream has been closed.
95 virtual bool Write(const W& msg, const WriteOptions& options) = 0;
96
97 /// Blocking write \a msg to the stream with default options.
98 ///
99 /// \param msg The message to be written to the stream.
100 ///
101 /// \return \a true on success, \a false when the stream has been closed.
102 inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
103};
104
105/// Client-side interface for streaming reads of message of type \a R.
106template <class R>
107class ClientReaderInterface : public ClientStreamingInterface,
108 public ReaderInterface<R> {
109 public:
110 /// Blocking wait for initial metadata from server. The received metadata
111 /// can only be accessed after this call returns. Should only be called before
112 /// the first read. Calling this method is optional, and if it is not called
113 /// the metadata will be available in ClientContext after the first read.
114 virtual void WaitForInitialMetadata() = 0;
115};
116
117template <class R>
118class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
119 public:
120 /// Blocking create a stream and write the first request out.
121 template <class W>
122 ClientReader(ChannelInterface* channel, const RpcMethod& method,
123 ClientContext* context, const W& request)
124 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
125 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
David Garcia Quintas9eef3772016-03-09 11:31:02 -0800126 CallOpClientSendClose> ops;
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800127 ops.SendInitialMetadata(context->send_initial_metadata_);
128 // TODO(ctiller): don't assert
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800129 GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800130 ops.ClientSendClose();
131 call_.PerformOps(&ops);
132 cq_.Pluck(&ops);
133 }
134
135 void WaitForInitialMetadata() GRPC_OVERRIDE {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800136 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800137
138 CallOpSet<CallOpRecvInitialMetadata> ops;
139 ops.RecvInitialMetadata(context_);
140 call_.PerformOps(&ops);
141 cq_.Pluck(&ops); /// status ignored
142 }
143
144 bool Read(R* msg) GRPC_OVERRIDE {
145 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
146 if (!context_->initial_metadata_received_) {
147 ops.RecvInitialMetadata(context_);
148 }
149 ops.RecvMessage(msg);
150 call_.PerformOps(&ops);
151 return cq_.Pluck(&ops) && ops.got_message;
152 }
153
154 Status Finish() GRPC_OVERRIDE {
155 CallOpSet<CallOpClientRecvStatus> ops;
156 Status status;
157 ops.ClientRecvStatus(context_, &status);
158 call_.PerformOps(&ops);
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800159 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800160 return status;
161 }
162
163 private:
164 ClientContext* context_;
165 CompletionQueue cq_;
166 Call call_;
167};
168
169/// Client-side interface for streaming writes of message of type \a W.
170template <class W>
171class ClientWriterInterface : public ClientStreamingInterface,
172 public WriterInterface<W> {
173 public:
174 /// Half close writing from the client.
175 /// Block until writes are completed.
176 ///
177 /// \return Whether the writes were successful.
178 virtual bool WritesDone() = 0;
179};
180
181template <class W>
182class ClientWriter : public ClientWriterInterface<W> {
183 public:
184 /// Blocking create a stream.
185 template <class R>
186 ClientWriter(ChannelInterface* channel, const RpcMethod& method,
187 ClientContext* context, R* response)
188 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
189 finish_ops_.RecvMessage(response);
190
191 CallOpSet<CallOpSendInitialMetadata> ops;
192 ops.SendInitialMetadata(context->send_initial_metadata_);
193 call_.PerformOps(&ops);
194 cq_.Pluck(&ops);
195 }
196
Craig Tillera44cbfc2016-02-03 16:02:49 -0800197 void WaitForInitialMetadata() {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800198 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
Craig Tillera44cbfc2016-02-03 16:02:49 -0800199
200 CallOpSet<CallOpRecvInitialMetadata> ops;
201 ops.RecvInitialMetadata(context_);
202 call_.PerformOps(&ops);
203 cq_.Pluck(&ops); // status ignored
204 }
205
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800206 using WriterInterface<W>::Write;
207 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
208 CallOpSet<CallOpSendMessage> ops;
209 if (!ops.SendMessage(msg, options).ok()) {
210 return false;
211 }
212 call_.PerformOps(&ops);
213 return cq_.Pluck(&ops);
214 }
215
216 bool WritesDone() GRPC_OVERRIDE {
217 CallOpSet<CallOpClientSendClose> ops;
218 ops.ClientSendClose();
219 call_.PerformOps(&ops);
220 return cq_.Pluck(&ops);
221 }
222
223 /// Read the final response and wait for the final status.
224 Status Finish() GRPC_OVERRIDE {
225 Status status;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800226 if (!context_->initial_metadata_received_) {
227 finish_ops_.RecvInitialMetadata(context_);
228 }
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800229 finish_ops_.ClientRecvStatus(context_, &status);
230 call_.PerformOps(&finish_ops_);
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800231 GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800232 return status;
233 }
234
235 private:
236 ClientContext* context_;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800237 CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
David Garcia Quintas9eef3772016-03-09 11:31:02 -0800238 CallOpClientRecvStatus> finish_ops_;
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800239 CompletionQueue cq_;
240 Call call_;
241};
242
243/// Client-side interface for bi-directional streaming.
244template <class W, class R>
245class ClientReaderWriterInterface : public ClientStreamingInterface,
246 public WriterInterface<W>,
247 public ReaderInterface<R> {
248 public:
249 /// Blocking wait for initial metadata from server. The received metadata
250 /// can only be accessed after this call returns. Should only be called before
251 /// the first read. Calling this method is optional, and if it is not called
252 /// the metadata will be available in ClientContext after the first read.
253 virtual void WaitForInitialMetadata() = 0;
254
255 /// Block until writes are completed.
256 ///
257 /// \return Whether the writes were successful.
258 virtual bool WritesDone() = 0;
259};
260
261template <class W, class R>
262class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
263 public:
264 /// Blocking create a stream.
265 ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
266 ClientContext* context)
267 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
268 CallOpSet<CallOpSendInitialMetadata> ops;
269 ops.SendInitialMetadata(context->send_initial_metadata_);
270 call_.PerformOps(&ops);
271 cq_.Pluck(&ops);
272 }
273
274 void WaitForInitialMetadata() GRPC_OVERRIDE {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800275 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800276
277 CallOpSet<CallOpRecvInitialMetadata> ops;
278 ops.RecvInitialMetadata(context_);
279 call_.PerformOps(&ops);
280 cq_.Pluck(&ops); // status ignored
281 }
282
283 bool Read(R* msg) GRPC_OVERRIDE {
284 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
285 if (!context_->initial_metadata_received_) {
286 ops.RecvInitialMetadata(context_);
287 }
288 ops.RecvMessage(msg);
289 call_.PerformOps(&ops);
290 return cq_.Pluck(&ops) && ops.got_message;
291 }
292
293 using WriterInterface<W>::Write;
294 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
295 CallOpSet<CallOpSendMessage> ops;
296 if (!ops.SendMessage(msg, options).ok()) return false;
297 call_.PerformOps(&ops);
298 return cq_.Pluck(&ops);
299 }
300
301 bool WritesDone() GRPC_OVERRIDE {
302 CallOpSet<CallOpClientSendClose> ops;
303 ops.ClientSendClose();
304 call_.PerformOps(&ops);
305 return cq_.Pluck(&ops);
306 }
307
308 Status Finish() GRPC_OVERRIDE {
Craig Tillera44cbfc2016-02-03 16:02:49 -0800309 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
310 if (!context_->initial_metadata_received_) {
311 ops.RecvInitialMetadata(context_);
312 }
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800313 Status status;
314 ops.ClientRecvStatus(context_, &status);
315 call_.PerformOps(&ops);
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800316 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800317 return status;
318 }
319
320 private:
321 ClientContext* context_;
322 CompletionQueue cq_;
323 Call call_;
324};
325
326template <class R>
327class ServerReader GRPC_FINAL : public ReaderInterface<R> {
328 public:
329 ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
330
331 void SendInitialMetadata() {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800332 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800333
334 CallOpSet<CallOpSendInitialMetadata> ops;
335 ops.SendInitialMetadata(ctx_->initial_metadata_);
336 ctx_->sent_initial_metadata_ = true;
337 call_->PerformOps(&ops);
338 call_->cq()->Pluck(&ops);
339 }
340
341 bool Read(R* msg) GRPC_OVERRIDE {
342 CallOpSet<CallOpRecvMessage<R>> ops;
343 ops.RecvMessage(msg);
344 call_->PerformOps(&ops);
345 return call_->cq()->Pluck(&ops) && ops.got_message;
346 }
347
348 private:
349 Call* const call_;
350 ServerContext* const ctx_;
351};
352
353template <class W>
354class ServerWriter GRPC_FINAL : public WriterInterface<W> {
355 public:
356 ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
357
358 void SendInitialMetadata() {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800359 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800360
361 CallOpSet<CallOpSendInitialMetadata> ops;
362 ops.SendInitialMetadata(ctx_->initial_metadata_);
363 ctx_->sent_initial_metadata_ = true;
364 call_->PerformOps(&ops);
365 call_->cq()->Pluck(&ops);
366 }
367
368 using WriterInterface<W>::Write;
369 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
370 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
371 if (!ops.SendMessage(msg, options).ok()) {
372 return false;
373 }
374 if (!ctx_->sent_initial_metadata_) {
375 ops.SendInitialMetadata(ctx_->initial_metadata_);
376 ctx_->sent_initial_metadata_ = true;
377 }
378 call_->PerformOps(&ops);
379 return call_->cq()->Pluck(&ops);
380 }
381
382 private:
383 Call* const call_;
384 ServerContext* const ctx_;
385};
386
387/// Server-side interface for bi-directional streaming.
388template <class W, class R>
389class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
390 public ReaderInterface<R> {
391 public:
392 ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
393
394 void SendInitialMetadata() {
David Garcia Quintas6848c4e2016-03-07 17:10:57 -0800395 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
David Garcia Quintas6bd7b972016-01-27 19:21:12 -0800396
397 CallOpSet<CallOpSendInitialMetadata> ops;
398 ops.SendInitialMetadata(ctx_->initial_metadata_);
399 ctx_->sent_initial_metadata_ = true;
400 call_->PerformOps(&ops);
401 call_->cq()->Pluck(&ops);
402 }
403
404 bool Read(R* msg) GRPC_OVERRIDE {
405 CallOpSet<CallOpRecvMessage<R>> ops;
406 ops.RecvMessage(msg);
407 call_->PerformOps(&ops);
408 return call_->cq()->Pluck(&ops) && ops.got_message;
409 }
410
411 using WriterInterface<W>::Write;
412 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
413 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
414 if (!ops.SendMessage(msg, options).ok()) {
415 return false;
416 }
417 if (!ctx_->sent_initial_metadata_) {
418 ops.SendInitialMetadata(ctx_->initial_metadata_);
419 ctx_->sent_initial_metadata_ = true;
420 }
421 call_->PerformOps(&ops);
422 return call_->cq()->Pluck(&ops);
423 }
424
425 private:
426 Call* const call_;
427 ServerContext* const ctx_;
428};
429
430} // namespace grpc
431
432#endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H