Adapted the following to the new iomgr's cb API:
alarm_test, tcp_posix, fd_posix, pollset_posix, credentials, call,
channel, server, child_channel
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index d22542f..e74c32b 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -33,6 +33,7 @@
#include "src/core/iomgr/iomgr.h"
+#include <assert.h>
#include <stdlib.h>
#include "src/core/iomgr/iomgr_internal.h"
@@ -42,17 +43,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/sync.h>
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
+static grpc_iomgr_closure *g_cbs_head = NULL;
+static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static int g_refs;
static gpr_event g_background_callback_executor_done;
@@ -66,12 +60,18 @@
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
+ grpc_iomgr_closure *iocb = g_cbs_head;
+ int is_cb_ext_managed;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ assert(iocb->success >= 0);
+ iocb->cb(iocb->cb_arg, iocb->success);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
@@ -103,7 +103,7 @@
}
void grpc_iomgr_shutdown(void) {
- delayed_callback *cb;
+ grpc_iomgr_closure *iocb;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
@@ -114,13 +114,18 @@
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
+ int is_cb_ext_managed;
+ iocb = g_cbs_head;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ iocb->cb(iocb->cb_arg, 0);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
gpr_mu_lock(&g_mu);
}
if (g_refs) {
@@ -167,42 +172,52 @@
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
+grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ int is_ext_managed) {
+ grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
+ iocb->cb = cb;
+ iocb->cb_arg = cb_arg;
+ iocb->is_ext_managed = is_ext_managed;
+ iocb->success = -1; /* uninitialized */
+ iocb->next = NULL;
+ return iocb;
+}
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) {
+ iocb->success = success;
gpr_mu_lock(&g_mu);
- dcb->next = NULL;
+ iocb->next = NULL;
if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
+ g_cbs_head = g_cbs_tail = iocb;
} else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
+ g_cbs_tail->next = iocb;
+ g_cbs_tail = iocb;
}
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
+
+void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) {
+ grpc_iomgr_add_delayed_callback(iocb, 1);
}
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
+ grpc_iomgr_closure *iocb;
for (;;) {
+ int is_cb_ext_managed;
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
- cb = g_cbs_head;
- if (!cb) {
+ iocb = g_cbs_head;
+ if (!iocb) {
gpr_mu_unlock(&g_mu);
break;
}
- g_cbs_head = cb->next;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
@@ -211,8 +226,13 @@
retake_mu = drop_mu;
drop_mu = NULL;
}
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ assert(iocb->success >= 0);
+ iocb->cb(iocb->cb_arg, success && iocb->success);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
n++;
}
if (retake_mu) {