# blob: 608e64119ec9586637d5c55ac0800d3a499014cc [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the links interface of RPC Framework."""
# unittest is referenced from specification in this module.
import abc
import unittest # pylint: disable=unused-import
import six
from grpc.framework.interfaces.links import links
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.links import test_utilities
def at_least_n_payloads_received_predicate(n):
    """Builds a predicate that checks for a minimum number of payloads.

    Args:
      n: The minimum number of payload-bearing tickets required. A value of
        zero or less is satisfied by any sequence, including an empty one.

    Returns:
      A function that takes a sequence of tickets and returns True if at
      least n of them have a payload that is not None, and False otherwise.
    """

    def predicate(ticket_sequence):
        # Count once and compare once so that the answer for n <= 0 does not
        # depend on whether the sequence happens to contain a payload.
        payload_count = sum(
            1 for ticket in ticket_sequence if ticket.payload is not None)
        return n <= payload_count

    return predicate
def terminated(ticket_sequence):
    """Reports whether the last ticket of a sequence carries a termination.

    Args:
      ticket_sequence: A sequence of tickets, possibly empty.

    Returns:
      True if the sequence is non-empty and the termination attribute of its
      final ticket is not None; False otherwise.
    """
    # Guard explicitly so that an empty sequence yields False rather than the
    # empty sequence object itself.
    if not ticket_sequence:
        return False
    return ticket_sequence[-1].termination is not None
# Group name returned by TransmissionTest.group_and_method and placed in the
# invocation-side ticket built by the round-trip test.
_TRANSMISSION_GROUP = 'test.Group'
# Method name paired with _TRANSMISSION_GROUP in the same two places.
_TRANSMISSION_METHOD = 'TestMethod'
class TransmissionTest(six.with_metaclass(abc.ABCMeta)):
    """Exercises ticket transmission across a pair of connected links.

    This is a mix-in: combine it with unittest.TestCase in a concrete class
    that implements every abstract method declared here.
    """

    # This is a unittest.TestCase mix-in.
    # pylint: disable=invalid-name

    @abc.abstractmethod
    def create_transmitting_links(self):
        """Builds the pair of connected links exercised by this test.

        Returns:
          Two links.Links: first the one used on the invocation side of RPCs,
          then the one used on the service side of RPCs.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def destroy_transmitting_links(self, invocation_side_link, service_side_link):
        """Tears down the pair of connected links built for this test.

        Args:
          invocation_side_link: The link that served the invocation side of
            RPCs in this test.
          service_side_link: The link that served the service side of RPCs in
            this test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_initial_metadata(self):
        """Produces the invocation-side initial metadata for this test.

        Returns:
          A metadata value suitable as invocation-side initial metadata, or
          None when the links under test cannot transmit invocation-side
          initial metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_terminal_metadata(self):
        """Produces the invocation-side terminal metadata for this test.

        Returns:
          A metadata value suitable as invocation-side terminal metadata, or
          None when the links under test cannot transmit invocation-side
          terminal metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_initial_metadata(self):
        """Produces the service-side initial metadata for this test.

        Returns:
          A metadata value suitable as service-side initial metadata, or None
          when the links under test cannot transmit service-side initial
          metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_terminal_metadata(self):
        """Produces the service-side terminal metadata for this test.

        Returns:
          A metadata value suitable as service-side terminal metadata, or None
          when the links under test cannot transmit service-side terminal
          metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_completion(self):
        """Produces the invocation-side code and message for this test.

        Returns:
          A pair of an invocation-side code value and an invocation-side
          message value. Either element may be None when the links under test
          cannot transmit it.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_completion(self):
        """Produces the service-side code and message for this test.

        Returns:
          A pair of a service-side code value and a service-side message
          value. Either element may be None when the links under test cannot
          transmit it.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
        """Checks that transmitted_metadata contains original_metadata.

        Args:
          original_metadata: A metadata object used in this test.
          transmitted_metadata: A metadata object obtained after passing
            through the system under test.

        Raises:
          AssertionError: if transmitted_metadata does not contain
            original_metadata.
        """
        raise NotImplementedError()

    def group_and_method(self):
        """Reports the group and method used by this test case.

        Returns:
          A (group, method) pair.
        """
        return (_TRANSMISSION_GROUP, _TRANSMISSION_METHOD)

    def serialize_request(self, request):
        """Turns a request value of this test case into its serialized form.

        Args:
          request: A request value created by this test case.

        Returns:
          The serialization of the request; this implementation hands the
          request back unchanged.
        """
        return request

    def deserialize_request(self, serialized_request):
        """Recovers a request value of this test case from its serialized form.

        Args:
          serialized_request: The serialization of some request used in this
            test case.

        Returns:
          The request value; this implementation hands the argument back
          unchanged.
        """
        return serialized_request

    def serialize_response(self, response):
        """Turns a response value of this test case into its serialized form.

        Args:
          response: A response value created by this test case.

        Returns:
          The serialization of the response; this implementation hands the
          response back unchanged.
        """
        return response

    def deserialize_response(self, serialized_response):
        """Recovers a response value of this test case from its serialized form.

        Args:
          serialized_response: The serialization of some response used in this
            test case.

        Returns:
          The response value; this implementation hands the argument back
          unchanged.
        """
        return serialized_response

    def _assert_is_valid_metadata_payload_sequence(
            self, ticket_sequence, payloads, initial_metadata,
            terminal_metadata):
        """Checks metadata/payload ordering and content across tickets.

        Initial metadata may appear at most once and only before any payload
        or terminal metadata; payloads may not follow terminal metadata;
        terminal metadata may appear at most once. The payloads collected in
        order must equal the expected payloads.

        Args:
          ticket_sequence: The tickets to inspect, in the order received.
          payloads: The expected sequence of payloads.
          initial_metadata: The initial metadata that was sent.
          terminal_metadata: The terminal metadata that was sent.
        """
        saw_initial_metadata = False
        saw_terminal_metadata = False
        observed_payloads = []

        for ticket in ticket_sequence:
            received_initial = ticket.initial_metadata
            if received_initial is not None:
                self.assertFalse(saw_initial_metadata)
                self.assertFalse(observed_payloads)
                self.assertFalse(saw_terminal_metadata)
                self.assertMetadataTransmitted(
                    initial_metadata, received_initial)
                saw_initial_metadata = True

            received_payload = ticket.payload
            if received_payload is not None:
                self.assertFalse(saw_terminal_metadata)
                observed_payloads.append(received_payload)

            received_terminal = ticket.terminal_metadata
            if received_terminal is not None:
                self.assertFalse(saw_terminal_metadata)
                self.assertMetadataTransmitted(
                    terminal_metadata, received_terminal)
                saw_terminal_metadata = True

        self.assertSequenceEqual(payloads, observed_payloads)

    def _assert_is_valid_invocation_sequence(
            self, ticket_sequence, group, method, payloads, initial_metadata,
            terminal_metadata, termination):
        """Checks a ticket sequence as seen from the service side.

        The sequence must be non-empty, its first ticket must name the given
        group and method, its metadata and payloads must be well ordered, and
        its last ticket must carry the given termination.
        """
        self.assertLess(0, len(ticket_sequence))
        opening_ticket = ticket_sequence[0]
        self.assertEqual(group, opening_ticket.group)
        self.assertEqual(method, opening_ticket.method)
        self._assert_is_valid_metadata_payload_sequence(
            ticket_sequence, payloads, initial_metadata, terminal_metadata)
        closing_ticket = ticket_sequence[-1]
        self.assertIs(termination, closing_ticket.termination)

    def _assert_is_valid_service_sequence(
            self, ticket_sequence, payloads, initial_metadata,
            terminal_metadata, code, message, termination):
        """Checks a ticket sequence as seen from the invocation side.

        The sequence must be non-empty, its metadata and payloads must be well
        ordered, and its last ticket must carry the given code, message and
        termination.
        """
        self.assertLess(0, len(ticket_sequence))
        self._assert_is_valid_metadata_payload_sequence(
            ticket_sequence, payloads, initial_metadata, terminal_metadata)
        closing_ticket = ticket_sequence[-1]
        self.assertEqual(code, closing_ticket.code)
        self.assertEqual(message, closing_ticket.message)
        self.assertIs(termination, closing_ticket.termination)

    def setUp(self):
        """Creates the links under test and joins a recording mate to each."""
        invocation_link, service_link = self.create_transmitting_links()
        self._invocation_link = invocation_link
        self._service_link = service_link

        self._invocation_mate = test_utilities.RecordingLink()
        self._service_mate = test_utilities.RecordingLink()

        self._invocation_link.join_link(self._invocation_mate)
        self._service_link.join_link(self._service_mate)

    def tearDown(self):
        """Hands both links back to the subclass for destruction."""
        self.destroy_transmitting_links(
            self._invocation_link, self._service_link)

    def testSimplestRoundTrip(self):
        """Tests transmission of one ticket in each direction."""
        full_subscription = links.Ticket.Subscription.FULL
        completion = links.Ticket.Termination.COMPLETION

        # Values carried by the invocation-side ticket.
        request_operation_id = object()
        request_payload = b'\x07' * 1023
        timeout = test_constants.LONG_TIMEOUT
        request_initial_metadata = self.create_invocation_initial_metadata()
        request_terminal_metadata = self.create_invocation_terminal_metadata()
        request_code, request_message = self.create_invocation_completion()

        # Values carried by the service-side ticket.
        response_payload = b'\x08' * 1025
        response_initial_metadata = self.create_service_initial_metadata()
        response_terminal_metadata = self.create_service_terminal_metadata()
        response_code, response_message = self.create_service_completion()

        # Invocation -> service direction.
        request_ticket = links.Ticket(
            request_operation_id, 0, _TRANSMISSION_GROUP,
            _TRANSMISSION_METHOD, full_subscription, timeout, 0,
            request_initial_metadata, request_payload,
            request_terminal_metadata, request_code, request_message,
            completion, None)
        self._invocation_link.accept_ticket(request_ticket)

        has_one_payload = at_least_n_payloads_received_predicate(1)
        self._service_mate.block_until_tickets_satisfy(has_one_payload)
        # The reply reuses the operation id from the first ticket recorded on
        # the service side.
        first_service_side_ticket = self._service_mate.tickets()[0]
        reply_operation_id = first_service_side_ticket.operation_id
        self._service_mate.block_until_tickets_satisfy(terminated)

        self._assert_is_valid_invocation_sequence(
            self._service_mate.tickets(), _TRANSMISSION_GROUP,
            _TRANSMISSION_METHOD, (request_payload,),
            request_initial_metadata, request_terminal_metadata, completion)

        # Service -> invocation direction.
        response_ticket = links.Ticket(
            reply_operation_id, 0, None, None, full_subscription, timeout, 0,
            response_initial_metadata, response_payload,
            response_terminal_metadata, response_code, response_message,
            completion, None)
        self._service_link.accept_ticket(response_ticket)
        self._invocation_mate.block_until_tickets_satisfy(terminated)

        self._assert_is_valid_service_sequence(
            self._invocation_mate.tickets(), (response_payload,),
            response_initial_metadata, response_terminal_metadata,
            response_code, response_message, completion)