Build and run per-language containers for interop tests
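
Instead of building one monolithic docker image and re-invoking this script
inside it, the script now builds a per-language grpc_interop_<lang> image,
starts each requested server in its own container (tracked via --cidfile and a
mapped port so it can be monitored and torn down at the end), and wraps every
client command in a login shell inside docker run. Failed tests can optionally
be retried with the new --allow_flakes flag.

Rough sketch of the command-line wrapping the new helpers produce; the helper
bodies below mirror bash_login_cmdline()/docker_run_cmdline() from this patch,
while the image tag and client arguments are illustrative values only:

    # Sketch only: mirrors the helpers added in run_interop_tests.py.
    import os

    def bash_login_cmdline(cmdline):
        """Wraps an argv list in a login shell (rvm and nvm require one)."""
        return ['bash', '-l', '-c', ' '.join(cmdline)]

    def docker_run_cmdline(cmdline, image, docker_args=None, cwd=None, environ=None):
        """Builds the equivalent 'docker run' invocation for an argv list."""
        docker_cmdline = ['docker', 'run', '-i', '--rm=true']
        for k, v in sorted((environ or {}).items()):
            docker_cmdline += ['-e', '%s=%s' % (k, v)]  # env vars become -e flags
        workdir = '/var/local/git/grpc'  # the image's checkout of grpc
        if cwd:
            workdir = os.path.join(workdir, cwd)
        docker_cmdline += ['-w', workdir]
        return docker_cmdline + (docker_args or []) + [image] + cmdline

    # Hypothetical cloud_to_cloud client run against a dockerized server.
    client = ['bins/opt/interop_client', '--test_case=large_unary',
              '--server_host=localhost', '--server_port=32768']
    print(' '.join(docker_run_cmdline(bash_login_cmdline(client),
                                      image='grpc_interop_cxx:some-tag',
                                      docker_args=['--net=host'])))
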
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index a2fb124..b5c1043 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -31,14 +31,21 @@
 """Run interop (cross-language) tests in parallel."""
 
 import argparse
+import dockerjob
 import itertools
 import xml.etree.cElementTree as ET
 import jobset
 import os
 import subprocess
 import sys
+import tempfile
 import time
+import uuid
 
+ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
+os.chdir(ROOT)
+
+_DEFAULT_SERVER_PORT = 8080
 
 _CLOUD_TO_PROD_BASE_ARGS = [
     '--server_host_override=grpc-test.sandbox.google.com',
@@ -61,6 +68,7 @@
   def __init__(self):
     self.client_cmdline_base = ['bins/opt/interop_client']
     self.client_cwd = None
+    self.server_cwd = None
 
   def cloud_to_prod_args(self):
     return (self.client_cmdline_base + _CLOUD_TO_PROD_BASE_ARGS +
@@ -73,6 +81,9 @@
   def cloud_to_prod_env(self):
     return None
 
+  def server_args(self):
+    return ['bins/opt/interop_server', '--enable_ssl']
+
   def __str__(self):
     return 'c++'
 
@@ -82,6 +93,7 @@
   def __init__(self):
     self.client_cmdline_base = ['mono', 'Grpc.IntegrationTesting.Client.exe']
     self.client_cwd = 'src/csharp/Grpc.IntegrationTesting.Client/bin/Debug'
+    self.server_cwd = 'src/csharp/Grpc.IntegrationTesting.Server/bin/Debug'
 
   def cloud_to_prod_args(self):
     return (self.client_cmdline_base + _CLOUD_TO_PROD_BASE_ARGS +
@@ -94,15 +106,44 @@
   def cloud_to_prod_env(self):
     return _SSL_CERT_ENV
 
+  def server_args(self):
+    return ['mono', 'Grpc.IntegrationTesting.Server.exe', '--use_tls']
+
   def __str__(self):
     return 'csharp'
 
 
+class JavaLanguage:
+
+  def __init__(self):
+    self.client_cmdline_base = ['./run-test-client.sh']
+    self.client_cwd = '../grpc-java'
+    self.server_cwd = '../grpc-java'
+
+  def cloud_to_prod_args(self):
+    return (self.client_cmdline_base + _CLOUD_TO_PROD_BASE_ARGS +
+            ['--use_tls=true'])
+
+  def cloud_to_cloud_args(self):
+    return (self.client_cmdline_base + _CLOUD_TO_CLOUD_BASE_ARGS +
+            ['--use_tls=true', '--use_test_ca=true'])
+
+  def cloud_to_prod_env(self):
+    return None
+
+  def server_args(self):
+    return ['./run-test-server.sh', '--use_tls=true']
+
+  def __str__(self):
+    return 'java'
+
+
 class NodeLanguage:
 
   def __init__(self):
     self.client_cmdline_base = ['node', 'src/node/interop/interop_client.js']
     self.client_cwd = None
+    self.server_cwd = None
 
   def cloud_to_prod_args(self):
     return (self.client_cmdline_base + _CLOUD_TO_PROD_BASE_ARGS +
@@ -115,6 +156,9 @@
   def cloud_to_prod_env(self):
     return _SSL_CERT_ENV
 
+  def server_args(self):
+    return ['node', 'src/node/interop/interop_server.js', '--use_tls=true']
+
   def __str__(self):
     return 'node'
 
@@ -145,6 +189,7 @@
   def __init__(self):
     self.client_cmdline_base = ['ruby', 'src/ruby/bin/interop/interop_client.rb']
     self.client_cwd = None
+    self.server_cwd = None
 
   def cloud_to_prod_args(self):
     return (self.client_cmdline_base + _CLOUD_TO_PROD_BASE_ARGS +
@@ -157,57 +202,144 @@
   def cloud_to_prod_env(self):
     return _SSL_CERT_ENV
 
+  def server_args(self):
+    return ['ruby', 'src/ruby/bin/interop/interop_server.rb', '--use_tls']
+
   def __str__(self):
     return 'ruby'
 
 
-# TODO(jtattermusch): add php and python once we get them working
+# TODO(jtattermusch): add python once we get it working
 _LANGUAGES = {
     'c++' : CXXLanguage(),
     'csharp' : CSharpLanguage(),
+    'java' : JavaLanguage(),
     'node' : NodeLanguage(),
     'php' :  PHPLanguage(),
     'ruby' : RubyLanguage(),
 }
 
-# languages supported as cloud_to_cloud servers 
+# languages supported as cloud_to_cloud servers
 # TODO(jtattermusch): enable other languages as servers as well
-_SERVERS = { 'c++' : 8010, 'node' : 8040, 'csharp': 8070 }
+_SERVERS = ['c++', 'node', 'csharp', 'java']
 
-# TODO(jtattermusch): add empty_stream once C++ start supporting it.
+# TODO(jtattermusch): add empty_stream once C++ starts supporting it.
+# TODO(jtattermusch): add timeout_on_sleeping_server once java starts supporting it.
 # TODO(jtattermusch): add support for auth tests.
 _TEST_CASES = ['large_unary', 'empty_unary', 'ping_pong',
                'client_streaming', 'server_streaming',
-               'cancel_after_begin', 'cancel_after_first_response',
-               'timeout_on_sleeping_server']
+               'cancel_after_begin', 'cancel_after_first_response']
 
 
-def cloud_to_prod_jobspec(language, test_case):
+def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None):
+  """Wraps given cmdline array to create 'docker run' cmdline from it."""
+  docker_cmdline = ['docker', 'run', '-i', '--rm=true']
+
+  # turn environ into -e docker args
+  if environ:
+    for k,v in environ.iteritems():
+      docker_cmdline += ['-e', '%s=%s' % (k,v)]
+
+  # set working directory
+  workdir = '/var/local/git/grpc'
+  if cwd:
+    workdir = os.path.join(workdir, cwd)
+  docker_cmdline += ['-w', workdir]
+
+  docker_cmdline += docker_args + [image] + cmdline
+  return docker_cmdline
+
+
+def bash_login_cmdline(cmdline):
+  """Creates bash -l -c cmdline from args list."""
+  # Use login shell:
+  # * rvm and nvm require it
+  # * makes error messages clearer if executables are missing
+  return ['bash', '-l', '-c', ' '.join(cmdline)]
+
+
+def cloud_to_prod_jobspec(language, test_case, docker_image=None):
   """Creates jobspec for cloud-to-prod interop test"""
-  cmdline = language.cloud_to_prod_args() + ['--test_case=%s' % test_case]
+  cmdline = bash_login_cmdline(language.cloud_to_prod_args() +
+                               ['--test_case=%s' % test_case])
+  cwd = language.client_cwd
+  environ = language.cloud_to_prod_env()
+  if docker_image:
+    cmdline = docker_run_cmdline(cmdline, image=docker_image, cwd=cwd, environ=environ)
+    cwd = None
+    environ = None
+
   test_job = jobset.JobSpec(
           cmdline=cmdline,
-          cwd=language.client_cwd,
+          cwd=cwd,
+          environ=environ,
           shortname="cloud_to_prod:%s:%s" % (language, test_case),
-          environ=language.cloud_to_prod_env(),
-          timeout_seconds=60)
+          timeout_seconds=60,
+          flake_retries=5 if args.allow_flakes else 0,
+          timeout_retries=2 if args.allow_flakes else 0)
   return test_job
 
 
 def cloud_to_cloud_jobspec(language, test_case, server_name, server_host,
-                           server_port):
+                           server_port, docker_image=None):
   """Creates jobspec for cloud-to-cloud interop test"""
-  cmdline = language.cloud_to_cloud_args() + ['--test_case=%s' % test_case,
-     '--server_host=%s' % server_host,
-     '--server_port=%s' % server_port ]
+  cmdline = bash_login_cmdline(language.cloud_to_cloud_args() +
+                               ['--test_case=%s' % test_case,
+                                '--server_host=%s' % server_host,
+                                '--server_port=%s' % server_port ])
+  cwd = language.client_cwd
+  if docker_image:
+    cmdline = docker_run_cmdline(cmdline,
+                                 image=docker_image,
+                                 cwd=cwd,
+                                 docker_args=['--net=host'])
+    cwd = None
   test_job = jobset.JobSpec(
           cmdline=cmdline,
-          cwd=language.client_cwd,
+          cwd=cwd,
           shortname="cloud_to_cloud:%s:%s_server:%s" % (language, server_name,
                                                  test_case),
-          timeout_seconds=60)
+          timeout_seconds=60,
+          flake_retries=5 if args.allow_flakes else 0,
+          timeout_retries=2 if args.allow_flakes else 0)
   return test_job
 
+
+def server_jobspec(language, docker_image):
+  """Create jobspec for running a server"""
+  cidfile = tempfile.mktemp()
+  cmdline = bash_login_cmdline(language.server_args() +
+                               ['--port=%s' % _DEFAULT_SERVER_PORT])
+  docker_cmdline = docker_run_cmdline(cmdline,
+                                      image=docker_image,
+                                      cwd=language.server_cwd,
+                                      docker_args=['-p', str(_DEFAULT_SERVER_PORT),
+                                                   '--cidfile', cidfile])
+  server_job = jobset.JobSpec(
+          cmdline=docker_cmdline,
+          shortname="interop_server:%s" % language,
+          timeout_seconds=30*60)
+  server_job.cidfile = cidfile
+  return server_job
+
+
+def build_interop_image_jobspec(language, tag=None):
+  """Creates jobspec for building interop docker image for a language"""
+  safelang = str(language).replace("+", "x")
+  if not tag:
+    tag = 'grpc_interop_%s:%s' % (safelang, uuid.uuid4())
+  env = {'INTEROP_IMAGE': tag, 'BASE_NAME': 'grpc_interop_%s' % safelang}
+  if not args.travis:
+    env['TTY_FLAG'] = '-t'
+  build_job = jobset.JobSpec(
+          cmdline=['tools/jenkins/build_interop_image.sh'],
+          environ=env,
+          shortname="build_docker_%s" % (language),
+          timeout_seconds=30*60)
+  build_job.tag = tag
+  return build_job
+
+
 argp = argparse.ArgumentParser(description='Run interop tests.')
 argp.add_argument('-l', '--language',
                   choices=['all'] + sorted(_LANGUAGES),
@@ -243,9 +375,14 @@
                   help='Run all the interop tests under docker. That provides ' +
                   'additional isolation and prevents the need to install ' +
                   'language specific prerequisites. Only available on Linux.')
+argp.add_argument('--allow_flakes',
+                  default=False,
+                  action='store_const',
+                  const=True,
+                  help="Allow flaky tests to show as passing (re-runs failed tests up to five times)")
 args = argp.parse_args()
 
-servers = set(s for s in itertools.chain.from_iterable(_SERVERS.iterkeys()
+servers = set(s for s in itertools.chain.from_iterable(_SERVERS
                                                        if x == 'all' else [x]
                                                        for x in args.server))
 
@@ -258,73 +395,98 @@
     print 'copied to the docker environment.'
     time.sleep(5)
 
-  child_argv = [ arg for arg in sys.argv if not arg == '--use_docker' ]
-  run_tests_cmd = ('tools/run_tests/run_interop_tests.py %s' %
-                   " ".join(child_argv[1:]))
-
-  # cmdline args to pass to the container running servers.
-  servers_extra_docker_args = ''
-  server_port_tuples = ''
-  for server in servers:
-    port = _SERVERS[server]
-    servers_extra_docker_args += ' -p %s' % port
-    servers_extra_docker_args += ' -e SERVER_PORT_%s=%s' % (server.replace("+", "x"), port)
-    server_port_tuples += ' %s:%s' % (server, port)
-
-  env = os.environ.copy()
-  env['RUN_TESTS_COMMAND'] = run_tests_cmd
-  env['SERVERS_DOCKER_EXTRA_ARGS'] = servers_extra_docker_args
-  env['SERVER_PORT_TUPLES'] = server_port_tuples
-  if not args.travis:
-    env['TTY_FLAG'] = '-t'  # enables Ctrl-C when not on Jenkins.
-
-  subprocess.check_call(['tools/jenkins/build_docker_and_run_interop_tests.sh'],
-                        shell=True,
-                        env=env)
-  sys.exit(0)
+if not args.use_docker and servers:
+  print "Running interop servers is only supported with --use_docker option enabled."
+  sys.exit(1)
 
 languages = set(_LANGUAGES[l]
                 for l in itertools.chain.from_iterable(
                       _LANGUAGES.iterkeys() if x == 'all' else [x]
                       for x in args.language))
 
-jobs = []
-if args.cloud_to_prod:
-  for language in languages:
-    for test_case in _TEST_CASES:
-      test_job = cloud_to_prod_jobspec(language, test_case)
-      jobs.append(test_job)
+docker_images = {}
+if args.use_docker:
+  # languages for which to build docker images
+  languages_to_build = set(_LANGUAGES[k] for k in set([str(l) for l in languages] +
+                                                    [s for s in servers]))
 
-# default servers to "localhost" and the default port
-server_addresses = dict((s, ("localhost", _SERVERS[s])) for s in servers)
+  build_jobs = []
+  for l in languages_to_build:
+    job = build_interop_image_jobspec(l)
+    docker_images[str(l)] = job.tag
+    build_jobs.append(job)
 
-for server in args.override_server:
-  server_name = server[0]
-  (server_host, server_port) = server[1].split(":")
-  server_addresses[server_name] = (server_host, server_port)
+  if build_jobs:
+    jobset.message('START', 'Building interop docker images.', do_newline=True)
+    if jobset.run(build_jobs, newline_on_success=True, maxjobs=args.jobs):
+      jobset.message('SUCCESS', 'All docker images built successfully.', do_newline=True)
+    else:
+      jobset.message('FAILED', 'Failed to build interop docker images.', do_newline=True)
+      for image in docker_images.itervalues():
+        dockerjob.remove_image(image, skip_nonexistent=True)
+      sys.exit(1)
 
-for server_name, server_address in server_addresses.iteritems():
-  (server_host, server_port) = server_address
-  for language in languages:
-    for test_case in _TEST_CASES:
-      test_job = cloud_to_cloud_jobspec(language,
-                                        test_case,
-                                        server_name,
-                                        server_host,
-                                        server_port)
-      jobs.append(test_job)
+# Start interop servers.
+server_jobs = {}
+server_addresses = {}
+try:
+  for s in servers:
+    lang = str(s)
+    spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang))
+    job = dockerjob.DockerJob(spec)
+    server_jobs[lang] = job
+    server_addresses[lang] = ('localhost', job.mapped_port(_DEFAULT_SERVER_PORT))
 
-if not jobs:
-  print "No jobs to run."
-  sys.exit(1)
+  jobs = []
+  if args.cloud_to_prod:
+    for language in languages:
+      for test_case in _TEST_CASES:
+        test_job = cloud_to_prod_jobspec(language, test_case,
+                                         docker_image=docker_images.get(str(language)))
+        jobs.append(test_job)
 
-root = ET.Element('testsuites')
-testsuite = ET.SubElement(root, 'testsuite', id='1', package='grpc', name='tests')
+  for server in args.override_server:
+    server_name = server[0]
+    (server_host, server_port) = server[1].split(':')
+    server_addresses[server_name] = (server_host, server_port)
 
-if jobset.run(jobs, newline_on_success=True, maxjobs=args.jobs, xml_report=testsuite):
-  jobset.message('SUCCESS', 'All tests passed', do_newline=True)
-else:
-  jobset.message('FAILED', 'Some tests failed', do_newline=True)
+  for server_name, server_address in server_addresses.iteritems():
+    (server_host, server_port) = server_address
+    for language in languages:
+      for test_case in _TEST_CASES:
+        test_job = cloud_to_cloud_jobspec(language,
+                                          test_case,
+                                          server_name,
+                                          server_host,
+                                          server_port,
+                                          docker_image=docker_images.get(str(language)))
+        jobs.append(test_job)
 
-tree = ET.ElementTree(root)
-tree.write('report.xml', encoding='UTF-8')
\ No newline at end of file
+  if not jobs:
+    print "No jobs to run."
+    for image in docker_images.itervalues():
+      dockerjob.remove_image(image, skip_nonexistent=True)
+    sys.exit(1)
+
+  root = ET.Element('testsuites')
+  testsuite = ET.SubElement(root, 'testsuite', id='1', package='grpc', name='tests')
+
+  if jobset.run(jobs, newline_on_success=True, maxjobs=args.jobs, xml_report=testsuite):
+    jobset.message('SUCCESS', 'All tests passed', do_newline=True)
+  else:
+    jobset.message('FAILED', 'Some tests failed', do_newline=True)
+
+  tree = ET.ElementTree(root)
+  tree.write('report.xml', encoding='UTF-8')
+
+finally:
+  # Check if servers are still running.
+  for server, job in server_jobs.iteritems():
+    if not job.is_running():
+      print 'Server "%s" has exited prematurely.' % server
+
+  dockerjob.finish_jobs([j for j in server_jobs.itervalues()])
+
+  for image in docker_images.itervalues():
+    print 'Removing docker image %s' % image
+    dockerjob.remove_image(image)
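
Example invocation of the new flow (the -l/--language, --use_docker and
--allow_flakes flags appear in this diff; the server-selection flag is defined
in an unchanged part of the script, so its exact spelling here is an
assumption):

    tools/run_tests/run_interop_tests.py -l csharp --use_docker --server c++ --allow_flakes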