Begin sharding request queues per cq
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e21fa2a..0428bb1 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -43,14 +43,8 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_transport *transport) {
- grpc_server_setup_transport(exec_ctx, server, transport,
- grpc_server_get_channel_args(server));
-}
-
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp,
+ grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
@@ -61,7 +55,8 @@
*/
grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
- setup_transport(exec_ctx, server, transport);
+ grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset,
+ grpc_server_get_channel_args(server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
}
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 99b9f29..fee14ae 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,6 +52,7 @@
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
+ grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor);
/* Create a server, initially not bound to any ports. The caller owns one ref.
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 97c945b..c695621 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -362,7 +362,7 @@
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- &acceptor);
+ read_notifier_pollset, &acceptor);
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index c9b458f..f1a031b 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -108,6 +108,7 @@
grpc_server *server;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
+ size_t cq_idx;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
@@ -180,7 +181,8 @@
char *host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
- request_matcher request_matcher;
+ /* one request matcher per method per cq */
+ request_matcher *request_matchers;
registered_method *next;
};
@@ -207,7 +209,8 @@
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- request_matcher unregistered_request_matcher;
+ /** one request matcher for unregistered methods per cq */
+ request_matcher *unregistered_request_matchers;
/** free list of available requested_calls indices */
gpr_stack_lockfree *request_freelist;
/** requested call backing data */
@@ -364,15 +367,17 @@
gpr_mu_destroy(&server->mu_call);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
- request_matcher_destroy(&rm->request_matcher);
+ for (i = 0; i < server->cq_count; i++) {
+ request_matcher_destroy(&rm->request_matchers[i]);
+ }
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ request_matcher_destroy(&server->unregistered_request_matchers[i]);
}
- request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
gpr_free(server->cqs);
gpr_free(server->pollsets);
@@ -584,9 +589,10 @@
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -600,14 +606,15 @@
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher,
+ &server->unregistered_request_matchers[chand->cq_idx],
GRPC_SRM_PAYLOAD_NONE);
}
@@ -637,14 +644,17 @@
static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
grpc_server *server) {
- registered_method *rm;
- request_matcher_kill_requests(exec_ctx, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- exec_ctx, &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
+ for (size_t i = 0; i < server->cq_count; i++) {
+ request_matcher_kill_requests(exec_ctx, server,
+ &server->unregistered_request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(
+ exec_ctx, &server->unregistered_request_matchers[i]);
+ for (registered_method *rm = server->registered_methods; rm;
+ rm = rm->next) {
+ request_matcher_kill_requests(exec_ctx, server, &rm->request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(exec_ctx,
+ &rm->request_matchers[i]);
+ }
}
}
@@ -1039,6 +1049,7 @@
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args) {
size_t num_registered_methods;
size_t alloc;
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 470ef23..fb6e4d6 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -53,6 +53,7 @@
server */
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/third_party/protobuf b/third_party/protobuf
index a1938b2..d5fb408 160000
--- a/third_party/protobuf
+++ b/third_party/protobuf
@@ -1 +1 @@
-Subproject commit a1938b2aa9ca86ce7ce50c27ff9737c1008d2a03
+Subproject commit d5fb408ddc281ffcadeb08699e65bb694656d0bd