Merge github.com:grpc/grpc into c++api
diff --git a/Makefile b/Makefile
index 1547b49..6d382ca 100644
--- a/Makefile
+++ b/Makefile
@@ -2614,6 +2614,7 @@
     src/cpp/client/create_channel.cc \
     src/cpp/client/credentials.cc \
     src/cpp/client/internal_stub.cc \
+    src/cpp/common/call.cc \
     src/cpp/common/completion_queue.cc \
     src/cpp/common/rpc_method.cc \
     src/cpp/proto/proto_utils.cc \
@@ -2673,6 +2674,7 @@
 src/cpp/client/create_channel.cc: $(OPENSSL_DEP)
 src/cpp/client/credentials.cc: $(OPENSSL_DEP)
 src/cpp/client/internal_stub.cc: $(OPENSSL_DEP)
+src/cpp/common/call.cc: $(OPENSSL_DEP)
 src/cpp/common/completion_queue.cc: $(OPENSSL_DEP)
 src/cpp/common/rpc_method.cc: $(OPENSSL_DEP)
 src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP)
@@ -2733,6 +2735,7 @@
 objs/$(CONFIG)/src/cpp/client/create_channel.o: 
 objs/$(CONFIG)/src/cpp/client/credentials.o: 
 objs/$(CONFIG)/src/cpp/client/internal_stub.o: 
+objs/$(CONFIG)/src/cpp/common/call.o: 
 objs/$(CONFIG)/src/cpp/common/completion_queue.o: 
 objs/$(CONFIG)/src/cpp/common/rpc_method.o: 
 objs/$(CONFIG)/src/cpp/proto/proto_utils.o: 
diff --git a/build.json b/build.json
index 68110e4..7d35f79 100644
--- a/build.json
+++ b/build.json
@@ -413,6 +413,7 @@
         "src/cpp/client/create_channel.cc",
         "src/cpp/client/credentials.cc",
         "src/cpp/client/internal_stub.cc",
+        "src/cpp/common/call.cc",
         "src/cpp/common/completion_queue.cc",
         "src/cpp/common/rpc_method.cc",
         "src/cpp/proto/proto_utils.cc",
diff --git a/include/grpc++/call.h b/include/grpc++/call.h
new file mode 100644
index 0000000..8cddf78
--- /dev/null
+++ b/include/grpc++/call.h
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPCPP_CALL_H__
+#define __GRPCPP_CALL_H__
+
+#include <grpc++/status.h>
+#include <grpc++/completion_queue.h>
+
+#include <memory>
+
+namespace google {
+namespace protobuf {
+class Message;
+}  // namespace protobuf
+}  // namespace google
+
+struct grpc_call;
+struct grpc_op;
+
+namespace grpc {
+
+class ChannelInterface;
+
+class CallOpBuffer final : public CompletionQueueTag {
+ public:
+  void AddSendMessage(const google::protobuf::Message &message);
+  void AddRecvMessage(google::protobuf::Message *message);
+  void AddClientSendClose();
+  void AddClientRecvStatus(Status *status);
+  // NOTE(review): the Add* bodies are not in this header; presumably each queues one op.
+  // INTERNAL API:
+  // (FillOps is called from Channel::PerformOpsOnCall, FinalizeResult from the queue.)
+  // Convert to an array of grpc_op elements; the caller passes the array capacity in *nops
+  void FillOps(grpc_op *ops, size_t *nops);
+
+  // Called by completion queue just prior to returning from Next() or Pluck()
+  void FinalizeResult() override;
+};
+
+class CCallDeleter {  // deleter type used by the unique_ptr<grpc_call> member of Call
+ public:
+  void operator()(grpc_call *c);  // declared only; the definition is not in this header
+};
+
+// Straightforward wrapping of the C call object
+class Call final {
+ public:
+  Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
+  // Runs the ops collected in `buffer`; `tag` is the user tag for the completion.
+  void PerformOps(CallOpBuffer *buffer, void *tag);
+  // Accessors returning raw pointers to the wrapped C call and the completion queue.
+  grpc_call *call() { return call_.get(); }
+  CompletionQueue *cq() { return cq_; }
+
+ private:
+  ChannelInterface *channel_;  // raw pointer; presumably not owned -- confirm
+  CompletionQueue *cq_;        // raw pointer; presumably not owned -- confirm
+  std::unique_ptr<grpc_call, CCallDeleter> call_;  // released through CCallDeleter
+};
+
+}  // namespace grpc
+
+#endif  // __GRPCPP_CALL_H__
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 9ed3542..452c785 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -39,30 +39,42 @@
 namespace google {
 namespace protobuf {
 class Message;
-}
-}
+}  // namespace protobuf
+}  // namespace google
+
+struct grpc_call;
 
 namespace grpc {
-
+class Call;
+class CallOpBuffer;
 class ClientContext;
+class CompletionQueue;
 class RpcMethod;
 class StreamContextInterface;
+class CallInterface;
 
 class ChannelInterface {
  public:
   virtual ~ChannelInterface() {}
 
-  virtual Status StartBlockingRpc(const RpcMethod& method,
-                                  ClientContext* context,
-                                  const google::protobuf::Message& request,
-                                  google::protobuf::Message* result) = 0;
-
-  virtual StreamContextInterface* CreateStream(
-      const RpcMethod& method, ClientContext* context,
-      const google::protobuf::Message* request,
-      google::protobuf::Message* result) = 0;
+  virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+                          CompletionQueue *cq) = 0;
+  virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, Call *call) = 0;
 };
 
+// Wrapper that begins an asynchronous unary call
+void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+                    ClientContext *context,
+                    const google::protobuf::Message &request,
+                    google::protobuf::Message *result, Status *status,
+                    CompletionQueue *cq, void *tag);
+
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+                         ClientContext *context,
+                         const google::protobuf::Message &request,
+                         google::protobuf::Message *result);
+
 }  // namespace grpc
 
 #endif  // __GRPCPP_CHANNEL_INTERFACE_H__
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 72f6253..4bc707e 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -38,42 +38,46 @@
 
 namespace grpc {
 
+class CompletionQueue;
+
+class CompletionQueueTag {  // base for objects passed to the C API as tags
+ public:
+  virtual ~CompletionQueueTag() {}  // polymorphic base: safe delete-through-base
+  virtual void FinalizeResult() = 0;  // run by CompletionQueue::Next before the tag is returned
+ private:
+  friend class CompletionQueue;  // PrepareTagForC writes user_tag_, Next reads it
+  void *user_tag_ = nullptr;     // tag supplied by the user; null until prepared
+};
+
 // grpc_completion_queue wrapper class
 class CompletionQueue {
  public:
   CompletionQueue();
   ~CompletionQueue();
 
-  enum CompletionType {
-    QUEUE_CLOSED = 0,       // Shutting down.
-    RPC_END = 1,            // An RPC finished. Either at client or server.
-    CLIENT_READ_OK = 2,     // A client-side read has finished successfully.
-    CLIENT_READ_ERROR = 3,  // A client-side read has finished with error.
-    CLIENT_WRITE_OK = 4,
-    CLIENT_WRITE_ERROR = 5,
-    SERVER_RPC_NEW = 6,     // A new RPC just arrived at the server.
-    SERVER_READ_OK = 7,     // A server-side read has finished successfully.
-    SERVER_READ_ERROR = 8,  // A server-side read has finished with error.
-    SERVER_WRITE_OK = 9,
-    SERVER_WRITE_ERROR = 10,
-    // Client or server has sent half close successfully.
-    HALFCLOSE_OK = 11,
-    // New CompletionTypes may be added in the future, so user code should
-    // always
-    // handle the default case of a CompletionType that appears after such code
-    // was
-    // written.
-    DO_NOT_USE = 20,
-  };
-
   // Blocking read from queue.
-  // For QUEUE_CLOSED, *tag is not changed.
-  // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext.
-  // For others, *tag will be the AsyncServerContext of this rpc.
-  CompletionType Next(void** tag);
+  // Returns true if an event was received, false if the queue is ready
+  // for destruction.
+  bool Next(void **tag, bool *ok);
+
+  bool Pluck(void *tag);
+
+  // Prepare a tag for the C api
+  // Given a tag we'd like to receive from Next, what tag should we pass
+  // down to the C api?
+  // Usage example:
+  //   grpc_call_start_batch(..., cq.PrepareTagForC(tag));
+  // Allows attaching some work to be executed before the original tag
+  // is returned.
+  // MUST be used for all events that could be surfaced through this
+  // wrapping API
+  void *PrepareTagForC(CompletionQueueTag *cq_tag, void *user_tag) {
+    cq_tag->user_tag_ = user_tag;
+    return cq_tag;
+  }
 
   // Shutdown has to be called, and the CompletionQueue can only be
-  // destructed when the QUEUE_CLOSED message has been read with Next().
+  // destructed when false is returned from Next().
   void Shutdown();
 
   grpc_completion_queue* cq() { return cq_; }
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 5fa371b..3931d9a 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -48,8 +48,8 @@
 namespace google {
 namespace protobuf {
 class Message;
-}
-}
+}  // namespace protobuf
+}  // namespace google
 
 namespace grpc {
 class AsyncServerContext;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4..dce07b6 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,6 +34,9 @@
 #ifndef __GRPCPP_STREAM_H__
 #define __GRPCPP_STREAM_H__
 
+#include <grpc++/call.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/completion_queue.h>
 #include <grpc++/stream_context_interface.h>
 #include <grpc++/status.h>
 #include <grpc/support/log.h>
@@ -45,16 +48,12 @@
  public:
   virtual ~ClientStreamingInterface() {}
 
-  // Try to cancel the stream. Wait() still needs to be called to get the final
-  // status. Cancelling after the stream has finished has no effects.
-  virtual void Cancel() = 0;
-
   // Wait until the stream finishes, and return the final status. When the
   // client side declares it has no more message to send, either implicitly or
   // by calling WritesDone, it needs to make sure there is no more message to
   // be received from the server, either implicitly or by getting a false from
   // a Read(). Otherwise, this implicitly cancels the stream.
-  virtual const Status& Wait() = 0;
+  virtual Status Finish() = 0;
 };
 
 // An interface that yields a sequence of R messages.
@@ -82,95 +81,127 @@
 };
 
 template <class R>
-class ClientReader : public ClientStreamingInterface,
-                     public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+                           public ReaderInterface<R> {
  public:
   // Blocking create a stream and write the first request out.
-  explicit ClientReader(StreamContextInterface* context) : context_(context) {
-    GPR_ASSERT(context_);
-    context_->Start(true);
-    context_->Write(context_->request(), true);
+  explicit ClientReader(ChannelInterface *channel, const RpcMethod &method,
+                        ClientContext *context,
+                        const google::protobuf::Message &request)
+      : call_(channel->CreateCall(method, context, &cq_)) {
+    CallOpBuffer buf;
+    buf.AddSendMessage(request);
+    buf.AddClientSendClose();
+    call_.PerformOps(&buf, (void *)1);
+    cq_.Pluck((void *)1);
   }
 
-  ~ClientReader() { delete context_; }
+  virtual bool Read(R *msg) {
+    CallOpBuffer buf;
+    buf.AddRecvMessage(msg);
+    call_.PerformOps(&buf, (void *)2);
+    return cq_.Pluck((void *)2);
+  }
 
-  virtual bool Read(R* msg) { return context_->Read(msg); }
-
-  virtual void Cancel() { context_->Cancel(); }
-
-  virtual const Status& Wait() { return context_->Wait(); }
+  virtual Status Finish() override {
+    CallOpBuffer buf;
+    Status status;
+    buf.AddClientRecvStatus(&status);
+    call_.PerformOps(&buf, (void *)3);
+    GPR_ASSERT(cq_.Pluck((void *)3));
+    return status;
+  }
 
  private:
-  StreamContextInterface* const context_;
+  CompletionQueue cq_;
+  Call call_;
 };
 
 template <class W>
-class ClientWriter : public ClientStreamingInterface,
-                     public WriterInterface<W> {
+class ClientWriter final : public ClientStreamingInterface,
+                           public WriterInterface<W> {
  public:
   // Blocking create a stream.
-  explicit ClientWriter(StreamContextInterface* context) : context_(context) {
-    GPR_ASSERT(context_);
-    context_->Start(false);
-  }
-
-  ~ClientWriter() { delete context_; }
+  explicit ClientWriter(ChannelInterface *channel, const RpcMethod &method,
+                        ClientContext *context,
+                        google::protobuf::Message *response)
+      : response_(response),
+        call_(channel->CreateCall(method, context, &cq_)) {}
 
   virtual bool Write(const W& msg) {
-    return context_->Write(const_cast<W*>(&msg), false);
+    CallOpBuffer buf;
+    buf.AddSendMessage(msg);
+    call_.PerformOps(&buf, (void *)2);
+    return cq_.Pluck((void *)2);
   }
 
-  virtual void WritesDone() { context_->Write(nullptr, true); }
-
-  virtual void Cancel() { context_->Cancel(); }
+  virtual bool WritesDone() {
+    CallOpBuffer buf;
+    buf.AddClientSendClose();
+    call_.PerformOps(&buf, (void *)3);
+    return cq_.Pluck((void *)3);
+  }
 
   // Read the final response and wait for the final status.
-  virtual const Status& Wait() {
-    bool success = context_->Read(context_->response());
-    if (!success) {
-      Cancel();
-    } else {
-      success = context_->Read(nullptr);
-      if (success) {
-        Cancel();
-      }
-    }
-    return context_->Wait();
+  virtual Status Finish() override {
+    CallOpBuffer buf;
+    Status status;
+    buf.AddClientRecvStatus(&status);
+    call_.PerformOps(&buf, (void *)4);
+    GPR_ASSERT(cq_.Pluck((void *)4));
+    return status;
   }
 
  private:
-  StreamContextInterface* const context_;
+  google::protobuf::Message *const response_;
+  CompletionQueue cq_;
+  Call call_;
 };
 
 // Client-side interface for bi-directional streaming.
 template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
-                           public WriterInterface<W>,
-                           public ReaderInterface<R> {
+class ClientReaderWriter final : public ClientStreamingInterface,
+                                 public WriterInterface<W>,
+                                 public ReaderInterface<R> {
  public:
   // Blocking create a stream.
-  explicit ClientReaderWriter(StreamContextInterface* context)
-      : context_(context) {
-    GPR_ASSERT(context_);
-    context_->Start(false);
+  explicit ClientReaderWriter(ChannelInterface *channel,
+                              const RpcMethod &method, ClientContext *context)
+      : call_(channel->CreateCall(method, context, &cq_)) {}
+
+  virtual bool Read(R *msg) {
+    CallOpBuffer buf;
+    buf.AddRecvMessage(msg);
+    call_.PerformOps(&buf, (void *)2);
+    return cq_.Pluck((void *)2);
   }
 
-  ~ClientReaderWriter() { delete context_; }
-
-  virtual bool Read(R* msg) { return context_->Read(msg); }
-
   virtual bool Write(const W& msg) {
-    return context_->Write(const_cast<W*>(&msg), false);
+    CallOpBuffer buf;
+    buf.AddSendMessage(msg);
+    call_.PerformOps(&buf, (void *)3);
+    return cq_.Pluck((void *)3);
   }
 
-  virtual void WritesDone() { context_->Write(nullptr, true); }
+  virtual bool WritesDone() {
+    CallOpBuffer buf;
+    buf.AddClientSendClose();
+    call_.PerformOps(&buf, (void *)4);
+    return cq_.Pluck((void *)4);
+  }
 
-  virtual void Cancel() { context_->Cancel(); }
-
-  virtual const Status& Wait() { return context_->Wait(); }
+  virtual Status Finish() override {
+    CallOpBuffer buf;
+    Status status;
+    buf.AddClientRecvStatus(&status);
+    call_.PerformOps(&buf, (void *)5);
+    GPR_ASSERT(cq_.Pluck((void *)5));
+    return status;
+  }
 
  private:
-  StreamContextInterface* const context_;
+  CompletionQueue cq_;
+  Call call_;
 };
 
 template <class R>
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 8724f97..cd537f9 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -268,7 +268,7 @@
                    "::grpc::ClientContext* context, "
                    "const $Request$& request, $Response$* response) {\n");
     printer->Print(*vars,
-                   "  return channel()->StartBlockingRpc("
+                   "return ::grpc::BlockingUnaryCall(channel(),"
                    "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), "
                    "context, request, response);\n"
                    "}\n\n");
@@ -279,10 +279,10 @@
         "::grpc::ClientContext* context, $Response$* response) {\n");
     printer->Print(*vars,
                    "  return new ::grpc::ClientWriter< $Request$>("
-                   "channel()->CreateStream("
+                   "channel(),"
                    "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
                    "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
-                   "context, nullptr, response));\n"
+                   "context, response);\n"
                    "}\n\n");
   } else if (ServerOnlyStreaming(method)) {
     printer->Print(
@@ -291,10 +291,10 @@
         "::grpc::ClientContext* context, const $Request$* request) {\n");
     printer->Print(*vars,
                    "  return new ::grpc::ClientReader< $Response$>("
-                   "channel()->CreateStream("
+                   "channel(),"
                    "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
                    "::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
-                   "context, request, nullptr));\n"
+                   "context, *request);\n"
                    "}\n\n");
   } else if (BidiStreaming(method)) {
     printer->Print(
@@ -304,10 +304,10 @@
     printer->Print(
         *vars,
         "  return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
-        "channel()->CreateStream("
+        "channel(),"
         "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
         "::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
-        "context, nullptr, nullptr));\n"
+        "context);\n"
         "}\n\n");
   }
 }
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 3f39364..b513212 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -43,8 +43,10 @@
 
 #include "src/cpp/proto/proto_utils.h"
 #include "src/cpp/stream/stream_context.h"
+#include <grpc++/call.h>
 #include <grpc++/channel_arguments.h>
 #include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
 #include <grpc++/config.h>
 #include <grpc++/credentials.h>
 #include <grpc++/impl/rpc_method.h>
@@ -77,103 +79,23 @@
 
 Channel::~Channel() { grpc_channel_destroy(c_channel_); }
 
-namespace {
-// Pluck the finished event and set to status when it is not nullptr.
-void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag,
-                    Status *status) {
-  grpc_event *ev =
-      grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future);
-  if (status) {
-    StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
-    grpc::string details(ev->data.finished.details ? ev->data.finished.details
-                                                   : "");
-    *status = Status(error_code, details);
-  }
-  grpc_event_finish(ev);
-}
-}  // namespace
-
-// TODO(yangg) more error handling
-Status Channel::StartBlockingRpc(const RpcMethod &method,
-                                 ClientContext *context,
-                                 const google::protobuf::Message &request,
-                                 google::protobuf::Message *result) {
-  Status status;
-  grpc_call *call = grpc_channel_create_call_old(
-      c_channel_, method.name(), target_.c_str(), context->RawDeadline());
-  context->set_call(call);
-
-  grpc_event *ev;
-  void *finished_tag = reinterpret_cast<char *>(call);
-  void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
-  void *write_tag = reinterpret_cast<char *>(call) + 3;
-  void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
-  void *read_tag = reinterpret_cast<char *>(call) + 5;
-
-  grpc_completion_queue *cq = grpc_completion_queue_create();
-  context->set_cq(cq);
-  // add_metadata from context
-  //
-  // invoke
-  GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag,
-                                  GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
-  // write request
-  grpc_byte_buffer *write_buffer = nullptr;
-  bool success = SerializeProto(request, &write_buffer);
-  if (!success) {
-    grpc_call_cancel(call);
-    status =
-        Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
-    GetFinalStatus(cq, finished_tag, nullptr);
-    return status;
-  }
-  GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag,
-                                       GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
-  grpc_byte_buffer_destroy(write_buffer);
-  ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
-
-  success = ev->data.write_accepted == GRPC_OP_OK;
-  grpc_event_finish(ev);
-  if (!success) {
-    GetFinalStatus(cq, finished_tag, &status);
-    return status;
-  }
-  // writes done
-  GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK);
-  ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
-  grpc_event_finish(ev);
-  // start read metadata
-  //
-  ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
-  grpc_event_finish(ev);
-  // start read
-  GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK);
-  ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
-  if (ev->data.read) {
-    if (!DeserializeProto(ev->data.read, result)) {
-      grpc_event_finish(ev);
-      status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
-      GetFinalStatus(cq, finished_tag, nullptr);
-      return status;
-    }
-  }
-  grpc_event_finish(ev);
-
-  // wait status
-  GetFinalStatus(cq, finished_tag, &status);
-  return status;
+Call Channel::CreateCall(const RpcMethod &method, ClientContext *context,
+                         CompletionQueue *cq) {
+  auto c_call =  // C-level call created on c_channel_ with the caller's cq
+      grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
+                               target_.c_str(), context->RawDeadline());
+  context->set_call(c_call);  // record the C call on the client context
+  return Call(c_call, this, cq);  // wrap it together with this channel and cq
 }
 
-StreamContextInterface *Channel::CreateStream(
-    const RpcMethod &method, ClientContext *context,
-    const google::protobuf::Message *request,
-    google::protobuf::Message *result) {
-  grpc_call *call = grpc_channel_create_call_old(
-      c_channel_, method.name(), target_.c_str(), context->RawDeadline());
-  context->set_call(call);
-  grpc_completion_queue *cq = grpc_completion_queue_create();
-  context->set_cq(cq);
-  return new StreamContext(method, context, request, result);
+void Channel::PerformOpsOnCall(CallOpBuffer *buf, void *tag, Call *call) {
+  static const size_t kMaxOps = 8;  // capacity of the on-stack op array
+  grpc_op batch[kMaxOps];
+  size_t batch_size = kMaxOps;  // handed to FillOps by pointer
+  buf->FillOps(batch, &batch_size);
+  void *c_tag = call->cq()->PrepareTagForC(buf, tag);  // buf carries the user tag
+  GPR_ASSERT(GRPC_CALL_OK ==
+             grpc_call_start_batch(call->call(), batch, batch_size, c_tag));
 }
 
 }  // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 67d18bf..6cf2228 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -42,11 +42,14 @@
 struct grpc_channel;
 
 namespace grpc {
+class Call;
+class CallOpBuffer;
 class ChannelArguments;
+class CompletionQueue;
 class Credentials;
 class StreamContextInterface;
 
-class Channel : public ChannelInterface {
+class Channel final : public ChannelInterface {
  public:
   Channel(const grpc::string &target, const ChannelArguments &args);
   Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds,
@@ -54,14 +57,10 @@
 
   ~Channel() override;
 
-  Status StartBlockingRpc(const RpcMethod &method, ClientContext *context,
-                          const google::protobuf::Message &request,
-                          google::protobuf::Message *result) override;
-
-  StreamContextInterface *CreateStream(
-      const RpcMethod &method, ClientContext *context,
-      const google::protobuf::Message *request,
-      google::protobuf::Message *result) override;
+  virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+                          CompletionQueue *cq) override;
+  virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag,
+                                Call *call) override;
 
  private:
   const grpc::string target_;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
new file mode 100644
index 0000000..6ae6c6c
--- /dev/null
+++ b/src/cpp/common/call.cc
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <include/grpc++/call.h>
+#include <include/grpc++/channel_interface.h>
+
+namespace grpc {
+
+void Call::PerformOps(CallOpBuffer* buffer, void* tag) {  // delegates the batch to channel_
+  channel_->PerformOpsOnCall(buffer, tag, this);  // takes the Call wrapper, not the call_ handle
+}
+
+}  // namespace grpc
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index f06da9b..383b66c 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -33,6 +33,8 @@
 
 #include <grpc++/completion_queue.h>
 
+#include <memory>
+
 #include <grpc/grpc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
@@ -47,66 +49,24 @@
 
 void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
 
-CompletionQueue::CompletionType CompletionQueue::Next(void **tag) {
-  grpc_event *ev;
-  CompletionType return_type;
-  bool success;
+// Deleter functor: lets std::unique_ptr hold a grpc_event and finish it on release
+class EventDeleter {
+ public:
+  void operator()(grpc_event *e) { if (e != nullptr) grpc_event_finish(e); }
+};
 
-  ev = grpc_completion_queue_next(cq_, gpr_inf_future);
-  if (!ev) {
-    gpr_log(GPR_ERROR, "no next event in queue");
-    abort();
+bool CompletionQueue::Next(void **tag, bool *ok) {
+  std::unique_ptr<grpc_event, EventDeleter> ev;  // finished automatically on return
+  // Ask the C queue for its next event, with gpr_inf_future as the deadline.
+  ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+  if (ev->type == GRPC_QUEUE_SHUTDOWN) {
+    return false;  // *tag and *ok are left untouched
   }
-  switch (ev->type) {
-    case GRPC_QUEUE_SHUTDOWN:
-      return_type = QUEUE_CLOSED;
-      break;
-    case GRPC_READ:
-      *tag = ev->tag;
-      if (ev->data.read) {
-        success = static_cast<AsyncServerContext *>(ev->tag)
-                      ->ParseRead(ev->data.read);
-        return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR;
-      } else {
-        return_type = SERVER_READ_ERROR;
-      }
-      break;
-    case GRPC_WRITE_ACCEPTED:
-      *tag = ev->tag;
-      if (ev->data.write_accepted != GRPC_OP_ERROR) {
-        return_type = SERVER_WRITE_OK;
-      } else {
-        return_type = SERVER_WRITE_ERROR;
-      }
-      break;
-    case GRPC_SERVER_RPC_NEW:
-      GPR_ASSERT(!ev->tag);
-      // Finishing the pending new rpcs after the server has been shutdown.
-      if (!ev->call) {
-        *tag = nullptr;
-      } else {
-        *tag = new AsyncServerContext(
-            ev->call, ev->data.server_rpc_new.method,
-            ev->data.server_rpc_new.host,
-            Timespec2Timepoint(ev->data.server_rpc_new.deadline));
-      }
-      return_type = SERVER_RPC_NEW;
-      break;
-    case GRPC_FINISHED:
-      *tag = ev->tag;
-      return_type = RPC_END;
-      break;
-    case GRPC_FINISH_ACCEPTED:
-      *tag = ev->tag;
-      return_type = HALFCLOSE_OK;
-      break;
-    default:
-      // We do not handle client side messages now
-      gpr_log(GPR_ERROR, "client-side messages aren't supported yet");
-      abort();
-  }
-  grpc_event_finish(ev);
-  return return_type;
+  auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag);  // tag from PrepareTagForC
+  cq_tag->FinalizeResult();  // let the tag finish its work before it is surfaced
+  *tag = cq_tag->user_tag_;
+  *ok = ev->data.op_complete == GRPC_OP_OK;  // event payload lives in the data union
+  return true;
 }
 
 }  // namespace grpc
diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/server/server_rpc_handler.h
index a43e07d..ec8ec2c 100644
--- a/src/cpp/server/server_rpc_handler.h
+++ b/src/cpp/server/server_rpc_handler.h
@@ -53,7 +53,6 @@
   void StartRpc();
 
  private:
-  CompletionQueue::CompletionType WaitForNextEvent();
   void FinishRpc(const Status &status);
 
   std::unique_ptr<AsyncServerContext> async_server_context_;