#!/usr/bin/env python2.7
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Run performance tests locally or remotely."""

import argparse
import jobset
import multiprocessing
import os
import subprocess
import sys
import tempfile
import time
import uuid

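# Change to the repository root so that the relative paths used below
# (e.g. bins/opt/... and tools/run_tests/...) resolve correctly.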
_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(_ROOT)

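# User account used when connecting to the remote hosts over SSH.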
_REMOTE_HOST_USERNAME = 'jenkins'


class CXXLanguage:

  def __init__(self):
    self.safename = 'cxx'

  def scenarios(self):
    # TODO(jtattermusch): add more scenarios
    return {
        # Scenario 1: generic async streaming ping-pong (contentionless latency)
        'cpp_async_generic_streaming_ping_pong': [
            '--rpc_type=STREAMING',
            '--client_type=ASYNC_CLIENT',
            '--server_type=ASYNC_GENERIC_SERVER',
            '--outstanding_rpcs_per_channel=1',
            '--client_channels=1',
            '--bbuf_req_size=0',
            '--bbuf_resp_size=0',
            '--async_client_threads=1',
            '--async_server_threads=1',
            '--secure_test=true',
            '--num_servers=1',
            '--num_clients=1',
            '--server_core_limit=0',
            '--client_core_limit=0'],
        # Scenario 5: Sync unary ping-pong with protobufs
        'cpp_sync_unary_ping_pong_protobuf': [
            '--rpc_type=UNARY',
            '--client_type=SYNC_CLIENT',
            '--server_type=SYNC_SERVER',
            '--outstanding_rpcs_per_channel=1',
            '--client_channels=1',
            '--simple_req_size=0',
            '--simple_resp_size=0',
            '--secure_test=true',
            '--num_servers=1',
            '--num_clients=1',
            '--server_core_limit=0',
            '--client_core_limit=0']}

  def __str__(self):
    return 'c++'


class CSharpLanguage:

  def __init__(self):
    self.safename = str(self)

  def __str__(self):
    return 'csharp'


class NodeLanguage:

  def __init__(self):
    self.safename = str(self)

  def __str__(self):
    return 'node'


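# Registry of supported languages. Only CXXLanguage defines scenarios() so far,
# and the main flow below currently runs C++ scenarios only.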
_LANGUAGES = {
    'c++': CXXLanguage(),
    'csharp': CSharpLanguage(),
    'node': NodeLanguage(),
}


class QpsWorkerJob:
  """Encapsulates a qps worker server job."""

  def __init__(self, spec, host_and_port):
    self._spec = spec
    self.host_and_port = host_and_port
    self._job = jobset.Job(spec, bin_hash=None, newline_on_success=True, travis=True, add_env={})

  def is_running(self):
    """Polls a job and returns True if given job is still running."""
    return self._job.state(jobset.NoCache()) == jobset._RUNNING

  def kill(self):
    return self._job.kill()


def create_qpsworker_job(language, port=10000, remote_host=None):
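  """Creates a background job that runs a qps worker, either locally or over ssh."""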
  # TODO: support more languages
  cmd = 'bins/opt/qps_worker --driver_port=%s' % port
  if remote_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd)
    host_and_port = '%s:%s' % (remote_host, port)
  else:
    host_and_port = 'localhost:%s' % port

  jobspec = jobset.JobSpec(
      cmdline=[cmd],
      shortname='qps_worker',
      timeout_seconds=15*60,
      shell=True)
  return QpsWorkerJob(jobspec, host_and_port)


def create_scenario_jobspec(scenario_name, driver_args, workers, remote_host=None):
  """Creates a jobspec for running one scenario with the QPS driver."""
  # Setting the QPS_WORKERS env variable here makes sure it works over SSH too.
  cmd = 'QPS_WORKERS="%s" bins/opt/qps_driver ' % ','.join(workers)
  cmd += ' '.join(driver_args)
  if remote_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd)

  return jobset.JobSpec(
      cmdline=[cmd],
      shortname='qps_driver.%s' % scenario_name,
      timeout_seconds=3*60,
      shell=True,
      verbose_success=True)


def archive_repo():
  """Archives the local version of the repo, including submodules."""
  # TODO: also archive grpc-go and grpc-java repos
  archive_job = jobset.JobSpec(
      cmdline=['tar', '-cf', '../grpc.tar', '../grpc/'],
      shortname='archive_repo',
      timeout_seconds=3*60)

  jobset.message('START', 'Archiving local repository.', do_newline=True)
  num_failures, _ = jobset.run(
      [archive_job], newline_on_success=True, maxjobs=1)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Archive with local repository created successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Failed to archive local repository.',
                   do_newline=True)
    sys.exit(1)


def prepare_remote_hosts(hosts):
  """Prepares remote hosts."""
  prepare_jobs = []
  for host in hosts:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
    prepare_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/remote_host_prepare.sh'],
            shortname='remote_host_prepare.%s' % host,
            environ={'USER_AT_HOST': user_at_host},
            timeout_seconds=3*60))
  jobset.message('START', 'Preparing remote hosts.', do_newline=True)
  num_failures, _ = jobset.run(
      prepare_jobs, newline_on_success=True, maxjobs=10)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Remote hosts ready to start build.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Failed to prepare remote hosts.',
                   do_newline=True)
    sys.exit(1)


def build_on_remote_hosts(hosts, build_local=False):
  """Builds the performance worker on remote hosts (and locally if requested)."""
  build_timeout = 15*60
  build_jobs = []
  for host in hosts:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
    build_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/remote_host_build.sh'],
            shortname='remote_host_build.%s' % host,
            environ={'USER_AT_HOST': user_at_host, 'CONFIG': 'opt'},
            timeout_seconds=build_timeout))
  if build_local:
    # Build locally as well
    build_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/build_performance.sh'],
            shortname='local_build',
            environ={'CONFIG': 'opt'},
            timeout_seconds=build_timeout))
  jobset.message('START', 'Building on remote hosts.', do_newline=True)
  num_failures, _ = jobset.run(
      build_jobs, newline_on_success=True, maxjobs=10)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Build on remote hosts was successful.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Failed to build on remote hosts.',
                   do_newline=True)
    sys.exit(1)


def start_qpsworkers(worker_hosts):
  """Starts QPS workers as background jobs."""
  if not worker_hosts:
    # run two workers locally
    workers = [(None, 10000), (None, 10010)]
  elif len(worker_hosts) == 1:
    # run two workers on the remote host
    workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
  else:
    # run one worker on each remote host
    workers = [(worker_host, 10000) for worker_host in worker_hosts]

  return [create_qpsworker_job(CXXLanguage(),
                               port=worker[1],
                               remote_host=worker[0])
          for worker in workers]


def create_scenarios(languages, workers, remote_host=None):
  """Creates jobspecs for the scenarios to run."""
  scenarios = []
  for language in languages:
    for scenario_name, driver_args in language.scenarios().iteritems():
      scenario = create_scenario_jobspec(scenario_name,
                                         driver_args,
                                         workers,
                                         remote_host=remote_host)
      scenarios.append(scenario)

  # The very last scenario requests shutting down the workers.
  scenarios.append(create_scenario_jobspec('quit_workers',
                                           ['--quit=true'],
                                           workers,
                                           remote_host=remote_host))
  return scenarios


def finish_qps_workers(jobs):
  """Waits for given jobs to finish, killing them if they take too long."""
  retries = 0
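  # Poll every 3 seconds; after roughly 10 polls, kill any workers still running.
  # (The 'quit_workers' scenario should normally make them exit on their own.)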
  while any(job.is_running() for job in jobs):
    for job in jobs:
      if job.is_running():
        print 'QPS worker "%s" is still running.' % job.host_and_port
    if retries > 10:
      print 'Killing all QPS workers.'
      for job in jobs:
        job.kill()
    retries += 1
    time.sleep(3)
  print 'All QPS workers finished.'


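# Main flow: parse arguments, prepare and build the hosts, start the QPS
# workers, then run each scenario through the QPS driver and finally make
# sure the workers are shut down.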
argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('--remote_driver_host',
                  default=None,
                  help='Run QPS driver on given host. By default, QPS driver is run locally.')
argp.add_argument('--remote_worker_host',
                  nargs='+',
                  default=[],
                  help='Hosts on which to start QPS workers.')

args = argp.parse_args()

# Put together the set of remote hosts on which to build and run.
remote_hosts = set()
if args.remote_worker_host:
  for host in args.remote_worker_host:
    remote_hosts.add(host)
if args.remote_driver_host:
  remote_hosts.add(args.remote_driver_host)

if remote_hosts:
  archive_repo()
  prepare_remote_hosts(remote_hosts)

build_local = False
if not args.remote_driver_host:
  build_local = True
build_on_remote_hosts(remote_hosts, build_local=build_local)

qpsworker_jobs = start_qpsworkers(args.remote_worker_host)

worker_addresses = [job.host_and_port for job in qpsworker_jobs]
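# The collected worker addresses are handed to the driver via the QPS_WORKERS
# env variable (see create_scenario_jobspec).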

try:
  scenarios = create_scenarios(languages=[CXXLanguage()],
                               workers=worker_addresses,
                               remote_host=args.remote_driver_host)
  if not scenarios:
    raise Exception('No scenarios to run')

  jobset.message('START', 'Running scenarios.', do_newline=True)
  num_failures, _ = jobset.run(
      scenarios, newline_on_success=True, maxjobs=1)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'All scenarios finished successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Some of the scenarios failed.',
                   do_newline=True)
    sys.exit(1)
finally:
  finish_qps_workers(qpsworker_jobs)