Add callback when gRPC FD is about to be orphaned.
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index df6cf95..98ffccd 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -81,6 +81,7 @@
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_orphan_cb orphan_cb;
} server_port;
/* the overall server */
@@ -168,6 +169,10 @@
server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -268,7 +273,8 @@
static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len,
- grpc_udp_server_read_cb read_cb) {
+ grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp;
int port;
char *addr_str;
@@ -292,6 +298,7 @@
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
+ sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -301,7 +308,8 @@
}
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb) {
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
@@ -348,7 +356,8 @@
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
- allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -370,7 +379,8 @@
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d8cf957..33c5ce1 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -48,6 +48,9 @@
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
+
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
@@ -69,7 +72,8 @@
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb);
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 5248b61..a29a68e 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -54,6 +54,7 @@
static gpr_mu *g_mu;
static int g_number_of_reads = 0;
static int g_number_of_bytes_read = 0;
+static int g_number_of_orphan_calls = 0;
static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
grpc_server *server) {
@@ -71,6 +72,12 @@
gpr_mu_unlock(g_mu);
}
+static void on_fd_orphaned(grpc_fd *emfd) {
+ gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
+ grpc_fd_wrapped_fd(emfd));
+ g_number_of_orphan_calls++;
+}
+
static void test_no_op(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server *s = grpc_udp_server_create();
@@ -88,6 +95,7 @@
}
static void test_no_op_with_port(void) {
+ g_number_of_orphan_calls = 0;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create();
@@ -96,13 +104,17 @@
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
- on_read));
+ on_read, on_fd_orphaned));
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
+
+ /* The server had a single FD, which should be orphaned. */
+ GPR_ASSERT(g_number_of_orphan_calls == 1);
}
static void test_no_op_with_port_and_start(void) {
+ g_number_of_orphan_calls = 0;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create();
@@ -111,12 +123,15 @@
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
- on_read));
+ on_read, on_fd_orphaned));
grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL);
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
+
+ /* The server had a single FD which should be orphaned. */
+ GPR_ASSERT(g_number_of_orphan_calls == 1);
}
static void test_receive(int number_of_clients) {
@@ -133,11 +148,12 @@
gpr_log(GPR_INFO, "clients=%d", number_of_clients);
g_number_of_bytes_read = 0;
+ g_number_of_orphan_calls = 0;
memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET;
- GPR_ASSERT(
- grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len, on_read));
+ GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len,
+ on_read, on_fd_orphaned));
svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);
@@ -176,6 +192,8 @@
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
+
+ GPR_ASSERT(g_number_of_orphan_calls == 5);
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {