clang-format
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
index ad74efa..21ac6c4 100644
--- a/include/grpc++/impl/codegen/method_handler_impl.h
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -44,10 +44,10 @@
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
- RpcMethodHandler(
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ResponseType*)> func,
- ServiceType* service)
+ RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
+ const RequestType*, ResponseType*)>
+ func,
+ ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
@@ -88,7 +88,8 @@
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
- ServerReader<RequestType>*, ResponseType*)> func,
+ ServerReader<RequestType>*, ResponseType*)>
+ func,
ServiceType* service)
: func_(func), service_(service) {}
@@ -124,7 +125,8 @@
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ServerWriter<ResponseType>*)> func,
+ ServerWriter<ResponseType>*)>
+ func,
ServiceType* service)
: func_(func), service_(service) {}
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 69489f4..0240ea0 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -924,10 +924,10 @@
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0);
- continue;
+ } else {
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK);
}
- fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK);
}
}
@@ -963,6 +963,9 @@
}
keep_polling = 1;
}
+ if (keep_polling) {
+ now = gpr_now(now.clock_type);
+ }
}
if (added_worker) {
remove_worker(pollset, &worker);
diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c
index 83058d9..5b7f222 100644
--- a/test/core/client_config/set_initial_connect_string_test.c
+++ b/test/core/client_config/set_initial_connect_string_test.c
@@ -37,6 +37,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
+#include <grpc/support/thd.h>
#include "src/core/ext/client_config/initial_connect_string.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -139,7 +140,7 @@
state.op.reserved = NULL;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(state.call, &state.op,
(size_t)(1), NULL, NULL));
- grpc_completion_queue_next(state.cq, n_sec_deadline(1), NULL);
+ grpc_completion_queue_next(state.cq, n_sec_deadline(5), NULL);
}
static void cleanup_rpc(void) {
@@ -157,12 +158,29 @@
gpr_free(state.target);
}
-static void poll_server_until_read_done(test_tcp_server *server) {
- gpr_timespec deadline = n_sec_deadline(5);
+typedef struct {
+ test_tcp_server *server;
+ gpr_event *signal_when_done;
+} poll_args;
+
+static void actually_poll_server(void *arg) {
+ poll_args *pa = arg;
+ gpr_timespec deadline = n_sec_deadline(10);
while (state.done == 0 &&
gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0) {
- test_tcp_server_poll(server, 1);
+ test_tcp_server_poll(pa->server, 1);
}
+ gpr_event_set(pa->signal_when_done, (void *)1);
+ gpr_free(pa);
+}
+
+static void poll_server_until_read_done(test_tcp_server *server,
+ gpr_event *signal_when_done) {
+ gpr_thd_id id;
+ poll_args *pa = gpr_malloc(sizeof(*pa));
+ pa->server = server;
+ pa->signal_when_done = signal_when_done;
+ gpr_thd_new(&id, actually_poll_server, pa, NULL);
}
static void match_initial_magic_string(gpr_slice_buffer *buffer) {
@@ -180,20 +198,26 @@
}
static void test_initial_string(test_tcp_server *server, int secure) {
+ gpr_event ev;
+ gpr_event_init(&ev);
grpc_test_set_initial_connect_string_function(set_magic_initial_string);
+ poll_server_until_read_done(server, &ev);
start_rpc(secure, server_port);
- poll_server_until_read_done(server);
+ gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
match_initial_magic_string(&state.incoming_buffer);
cleanup_rpc();
}
static void test_initial_string_with_redirect(test_tcp_server *server,
int secure) {
+ gpr_event ev;
+ gpr_event_init(&ev);
int another_port = grpc_pick_unused_port_or_die();
grpc_test_set_initial_connect_string_function(
reset_addr_and_set_magic_string);
+ poll_server_until_read_done(server, &ev);
start_rpc(secure, another_port);
- poll_server_until_read_done(server);
+ gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
match_initial_magic_string(&state.incoming_buffer);
cleanup_rpc();
}
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e72cef2..c32160a 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -84,7 +84,8 @@
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
- CompletionQueue*)> start_req,
+ CompletionQueue*)>
+ start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -165,7 +166,8 @@
AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(
StubType*, std::function<gpr_timespec()> next_issue,
- const RequestType&)> setup_ctx,
+ const RequestType&)>
+ setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
: ClientImpl<StubType, RequestType>(config, create_stub),
@@ -278,7 +280,8 @@
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
- void*)> start_req,
+ void*)>
+ start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -405,7 +408,8 @@
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
- const grpc::string& method_name, CompletionQueue*, void*)> start_req,
+ const grpc::string& method_name, CompletionQueue*, void*)>
+ start_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done)
: context_(),
stub_(stub),
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index a68f1ae..1234542 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -73,7 +73,8 @@
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
- ResponseType *)> process_rpc)
+ ResponseType *)>
+ process_rpc)
: Server(config) {
char *server_address = NULL;
@@ -190,7 +191,8 @@
ServerRpcContextUnaryImpl(
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *,
- void *)> request_method,
+ void *)>
+ request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),