blob: ec43203a25e7f27a6fc53dc493e70dd823a36456 [file] [log] [blame]
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for translating between sync and async control flow."""
import threading
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.face import exceptions
from grpc.framework.face import interfaces
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import stream
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
_OPERATION_OUTCOME_TO_RPC_ABORTION = {
base_interfaces.Outcome.CANCELLED: interfaces.Abortion.CANCELLED,
base_interfaces.Outcome.EXPIRED: interfaces.Abortion.EXPIRED,
base_interfaces.Outcome.RECEPTION_FAILURE:
interfaces.Abortion.NETWORK_FAILURE,
base_interfaces.Outcome.TRANSMISSION_FAILURE:
interfaces.Abortion.NETWORK_FAILURE,
base_interfaces.Outcome.SERVICED_FAILURE:
interfaces.Abortion.SERVICED_FAILURE,
base_interfaces.Outcome.SERVICER_FAILURE:
interfaces.Abortion.SERVICER_FAILURE,
}
def _as_operation_termination_callback(rpc_abortion_callback):
def operation_termination_callback(operation_outcome):
rpc_abortion = _OPERATION_OUTCOME_TO_RPC_ABORTION.get(
operation_outcome, None)
if rpc_abortion is not None:
rpc_abortion_callback(rpc_abortion)
return operation_termination_callback
def _abortion_outcome_to_exception(abortion_outcome):
if abortion_outcome == base_interfaces.Outcome.CANCELLED:
return exceptions.CancellationError()
elif abortion_outcome == base_interfaces.Outcome.EXPIRED:
return exceptions.ExpirationError()
elif abortion_outcome == base_interfaces.Outcome.SERVICER_FAILURE:
return exceptions.ServicerError()
elif abortion_outcome == base_interfaces.Outcome.SERVICED_FAILURE:
return exceptions.ServicedError()
else:
return exceptions.NetworkError()
class UnaryConsumer(stream.Consumer):
"""A stream.Consumer that should only ever be passed one value."""
def __init__(self, on_termination):
self._on_termination = on_termination
self._value = None
def consume(self, value):
self._value = value
def terminate(self):
self._on_termination(self._value)
def consume_and_terminate(self, value):
self._on_termination(value)
class Rendezvous(stream.Consumer):
"""A rendez-vous with stream.Consumer and iterator interfaces."""
def __init__(self):
self._condition = threading.Condition()
self._values = []
self._values_completed = False
self._abortion = None
def consume(self, value):
with self._condition:
self._values.append(value)
self._condition.notify()
def terminate(self):
with self._condition:
self._values_completed = True
self._condition.notify()
def consume_and_terminate(self, value):
with self._condition:
self._values.append(value)
self._values_completed = True
self._condition.notify()
def __iter__(self):
return self
def __next__(self):
return self.next()
def next(self):
with self._condition:
while ((self._abortion is None) and
(not self._values) and
(not self._values_completed)):
self._condition.wait()
if self._abortion is not None:
raise _abortion_outcome_to_exception(self._abortion)
elif self._values:
return self._values.pop(0)
elif self._values_completed:
raise StopIteration()
else:
raise AssertionError('Unreachable code reached!')
def set_outcome(self, outcome):
with self._condition:
if outcome is not base_interfaces.Outcome.COMPLETED:
self._abortion = outcome
self._condition.notify()
class RpcContext(interfaces.RpcContext):
"""A wrapped base_interfaces.OperationContext."""
def __init__(self, operation_context):
self._operation_context = operation_context
def is_active(self):
return self._operation_context.is_active()
def time_remaining(self):
return self._operation_context.time_remaining()
def add_abortion_callback(self, abortion_callback):
self._operation_context.add_termination_callback(
_as_operation_termination_callback(abortion_callback))
def pipe_iterator_to_consumer(iterator, consumer, active, terminate):
"""Pipes values emitted from an iterator to a stream.Consumer.
Args:
iterator: An iterator from which values will be emitted.
consumer: A stream.Consumer to which values will be passed.
active: A no-argument callable that returns True if the work being done by
this function is still valid and should not be abandoned and False if the
work being done by this function should be abandoned.
terminate: A boolean indicating whether or not this function should
terminate the given consumer after passing to it all values emitted by the
given iterator.
Raises:
abandonment.Abandoned: If this function quits early after seeing False
returned by the active function passed to it.
Exception: This function raises whatever exceptions are raised by iterating
over the given iterator.
"""
for element in iterator:
if not active():
raise abandonment.Abandoned()
consumer.consume(element)
if not active():
raise abandonment.Abandoned()
if terminate:
consumer.terminate()
def abortion_outcome_to_exception(abortion_outcome):
return _abortion_outcome_to_exception(abortion_outcome)
def as_operation_termination_callback(rpc_abortion_callback):
return _as_operation_termination_callback(rpc_abortion_callback)