| # Copyright 2015, Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Code for making a service.TestService more amenable to use in tests.""" |
| |
| import collections |
| import threading |
| |
| # testing_control, interfaces, and testing_service are referenced from |
| # specification in this module. |
| from grpc.framework.common import cardinality |
| from grpc.framework.common import style |
| from grpc.framework.face import exceptions |
| from grpc.framework.face import interfaces as face_interfaces |
| from grpc.framework.foundation import stream |
| from grpc.framework.foundation import stream_util |
| from grpc_test.framework.face.testing import control as testing_control # pylint: disable=unused-import |
| from grpc_test.framework.face.testing import interfaces # pylint: disable=unused-import |
| from grpc_test.framework.face.testing import service as testing_service # pylint: disable=unused-import |
| |
| _IDENTITY = lambda x: x |
| |
| |
| class TestServiceDigest( |
| collections.namedtuple( |
| 'TestServiceDigest', |
| ['name', |
| 'methods', |
| 'inline_method_implementations', |
| 'event_method_implementations', |
| 'multi_method_implementation', |
| 'unary_unary_messages_sequences', |
| 'unary_stream_messages_sequences', |
| 'stream_unary_messages_sequences', |
| 'stream_stream_messages_sequences'])): |
| """A transformation of a service.TestService. |
| |
| Attributes: |
| name: The RPC service name to be used in the test. |
| methods: A sequence of interfaces.Method objects describing the RPC |
| methods that will be called during the test. |
| inline_method_implementations: A dict from RPC method name to |
| face_interfaces.MethodImplementation object to be used in tests of |
| in-line calls to behaviors under test. |
| event_method_implementations: A dict from RPC method name to |
| face_interfaces.MethodImplementation object to be used in tests of |
| event-driven calls to behaviors under test. |
| multi_method_implementation: A face_interfaces.MultiMethodImplementation to |
| be used in tests of generic calls to behaviors under test. |
| unary_unary_messages_sequences: A dict from method name to sequence of |
| service.UnaryUnaryTestMessages objects to be used to test the method |
| with the given name. |
| unary_stream_messages_sequences: A dict from method name to sequence of |
| service.UnaryStreamTestMessages objects to be used to test the method |
| with the given name. |
| stream_unary_messages_sequences: A dict from method name to sequence of |
| service.StreamUnaryTestMessages objects to be used to test the method |
| with the given name. |
| stream_stream_messages_sequences: A dict from method name to sequence of |
| service.StreamStreamTestMessages objects to be used to test the |
| method with the given name. |
| serialization: A serial.Serialization object describing serialization |
| behaviors for all the RPC methods. |
| """ |
| |
| |
| class _BufferingConsumer(stream.Consumer): |
| """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" |
| |
| def __init__(self): |
| self.consumed = [] |
| self.terminated = False |
| |
| def consume(self, value): |
| self.consumed.append(value) |
| |
| def terminate(self): |
| self.terminated = True |
| |
| def consume_and_terminate(self, value): |
| self.consumed.append(value) |
| self.terminated = True |
| |
| |
| class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, unary_unary_test_method, control): |
| self._test_method = unary_unary_test_method |
| self._control = control |
| |
| self.cardinality = cardinality.Cardinality.UNARY_UNARY |
| self.style = style.Service.INLINE |
| |
| def unary_unary_inline(self, request, context): |
| response_list = [] |
| self._test_method.service( |
| request, response_list.append, context, self._control) |
| return response_list.pop(0) |
| |
| |
| class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, unary_unary_test_method, control, pool): |
| self._test_method = unary_unary_test_method |
| self._control = control |
| self._pool = pool |
| |
| self.cardinality = cardinality.Cardinality.UNARY_UNARY |
| self.style = style.Service.EVENT |
| |
| def unary_unary_event(self, request, response_callback, context): |
| if self._pool is None: |
| self._test_method.service( |
| request, response_callback, context, self._control) |
| else: |
| self._pool.submit( |
| self._test_method.service, request, response_callback, context, |
| self._control) |
| |
| |
| class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, unary_stream_test_method, control): |
| self._test_method = unary_stream_test_method |
| self._control = control |
| |
| self.cardinality = cardinality.Cardinality.UNARY_STREAM |
| self.style = style.Service.INLINE |
| |
| def unary_stream_inline(self, request, context): |
| response_consumer = _BufferingConsumer() |
| self._test_method.service( |
| request, response_consumer, context, self._control) |
| for response in response_consumer.consumed: |
| yield response |
| |
| |
| class _EventUnaryStreamMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, unary_stream_test_method, control, pool): |
| self._test_method = unary_stream_test_method |
| self._control = control |
| self._pool = pool |
| |
| self.cardinality = cardinality.Cardinality.UNARY_STREAM |
| self.style = style.Service.EVENT |
| |
| def unary_stream_event(self, request, response_consumer, context): |
| if self._pool is None: |
| self._test_method.service( |
| request, response_consumer, context, self._control) |
| else: |
| self._pool.submit( |
| self._test_method.service, request, response_consumer, context, |
| self._control) |
| |
| |
| class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, stream_unary_test_method, control): |
| self._test_method = stream_unary_test_method |
| self._control = control |
| |
| self.cardinality = cardinality.Cardinality.STREAM_UNARY |
| self.style = style.Service.INLINE |
| |
| def stream_unary_inline(self, request_iterator, context): |
| response_list = [] |
| request_consumer = self._test_method.service( |
| response_list.append, context, self._control) |
| for request in request_iterator: |
| request_consumer.consume(request) |
| request_consumer.terminate() |
| return response_list.pop(0) |
| |
| |
| class _EventStreamUnaryMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, stream_unary_test_method, control, pool): |
| self._test_method = stream_unary_test_method |
| self._control = control |
| self._pool = pool |
| |
| self.cardinality = cardinality.Cardinality.STREAM_UNARY |
| self.style = style.Service.EVENT |
| |
| def stream_unary_event(self, response_callback, context): |
| request_consumer = self._test_method.service( |
| response_callback, context, self._control) |
| if self._pool is None: |
| return request_consumer |
| else: |
| return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) |
| |
| |
| class _InlineStreamStreamMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, stream_stream_test_method, control): |
| self._test_method = stream_stream_test_method |
| self._control = control |
| |
| self.cardinality = cardinality.Cardinality.STREAM_STREAM |
| self.style = style.Service.INLINE |
| |
| def stream_stream_inline(self, request_iterator, context): |
| response_consumer = _BufferingConsumer() |
| request_consumer = self._test_method.service( |
| response_consumer, context, self._control) |
| |
| for request in request_iterator: |
| request_consumer.consume(request) |
| while response_consumer.consumed: |
| yield response_consumer.consumed.pop(0) |
| response_consumer.terminate() |
| |
| |
| class _EventStreamStreamMethod(face_interfaces.MethodImplementation): |
| |
| def __init__(self, stream_stream_test_method, control, pool): |
| self._test_method = stream_stream_test_method |
| self._control = control |
| self._pool = pool |
| |
| self.cardinality = cardinality.Cardinality.STREAM_STREAM |
| self.style = style.Service.EVENT |
| |
| def stream_stream_event(self, response_consumer, context): |
| request_consumer = self._test_method.service( |
| response_consumer, context, self._control) |
| if self._pool is None: |
| return request_consumer |
| else: |
| return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) |
| |
| |
| class _UnaryConsumer(stream.Consumer): |
| """A Consumer that only allows consumption of exactly one value.""" |
| |
| def __init__(self, action): |
| self._lock = threading.Lock() |
| self._action = action |
| self._consumed = False |
| self._terminated = False |
| |
| def consume(self, value): |
| with self._lock: |
| if self._consumed: |
| raise ValueError('Unary consumer already consumed!') |
| elif self._terminated: |
| raise ValueError('Unary consumer already terminated!') |
| else: |
| self._consumed = True |
| |
| self._action(value) |
| |
| def terminate(self): |
| with self._lock: |
| if not self._consumed: |
| raise ValueError('Unary consumer hasn\'t yet consumed!') |
| elif self._terminated: |
| raise ValueError('Unary consumer already terminated!') |
| else: |
| self._terminated = True |
| |
| def consume_and_terminate(self, value): |
| with self._lock: |
| if self._consumed: |
| raise ValueError('Unary consumer already consumed!') |
| elif self._terminated: |
| raise ValueError('Unary consumer already terminated!') |
| else: |
| self._consumed = True |
| self._terminated = True |
| |
| self._action(value) |
| |
| |
| class _UnaryUnaryAdaptation(object): |
| |
| def __init__(self, unary_unary_test_method): |
| self._method = unary_unary_test_method |
| |
| def service(self, response_consumer, context, control): |
| def action(request): |
| self._method.service( |
| request, response_consumer.consume_and_terminate, context, control) |
| return _UnaryConsumer(action) |
| |
| |
| class _UnaryStreamAdaptation(object): |
| |
| def __init__(self, unary_stream_test_method): |
| self._method = unary_stream_test_method |
| |
| def service(self, response_consumer, context, control): |
| def action(request): |
| self._method.service(request, response_consumer, context, control) |
| return _UnaryConsumer(action) |
| |
| |
| class _StreamUnaryAdaptation(object): |
| |
| def __init__(self, stream_unary_test_method): |
| self._method = stream_unary_test_method |
| |
| def service(self, response_consumer, context, control): |
| return self._method.service( |
| response_consumer.consume_and_terminate, context, control) |
| |
| |
| class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation): |
| |
| def __init__(self, methods, control, pool): |
| self._methods = methods |
| self._control = control |
| self._pool = pool |
| |
| def service(self, name, response_consumer, context): |
| method = self._methods.get(name, None) |
| if method is None: |
| raise exceptions.NoSuchMethodError(name) |
| elif self._pool is None: |
| return method(response_consumer, context, self._control) |
| else: |
| request_consumer = method(response_consumer, context, self._control) |
| return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) |
| |
| |
| class _Assembly( |
| collections.namedtuple( |
| '_Assembly', |
| ['methods', 'inlines', 'events', 'adaptations', 'messages'])): |
| """An intermediate structure created when creating a TestServiceDigest.""" |
| |
| |
| def _assemble( |
| scenarios, names, inline_method_constructor, event_method_constructor, |
| adapter, control, pool): |
| """Creates an _Assembly from the given scenarios.""" |
| methods = [] |
| inlines = {} |
| events = {} |
| adaptations = {} |
| messages = {} |
| for name, scenario in scenarios.iteritems(): |
| if name in names: |
| raise ValueError('Repeated name "%s"!' % name) |
| |
| test_method = scenario[0] |
| inline_method = inline_method_constructor(test_method, control) |
| event_method = event_method_constructor(test_method, control, pool) |
| adaptation = adapter(test_method) |
| |
| methods.append(test_method) |
| inlines[name] = inline_method |
| events[name] = event_method |
| adaptations[name] = adaptation |
| messages[name] = scenario[1] |
| |
| return _Assembly(methods, inlines, events, adaptations, messages) |
| |
| |
| def digest(service, control, pool): |
| """Creates a TestServiceDigest from a TestService. |
| |
| Args: |
| service: A testing_service.TestService. |
| control: A testing_control.Control. |
| pool: If RPC methods should be serviced in a separate thread, a thread pool. |
| None if RPC methods should be serviced in the thread belonging to the |
| run-time that calls for their service. |
| |
| Returns: |
| A TestServiceDigest synthesized from the given service.TestService. |
| """ |
| names = set() |
| |
| unary_unary = _assemble( |
| service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod, |
| _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) |
| names.update(set(unary_unary.inlines)) |
| |
| unary_stream = _assemble( |
| service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod, |
| _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) |
| names.update(set(unary_stream.inlines)) |
| |
| stream_unary = _assemble( |
| service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod, |
| _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) |
| names.update(set(stream_unary.inlines)) |
| |
| stream_stream = _assemble( |
| service.stream_stream_scenarios(), names, _InlineStreamStreamMethod, |
| _EventStreamStreamMethod, _IDENTITY, control, pool) |
| names.update(set(stream_stream.inlines)) |
| |
| methods = list(unary_unary.methods) |
| methods.extend(unary_stream.methods) |
| methods.extend(stream_unary.methods) |
| methods.extend(stream_stream.methods) |
| adaptations = dict(unary_unary.adaptations) |
| adaptations.update(unary_stream.adaptations) |
| adaptations.update(stream_unary.adaptations) |
| adaptations.update(stream_stream.adaptations) |
| inlines = dict(unary_unary.inlines) |
| inlines.update(unary_stream.inlines) |
| inlines.update(stream_unary.inlines) |
| inlines.update(stream_stream.inlines) |
| events = dict(unary_unary.events) |
| events.update(unary_stream.events) |
| events.update(stream_unary.events) |
| events.update(stream_stream.events) |
| |
| return TestServiceDigest( |
| service.name(), |
| methods, |
| inlines, |
| events, |
| _MultiMethodImplementation(adaptations, control, pool), |
| unary_unary.messages, |
| unary_stream.messages, |
| stream_unary.messages, |
| stream_stream.messages) |