#!/usr/bin/env python
2# Copyright 2015, Google Inc.
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met:
8#
9# * Redistributions of source code must retain the above copyright
10# notice, this list of conditions and the following disclaimer.
11# * Redistributions in binary form must reproduce the above
12# copyright notice, this list of conditions and the following disclaimer
13# in the documentation and/or other materials provided with the
14# distribution.
15# * Neither the name of Google Inc. nor the names of its
16# contributors may be used to endorse or promote products derived from
17# this software without specific prior written permission.
18#
19# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30"""Run stress test in C++"""
31
32import argparse
33import atexit
34import dockerjob
35import itertools
36import jobset
37import json
38import multiprocessing
39import os
40import re
41import report_utils
42import subprocess
43import sys
44import tempfile
45import time
46import uuid
47
# Docker doesn't clean up after itself, so we do it on exit.
atexit.register(lambda: subprocess.call(['stty', 'echo']))

# Run everything relative to the repository root (two levels above this file).
ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(ROOT)

# Port the interop server listens on inside its container.
_DEFAULT_SERVER_PORT = 8080
# Port on which the stress client exposes its metrics.
_DEFAULT_METRICS_PORT = 8081
# 'test_case:weight' pairs passed to the stress client via --test_cases.
_DEFAULT_TEST_CASES = 'empty_unary:20,large_unary:20,client_streaming:20,server_streaming:20,empty_stream:20'
_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
_DEFAULT_NUM_STUBS_PER_CHANNEL = 10

# 15 mins default
#_DEFAULT_TEST_DURATION_SECS = 900
# NOTE(review): 10 seconds looks like a debugging value left in place of the
# commented-out 15-minute default above — confirm before real stress runs.
_DEFAULT_TEST_DURATION_SECS = 10
63
class CXXLanguage:
  """Describes how to invoke the C++ stress-test client and interop server."""

  def __init__(self):
    # Binaries are run from the repo root, so no special cwd is needed.
    self.client_cwd = None
    self.server_cwd = None
    self.safename = 'cxx'

  def client_cmd(self, args):
    """Returns the stress-test client command line with the given flags."""
    cmd = ['bins/opt/stress_test']
    cmd.extend(args)
    return cmd

  def server_cmd(self, args):
    """Returns the interop server command line with the given flags."""
    cmd = ['bins/opt/interop_server']
    cmd.extend(args)
    return cmd

  def global_env(self):
    """Extra environment variables for this language (none for C++)."""
    return {}

  def __str__(self):
    return 'c++'
83
# All client languages this script knows how to run.
_LANGUAGES = {'c++': CXXLanguage(),}

# languages supported as cloud_to_cloud servers
_SERVERS = ['c++']

# Path of the grpc checkout inside the docker images.
DOCKER_WORKDIR_ROOT = '/var/local/git/grpc'
90
91
def docker_run_cmdline(cmdline, image, docker_args=None, cwd=None, environ=None):
  """Wraps given cmdline array to create 'docker run' cmdline from it.

  Args:
    cmdline: command to run inside the container, as an args list.
    image: docker image name/tag to run.
    docker_args: extra args for 'docker run' itself (e.g. --name, -p).
      Fixed: was a mutable default argument ([]), which is shared across
      calls; now defaults to None.
    cwd: working directory relative to DOCKER_WORKDIR_ROOT, or None.
    environ: dict of environment variables to pass with -e, or None.
  Returns:
    The full 'docker run' command line as an args list.
  """
  docker_cmdline = ['docker', 'run', '-i', '--rm=true']

  # Turn environ into '-e KEY=VALUE' docker args.
  if environ:
    for k, v in environ.items():
      docker_cmdline += ['-e', '%s=%s' % (k, v)]

  # Set working directory inside the container.
  workdir = DOCKER_WORKDIR_ROOT
  if cwd:
    workdir = os.path.join(workdir, cwd)
  docker_cmdline += ['-w', workdir]

  docker_cmdline += (docker_args or []) + [image] + cmdline
  return docker_cmdline
109
110
def bash_login_cmdline(cmdline):
  """Creates 'bash -l -c' cmdline from an args list.

  A login shell is used because:
  * rvm and nvm require it
  * it makes error messages clearer if executables are missing
  """
  joined = ' '.join(cmdline)
  return ['bash', '-l', '-c', joined]
117
118
def _job_kill_handler(job):
  """Kill handler for timed-out jobs: stops the backing docker container."""
  container = job._spec.container_name
  if container:
    dockerjob.docker_kill(container)
  # When the job times out and we decide to kill it, wait a moment before
  # restarting it so docker can release the name; retrying immediately
  # fails with "container name already in use".
  # TODO(jtattermusch): figure out a cleaner way to do this.
  time.sleep(2)
127
128
def cloud_to_cloud_jobspec(language,
                           test_cases,
                           server_addresses,
                           test_duration_secs,
                           num_channels_per_server,
                           num_stubs_per_channel,
                           metrics_port,
                           docker_image=None):
  """Creates jobspec for a cloud-to-cloud stress test client.

  Args:
    language: a language object (e.g. CXXLanguage) supplying the client cmd.
    test_cases: 'test_case:weight' pairs string for --test_cases.
    server_addresses: 'host:port' string of the server(s) to hit.
    test_duration_secs: client run duration; job timeout is twice this.
    num_channels_per_server / num_stubs_per_channel: stress client fan-out.
    metrics_port: port for the client's metrics service.
    docker_image: if set, wrap the command in 'docker run' with this image.
  Returns:
    A jobset.JobSpec with .container_name set (None when not dockerized).
  """
  cmdline = bash_login_cmdline(language.client_cmd([
      '--test_cases=%s' % test_cases, '--server_addresses=%s' %
      server_addresses, '--test_duration_secs=%s' % test_duration_secs,
      '--num_stubs_per_channel=%s' % num_stubs_per_channel,
      '--num_channels_per_server=%s' % num_channels_per_server,
      '--metrics_port=%s' % metrics_port
  ]))
  cwd = language.client_cwd
  environ = language.global_env()
  # Fixed: container_name was previously unbound when docker_image is None,
  # raising NameError at the .container_name assignment below.
  container_name = None
  if docker_image:
    container_name = dockerjob.random_name('interop_client_%s' %
                                           language.safename)
    cmdline = docker_run_cmdline(
        cmdline,
        image=docker_image,
        environ=environ,
        cwd=cwd,
        docker_args=['--net=host', '--name', container_name])
    cwd = None

  # Fixed: the shortname previously interpolated 'server_name', which is not
  # defined in this function and only resolved via an accidentally-leaked
  # module-level loop variable; use the server_addresses parameter instead.
  test_job = jobset.JobSpec(cmdline=cmdline,
                            cwd=cwd,
                            environ=environ,
                            shortname='cloud_to_cloud:%s:%s_server:stress_test' % (
                                language, server_addresses),
                            timeout_seconds=test_duration_secs * 2,
                            flake_retries=5 if args.allow_flakes else 0,
                            timeout_retries=2 if args.allow_flakes else 0,
                            kill_handler=_job_kill_handler)
  test_job.container_name = container_name
  return test_job
170
171
def server_jobspec(language, docker_image, test_duration_secs):
  """Create jobspec for running an interop server inside docker."""
  container_name = dockerjob.random_name('interop_server_%s' %
                                         language.safename)
  server_cmd = language.server_cmd(['--port=%s' % _DEFAULT_SERVER_PORT])
  cmdline = bash_login_cmdline(server_cmd)
  environ = language.global_env()
  docker_args = ['-p', str(_DEFAULT_SERVER_PORT), '--name', container_name]
  docker_cmdline = docker_run_cmdline(cmdline,
                                      image=docker_image,
                                      cwd=language.server_cwd,
                                      environ=environ,
                                      docker_args=docker_args)

  # Server timeout is triple the test duration so it outlasts the clients.
  server_job = jobset.JobSpec(cmdline=docker_cmdline,
                              environ=environ,
                              shortname='interop_server_%s' % language,
                              timeout_seconds=test_duration_secs * 3)
  server_job.container_name = container_name
  return server_job
192
193
def build_interop_image_jobspec(language, tag=None):
  """Creates jobspec for building the stress test docker image for a language.

  A random uuid-suffixed tag is generated when none is given, so concurrent
  runs don't clobber each other's images.
  """
  if not tag:
    tag = 'grpc_interop_%s:%s' % (language.safename, uuid.uuid4())
  env = {
      'INTEROP_IMAGE': tag,
      'BASE_NAME': 'grpc_interop_%s' % language.safename,
      'TTY_FLAG': '-t',
  }
  build_job = jobset.JobSpec(cmdline=['tools/jenkins/build_interop_image.sh'],
                             environ=env,
                             shortname='build_docker_%s' % (language),
                             timeout_seconds=30 * 60)
  build_job.tag = tag
  return build_job
207
208
def aggregate_http2_results(stdout):
  """Parses the http2 interop JSON summary embedded in a job's stdout.

  Args:
    stdout: the captured output, expected to contain a
      '{"cases": [...]}' JSON object somewhere within it.
  Returns:
    None if no summary is found; otherwise a dict with 'passed', 'failed',
    'skipped' counts, a comma-joined 'failed_cases' string, and 'percent'
    (fraction of non-skipped cases that passed).
  """
  match = re.search(r'\{"cases[^\]]*\]\}', stdout)
  if not match:
    return None

  results = json.loads(match.group(0))
  skipped = 0
  passed = 0
  failed = 0
  failed_cases = []
  for case in results['cases']:
    if case.get('skipped', False):
      skipped += 1
    elif case.get('passed', False):
      passed += 1
    else:
      failed += 1
      failed_cases.append(case.get('name', 'NONAME'))
  total = passed + failed
  return {
      'passed': passed,
      'failed': failed,
      'skipped': skipped,
      'failed_cases': ', '.join(failed_cases),
      # Fixed: previously divided by (passed + failed) unconditionally,
      # raising ZeroDivisionError when every case was skipped.
      'percent': 1.0 * passed / total if total else 0.0
  }
235
236
# Command-line interface.
argp = argparse.ArgumentParser(description='Run stress tests.')
argp.add_argument('-l',
                  '--language',
                  choices=['all'] + sorted(_LANGUAGES),
                  nargs='+',
                  default=['all'],
                  help='Clients to run.')
# Parallelism for both image builds and test jobs; defaults to one per CPU.
argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)
argp.add_argument(
    '-s',
    '--server',
    choices=['all'] + sorted(_SERVERS),
    action='append',
    help='Run cloud_to_cloud servers in a separate docker ' + 'image.',
    default=[])
argp.add_argument(
    '--override_server',
    action='append',
    type=lambda kv: kv.split('='),
    help=
    'Use servername=HOST:PORT to explicitly specify a server. E.g. '
    'csharp=localhost:50000',
    default=[])
# NOTE(review): with action='append' and a non-empty default, user-supplied
# values are appended to the default list rather than replacing it (argparse
# gotcha). This flag's value is never read below — the script uses
# _DEFAULT_TEST_DURATION_SECS directly — TODO confirm intent.
argp.add_argument('--test_duration_secs',
                  action='append',
                  help='The duration of the test in seconds',
                  default=[_DEFAULT_TEST_DURATION_SECS])
argp.add_argument(
    '--allow_flakes',
    default=False,
    action='store_const',
    const=True,
    help=
    'Allow flaky tests to show as passing (re-runs failed tests up to five times)')

args = argp.parse_args()
273
# Expand 'all' in --server into the concrete server language names.
servers = set(
    s
    for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x]
                                           for x in args.server))

# Expand 'all' in --language into the concrete Language objects.
languages = set(_LANGUAGES[l]
                for l in itertools.chain.from_iterable(_LANGUAGES.iterkeys(
                ) if x == 'all' else [x] for x in args.language))

# Maps language name -> docker image tag once the image build job is created.
docker_images = {}
# languages for which to build docker images (union of clients and servers)
languages_to_build = set(
    _LANGUAGES[k]
    for k in set([str(l) for l in languages] + [s for s in servers]))
build_jobs = []
for l in languages_to_build:
  job = build_interop_image_jobspec(l)
  docker_images[str(l)] = job.tag
  build_jobs.append(job)

# Build all required docker images up front; abort the run on any failure.
if build_jobs:
  jobset.message('START', 'Building interop docker images.', do_newline=True)
  num_failures, _ = jobset.run(build_jobs,
                               newline_on_success=True,
                               maxjobs=args.jobs)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'All docker images built successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED',
                   'Failed to build interop docker images.',
                   do_newline=True)
    # Clean up any images that did get built before bailing out.
    for image in docker_images.itervalues():
      dockerjob.remove_image(image, skip_nonexistent=True)
    sys.exit(1)
310
# Start interop servers.
server_jobs = {}       # language name -> running DockerJob for its server
server_addresses = {}  # server name -> (host, port)
try:
  for s in servers:
    lang = str(s)
    spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), _DEFAULT_TEST_DURATION_SECS)
    job = dockerjob.DockerJob(spec)
    server_jobs[lang] = job
    server_addresses[lang] = ('localhost',
                              job.mapped_port(_DEFAULT_SERVER_PORT))

  jobs = []

  # --override_server entries ('name=host:port') are added alongside (or
  # replacing, on name collision) the docker-launched servers.
  for server in args.override_server:
    server_name = server[0]
    (server_host, server_port) = server[1].split(':')
    server_addresses[server_name] = (server_host, server_port)

  # One stress-test client job per (server, client language) pair.
  for server_name, server_address in server_addresses.iteritems():
    (server_host, server_port) = server_address
    for language in languages:
      test_job = cloud_to_cloud_jobspec(
          language,
          _DEFAULT_TEST_CASES,
          ('%s:%s' % (server_host, server_port)),
          _DEFAULT_TEST_DURATION_SECS,
          _DEFAULT_NUM_CHANNELS_PER_SERVER,
          _DEFAULT_NUM_STUBS_PER_CHANNEL,
          _DEFAULT_METRICS_PORT,
          docker_image=docker_images.get(str(language)))
      jobs.append(test_job)

  # Nothing to run (no servers configured): clean up images and exit nonzero.
  if not jobs:
    print 'No jobs to run.'
    for image in docker_images.itervalues():
      dockerjob.remove_image(image, skip_nonexistent=True)
    sys.exit(1)

  num_failures, resultset = jobset.run(jobs,
                                       newline_on_success=True,
                                       maxjobs=args.jobs)
  if num_failures:
    jobset.message('FAILED', 'Some tests failed', do_newline=True)
  else:
    jobset.message('SUCCESS', 'All tests passed', do_newline=True)

  report_utils.render_junit_xml_report(resultset, 'report.xml')

  # Attach parsed http2 summaries to results whose name mentions http2.
  for name, job in resultset.iteritems():
    if "http2" in name:
      job[0].http2results = aggregate_http2_results(job[0].message)

  report_utils.render_interop_html_report(
      set([str(l) for l in languages]), servers, [], [], [], resultset,
      num_failures, 0, 0)

finally:
  # Check if servers are still running.
  for server, job in server_jobs.iteritems():
    if not job.is_running():
      print 'Server "%s" has exited prematurely.' % server

  dockerjob.finish_jobs([j for j in server_jobs.itervalues()])

  # Always remove the built docker images, whether the run passed or failed.
  for image in docker_images.itervalues():
    print 'Removing docker image %s' % image
    dockerjob.remove_image(image)