Implement async streaming APIs
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 6cc3716..e976e11 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -72,7 +72,7 @@
   template <class R> friend class ::grpc::ServerReader;
   template <class W> friend class ::grpc::ServerWriter;
   template <class R, class W> friend class ::grpc::ServerReaderWriter;
-  
+
   ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
 
   const std::chrono::system_clock::time_point deadline_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 52a764b..6dc05bc 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -370,6 +370,15 @@
   virtual void Finish(Status* status, void* tag) = 0;
 };
 
+class ServerAsyncStreamingInterface {
+ public:
+  virtual ~ServerAsyncStreamingInterface() {}
+  // Sends the call's initial metadata; `tag` labels the operation.
+  virtual void SendInitialMetadata(void* tag) = 0;
+  // Finishes the call with `status`; `tag` labels the operation.
+  virtual void Finish(const Status& status, void* tag) = 0;
+};
+
 // An interface that yields a sequence of R messages.
 template <class R>
 class AsyncReaderInterface {
@@ -577,6 +586,7 @@
 
   virtual void Write(const W& msg, void* tag) override {
     CallOpBuffer buf;
+    buf.Reset(tag);  // NOTE(review): can PerformOps outlive local buf?
     buf.AddSendMessage(msg);
     call_->PerformOps(&buf);
   }
@@ -586,48 +596,178 @@
 };
 
+// Server-side handle for a call on which the client streams R messages.
+// Each method resets one of the CallOpBuffer members with the caller's `tag`,
+// adds operations to it and passes it to Call::PerformOps.
 template <class R>
-class ServerAsyncReader : public AsyncReaderInterface<R> {
+class ServerAsyncReader : public ServerAsyncStreamingInterface,
+                          public AsyncReaderInterface<R> {
  public:
-  explicit ServerAsyncReader(Call* call) : call_(call) {}
+  ServerAsyncReader(Call* call, ServerContext* ctx)
+      : call_(call), ctx_(ctx), cancelled_(false) {}
 
-  virtual void Read(R* msg, void* tag) {
-    // TODO
+  // Sends the context's initial metadata as its own batch. Asserts that
+  // initial metadata has not been sent already.
+  void SendInitialMetadata(void* tag) override {
+    GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+    meta_buf_.Reset(tag);
+    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    ctx_->sent_initial_metadata_ = true;
+    call_->PerformOps(&meta_buf_);
   }
 
+  // Requests the next incoming message, to be stored in *msg.
+  void Read(R* msg, void* tag) override {
+    read_buf_.Reset(tag);
+    read_buf_.AddRecvMessage(msg);
+    call_->PerformOps(&read_buf_);
+  }
+
+  // Sends `status` with the context's trailing metadata. Initial metadata is
+  // added to the same batch if it has not been sent yet.
+  void Finish(const Status& status, void* tag) override {
+    finish_buf_.Reset(tag);
+    if (!ctx_->sent_initial_metadata_) {
+      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ctx_->sent_initial_metadata_ = true;
+    }
+    // cancelled_ is a member, not a local: the pointer handed over here must
+    // stay valid if finish_buf_ writes through it after Finish() returns.
+    finish_buf_.AddServerRecvClose(&cancelled_);
+    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+    call_->PerformOps(&finish_buf_);
+  }
+
  private:
   Call* call_;
+  ServerContext* ctx_;
+  CallOpBuffer meta_buf_;
+  CallOpBuffer read_buf_;
+  CallOpBuffer finish_buf_;
+  bool cancelled_;  // Target of AddServerRecvClose() in Finish().
 };
 
+// Server-side handle for a call on which the server streams W messages.
+// Each method resets one of the CallOpBuffer members with the caller's `tag`,
+// adds operations to it and passes it to Call::PerformOps.
 template <class W>
-class ServerAsyncWriter : public AsyncWriterInterface<W> {
+class ServerAsyncWriter : public ServerAsyncStreamingInterface,
+                          public AsyncWriterInterface<W> {
  public:
-  explicit ServerAsyncWriter(Call* call) : call_(call) {}
+  ServerAsyncWriter(Call* call, ServerContext* ctx)
+      : call_(call), ctx_(ctx), cancelled_(false) {}
 
-  virtual void Write(const W& msg, void* tag) {
-    // TODO
+  // Sends the context's initial metadata as its own batch. Asserts that
+  // initial metadata has not been sent already.
+  void SendInitialMetadata(void* tag) override {
+    GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+    meta_buf_.Reset(tag);
+    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    ctx_->sent_initial_metadata_ = true;
+    call_->PerformOps(&meta_buf_);
+  }
+
+  // Sends `msg`. Initial metadata is added to the same batch if it has not
+  // been sent yet.
+  void Write(const W& msg, void* tag) override {
+    write_buf_.Reset(tag);
+    if (!ctx_->sent_initial_metadata_) {
+      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ctx_->sent_initial_metadata_ = true;
+    }
+    write_buf_.AddSendMessage(msg);
+    call_->PerformOps(&write_buf_);
+  }
+
+  // Sends `status` with the context's trailing metadata. Initial metadata is
+  // added to the same batch if it has not been sent yet.
+  void Finish(const Status& status, void* tag) override {
+    finish_buf_.Reset(tag);
+    if (!ctx_->sent_initial_metadata_) {
+      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ctx_->sent_initial_metadata_ = true;
+    }
+    // cancelled_ is a member, not a local: the pointer handed over here must
+    // stay valid if finish_buf_ writes through it after Finish() returns.
+    finish_buf_.AddServerRecvClose(&cancelled_);
+    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+    call_->PerformOps(&finish_buf_);
   }
 
  private:
   Call* call_;
+  ServerContext* ctx_;
+  CallOpBuffer meta_buf_;
+  CallOpBuffer write_buf_;
+  CallOpBuffer finish_buf_;
+  bool cancelled_;  // Target of AddServerRecvClose() in Finish().
 };
 
 // Server-side interface for bi-directional streaming.
+// Each method resets one of the CallOpBuffer members with the caller's `tag`,
+// adds operations to it and passes it to Call::PerformOps.
 template <class W, class R>
-class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
-                           public AsyncReaderInterface<R> {
+class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
+                                public AsyncWriterInterface<W>,
+                                public AsyncReaderInterface<R> {
  public:
-  explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
+  ServerAsyncReaderWriter(Call* call, ServerContext* ctx)
+      : call_(call), ctx_(ctx), cancelled_(false) {}
 
-  virtual void Read(R* msg, void* tag) {
-    // TODO
+  // Sends the context's initial metadata as its own batch. Asserts that
+  // initial metadata has not been sent already.
+  void SendInitialMetadata(void* tag) override {
+    GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+    meta_buf_.Reset(tag);
+    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    ctx_->sent_initial_metadata_ = true;
+    call_->PerformOps(&meta_buf_);
   }
 
-  virtual void Write(const W& msg, void* tag) {
-    // TODO
+  // Requests the next incoming message, to be stored in *msg.
+  void Read(R* msg, void* tag) override {
+    read_buf_.Reset(tag);
+    read_buf_.AddRecvMessage(msg);
+    call_->PerformOps(&read_buf_);
+  }
+
+  // Sends `msg`. Initial metadata is added to the same batch if it has not
+  // been sent yet.
+  void Write(const W& msg, void* tag) override {
+    write_buf_.Reset(tag);
+    if (!ctx_->sent_initial_metadata_) {
+      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ctx_->sent_initial_metadata_ = true;
+    }
+    write_buf_.AddSendMessage(msg);
+    call_->PerformOps(&write_buf_);
+  }
+
+  // Sends `status` with the context's trailing metadata. Initial metadata is
+  // added to the same batch if it has not been sent yet.
+  void Finish(const Status& status, void* tag) override {
+    finish_buf_.Reset(tag);
+    if (!ctx_->sent_initial_metadata_) {
+      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ctx_->sent_initial_metadata_ = true;
+    }
+    // cancelled_ is a member, not a local: the pointer handed over here must
+    // stay valid if finish_buf_ writes through it after Finish() returns.
+    finish_buf_.AddServerRecvClose(&cancelled_);
+    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+    call_->PerformOps(&finish_buf_);
   }
 
  private:
   Call* call_;
+  ServerContext* ctx_;
+  CallOpBuffer meta_buf_;
+  CallOpBuffer read_buf_;
+  CallOpBuffer write_buf_;
+  CallOpBuffer finish_buf_;
+  bool cancelled_;  // Target of AddServerRecvClose() in Finish().
 };
 
 }  // namespace grpc