blob: 5c8b176da4f06b32f543647b84e389ad67a20eef [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the base interface of RPC Framework."""
import logging
import random
import threading
import time
import unittest
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import _control
from grpc_test.framework.interfaces.base import test_interfaces
_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
class _Serialization(test_interfaces.Serialization):
def serialize_request(self, request):
return request + request
def deserialize_request(self, serialized_request):
return serialized_request[:len(serialized_request) / 2]
def serialize_response(self, response):
return response * 3
def deserialize_response(self, serialized_response):
return serialized_response[2 * len(serialized_response) / 3:]
def _advance(quadruples, operator, controller):
try:
for quadruple in quadruples:
operator.advance(
initial_metadata=quadruple[0], payload=quadruple[1],
completion=quadruple[2], allowance=quadruple[3])
except Exception as e: # pylint: disable=broad-except
controller.failed('Exception on advance: %e' % e)
class _Operator(base.Operator):
def __init__(self, controller, on_advance, pool, operator_under_test):
self._condition = threading.Condition()
self._controller = controller
self._on_advance = on_advance
self._pool = pool
self._operator_under_test = operator_under_test
self._pending_advances = []
def set_operator_under_test(self, operator_under_test):
with self._condition:
self._operator_under_test = operator_under_test
pent_advances = self._pending_advances
self._pending_advances = []
pool = self._pool
controller = self._controller
if pool is None:
_advance(pent_advances, operator_under_test, controller)
else:
pool.submit(_advance, pent_advances, operator_under_test, controller)
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
on_advance = self._on_advance(
initial_metadata, payload, completion, allowance)
if on_advance.kind is _control.OnAdvance.Kind.ADVANCE:
with self._condition:
pool = self._pool
operator_under_test = self._operator_under_test
controller = self._controller
quadruple = (
on_advance.initial_metadata, on_advance.payload,
on_advance.completion, on_advance.allowance)
if pool is None:
_advance((quadruple,), operator_under_test, controller)
else:
pool.submit(_advance, (quadruple,), operator_under_test, controller)
elif on_advance.kind is _control.OnAdvance.Kind.DEFECT:
raise ValueError(
'Deliberately raised exception from Operator.advance (in a test)!')
class _Servicer(base.Servicer):
"""An base.Servicer with instrumented for testing."""
def __init__(self, group, method, controllers, pool):
self._condition = threading.Condition()
self._group = group
self._method = method
self._pool = pool
self._controllers = list(controllers)
def service(self, group, method, context, output_operator):
with self._condition:
controller = self._controllers.pop(0)
if group != self._group or method != self._method:
controller.fail(
'%s != %s or %s != %s' % (group, self._group, method, self._method))
raise base.NoSuchMethodError()
else:
operator = _Operator(
controller, controller.on_service_advance, self._pool,
output_operator)
outcome = context.add_termination_callback(
controller.service_on_termination)
if outcome is not None:
controller.service_on_termination(outcome)
return utilities.full_subscription(operator)
class _OperationTest(unittest.TestCase):
def setUp(self):
if self._synchronicity_variation:
self._pool = logging_pool.pool(test_constants.POOL_SIZE)
else:
self._pool = None
self._controller = self._controller_creator.controller(
self._implementation, self._randomness)
def tearDown(self):
if self._synchronicity_variation:
self._pool.shutdown(wait=True)
else:
self._pool = None
def test_operation(self):
invocation = self._controller.invocation()
if invocation.subscription_kind is base.Subscription.Kind.FULL:
test_operator = _Operator(
self._controller, self._controller.on_invocation_advance,
self._pool, None)
subscription = utilities.full_subscription(test_operator)
else:
# TODO(nathaniel): support and test other subscription kinds.
self.fail('Non-full subscriptions not yet supported!')
servicer = _Servicer(
invocation.group, invocation.method, (self._controller,), self._pool)
invocation_end, service_end, memo = self._implementation.instantiate(
{(invocation.group, invocation.method): _Serialization()}, servicer)
try:
invocation_end.start()
service_end.start()
operation_context, operator_under_test = invocation_end.operate(
invocation.group, invocation.method, subscription, invocation.timeout,
initial_metadata=invocation.initial_metadata, payload=invocation.payload,
completion=invocation.completion)
test_operator.set_operator_under_test(operator_under_test)
outcome = operation_context.add_termination_callback(
self._controller.invocation_on_termination)
if outcome is not None:
self._controller.invocation_on_termination(outcome)
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on invocation: %s' % e)
self.fail(e)
while True:
instruction = self._controller.poll()
if instruction.kind is _control.Instruction.Kind.ADVANCE:
try:
test_operator.advance(
*instruction.advance_args, **instruction.advance_kwargs)
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on instructed advance: %s' % e)
elif instruction.kind is _control.Instruction.Kind.CANCEL:
try:
operation_context.cancel()
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on cancel: %s' % e)
elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
break
invocation_stop_event = invocation_end.stop(0)
service_stop_event = service_end.stop(0)
invocation_stop_event.wait()
service_stop_event.wait()
invocation_stats = invocation_end.operation_stats()
service_stats = service_end.operation_stats()
self._implementation.destantiate(memo)
self.assertTrue(
instruction.conclude_success, msg=instruction.conclude_message)
expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
self.assertDictEqual(expected_invocation_stats, invocation_stats)
expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
expected_service_stats[instruction.conclude_service_outcome] += 1
self.assertDictEqual(expected_service_stats, service_stats)
def test_cases(implementation):
"""Creates unittest.TestCase classes for a given Base implementation.
Args:
implementation: A test_interfaces.Implementation specifying creation and
destruction of the Base implementation under test.
Returns:
A sequence of subclasses of unittest.TestCase defining tests of the
specified Base layer implementation.
"""
random_seed = hash(time.time())
logging.warning('Random seed for this execution: %s', random_seed)
randomness = random.Random(x=random_seed)
test_case_classes = []
for synchronicity_variation in _SYNCHRONICITY_VARIATION:
for controller_creator in _control.CONTROLLER_CREATORS:
name = ''.join(
(synchronicity_variation[0], controller_creator.name(), 'Test',))
test_case_classes.append(
type(name, (_OperationTest,),
{'_implementation': implementation,
'_randomness': randomness,
'_synchronicity_variation': synchronicity_variation[1],
'_controller_creator': controller_creator,
}))
return test_case_classes