Merge pull request #7260 from kpayson64/ga_performance_tests

Migrated python performance tests to use GA API
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index 0802814..83b46c9 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -37,16 +37,23 @@
 from six.moves import queue
 
 import grpc
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import face
 from src.proto.grpc.testing import messages_pb2
 from src.proto.grpc.testing import services_pb2
 from tests.unit import resources
-from tests.unit.beta import test_utilities
+from tests.unit import test_common
 
 _TIMEOUT = 60 * 60 * 24
 
 
+class GenericStub(object):
+
+  def __init__(self, channel):
+    self.UnaryCall = channel.unary_unary(
+        '/grpc.testing.BenchmarkService/UnaryCall')
+    self.StreamingCall = channel.stream_stream(
+        '/grpc.testing.BenchmarkService/StreamingCall')
+
+
 class BenchmarkClient:
   """Benchmark client interface that exposes a non-blocking send_request()."""
 
@@ -54,15 +61,12 @@
 
   def __init__(self, server, config, hist):
     # Create the stub
-    host, port = server.split(':')
-    port = int(port)
     if config.HasField('security_params'):
-      creds = implementations.ssl_channel_credentials(
-          resources.test_root_certificates())
-      channel = test_utilities.not_really_secure_channel(
-          host, port, creds, config.security_params.server_host_override)
+      creds = grpc.ssl_channel_credentials(resources.test_root_certificates())
+      channel = test_common.test_secure_channel(
+        server, creds, config.security_params.server_host_override)
     else:
-      channel = implementations.insecure_channel(host, port)
+      channel = grpc.insecure_channel(server)
 
     connected_event = threading.Event()
     def wait_for_ready(connectivity):
@@ -73,7 +77,7 @@
 
     if config.payload_config.WhichOneof('payload') == 'simple_params':
       self._generic = False
-      self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
+      self._stub = services_pb2.BenchmarkServiceStub(channel)
       payload = messages_pb2.Payload(
           body='\0' * config.payload_config.simple_params.req_size)
       self._request = messages_pb2.SimpleRequest(
@@ -81,7 +85,7 @@
           response_size=config.payload_config.simple_params.resp_size)
     else:
       self._generic = True
-      self._stub = implementations.generic_stub(channel)
+      self._stub = GenericStub(channel)
       self._request = '\0' * config.payload_config.bytebuf_params.req_size
 
     self._hist = hist
@@ -166,13 +170,8 @@
 
   def start(self):
     self._is_streaming = True
-    if self._generic:
-      stream_callable = self._stub.stream_stream(
-          'grpc.testing.BenchmarkService', 'StreamingCall')
-    else:
-      stream_callable = self._stub.StreamingCall
-
-    response_stream = stream_callable(self._request_generator(), _TIMEOUT)
+    response_stream = self._stub.StreamingCall(
+        self._request_generator(), _TIMEOUT)
     for _ in response_stream:
       self._handle_response(
           self, time.time() - self._send_time_queue.get_nowait())
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py
index 8cbf480..2b76b81 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_server.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py
@@ -31,7 +31,7 @@
 from src.proto.grpc.testing import services_pb2
 
 
-class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+class BenchmarkServer(services_pb2.BenchmarkServiceServicer):
   """Synchronous Server implementation for the Benchmark service."""
 
   def UnaryCall(self, request, context):
@@ -44,7 +44,7 @@
       yield messages_pb2.SimpleResponse(payload=payload)
 
 
-class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer):
   """Generic Server implementation for the Benchmark service."""
 
   def __init__(self, resp_size):
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 1692637..3abf0d0 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -32,18 +32,21 @@
 import argparse
 import time
 
+from concurrent import futures
+import grpc
 from src.proto.grpc.testing import services_pb2
 
 from tests.qps import worker_server
 
 
 def run_worker_server(port):
+  server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5))
   servicer = worker_server.WorkerServer()
-  server = services_pb2.beta_create_WorkerService_server(servicer)
+  services_pb2.add_WorkerServiceServicer_to_server(servicer, server)
   server.add_insecure_port('[::]:{}'.format(port))
   server.start()
   servicer.wait_for_quit()
-  server.stop(2)
+  server.stop(0)
 
 
 if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index d41f837..932a1ff 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -32,8 +32,8 @@
 import threading
 import time
 
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import utilities
+from concurrent import futures
+import grpc
 from src.proto.grpc.testing import control_pb2
 from src.proto.grpc.testing import services_pb2
 from src.proto.grpc.testing import stats_pb2
@@ -45,7 +45,7 @@
 from tests.unit import resources
 
 
-class WorkerServer(services_pb2.BetaWorkerServiceServicer):
+class WorkerServer(services_pb2.WorkerServiceServicer):
   """Python Worker Server implementation."""
 
   def __init__(self):
@@ -65,7 +65,7 @@
       if request.mark.reset:
         start_time = end_time
       yield status
-    server.stop(0)
+    server.stop(None)
 
   def _get_server_status(self, start_time, end_time, port, cores):
     end_time = time.time()
@@ -76,25 +76,35 @@
     return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
 
   def _create_server(self, config):
-    if config.server_type == control_pb2.SYNC_SERVER:
+    if config.async_server_threads == 0:
+      # This is the default concurrent.futures thread pool size, but
+      # None doesn't seem to work
+      server_threads = multiprocessing.cpu_count() * 5
+    else:
+      server_threads = config.async_server_threads
+    server = grpc.server((), futures.ThreadPoolExecutor(
+        max_workers=server_threads))
+    if config.server_type == control_pb2.ASYNC_SERVER:
       servicer = benchmark_server.BenchmarkServer()
-      server = services_pb2.beta_create_BenchmarkService_server(servicer)
+      services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server)
     elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
       resp_size = config.payload_config.bytebuf_params.resp_size
       servicer = benchmark_server.GenericBenchmarkServer(resp_size)
       method_implementations = {
-          ('grpc.testing.BenchmarkService', 'StreamingCall'):
-          utilities.stream_stream_inline(servicer.StreamingCall),
-          ('grpc.testing.BenchmarkService', 'UnaryCall'):
-          utilities.unary_unary_inline(servicer.UnaryCall),
+          'StreamingCall':
+          grpc.stream_stream_rpc_method_handler(servicer.StreamingCall),
+          'UnaryCall':
+          grpc.unary_unary_rpc_method_handler(servicer.UnaryCall),
       }
-      server = implementations.server(method_implementations)
+      handler = grpc.method_handlers_generic_handler(
+          'grpc.testing.BenchmarkService', method_implementations)
+      server.add_generic_rpc_handlers((handler,))
     else:
       raise Exception('Unsupported server type {}'.format(config.server_type))
 
     if config.HasField('security_params'):  # Use SSL
-      server_creds = implementations.ssl_server_credentials([(
-          resources.private_key(), resources.certificate_chain())])
+      server_creds = grpc.ssl_server_credentials(
+          ((resources.private_key(), resources.certificate_chain()),))
       port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
     else:
       port = server.add_insecure_port('[::]:{}'.format(config.port))
diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py
index c8886bf..cd71bd8 100644
--- a/src/python/grpcio_tests/tests/unit/test_common.py
+++ b/src/python/grpcio_tests/tests/unit/test_common.py
@@ -31,6 +31,7 @@
 
 import collections
 
+import grpc
 import six
 
 INVOCATION_INITIAL_METADATA = (('0', 'abc'), ('1', 'def'), ('2', 'ghi'),)
@@ -78,3 +79,24 @@
       return False
   else:
     return True
+
+
+def test_secure_channel(
+    target, channel_credentials, server_host_override):
+  """Creates a secure Channel to a remote host.
+
+  Args:
+    target: The target (for example 'host:port') of the remote host to which
+      to connect.
+    channel_credentials: The grpc.ChannelCredentials with which to
+      connect.
+    server_host_override: The target name used for SSL host name checking.
+
+  Returns:
+    A grpc.Channel to the remote host through which RPCs may be
+      conducted.
+  """
+  channel = grpc.secure_channel(
+      target, channel_credentials,
+      (('grpc.ssl_target_name_override', server_host_override,),))
+  return channel
diff --git a/tools/run_tests/performance/run_worker_python.sh b/tools/run_tests/performance/run_worker_python.sh
index 3b8ba6f..06cf172 100755
--- a/tools/run_tests/performance/run_worker_python.sh
+++ b/tools/run_tests/performance/run_worker_python.sh
@@ -32,4 +32,4 @@
 
 cd $(dirname $0)/../../..
 
-PYTHONPATH=src/python/grpcio_tests:src/python/grpcio:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@
+PYTHONPATH=src/python/grpcio_tests:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@
diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py
index 2d5130e..4dfd01f 100644
--- a/tools/run_tests/performance/scenario_config.py
+++ b/tools/run_tests/performance/scenario_config.py
@@ -387,45 +387,44 @@
     return 500
 
   def scenarios(self):
-    # TODO(issue #6522): Empty streaming requests does not work for python
-    #yield _ping_pong_scenario(
-    #    'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
-    #    client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
-    #    use_generic_payload=True,
-    #    categories=[SMOKETEST])
+    yield _ping_pong_scenario(
+        'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING',
+        client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+        use_generic_payload=True,
+        categories=[SMOKETEST])
 
     yield _ping_pong_scenario(
         'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER')
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')
 
     yield _ping_pong_scenario(
         'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
-        client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
+        client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
 
     yield _ping_pong_scenario(
         'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
         categories=[SMOKETEST])
 
     yield _ping_pong_scenario(
         'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
         unconstrained_client='sync')
 
     yield _ping_pong_scenario(
         'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
         unconstrained_client='sync')
 
     yield _ping_pong_scenario(
         'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
         server_language='c++', server_core_limit=1, async_server_threads=1,
         categories=[SMOKETEST])
 
     yield _ping_pong_scenario(
         'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
-        client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+        client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
         server_language='c++', server_core_limit=1, async_server_threads=1)
 
   def __str__(self):