blob: 83b46c914e7f89d8d8966c29d573d69b6f310413 [file] [log] [blame]
Ken Payson0482c102016-04-19 12:08:34 -07001# Copyright 2016, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
31
32import abc
Ken Paysonbef286f2016-06-16 16:39:04 -070033import threading
Ken Payson0482c102016-04-19 12:08:34 -070034import time
Ken Payson0482c102016-04-19 12:08:34 -070035
36from concurrent import futures
Ken Payson1efb6012016-06-08 13:06:44 -070037from six.moves import queue
Ken Payson0482c102016-04-19 12:08:34 -070038
Ken Paysonbef286f2016-06-16 16:39:04 -070039import grpc
Ken Payson0482c102016-04-19 12:08:34 -070040from src.proto.grpc.testing import messages_pb2
41from src.proto.grpc.testing import services_pb2
42from tests.unit import resources
Ken Payson45c0f2b2016-07-06 19:26:09 -070043from tests.unit import test_common
Ken Payson0482c102016-04-19 12:08:34 -070044
45_TIMEOUT = 60 * 60 * 24
46
47
Ken Payson45c0f2b2016-07-06 19:26:09 -070048class GenericStub(object):
49
50 def __init__(self, channel):
51 self.UnaryCall = channel.unary_unary(
52 '/grpc.testing.BenchmarkService/UnaryCall')
53 self.StreamingCall = channel.stream_stream(
54 '/grpc.testing.BenchmarkService/StreamingCall')
55
56
Ken Payson0482c102016-04-19 12:08:34 -070057class BenchmarkClient:
58 """Benchmark client interface that exposes a non-blocking send_request()."""
59
60 __metaclass__ = abc.ABCMeta
61
62 def __init__(self, server, config, hist):
63 # Create the stub
Ken Payson0482c102016-04-19 12:08:34 -070064 if config.HasField('security_params'):
Ken Payson45c0f2b2016-07-06 19:26:09 -070065 creds = grpc.ssl_channel_credentials(resources.test_root_certificates())
66 channel = test_common.test_secure_channel(
67 server, creds, config.security_params.server_host_override)
Ken Payson0482c102016-04-19 12:08:34 -070068 else:
Ken Payson45c0f2b2016-07-06 19:26:09 -070069 channel = grpc.insecure_channel(server)
Ken Payson0482c102016-04-19 12:08:34 -070070
Ken Paysonbef286f2016-06-16 16:39:04 -070071 connected_event = threading.Event()
72 def wait_for_ready(connectivity):
73 if connectivity == grpc.ChannelConnectivity.READY:
74 connected_event.set()
75 channel.subscribe(wait_for_ready, try_to_connect=True)
76 connected_event.wait()
77
Ken Payson0482c102016-04-19 12:08:34 -070078 if config.payload_config.WhichOneof('payload') == 'simple_params':
79 self._generic = False
Ken Payson45c0f2b2016-07-06 19:26:09 -070080 self._stub = services_pb2.BenchmarkServiceStub(channel)
Ken Payson0482c102016-04-19 12:08:34 -070081 payload = messages_pb2.Payload(
82 body='\0' * config.payload_config.simple_params.req_size)
83 self._request = messages_pb2.SimpleRequest(
84 payload=payload,
85 response_size=config.payload_config.simple_params.resp_size)
86 else:
87 self._generic = True
Ken Payson45c0f2b2016-07-06 19:26:09 -070088 self._stub = GenericStub(channel)
Ken Payson0482c102016-04-19 12:08:34 -070089 self._request = '\0' * config.payload_config.bytebuf_params.req_size
90
91 self._hist = hist
92 self._response_callbacks = []
93
94 def add_response_callback(self, callback):
Ken Payson9a36e6c2016-06-07 17:49:03 -070095 """callback will be invoked as callback(client, query_time)"""
Ken Payson0482c102016-04-19 12:08:34 -070096 self._response_callbacks.append(callback)
97
98 @abc.abstractmethod
99 def send_request(self):
100 """Non-blocking wrapper for a client's request operation."""
101 raise NotImplementedError()
102
103 def start(self):
104 pass
105
106 def stop(self):
107 pass
108
Ken Payson9a36e6c2016-06-07 17:49:03 -0700109 def _handle_response(self, client, query_time):
Ken Payson0482c102016-04-19 12:08:34 -0700110 self._hist.add(query_time * 1e9) # Report times in nanoseconds
111 for callback in self._response_callbacks:
Ken Payson9a36e6c2016-06-07 17:49:03 -0700112 callback(client, query_time)
Ken Payson0482c102016-04-19 12:08:34 -0700113
114
115class UnarySyncBenchmarkClient(BenchmarkClient):
116
117 def __init__(self, server, config, hist):
118 super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
119 self._pool = futures.ThreadPoolExecutor(
120 max_workers=config.outstanding_rpcs_per_channel)
121
122 def send_request(self):
123 # Send requests in seperate threads to support multiple outstanding rpcs
124 # (See src/proto/grpc/testing/control.proto)
125 self._pool.submit(self._dispatch_request)
126
127 def stop(self):
128 self._pool.shutdown(wait=True)
129 self._stub = None
130
131 def _dispatch_request(self):
132 start_time = time.time()
133 self._stub.UnaryCall(self._request, _TIMEOUT)
134 end_time = time.time()
Ken Payson9a36e6c2016-06-07 17:49:03 -0700135 self._handle_response(self, end_time - start_time)
Ken Payson0482c102016-04-19 12:08:34 -0700136
137
138class UnaryAsyncBenchmarkClient(BenchmarkClient):
139
140 def send_request(self):
141 # Use the Future callback api to support multiple outstanding rpcs
142 start_time = time.time()
143 response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
144 response_future.add_done_callback(
145 lambda resp: self._response_received(start_time, resp))
146
147 def _response_received(self, start_time, resp):
148 resp.result()
149 end_time = time.time()
Ken Payson9a36e6c2016-06-07 17:49:03 -0700150 self._handle_response(self, end_time - start_time)
Ken Payson0482c102016-04-19 12:08:34 -0700151
152 def stop(self):
153 self._stub = None
154
155
Ken Payson9a36e6c2016-06-07 17:49:03 -0700156class _SyncStream(object):
Ken Payson0482c102016-04-19 12:08:34 -0700157
Ken Payson9a36e6c2016-06-07 17:49:03 -0700158 def __init__(self, stub, generic, request, handle_response):
159 self._stub = stub
160 self._generic = generic
161 self._request = request
162 self._handle_response = handle_response
Ken Payson0482c102016-04-19 12:08:34 -0700163 self._is_streaming = False
Ken Payson0482c102016-04-19 12:08:34 -0700164 self._request_queue = queue.Queue()
165 self._send_time_queue = queue.Queue()
166
167 def send_request(self):
168 self._send_time_queue.put(time.time())
169 self._request_queue.put(self._request)
170
171 def start(self):
172 self._is_streaming = True
Ken Payson45c0f2b2016-07-06 19:26:09 -0700173 response_stream = self._stub.StreamingCall(
174 self._request_generator(), _TIMEOUT)
Ken Payson0482c102016-04-19 12:08:34 -0700175 for _ in response_stream:
Ken Payson9a36e6c2016-06-07 17:49:03 -0700176 self._handle_response(
177 self, time.time() - self._send_time_queue.get_nowait())
178
179 def stop(self):
180 self._is_streaming = False
Ken Payson0482c102016-04-19 12:08:34 -0700181
182 def _request_generator(self):
183 while self._is_streaming:
184 try:
185 request = self._request_queue.get(block=True, timeout=1.0)
186 yield request
187 except queue.Empty:
188 pass
Ken Payson9a36e6c2016-06-07 17:49:03 -0700189
190
191class StreamingSyncBenchmarkClient(BenchmarkClient):
192
193 def __init__(self, server, config, hist):
194 super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
195 self._pool = futures.ThreadPoolExecutor(
196 max_workers=config.outstanding_rpcs_per_channel)
197 self._streams = [_SyncStream(self._stub, self._generic,
198 self._request, self._handle_response)
199 for _ in xrange(config.outstanding_rpcs_per_channel)]
200 self._curr_stream = 0
201
202 def send_request(self):
203 # Use a round_robin scheduler to determine what stream to send on
204 self._streams[self._curr_stream].send_request()
205 self._curr_stream = (self._curr_stream + 1) % len(self._streams)
206
207 def start(self):
208 for stream in self._streams:
209 self._pool.submit(stream.start)
210
211 def stop(self):
212 for stream in self._streams:
213 stream.stop()
214 self._pool.shutdown(wait=True)
215 self._stub = None