Adapted the following modules to iomgr's new callback (cb) closure API:
alarm_test, tcp_posix, fd_posix, pollset_posix, credentials, call,
channel, server, and child_channel.
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b697fcc..3509d02 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -91,6 +91,7 @@
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
+
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
@@ -117,7 +118,10 @@
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
close(fd->fd);
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ fd->on_done_iocb.cb = fd->on_done;
+ fd->on_done_iocb.cb_arg = fd->on_done_user_data;
+ fd->on_done_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&fd->on_done_iocb);
freelist_fd(fd);
grpc_iomgr_unref();
} else {
@@ -196,20 +200,25 @@
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
- int allow_synchronous_callback) {
+ int allow_synchronous_callback,
+ grpc_iomgr_closure *iocb) {
if (allow_synchronous_callback) {
cb(arg, success);
} else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
+ /* !iocb: allocate -> managed by iomgr
+ * iocb: "iocb" holds an instance managed by fd_posix */
+ iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */);
+ grpc_iomgr_add_delayed_callback(iocb, success);
}
}
static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
+ int allow_synchronous_callback,
+ grpc_iomgr_closure *iocbs) {
size_t i;
for (i = 0; i < n; i++) {
make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback);
+ allow_synchronous_callback, iocbs + i);
}
}
@@ -238,7 +247,7 @@
gpr_atm_rel_store(st, NOT_READY);
make_callback(closure->cb, closure->cb_arg,
!gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
+ allow_synchronous_callback, NULL);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -284,11 +293,14 @@
int success;
grpc_iomgr_closure cb;
size_t ncb = 0;
+ grpc_iomgr_closure *ready_iocb;
gpr_mu_lock(&fd->set_state_mu);
set_ready_locked(st, &cb, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+ assert(ncb <= 1);
+ ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0);
+ make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb);
}
void grpc_fd_shutdown(grpc_fd *fd) {
@@ -300,7 +312,8 @@
set_ready_locked(&fd->readst, cb, &ncb);
set_ready_locked(&fd->writest, cb, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
+ assert(ncb <= 2);
+ make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {