# blob: 4077b8e3503a131a7eda0de2ccbcff9dbca85bc9 [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test of the GRPC-backed ForeLink and RearLink."""
import threading
import unittest
from grpc._adapter import fore
from grpc._adapter import rear
from grpc.framework.base import interfaces
from grpc.framework.foundation import logging_pool
from grpc_test._adapter import _proto_scenarios
from grpc_test._adapter import _test_links
# Value passed as the last argument of each FrontToBackTicket that opens an
# operation in these tests (presumably a timeout in seconds -- confirm against
# interfaces.FrontToBackTicket).
_TIMEOUT = 32

# Pass-through callable supplied where a link takes a per-method serializer or
# deserializer and the test payloads should travel unchanged.
_IDENTITY = lambda x: x
# TODO(nathaniel): End-to-end metadata testing.
def _transform_metadata(unused_metadata):
  """Return a fixed pair of metadata key-value pairs.

  Args:
    unused_metadata: Ignored; accepted only so this function can be passed
      as a metadata_transformer.

  Returns:
    A tuple of two (key, value) string pairs that does not depend on the
      argument.
  """
  return (
      ('one_unused_key', 'one unused value'),
      ('another_unused_key', 'another unused value'),
  )
class RoundTripTest(unittest.TestCase):
  """Round trips of tickets through a rear.RearLink and a fore.ForeLink.

  Each test starts a fore.ForeLink, points a rear.RearLink at its port on
  localhost, and joins each of the two to a double from _test_links.
  FrontToBackTickets are handed to the rear.RearLink; the rear_action closure
  given to the _test_links.RearLink double creates the BackToFrontTickets,
  and the test then inspects the tickets held by the two doubles.
  """

  def setUp(self):
    """Create a separate logging_pool.pool(8) for each link under test."""
    self.fore_link_pool = logging_pool.pool(8)
    self.rear_link_pool = logging_pool.pool(8)

  def tearDown(self):
    """Shut both pools down with wait=True, the rear link's pool first."""
    self.rear_link_pool.shutdown(wait=True)
    self.fore_link_pool.shutdown(wait=True)

  @staticmethod
  def _await_completion(test_fore_link):
    """Block until the newest ticket of test_fore_link is a COMPLETION.

    Args:
      test_fore_link: A _test_links.ForeLink; its `tickets` are only read
        while holding its `condition`.
    """
    with test_fore_link.condition:
      while (not test_fore_link.tickets or
             test_fore_link.tickets[-1].kind is not
             interfaces.BackToFrontTicket.Kind.COMPLETION):
        test_fore_link.condition.wait()

  @staticmethod
  def _payloads(test_link):
    """Return the non-None payloads among test_link.tickets.

    Args:
      test_link: A _test_links.ForeLink or _test_links.RearLink.

    Returns:
      A tuple of payloads, in the order their tickets appear in
        test_link.tickets, gathered while holding test_link.condition.
    """
    with test_link.condition:
      return tuple(
          ticket.payload for ticket in test_link.tickets
          if ticket.payload is not None)

  def testZeroMessageRoundTrip(self):
    """Send one payload-free ENTIRE ticket and expect a COMPLETION back."""
    test_operation_id = object()
    test_method = 'test method'
    test_fore_link = _test_links.ForeLink(None, None)

    def rear_action(front_to_back_ticket, fore_link):
      # Only a ticket that ends the front-to-back stream is answered, and the
      # answer is a single payload-free COMPLETION ticket.
      if front_to_back_ticket.kind in (
          interfaces.FrontToBackTicket.Kind.COMPLETION,
          interfaces.FrontToBackTicket.Kind.ENTIRE):
        back_to_front_ticket = interfaces.BackToFrontTicket(
            front_to_back_ticket.operation_id, 0,
            interfaces.BackToFrontTicket.Kind.COMPLETION, None)
        fore_link.accept_back_to_front_ticket(back_to_front_ticket)

    test_rear_link = _test_links.RearLink(rear_action, None)

    fore_link = fore.ForeLink(
        self.fore_link_pool, {test_method: None}, {test_method: None}, None,
        ())
    fore_link.join_rear_link(test_rear_link)
    test_rear_link.join_fore_link(fore_link)
    fore_link.start()
    # From here on the links are stopped in "finally" blocks so that a
    # failure cannot leave a started link behind for tearDown's blocking pool
    # shutdown.
    try:
      port = fore_link.port()
      rear_link = rear.RearLink(
          'localhost', port, self.rear_link_pool, {test_method: None},
          {test_method: None}, False, None, None, None,
          metadata_transformer=_transform_metadata)
      rear_link.join_fore_link(test_fore_link)
      test_fore_link.join_rear_link(rear_link)
      rear_link.start()
      try:
        front_to_back_ticket = interfaces.FrontToBackTicket(
            test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
            test_method, interfaces.ServicedSubscription.Kind.FULL, None,
            None, _TIMEOUT)
        rear_link.accept_front_to_back_ticket(front_to_back_ticket)

        # Unlike the other tests, wait for any non-CONTINUATION ticket; the
        # assertion below then checks that it really was a COMPLETION.
        with test_fore_link.condition:
          while (not test_fore_link.tickets or
                 test_fore_link.tickets[-1].kind is
                 interfaces.BackToFrontTicket.Kind.CONTINUATION):
            test_fore_link.condition.wait()
      finally:
        rear_link.stop()
    finally:
      fore_link.stop()

    with test_fore_link.condition:
      self.assertIs(
          test_fore_link.tickets[-1].kind,
          interfaces.BackToFrontTicket.Kind.COMPLETION)

  def testEntireRoundTrip(self):
    """Send one ENTIRE ticket with a payload and check both payloads."""
    test_operation_id = object()
    test_method = 'test method'
    test_front_to_back_datum = b'\x07'
    test_back_to_front_datum = b'\x08'
    test_fore_link = _test_links.ForeLink(None, None)
    # One-element list so that the closure below can update the counter.
    rear_sequence_number = [0]

    def rear_action(front_to_back_ticket, fore_link):
      # Reply with the canned datum to every ticket that carries a payload.
      if front_to_back_ticket.payload is None:
        payload = None
      else:
        payload = test_back_to_front_datum
      terminal = front_to_back_ticket.kind in (
          interfaces.FrontToBackTicket.Kind.COMPLETION,
          interfaces.FrontToBackTicket.Kind.ENTIRE)
      # Tickets with neither a payload nor a terminal kind get no reply.
      if payload is not None or terminal:
        if terminal:
          kind = interfaces.BackToFrontTicket.Kind.COMPLETION
        else:
          kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
        back_to_front_ticket = interfaces.BackToFrontTicket(
            front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
            payload)
        rear_sequence_number[0] += 1
        fore_link.accept_back_to_front_ticket(back_to_front_ticket)

    test_rear_link = _test_links.RearLink(rear_action, None)

    fore_link = fore.ForeLink(
        self.fore_link_pool, {test_method: _IDENTITY},
        {test_method: _IDENTITY}, None, ())
    fore_link.join_rear_link(test_rear_link)
    test_rear_link.join_fore_link(fore_link)
    fore_link.start()
    # Stop whatever was started even if the body below raises.
    try:
      port = fore_link.port()
      rear_link = rear.RearLink(
          'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
          {test_method: _IDENTITY}, False, None, None, None)
      rear_link.join_fore_link(test_fore_link)
      test_fore_link.join_rear_link(rear_link)
      rear_link.start()
      try:
        front_to_back_ticket = interfaces.FrontToBackTicket(
            test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
            test_method, interfaces.ServicedSubscription.Kind.FULL, None,
            test_front_to_back_datum, _TIMEOUT)
        rear_link.accept_front_to_back_ticket(front_to_back_ticket)
        self._await_completion(test_fore_link)
      finally:
        rear_link.stop()
    finally:
      fore_link.stop()

    front_to_back_payloads = self._payloads(test_rear_link)
    back_to_front_payloads = self._payloads(test_fore_link)
    self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads)
    self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads)

  def _perform_scenario_test(self, scenario):
    """Play a scenario's requests through the links and verify both sides.

    A COMMENCEMENT ticket is sent first, then one CONTINUATION ticket per
    request, then a COMPLETION ticket.

    Args:
      scenario: An object providing method(), requests(),
        response_for_request(), serialize_request(), deserialize_request(),
        serialize_response(), deserialize_response(), verify_requests() and
        verify_responses() (presumably one of the _proto_scenarios classes).
    """
    test_operation_id = object()
    test_method = scenario.method()
    test_fore_link = _test_links.ForeLink(None, None)
    rear_lock = threading.Lock()
    # One-element list so that the closure below can update the counter.
    rear_sequence_number = [0]

    def rear_action(front_to_back_ticket, fore_link):
      # The lock keeps computing a response, numbering its ticket and handing
      # it to the fore link together as one step.
      with rear_lock:
        if front_to_back_ticket.payload is not None:
          response = scenario.response_for_request(
              front_to_back_ticket.payload)
        else:
          response = None
        terminal = front_to_back_ticket.kind in (
            interfaces.FrontToBackTicket.Kind.COMPLETION,
            interfaces.FrontToBackTicket.Kind.ENTIRE)
        # Tickets with neither a response nor a terminal kind get no reply.
        if response is not None or terminal:
          if terminal:
            kind = interfaces.BackToFrontTicket.Kind.COMPLETION
          else:
            kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
          back_to_front_ticket = interfaces.BackToFrontTicket(
              front_to_back_ticket.operation_id, rear_sequence_number[0],
              kind, response)
          rear_sequence_number[0] += 1
          fore_link.accept_back_to_front_ticket(back_to_front_ticket)

    test_rear_link = _test_links.RearLink(rear_action, None)

    fore_link = fore.ForeLink(
        self.fore_link_pool, {test_method: scenario.deserialize_request},
        {test_method: scenario.serialize_response}, None, ())
    fore_link.join_rear_link(test_rear_link)
    test_rear_link.join_fore_link(fore_link)
    fore_link.start()
    # Stop whatever was started even if the body below raises.
    try:
      port = fore_link.port()
      rear_link = rear.RearLink(
          'localhost', port, self.rear_link_pool,
          {test_method: scenario.serialize_request},
          {test_method: scenario.deserialize_response}, False, None, None,
          None)
      rear_link.join_fore_link(test_fore_link)
      test_fore_link.join_rear_link(rear_link)
      rear_link.start()
      try:
        commencement_ticket = interfaces.FrontToBackTicket(
            test_operation_id, 0,
            interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
            interfaces.ServicedSubscription.Kind.FULL, None, None,
            _TIMEOUT)
        fore_sequence_number = 1
        rear_link.accept_front_to_back_ticket(commencement_ticket)
        for request in scenario.requests():
          continuation_ticket = interfaces.FrontToBackTicket(
              test_operation_id, fore_sequence_number,
              interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None,
              None, request, None)
          fore_sequence_number += 1
          rear_link.accept_front_to_back_ticket(continuation_ticket)
        completion_ticket = interfaces.FrontToBackTicket(
            test_operation_id, fore_sequence_number,
            interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None,
            None, None)
        rear_link.accept_front_to_back_ticket(completion_ticket)
        self._await_completion(test_fore_link)
      finally:
        rear_link.stop()
    finally:
      fore_link.stop()

    requests = self._payloads(test_rear_link)
    responses = self._payloads(test_fore_link)
    self.assertTrue(scenario.verify_requests(requests))
    self.assertTrue(scenario.verify_responses(responses))

  def testEmptyScenario(self):
    """Run the scenario test with _proto_scenarios.EmptyScenario."""
    self._perform_scenario_test(_proto_scenarios.EmptyScenario())

  def testBidirectionallyUnaryScenario(self):
    """Run the scenario test with BidirectionallyUnaryScenario."""
    self._perform_scenario_test(
        _proto_scenarios.BidirectionallyUnaryScenario())

  def testBidirectionallyStreamingScenario(self):
    """Run the scenario test with BidirectionallyStreamingScenario."""
    self._perform_scenario_test(
        _proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
  # Run this module's tests with verbose output when executed as a script.
  unittest.main(verbosity=2)