blob: 7897bcce018a1e79fd25703cd5cd2ea47918ff25 [file] [log] [blame]
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import grpc
import grpc_testing
from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _server_application
from tests.testing.proto import services_pb2
# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
@unittest.skipIf(
services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
'Fix protobuf issue 3452!')
class FirstServiceServicerTest(unittest.TestCase):
def setUp(self):
self._real_time = grpc_testing.strict_real_time()
self._fake_time = grpc_testing.strict_fake_time(time.time())
servicer = _server_application.FirstServiceServicer()
descriptors_to_servicers = {
_application_testing_common.FIRST_SERVICE: servicer
}
self._real_time_server = grpc_testing.server_from_dictionary(
descriptors_to_servicers, self._real_time)
self._fake_time_server = grpc_testing.server_from_dictionary(
descriptors_to_servicers, self._fake_time)
def test_successful_unary_unary(self):
rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (),
_application_common.UNARY_UNARY_REQUEST, None)
initial_metadata = rpc.initial_metadata()
response, trailing_metadata, code, details = rpc.termination()
self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
self.assertIs(code, grpc.StatusCode.OK)
def test_successful_unary_stream(self):
rpc = self._real_time_server.invoke_unary_stream(
_application_testing_common.FIRST_SERVICE_UNSTRE, (),
_application_common.UNARY_STREAM_REQUEST, None)
initial_metadata = rpc.initial_metadata()
trailing_metadata, code, details = rpc.termination()
self.assertIs(code, grpc.StatusCode.OK)
def test_successful_stream_unary(self):
rpc = self._real_time_server.invoke_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN, (), None)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.requests_closed()
initial_metadata = rpc.initial_metadata()
response, trailing_metadata, code, details = rpc.termination()
self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
self.assertIs(code, grpc.StatusCode.OK)
def test_successful_stream_stream(self):
rpc = self._real_time_server.invoke_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
initial_metadata = rpc.initial_metadata()
responses = [
rpc.take_response(),
rpc.take_response(),
]
rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
responses.extend([
rpc.take_response(),
rpc.take_response(),
rpc.take_response(),
rpc.take_response(),
])
rpc.requests_closed()
trailing_metadata, code, details = rpc.termination()
for response in responses:
self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
response)
self.assertIs(code, grpc.StatusCode.OK)
def test_server_rpc_idempotence(self):
rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (),
_application_common.UNARY_UNARY_REQUEST, None)
first_initial_metadata = rpc.initial_metadata()
second_initial_metadata = rpc.initial_metadata()
third_initial_metadata = rpc.initial_metadata()
first_termination = rpc.termination()
second_termination = rpc.termination()
third_termination = rpc.termination()
for later_initial_metadata in (second_initial_metadata,
third_initial_metadata,):
self.assertEqual(first_initial_metadata, later_initial_metadata)
response = first_termination[0]
terminal_metadata = first_termination[1]
code = first_termination[2]
details = first_termination[3]
for later_termination in (second_termination, third_termination,):
self.assertEqual(response, later_termination[0])
self.assertEqual(terminal_metadata, later_termination[1])
self.assertIs(code, later_termination[2])
self.assertEqual(details, later_termination[3])
self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
self.assertIs(code, grpc.StatusCode.OK)
def test_misbehaving_client_unary_unary(self):
rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (),
_application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
initial_metadata = rpc.initial_metadata()
response, trailing_metadata, code, details = rpc.termination()
self.assertIsNot(code, grpc.StatusCode.OK)
def test_infinite_request_stream_real_time(self):
rpc = self._real_time_server.invoke_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN, (),
_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
initial_metadata = rpc.initial_metadata()
self._real_time.sleep_for(
_application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
response, trailing_metadata, code, details = rpc.termination()
self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
def test_infinite_request_stream_fake_time(self):
rpc = self._fake_time_server.invoke_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN, (),
_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
initial_metadata = rpc.initial_metadata()
self._fake_time.sleep_for(
_application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
response, trailing_metadata, code, details = rpc.termination()
self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
if __name__ == '__main__':
unittest.main(verbosity=2)