Make all async I/O objects reference-counted.

Since it's hard to control the lifespan of an object in an asynchronous
environment, we should make all AsyncXxx objects reference-counted, so that each
object self-destructs when its reference count drops to zero, indicating that
the last client that used the object has abandoned it.

Change-Id: I6f8194aa14e52a23a8772d827583782989654504
diff --git a/android/async-socket.c b/android/async-socket.c
index 4be69f6..a2577c9 100644
--- a/android/async-socket.c
+++ b/android/async-socket.c
@@ -76,6 +76,10 @@
     int                 is_io_read;
     /* State of the I/O. */
     AsyncIOState        state;
+    /* Number of outstanding references to the I/O. */
+    int                 ref_count;
+    /* Deadline for this I/O */
+    Duration            deadline;
 };
 
 /*
@@ -140,18 +144,24 @@
     asio->on_io         = io_cb;
     asio->io_opaque     = io_opaque;
     asio->state         = ASIO_STATE_QUEUED;
-
+    asio->ref_count     = 1;
+    asio->deadline      = deadline;
     loopTimer_init(asio->timer, _async_socket_get_looper(as),
                    _on_async_socket_io_timed_out, asio);
     loopTimer_startAbsolute(asio->timer, deadline);
 
+    /* Reference socket that is holding this I/O. */
+    async_socket_reference(as);
+
     return asio;
 }
 
 /* Destroys and frees I/O descriptor. */
 static void
-_async_socket_io_destroy(AsyncSocketIO* asio)
+_async_socket_io_free(AsyncSocketIO* asio)
 {
+    AsyncSocket* const as = asio->as;
+
     loopTimer_stop(asio->timer);
     loopTimer_done(asio->timer);
 
@@ -163,6 +173,30 @@
     } else {
         AFREE(asio);
     }
+
+    /* Release socket that is holding this I/O. */
+    async_socket_release(as);
+}
+
+int
+async_socket_io_reference(AsyncSocketIO* asio)
+{
+    assert(asio->ref_count > 0);
+    asio->ref_count++;
+    return asio->ref_count;
+}
+
+int
+async_socket_io_release(AsyncSocketIO* asio)
+{
+    assert(asio->ref_count > 0);
+    asio->ref_count--;
+    if (asio->ref_count == 0) {
+        /* Last reference has been dropped. Destroy this object. */
+        _async_socket_io_free(asio);
+        return 0;
+    }
+    return asio->ref_count;
 }
 
 /* Creates new asynchronous socket reader.
@@ -217,10 +251,16 @@
 _on_async_socket_io_timed_out(void* opaque)
 {
     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
-    const AsyncIOAction action = _async_socket_io_timed_out(asio->as, asio);
-    if (action != ASIO_ACTION_RETRY) {
-        _async_socket_io_destroy(asio);
-    }
+    AsyncSocket* const as = asio->as;
+
+    D("%s: %s I/O (deadline = %lld) timed out at %lld",
+      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
+      asio->deadline, async_socket_deadline(as, 0));
+
+    /* Reference while in callback. */
+    async_socket_io_reference(asio);
+    _async_socket_io_timed_out(asio->as, asio);
+    async_socket_io_release(asio);
 }
 
 /********************************************************************************
@@ -230,6 +270,7 @@
 AsyncSocket*
 async_socket_io_get_socket(const AsyncSocketIO* asio)
 {
+    async_socket_reference(asio->as);
     return asio->as;
 }
 
@@ -318,6 +359,8 @@
     int                 fd;
     /* Timeout to use for reconnection attempts. */
     int                 reconnect_to;
+    /* Number of outstanding references to the socket. */
+    int                 ref_count;
 };
 
 static const char*
@@ -337,6 +380,8 @@
  *  as - Initialized AsyncSocket instance.
  * Return:
  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
+ *  Note that the caller is responsible for releasing the I/O object returned
+ *  from this routine.
  */
 static AsyncSocketIO*
 _async_socket_pull_first_io(AsyncSocket* as,
@@ -360,6 +405,8 @@
  * Return:
  *  First reader pulled out of the list, or NULL if there are no readers in the
  *  list.
+ *  Note that the caller is responsible for releasing the I/O object returned
+ *  from this routine.
  */
 static AsyncSocketIO*
 _async_socket_pull_first_reader(AsyncSocket* as)
@@ -373,6 +420,8 @@
  * Return:
  *  First writer pulled out of the list, or NULL if there are no writers in the
  *  list.
+ *  Note that the caller is responsible for releasing the I/O object returned
+ *  from this routine.
  */
 static AsyncSocketIO*
 _async_socket_pull_first_writer(AsyncSocket* as)
@@ -414,6 +463,9 @@
         *list_tail = prev;
     }
 
+    /* Release I/O adjusting reference added when I/O has been saved in the list. */
+    async_socket_io_release(io);
+
     return 1;
 }
 
@@ -421,10 +473,8 @@
  * Param:
  *  as - Initialized AsyncSocket instance.
  *  list_head, list_tail - Pointers to the list head and tail.
- * Return:
- *  Next I/O at the head of the list, or NULL if I/O list became empty.
  */
-static AsyncSocketIO*
+static void
 _async_socket_advance_io(AsyncSocket* as,
                          AsyncSocketIO** list_head,
                          AsyncSocketIO** list_tail)
@@ -437,31 +487,30 @@
     if (*list_head == NULL) {
         *list_tail = NULL;
     }
-    return *list_head;
+    if (first_io != NULL) {
+        /* Release I/O removed from the head of the list. */
+        async_socket_io_release(first_io);
+    }
 }
 
 /* Advances to the next reader in the list.
  * Param:
  *  as - Initialized AsyncSocket instance.
- * Return:
- *  Next reader at the head of the list, or NULL if reader list became empty.
  */
-static AsyncSocketIO*
+static void
 _async_socket_advance_reader(AsyncSocket* as)
 {
-    return _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
+    _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
 }
 
 /* Advances to the next writer in the list.
  * Param:
  *  as - Initialized AsyncSocket instance.
- * Return:
- *  Next writer at the head of the list, or NULL if writer list became empty.
  */
-static AsyncSocketIO*
+static void
 _async_socket_advance_writer(AsyncSocket* as)
 {
-    return _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
+    _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
 }
 
 /* Completes an I/O.
@@ -552,7 +601,7 @@
         /* We ignore action returned from the cancellation callback, since we're
          * in a disconnected state here. */
         _async_socket_cancel_io(as, to_cancel);
-        _async_socket_io_destroy(to_cancel);
+        async_socket_io_release(to_cancel);
     }
 }
 
@@ -568,7 +617,7 @@
         /* We ignore action returned from the cancellation callback, since we're
          * in a disconnected state here. */
         _async_socket_cancel_io(as, to_cancel);
-        _async_socket_io_destroy(to_cancel);
+        async_socket_io_release(to_cancel);
     }
 }
 
@@ -607,11 +656,10 @@
  *  as - Initialized AsyncSocket instance.
  */
 static void
-_async_socket_destroy(AsyncSocket* as)
+_async_socket_free(AsyncSocket* as)
 {
     if (as != NULL) {
-        /* Cancel all the I/O */
-        _async_socket_cancel_all_io(as);
+        D("AsyncSocket '%s' is destroyed", _async_socket_string(as));
 
         /* Close socket. */
         _async_socket_close_socket(as);
@@ -684,8 +732,9 @@
 static AsyncIOAction
 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
 {
-    D("Async socket '%s' I/O failure: %d -> %s",
-      _async_socket_string(as), errno, strerror(errno));
+    D("Async socket '%s' %s I/O failure: %d -> %s",
+      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
+      errno, strerror(errno));
 
     /* Report the failure. */
     return _async_socket_io_failure(as, asio, errno);
@@ -712,6 +761,9 @@
         return 0;
     }
 
+    /* Reference the reader while we're working with it in this callback. */
+    async_socket_io_reference(asr);
+
     /* Bump I/O state, and inform the client that I/O is in progress. */
     if (asr->state == ASIO_STATE_QUEUED) {
         asr->state = ASIO_STATE_STARTED;
@@ -724,7 +776,6 @@
           _async_socket_string(as));
         /* Move on to the next reader. */
         _async_socket_advance_reader(as);
-        _async_socket_io_destroy(asr);
         /* Lets see if there are still active readers, and enable, or disable
          * read I/O callback accordingly. */
         if (as->readers_head != NULL) {
@@ -732,6 +783,7 @@
         } else {
             loopIo_dontWantRead(as->io);
         }
+        async_socket_io_release(asr);
         return 0;
     }
 
@@ -747,6 +799,7 @@
         /* Socket has been disconnected. */
         errno = ECONNRESET;
         _on_async_socket_disconnected(as);
+        async_socket_io_release(asr);
         return -1;
     }
 
@@ -754,6 +807,7 @@
         if (errno == EWOULDBLOCK || errno == EAGAIN) {
             /* Yield to writes behind this read. */
             loopIo_wantRead(as->io);
+            async_socket_io_release(asr);
             return 0;
         }
 
@@ -764,7 +818,6 @@
               _async_socket_string(as));
             /* Move on to the next reader. */
             _async_socket_advance_reader(as);
-            _async_socket_io_destroy(asr);
             /* Lets see if there are still active readers, and enable, or disable
              * read I/O callback accordingly. */
             if (as->readers_head != NULL) {
@@ -773,6 +826,7 @@
                 loopIo_dontWantRead(as->io);
             }
         }
+        async_socket_io_release(asr);
         return -1;
     }
 
@@ -784,7 +838,6 @@
 
         /* Notify reader completion. */
         _async_socket_complete_io(as, asr);
-        _async_socket_io_destroy(asr);
     }
 
     /* Lets see if there are still active readers, and enable, or disable read
@@ -795,6 +848,8 @@
         loopIo_dontWantRead(as->io);
     }
 
+    async_socket_io_release(asr);
+
     return 0;
 }
 
@@ -819,6 +874,9 @@
         return 0;
     }
 
+    /* Reference the writer while we're working with it in this callback. */
+    async_socket_io_reference(asw);
+
     /* Bump I/O state, and inform the client that I/O is in progress. */
     if (asw->state == ASIO_STATE_QUEUED) {
         asw->state = ASIO_STATE_STARTED;
@@ -829,9 +887,8 @@
     if (action == ASIO_ACTION_ABORT) {
         D("Write is aborted by the client of async socket '%s'",
           _async_socket_string(as));
-        /* Move on to the next reader. */
-        _async_socket_advance_reader(as);
-        _async_socket_io_destroy(asw);
+        /* Move on to the next writer. */
+        _async_socket_advance_writer(as);
         /* Lets see if there are still active writers, and enable, or disable
          * write I/O callback accordingly. */
         if (as->writers_head != NULL) {
@@ -839,6 +896,7 @@
         } else {
             loopIo_dontWantWrite(as->io);
         }
+        async_socket_io_release(asw);
         return 0;
     }
 
@@ -854,6 +912,7 @@
         /* Socket has been disconnected. */
         errno = ECONNRESET;
         _on_async_socket_disconnected(as);
+        async_socket_io_release(asw);
         return -1;
     }
 
@@ -861,18 +920,17 @@
         if (errno == EWOULDBLOCK || errno == EAGAIN) {
             /* Yield to reads behind this write. */
             loopIo_wantWrite(as->io);
+            async_socket_io_release(asw);
             return 0;
         }
 
         /* An I/O error. */
-        /* An I/O error. */
         action = _on_async_socket_failure(as, asw);
         if (action == ASIO_ACTION_ABORT) {
             D("Write is aborted on failure by the client of async socket '%s'",
               _async_socket_string(as));
-            /* Move on to the next reader. */
-            _async_socket_advance_reader(as);
-            _async_socket_io_destroy(asw);
+            /* Move on to the next writer. */
+            _async_socket_advance_writer(as);
             /* Lets see if there are still active writers, and enable, or disable
              * write I/O callback accordingly. */
             if (as->writers_head != NULL) {
@@ -881,10 +939,11 @@
                 loopIo_dontWantWrite(as->io);
             }
         }
+        async_socket_io_release(asw);
         return -1;
     }
 
-    /* Update the reader descriptor. */
+    /* Update the writer descriptor. */
     asw->transferred += res;
     if (asw->transferred == asw->to_transfer) {
         /* This write is completed. Move on to the next writer. */
@@ -892,7 +951,6 @@
 
         /* Notify writer completion. */
         _async_socket_complete_io(as, asw);
-        _async_socket_io_destroy(asw);
     }
 
     /* Lets see if there are still active writers, and enable, or disable write
@@ -903,6 +961,8 @@
         loopIo_dontWantWrite(as->io);
     }
 
+    async_socket_io_release(asw);
+
     return 0;
 }
 
@@ -917,17 +977,24 @@
 {
     AsyncSocket* const as = (AsyncSocket*)opaque;
 
+    /* Reference the socket while we're working with it in this callback. */
+    async_socket_reference(as);
+
     if ((events & LOOP_IO_READ) != 0) {
         if (_on_async_socket_recv(as) != 0) {
+            async_socket_release(as);
             return;
         }
     }
 
     if ((events & LOOP_IO_WRITE) != 0) {
         if (_on_async_socket_send(as) != 0) {
+            async_socket_release(as);
             return;
         }
     }
+
+    async_socket_release(as);
 }
 
 /* A callback that is invoked by asynchronous socket connector on connection
@@ -947,6 +1014,9 @@
     AsyncIOAction action;
     AsyncSocket* const as = (AsyncSocket*)opaque;
 
+    /* Reference the socket while we're working with it in this callback. */
+    async_socket_reference(as);
+
     if (event == ASIO_STATE_SUCCEEDED) {
         /* Accept the connection. */
         as->fd = async_socket_connector_pull_fd(connector);
@@ -963,6 +1033,12 @@
         _async_socket_close_socket(as);
     }
 
+    if (action != ASIO_ACTION_RETRY) {
+        async_socket_connector_release(connector);
+    }
+
+    async_socket_release(as);
+
     return action;
 }
 
@@ -974,7 +1050,11 @@
 _on_async_socket_reconnect(void* opaque)
 {
     AsyncSocket* as = (AsyncSocket*)opaque;
+
+    /* Reference the socket while we're working with it in this callback. */
+    async_socket_reference(as);
     async_socket_connect(as, as->reconnect_to);
+    async_socket_release(as);
 }
 
 
@@ -1002,13 +1082,14 @@
     as->on_connection = client_cb;
     as->readers_head = as->readers_tail = NULL;
     as->reconnect_to = reconnect_to;
+    as->ref_count = 1;
     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
     as->looper = looper_newCore();
     if (as->looper == NULL) {
         E("Unable to create I/O looper for async socket '%s'",
           _async_socket_string(as));
         client_cb(client_opaque, as, ASIO_STATE_FAILED);
-        _async_socket_destroy(as);
+        _async_socket_free(as);
         return NULL;
     }
     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
@@ -1016,6 +1097,28 @@
     return as;
 }
 
+int
+async_socket_reference(AsyncSocket* as)
+{
+    assert(as->ref_count > 0);
+    as->ref_count++;
+    return as->ref_count;
+}
+
+int
+async_socket_release(AsyncSocket* as)
+{
+    assert(as->ref_count > 0);
+    as->ref_count--;
+    if (as->ref_count == 0) {
+        /* Last reference has been dropped. Destroy this object. */
+        _async_socket_cancel_all_io(as);
+        _async_socket_free(as);
+        return 0;
+    }
+    return as->ref_count;
+}
+
 void
 async_socket_connect(AsyncSocket* as, int retry_to)
 {
@@ -1034,7 +1137,6 @@
     if (as != NULL) {
         _async_socket_cancel_all_io(as);
         _async_socket_close_socket(as);
-        _async_socket_destroy(as);
     }
 }
 
@@ -1056,6 +1158,8 @@
     AsyncSocketIO* const asr =
         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
                                  deadline);
+    /* Add new reader to the list. Note that we use initial reference from I/O
+     * 'new' routine as "in the list" reference counter. */
     if (as->readers_head == NULL) {
         as->readers_head = as->readers_tail = asr;
     } else {
@@ -1087,6 +1191,8 @@
     AsyncSocketIO* const asw =
         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
                                  deadline);
+    /* Add new writer to the list. Note that we use initial reference from I/O
+     * 'new' routine as "in the list" reference counter. */
     if (as->writers_head == NULL) {
         as->writers_head = as->writers_tail = asw;
     } else {
@@ -1120,3 +1226,9 @@
     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
                         DURATION_INFINITE;
 }
+
+int
+async_socket_get_port(const AsyncSocket* as)
+{
+    return sock_address_get_port(&as->address);
+}