Merge branch 'master' into poll_stat
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index df5b702..0aabb4a 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -228,6 +228,7 @@
/* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
+ int num_polls;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -294,6 +295,7 @@
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
+ cc->num_polls = 0;
gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@@ -310,6 +312,14 @@
return cc->completion_type;
}
+int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+ int cur_num_polls;
+ gpr_mu_lock(cc->mu);
+ cur_num_polls = cc->num_polls;
+ gpr_mu_unlock(cc->mu);
+ return cur_num_polls;
+}
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -585,6 +595,7 @@
dump_pending_tags(cc);
break;
}
+ cc->num_polls++;
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
NULL, now, deadline);
if (err != GRPC_ERROR_NONE) {
@@ -765,6 +776,7 @@
dump_pending_tags(cc);
break;
}
+ cc->num_polls++;
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, deadline);
if (err != GRPC_ERROR_NONE) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 8d9ce2e..4e06980 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -101,6 +101,8 @@
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+int grpc_get_cq_poll_num(grpc_completion_queue *cc);
+
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 02b156d..1f4569e 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -244,6 +244,10 @@
// Number of requests that succeeded/failed
double successful_requests_per_second = 13;
double failed_requests_per_second = 14;
+
+ // Number of polls called inside completion queue per request
+ double client_polls_per_request = 15;
+ double server_polls_per_request = 16;
}
// Results of a single benchmark scenario.
diff --git a/src/proto/grpc/testing/stats.proto b/src/proto/grpc/testing/stats.proto
index 8001416..e236cf1 100644
--- a/src/proto/grpc/testing/stats.proto
+++ b/src/proto/grpc/testing/stats.proto
@@ -47,6 +47,9 @@
// change in idle time of the server (data from proc/stat)
uint64 idle_cpu_time = 5;
+
+ // Number of polls called inside completion queue
+ uint64 cq_poll_count = 6;
}
// Histogram params based on grpc/support/histogram.c
@@ -81,4 +84,7 @@
// Number of failed requests (one row per status code seen)
repeated RequestResultCount request_results = 5;
+
+ // Number of polls called inside completion queue
+ uint64 cq_poll_count = 6;
}
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index 98aca1c..aa71c2a 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -100,6 +100,12 @@
}
}
+ void AddToLabel(std::ostream& out, benchmark::State& state) {
+ BaseFixture::AddToLabel(out, state);
+ out << " polls/iter:"
+ << (double)grpc_get_cq_poll_num(this->cq()->cq()) / state.iterations();
+ }
+
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
@@ -212,6 +218,12 @@
}
}
+ void AddToLabel(std::ostream& out, benchmark::State& state) {
+ BaseFixture::AddToLabel(out, state);
+ out << " polls/iter:"
+ << (double)grpc_get_cq_poll_num(this->cq()->cq()) / state.iterations();
+ }
+
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
@@ -245,7 +257,7 @@
void AddToLabel(std::ostream& out, benchmark::State& state) {
EndpointPairFixture::AddToLabel(out, state);
out << " writes/iter:"
- << ((double)stats_.num_writes / (double)state.iterations());
+ << (double)stats_.num_writes / (double)state.iterations();
}
private:
diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h
index 7360a1c..66bf976 100644
--- a/test/cpp/microbenchmarks/helpers.h
+++ b/test/cpp/microbenchmarks/helpers.h
@@ -35,6 +35,7 @@
#define TEST_CPP_MICROBENCHMARKS_COUNTERS_H
#include <sstream>
+#include <string>
extern "C" {
#include <grpc/support/port_platform.h>
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index c3197eb..bebe1b9 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -54,6 +54,10 @@
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
+extern "C" {
+#include "src/core/lib/surface/completion_queue.h"
+}
+
namespace grpc {
namespace testing {
@@ -150,7 +154,8 @@
Client()
: timer_(new UsageTimer),
interarrival_timer_(),
- started_requests_(false) {
+ started_requests_(false),
+ last_reset_poll_count_(0) {
gpr_event_init(&start_requests_);
}
virtual ~Client() {}
@@ -162,6 +167,8 @@
MaybeStartRequests();
+ int cur_poll_count = GetPollCount();
+ int poll_count = cur_poll_count - last_reset_poll_count_;
if (reset) {
std::vector<Histogram> to_merge(threads_.size());
std::vector<StatusHistogram> to_merge_status(threads_.size());
@@ -176,6 +183,7 @@
MergeStatusHistogram(to_merge_status[i], &statuses);
}
timer_result = timer->Mark();
+ last_reset_poll_count_ = cur_poll_count;
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
@@ -195,6 +203,7 @@
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
+ stats.set_cq_poll_count(poll_count);
return stats;
}
@@ -209,6 +218,11 @@
}
}
+ virtual int GetPollCount() {
+ // For sync client.
+ return 0;
+ }
+
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
@@ -351,6 +365,8 @@
gpr_event start_requests_;
bool started_requests_;
+ int last_reset_poll_count_;
+
void MaybeStartRequests() {
if (!started_requests_) {
started_requests_ = true;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 82c3356..6b8f736 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -205,6 +205,14 @@
}
}
+ int GetPollCount() override {
+ int count = 0;
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ count += grpc_get_cq_poll_num((*cq)->cq());
+ }
+ return count;
+ }
+
protected:
const int num_async_threads_;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 74fe366..ace5028 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -112,6 +112,8 @@
static double WallTime(ClientStats s) { return s.time_elapsed(); }
static double SystemTime(ClientStats s) { return s.time_system(); }
static double UserTime(ClientStats s) { return s.time_user(); }
+static double CliPollCount(ClientStats s) { return s.cq_poll_count(); }
+static double SvrPollCount(ServerStats s) { return s.cq_poll_count(); }
static double ServerWallTime(ServerStats s) { return s.time_elapsed(); }
static double ServerSystemTime(ServerStats s) { return s.time_system(); }
static double ServerUserTime(ServerStats s) { return s.time_user(); }
@@ -180,6 +182,11 @@
result->mutable_summary()->set_failed_requests_per_second(failures /
time_estimate);
}
+
+ result->mutable_summary()->set_client_polls_per_request(
+ sum(result->client_stats(), CliPollCount) / histogram.Count());
+ result->mutable_summary()->set_server_polls_per_request(
+ sum(result->server_stats(), SvrPollCount) / histogram.Count());
}
std::unique_ptr<ScenarioResult> RunScenario(
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index a906137..f00f771 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -94,6 +94,7 @@
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
GetReporter()->ReportCpuUsage(*result);
+ GetReporter()->ReportPollCount(*result);
for (int i = 0; *success && i < result->client_success_size(); i++) {
*success = result->client_success(i);
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index a9130bf..8bb4c9a 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -80,6 +80,12 @@
}
}
+void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportPollCount(result);
+ }
+}
+
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
if (result.summary().failed_requests_per_second() > 0) {
@@ -121,6 +127,13 @@
result.summary().server_cpu_usage());
}
+void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
+ gpr_log(GPR_INFO, "Client Polls per Request: %.2f",
+ result.summary().client_polls_per_request());
+ gpr_log(GPR_INFO, "Server Polls per Request: %.2f",
+ result.summary().server_polls_per_request());
+}
+
void JsonReporter::ReportQPS(const ScenarioResult& result) {
grpc::string json_string =
SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -145,6 +158,10 @@
// NOP - all reporting is handled by ReportQPS.
}
+void JsonReporter::ReportPollCount(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
void RpcReporter::ReportQPS(const ScenarioResult& result) {
grpc::ClientContext context;
grpc::Status status;
@@ -177,5 +194,9 @@
// NOP - all reporting is handled by ReportQPS.
}
+void RpcReporter::ReportPollCount(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 1749be9..621fa7c 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -76,6 +76,9 @@
/** Reports server cpu usage. */
virtual void ReportCpuUsage(const ScenarioResult& result) = 0;
+ /** Reports client and server poll usage inside completion queue. */
+ virtual void ReportPollCount(const ScenarioResult& result) = 0;
+
private:
const string name_;
};
@@ -93,6 +96,7 @@
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
+ void ReportPollCount(const ScenarioResult& result) override;
private:
std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -109,6 +113,7 @@
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
+ void ReportPollCount(const ScenarioResult& result) override;
};
/** Dumps the report to a JSON file. */
@@ -123,6 +128,7 @@
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
+ void ReportPollCount(const ScenarioResult& result) override;
const string report_file_;
};
@@ -138,6 +144,7 @@
void ReportLatency(const ScenarioResult& result) override;
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
+ void ReportPollCount(const ScenarioResult& result) override;
std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
};
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 8fbf37a..0077704 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -44,12 +44,17 @@
#include "test/core/util/port.h"
#include "test/cpp/qps/usage_timer.h"
+extern "C" {
+#include "src/core/lib/surface/completion_queue.h"
+}
+
namespace grpc {
namespace testing {
class Server {
public:
- explicit Server(const ServerConfig& config) : timer_(new UsageTimer) {
+ explicit Server(const ServerConfig& config)
+ : timer_(new UsageTimer), last_reset_poll_count_(0) {
cores_ = gpr_cpu_num_cores();
if (config.port()) {
port_ = config.port();
@@ -62,10 +67,13 @@
ServerStats Mark(bool reset) {
UsageTimer::Result timer_result;
+ int cur_poll_count = GetPollCount();
+ int poll_count = cur_poll_count - last_reset_poll_count_;
if (reset) {
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer.swap(timer_);
timer_result = timer->Mark();
+ last_reset_poll_count_ = cur_poll_count;
} else {
timer_result = timer_->Mark();
}
@@ -76,6 +84,7 @@
stats.set_time_user(timer_result.user);
stats.set_total_cpu_time(timer_result.total_cpu_time);
stats.set_idle_cpu_time(timer_result.idle_cpu_time);
+ stats.set_cq_poll_count(poll_count);
return stats;
}
@@ -106,10 +115,16 @@
}
}
+ virtual int GetPollCount() {
+ // For sync server.
+ return 0;
+ }
+
private:
int port_;
int cores_;
std::unique_ptr<UsageTimer> timer_;
+ int last_reset_poll_count_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 84f1579..3403ffd 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -186,6 +186,14 @@
shutdown_thread.join();
}
+ int GetPollCount() override {
+ int count = 0;
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
+ count += grpc_get_cq_poll_num((*cq)->cq());
+ }
+ return count;
+ }
+
private:
void ShutdownThreadFunc() {
// TODO (vpai): Remove this deadline and allow Shutdown to finish properly