Introduce the (outside-of-iomgr) pollset API.
This CL introduces the public side of this interface. There will need to be an
iomgr-private API also, but this will be a per-implementation API and so is not
covered here.
I've taken care of wiring the interface through the codebase in the manner that
I expect it will be used.
Change on 2014/12/17 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82376987
diff --git a/Makefile b/Makefile
index f75e836..a1b8c38 100644
--- a/Makefile
+++ b/Makefile
@@ -889,6 +889,7 @@
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/iomgr_libevent.c \
src/core/iomgr/iomgr_libevent_use_threads.c \
+ src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \
@@ -2018,6 +2019,7 @@
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/iomgr_libevent.c \
src/core/iomgr/iomgr_libevent_use_threads.c \
+ src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \
diff --git a/build.json b/build.json
index 72c74d9..a970172 100644
--- a/build.json
+++ b/build.json
@@ -119,6 +119,7 @@
"src/core/iomgr/endpoint_pair_posix.c",
"src/core/iomgr/iomgr_libevent.c",
"src/core/iomgr/iomgr_libevent_use_threads.c",
+ "src/core/iomgr/pollset.c",
"src/core/iomgr/resolve_address_posix.c",
"src/core/iomgr/sockaddr_utils.c",
"src/core/iomgr/socket_utils_common_posix.c",
@@ -215,6 +216,7 @@
"src/core/iomgr/iomgr_completion_queue_interface.h",
"src/core/iomgr/iomgr.h",
"src/core/iomgr/iomgr_libevent.h",
+ "src/core/iomgr/pollset.h",
"src/core/iomgr/resolve_address.h",
"src/core/iomgr/sockaddr.h",
"src/core/iomgr/sockaddr_posix.h",
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 4a98cbf..9a7838c 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -107,7 +107,7 @@
op->data.deadline.tv_nsec);
break;
case GRPC_SEND_START:
- bprintf(&b, "SEND_START");
+ bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset);
break;
case GRPC_SEND_MESSAGE:
bprintf(&b, "SEND_MESSAGE");
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index b837caf..14e972f 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -108,6 +108,9 @@
/* Argument data, matching up with grpc_call_op_type names */
union {
+ struct {
+ grpc_pollset *pollset;
+ } start;
grpc_byte_buffer *message;
grpc_mdelem *metadata;
gpr_timespec deadline;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a146a7b..6329932 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -98,6 +98,7 @@
void (*on_complete)(void *user_data, grpc_op_error error);
void *on_complete_user_data;
gpr_uint32 start_flags;
+ grpc_pollset *pollset;
} waiting;
} s;
};
@@ -186,6 +187,7 @@
calld->s.waiting.on_complete = op->done_cb;
calld->s.waiting.on_complete_user_data = op->user_data;
calld->s.waiting.start_flags = op->flags;
+ calld->s.waiting.pollset = op->data.start.pollset;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@@ -523,6 +525,7 @@
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
+ call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 8581fb4..5faa03c 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -132,6 +132,7 @@
op->user_data);
break;
case GRPC_SEND_START:
+ grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 259c948..f1944bf 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -44,6 +44,10 @@
return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
}
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+ ep->vtable->add_to_pollset(ep, pollset);
+}
+
void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index 88949fb..bbd800b 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -34,6 +34,7 @@
#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
#define __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
+#include "src/core/iomgr/pollset.h"
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
@@ -69,6 +70,7 @@
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
void *user_data, gpr_timespec deadline);
+ void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
};
@@ -92,6 +94,10 @@
void grpc_endpoint_shutdown(grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_endpoint *ep);
+/* Add an endpoint to a pollset, so that when the pollset is polled, events from
+ this endpoint are considered */
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
+
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};
diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/pollset.c
new file mode 100644
index 0000000..62a0019
--- /dev/null
+++ b/src/core/iomgr/pollset.c
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/pollset.h"
+
+void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
new file mode 100644
index 0000000..ba1a9d5
--- /dev/null
+++ b/src/core/iomgr/pollset.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
+#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
+
+/* A grpc_pollset is a set of file descriptors that a higher level item is
+ interested in. For example:
+ - a server will typically keep a pollset containing all connected channels,
+ so that it can find new calls to service
+ - a completion queue might keep a pollset with an entry for each transport
+ that is servicing a call that it's tracking */
+/* Eventually different implementations of iomgr will provide their own
+ grpc_pollset structs. As this is just a dummy wrapper to get the API in,
+ we just define a simple type here. */
+typedef struct { char unused; } grpc_pollset;
+
+void grpc_pollset_init(grpc_pollset *pollset);
+void grpc_pollset_destroy(grpc_pollset *pollset);
+
+#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index b59e916..bc3ce69 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -538,9 +538,14 @@
return status;
}
-static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read,
- grpc_tcp_write, grpc_tcp_shutdown,
- grpc_tcp_destroy};
+static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+ /* tickle the pollset so we crash if things aren't wired correctly */
+ pollset->unused++;
+}
+
+static const grpc_endpoint_vtable vtable = {
+ grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
+ grpc_tcp_shutdown, grpc_tcp_destroy};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 34be425..cab09ca 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -325,8 +325,15 @@
secure_endpoint_unref(ep);
}
+static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
+ grpc_pollset *pollset) {
+ secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
+}
+
static const grpc_endpoint_vtable vtable = {
- endpoint_notify_on_read, endpoint_write, endpoint_shutdown, endpoint_unref};
+ endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
+ endpoint_shutdown, endpoint_unref};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ba4c806..6270ce6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -409,6 +409,7 @@
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_invoke;
+ op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
elem = CALL_ELEM_FROM_CALL(call, 0);
@@ -480,6 +481,7 @@
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = do_nothing;
+ op.data.start.pollset = grpc_cq_pollset(call->cq);
op.user_data = NULL;
elem = CALL_ELEM_FROM_CALL(call, 0);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b890f9d..4837f5b 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -66,6 +66,8 @@
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
+ /* the set of low level i/o things that concern this cq */
+ grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
int shutdown;
/* Head of a linked list of queued events (prev points to the last element) */
@@ -87,6 +89,7 @@
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
+ grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
@@ -367,6 +370,7 @@
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
GPR_ASSERT(cc->queue == NULL);
+ grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
@@ -392,3 +396,7 @@
gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}
+
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
+ return &cc->pollset;
+}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 8f3d34a..2e752a3 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -36,6 +36,7 @@
/* Internal API for completion channels */
+#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
/* A finish func is executed whenever the event consumer calls
@@ -101,4 +102,6 @@
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+
#endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9eaa3c0..3829e7a 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -491,6 +491,8 @@
}
filters[i] = &grpc_connected_channel_filter;
+ grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
+
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 0570439..a8ae8cc 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1696,14 +1696,23 @@
}
}
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset(t->ep, pollset);
+ }
+ unlock(t);
+}
+
/*
* INTEGRATION GLUE
*/
static const grpc_transport_vtable vtable = {
sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- destroy_stream, abort_stream, goaway, close_transport, send_ping,
- destroy_transport};
+ add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 1c44abc..0ca67ac 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -66,6 +66,11 @@
transport->vtable->set_allow_window_updates(transport, stream, allow);
}
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset) {
+ transport->vtable->add_to_pollset(transport, pollset);
+}
+
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream) {
transport->vtable->destroy_stream(transport, stream);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6a089a2..00dacbf 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -36,6 +36,7 @@
#include <stddef.h>
+#include "src/core/iomgr/pollset.h"
#include "src/core/transport/stream_op.h"
/* forward declarations */
@@ -202,15 +203,18 @@
void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
grpc_status_code status);
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset);
+
/* Advise peer of pending connection termination. */
-void grpc_transport_goaway(struct grpc_transport *transport,
- grpc_status_code status, gpr_slice debug_data);
+void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
+ gpr_slice debug_data);
/* Close a transport. Aborts all open streams. */
-void grpc_transport_close(struct grpc_transport *transport);
+void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
-void grpc_transport_destroy(struct grpc_transport *transport);
+void grpc_transport_destroy(grpc_transport *transport);
/* Return type for grpc_transport_setup_callback */
typedef struct grpc_transport_setup_result {
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index 328ead2..9f497b9 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -53,6 +53,9 @@
void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
int allow);
+ /* implementation of grpc_transport_add_to_pollset */
+ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
+
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj
index 9cc576a..391354c 100644
--- a/vsprojects/vs2013/grpc.vcxproj
+++ b/vsprojects/vs2013/grpc.vcxproj
@@ -104,6 +104,7 @@
<ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" />
+ <ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" />
@@ -209,6 +210,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\iomgr\pollset.c">
+ </ClCompile>
<ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\sockaddr_utils.c">