blob: c27fc9106d51c69773de77108b21c5b5b42983b6 [file] [log] [blame]
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for translating between sync and async control flow."""
import collections
import enum
import sys
import threading
import time
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future
from grpc.framework.foundation import stream
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.face import face
_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:'
_CANNOT_SET_INITIAL_METADATA = (
'Could not set initial metadata - has it already been set, or has a ' +
'payload already been sent?')
_CANNOT_SET_TERMINAL_METADATA = (
'Could not set terminal metadata - has it already been set, or has RPC ' +
'completion already been indicated?')
_CANNOT_SET_CODE = (
'Could not set code - has it already been set, or has RPC completion ' +
'already been indicated?')
_CANNOT_SET_DETAILS = (
'Could not set details - has it already been set, or has RPC completion ' +
'already been indicated?')
class _DummyOperator(base.Operator):
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
pass
_DUMMY_OPERATOR = _DummyOperator()
class _Awaited(
collections.namedtuple('_Awaited', ('kind', 'value',))):
@enum.unique
class Kind(enum.Enum):
NOT_YET_ARRIVED = 'not yet arrived'
ARRIVED = 'arrived'
_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
class _Transitory(
collections.namedtuple('_Transitory', ('kind', 'value',))):
@enum.unique
class Kind(enum.Enum):
NOT_YET_SEEN = 'not yet seen'
PRESENT = 'present'
GONE = 'gone'
_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
_GONE = _Transitory(_Transitory.Kind.GONE, None)
class _Termination(
collections.namedtuple(
'_Termination', ('terminated', 'abortion', 'abortion_error',))):
"""Values indicating whether and how an RPC has terminated.
Attributes:
terminated: A boolean indicating whether or not the RPC has terminated.
abortion: A face.Abortion value describing the RPC's abortion or None if the
RPC did not abort.
abortion_error: A face.AbortionError describing the RPC's abortion or None
if the RPC did not abort.
"""
_NOT_TERMINATED = _Termination(False, None, None)
_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = {
base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination(
True, None, None),
base.Outcome.Kind.CANCELLED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
face.CancellationError(*args)),
base.Outcome.Kind.EXPIRED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
face.ExpirationError(*args)),
base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
face.LocalShutdownError(*args)),
base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
face.RemoteShutdownError(*args)),
base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
face.LocalError(*args)),
base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
face.RemoteError(*args)),
}
def _wait_once_until(condition, until):
if until is None:
condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
raise future.TimeoutError()
else:
condition.wait(timeout=remaining)
def _done_callback_as_operation_termination_callback(
done_callback, rendezvous):
def operation_termination_callback(operation_outcome):
rendezvous.set_outcome(operation_outcome)
done_callback(rendezvous)
return operation_termination_callback
def _abortion_callback_as_operation_termination_callback(
rpc_abortion_callback, rendezvous_set_outcome):
def operation_termination_callback(operation_outcome):
termination = rendezvous_set_outcome(operation_outcome)
if termination.abortion is not None:
rpc_abortion_callback(termination.abortion)
return operation_termination_callback
class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
"""A rendez-vous for the threads of an operation.
Instances of this object present iterator and stream.Consumer interfaces for
interacting with application code and present a base.Operator interface and
maintain a base.Operator internally for interacting with base interface code.
"""
def __init__(self, operator, operation_context):
self._condition = threading.Condition()
self._operator = operator
self._operation_context = operation_context
self._protocol_context = _NOT_YET_ARRIVED
self._up_initial_metadata = _NOT_YET_ARRIVED
self._up_payload = None
self._up_allowance = 1
self._up_completion = _NOT_YET_ARRIVED
self._down_initial_metadata = _NOT_YET_SEEN
self._down_payload = None
self._down_allowance = 1
self._down_terminal_metadata = _NOT_YET_SEEN
self._down_code = _NOT_YET_SEEN
self._down_details = _NOT_YET_SEEN
self._termination = _NOT_TERMINATED
# The semantics of future.Future.cancel and future.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
# result of the RPC. This field tracks whether cancellation was requested
# prior to termination of the RPC
self._cancelled = False
def set_operator_and_context(self, operator, operation_context):
with self._condition:
self._operator = operator
self._operation_context = operation_context
def _down_completion(self):
if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN:
terminal_metadata = None
self._down_terminal_metadata = _GONE
elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT:
terminal_metadata = self._down_terminal_metadata.value
self._down_terminal_metadata = _GONE
else:
terminal_metadata = None
if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN:
code = None
self._down_code = _GONE
elif self._down_code.kind is _Transitory.Kind.PRESENT:
code = self._down_code.value
self._down_code = _GONE
else:
code = None
if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN:
details = None
self._down_details = _GONE
elif self._down_details.kind is _Transitory.Kind.PRESENT:
details = self._down_details.value
self._down_details = _GONE
else:
details = None
return utilities.completion(terminal_metadata, code, details)
def _set_outcome(self, outcome):
if not self._termination.terminated:
self._operator = _DUMMY_OPERATOR
self._operation_context = None
self._down_initial_metadata = _GONE
self._down_payload = None
self._down_terminal_metadata = _GONE
self._down_code = _GONE
self._down_details = _GONE
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
initial_metadata = None
else:
initial_metadata = self._up_initial_metadata.value
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
terminal_metadata = None
else:
terminal_metadata = self._up_completion.value.terminal_metadata
if outcome.kind is base.Outcome.Kind.COMPLETED:
code = self._up_completion.value.code
details = self._up_completion.value.message
else:
code = outcome.code
details = outcome.details
self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[
outcome.kind](initial_metadata, terminal_metadata, code, details)
self._condition.notify_all()
return self._termination
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
with self._condition:
if initial_metadata is not None:
self._up_initial_metadata = _Awaited(
_Awaited.Kind.ARRIVED, initial_metadata)
if payload is not None:
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._up_initial_metadata = _ARRIVED_AND_NONE
self._up_payload = payload
self._up_allowance -= 1
if completion is not None:
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._up_initial_metadata = _ARRIVED_AND_NONE
self._up_completion = _Awaited(
_Awaited.Kind.ARRIVED, completion)
if allowance is not None:
if self._down_payload is not None:
self._operator.advance(payload=self._down_payload)
self._down_payload = None
self._down_allowance += allowance - 1
else:
self._down_allowance += allowance
self._condition.notify_all()
def cancel(self):
with self._condition:
if self._operation_context is not None:
self._operation_context.cancel()
self._cancelled = True
return False
def cancelled(self):
with self._condition:
return self._cancelled
def running(self):
with self._condition:
return not self._termination.terminated
def done(self):
with self._condition:
return self._termination.terminated
def result(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion is None:
return self._up_payload
elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED:
raise future.CancelledError()
else:
raise self._termination.abortion_error # pylint: disable=raising-bad-type
else:
_wait_once_until(self._condition, until)
def exception(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion is None:
return None
else:
return self._termination.abortion_error
else:
_wait_once_until(self._condition, until)
def traceback(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion_error is None:
return None
else:
abortion_error = self._termination.abortion_error
break
else:
_wait_once_until(self._condition, until)
try:
raise abortion_error
except face.AbortionError:
return sys.exc_info()[2]
def add_done_callback(self, fn):
with self._condition:
if self._operation_context is not None:
outcome = self._operation_context.add_termination_callback(
_done_callback_as_operation_termination_callback(fn, self))
if outcome is None:
return
else:
self._set_outcome(outcome)
fn(self)
def consume(self, value):
with self._condition:
while True:
if self._termination.terminated:
return
elif 0 < self._down_allowance:
self._operator.advance(payload=value)
self._down_allowance -= 1
return
else:
self._condition.wait()
def terminate(self):
with self._condition:
if self._termination.terminated:
return
elif self._down_code.kind is _Transitory.Kind.GONE:
# Conform to specified idempotence of terminate by ignoring extra calls.
return
else:
completion = self._down_completion()
self._operator.advance(completion=completion)
def consume_and_terminate(self, value):
with self._condition:
while True:
if self._termination.terminated:
return
elif 0 < self._down_allowance:
completion = self._down_completion()
self._operator.advance(payload=value, completion=completion)
return
else:
self._condition.wait()
def __iter__(self):
return self
def __next__(self):
return self.next()
def next(self):
with self._condition:
while True:
if self._termination.abortion_error is not None:
raise self._termination.abortion_error
elif self._up_payload is not None:
payload = self._up_payload
self._up_payload = None
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._operator.advance(allowance=1)
return payload
elif self._up_completion.kind is _Awaited.Kind.ARRIVED:
raise StopIteration()
else:
self._condition.wait()
def is_active(self):
with self._condition:
return not self._termination.terminated
def time_remaining(self):
if self._operation_context is None:
return 0
else:
return self._operation_context.time_remaining()
def add_abortion_callback(self, abortion_callback):
with self._condition:
if self._operation_context is None:
return self._termination.abortion
else:
outcome = self._operation_context.add_termination_callback(
_abortion_callback_as_operation_termination_callback(
abortion_callback, self.set_outcome))
if outcome is not None:
return self._set_outcome(outcome).abortion
else:
return self._termination.abortion
def protocol_context(self):
with self._condition:
while True:
if self._protocol_context.kind is _Awaited.Kind.ARRIVED:
return self._protocol_context.value
elif self._termination.abortion_error is not None:
raise self._termination.abortion_error
else:
self._condition.wait()
def initial_metadata(self):
with self._condition:
while True:
if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED:
return self._up_initial_metadata.value
elif self._termination.terminated:
return None
else:
self._condition.wait()
def terminal_metadata(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.terminal_metadata
elif self._termination.terminated:
return None
else:
self._condition.wait()
def code(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.code
elif self._termination.terminated:
return None
else:
self._condition.wait()
def details(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.message
elif self._termination.terminated:
return None
else:
self._condition.wait()
def set_initial_metadata(self, initial_metadata):
with self._condition:
if (self._down_initial_metadata.kind is not
_Transitory.Kind.NOT_YET_SEEN):
raise ValueError(_CANNOT_SET_INITIAL_METADATA)
else:
self._down_initial_metadata = _GONE
self._operator.advance(initial_metadata=initial_metadata)
def set_terminal_metadata(self, terminal_metadata):
with self._condition:
if (self._down_terminal_metadata.kind is not
_Transitory.Kind.NOT_YET_SEEN):
raise ValueError(_CANNOT_SET_TERMINAL_METADATA)
else:
self._down_terminal_metadata = _Transitory(
_Transitory.Kind.PRESENT, terminal_metadata)
def set_code(self, code):
with self._condition:
if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN:
raise ValueError(_CANNOT_SET_CODE)
else:
self._down_code = _Transitory(_Transitory.Kind.PRESENT, code)
def set_details(self, details):
with self._condition:
if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN:
raise ValueError(_CANNOT_SET_DETAILS)
else:
self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
def set_protocol_context(self, protocol_context):
with self._condition:
self._protocol_context = _Awaited(
_Awaited.Kind.ARRIVED, protocol_context)
self._condition.notify_all()
def set_outcome(self, outcome):
with self._condition:
return self._set_outcome(outcome)
class _ProtocolReceiver(base.ProtocolReceiver):
def __init__(self, rendezvous):
self._rendezvous = rendezvous
def context(self, protocol_context):
self._rendezvous.set_protocol_context(protocol_context)
def protocol_receiver(rendezvous):
return _ProtocolReceiver(rendezvous)
def pool_wrap(behavior, operation_context):
"""Wraps an operation-related behavior so that it may be called in a pool.
Args:
behavior: A callable related to carrying out an operation.
operation_context: A base_interfaces.OperationContext for the operation.
Returns:
A callable that when called carries out the behavior of the given callable
and handles whatever exceptions it raises appropriately.
"""
def translation(*args):
try:
behavior(*args)
except (
abandonment.Abandoned,
face.CancellationError,
face.ExpirationError,
face.LocalShutdownError,
face.RemoteShutdownError,
face.NetworkError,
face.RemoteError,
) as e:
if operation_context.outcome() is None:
operation_context.fail(e)
except Exception as e:
operation_context.fail(e)
return callable_util.with_exceptions_logged(
translation, _INTERNAL_ERROR_LOG_MESSAGE)