Switch to an epoll-based reactor implementation.

epoll is a much nicer interface that very closely matches the
reactor interface. It's also thread-safe, which makes it a more
suitable choice for bluedroid. As a result of this change,
reactor_register and reactor_unregister are both thread-safe without
introducing any synchronization in user-space.
diff --git a/osi/src/reactor.c b/osi/src/reactor.c
index 4fcb996..eb88f75 100644
--- a/osi/src/reactor.c
+++ b/osi/src/reactor.c
@@ -20,9 +20,10 @@
 
 #include <assert.h>
 #include <errno.h>
+#include <pthread.h>
 #include <stdlib.h>
+#include <sys/epoll.h>
 #include <sys/eventfd.h>
-#include <sys/select.h>
 #include <utils/Log.h>
 
 #include "list.h"
@@ -33,29 +34,64 @@
 #endif
 
 struct reactor_t {
+  int epoll_fd;
   int event_fd;
-  list_t *objects;
+  pthread_mutex_t list_lock;  // protects invalidation_list.
+  list_t *invalidation_list;  // reactor objects that have been unregistered.
+  pthread_t run_thread;       // the pthread on which reactor_run is executing.
+  bool is_running;            // indicates whether |run_thread| is valid.
+  bool object_removed;
 };
 
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);
+struct reactor_object_t {
+  int fd;                              // the file descriptor to monitor for events.
+  void *context;                       // a context that's passed back to the *_ready functions.
+  reactor_t *reactor;                  // the reactor instance this object is registered with.
+  pthread_mutex_t lock;                // protects the lifetime of this object and all variables.
 
-static const eventfd_t EVENT_REACTOR_CHANGE_SET = 1;
-static const eventfd_t EVENT_REACTOR_STOP = 0x8000000000000000LL;
+  void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
+  void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
+};
+
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
+
+static const size_t MAX_EVENTS = 64;
+static const eventfd_t EVENT_REACTOR_STOP = 1;
 
 reactor_t *reactor_new(void) {
   reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
   if (!ret)
     return NULL;
 
+  ret->epoll_fd = INVALID_FD;
+  ret->event_fd = INVALID_FD;
+
+  ret->epoll_fd = epoll_create(MAX_EVENTS);
+  if (ret->epoll_fd == INVALID_FD) {
+    ALOGE("%s unable to create epoll instance: %s", __func__, strerror(errno));
+    goto error;
+  }
+
   ret->event_fd = eventfd(0, 0);
   if (ret->event_fd == INVALID_FD) {
     ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
     goto error;
   }
 
-  ret->objects = list_new(NULL);
-  if (!ret->objects)
+  pthread_mutex_init(&ret->list_lock, NULL);
+  ret->invalidation_list = list_new(NULL);
+  if (!ret->invalidation_list) {
+    ALOGE("%s unable to allocate object invalidation list.", __func__);
     goto error;
+  }
+
+  struct epoll_event event;
+  event.events = EPOLLIN;
+  event.data.ptr = NULL;
+  if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
+    ALOGE("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
+    goto error;
+  }
 
   return ret;
 
@@ -68,28 +104,20 @@
   if (!reactor)
     return;
 
-  list_free(reactor->objects);
+  list_free(reactor->invalidation_list);
   close(reactor->event_fd);
+  close(reactor->epoll_fd);
   free(reactor);
 }
 
 reactor_status_t reactor_start(reactor_t *reactor) {
   assert(reactor != NULL);
-  return run_reactor(reactor, 0, NULL);
+  return run_reactor(reactor, 0);
 }
 
 reactor_status_t reactor_run_once(reactor_t *reactor) {
   assert(reactor != NULL);
-  return run_reactor(reactor, 1, NULL);
-}
-
-reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
-  assert(reactor != NULL);
-
-  struct timeval tv;
-  tv.tv_sec = timeout_ms / 1000;
-  tv.tv_usec = (timeout_ms % 1000) * 1000;
-  return run_reactor(reactor, 1, &tv);
+  return run_reactor(reactor, 1);
 }
 
 void reactor_stop(reactor_t *reactor) {
@@ -98,92 +126,164 @@
   eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
 }
 
-void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
+reactor_object_t *reactor_register(reactor_t *reactor,
+    int fd, void *context,
+    void (*read_ready)(void *context),
+    void (*write_ready)(void *context)) {
   assert(reactor != NULL);
-  assert(obj != NULL);
+  assert(fd != INVALID_FD);
 
-  // IMPORTANT
-  // You might be wondering why on earth this is a |list_prepend|.
-  // That is a good question...
-  //
-  // thread_t depends on this behavior.
-  // The first reactor object it registers is its work queue, and prepending
-  // means it will always be at the end of any given reactor iteration.
-  // This is important to ensure we don't execute off dangling reactor objects
-  // or do other bad things.
-  list_prepend(reactor->objects, obj);
-  eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+  reactor_object_t *object = (reactor_object_t *)calloc(1, sizeof(reactor_object_t));
+  if (!object) {
+    ALOGE("%s unable to allocate reactor object: %s", __func__, strerror(errno));
+    return NULL;
+  }
+
+  object->reactor = reactor;
+  object->fd = fd;
+  object->context = context;
+  object->read_ready = read_ready;
+  object->write_ready = write_ready;
+  pthread_mutex_init(&object->lock, NULL);
+
+  struct epoll_event event;
+  event.events = 0;
+  if (read_ready)
+    event.events |= (EPOLLIN | EPOLLRDHUP);
+  if (write_ready)
+    event.events |= EPOLLOUT;
+  event.data.ptr = object;
+
+  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
+    ALOGE("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
+    pthread_mutex_destroy(&object->lock);
+    free(object);
+    return NULL;
+  }
+
+  return object;
 }
 
-void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
-  assert(reactor != NULL);
-  assert(obj != NULL);
+bool reactor_change_registration(reactor_object_t *object,
+    void (*read_ready)(void *context),
+    void (*write_ready)(void *context)) {
+  assert(object != NULL);
 
-  list_remove(reactor->objects, obj);
-  eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+  struct epoll_event event;
+  event.events = 0;
+  if (read_ready)
+    event.events |= (EPOLLIN | EPOLLRDHUP);
+  if (write_ready)
+    event.events |= EPOLLOUT;
+  event.data.ptr = object;
+
+  if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
+    ALOGE("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
+    return false;
+  }
+
+  pthread_mutex_lock(&object->lock);
+  object->read_ready = read_ready;
+  object->write_ready = write_ready;
+  pthread_mutex_unlock(&object->lock);
+
+  return true;
 }
 
-// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
+void reactor_unregister(reactor_object_t *obj) {
+  assert(obj != NULL);
+
+  reactor_t *reactor = obj->reactor;
+
+  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
+    ALOGE("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
+
+  if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
+    reactor->object_removed = true;
+    return;
+  }
+
+  pthread_mutex_lock(&reactor->list_lock);
+  list_append(reactor->invalidation_list, obj);
+  pthread_mutex_unlock(&reactor->list_lock);
+
+  // Taking the object lock here makes sure a callback for |obj| isn't
+  // currently executing. The reactor thread must then either be before
+  // the callbacks or after. If after, we know that the object won't be
+  // referenced because it has been taken out of the epoll set. If before,
+  // it won't be referenced because the reactor thread will check the
+  // invalidation_list and find it in there. So by taking this lock, we
+  // are waiting until the reactor thread drops all references to |obj|.
+  // Once the wait completes, we can unlock and destroy |obj| safely.
+  pthread_mutex_lock(&obj->lock);
+  pthread_mutex_unlock(&obj->lock);
+  pthread_mutex_destroy(&obj->lock);
+  free(obj);
+}
+
+// Runs the reactor loop for a maximum of |iterations|.
 // 0 |iterations| means loop forever.
-// NULL |tv| means no timeout (block until an event occurs).
 // |reactor| may not be NULL.
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
   assert(reactor != NULL);
 
-  for (int i = 0; iterations == 0 || i < iterations; ++i) {
-    fd_set read_set;
-    fd_set write_set;
-    FD_ZERO(&read_set);
-    FD_ZERO(&write_set);
-    FD_SET(reactor->event_fd, &read_set);
+  reactor->run_thread = pthread_self();
+  reactor->is_running = true;
 
-    int max_fd = reactor->event_fd;
-    for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
-      reactor_object_t *object = (reactor_object_t *)list_node(iter);
-      int fd = object->fd;
-      reactor_interest_t interest = object->interest;
-      if (interest & REACTOR_INTEREST_READ)
-        FD_SET(fd, &read_set);
-      if (interest & REACTOR_INTEREST_WRITE)
-        FD_SET(fd, &write_set);
-      if (fd > max_fd)
-        max_fd = fd;
-    }
+  struct epoll_event events[MAX_EVENTS];
+  for (int i = 0; iterations == 0 || i < iterations; ++i) {
+    pthread_mutex_lock(&reactor->list_lock);
+    list_clear(reactor->invalidation_list);
+    pthread_mutex_unlock(&reactor->list_lock);
 
     int ret;
     do {
-      ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
+      ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);
     } while (ret == -1 && errno == EINTR);
 
     if (ret == -1) {
-      ALOGE("%s error in select: %s", __func__, strerror(errno));
+      ALOGE("%s error in epoll_wait: %s", __func__, strerror(errno));
+      reactor->is_running = false;
       return REACTOR_STATUS_ERROR;
     }
 
-    if (ret == 0)
-      return REACTOR_STATUS_TIMEOUT;
-
-    if (FD_ISSET(reactor->event_fd, &read_set)) {
-      eventfd_t value;
-      eventfd_read(reactor->event_fd, &value);
-      if (value < EVENT_REACTOR_STOP)
-        continue;
-      return REACTOR_STATUS_STOP;
-    }
-
-    for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
-      reactor_object_t *object = (reactor_object_t *)list_node(iter);
-      int fd = object->fd;
-      if (FD_ISSET(fd, &read_set)) {
-        object->read_ready(object->context);
-        --ret;
+    for (int j = 0; j < ret; ++j) {
+      // The event file descriptor is the only one that registers with
+      // a NULL data pointer. We use the NULL to identify it and break
+      // out of the reactor loop.
+      if (events[j].data.ptr == NULL) {
+        eventfd_t value;
+        eventfd_read(reactor->event_fd, &value);
+        reactor->is_running = false;
+        return REACTOR_STATUS_STOP;
       }
-      if (FD_ISSET(fd, &write_set)) {
+
+      reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
+
+      pthread_mutex_lock(&reactor->list_lock);
+      if (list_contains(reactor->invalidation_list, object)) {
+        pthread_mutex_unlock(&reactor->list_lock);
+        continue;
+      }
+
+      // Downgrade the list lock to an object lock.
+      pthread_mutex_lock(&object->lock);
+      pthread_mutex_unlock(&reactor->list_lock);
+
+      reactor->object_removed = false;
+      if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
+        object->read_ready(object->context);
+      if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
         object->write_ready(object->context);
-        --ret;
+      pthread_mutex_unlock(&object->lock);
+
+      if (reactor->object_removed) {
+        pthread_mutex_destroy(&object->lock);
+        free(object);
       }
     }
   }
 
+  reactor->is_running = false;
   return REACTOR_STATUS_DONE;
 }