| # Copyright 2015, Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Tests for the old '_low'.""" |
| |
| import Queue |
| import threading |
| import time |
| import unittest |
| |
| from grpc._adapter import _intermediary_low as _low |
| |
| _STREAM_LENGTH = 300 |
| _TIMEOUT = 5 |
| _AFTER_DELAY = 2 |
| _FUTURE = time.time() + 60 * 60 * 24 |
| _BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 |
| _BYTE_SEQUENCE_SEQUENCE = tuple( |
| bytes(bytearray((row + column) % 256 for column in range(row))) |
| for row in range(_STREAM_LENGTH)) |
| |
| |
| class LonelyClientTest(unittest.TestCase): |
| |
| def testLonelyClient(self): |
| host = 'nosuchhostexists' |
| port = 54321 |
| method = 'test method' |
| deadline = time.time() + _TIMEOUT |
| after_deadline = deadline + _AFTER_DELAY |
| metadata_tag = object() |
| finish_tag = object() |
| |
| completion_queue = _low.CompletionQueue() |
| channel = _low.Channel('%s:%d' % (host, port), None) |
| client_call = _low.Call(channel, completion_queue, method, host, deadline) |
| |
| client_call.invoke(completion_queue, metadata_tag, finish_tag) |
| first_event = completion_queue.get(after_deadline) |
| self.assertIsNotNone(first_event) |
| second_event = completion_queue.get(after_deadline) |
| self.assertIsNotNone(second_event) |
| kinds = [event.kind for event in (first_event, second_event)] |
| self.assertItemsEqual( |
| (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), |
| kinds) |
| |
| self.assertIsNone(completion_queue.get(after_deadline)) |
| |
| completion_queue.stop() |
| stop_event = completion_queue.get(_FUTURE) |
| self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) |
| |
| del client_call |
| del channel |
| del completion_queue |
| |
| |
| def _drive_completion_queue(completion_queue, event_queue): |
| while True: |
| event = completion_queue.get(_FUTURE) |
| if event.kind is _low.Event.Kind.STOP: |
| break |
| event_queue.put(event) |
| |
| |
| class EchoTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.host = 'localhost' |
| |
| self.server_completion_queue = _low.CompletionQueue() |
| self.server = _low.Server(self.server_completion_queue) |
| port = self.server.add_http2_addr('[::]:0') |
| self.server.start() |
| self.server_events = Queue.Queue() |
| self.server_completion_queue_thread = threading.Thread( |
| target=_drive_completion_queue, |
| args=(self.server_completion_queue, self.server_events)) |
| self.server_completion_queue_thread.start() |
| |
| self.client_completion_queue = _low.CompletionQueue() |
| self.channel = _low.Channel('%s:%d' % (self.host, port), None) |
| self.client_events = Queue.Queue() |
| self.client_completion_queue_thread = threading.Thread( |
| target=_drive_completion_queue, |
| args=(self.client_completion_queue, self.client_events)) |
| self.client_completion_queue_thread.start() |
| |
| def tearDown(self): |
| self.server.stop() |
| self.server.cancel_all_calls() |
| self.server_completion_queue.stop() |
| self.client_completion_queue.stop() |
| self.server_completion_queue_thread.join() |
| self.client_completion_queue_thread.join() |
| del self.server |
| |
| def _perform_echo_test(self, test_data): |
| method = 'test method' |
| details = 'test details' |
| server_leading_metadata_key = 'my_server_leading_key' |
| server_leading_metadata_value = 'my_server_leading_value' |
| server_trailing_metadata_key = 'my_server_trailing_key' |
| server_trailing_metadata_value = 'my_server_trailing_value' |
| client_metadata_key = 'my_client_key' |
| client_metadata_value = 'my_client_value' |
| server_leading_binary_metadata_key = 'my_server_leading_key-bin' |
| server_leading_binary_metadata_value = b'\0'*2047 |
| server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' |
| server_trailing_binary_metadata_value = b'\0'*2047 |
| client_binary_metadata_key = 'my_client_key-bin' |
| client_binary_metadata_value = b'\0'*2047 |
| deadline = _FUTURE |
| metadata_tag = object() |
| finish_tag = object() |
| write_tag = object() |
| complete_tag = object() |
| service_tag = object() |
| read_tag = object() |
| status_tag = object() |
| |
| server_data = [] |
| client_data = [] |
| |
| client_call = _low.Call(self.channel, self.client_completion_queue, |
| method, self.host, deadline) |
| client_call.add_metadata(client_metadata_key, client_metadata_value) |
| client_call.add_metadata(client_binary_metadata_key, |
| client_binary_metadata_value) |
| |
| client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) |
| |
| self.server.service(service_tag) |
| service_accepted = self.server_events.get() |
| self.assertIsNotNone(service_accepted) |
| self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) |
| self.assertIs(service_accepted.tag, service_tag) |
| self.assertEqual(method, service_accepted.service_acceptance.method) |
| self.assertEqual(self.host, service_accepted.service_acceptance.host) |
| self.assertIsNotNone(service_accepted.service_acceptance.call) |
| metadata = dict(service_accepted.metadata) |
| self.assertIn(client_metadata_key, metadata) |
| self.assertEqual(client_metadata_value, metadata[client_metadata_key]) |
| self.assertIn(client_binary_metadata_key, metadata) |
| self.assertEqual(client_binary_metadata_value, |
| metadata[client_binary_metadata_key]) |
| server_call = service_accepted.service_acceptance.call |
| server_call.accept(self.server_completion_queue, finish_tag) |
| server_call.add_metadata(server_leading_metadata_key, |
| server_leading_metadata_value) |
| server_call.add_metadata(server_leading_binary_metadata_key, |
| server_leading_binary_metadata_value) |
| server_call.premetadata() |
| |
| metadata_accepted = self.client_events.get() |
| self.assertIsNotNone(metadata_accepted) |
| self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) |
| self.assertEqual(metadata_tag, metadata_accepted.tag) |
| metadata = dict(metadata_accepted.metadata) |
| self.assertIn(server_leading_metadata_key, metadata) |
| self.assertEqual(server_leading_metadata_value, |
| metadata[server_leading_metadata_key]) |
| self.assertIn(server_leading_binary_metadata_key, metadata) |
| self.assertEqual(server_leading_binary_metadata_value, |
| metadata[server_leading_binary_metadata_key]) |
| |
| for datum in test_data: |
| client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) |
| write_accepted = self.client_events.get() |
| self.assertIsNotNone(write_accepted) |
| self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) |
| self.assertIs(write_accepted.tag, write_tag) |
| self.assertIs(write_accepted.write_accepted, True) |
| |
| server_call.read(read_tag) |
| read_accepted = self.server_events.get() |
| self.assertIsNotNone(read_accepted) |
| self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) |
| self.assertEqual(read_tag, read_accepted.tag) |
| self.assertIsNotNone(read_accepted.bytes) |
| server_data.append(read_accepted.bytes) |
| |
| server_call.write(read_accepted.bytes, write_tag, 0) |
| write_accepted = self.server_events.get() |
| self.assertIsNotNone(write_accepted) |
| self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) |
| self.assertEqual(write_tag, write_accepted.tag) |
| self.assertTrue(write_accepted.write_accepted) |
| |
| client_call.read(read_tag) |
| read_accepted = self.client_events.get() |
| self.assertIsNotNone(read_accepted) |
| self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) |
| self.assertEqual(read_tag, read_accepted.tag) |
| self.assertIsNotNone(read_accepted.bytes) |
| client_data.append(read_accepted.bytes) |
| |
| client_call.complete(complete_tag) |
| complete_accepted = self.client_events.get() |
| self.assertIsNotNone(complete_accepted) |
| self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) |
| self.assertIs(complete_accepted.tag, complete_tag) |
| self.assertIs(complete_accepted.complete_accepted, True) |
| |
| server_call.read(read_tag) |
| read_accepted = self.server_events.get() |
| self.assertIsNotNone(read_accepted) |
| self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) |
| self.assertEqual(read_tag, read_accepted.tag) |
| self.assertIsNone(read_accepted.bytes) |
| |
| server_call.add_metadata(server_trailing_metadata_key, |
| server_trailing_metadata_value) |
| server_call.add_metadata(server_trailing_binary_metadata_key, |
| server_trailing_binary_metadata_value) |
| |
| server_call.status(_low.Status(_low.Code.OK, details), status_tag) |
| server_terminal_event_one = self.server_events.get() |
| server_terminal_event_two = self.server_events.get() |
| if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: |
| status_accepted = server_terminal_event_one |
| rpc_accepted = server_terminal_event_two |
| else: |
| status_accepted = server_terminal_event_two |
| rpc_accepted = server_terminal_event_one |
| self.assertIsNotNone(status_accepted) |
| self.assertIsNotNone(rpc_accepted) |
| self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) |
| self.assertEqual(status_tag, status_accepted.tag) |
| self.assertTrue(status_accepted.complete_accepted) |
| self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) |
| self.assertEqual(finish_tag, rpc_accepted.tag) |
| self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) |
| |
| client_call.read(read_tag) |
| client_terminal_event_one = self.client_events.get() |
| client_terminal_event_two = self.client_events.get() |
| if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: |
| read_accepted = client_terminal_event_one |
| finish_accepted = client_terminal_event_two |
| else: |
| read_accepted = client_terminal_event_two |
| finish_accepted = client_terminal_event_one |
| self.assertIsNotNone(read_accepted) |
| self.assertIsNotNone(finish_accepted) |
| self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) |
| self.assertEqual(read_tag, read_accepted.tag) |
| self.assertIsNone(read_accepted.bytes) |
| self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) |
| self.assertEqual(finish_tag, finish_accepted.tag) |
| self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) |
| metadata = dict(finish_accepted.metadata) |
| self.assertIn(server_trailing_metadata_key, metadata) |
| self.assertEqual(server_trailing_metadata_value, |
| metadata[server_trailing_metadata_key]) |
| self.assertIn(server_trailing_binary_metadata_key, metadata) |
| self.assertEqual(server_trailing_binary_metadata_value, |
| metadata[server_trailing_binary_metadata_key]) |
| self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), |
| set((server_trailing_metadata_key, |
| server_trailing_binary_metadata_key,))) |
| |
| self.assertSequenceEqual(test_data, server_data) |
| self.assertSequenceEqual(test_data, client_data) |
| |
| def testNoEcho(self): |
| self._perform_echo_test(()) |
| |
| def testOneByteEcho(self): |
| self._perform_echo_test([b'\x07']) |
| |
| def testOneManyByteEcho(self): |
| self._perform_echo_test([_BYTE_SEQUENCE]) |
| |
| def testManyOneByteEchoes(self): |
| self._perform_echo_test(_BYTE_SEQUENCE) |
| |
| def testManyManyByteEchoes(self): |
| self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) |
| |
| |
| class CancellationTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.host = 'localhost' |
| |
| self.server_completion_queue = _low.CompletionQueue() |
| self.server = _low.Server(self.server_completion_queue) |
| port = self.server.add_http2_addr('[::]:0') |
| self.server.start() |
| self.server_events = Queue.Queue() |
| self.server_completion_queue_thread = threading.Thread( |
| target=_drive_completion_queue, |
| args=(self.server_completion_queue, self.server_events)) |
| self.server_completion_queue_thread.start() |
| |
| self.client_completion_queue = _low.CompletionQueue() |
| self.channel = _low.Channel('%s:%d' % (self.host, port), None) |
| self.client_events = Queue.Queue() |
| self.client_completion_queue_thread = threading.Thread( |
| target=_drive_completion_queue, |
| args=(self.client_completion_queue, self.client_events)) |
| self.client_completion_queue_thread.start() |
| |
| def tearDown(self): |
| self.server.stop() |
| self.server.cancel_all_calls() |
| self.server_completion_queue.stop() |
| self.client_completion_queue.stop() |
| self.server_completion_queue_thread.join() |
| self.client_completion_queue_thread.join() |
| del self.server |
| |
| def testCancellation(self): |
| method = 'test method' |
| deadline = _FUTURE |
| metadata_tag = object() |
| finish_tag = object() |
| write_tag = object() |
| service_tag = object() |
| read_tag = object() |
| test_data = _BYTE_SEQUENCE_SEQUENCE |
| |
| server_data = [] |
| client_data = [] |
| |
| client_call = _low.Call(self.channel, self.client_completion_queue, |
| method, self.host, deadline) |
| |
| client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) |
| |
| self.server.service(service_tag) |
| service_accepted = self.server_events.get() |
| server_call = service_accepted.service_acceptance.call |
| |
| server_call.accept(self.server_completion_queue, finish_tag) |
| server_call.premetadata() |
| |
| metadata_accepted = self.client_events.get() |
| self.assertIsNotNone(metadata_accepted) |
| |
| for datum in test_data: |
| client_call.write(datum, write_tag, 0) |
| write_accepted = self.client_events.get() |
| |
| server_call.read(read_tag) |
| read_accepted = self.server_events.get() |
| server_data.append(read_accepted.bytes) |
| |
| server_call.write(read_accepted.bytes, write_tag, 0) |
| write_accepted = self.server_events.get() |
| self.assertIsNotNone(write_accepted) |
| |
| client_call.read(read_tag) |
| read_accepted = self.client_events.get() |
| client_data.append(read_accepted.bytes) |
| |
| client_call.cancel() |
| # cancel() is idempotent. |
| client_call.cancel() |
| client_call.cancel() |
| client_call.cancel() |
| |
| server_call.read(read_tag) |
| |
| server_terminal_event_one = self.server_events.get() |
| server_terminal_event_two = self.server_events.get() |
| if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: |
| read_accepted = server_terminal_event_one |
| rpc_accepted = server_terminal_event_two |
| else: |
| read_accepted = server_terminal_event_two |
| rpc_accepted = server_terminal_event_one |
| self.assertIsNotNone(read_accepted) |
| self.assertIsNotNone(rpc_accepted) |
| self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) |
| self.assertIsNone(read_accepted.bytes) |
| self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) |
| self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) |
| |
| finish_event = self.client_events.get() |
| self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) |
| self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), |
| finish_event.status) |
| |
| self.assertSequenceEqual(test_data, server_data) |
| self.assertSequenceEqual(test_data, client_data) |
| |
| |
| class ExpirationTest(unittest.TestCase): |
| |
| @unittest.skip('TODO(nathaniel): Expiration test!') |
| def testExpiration(self): |
| pass |
| |
| |
| if __name__ == '__main__': |
| unittest.main(verbosity=2) |
| |