Merge pull request #9180 from soltanmm-google/lodash_becomes-midash
Change grpcio_tools to grpcio-tools
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 22b2cd0..3e509e2 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -195,7 +195,8 @@
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
- int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) {
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
+ const char* qps_server_target_override, bool configure_core_lists) {
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@@ -241,9 +242,6 @@
}
}
- // Setup the hosts and core counts
- auto hosts_cores = get_hosts_and_cores(workers);
-
// if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients
if (num_clients <= 0) {
@@ -264,6 +262,11 @@
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
std::vector<ServerData> servers(num_servers);
+ std::unordered_map<string, std::deque<int>> hosts_cores;
+
+ if (configure_core_lists) {
+ hosts_cores = get_hosts_and_cores(workers);
+ }
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
@@ -271,37 +274,36 @@
CreateChannel(workers[i], InsecureChannelCredentials()));
ServerConfig server_config = initial_server_config;
- char* host;
- char* driver_port;
- char* cli_target;
- gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
- string host_str(host);
int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();
- if (server_core_limit == 0 && client_core_limit > 0) {
- // In this case, limit the server cores if it matches the
- // same host as one or more clients
- const auto& dq = hosts_cores.at(host_str);
- bool match = false;
- int limit = dq.size();
- for (size_t cli = 0; cli < num_clients; cli++) {
- if (host_str == get_host(workers[cli + num_servers])) {
- limit -= client_core_limit;
- match = true;
+ if (configure_core_lists) {
+ string host_str(get_host(workers[i]));
+ if (server_core_limit == 0 && client_core_limit > 0) {
+ // In this case, limit the server cores if it matches the
+ // same host as one or more clients
+ const auto& dq = hosts_cores.at(host_str);
+ bool match = false;
+ int limit = dq.size();
+ for (size_t cli = 0; cli < num_clients; cli++) {
+ if (host_str == get_host(workers[cli + num_servers])) {
+ limit -= client_core_limit;
+ match = true;
+ }
+ }
+ if (match) {
+ GPR_ASSERT(limit > 0);
+ server_core_limit = limit;
}
}
- if (match) {
- GPR_ASSERT(limit > 0);
- server_core_limit = limit;
- }
- }
- if (server_core_limit > 0) {
- auto& dq = hosts_cores.at(host_str);
- GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
- for (int core = 0; core < server_core_limit; core++) {
- server_config.add_core_list(dq.front());
- dq.pop_front();
+ if (server_core_limit > 0) {
+ auto& dq = hosts_cores.at(host_str);
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
+ gpr_log(GPR_INFO, "Setting server core_list");
+ for (int core = 0; core < server_core_limit; core++) {
+ server_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
}
}
@@ -315,11 +317,19 @@
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
- gpr_join_host_port(&cli_target, host, init_status.port());
- client_config.add_server_targets(cli_target);
- gpr_free(host);
- gpr_free(driver_port);
- gpr_free(cli_target);
+ if (qps_server_target_override != NULL &&
+ strlen(qps_server_target_override) > 0) {
+ // overriding the qps server target only works if there is 1 server
+ GPR_ASSERT(num_servers == 1);
+ client_config.add_server_targets(qps_server_target_override);
+ } else {
+ std::string host;
+ char* cli_target;
+ host = get_host(workers[i]);
+ gpr_join_host_port(&cli_target, host.c_str(), init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(cli_target);
+ }
}
// Targets are all set by now
@@ -341,7 +351,8 @@
int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();
- if ((server_core_limit > 0) || (client_core_limit > 0)) {
+ if (configure_core_lists &&
+ ((server_core_limit > 0) || (client_core_limit > 0))) {
auto& dq = hosts_cores.at(get_host(worker));
if (client_core_limit == 0) {
// limit client cores if it matches a server host
@@ -359,6 +370,7 @@
}
if (client_core_limit > 0) {
GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
+ gpr_log(GPR_INFO, "Setting client core_list");
for (int core = 0; core < client_core_limit; core++) {
per_client_config.add_core_list(dq.front());
dq.pop_front();
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 93f4370..b5c8152 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -45,7 +45,9 @@
std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers,
- int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
+ const char* qps_server_target_override = "",
+ bool configure_core_lists = true);
bool RunQuit();
} // namespace testing
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index 31b5917..da835b9 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -67,17 +67,25 @@
"range is narrower than the error_tolerance computed range, we "
"stop the search.");
+DEFINE_string(qps_server_target_override, "",
+ "Override QPS server target to configure in client configs."
+ "Only applicable if there is a single benchmark server.");
+DEFINE_bool(configure_core_lists, true,
+ "Provide 'core_list' parameters to workers. Value determined "
+ "by cores available and 'core_limit' parameters of the scenarios.");
+
namespace grpc {
namespace testing {
static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";
- auto result =
- RunScenario(scenario.client_config(), scenario.num_clients(),
- scenario.server_config(), scenario.num_servers(),
- scenario.warmup_seconds(), scenario.benchmark_seconds(),
- scenario.spawn_local_worker_count());
+ auto result = RunScenario(
+ scenario.client_config(), scenario.num_clients(),
+ scenario.server_config(), scenario.num_servers(),
+ scenario.warmup_seconds(), scenario.benchmark_seconds(),
+ scenario.spawn_local_worker_count(),
+ FLAGS_qps_server_target_override.c_str(), FLAGS_configure_core_lists);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py
index d954aad..df48099 100755
--- a/tools/run_tests/run_tests_matrix.py
+++ b/tools/run_tests/run_tests_matrix.py
@@ -243,14 +243,14 @@
def _runs_per_test_type(arg_str):
- """Auxiliary function to parse the "runs_per_test" flag."""
- try:
- n = int(arg_str)
- if n <= 0: raise ValueError
- return n
- except:
- msg = '\'{}\' is not a positive integer'.format(arg_str)
- raise argparse.ArgumentTypeError(msg)
+ """Auxiliary function to parse the "runs_per_test" flag."""
+ try:
+ n = int(arg_str)
+ if n <= 0: raise ValueError
+ return n
+ except:
+ msg = '\'{}\' is not a positive integer'.format(arg_str)
+ raise argparse.ArgumentTypeError(msg)
if __name__ == "__main__":
@@ -264,6 +264,11 @@
nargs='+',
default=[],
help='Filter targets to run by label with AND semantics.')
+ argp.add_argument('--exclude',
+ choices=_allowed_labels(),
+ nargs='+',
+ default=[],
+ help='Exclude targets with any of given labels.')
argp.add_argument('--build_only',
default=False,
action='store_const',
@@ -310,7 +315,8 @@
jobs = []
for job in all_jobs:
if not args.filter or all(filter in job.labels for filter in args.filter):
- jobs.append(job)
+ if not any(exclude_label in job.labels for exclude_label in args.exclude):
+ jobs.append(job)
if not jobs:
jobset.message('FAILED', 'No test suites match given criteria.',