blob: 77e83d5561caf474c9586a9fb170d5449b45cadc [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests transmission of tickets across gRPC-on-the-wire."""
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.interfaces.links import links
from grpc_test import test_common
from grpc_test._links import _proto_scenarios
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.links import test_cases
from grpc_test.framework.interfaces.links import test_utilities
_IDENTITY = lambda x: x
class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def create_transmitting_links(self):
service_link = service.service_link(
{self.group_and_method(): self.deserialize_request},
{self.group_and_method(): self.serialize_response})
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost', None,
{self.group_and_method(): self.serialize_request},
{self.group_and_method(): self.deserialize_response})
invocation_link.start()
return invocation_link, service_link
def destroy_transmitting_links(self, invocation_side_link, service_side_link):
invocation_side_link.stop()
service_side_link.begin_stop()
service_side_link.end_stop()
def create_invocation_initial_metadata(self):
return (
('first_invocation_initial_metadata_key', 'just a string value'),
('second_invocation_initial_metadata_key', '0123456789'),
('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100),
)
def create_invocation_terminal_metadata(self):
return None
def create_service_initial_metadata(self):
return (
('first_service_initial_metadata_key', 'just another string value'),
('second_service_initial_metadata_key', '9876543210'),
('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100),
)
def create_service_terminal_metadata(self):
return (
('first_service_terminal_metadata_key', 'yet another string value'),
('second_service_terminal_metadata_key', 'abcdefghij'),
('third_service_terminal_metadata_key-bin', '\x00\x37' * 100),
)
def create_invocation_completion(self):
return None, None
def create_service_completion(self):
return (
beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!')
def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
self.assertTrue(
test_common.metadata_transmitted(
original_metadata, transmitted_metadata),
'%s erroneously transmitted as %s' % (
original_metadata, transmitted_metadata))
class RoundTripTest(unittest.TestCase):
def testZeroMessageRoundTrip(self):
test_operation_id = object()
test_group = 'test package.Test Group'
test_method = 'test method'
identity_transformation = {(test_group, test_method): _IDENTITY}
test_code = beta_interfaces.StatusCode.OK
test_message = 'a test message'
service_link = service.service_link(
identity_transformation, identity_transformation)
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, None, None, identity_transformation, identity_transformation)
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, links.Ticket.Termination.COMPLETION, None)
invocation_link.accept_ticket(invocation_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_ticket = links.Ticket(
service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.begin_stop()
service_link.end_stop()
self.assertIs(
service_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(
invocation_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(invocation_mate.tickets()[-1].code, test_code)
self.assertEqual(invocation_mate.tickets()[-1].message, test_message)
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_group, test_method = scenario.group_and_method()
test_code = beta_interfaces.StatusCode.OK
test_message = 'a scenario test message'
service_link = service.service_link(
{(test_group, test_method): scenario.deserialize_request},
{(test_group, test_method): scenario.serialize_response})
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost', None,
{(test_group, test_method): scenario.serialize_request},
{(test_group, test_method): scenario.deserialize_response})
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, None, None)
invocation_link.accept_ticket(invocation_ticket)
requests = scenario.requests()
for request_index, request in enumerate(requests):
request_ticket = links.Ticket(
test_operation_id, 1 + request_index, None, None, None, None, 1, None,
request, None, None, None, None, None)
invocation_link.accept_ticket(request_ticket)
service_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
response_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_index, None, None,
None, None, 1, None, scenario.response_for_request(request), None,
None, None, None, None)
service_link.accept_ticket(response_ticket)
invocation_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
request_count = len(requests)
invocation_completion_ticket = links.Ticket(
test_operation_id, request_count + 1, None, None, None, None, None,
None, None, None, None, None, links.Ticket.Termination.COMPLETION,
None)
invocation_link.accept_ticket(invocation_completion_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_completion_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_count, None, None, None,
None, None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_completion_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.begin_stop()
service_link.end_stop()
observed_requests = tuple(
ticket.payload for ticket in service_mate.tickets()
if ticket.payload is not None)
observed_responses = tuple(
ticket.payload for ticket in invocation_mate.tickets()
if ticket.payload is not None)
self.assertTrue(scenario.verify_requests(observed_requests))
self.assertTrue(scenario.verify_responses(observed_responses))
def testEmptyScenario(self):
self._perform_scenario_test(_proto_scenarios.EmptyScenario())
def testBidirectionallyUnaryScenario(self):
self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
def testBidirectionallyStreamingScenario(self):
self._perform_scenario_test(
_proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
unittest.main(verbosity=2)