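Async client progress: SynchronousClient drops its own Thread/Mark machinery and relies on thread management (StartThreads/EndThreads) inherited from Client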
diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc
index 8770070..0be01e1 100644
--- a/test/cpp/qps/client.cc
+++ b/test/cpp/qps/client.cc
@@ -63,118 +63,26 @@
class SynchronousClient GRPC_FINAL : public Client {
public:
- SynchronousClient(const ClientConfig& config) : timer_(new Timer) {
- for (int i = 0; i < config.client_channels(); i++) {
- channels_.push_back(ClientChannelInfo(
- config.server_targets(i % config.server_targets_size()), config));
- auto* stub = channels_.back().get_stub();
- for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) {
- threads_.emplace_back(new Thread(stub, config));
- }
- }
+ SynchronousClient(const ClientConfig& config) : Client(config) {
+ size_t num_threads = config.outstanding_rpcs_per_channel() * config.client_channels();
+ responses_.resize(num_threads);
+ StartThreads(num_threads);
}
- ClientStats Mark() {
- Histogram latencies;
- std::vector<Histogram> to_merge(threads_.size());
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
- }
- std::unique_ptr<Timer> timer(new Timer);
- timer_.swap(timer);
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
- latencies.Merge(&to_merge[i]);
- }
+ ~SynchronousClient() {
+ EndThreads();
+ }
- auto timer_result = timer->Mark();
-
- ClientStats stats;
- latencies.FillProto(stats.mutable_latencies());
- stats.set_time_elapsed(timer_result.wall);
- stats.set_time_system(timer_result.system);
- stats.set_time_user(timer_result.user);
- return stats;
+ void ThreadFunc(Histogram* histogram, size_t thread_idx) {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ double start = Timer::Now();
+ grpc::ClientContext context;
+ grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]);
+ histogram->Add((Timer::Now() - start) * 1e9);
}
private:
- class Thread {
- public:
- Thread(TestService::Stub* stub, const ClientConfig& config)
- : stub_(stub),
- config_(config),
- done_(false),
- new_(nullptr),
- impl_([this]() {
- SimpleRequest request;
- SimpleResponse response;
- request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request.set_response_size(config_.payload_size());
- for (;;) {
- {
- std::lock_guard<std::mutex> g(mu_);
- if (done_) return;
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
- cv_.notify_one();
- }
- }
- double start = Timer::Now();
- grpc::ClientContext context;
- grpc::Status s = stub_->UnaryCall(&context, request, &response);
- histogram_.Add((Timer::Now() - start) * 1e9);
- }
- }) {}
-
- ~Thread() {
- {
- std::lock_guard<std::mutex> g(mu_);
- done_ = true;
- }
- impl_.join();
- }
-
- void BeginSwap(Histogram* n) {
- std::lock_guard<std::mutex> g(mu_);
- new_ = n;
- }
-
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- cv_.wait(g, [this]() { return new_ == nullptr; });
- }
-
- private:
- Thread(const Thread&);
- Thread& operator=(const Thread&);
-
- TestService::Stub* stub_;
- ClientConfig config_;
- std::mutex mu_;
- std::condition_variable cv_;
- bool done_;
- Histogram* new_;
- Histogram histogram_;
- std::thread impl_;
- };
-
- class ClientChannelInfo {
- public:
- explicit ClientChannelInfo(const grpc::string& target,
- const ClientConfig& config)
- : channel_(CreateTestChannel(target, config.enable_ssl())),
- stub_(TestService::NewStub(channel_)) {}
- ChannelInterface* get_channel() { return channel_.get(); }
- TestService::Stub* get_stub() { return stub_.get(); }
-
- private:
- std::shared_ptr<ChannelInterface> channel_;
- std::unique_ptr<TestService::Stub> stub_;
- };
- std::vector<ClientChannelInfo> channels_;
- std::vector<std::unique_ptr<Thread>> threads_;
- std::unique_ptr<Timer> timer_;
+ std::vector<SimpleResponse> responses_;
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {