Merge github.com:grpc/grpc into test_affine
diff --git a/Makefile b/Makefile
index 42cedf5..c93c9a4 100644
--- a/Makefile
+++ b/Makefile
@@ -215,9 +215,9 @@
CXX_mutrace = $(DEFAULT_CXX)
LD_mutrace = $(DEFAULT_CC)
LDXX_mutrace = $(DEFAULT_CXX)
-CPPFLAGS_mutrace = -O0
+CPPFLAGS_mutrace = -O3 -fno-omit-frame-pointer
LDFLAGS_mutrace = -rdynamic
-DEFINES_mutrace = _DEBUG DEBUG
+DEFINES_mutrace = NDEBUG
VALID_CONFIG_memcheck = 1
CC_memcheck = $(DEFAULT_CC)
diff --git a/build.yaml b/build.yaml
index ac61612..68e814f 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3246,8 +3246,8 @@
compile_the_world: true
timeout_multiplier: 4
mutrace:
- CPPFLAGS: -O0
- DEFINES: _DEBUG DEBUG
+ CPPFLAGS: -O3 -fno-omit-frame-pointer
+ DEFINES: NDEBUG
LDFLAGS: -rdynamic
opt:
CPPFLAGS: -O2
diff --git a/grpc.def b/grpc.def
index 09a94a6..b811f0f 100644
--- a/grpc.def
+++ b/grpc.def
@@ -77,6 +77,7 @@
grpc_server_request_registered_call
grpc_server_create
grpc_server_register_completion_queue
+ grpc_server_register_non_listening_completion_queue
grpc_server_add_insecure_http2_port
grpc_server_start
grpc_server_shutdown_and_notify
diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h
index 6fcd5c3..cf78438 100644
--- a/include/grpc++/impl/codegen/channel_interface.h
+++ b/include/grpc++/impl/codegen/channel_interface.h
@@ -85,6 +85,16 @@
return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time());
}
+ /// Wait for this channel to be connected
+ template <typename T>
+ bool WaitForConnected(T deadline) {
+ grpc_connectivity_state state;
+ while ((state = GetState(true)) != GRPC_CHANNEL_READY) {
+ if (!WaitForStateChange(state, deadline)) return false;
+ }
+ return true;
+ }
+
private:
template <class R>
friend class ::grpc::ClientReader;
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index 56864d6..1b84b44 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -222,9 +222,18 @@
/// A specific type of completion queue used by the processing of notifications
/// by servers. Instantiated by \a ServerBuilder.
class ServerCompletionQueue : public CompletionQueue {
+ public:
+ bool IsFrequentlyPolled() { return is_frequently_polled_; }
+
private:
+ bool is_frequently_polled_;
friend class ServerBuilder;
- ServerCompletionQueue() {}
+ /// \param is_frequently_polled Informs the GPRC library about whether the
+ /// server completion queue would be actively polled (by calling Next() or
+ /// AsyncNext()). By default all server completion queues are assumed to be
+ /// frequently polled.
+ ServerCompletionQueue(bool is_frequently_polled = true)
+ : is_frequently_polled_(is_frequently_polled) {}
};
} // namespace grpc
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index ad62952..8525cb7 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -108,7 +108,15 @@
/// Add a completion queue for handling asynchronous services
/// Caller is required to keep this completion queue live until
/// the server is destroyed.
- std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+ ///
+ /// \param is_frequently_polled This is an optional parameter to inform GRPC
+ /// library about whether this completion queue would be frequently polled
+ /// (i.e by calling Next() or AsyncNext()). The default value is 'true' and is
+ /// the recommended setting. Setting this to 'false' (i.e not polling the
+ /// completion queue frequently) will have a significantly negative
+ /// performance impact and hence should not be used in production use cases.
+ std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(
+ bool is_frequently_polled = true);
/// Return a running server which is ready for processing calls.
std::unique_ptr<Server> BuildAndStart();
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 0ca28c0..6f7a67b 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -334,6 +334,15 @@
grpc_completion_queue *cq,
void *reserved);
+/** Register a non-listening completion queue with the server. This API is
+ similar to grpc_server_register_completion_queue except that the server will
+ not use this completion_queue to listen to any incoming channels.
+
+ Registering a non-listening completion queue will have negative performance
+ impact and hence this API is not recommended for production use cases. */
+GRPCAPI void grpc_server_register_non_listening_completion_queue(
+ grpc_server *server, grpc_completion_queue *q, void *reserved);
+
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e21fa2a..0428bb1 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -43,14 +43,8 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_transport *transport) {
- grpc_server_setup_transport(exec_ctx, server, transport,
- grpc_server_get_channel_args(server));
-}
-
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp,
+ grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
@@ -61,7 +55,8 @@
*/
grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
- setup_transport(exec_ctx, server, transport);
+ grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset,
+ grpc_server_get_channel_args(server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 698b2be..26b0f00 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -52,7 +52,7 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct grpc_server_secure_state {
+typedef struct server_secure_state {
grpc_server *server;
grpc_tcp_server *tcp;
grpc_server_security_connector *sc;
@@ -62,13 +62,16 @@
gpr_refcount refcount;
grpc_closure destroy_closure;
grpc_closure *destroy_callback;
-} grpc_server_secure_state;
+} server_secure_state;
-static void state_ref(grpc_server_secure_state *state) {
- gpr_ref(&state->refcount);
-}
+typedef struct server_secure_connect {
+ server_secure_state *state;
+ grpc_pollset *accepting_pollset;
+} server_secure_connect;
-static void state_unref(grpc_server_secure_state *state) {
+static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); }
+
+static void state_unref(server_secure_state *state) {
if (gpr_unref(&state->refcount)) {
/* ensure all threads have unlocked */
gpr_mu_lock(&state->mu);
@@ -80,67 +83,66 @@
}
}
-static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_transport *transport,
- grpc_auth_context *auth_context) {
- grpc_server_secure_state *state = statep;
- grpc_channel_args *args_copy;
- grpc_arg args_to_add[2];
- args_to_add[0] = grpc_server_credentials_to_arg(state->creds);
- args_to_add[1] = grpc_auth_context_to_arg(auth_context);
- args_copy = grpc_channel_args_copy_and_add(
- grpc_server_get_channel_args(state->server), args_to_add,
- GPR_ARRAY_SIZE(args_to_add));
- grpc_server_setup_transport(exec_ctx, state->server, transport, args_copy);
- grpc_channel_args_destroy(args_copy);
-}
-
static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
grpc_security_status status,
grpc_endpoint *secure_endpoint,
grpc_auth_context *auth_context) {
- grpc_server_secure_state *state = statep;
+ server_secure_connect *state = statep;
grpc_transport *transport;
if (status == GRPC_SECURITY_OK) {
if (secure_endpoint) {
- gpr_mu_lock(&state->mu);
- if (!state->is_shutdown) {
+ gpr_mu_lock(&state->state->mu);
+ if (!state->state->is_shutdown) {
transport = grpc_create_chttp2_transport(
- exec_ctx, grpc_server_get_channel_args(state->server),
+ exec_ctx, grpc_server_get_channel_args(state->state->server),
secure_endpoint, 0);
- setup_transport(exec_ctx, state, transport, auth_context);
+ grpc_channel_args *args_copy;
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds);
+ args_to_add[1] = grpc_auth_context_to_arg(auth_context);
+ args_copy = grpc_channel_args_copy_and_add(
+ grpc_server_get_channel_args(state->state->server), args_to_add,
+ GPR_ARRAY_SIZE(args_to_add));
+ grpc_server_setup_transport(exec_ctx, state->state->server, transport,
+ state->accepting_pollset, args_copy);
+ grpc_channel_args_destroy(args_copy);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
} else {
/* We need to consume this here, because the server may already have
* gone away. */
grpc_endpoint_destroy(exec_ctx, secure_endpoint);
}
- gpr_mu_unlock(&state->mu);
+ gpr_mu_unlock(&state->state->mu);
}
} else {
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
- state_unref(state);
+ state_unref(state->state);
+ gpr_free(state);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
+ grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
- grpc_server_secure_state *state = statep;
- state_ref(state);
- grpc_server_security_connector_do_handshake(
- exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state);
+ server_secure_connect *state = gpr_malloc(sizeof(*state));
+ state->state = statep;
+ state_ref(state->state);
+ state->accepting_pollset = accepting_pollset;
+ grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc,
+ acceptor, tcp,
+ on_secure_handshake_done, state);
}
/* Server callback: start listening on our ports */
static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
grpc_pollset **pollsets, size_t pollset_count) {
- grpc_server_secure_state *state = statep;
+ server_secure_state *state = statep;
grpc_tcp_server_start(exec_ctx, state->tcp, pollsets, pollset_count,
on_accept, state);
}
static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
- grpc_server_secure_state *state = statep;
+ server_secure_state *state = statep;
if (state->destroy_callback != NULL) {
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
success);
@@ -153,7 +155,7 @@
callbacks) */
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
grpc_closure *callback) {
- grpc_server_secure_state *state = statep;
+ server_secure_state *state = statep;
grpc_tcp_server *tcp;
gpr_mu_lock(&state->mu);
state->is_shutdown = 1;
@@ -167,7 +169,7 @@
grpc_server_credentials *creds) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
- grpc_server_secure_state *state = NULL;
+ server_secure_state *state = NULL;
size_t i;
unsigned count = 0;
int port_num = -1;
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index aeb6e28..943c404 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -126,6 +126,9 @@
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -147,7 +150,8 @@
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write);
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@@ -342,6 +346,7 @@
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
+ r->read_notifier_pollset = NULL;
gpr_mu_unlock(&r->mu);
return r;
}
@@ -545,6 +550,11 @@
}
}
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown);
@@ -568,6 +578,18 @@
gpr_mu_unlock(&fd->mu);
}
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *watcher) {
@@ -620,7 +642,8 @@
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write) {
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@@ -656,6 +679,10 @@
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
+
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
@@ -756,9 +783,14 @@
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ /* Prefer not to kick self. Push the worker to the end of the list and
+ * pop the one from front */
GPR_TIMER_MARK("kick_anonymous_not_self", 0);
push_back_worker(p, specific_worker);
specific_worker = pop_front_worker(p);
+ /* If there was only one worker on the pollset, we would get the same
+ * worker we pushed (the one set on current thread local) back. If so,
+ * kick it only if GRPC_POLLSET_CAN_KICK_SELF flag is set */
if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
@@ -1201,11 +1233,11 @@
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else if (r == 0) {
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else {
if (pfd[0].revents & POLLIN_CHECK) {
@@ -1216,9 +1248,9 @@
}
if (nfds > 2) {
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
- pfd[2].revents & POLLOUT_CHECK);
+ pfd[2].revents & POLLOUT_CHECK, pollset);
} else if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
}
@@ -1354,11 +1386,11 @@
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -1369,11 +1401,11 @@
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
continue;
}
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK);
+ pfds[i].revents & POLLOUT_CHECK, pollset);
}
}
@@ -1449,20 +1481,31 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
+ grpc_pollset *read_notifier_pollset) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
+
+ /* A non-NULL read_notifier_pollset means that the fd is readable. */
+ if (read_notifier_pollset != NULL) {
+ /* Note: Since the fd might be a part of multiple pollsets, this might be
+ * called multiple times (for each time the fd becomes readable) and it is
+ * okay to set the fd's read-notifier pollset to anyone of these pollsets */
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+
gpr_mu_unlock(&fd->mu);
}
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->read_closure);
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_pollset *notifier_pollset) {
+ set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->write_closure);
+ set_ready(exec_ctx, fd, &fd->write_closure, NULL);
}
struct epoll_fd_list {
@@ -1554,7 +1597,7 @@
}
}
}
- fd_end_poll(exec_ctx, &watcher, 0, 0);
+ fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1668,7 +1711,7 @@
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
+ fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
@@ -1897,6 +1940,7 @@
.fd_shutdown = fd_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index e91ae40..fafb3b4 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -113,6 +113,9 @@
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -134,7 +137,8 @@
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write);
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@@ -301,6 +305,7 @@
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
+ r->read_notifier_pollset = NULL;
char *name2;
gpr_asprintf(&name2, "%s fd=%d", name, fd);
@@ -316,6 +321,18 @@
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
@@ -444,6 +461,11 @@
}
}
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown);
@@ -519,7 +541,8 @@
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write) {
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@@ -555,6 +578,9 @@
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
@@ -899,11 +925,11 @@
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -914,10 +940,10 @@
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
} else {
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK);
+ pfds[i].revents & POLLOUT_CHECK, pollset);
}
}
}
@@ -1181,6 +1207,7 @@
.fd_shutdown = fd_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index a7dfc95..6477b05 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -163,6 +163,11 @@
g_event_engine->fd_notify_on_write(exec_ctx, fd, closure);
}
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd);
+}
+
size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1fa9f5e..344bf63 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -55,6 +55,8 @@
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+ grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -137,6 +139,10 @@
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+/* Return the read notifier pollset from the fd */
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
+
/* pollset_posix functions */
/* Add an fd to a pollset */
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 99b9f29..fee14ae 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,6 +52,7 @@
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
+ grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor);
/* Create a server, initially not bound to any ports. The caller owns one ref.
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index aaeb384..c695621 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -310,13 +310,15 @@
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
+ grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
- size_t i;
if (!success) {
goto error;
}
+ read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
+
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
struct sockaddr_storage addr;
@@ -349,16 +351,18 @@
}
fdobj = grpc_fd_create(fd, name);
- /* TODO(ctiller): revise this when we have server-side sharding
- of channels -- we certainly should not be automatically adding every
- incoming channel to every pollset owned by the server */
- for (i = 0; i < sp->server->pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj);
+
+ if (read_notifier_pollset == NULL) {
+ gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
+ goto error;
}
+
+ grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- &acceptor);
+ read_notifier_pollset, &acceptor);
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 1f82c3b..5eb7cf1 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -70,6 +70,8 @@
int shutdown;
int shutdown_called;
int is_server_cq;
+ /** Can the server cq accept incoming channels */
+ int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -84,6 +86,7 @@
};
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
+#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
static gpr_mu g_freelist_mu;
static grpc_completion_queue *g_freelist;
@@ -149,6 +152,7 @@
cc->shutdown = 0;
cc->shutdown_called = 0;
cc->is_server_cq = 0;
+ cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@@ -511,6 +515,18 @@
return POLLSET_FROM_CQ(cc);
}
+grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
+ return CQ_FROM_POLLSET(ps);
+}
+
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ cc->is_non_listening_server_cq = 1;
+}
+
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ return (cc->is_non_listening_server_cq == 1);
+}
+
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index eef82cf..3d0dd13 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,7 +81,10 @@
void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 2db95b9..4d179d0 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,7 +81,6 @@
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
- grpc_completion_queue *cq_for_notification;
grpc_call **call;
grpc_cq_completion completion;
grpc_metadata_array *initial_metadata;
@@ -108,6 +107,7 @@
grpc_server *server;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
+ size_t cq_idx;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
@@ -170,6 +170,7 @@
struct request_matcher {
grpc_server *server;
+ size_t cq_idx;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
@@ -180,7 +181,8 @@
char *host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
- request_matcher request_matcher;
+ /* one request matcher per method per cq */
+ request_matcher *request_matchers;
registered_method *next;
};
@@ -195,6 +197,7 @@
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
size_t cq_count;
+ bool started;
/* The two following mutexes control access to server-state
mu_global controls access to non-call-related state (e.g., channel state)
@@ -207,7 +210,8 @@
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- request_matcher unregistered_request_matcher;
+ /** one request matcher for unregistered methods per cq */
+ request_matcher *unregistered_request_matchers;
/** free list of available requested_calls indices */
gpr_stack_lockfree *request_freelist;
/** requested call backing data */
@@ -234,7 +238,7 @@
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- requested_call *rc);
+ size_t cq_idx, requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
@@ -309,9 +313,10 @@
*/
static void request_matcher_init(request_matcher *rm, size_t entries,
- grpc_server *server) {
+ size_t cq_idx, grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
+ rm->cq_idx = cq_idx;
rm->requests = gpr_stack_lockfree_create(entries);
}
@@ -344,7 +349,8 @@
request_matcher *rm) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- fail_call(exec_ctx, server, &server->requested_calls[request_id]);
+ fail_call(exec_ctx, server, rm->cq_idx,
+ &server->requested_calls[request_id]);
}
}
@@ -364,16 +370,24 @@
gpr_mu_destroy(&server->mu_call);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
- request_matcher_destroy(&rm->request_matcher);
+ if (server->started) {
+ for (i = 0; i < server->cq_count; i++) {
+ request_matcher_destroy(&rm->request_matchers[i]);
+ }
+ gpr_free(rm->request_matchers);
+ }
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ if (server->started) {
+ request_matcher_destroy(&server->unregistered_request_matchers[i]);
+ }
}
- request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
+ gpr_free(server->unregistered_request_matchers);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -453,11 +467,11 @@
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
+ call_data *calld, size_t cq_idx, requested_call *rc) {
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
grpc_call *call = calld->call;
*rc->call = call;
- calld->cq_new = rc->cq_for_notification;
+ calld->cq_new = server->cqs[cq_idx];
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
switch (rc->type) {
case BATCH_CALL:
@@ -525,7 +539,8 @@
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld, rm->cq_idx,
+ &server->requested_calls[request_id]);
}
}
@@ -584,9 +599,10 @@
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -600,14 +616,15 @@
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher,
+ &server->unregistered_request_matchers[chand->cq_idx],
GRPC_SRM_PAYLOAD_NONE);
}
@@ -637,14 +654,20 @@
static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
grpc_server *server) {
- registered_method *rm;
- request_matcher_kill_requests(exec_ctx, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- exec_ctx, &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
+ if (server->started) {
+ for (size_t i = 0; i < server->cq_count; i++) {
+ request_matcher_kill_requests(exec_ctx, server,
+ &server->unregistered_request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(
+ exec_ctx, &server->unregistered_request_matchers[i]);
+ for (registered_method *rm = server->registered_methods; rm;
+ rm = rm->next) {
+ request_matcher_kill_requests(exec_ctx, server,
+ &rm->request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(exec_ctx,
+ &rm->request_matchers[i]);
+ }
+ }
}
}
@@ -895,23 +918,45 @@
"server",
};
-void grpc_server_register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq,
- void *reserved) {
+static void register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq,
+ bool is_non_listening, void *reserved) {
size_t i, n;
- GRPC_API_TRACE(
- "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
- (server, cq, reserved));
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
- GRPC_CQ_INTERNAL_REF(cq, "server");
+
grpc_cq_mark_server_cq(cq);
- n = server->cq_count++;
- server->cqs = gpr_realloc(server->cqs,
- server->cq_count * sizeof(grpc_completion_queue *));
- server->cqs[n] = cq;
+
+ /* Non-listening completion queues are not added to server->cqs */
+ if (is_non_listening) {
+ grpc_cq_mark_non_listening_server_cq(cq);
+ } else {
+ GRPC_CQ_INTERNAL_REF(cq, "server");
+ n = server->cq_count++;
+ server->cqs = gpr_realloc(
+ server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
+ server->cqs[n] = cq;
+ }
+}
+
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq,
+ void *reserved) {
+ GRPC_API_TRACE(
+ "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
+ (server, cq, reserved));
+ register_completion_queue(server, cq, false, reserved);
+}
+
+void grpc_server_register_non_listening_completion_queue(
+ grpc_server *server, grpc_completion_queue *cq, void *reserved) {
+ GRPC_API_TRACE(
+ "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
+ "reserved=%p)",
+ 3, (server, cq, reserved));
+ register_completion_queue(server, cq, true, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@@ -940,8 +985,6 @@
for (i = 0; i < (size_t)server->max_requested_calls; i++) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
- request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
@@ -985,8 +1028,6 @@
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
- request_matcher_init(&m->request_matcher, server->max_requested_calls,
- server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -1003,9 +1044,23 @@
GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
+ server->started = true;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->unregistered_request_matchers = gpr_malloc(
+ sizeof(*server->unregistered_request_matchers) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ request_matcher_init(&server->unregistered_request_matchers[i],
+ server->max_requested_calls, i, server);
+ for (registered_method *rm = server->registered_methods; rm;
+ rm = rm->next) {
+ if (i == 0) {
+ rm->request_matchers =
+ gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count);
+ }
+ request_matcher_init(&rm->request_matchers[i],
+ server->max_requested_calls, i, server);
+ }
}
for (l = server->listeners; l; l = l->next) {
@@ -1017,8 +1072,8 @@
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args) {
- size_t i;
size_t num_registered_methods;
size_t alloc;
registered_method *rm;
@@ -1033,12 +1088,6 @@
uint32_t max_probes = 0;
grpc_transport_op op;
- for (i = 0; i < s->cq_count; i++) {
- memset(&op, 0, sizeof(op));
- op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
- grpc_transport_perform_op(exec_ctx, transport, &op);
- }
-
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
chand = (channel_data *)grpc_channel_stack_element(
@@ -1048,6 +1097,17 @@
server_ref(s);
chand->channel = channel;
+ size_t cq_idx;
+ grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
+ for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
+ if (s->cqs[cq_idx] == accepting_cq) break;
+ }
+ if (cq_idx == s->cq_count) {
+ /* completion queue not found: pick a random one to publish new calls to */
+ cq_idx = (size_t)rand() % s->cq_count;
+ }
+ chand->cq_idx = cq_idx;
+
num_registered_methods = 0;
for (rm = s->registered_methods; rm; rm = rm->next) {
num_registered_methods++;
@@ -1218,27 +1278,27 @@
}
static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
- grpc_server *server,
+ grpc_server *server, size_t cq_idx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- fail_call(exec_ctx, server, rc);
+ fail_call(exec_ctx, server, cq_idx, rc);
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist);
if (request_id == -1) {
/* out of request ids: just fail this one */
- fail_call(exec_ctx, server, rc);
+ fail_call(exec_ctx, server, cq_idx, rc);
return GRPC_CALL_OK;
}
switch (rc->type) {
case BATCH_CALL:
- rm = &server->unregistered_request_matcher;
+ rm = &server->unregistered_request_matchers[cq_idx];
break;
case REGISTERED_CALL:
- rm = &rc->data.registered.registered_method->request_matcher;
+ rm = &rc->data.registered.registered_method->request_matchers[cq_idx];
break;
}
server->requested_calls[request_id] = *rc;
@@ -1264,7 +1324,7 @@
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld,
+ publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
@@ -1288,7 +1348,13 @@
"cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
7, (server, call, details, initial_metadata, cq_bound_to_call,
cq_for_notification, tag));
- if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
@@ -1299,11 +1365,10 @@
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
- rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.batch.details = details;
rc->initial_metadata = initial_metadata;
- error = queue_call_request(&exec_ctx, server, rc);
+ error = queue_call_request(&exec_ctx, server, cq_idx, rc);
done:
grpc_exec_ctx_finish(&exec_ctx);
return error;
@@ -1325,7 +1390,14 @@
"tag=%p)",
9, (server, rmp, call, deadline, initial_metadata, optional_payload,
cq_bound_to_call, cq_for_notification, tag));
- if (!grpc_cq_is_server_cq(cq_for_notification)) {
+
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
@@ -1341,26 +1413,25 @@
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
- rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.registered.registered_method = rm;
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
- error = queue_call_request(&exec_ctx, server, rc);
+ error = queue_call_request(&exec_ctx, server, cq_idx, rc);
done:
grpc_exec_ctx_finish(&exec_ctx);
return error;
}
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- requested_call *rc) {
+ size_t cq_idx, requested_call *rc) {
*rc->call = NULL;
rc->initial_metadata->count = 0;
server_ref(server);
- grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
- done_request_event, rc, &rc->completion);
+ grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event,
+ rc, &rc->completion);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 470ef23..fb6e4d6 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -53,6 +53,7 @@
server */
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 61f0f6a..fbcb3ce 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -60,8 +60,9 @@
}
}
-std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
- ServerCompletionQueue* cq = new ServerCompletionQueue();
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+ bool is_frequently_polled) {
+ ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -99,8 +100,11 @@
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<ThreadPoolInterface> thread_pool;
+ // Does this server have atleast one sync method
+ bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
+ has_sync_methods = true;
if (thread_pool == nullptr) {
thread_pool.reset(CreateDefaultThreadPool());
break;
@@ -128,10 +132,33 @@
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
ServerInitializer* initializer = server->initializer();
+
+ // If the server has atleast one sync methods, we know that this is a Sync
+ // server or a Hybrid server and the completion queue (server->cq_) would be
+ // frequently polled.
+ int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
+
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
- nullptr);
+ // A completion queue that is not polled frequently (by calling Next() or
+ // AsyncNext()) is not safe to use for listening to incoming channels.
+ // Register all such completion queues as non-listening completion queues
+ // with the GRPC core library.
+ if ((*cq)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
+ num_frequently_polled_cqs++;
+ } else {
+ grpc_server_register_non_listening_completion_queue(server->server_,
+ (*cq)->cq(), nullptr);
+ }
}
+
+ if (num_frequently_polled_cqs == 0) {
+ gpr_log(GPR_ERROR,
+ "Atleast one of the completion queues must be frequently polled");
+ return nullptr;
+ }
+
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService((*service)->host.get(), (*service)->service)) {
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index 0955147..f71cf12 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -115,6 +115,7 @@
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
+grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -386,6 +387,7 @@
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
+ grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index 6de2954..a364075 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -296,6 +296,9 @@
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
+typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
+extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
+#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index cebbe8c..3b62984 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -115,6 +115,7 @@
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
+grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -382,6 +383,7 @@
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
+ grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index d7ea6c5..1428e6d 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -296,6 +296,9 @@
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
+typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
+extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
+#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index e582068..f753b6f 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -70,7 +70,7 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
thd_args *a = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_server_setup_transport(&exec_ctx, a->server, transport,
+ grpc_server_setup_transport(&exec_ctx, a->server, transport, NULL,
grpc_server_get_channel_args(a->server));
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c
index c1b8452..f21d651 100644
--- a/test/core/client_config/set_initial_connect_string_test.c
+++ b/test/core/client_config/set_initial_connect_string_test.c
@@ -79,7 +79,7 @@
}
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,grpc_pollset*accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
test_tcp_server *server = arg;
grpc_closure_init(&on_read, handle_read, NULL);
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c
index 87533a9..6b0769b 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.c
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.c
@@ -50,6 +50,7 @@
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@@ -60,7 +61,9 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_server_setup_transport(&exec_ctx, f->server, transport,
+ grpc_endpoint_pair *sfd = f->fixture_data;
+ grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
+ grpc_server_setup_transport(&exec_ctx, f->server, transport, NULL,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c
index f28147c..7be88f8 100644
--- a/test/core/end2end/fixtures/h2_sockpair.c
+++ b/test/core/end2end/fixtures/h2_sockpair.c
@@ -49,6 +49,7 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@@ -59,7 +60,9 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_server_setup_transport(&exec_ctx, f->server, transport,
+ grpc_endpoint_pair *sfd = f->fixture_data;
+ grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
+ grpc_server_setup_transport(&exec_ctx, f->server, transport, NULL,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 302b16b..166654b 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -49,6 +49,7 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@@ -59,7 +60,9 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_server_setup_transport(&exec_ctx, f->server, transport,
+ grpc_endpoint_pair *sfd = f->fixture_data;
+ grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
+ grpc_server_setup_transport(&exec_ctx, f->server, transport, NULL,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c
index cdfa960..e79aeb8 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.c
+++ b/test/core/end2end/fuzzers/api_fuzzer.c
@@ -252,7 +252,7 @@
grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, NULL, server, 0);
- grpc_server_setup_transport(exec_ctx, g_server, transport, NULL);
+ grpc_server_setup_transport(exec_ctx, g_server, transport, NULL, NULL);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
grpc_exec_ctx_enqueue(exec_ctx, fc->closure, false, NULL);
diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c
index 4027371..0a7d6d9 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.c
+++ b/test/core/end2end/fuzzers/server_fuzzer.c
@@ -69,7 +69,7 @@
grpc_server_start(server);
grpc_transport *transport =
grpc_create_chttp2_transport(&exec_ctx, NULL, mock_endpoint, 0);
- grpc_server_setup_transport(&exec_ctx, server, transport, NULL);
+ grpc_server_setup_transport(&exec_ctx, server, transport, NULL, NULL);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
grpc_call *call1 = NULL;
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index 266d239..365bfbb 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -112,7 +112,7 @@
weak_ref->server = server;
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *pollset,
grpc_tcp_server_acceptor *acceptor) {
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp);
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index d62d5a9..49a1fc4 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -63,6 +63,12 @@
shutdown_and_destroy(grpc_completion_queue_create(NULL));
}
+static void test_pollset_conversion(void) {
+ grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq);
+ shutdown_and_destroy(cq);
+}
+
static void test_wait_empty(void) {
grpc_completion_queue *cc;
grpc_event event;
@@ -408,6 +414,7 @@
grpc_test_init(argc, argv);
grpc_init();
test_no_op();
+ test_pollset_conversion();
test_wait_empty();
test_shutdown_then_next_polling();
test_shutdown_then_next_with_timeout();
diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c
index 28ddf58..af23fba 100644
--- a/test/core/surface/concurrent_connectivity_test.c
+++ b/test/core/surface/concurrent_connectivity_test.c
@@ -95,7 +95,7 @@
GPR_ASSERT(detag(ev.tag) == 0xd1e);
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp,
+static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, grpc_pollset*accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
struct server_thread_args *args = (struct server_thread_args *)vargs;
(void)acceptor;
diff --git a/test/core/surface/server_test.c b/test/core/surface/server_test.c
index 3d2e253..1e94c5a 100644
--- a/test/core/surface/server_test.c
+++ b/test/core/surface/server_test.c
@@ -67,12 +67,14 @@
void test_request_call_on_no_server_cq(void) {
grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
+ grpc_server *server = grpc_server_create(NULL, NULL);
GPR_ASSERT(GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE ==
- grpc_server_request_call(NULL, NULL, NULL, NULL, cc, cc, NULL));
+ grpc_server_request_call(server, NULL, NULL, NULL, cc, cc, NULL));
GPR_ASSERT(GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE ==
- grpc_server_request_registered_call(NULL, NULL, NULL, NULL, NULL,
+ grpc_server_request_registered_call(server, NULL, NULL, NULL, NULL,
NULL, cc, cc, NULL));
grpc_completion_queue_destroy(cc);
+ grpc_server_destroy(server);
}
void test_bind_server_twice(void) {
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index d408374..d3d8f5a 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -70,7 +70,7 @@
}
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
char *peer;
char *last_colon;
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index 02043a8..0423448 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -216,7 +216,7 @@
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
- cqs_.push_back(builder.AddCompletionQueue());
+ cqs_.push_back(builder.AddCompletionQueue(false));
}
server_ = builder.BuildAndStart();
}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 175529f..2a89eb8 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -38,6 +38,7 @@
#include <mutex>
#include <vector>
+#include <grpc++/channel.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/slice.h>
@@ -315,6 +316,10 @@
target, config.security_params().server_host_override(),
config.has_security_params(), !config.security_params().use_test_ca(),
std::shared_ptr<CallCredentials>(), args);
+ gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
+ GPR_ASSERT(channel_->WaitForConnected(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(30, GPR_TIMESPAN))));
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index c3e72d9..f94ea0c 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -50,8 +50,8 @@
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1000);
- client_config.set_client_channels(8);
+ client_config.set_outstanding_rpcs_per_channel(100);
+ client_config.set_client_channels(64);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
diff --git a/third_party/protobuf b/third_party/protobuf
index a1938b2..d5fb408 160000
--- a/third_party/protobuf
+++ b/third_party/protobuf
@@ -1 +1 @@
-Subproject commit a1938b2aa9ca86ce7ce50c27ff9737c1008d2a03
+Subproject commit d5fb408ddc281ffcadeb08699e65bb694656d0bd