Support bidirectional stream RPC

Add bidistream test

Add bad request tests

Replace deprecated DynamicMessageFactory constructor
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index 1101abe..4d045da 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -67,6 +67,8 @@
                  const grpc::string& method,
                  const OutgoingMetadataContainer& metadata)
     : stub_(new grpc::GenericStub(channel)) {
+  gpr_mu_init(&write_mu_);
+  gpr_cv_init(&write_cv_);
   if (!metadata.empty()) {
     for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
          iter != metadata.end(); ++iter) {
@@ -80,6 +82,11 @@
   GPR_ASSERT(ok);
 }
 
+CliCall::~CliCall() {
+  gpr_cv_destroy(&write_cv_);
+  gpr_mu_destroy(&write_mu_);
+}
+
 void CliCall::Write(const grpc::string& request) {
   void* got_tag;
   bool ok;
@@ -126,6 +133,81 @@
   GPR_ASSERT(ok);
 }
 
+void CliCall::WriteAndWait(const grpc::string& request) {
+  grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+  grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
+  grpc::ByteBuffer send_buffer(&req_slice, 1);
+
+  gpr_mu_lock(&write_mu_);
+  call_->Write(send_buffer, tag(2));
+  write_done_ = false;
+  while (!write_done_) {
+    gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+  }
+  gpr_mu_unlock(&write_mu_);
+}
+
+void CliCall::WritesDoneAndWait() {
+  gpr_mu_lock(&write_mu_);
+  call_->WritesDone(tag(4));
+  write_done_ = false;
+  while (!write_done_) {
+    gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+  }
+  gpr_mu_unlock(&write_mu_);
+}
+
+bool CliCall::ReadAndMaybeNotifyWrite(
+    grpc::string* response,
+    IncomingMetadataContainer* server_initial_metadata) {
+  void* got_tag;
+  bool ok;
+  grpc::ByteBuffer recv_buffer;
+
+  call_->Read(&recv_buffer, tag(3));
+  bool cq_result = cq_.Next(&got_tag, &ok);
+
+  while (got_tag != tag(3)) {
+    gpr_mu_lock(&write_mu_);
+    write_done_ = true;
+    gpr_cv_signal(&write_cv_);
+    gpr_mu_unlock(&write_mu_);
+
+    cq_result = cq_.Next(&got_tag, &ok);
+    if (got_tag == tag(2)) {
+      GPR_ASSERT(ok);
+    }
+  }
+
+  if (!cq_result || !ok) {
+    // If the RPC is ended on the server side, we should still wait for the
+    // pending write on the client side to be done.
+    if (!ok) {
+      gpr_mu_lock(&write_mu_);
+      if (!write_done_) {
+        cq_.Next(&got_tag, &ok);
+        GPR_ASSERT(got_tag != tag(2));
+        write_done_ = true;
+        gpr_cv_signal(&write_cv_);
+      }
+      gpr_mu_unlock(&write_mu_);
+    }
+    return false;
+  }
+
+  std::vector<grpc::Slice> slices;
+  recv_buffer.Dump(&slices);
+  response->clear();
+  for (size_t i = 0; i < slices.size(); i++) {
+    response->append(reinterpret_cast<const char*>(slices[i].begin()),
+                     slices[i].size());
+  }
+  if (server_initial_metadata) {
+    *server_initial_metadata = ctx_.GetServerInitialMetadata();
+  }
+  return true;
+}
+
 Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
   void* got_tag;
   bool ok;