blob: b8e7ea17f708b3252514e51174e125343505ed1c [file] [log] [blame]
Nathaniel Manista10da1972016-06-01 22:27:26 +00001# Copyright 2016, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Nathaniel Manista10da1972016-06-01 22:27:26 +000029"""Service-side implementation of gRPC Python."""
30
31import collections
32import enum
33import logging
34import threading
35import time
36
37import grpc
38from grpc import _common
39from grpc._cython import cygrpc
40from grpc.framework.foundation import callable_util
41
42_SHUTDOWN_TAG = 'shutdown'
43_REQUEST_CALL_TAG = 'request_call'
44
45_RECEIVE_CLOSE_ON_SERVER_TOKEN = 'receive_close_on_server'
46_SEND_INITIAL_METADATA_TOKEN = 'send_initial_metadata'
47_RECEIVE_MESSAGE_TOKEN = 'receive_message'
48_SEND_MESSAGE_TOKEN = 'send_message'
49_SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = (
50 'send_initial_metadata * send_message')
51_SEND_STATUS_FROM_SERVER_TOKEN = 'send_status_from_server'
52_SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = (
53 'send_initial_metadata * send_status_from_server')
54
55_OPEN = 'open'
56_CLOSED = 'closed'
57_CANCELLED = 'cancelled'
58
59_EMPTY_FLAGS = 0
Nathaniel Manista10da1972016-06-01 22:27:26 +000060
Ken Payson83decd62016-12-08 12:14:59 -080061_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
Ken Paysoned742852016-06-10 13:19:31 -070062
Nathaniel Manista10da1972016-06-01 22:27:26 +000063
64def _serialized_request(request_event):
Masood Malekghassemicc793702017-01-13 19:20:10 -080065 return request_event.batch_operations[0].received_message.bytes()
Nathaniel Manista10da1972016-06-01 22:27:26 +000066
67
def _application_code(code):
    """Translate an application status code to its cygrpc counterpart.

    Codes missing from _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE map to
    cygrpc.StatusCode.unknown.
    """
    translated = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
    if translated is None:
        return cygrpc.StatusCode.unknown
    return translated
Nathaniel Manistac09a3582016-06-06 14:28:55 +000071
72
def _completion_code(state):
    """Return the cygrpc code for a completing RPC.

    cygrpc.StatusCode.ok is used when the application set no code on `state`.
    """
    if state.code is not None:
        return _application_code(state.code)
    return cygrpc.StatusCode.ok
Nathaniel Manistac09a3582016-06-06 14:28:55 +000078
79
80def _abortion_code(state, code):
Masood Malekghassemicc793702017-01-13 19:20:10 -080081 if state.code is None:
82 return code
83 else:
84 return _application_code(state.code)
Nathaniel Manista10da1972016-06-01 22:27:26 +000085
86
87def _details(state):
Masood Malekghassemicc793702017-01-13 19:20:10 -080088 return b'' if state.details is None else state.details
Nathaniel Manista10da1972016-06-01 22:27:26 +000089
90
class _HandlerCallDetails(
        collections.namedtuple('_HandlerCallDetails',
                               ('method', 'invocation_metadata',)),
        grpc.HandlerCallDetails):
    """Named tuple of (method, invocation_metadata) given to generic handlers."""
Nathaniel Manista10da1972016-06-01 22:27:26 +000095
96
class _RPCState(object):
    """Mutable per-RPC bookkeeping; functions in this module guard it with `condition`."""

    def __init__(self):
        # Guards the fields below and signals state changes to waiters.
        self.condition = threading.Condition()
        # Tokens of batches started on the call that have not yet completed.
        self.due = set()
        # Most recently deserialized request not yet consumed, if any.
        self.request = None
        # One of _OPEN, _CLOSED, _CANCELLED.
        self.client = _OPEN
        # Cleared once initial metadata has been sent.
        self.initial_metadata_allowed = True
        self.disable_next_compression = False
        # Values supplied by the application through the servicer context.
        self.trailing_metadata = None
        self.code = None
        self.details = None
        # True once a send-status batch has been started.
        self.statused = False
        # RpcErrors raised by this module, so they can be told apart from
        # exceptions raised by application code.
        self.rpc_errors = []
        # Termination callbacks; replaced by None once the call has finished.
        self.callbacks = []
Nathaniel Manista10da1972016-06-01 22:27:26 +0000112
113
def _raise_rpc_error(state):
    """Raise a fresh grpc.RpcError after recording it in state.rpc_errors."""
    error = grpc.RpcError()
    state.rpc_errors.append(error)
    raise error
Nathaniel Manista10da1972016-06-01 22:27:26 +0000118
119
def _possibly_finish_call(state, token):
    """Mark `token` as no longer due and report whether the call has finished.

    Returns:
      (state, callbacks) when the call is cancelled or statused and nothing
      else is due; callbacks are handed over and state.callbacks becomes
      None. Otherwise (None, ()).
    """
    state.due.remove(token)
    terminated = state.client is _CANCELLED or state.statused
    if not terminated or state.due:
        return None, ()
    finished_callbacks = state.callbacks
    state.callbacks = None
    return state, finished_callbacks
Nathaniel Manista10da1972016-06-01 22:27:26 +0000128
129
def _send_status_from_server(state, token):
    """Build the completion handler for a send-status batch due under `token`."""

    def send_status_from_server(unused_event):
        # The event carries nothing needed here; only the token is retired.
        with state.condition:
            outcome = _possibly_finish_call(state, token)
        return outcome

    return send_status_from_server
Nathaniel Manista10da1972016-06-01 22:27:26 +0000137
138
def _abort(state, call, code, details):
    """Terminate the RPC with an error status unless the client cancelled.

    A code or details already set on `state` take precedence over the
    arguments. Initial metadata is sent in the same batch if it is still
    allowed. Callers in this module invoke this while holding
    state.condition.
    """
    if state.client is _CANCELLED:
        return
    effective_code = _abortion_code(state, code)
    effective_details = state.details if state.details is not None else details
    batch = []
    if state.initial_metadata_allowed:
        batch.append(
            cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA,
                                                   _EMPTY_FLAGS))
        token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
    else:
        token = _SEND_STATUS_FROM_SERVER_TOKEN
    batch.append(
        cygrpc.operation_send_status_from_server(
            _common.cygrpc_metadata(state.trailing_metadata), effective_code,
            effective_details, _EMPTY_FLAGS))
    call.start_server_batch(
        cygrpc.Operations(tuple(batch)),
        _send_status_from_server(state, token))
    state.statused = True
    state.due.add(token)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000160
161
def _receive_close_on_server(state):
    """Build the completion handler for the receive-close-on-server batch."""

    def receive_close_on_server(receive_close_on_server_event):
        with state.condition:
            close_operation = receive_close_on_server_event.batch_operations[0]
            if close_operation.received_cancelled:
                state.client = _CANCELLED
            elif state.client is _OPEN:
                state.client = _CLOSED
            # Wake any thread waiting on a client-state change.
            state.condition.notify_all()
            return _possibly_finish_call(state,
                                         _RECEIVE_CLOSE_ON_SERVER_TOKEN)

    return receive_close_on_server
Nathaniel Manista10da1972016-06-01 22:27:26 +0000175
176
def _receive_message(state, call, request_deserializer):
    """Build the completion handler for a receive-message batch.

    A missing message marks an open client as closed. A message that fails
    to deserialize aborts the RPC with an internal error. Otherwise the
    request is stored on `state` for a waiting consumer.
    """

    def receive_message(receive_message_event):
        serialized_request = _serialized_request(receive_message_event)
        if serialized_request is not None:
            # Deserialize before taking the condition so application
            # deserializers do not run under the lock.
            request = _common.deserialize(serialized_request,
                                          request_deserializer)
            with state.condition:
                if request is not None:
                    state.request = request
                else:
                    _abort(state, call, cygrpc.StatusCode.internal,
                           b'Exception deserializing request!')
                state.condition.notify_all()
                return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)
        with state.condition:
            if state.client is _OPEN:
                state.client = _CLOSED
            state.condition.notify_all()
            return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)

    return receive_message
Nathaniel Manista10da1972016-06-01 22:27:26 +0000200
201
def _send_initial_metadata(state):
    """Build the completion handler for a standalone send-initial-metadata batch."""

    def send_initial_metadata(unused_event):
        with state.condition:
            outcome = _possibly_finish_call(state,
                                            _SEND_INITIAL_METADATA_TOKEN)
        return outcome

    return send_initial_metadata
Nathaniel Manista10da1972016-06-01 22:27:26 +0000209
210
def _send_message(state, token):
    """Build the completion handler for a send-message batch due under `token`."""

    def send_message(unused_event):
        with state.condition:
            # Wake the sender blocked until `token` leaves state.due.
            state.condition.notify_all()
            outcome = _possibly_finish_call(state, token)
        return outcome

    return send_message
Nathaniel Manista10da1972016-06-01 22:27:26 +0000219
220
class _Context(grpc.ServicerContext):
    """ServicerContext backed by a request-call event and its _RPCState."""

    def __init__(self, rpc_event, state, request_deserializer):
        self._rpc_event = rpc_event
        self._state = state
        self._request_deserializer = request_deserializer

    def is_active(self):
        """Return True unless the RPC is cancelled or already statused."""
        state = self._state
        with state.condition:
            return not (state.client is _CANCELLED or state.statused)

    def time_remaining(self):
        """Return seconds until the call deadline, floored at zero."""
        deadline = float(self._rpc_event.request_call_details.deadline)
        return max(deadline - time.time(), 0)

    def cancel(self):
        """Cancel the underlying call."""
        self._rpc_event.operation_call.cancel()

    def add_callback(self, callback):
        """Register a termination callback.

        Returns:
          True if the callback was stored; False if the callback list has
          already been handed off (the call has finished).
        """
        state = self._state
        with state.condition:
            if state.callbacks is None:
                return False
            state.callbacks.append(callback)
            return True

    def disable_next_message_compression(self):
        """Record that compression should be disabled for the next message."""
        with self._state.condition:
            self._state.disable_next_compression = True

    def invocation_metadata(self):
        """Return the request metadata converted to application form."""
        request_metadata = self._rpc_event.request_metadata
        return _common.application_metadata(request_metadata)

    def peer(self):
        """Return the decoded peer string of the underlying call."""
        raw_peer = self._rpc_event.operation_call.peer()
        return _common.decode(raw_peer)

    def send_initial_metadata(self, initial_metadata):
        """Send initial metadata to the client.

        Raises:
          grpc.RpcError: the client has cancelled the RPC.
          ValueError: initial metadata has already been sent.
        """
        state = self._state
        with state.condition:
            if state.client is _CANCELLED:
                _raise_rpc_error(state)
            elif not state.initial_metadata_allowed:
                raise ValueError('Initial metadata no longer allowed!')
            else:
                operation = cygrpc.operation_send_initial_metadata(
                    _common.cygrpc_metadata(initial_metadata), _EMPTY_FLAGS)
                self._rpc_event.operation_call.start_server_batch(
                    cygrpc.Operations((operation,)),
                    _send_initial_metadata(state))
                state.initial_metadata_allowed = False
                state.due.add(_SEND_INITIAL_METADATA_TOKEN)

    def set_trailing_metadata(self, trailing_metadata):
        """Store trailing metadata (converted to cygrpc form) for the status."""
        with self._state.condition:
            converted = _common.cygrpc_metadata(trailing_metadata)
            self._state.trailing_metadata = converted

    def set_code(self, code):
        """Store the status code to send when the RPC terminates."""
        with self._state.condition:
            self._state.code = code

    def set_details(self, details):
        """Store the encoded status details to send when the RPC terminates."""
        with self._state.condition:
            encoded = _common.encode(details)
            self._state.details = encoded
Nathaniel Manista10da1972016-06-01 22:27:26 +0000286
287
class _RequestIterator(object):
    """Blocking iterator over the requests of a request-streaming RPC."""

    def __init__(self, state, call, request_deserializer):
        self._state = state
        self._call = call
        self._request_deserializer = request_deserializer

    def _raise_or_start_receive_message(self):
        """Start a receive-message batch, or raise if iteration is over.

        Raises grpc.RpcError on cancellation and StopIteration when the
        client is closed or the RPC is statused.
        """
        state = self._state
        if state.client is _CANCELLED:
            _raise_rpc_error(state)
        elif state.client is _CLOSED or state.statused:
            raise StopIteration()
        else:
            receive = cygrpc.Operations(
                (cygrpc.operation_receive_message(_EMPTY_FLAGS),))
            self._call.start_server_batch(
                receive,
                _receive_message(state, self._call,
                                 self._request_deserializer))
            state.due.add(_RECEIVE_MESSAGE_TOKEN)

    def _look_for_request(self):
        """Take the pending request, if any.

        Returns None while the receive is still outstanding. Raises
        grpc.RpcError on cancellation and StopIteration when no request
        arrived and the receive has completed.
        """
        state = self._state
        if state.client is _CANCELLED:
            _raise_rpc_error(state)
        elif (state.request is None and
              _RECEIVE_MESSAGE_TOKEN not in state.due):
            raise StopIteration()
        else:
            pending, state.request = state.request, None
            return pending

    def _next(self):
        with self._state.condition:
            self._raise_or_start_receive_message()
            # Wait at least once, then re-check after every notification.
            request = None
            while request is None:
                self._state.condition.wait()
                request = self._look_for_request()
            return request

    def __iter__(self):
        return self

    def __next__(self):
        return self._next()

    def next(self):
        return self._next()
Nathaniel Manista10da1972016-06-01 22:27:26 +0000336
337
def _unary_request(rpc_event, state, request_deserializer):
    """Build a thunk that receives and returns the RPC's single request.

    Args:
      rpc_event: The request-call event of the RPC.
      state: The _RPCState of the RPC.
      request_deserializer: Deserializer handed to the receive-message
        completion handler.

    Returns:
      A zero-argument callable. When called it starts a receive-message
      batch and blocks on state.condition until a request is available,
      then returns it. It returns None when the RPC is already cancelled or
      statused, when the client cancels while waiting, or when the client
      closes without sending a message (in which case the RPC is aborted
      with an unimplemented status).
    """

    def unary_request():
        with state.condition:
            if state.client is _CANCELLED or state.statused:
                return None
            # The return value of start_server_batch is not used.
            rpc_event.operation_call.start_server_batch(
                cygrpc.Operations(
                    (cygrpc.operation_receive_message(_EMPTY_FLAGS),)),
                _receive_message(state, rpc_event.operation_call,
                                 request_deserializer))
            state.due.add(_RECEIVE_MESSAGE_TOKEN)
            while True:
                state.condition.wait()
                if state.request is not None:
                    request = state.request
                    state.request = None
                    return request
                if state.client is _CLOSED:
                    details = '"{}" requires exactly one request message.'.format(
                        rpc_event.request_call_details.method)
                    _abort(state, rpc_event.operation_call,
                           cygrpc.StatusCode.unimplemented,
                           _common.encode(details))
                    return None
                if state.client is _CANCELLED:
                    return None
                # NOTE(review): if the receive handler aborted the RPC
                # (deserialization failure) the loop keeps waiting until the
                # client state changes -- confirm this is intended.

    return unary_request
Nathaniel Manista10da1972016-06-01 22:27:26 +0000369
370
def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
    """Invoke application `behavior` with `argument` and a fresh context.

    Returns:
      (result, True) on success, or (None, False) if the behavior raised.
      Exceptions not recorded in state.rpc_errors are logged and abort the
      RPC with an unknown status.
    """
    context = _Context(rpc_event, state, request_deserializer)
    try:
        result = behavior(argument, context)
    except Exception as e:  # pylint: disable=broad-except
        with state.condition:
            raised_by_this_module = e in state.rpc_errors
            if not raised_by_this_module:
                details = 'Exception calling application: {}'.format(e)
                logging.exception(details)
                _abort(state, rpc_event.operation_call,
                       cygrpc.StatusCode.unknown, _common.encode(details))
        return None, False
    return result, True
Nathaniel Manista10da1972016-06-01 22:27:26 +0000383
384
def _take_response_from_response_iterator(rpc_event, state, response_iterator):
    """Pull the next response from the application's response iterator.

    Returns:
      (response, True) for a response, (None, True) when the iterator is
      exhausted, or (None, False) if iterating raised. Exceptions not
      recorded in state.rpc_errors are logged and abort the RPC with an
      unknown status.
    """
    try:
        response = next(response_iterator)
    except StopIteration:
        return None, True
    except Exception as e:  # pylint: disable=broad-except
        with state.condition:
            raised_by_this_module = e in state.rpc_errors
            if not raised_by_this_module:
                details = 'Exception iterating responses: {}'.format(e)
                logging.exception(details)
                _abort(state, rpc_event.operation_call,
                       cygrpc.StatusCode.unknown, _common.encode(details))
        return None, False
    return response, True
Nathaniel Manista10da1972016-06-01 22:27:26 +0000398
399
def _serialize_response(rpc_event, state, response, response_serializer):
    """Serialize `response`, aborting the RPC with an internal error on failure.

    Returns:
      The serialized response, or None if serialization failed.
    """
    serialized_response = _common.serialize(response, response_serializer)
    if serialized_response is not None:
        return serialized_response
    with state.condition:
        _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
               b'Failed to serialize response!')
    return None
Nathaniel Manista10da1972016-06-01 22:27:26 +0000409
410
def _send_response(rpc_event, state, serialized_response):
    """Send one response message and block until its batch completes.

    Initial metadata is sent in the same batch if it is still allowed.

    Returns:
      False if the RPC was already cancelled or statused, otherwise whether
      the RPC is still neither cancelled nor statused after the send.
    """
    with state.condition:
        if state.client is _CANCELLED or state.statused:
            return False
        if state.initial_metadata_allowed:
            operations = (
                cygrpc.operation_send_initial_metadata(
                    _common.EMPTY_METADATA, _EMPTY_FLAGS),
                cygrpc.operation_send_message(serialized_response,
                                              _EMPTY_FLAGS),)
            state.initial_metadata_allowed = False
            token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
        else:
            operations = (cygrpc.operation_send_message(
                serialized_response, _EMPTY_FLAGS),)
            token = _SEND_MESSAGE_TOKEN
        rpc_event.operation_call.start_server_batch(
            cygrpc.Operations(operations), _send_message(state, token))
        state.due.add(token)
        # Wait at least once, then until the batch's token has been retired.
        state.condition.wait()
        while token in state.due:
            state.condition.wait()
        return state.client is not _CANCELLED and not state.statused
Nathaniel Manista10da1972016-06-01 22:27:26 +0000434
435
def _status(rpc_event, state, serialized_response):
    """Send the final status of the RPC unless the client has cancelled.

    The batch also carries initial metadata if it is still allowed, and
    `serialized_response` when that is not None.
    """
    with state.condition:
        if state.client is _CANCELLED:
            return
        trailing_metadata = _common.cygrpc_metadata(state.trailing_metadata)
        code = _completion_code(state)
        details = _details(state)
        batch = [
            cygrpc.operation_send_status_from_server(
                trailing_metadata, code, details, _EMPTY_FLAGS),
        ]
        if state.initial_metadata_allowed:
            batch.append(
                cygrpc.operation_send_initial_metadata(
                    _common.EMPTY_METADATA, _EMPTY_FLAGS))
        if serialized_response is not None:
            batch.append(
                cygrpc.operation_send_message(serialized_response,
                                              _EMPTY_FLAGS))
        on_completion = _send_status_from_server(
            state, _SEND_STATUS_FROM_SERVER_TOKEN)
        rpc_event.operation_call.start_server_batch(
            cygrpc.Operations(batch), on_completion)
        state.statused = True
        state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000459
460
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                            request_deserializer, response_serializer):
    """Run a response-unary behavior and send its response with the status.

    Stops quietly as soon as a step yields nothing: no argument, a failed
    behavior, or a failed serialization.
    """
    argument = argument_thunk()
    if argument is None:
        return
    response, proceed = _call_behavior(rpc_event, state, behavior, argument,
                                       request_deserializer)
    if not proceed:
        return
    serialized_response = _serialize_response(rpc_event, state, response,
                                              response_serializer)
    if serialized_response is None:
        return
    _status(rpc_event, state, serialized_response)
472
473
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
                             request_deserializer, response_serializer):
    """Run a response-streaming behavior, sending each response it yields.

    The status is sent once the iterator is exhausted (or yields None).
    The loop ends early, without sending a status here, when taking,
    serializing or sending a response fails.
    """
    argument = argument_thunk()
    if argument is None:
        return
    response_iterator, proceed = _call_behavior(
        rpc_event, state, behavior, argument, request_deserializer)
    if not proceed:
        return
    while True:
        response, proceed = _take_response_from_response_iterator(
            rpc_event, state, response_iterator)
        if not proceed:
            break
        if response is None:
            _status(rpc_event, state, None)
            break
        serialized_response = _serialize_response(
            rpc_event, state, response, response_serializer)
        if serialized_response is None:
            break
        if not _send_response(rpc_event, state, serialized_response):
            break
Nathaniel Manista10da1972016-06-01 22:27:26 +0000500
501
def _handle_unary_unary(rpc_event, state, method_handler, thread_pool):
    """Submit a unary-request, unary-response RPC to the thread pool."""
    request_deserializer = method_handler.request_deserializer
    argument_thunk = _unary_request(rpc_event, state, request_deserializer)
    thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                       method_handler.unary_unary, argument_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000509
510
def _handle_unary_stream(rpc_event, state, method_handler, thread_pool):
    """Submit a unary-request, streaming-response RPC to the thread pool."""
    request_deserializer = method_handler.request_deserializer
    argument_thunk = _unary_request(rpc_event, state, request_deserializer)
    thread_pool.submit(_stream_response_in_pool, rpc_event, state,
                       method_handler.unary_stream, argument_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000518
519
def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
    """Submit a streaming-request, unary-response RPC to the thread pool."""
    requests = _RequestIterator(state, rpc_event.operation_call,
                                method_handler.request_deserializer)
    # The behavior's argument is the iterator itself, supplied via a thunk.
    thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                       method_handler.stream_unary, lambda: requests,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000527
528
def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
    """Submit a streaming-request, streaming-response RPC to the thread pool."""
    requests = _RequestIterator(state, rpc_event.operation_call,
                                method_handler.request_deserializer)
    # The behavior's argument is the iterator itself, supplied via a thunk.
    thread_pool.submit(_stream_response_in_pool, rpc_event, state,
                       method_handler.stream_stream, lambda: requests,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000536
537
def _find_method_handler(rpc_event, generic_handlers):
    """Return the first non-None method handler offered by the generic handlers.

    Each generic handler is asked, in order, to service the call's decoded
    method name and request metadata. Returns None if none of them does.
    """
    for generic_handler in generic_handlers:
        call_details = _HandlerCallDetails(
            _common.decode(rpc_event.request_call_details.method),
            rpc_event.request_metadata)
        candidate = generic_handler.service(call_details)
        if candidate is not None:
            return candidate
    return None
Nathaniel Manista10da1972016-06-01 22:27:26 +0000548
549
def _handle_unrecognized_method(rpc_event):
    """Reject a call to an unknown method with an unimplemented status.

    Returns:
      A fresh _RPCState; the batch's completion handler reports it as
      finished with no callbacks.
    """
    send_initial_metadata = cygrpc.operation_send_initial_metadata(
        _common.EMPTY_METADATA, _EMPTY_FLAGS)
    receive_close = cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS)
    send_status = cygrpc.operation_send_status_from_server(
        _common.EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
        b'Method not found!', _EMPTY_FLAGS)
    operations = (send_initial_metadata, receive_close, send_status,)
    rpc_state = _RPCState()

    def finish(ignored_event):
        return (rpc_state, (),)

    # NOTE(review): the operations tuple is passed without a
    # cygrpc.Operations wrapper, unlike other batches in this module.
    rpc_event.operation_call.start_server_batch(operations, finish)
    return rpc_state
Nathaniel Manista10da1972016-06-01 22:27:26 +0000561
562
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
    """Start servicing a call with the handler chosen for its method.

    A receive-close-on-server batch is started first, then the call is
    dispatched on the handler's request/response streaming flags.

    Returns:
      The _RPCState created for the call.
    """
    state = _RPCState()
    with state.condition:
        receive_close = cygrpc.Operations(
            (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),))
        rpc_event.operation_call.start_server_batch(
            receive_close, _receive_close_on_server(state))
        state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
        if method_handler.request_streaming:
            handle = (_handle_stream_stream
                      if method_handler.response_streaming else
                      _handle_stream_unary)
        else:
            handle = (_handle_unary_stream
                      if method_handler.response_streaming else
                      _handle_unary_unary)
        handle(rpc_event, state, method_handler, thread_pool)
        return state
Nathaniel Manista10da1972016-06-01 22:27:26 +0000586
587
def _handle_call(rpc_event, generic_handlers, thread_pool):
    """Dispatch a request-call event.

    Returns:
      The _RPCState of the newly handled call, or None when the event was
      unsuccessful or carries no method.
    """
    if not rpc_event.success:
        return None
    if rpc_event.request_call_details.method is None:
        return None
    method_handler = _find_method_handler(rpc_event, generic_handlers)
    if method_handler is None:
        return _handle_unrecognized_method(rpc_event)
    return _handle_with_method_handler(rpc_event, method_handler, thread_pool)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000600
601
602@enum.unique
603class _ServerStage(enum.Enum):
Masood Malekghassemicc793702017-01-13 19:20:10 -0800604 STOPPED = 'stopped'
605 STARTED = 'started'
606 GRACE = 'grace'
Nathaniel Manista10da1972016-06-01 22:27:26 +0000607
608
class _ServerState(object):
    """Mutable state shared by a Server and its serving thread."""

    def __init__(self, completion_queue, server, generic_handlers, thread_pool):
        # Guards the mutable fields below.
        self.lock = threading.Lock()
        self.completion_queue = completion_queue
        self.server = server
        # Copied so later additions do not mutate the caller's iterable.
        self.generic_handlers = list(generic_handlers)
        self.thread_pool = thread_pool
        self.stage = _ServerStage.STOPPED
        # Events to set once serving has stopped; a list from the first stop.
        self.shutdown_events = None

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        # _RPCStates of calls in progress, and outstanding server-level tags.
        self.rpc_states = set()
        self.due = set()
Nathaniel Manista10da1972016-06-01 22:27:26 +0000623
624
625def _add_generic_handlers(state, generic_handlers):
Masood Malekghassemicc793702017-01-13 19:20:10 -0800626 with state.lock:
627 state.generic_handlers.extend(generic_handlers)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000628
629
630def _add_insecure_port(state, address):
Masood Malekghassemicc793702017-01-13 19:20:10 -0800631 with state.lock:
632 return state.server.add_http2_port(address)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000633
634
635def _add_secure_port(state, address, server_credentials):
Masood Malekghassemicc793702017-01-13 19:20:10 -0800636 with state.lock:
637 return state.server.add_http2_port(address,
638 server_credentials._credentials)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000639
640
def _request_call(state):
    """Ask the server for the next incoming call and record the tag as due."""
    queue = state.completion_queue
    # The same completion queue is passed for both queue arguments.
    state.server.request_call(queue, queue, _REQUEST_CALL_TAG)
    state.due.add(_REQUEST_CALL_TAG)
Nathaniel Manista10da1972016-06-01 22:27:26 +0000645
646
647# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
648def _stop_serving(state):
Masood Malekghassemicc793702017-01-13 19:20:10 -0800649 if not state.rpc_states and not state.due:
650 for shutdown_event in state.shutdown_events:
651 shutdown_event.set()
652 state.stage = _ServerStage.STOPPED
653 return True
654 else:
655 return False
Nathaniel Manista10da1972016-06-01 22:27:26 +0000656
657
def _serve(state):
    """Drain the completion queue until the server has fully stopped.

    Handles three kinds of event: the shutdown tag, the request-call tag
    (a new incoming call), and per-RPC batch completions whose tag is the
    callable that processes them.
    """
    while True:
        event = state.completion_queue.poll()
        tag = event.tag
        if tag is _SHUTDOWN_TAG:
            with state.lock:
                state.due.remove(_SHUTDOWN_TAG)
                if _stop_serving(state):
                    return
            continue
        if tag is _REQUEST_CALL_TAG:
            with state.lock:
                state.due.remove(_REQUEST_CALL_TAG)
                rpc_state = _handle_call(event, state.generic_handlers,
                                         state.thread_pool)
                if rpc_state is not None:
                    state.rpc_states.add(rpc_state)
                if state.stage is _ServerStage.STARTED:
                    # Keep exactly one request_call outstanding while serving.
                    _request_call(state)
                elif _stop_serving(state):
                    return
            continue
        # Per-RPC completion: the handler returns the finished _RPCState (or
        # None) and the callbacks to run.
        rpc_state, callbacks = tag(event)
        for callback in callbacks:
            callable_util.call_logging_exceptions(
                callback, 'Exception calling callback!')
        if rpc_state is not None:
            with state.lock:
                state.rpc_states.remove(rpc_state)
                if _stop_serving(state):
                    return
Nathaniel Manista10da1972016-06-01 22:27:26 +0000687
688
def _stop(state, grace):
    """Begin (or join) server shutdown.

    Args:
      state: The _ServerState of the server.
      grace: Seconds to wait before cancelling in-flight calls, or None to
        cancel them immediately and block until serving has stopped.

    Returns:
      A threading.Event that is set once the server has stopped. It is
      already set if the server was stopped, or if `grace` is None.
    """

    def cancel_everything():
        # Caller holds state.lock.
        state.server.cancel_all_calls()
        # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
        for rpc_state in state.rpc_states:
            with rpc_state.condition:
                rpc_state.client = _CANCELLED
                rpc_state.condition.notify_all()

    with state.lock:
        if state.stage is _ServerStage.STOPPED:
            already_stopped = threading.Event()
            already_stopped.set()
            return already_stopped
        if state.stage is _ServerStage.STARTED:
            state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
            state.stage = _ServerStage.GRACE
            state.shutdown_events = []
            state.due.add(_SHUTDOWN_TAG)
        shutdown_event = threading.Event()
        state.shutdown_events.append(shutdown_event)
        if grace is not None:

            def cancel_all_calls_after_grace():
                # Returns early if the server stops before the grace elapses.
                shutdown_event.wait(timeout=grace)
                with state.lock:
                    cancel_everything()

            thread = threading.Thread(target=cancel_all_calls_after_grace)
            thread.start()
            return shutdown_event
        cancel_everything()
    # Immediate stop: wait, outside the lock, for serving to finish.
    shutdown_event.wait()
    return shutdown_event
Nathaniel Manista10da1972016-06-01 22:27:26 +0000727
728
def _start(state):
    """Start the server and its serving thread.

    Raises:
      ValueError: the server is not in the STOPPED stage.
    """
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        def cleanup_server(timeout):
            # Without a timeout, fall back to the module's default grace.
            grace = _UNEXPECTED_EXIT_SERVER_GRACE if timeout is None else timeout
            _stop(state, grace).wait()

        serving_thread = _common.CleanupThread(
            cleanup_server, target=_serve, args=(state,))
        serving_thread.start()
746
Ken Paysoned742852016-06-10 13:19:31 -0700747
class Server(grpc.Server):
    """grpc.Server that delegates to the module-level functions over a _ServerState."""

    def __init__(self, thread_pool, generic_handlers, options):
        """Create the cygrpc server and completion queue; nothing is started."""
        queue = cygrpc.CompletionQueue()
        core_server = cygrpc.Server(_common.channel_args(options))
        core_server.register_completion_queue(queue)
        self._state = _ServerState(queue, core_server, generic_handlers,
                                   thread_pool)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Register additional generic RPC handlers."""
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Add an insecure port at `address`; return the server's result."""
        state = self._state
        return _add_insecure_port(state, _common.encode(address))

    def add_secure_port(self, address, server_credentials):
        """Add a secure port at `address`; return the server's result."""
        state = self._state
        return _add_secure_port(state,
                                _common.encode(address), server_credentials)

    def start(self):
        """Start serving; raises ValueError if already started."""
        _start(self._state)

    def stop(self, grace):
        """Stop the server; return an event set once it has stopped."""
        shutdown_event = _stop(self._state, grace)
        return shutdown_event

    def __del__(self):
        # Stop immediately (no grace) when the server is garbage collected.
        _stop(self._state, None)