Support client streaming
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index a02a8b2..d9232ec 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -37,8 +37,6 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@@ -50,49 +48,61 @@
void* tag(int i) { return (void*)(intptr_t)i; }
} // namespace
+enum CliCall::CallStatus : intptr_t { CREATE, PROCESS, FINISH };
+
Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const grpc::string& method, const grpc::string& request,
grpc::string* response,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata) {
- std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
- grpc::ClientContext ctx;
+ CliCall call(channel, method, metadata);
+ call.Write(request);
+ call.WritesDone();
+ call.Read(response, server_initial_metadata);
+ return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method,
+ const OutgoingMetadataContainer& metadata)
+ : stub_(new grpc::GenericStub(channel)) {
if (!metadata.empty()) {
for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
iter != metadata.end(); ++iter) {
- ctx.AddMetadata(iter->first, iter->second);
+ ctx_.AddMetadata(iter->first, iter->second);
}
}
- grpc::CompletionQueue cq;
- std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
- stub->Call(&ctx, method, &cq, tag(1)));
+ call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
void* got_tag;
bool ok;
- cq.Next(&got_tag, &ok);
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+void CliCall::Write(const grpc::string& request) {
+ void* got_tag;
+ bool ok;
grpc_slice s = grpc_slice_from_copied_string(request.c_str());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
- call->Write(send_buffer, tag(2));
- cq.Next(&got_tag, &ok);
+ call_->Write(send_buffer, tag(2));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
- call->WritesDone(tag(3));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
- grpc::ByteBuffer recv_buffer;
- call->Read(&recv_buffer, tag(4));
- cq.Next(&got_tag, &ok);
- if (!ok) {
- std::cout << "Failed to read response." << std::endl;
- }
- grpc::Status status;
- call->Finish(&status, tag(5));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
+}
- if (status.ok()) {
+void CliCall::Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+
+ grpc::ByteBuffer recv_buffer;
+ call_->Read(&recv_buffer, tag(4));
+ cq_.Next(&got_tag, &ok);
+ if (!ok) {
+ fprintf(stderr, "Failed to read response.");
+ } else {
std::vector<grpc::Slice> slices;
(void)recv_buffer.Dump(&slices);
@@ -101,10 +111,33 @@
response->append(reinterpret_cast<const char*>(slices[i].begin()),
slices[i].size());
}
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ }
+}
+
+void CliCall::WritesDone() {
+ void* got_tag;
+ bool ok;
+
+ call_->WritesDone(tag(3));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::Status status;
+
+ call_->Finish(&status, tag(5));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+ if (server_trailing_metadata) {
+ *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
}
- *server_initial_metadata = ctx.GetServerInitialMetadata();
- *server_trailing_metadata = ctx.GetServerTrailingMetadata();
return status;
}