Merge pull request #6855 from makdharma/issue6834
fix for #6834
diff --git a/examples/cpp/helloworld/greeter_async_client.cc b/examples/cpp/helloworld/greeter_async_client.cc
index c1f5eb5..33de59f 100644
--- a/examples/cpp/helloworld/greeter_async_client.cc
+++ b/examples/cpp/helloworld/greeter_async_client.cc
@@ -87,7 +87,9 @@
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
- cq.Next(&got_tag, &ok);
+ // The return value of Next should always be checked. This return value
+ // tells us whether there is any kind of event or the cq_ is shutting down.
+ GPR_ASSERT(cq.Next(&got_tag, &ok));
// Verify that the result from "cq" corresponds, by its tag, our previous
// request.
diff --git a/examples/cpp/helloworld/greeter_async_server.cc b/examples/cpp/helloworld/greeter_async_server.cc
index 64e065b..ead4418 100644
--- a/examples/cpp/helloworld/greeter_async_server.cc
+++ b/examples/cpp/helloworld/greeter_async_server.cc
@@ -160,7 +160,9 @@
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
- cq_->Next(&tag, &ok);
+ // The return value of Next should always be checked. This return value
+ // tells us whether there is any kind of event or cq_ is shutting down.
+ GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
diff --git a/setup.py b/setup.py
index 0e2646d..018d047 100644
--- a/setup.py
+++ b/setup.py
@@ -234,8 +234,7 @@
ext_modules=CYTHON_EXTENSION_MODULES,
packages=list(PACKAGES),
package_dir=PACKAGE_DIRECTORIES,
- # TODO(atash): Figure out why auditwheel doesn't like namespace packages.
- #namespace_packages=['grpc'],
+ namespace_packages=['grpc'],
package_data=PACKAGE_DATA,
install_requires=INSTALL_REQUIRES,
setup_requires=SETUP_REQUIRES,
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc
index 6e14d24..4be8cb4 100644
--- a/src/compiler/objective_c_generator.cc
+++ b/src/compiler/objective_c_generator.cc
@@ -60,9 +60,34 @@
" returns ($server_stream$$response_type$)\n\n");
}
+template <typename DescriptorType>
+static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
+ std::vector<grpc::string> comments;
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING,
+ &comments);
+ if (comments.empty()) {
+ return;
+ }
+ printer->Print("/**\n");
+ for (auto it = comments.begin(); it != comments.end(); ++it) {
+ printer->Print(" * ");
+ size_t start_pos = it->find_first_not_of(' ');
+ if (start_pos != grpc::string::npos) {
+ printer->Print(it->c_str() + start_pos);
+ }
+ printer->Print("\n");
+ }
+ printer->Print(" */\n");
+}
+
void PrintMethodSignature(Printer *printer, const MethodDescriptor *method,
const map< ::grpc::string, ::grpc::string> &vars) {
- // TODO(jcanizales): Print method comments.
+ // Print comment
+ PrintAllComments(method, printer);
printer->Print(vars, "- ($return_type$)$method_name$With");
if (method->client_streaming()) {
@@ -195,8 +220,10 @@
printer.Print("@end\n\n");
printer.Print(
- "// Basic service implementation, over gRPC, that only does"
- " marshalling and parsing.\n");
+ "/**\n"
+ " * Basic service implementation, over gRPC, that only does\n"
+ " * marshalling and parsing.\n"
+ " */\n");
printer.Print(vars,
"@interface $service_class$ :"
" GRPCProtoService<$service_class$>\n");
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 046b395..a62bb61 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -47,6 +47,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
+#include "src/core/lib/http/parser.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -107,7 +108,8 @@
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status);
+ grpc_status_code status,
+ gpr_slice *optional_message);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -161,6 +163,8 @@
GPR_ASSERT(t->ep == NULL);
+ gpr_slice_unref(t->optional_drop_message);
+
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@@ -260,6 +264,7 @@
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
+ t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@@ -859,7 +864,7 @@
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE);
+ GRPC_STATUS_UNAVAILABLE, NULL);
}
}
@@ -936,7 +941,7 @@
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- op->cancel_with_status);
+ op->cancel_with_status, op->optional_close_message);
}
if (op->close_with_status != GRPC_STATUS_OK) {
@@ -960,7 +965,7 @@
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@@ -1015,7 +1020,7 @@
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@@ -1201,7 +1206,7 @@
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1240,7 +1245,7 @@
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@@ -1303,7 +1308,8 @@
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status) {
+ grpc_status_code status,
+ gpr_slice *optional_message) {
if (!stream_global->read_closed || !stream_global->write_closed) {
if (stream_global->id != 0) {
gpr_slice_buffer_add(
@@ -1313,8 +1319,12 @@
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
&stream_global->stats.outgoing));
}
+
+ if (optional_message) {
+ gpr_slice_ref(*optional_message);
+ }
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
- NULL);
+ optional_message);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
stream_global->seen_error = true;
@@ -1524,8 +1534,12 @@
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global);
cancel_from_api(user_data, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE);
+ GRPC_STATUS_UNAVAILABLE,
+ GPR_SLICE_IS_EMPTY(transport->optional_drop_message)
+ ? NULL
+ : &transport->optional_drop_message);
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
@@ -1601,6 +1615,29 @@
}
}
+static bool try_http_parsing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_http_parser parser;
+ size_t i = 0;
+ bool success = false;
+
+ grpc_http_parser_init(&parser);
+
+ for (; i < t->read_buffer.count &&
+ grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
+ i++)
+ ;
+ if (grpc_http_parser_eof(&parser) && parser.type == GRPC_HTTP_RESPONSE) {
+ success = true;
+ GRPC_CHTTP2_IF_TRACING(gpr_log(
+ GPR_DEBUG, "Trying to connect an http1.x server, received status:%d",
+ parser.http.response.status));
+ }
+
+ grpc_http_parser_destroy(&parser);
+ return success;
+}
+
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_chttp2_transport *t = arg;
GPR_TIMER_BEGIN("reading_action.parse", 0);
@@ -1612,6 +1649,14 @@
;
if (i != t->read_buffer.count) {
success = false;
+ gpr_slice_unref(t->optional_drop_message);
+ if (try_http_parsing(exec_ctx, t)) {
+ t->optional_drop_message = gpr_slice_from_copied_string(
+ "Connection dropped: received http1.x response");
+ } else {
+ t->optional_drop_message = gpr_slice_from_copied_string(
+ "Connection dropped: received unparseable response");
+ }
}
GPR_TIMER_END("reading_action.parse", 0);
grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 5872fd8..7f3339a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -383,6 +383,9 @@
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
+
+ /** Message explaining the reason of dropping connection */
+ gpr_slice optional_drop_message;
};
typedef struct {
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index fa69316..3b51aa6 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -93,5 +93,20 @@
server.ShutdownAsync().Wait();
}
+
+ [Test]
+ public void UnstartedServerCanBeShutdown()
+ {
+ var server = new Server();
+ server.ShutdownAsync().Wait();
+ Assert.Throws(typeof(InvalidOperationException), () => server.Start());
+ }
+
+ [Test]
+ public void UnstartedServerDoesNotPreventShutdown()
+ {
+ // just create a server, don't start it, and make sure it doesn't prevent shutdown.
+ var server = new Server();
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index ae7a8c9..3b554e5 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -140,6 +140,7 @@
lock (myLock)
{
GrpcPreconditions.CheckState(!startRequested);
+ GrpcPreconditions.CheckState(!shutdownRequested);
startRequested = true;
handle.Start();
@@ -203,7 +204,6 @@
{
lock (myLock)
{
- GrpcPreconditions.CheckState(startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
@@ -215,7 +215,6 @@
{
handle.CancelAllCalls();
}
-
await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
DisposeHandle();
diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
index 0a51110..4039c1b 100644
--- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
@@ -143,22 +143,60 @@
del completion_queue
-class InsecureServerInsecureClient(unittest.TestCase):
+class ServerClientMixin(object):
- def setUp(self):
+ def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server()
self.server.register_completion_queue(self.server_completion_queue)
- self.port = self.server.add_http2_port('[::]:0')
+ if server_credentials:
+ self.port = self.server.add_http2_port('[::]:0', server_credentials)
+ else:
+ self.port = self.server.add_http2_port('[::]:0')
self.server.start()
self.client_completion_queue = cygrpc.CompletionQueue()
- self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port))
+ if client_credentials:
+ client_channel_arguments = cygrpc.ChannelArgs([
+ cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override,
+ host_override)])
+ self.client_channel = cygrpc.Channel(
+ 'localhost:{}'.format(self.port), client_channel_arguments,
+ client_credentials)
+ else:
+ self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port))
+ if host_override:
+ self.host_argument = None # default host
+ self.expected_host = host_override
+ else:
+ # arbitrary host name necessitating no further identification
+ self.host_argument = b'hostess'
+ self.expected_host = self.host_argument
- def tearDown(self):
+ def tearDownMixin(self):
del self.server
del self.client_completion_queue
del self.server_completion_queue
+ def _perform_operations(self, operations, call, queue, deadline, description):
+ """Perform the list of operations with given call, queue, and deadline.
+
+ Invocation errors are reported with as an exception with `description` in
+ the message. Performs the operations asynchronously, returning a future.
+ """
+ def performer():
+ tag = object()
+ try:
+ call_result = call.start_batch(cygrpc.Operations(operations), tag)
+ self.assertEqual(cygrpc.CallError.ok, call_result)
+ event = queue.poll(deadline)
+ self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertTrue(event.success)
+ self.assertIs(tag, event.tag)
+ except Exception as error:
+ raise Exception("Error in '{}': {}".format(description, error.message))
+ return event
+ return test_utilities.SimpleFuture(performer)
+
def testEcho(self):
DEADLINE = time.time()+5
DEADLINE_TOLERANCE = 0.25
@@ -175,7 +213,6 @@
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
- HOST = b'hostess'
cygrpc_deadline = cygrpc.Timespec(DEADLINE)
@@ -188,7 +225,8 @@
client_call_tag = object()
client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline)
+ None, 0, self.client_completion_queue, METHOD, self.host_argument,
+ cygrpc_deadline)
client_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
CLIENT_METADATA_ASCII_VALUE),
@@ -216,173 +254,7 @@
test_common.metadata_transmitted(client_initial_metadata,
request_event.request_metadata))
self.assertEqual(METHOD, request_event.request_call_details.method)
- self.assertEqual(HOST, request_event.request_call_details.host)
- self.assertLess(
- abs(DEADLINE - float(request_event.request_call_details.deadline)),
- DEADLINE_TOLERANCE)
-
- server_call_tag = object()
- server_call = request_event.operation_call
- server_initial_metadata = cygrpc.Metadata([
- cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY,
- SERVER_INITIAL_METADATA_VALUE)])
- server_trailing_metadata = cygrpc.Metadata([
- cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
- SERVER_TRAILING_METADATA_VALUE)])
- server_start_batch_result = server_call.start_batch([
- cygrpc.operation_send_initial_metadata(server_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
- server_trailing_metadata, SERVER_STATUS_CODE,
- SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
- ], server_call_tag)
- self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
-
- client_event = client_event_future.result()
- server_event = self.server_completion_queue.poll(cygrpc_deadline)
-
- self.assertEqual(6, len(client_event.batch_operations))
- found_client_op_types = set()
- for client_result in client_event.batch_operations:
- # we expect each op type to be unique
- self.assertNotIn(client_result.type, found_client_op_types)
- found_client_op_types.add(client_result.type)
- if client_result.type == cygrpc.OperationType.receive_initial_metadata:
- self.assertTrue(
- test_common.metadata_transmitted(server_initial_metadata,
- client_result.received_metadata))
- elif client_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(RESPONSE, client_result.received_message.bytes())
- elif client_result.type == cygrpc.OperationType.receive_status_on_client:
- self.assertTrue(
- test_common.metadata_transmitted(server_trailing_metadata,
- client_result.received_metadata))
- self.assertEqual(SERVER_STATUS_DETAILS,
- client_result.received_status_details)
- self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code)
- self.assertEqual(set([
- cygrpc.OperationType.send_initial_metadata,
- cygrpc.OperationType.send_message,
- cygrpc.OperationType.send_close_from_client,
- cygrpc.OperationType.receive_initial_metadata,
- cygrpc.OperationType.receive_message,
- cygrpc.OperationType.receive_status_on_client
- ]), found_client_op_types)
-
- self.assertEqual(5, len(server_event.batch_operations))
- found_server_op_types = set()
- for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type, found_server_op_types)
- found_server_op_types.add(server_result.type)
- if server_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(REQUEST, server_result.received_message.bytes())
- elif server_result.type == cygrpc.OperationType.receive_close_on_server:
- self.assertFalse(server_result.received_cancelled)
- self.assertEqual(set([
- cygrpc.OperationType.send_initial_metadata,
- cygrpc.OperationType.receive_message,
- cygrpc.OperationType.send_message,
- cygrpc.OperationType.receive_close_on_server,
- cygrpc.OperationType.send_status_from_server
- ]), found_server_op_types)
-
- del client_call
- del server_call
-
-
-class SecureServerSecureClient(unittest.TestCase):
-
- def setUp(self):
- server_credentials = cygrpc.server_credentials_ssl(
- None, [cygrpc.SslPemKeyCertPair(resources.private_key(),
- resources.certificate_chain())], False)
- channel_credentials = cygrpc.channel_credentials_ssl(
- resources.test_root_certificates(), None)
- self.server_completion_queue = cygrpc.CompletionQueue()
- self.server = cygrpc.Server()
- self.server.register_completion_queue(self.server_completion_queue)
- self.port = self.server.add_http2_port('[::]:0', server_credentials)
- self.server.start()
- self.client_completion_queue = cygrpc.CompletionQueue()
- client_channel_arguments = cygrpc.ChannelArgs([
- cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override,
- _SSL_HOST_OVERRIDE)])
- self.client_channel = cygrpc.Channel(
- 'localhost:{}'.format(self.port), client_channel_arguments,
- channel_credentials)
-
- def tearDown(self):
- del self.server
- del self.client_completion_queue
- del self.server_completion_queue
-
- def testEcho(self):
- DEADLINE = time.time()+5
- DEADLINE_TOLERANCE = 0.25
- CLIENT_METADATA_ASCII_KEY = b'key'
- CLIENT_METADATA_ASCII_VALUE = b'val'
- CLIENT_METADATA_BIN_KEY = b'key-bin'
- CLIENT_METADATA_BIN_VALUE = b'\0'*1000
- SERVER_INITIAL_METADATA_KEY = b'init_me_me_me'
- SERVER_INITIAL_METADATA_VALUE = b'whodawha?'
- SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought'
- SERVER_TRAILING_METADATA_VALUE = b'zomg it is'
- SERVER_STATUS_CODE = cygrpc.StatusCode.ok
- SERVER_STATUS_DETAILS = b'our work is never over'
- REQUEST = b'in death a member of project mayhem has a name'
- RESPONSE = b'his name is robert paulson'
- METHOD = b'/twinkies'
- HOST = None # Default host
-
- cygrpc_deadline = cygrpc.Timespec(DEADLINE)
-
- server_request_tag = object()
- request_call_result = self.server.request_call(
- self.server_completion_queue, self.server_completion_queue,
- server_request_tag)
-
- self.assertEqual(cygrpc.CallError.ok, request_call_result)
-
- plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, '')
- call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
-
- client_call_tag = object()
- client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline)
- client_call.set_credentials(call_credentials)
- client_initial_metadata = cygrpc.Metadata([
- cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
- CLIENT_METADATA_ASCII_VALUE),
- cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
- client_start_batch_result = client_call.start_batch(cygrpc.Operations([
- cygrpc.operation_send_initial_metadata(client_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
- ]), client_call_tag)
- self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
- client_event_future = test_utilities.CompletionQueuePollFuture(
- self.client_completion_queue, cygrpc_deadline)
-
- request_event = self.server_completion_queue.poll(cygrpc_deadline)
- self.assertEqual(cygrpc.CompletionType.operation_complete,
- request_event.type)
- self.assertIsInstance(request_event.operation_call, cygrpc.Call)
- self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(0, len(request_event.batch_operations))
- client_metadata_with_credentials = list(client_initial_metadata) + [
- (_CALL_CREDENTIALS_METADATA_KEY, _CALL_CREDENTIALS_METADATA_VALUE)]
- self.assertTrue(
- test_common.metadata_transmitted(client_metadata_with_credentials,
- request_event.request_metadata))
- self.assertEqual(METHOD, request_event.request_call_details.method)
- self.assertEqual(_SSL_HOST_OVERRIDE,
+ self.assertEqual(self.expected_host,
request_event.request_call_details.host)
self.assertLess(
abs(DEADLINE - float(request_event.request_call_details.deadline)),
@@ -459,6 +331,102 @@
del client_call
del server_call
+ def test6522(self):
+ DEADLINE = time.time()+5
+ DEADLINE_TOLERANCE = 0.25
+ METHOD = b'twinkies'
+
+ cygrpc_deadline = cygrpc.Timespec(DEADLINE)
+ empty_metadata = cygrpc.Metadata([])
+
+ server_request_tag = object()
+ self.server.request_call(
+ self.server_completion_queue, self.server_completion_queue,
+ server_request_tag)
+ client_call = self.client_channel.create_call(
+ None, 0, self.client_completion_queue, METHOD, self.host_argument,
+ cygrpc_deadline)
+
+ # Prologue
+ def perform_client_operations(operations, description):
+ return self._perform_operations(
+ operations, client_call,
+ self.client_completion_queue, cygrpc_deadline, description)
+
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_initial_metadata(empty_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ ], "Client prologue")
+
+ request_event = self.server_completion_queue.poll(cygrpc_deadline)
+ server_call = request_event.operation_call
+
+ def perform_server_operations(operations, description):
+ return self._perform_operations(
+ operations, server_call,
+ self.server_completion_queue, cygrpc_deadline, description)
+
+ server_event_future = perform_server_operations([
+ cygrpc.operation_send_initial_metadata(empty_metadata,
+ _EMPTY_FLAGS),
+ ], "Server prologue")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+ # Messaging
+ for _ in range(10):
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ], "Client message")
+ server_event_future = perform_server_operations([
+ cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ], "Server receive")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+ # Epilogue
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ ], "Client epilogue")
+
+ server_event_future = perform_server_operations([
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
+ ], "Server epilogue")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+
+class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin):
+
+ def setUp(self):
+ self.setUpMixin(None, None, None)
+
+ def tearDown(self):
+ self.tearDownMixin()
+
+
+class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
+
+ def setUp(self):
+ server_credentials = cygrpc.server_credentials_ssl(
+ None, [cygrpc.SslPemKeyCertPair(resources.private_key(),
+ resources.certificate_chain())], False)
+ client_credentials = cygrpc.channel_credentials_ssl(
+ resources.test_root_certificates(), None)
+ self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE)
+
+ def tearDown(self):
+ self.tearDownMixin()
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_cython/test_utilities.py b/src/python/grpcio/tests/unit/_cython/test_utilities.py
index 6a09739..6280ce7 100644
--- a/src/python/grpcio/tests/unit/_cython/test_utilities.py
+++ b/src/python/grpcio/tests/unit/_cython/test_utilities.py
@@ -32,15 +32,35 @@
from grpc._cython import cygrpc
-class CompletionQueuePollFuture:
+class SimpleFuture(object):
+ """A simple future mechanism."""
- def __init__(self, completion_queue, deadline):
- def poller_function():
- self._event_result = completion_queue.poll(deadline)
- self._event_result = None
- self._thread = threading.Thread(target=poller_function)
+ def __init__(self, function, *args, **kwargs):
+ def wrapped_function():
+ try:
+ self._result = function(*args, **kwargs)
+ except Exception as error:
+ self._error = error
+ self._result = None
+ self._error = None
+ self._thread = threading.Thread(target=wrapped_function)
self._thread.start()
def result(self):
+ """The resulting value of this future.
+
+ Re-raises any exceptions.
+ """
self._thread.join()
- return self._event_result
+ if self._error:
+ # TODO(atash): re-raise exceptions in a way that preserves tracebacks
+ raise self._error
+ return self._result
+
+
+class CompletionQueuePollFuture(SimpleFuture):
+
+ def __init__(self, completion_queue, deadline):
+ super(CompletionQueuePollFuture, self).__init__(
+ lambda: completion_queue.poll(deadline))
+
diff --git a/tools/distrib/python/grpcio_tools/README.rst b/tools/distrib/python/grpcio_tools/README.rst
index 10d2fe8..e9f1374 100644
--- a/tools/distrib/python/grpcio_tools/README.rst
+++ b/tools/distrib/python/grpcio_tools/README.rst
@@ -126,3 +126,13 @@
GCC 6.0), this is probably a bug where GCC chokes on constant expressions
when the :code:`-fwrapv` flag is specified. You should consider setting your
environment with :code:`CFLAGS=-fno-wrapv` or using clang (:code:`CC=clang`).
+
+Usage
+-----
+
+Given protobuf include directories :code:`$INCLUDE`, an output directory
+:code:`$OUTPUT`, and proto files :code:`$PROTO_FILES`, invoke as:
+
+::
+
+ $ python -m grpc.tools.protoc -I$INCLUDE --python_out=$OUTPUT --grpc_python_out=$OUTPUT $PROTO_FILES
diff --git a/tools/distrib/python/grpcio_tools/setup.py b/tools/distrib/python/grpcio_tools/setup.py
index 576f7ae..bb08cea 100644
--- a/tools/distrib/python/grpcio_tools/setup.py
+++ b/tools/distrib/python/grpcio_tools/setup.py
@@ -88,8 +88,7 @@
protoc_ext_module(),
]),
packages=setuptools.find_packages('.'),
- # TODO(atash): Figure out why auditwheel doesn't like namespace packages.
- #namespace_packages=['grpc'],
+ namespace_packages=['grpc'],
install_requires=[
'protobuf>=3.0.0a3',
],
diff --git a/tools/dockerfile/grpc_artifact_python_manylinux_x64/Dockerfile b/tools/dockerfile/grpc_artifact_python_manylinux_x64/Dockerfile
index 3e31a2b..1d4e8e1 100644
--- a/tools/dockerfile/grpc_artifact_python_manylinux_x64/Dockerfile
+++ b/tools/dockerfile/grpc_artifact_python_manylinux_x64/Dockerfile
@@ -41,3 +41,10 @@
RUN /opt/python/cp34-cp34m/bin/pip install cython
RUN /opt/python/cp35-cp35m/bin/pip install cython
+####################################################
+# Install auditwheel with fix for namespace packages
+RUN git clone https://github.com/pypa/auditwheel /usr/local/src/auditwheel
+RUN cd /usr/local/src/auditwheel && git checkout bf071b38c9fe78b025ea05c78b1cb61d7cb09939
+RUN /opt/python/cp35-cp35m/bin/pip install /usr/local/src/auditwheel
+RUN rm /usr/local/bin/auditwheel
+RUN cd /usr/local/bin && ln -s /opt/python/cp35-cp35m/bin/auditwheel
diff --git a/tools/dockerfile/grpc_artifact_python_manylinux_x86/Dockerfile b/tools/dockerfile/grpc_artifact_python_manylinux_x86/Dockerfile
index 5fe62c2..8104996 100644
--- a/tools/dockerfile/grpc_artifact_python_manylinux_x86/Dockerfile
+++ b/tools/dockerfile/grpc_artifact_python_manylinux_x86/Dockerfile
@@ -41,3 +41,10 @@
RUN /opt/python/cp34-cp34m/bin/pip install cython
RUN /opt/python/cp35-cp35m/bin/pip install cython
+####################################################
+# Install auditwheel with fix for namespace packages
+RUN git clone https://github.com/pypa/auditwheel /usr/local/src/auditwheel
+RUN cd /usr/local/src/auditwheel && git checkout bf071b38c9fe78b025ea05c78b1cb61d7cb09939
+RUN /opt/python/cp35-cp35m/bin/pip install /usr/local/src/auditwheel
+RUN rm /usr/local/bin/auditwheel
+RUN cd /usr/local/bin && ln -s /opt/python/cp35-cp35m/bin/auditwheel