Switch to an epoll-based reactor implementation.
epoll is a much nicer interface that very closely matches the
reactor interface. It's also thread-safe, which makes it a more
suitable choice for bluedroid. As a result of this change,
reactor_register and reactor_unregister are both thread-safe without
introducing any synchronization in user space.
diff --git a/osi/src/reactor.c b/osi/src/reactor.c
index 4fcb996..eb88f75 100644
--- a/osi/src/reactor.c
+++ b/osi/src/reactor.c
@@ -20,9 +20,10 @@
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <stdlib.h>
+#include <sys/epoll.h>
#include <sys/eventfd.h>
-#include <sys/select.h>
#include <utils/Log.h>
#include "list.h"
@@ -33,29 +34,64 @@
#endif
struct reactor_t {
+ int epoll_fd;
int event_fd;
- list_t *objects;
+ pthread_mutex_t list_lock; // protects invalidation_list.
+ list_t *invalidation_list; // reactor objects that have been unregistered.
+ pthread_t run_thread; // the pthread on which reactor_run is executing.
+ bool is_running; // indicates whether |run_thread| is valid.
+ bool object_removed;
};
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);
+struct reactor_object_t {
+ int fd; // the file descriptor to monitor for events.
+ void *context; // a context that's passed back to the *_ready functions.
+ reactor_t *reactor; // the reactor instance this object is registered with.
+ pthread_mutex_t lock; // protects the lifetime of this object and all variables.
-static const eventfd_t EVENT_REACTOR_CHANGE_SET = 1;
-static const eventfd_t EVENT_REACTOR_STOP = 0x8000000000000000LL;
+ void (*read_ready)(void *context); // function to call when the file descriptor becomes readable.
+ void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable.
+};
+
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
+
+static const size_t MAX_EVENTS = 64;
+static const eventfd_t EVENT_REACTOR_STOP = 1;
reactor_t *reactor_new(void) {
reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
if (!ret)
return NULL;
+ ret->epoll_fd = INVALID_FD;
+ ret->event_fd = INVALID_FD;
+
+ ret->epoll_fd = epoll_create(MAX_EVENTS);
+ if (ret->epoll_fd == INVALID_FD) {
+ ALOGE("%s unable to create epoll instance: %s", __func__, strerror(errno));
+ goto error;
+ }
+
ret->event_fd = eventfd(0, 0);
if (ret->event_fd == INVALID_FD) {
ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
goto error;
}
- ret->objects = list_new(NULL);
- if (!ret->objects)
+ pthread_mutex_init(&ret->list_lock, NULL);
+ ret->invalidation_list = list_new(NULL);
+ if (!ret->invalidation_list) {
+ ALOGE("%s unable to allocate object invalidation list.", __func__);
goto error;
+ }
+
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.ptr = NULL;
+ if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
+ ALOGE("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
+ goto error;
+ }
return ret;
@@ -68,28 +104,20 @@
if (!reactor)
return;
- list_free(reactor->objects);
+ list_free(reactor->invalidation_list);
close(reactor->event_fd);
+ close(reactor->epoll_fd);
free(reactor);
}
reactor_status_t reactor_start(reactor_t *reactor) {
assert(reactor != NULL);
- return run_reactor(reactor, 0, NULL);
+ return run_reactor(reactor, 0);
}
reactor_status_t reactor_run_once(reactor_t *reactor) {
assert(reactor != NULL);
- return run_reactor(reactor, 1, NULL);
-}
-
-reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
- assert(reactor != NULL);
-
- struct timeval tv;
- tv.tv_sec = timeout_ms / 1000;
- tv.tv_usec = (timeout_ms % 1000) * 1000;
- return run_reactor(reactor, 1, &tv);
+ return run_reactor(reactor, 1);
}
void reactor_stop(reactor_t *reactor) {
@@ -98,92 +126,164 @@
eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
}
-void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
+reactor_object_t *reactor_register(reactor_t *reactor,
+ int fd, void *context,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context)) {
assert(reactor != NULL);
- assert(obj != NULL);
+ assert(fd != INVALID_FD);
- // IMPORTANT
- // You might be wondering why on earth this is a |list_prepend|.
- // That is a good question...
- //
- // thread_t depends on this behavior.
- // The first reactor object it registers is its work queue, and prepending
- // means it will always be at the end of any given reactor iteration.
- // This is important to ensure we don't execute off dangling reactor objects
- // or do other bad things.
- list_prepend(reactor->objects, obj);
- eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+ reactor_object_t *object = (reactor_object_t *)calloc(1, sizeof(reactor_object_t));
+ if (!object) {
+ ALOGE("%s unable to allocate reactor object: %s", __func__, strerror(errno));
+ return NULL;
+ }
+
+ object->reactor = reactor;
+ object->fd = fd;
+ object->context = context;
+ object->read_ready = read_ready;
+ object->write_ready = write_ready;
+ pthread_mutex_init(&object->lock, NULL);
+
+ struct epoll_event event;
+ event.events = 0;
+ if (read_ready)
+ event.events |= (EPOLLIN | EPOLLRDHUP);
+ if (write_ready)
+ event.events |= EPOLLOUT;
+ event.data.ptr = object;
+
+ if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
+ ALOGE("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
+ pthread_mutex_destroy(&object->lock);
+ free(object);
+ return NULL;
+ }
+
+ return object;
}
-void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
- assert(reactor != NULL);
- assert(obj != NULL);
+bool reactor_change_registration(reactor_object_t *object,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context)) {
+ assert(object != NULL);
- list_remove(reactor->objects, obj);
- eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+ struct epoll_event event;
+ event.events = 0;
+ if (read_ready)
+ event.events |= (EPOLLIN | EPOLLRDHUP);
+ if (write_ready)
+ event.events |= EPOLLOUT;
+ event.data.ptr = object;
+
+ if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
+ ALOGE("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
+ return false;
+ }
+
+ pthread_mutex_lock(&object->lock);
+ object->read_ready = read_ready;
+ object->write_ready = write_ready;
+ pthread_mutex_unlock(&object->lock);
+
+ return true;
}
-// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
+void reactor_unregister(reactor_object_t *obj) {
+ assert(obj != NULL);
+
+ reactor_t *reactor = obj->reactor;
+
+ if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
+ ALOGE("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
+
+ if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
+ reactor->object_removed = true;
+ return;
+ }
+
+ pthread_mutex_lock(&reactor->list_lock);
+ list_append(reactor->invalidation_list, obj);
+ pthread_mutex_unlock(&reactor->list_lock);
+
+ // Taking the object lock here makes sure a callback for |obj| isn't
+ // currently executing. The reactor thread must then either be before
+ // the callbacks or after. If after, we know that the object won't be
+ // referenced because it has been taken out of the epoll set. If before,
+ // it won't be referenced because the reactor thread will check the
+ // invalidation_list and find it in there. So by taking this lock, we
+ // are waiting until the reactor thread drops all references to |obj|.
+ // Once the wait completes, we can unlock and destroy |obj| safely.
+ pthread_mutex_lock(&obj->lock);
+ pthread_mutex_unlock(&obj->lock);
+ pthread_mutex_destroy(&obj->lock);
+ free(obj);
+}
+
+// Runs the reactor loop for a maximum of |iterations|.
// 0 |iterations| means loop forever.
-// NULL |tv| means no timeout (block until an event occurs).
// |reactor| may not be NULL.
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
assert(reactor != NULL);
- for (int i = 0; iterations == 0 || i < iterations; ++i) {
- fd_set read_set;
- fd_set write_set;
- FD_ZERO(&read_set);
- FD_ZERO(&write_set);
- FD_SET(reactor->event_fd, &read_set);
+ reactor->run_thread = pthread_self();
+ reactor->is_running = true;
- int max_fd = reactor->event_fd;
- for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
- reactor_object_t *object = (reactor_object_t *)list_node(iter);
- int fd = object->fd;
- reactor_interest_t interest = object->interest;
- if (interest & REACTOR_INTEREST_READ)
- FD_SET(fd, &read_set);
- if (interest & REACTOR_INTEREST_WRITE)
- FD_SET(fd, &write_set);
- if (fd > max_fd)
- max_fd = fd;
- }
+ struct epoll_event events[MAX_EVENTS];
+ for (int i = 0; iterations == 0 || i < iterations; ++i) {
+ pthread_mutex_lock(&reactor->list_lock);
+ list_clear(reactor->invalidation_list);
+ pthread_mutex_unlock(&reactor->list_lock);
int ret;
do {
- ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
+ ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
- ALOGE("%s error in select: %s", __func__, strerror(errno));
+ ALOGE("%s error in epoll_wait: %s", __func__, strerror(errno));
+ reactor->is_running = false;
return REACTOR_STATUS_ERROR;
}
- if (ret == 0)
- return REACTOR_STATUS_TIMEOUT;
-
- if (FD_ISSET(reactor->event_fd, &read_set)) {
- eventfd_t value;
- eventfd_read(reactor->event_fd, &value);
- if (value < EVENT_REACTOR_STOP)
- continue;
- return REACTOR_STATUS_STOP;
- }
-
- for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
- reactor_object_t *object = (reactor_object_t *)list_node(iter);
- int fd = object->fd;
- if (FD_ISSET(fd, &read_set)) {
- object->read_ready(object->context);
- --ret;
+ for (int j = 0; j < ret; ++j) {
+ // The event file descriptor is the only one that registers with
+ // a NULL data pointer. We use the NULL to identify it and break
+ // out of the reactor loop.
+ if (events[j].data.ptr == NULL) {
+ eventfd_t value;
+ eventfd_read(reactor->event_fd, &value);
+ reactor->is_running = false;
+ return REACTOR_STATUS_STOP;
}
- if (FD_ISSET(fd, &write_set)) {
+
+ reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
+
+ pthread_mutex_lock(&reactor->list_lock);
+ if (list_contains(reactor->invalidation_list, object)) {
+ pthread_mutex_unlock(&reactor->list_lock);
+ continue;
+ }
+
+ // Downgrade the list lock to an object lock.
+ pthread_mutex_lock(&object->lock);
+ pthread_mutex_unlock(&reactor->list_lock);
+
+ reactor->object_removed = false;
+ if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
+ object->read_ready(object->context);
+ if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
object->write_ready(object->context);
- --ret;
+ pthread_mutex_unlock(&object->lock);
+
+ if (reactor->object_removed) {
+ pthread_mutex_destroy(&object->lock);
+ free(object);
}
}
}
+ reactor->is_running = false;
return REACTOR_STATUS_DONE;
}