| # Copyright 2015, Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Internal utilities for gRPC Python.""" |
| |
| import collections |
| import threading |
| import time |
| |
| import six |
| |
| import grpc |
| from grpc import _common |
| from grpc.framework.foundation import callable_util |
| |
| _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( |
| 'Exception calling connectivity future "done" callback!') |
| |
| |
| class RpcMethodHandler( |
| collections.namedtuple( |
| '_RpcMethodHandler', |
| ('request_streaming', 'response_streaming', 'request_deserializer', |
| 'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary', |
| 'stream_stream',)), |
| grpc.RpcMethodHandler): |
| pass |
| |
| |
| class DictionaryGenericHandler(grpc.ServiceRpcHandler): |
| |
| def __init__(self, service, method_handlers): |
| self._name = service |
| self._method_handlers = { |
| _common.fully_qualified_method(service, method): method_handler |
| for method, method_handler in six.iteritems(method_handlers)} |
| |
| def service_name(self): |
| return self._name |
| |
| def service(self, handler_call_details): |
| return self._method_handlers.get(handler_call_details.method) |
| |
| |
| class _ChannelReadyFuture(grpc.Future): |
| |
| def __init__(self, channel): |
| self._condition = threading.Condition() |
| self._channel = channel |
| |
| self._matured = False |
| self._cancelled = False |
| self._done_callbacks = [] |
| |
| def _block(self, timeout): |
| until = None if timeout is None else time.time() + timeout |
| with self._condition: |
| while True: |
| if self._cancelled: |
| raise grpc.FutureCancelledError() |
| elif self._matured: |
| return |
| else: |
| if until is None: |
| self._condition.wait() |
| else: |
| remaining = until - time.time() |
| if remaining < 0: |
| raise grpc.FutureTimeoutError() |
| else: |
| self._condition.wait(timeout=remaining) |
| |
| def _update(self, connectivity): |
| with self._condition: |
| if (not self._cancelled and |
| connectivity is grpc.ChannelConnectivity.READY): |
| self._matured = True |
| self._channel.unsubscribe(self._update) |
| self._condition.notify_all() |
| done_callbacks = tuple(self._done_callbacks) |
| self._done_callbacks = None |
| else: |
| return |
| |
| for done_callback in done_callbacks: |
| callable_util.call_logging_exceptions( |
| done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) |
| |
| def cancel(self): |
| with self._condition: |
| if not self._matured: |
| self._cancelled = True |
| self._channel.unsubscribe(self._update) |
| self._condition.notify_all() |
| done_callbacks = tuple(self._done_callbacks) |
| self._done_callbacks = None |
| else: |
| return False |
| |
| for done_callback in done_callbacks: |
| callable_util.call_logging_exceptions( |
| done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) |
| |
| def cancelled(self): |
| with self._condition: |
| return self._cancelled |
| |
| def running(self): |
| with self._condition: |
| return not self._cancelled and not self._matured |
| |
| def done(self): |
| with self._condition: |
| return self._cancelled or self._matured |
| |
| def result(self, timeout=None): |
| self._block(timeout) |
| return None |
| |
| def exception(self, timeout=None): |
| self._block(timeout) |
| return None |
| |
| def traceback(self, timeout=None): |
| self._block(timeout) |
| return None |
| |
| def add_done_callback(self, fn): |
| with self._condition: |
| if not self._cancelled and not self._matured: |
| self._done_callbacks.append(fn) |
| return |
| |
| fn(self) |
| |
| def start(self): |
| with self._condition: |
| self._channel.subscribe(self._update, try_to_connect=True) |
| |
| def __del__(self): |
| with self._condition: |
| if not self._cancelled and not self._matured: |
| self._channel.unsubscribe(self._update) |
| |
| |
| def channel_ready_future(channel): |
| ready_future = _ChannelReadyFuture(channel) |
| ready_future.start() |
| return ready_future |