Implement async streaming APIs
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 6cc3716..e976e11 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -72,7 +72,7 @@
template <class R> friend class ::grpc::ServerReader;
template <class W> friend class ::grpc::ServerWriter;
template <class R, class W> friend class ::grpc::ServerReaderWriter;
-
+
ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
const std::chrono::system_clock::time_point deadline_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 52a764b..6dc05bc 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -370,6 +370,15 @@
virtual void Finish(Status* status, void* tag) = 0;
};
+// Operations shared by the server-side asynchronous streaming classes in this
+// file (ServerAsyncReader, ServerAsyncWriter, ServerAsyncReaderWriter).
+// Every operation takes an opaque tag that is attached to the queued request.
+// NOTE(review): presumably the tag is handed back through the completion
+// queue once the operation finishes -- confirm against the queue API.
+class ServerAsyncStreamingInterface {
+ public:
+ virtual ~ServerAsyncStreamingInterface() {}
+
+ // Requests that the call's initial metadata be sent.
+ virtual void SendInitialMetadata(void* tag) = 0;
+
+ // Finishes the call, reporting status to the peer.
+ virtual void Finish(const Status& status, void* tag) = 0;
+};
+
// An interface that yields a sequence of R messages.
template <class R>
class AsyncReaderInterface {
@@ -577,6 +586,7 @@
+ // Queues msg for sending on the call; tag is attached to the operation.
+ // NOTE(review): buf is a stack local whose address is passed to PerformOps.
+ // If PerformOps completes asynchronously the buffer has to outlive this
+ // call -- confirm, and consider a member buffer like the server-side
+ // classes in this file use.
virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf;
+ buf.Reset(tag);
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
}
@@ -586,48 +596,147 @@
};
+// Server-side asynchronous reader for a stream of R messages.
+// Each operation fills a dedicated CallOpBuffer member, tags it with the
+// caller-supplied tag, and submits it to the call through PerformOps.
template <class R>
-class ServerAsyncReader : public AsyncReaderInterface<R> {
+class ServerAsyncReader : public ServerAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerAsyncReader(Call* call) : call_(call) {}
+ // Stores call and ctx as raw pointers; nothing in this class releases them.
+ ServerAsyncReader(Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx), cancelled_(false) {}
- virtual void Read(R* msg, void* tag) {
- // TODO
+ // Sends the context's initial metadata. Asserts that it has not been sent
+ // before, and marks it as sent.
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&meta_buf_);
}
+ // Requests the next message; the result is stored into *msg.
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_->PerformOps(&read_buf_);
+ }
+
+ // Finishes the call with status, piggybacking the initial metadata if it
+ // has not been sent yet.
+ void Finish(const Status& status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The flag's address is stored in finish_buf_, which is processed after
+ // this function returns, so it must be a member and not a stack local.
+ // NOTE(review): assumes PerformOps completes asynchronously -- confirm.
+ finish_buf_.AddServerRecvClose(&cancelled_);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_->PerformOps(&finish_buf_);
+ }
+
private:
Call* call_;
+ ServerContext* ctx_;
+ // Target of the server-close operation queued by Finish.
+ bool cancelled_;
+ // One buffer per operation kind, kept as members so they outlive the call
+ // that queued them.
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
};
+// Server-side asynchronous writer for a stream of W messages.
+// Each operation fills a dedicated CallOpBuffer member, tags it with the
+// caller-supplied tag, and submits it to the call through PerformOps.
template <class W>
-class ServerAsyncWriter : public AsyncWriterInterface<W> {
+class ServerAsyncWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
public:
- explicit ServerAsyncWriter(Call* call) : call_(call) {}
+ // Stores call and ctx as raw pointers; nothing in this class releases them.
+ ServerAsyncWriter(Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx), cancelled_(false) {}
- virtual void Write(const W& msg, void* tag) {
- // TODO
+ // Sends the context's initial metadata. Asserts that it has not been sent
+ // before, and marks it as sent.
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&meta_buf_);
+ }
+
+ // Queues msg for sending, piggybacking the initial metadata if it has not
+ // been sent yet.
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_->PerformOps(&write_buf_);
+ }
+
+ // Finishes the call with status, piggybacking the initial metadata if it
+ // has not been sent yet.
+ void Finish(const Status& status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The flag's address is stored in finish_buf_, which is processed after
+ // this function returns, so it must be a member and not a stack local.
+ // NOTE(review): assumes PerformOps completes asynchronously -- confirm.
+ finish_buf_.AddServerRecvClose(&cancelled_);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_->PerformOps(&finish_buf_);
}
private:
Call* call_;
+ ServerContext* ctx_;
+ // Target of the server-close operation queued by Finish.
+ bool cancelled_;
+ // One buffer per operation kind, kept as members so they outlive the call
+ // that queued them.
+ CallOpBuffer meta_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
// Server-side interface for bi-directional streaming.
+// Reads R messages and writes W messages on the same call. Each operation
+// fills a dedicated CallOpBuffer member, tags it with the caller-supplied
+// tag, and submits it to the call through PerformOps.
template <class W, class R>
-class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
+ // Stores call and ctx as raw pointers; nothing in this class releases them.
+ ServerAsyncReaderWriter(Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx), cancelled_(false) {}
- virtual void Read(R* msg, void* tag) {
- // TODO
+ // Sends the context's initial metadata. Asserts that it has not been sent
+ // before, and marks it as sent.
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&meta_buf_);
}
- virtual void Write(const W& msg, void* tag) {
- // TODO
+ // Requests the next message; the result is stored into *msg.
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_->PerformOps(&read_buf_);
+ }
+
+ // Queues msg for sending, piggybacking the initial metadata if it has not
+ // been sent yet.
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_->PerformOps(&write_buf_);
+ }
+
+ // Finishes the call with status, piggybacking the initial metadata if it
+ // has not been sent yet.
+ void Finish(const Status& status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The flag's address is stored in finish_buf_, which is processed after
+ // this function returns, so it must be a member and not a stack local.
+ // NOTE(review): assumes PerformOps completes asynchronously -- confirm.
+ finish_buf_.AddServerRecvClose(&cancelled_);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_->PerformOps(&finish_buf_);
}
private:
Call* call_;
+ ServerContext* ctx_;
+ // Target of the server-close operation queued by Finish.
+ bool cancelled_;
+ // One buffer per operation kind, kept as members so they outlive the call
+ // that queued them.
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
} // namespace grpc