Add peek + dequeue registratation to fixed queue
diff --git a/osi/include/fixed_queue.h b/osi/include/fixed_queue.h
index 12e8e75..5c32ac7 100644
--- a/osi/include/fixed_queue.h
+++ b/osi/include/fixed_queue.h
@@ -22,8 +22,10 @@
 
 struct fixed_queue_t;
 typedef struct fixed_queue_t fixed_queue_t;
+typedef struct reactor_t reactor_t;
 
 typedef void (*fixed_queue_free_cb)(void *data);
+typedef void (*fixed_queue_cb)(fixed_queue_t *queue, void *context);
 
 // Creates a new fixed queue with the given |capacity|. If more elements than
 // |capacity| are added to the queue, the caller is blocked until space is
@@ -65,6 +67,11 @@
 // NULL.
 void *fixed_queue_try_dequeue(fixed_queue_t *queue);
 
+// Returns the first element from |queue|, if present, without dequeuing it.
+// This function will never block the caller. Returns NULL if there are no elements
+// in the queue. |queue| may not be NULL.
+void *fixed_queue_try_peek(fixed_queue_t *queue);
+
 // This function returns a valid file descriptor. Callers may perform one
 // operation on the fd: select(2). If |select| indicates that the file
 // descriptor is readable, the caller may call |fixed_queue_enqueue| without
@@ -78,3 +85,13 @@
 // blocking. The caller must not close the returned file descriptor. |queue|
 // may not be NULL.
 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue);
+
+// Registers |queue| with |reactor| for dequeue operations. When there is an element
+// in the queue, ready_cb will be called. The |context| parameter is passed, untouched,
+// to the callback routine. Neither |queue|, nor |reactor|, nor |read_cb| may be NULL.
+// |context| may be NULL.
+void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context);
+
+// Unregisters the dequeue ready callback for |queue| from whichever reactor
+// it is registered with, if any. This function is idempotent.
+void fixed_queue_unregister_dequeue(fixed_queue_t *queue);
diff --git a/osi/src/fixed_queue.c b/osi/src/fixed_queue.c
index 5637507..7db687a 100644
--- a/osi/src/fixed_queue.c
+++ b/osi/src/fixed_queue.c
@@ -19,11 +19,13 @@
 #include <assert.h>
 #include <pthread.h>
 #include <stdlib.h>
+#include <utils/Log.h>
 
 #include "fixed_queue.h"
 #include "list.h"
 #include "osi.h"
 #include "semaphore.h"
+#include "reactor.h"
 
 typedef struct fixed_queue_t {
   list_t *list;
@@ -31,8 +33,14 @@
   semaphore_t *dequeue_sem;
   pthread_mutex_t lock;
   size_t capacity;
+
+  reactor_object_t *dequeue_object;
+  fixed_queue_cb dequeue_ready;
+  void *dequeue_context;
 } fixed_queue_t;
 
+static void internal_dequeue_ready(void *context);
+
 fixed_queue_t *fixed_queue_new(size_t capacity) {
   fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
   if (!ret)
@@ -70,6 +78,8 @@
   if (!queue)
     return;
 
+  fixed_queue_unregister_dequeue(queue);
+
   if (free_cb)
     for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
       free_cb(list_node(node));
@@ -156,6 +166,17 @@
   return ret;
 }
 
+void *fixed_queue_try_peek(fixed_queue_t *queue) {
+  assert(queue != NULL);
+
+  pthread_mutex_lock(&queue->lock);
+  // Because protected by the lock, the empty and front calls are atomic and not a race condition
+  void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
+  pthread_mutex_unlock(&queue->lock);
+
+  return ret;
+}
+
 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
   assert(queue != NULL);
   return semaphore_get_fd(queue->dequeue_sem);
@@ -165,3 +186,38 @@
   assert(queue != NULL);
   return semaphore_get_fd(queue->enqueue_sem);
 }
+
+void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
+  assert(queue != NULL);
+  assert(reactor != NULL);
+  assert(ready_cb != NULL);
+
+  // Make sure we're not already registered
+  fixed_queue_unregister_dequeue(queue);
+
+  queue->dequeue_ready = ready_cb;
+  queue->dequeue_context = context;
+  queue->dequeue_object = reactor_register(
+    reactor,
+    fixed_queue_get_dequeue_fd(queue),
+    queue,
+    internal_dequeue_ready,
+    NULL
+  );
+}
+
+void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
+  assert(queue != NULL);
+
+  if (queue->dequeue_object) {
+    reactor_unregister(queue->dequeue_object);
+    queue->dequeue_object = NULL;
+  }
+}
+
+static void internal_dequeue_ready(void *context) {
+  assert(context != NULL);
+
+  fixed_queue_t *queue = context;
+  queue->dequeue_ready(queue, queue->dequeue_context);
+}