blob: bff940d74710da3a6be3a2c099cf8e661f51f910 [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utility functions for invoking RPCs."""
from grpc.framework.crust import _control
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.face import face
_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
_EMPTY_COMPLETION = utilities.completion(None, None, None)
def _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
complete):
rendezvous = _control.Rendezvous(None, None)
subscription = utilities.full_subscription(
rendezvous, _control.protocol_receiver(rendezvous))
operation_context, operator = end.operate(
group, method, subscription, timeout, protocol_options=protocol_options,
initial_metadata=initial_metadata, payload=payload,
completion=_EMPTY_COMPLETION if complete else None)
rendezvous.set_operator_and_context(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is not None:
rendezvous.set_outcome(outcome)
return rendezvous, operation_context, outcome
def _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
if outcome is None:
def in_pool():
abortion = rendezvous.add_abortion_callback(abortion_callback)
if abortion is None:
try:
receiver.initial_metadata(rendezvous.initial_metadata())
receiver.response(next(rendezvous))
receiver.complete(
rendezvous.terminal_metadata(), rendezvous.code(),
rendezvous.details())
except face.AbortionError:
pass
else:
abortion_callback(abortion)
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
if outcome is None:
def in_pool():
abortion = rendezvous.add_abortion_callback(abortion_callback)
if abortion is None:
try:
receiver.initial_metadata(rendezvous.initial_metadata())
for response in rendezvous:
receiver.response(response)
receiver.complete(
rendezvous.terminal_metadata(), rendezvous.code(),
rendezvous.details())
except face.AbortionError:
pass
else:
abortion_callback(abortion)
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def blocking_unary_unary(
end, group, method, timeout, with_call, protocol_options, initial_metadata,
payload):
"""Services in a blocking fashion a unary-unary servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)
def future_unary_unary(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in value-out servicer method by returning a Future."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous
def inline_unary_stream(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in stream-out servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous
def blocking_stream_unary(
end, group, method, timeout, with_call, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)
else:
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)
def future_stream_unary(
end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in value-out servicer method by returning a Future."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def inline_stream_stream(
end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in stream-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def event_unary_unary(
end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_unary_stream(
end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_unary(
end, group, method, timeout, protocol_options, initial_metadata, receiver,
abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_stream(
end, group, method, timeout, protocol_options, initial_metadata, receiver,
abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)