Merge "Fixes ADB crash on Windows due to large number of connections."
diff --git a/adb/sysdeps_win32.c b/adb/sysdeps_win32.c
index ced91e8..c426718 100644
--- a/adb/sysdeps_win32.c
+++ b/adb/sysdeps_win32.c
@@ -352,7 +352,7 @@
                 return -1;
         }
     }
-    
+
     snprintf( f->name, sizeof(f->name), "%d(%s)", _fh_to_int(f), path );
     D( "adb_open: '%s' => fd %d\n", path, _fh_to_int(f) );
     return _fh_to_int(f);
@@ -837,7 +837,7 @@
 
     if (len2 > 8) len2 = 8;
 
-    for (nn = 0; nn < len2; nn++) 
+    for (nn = 0; nn < len2; nn++)
         printf("%02x", ptr[nn]);
     printf("  ");
 
@@ -994,7 +994,7 @@
         SetEvent( bip->evt_read );
     }
 
-    BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", 
+    BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
             bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read ));
     LeaveCriticalSection( &bip->lock );
 
@@ -1018,7 +1018,7 @@
         LeaveCriticalSection( &bip->lock );
         errno = EAGAIN;
         return -1;
-#else    
+#else
         int  ret;
         LeaveCriticalSection( &bip->lock );
 
@@ -1087,14 +1087,14 @@
     }
 
     BIPDUMP( (const unsigned char*)dst - count, count );
-    BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", 
+    BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
             bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read ));
     LeaveCriticalSection( &bip->lock );
 
     return count;
 }
 
-typedef struct SocketPairRec_ 
+typedef struct SocketPairRec_
 {
     BipBufferRec  a2b_bip;
     BipBufferRec  b2a_bip;
@@ -1400,7 +1400,7 @@
         f->clazz->_fh_hook( f, events & ~node->wanted, node );
         node->wanted |= events;
     } else {
-        D("event_looper_hook: ignoring events %x for %d wanted=%x)\n", 
+        D("event_looper_hook: ignoring events %x for %d wanted=%x)\n",
            events, fd, node->wanted);
     }
 }
@@ -1426,6 +1426,180 @@
     }
 }
 
+/*
+ * A fixer for WaitForMultipleObjects on condition that there are more than 64
+ * handles to wait on.
+ *
+ * In certain cases DDMS may establish more than 64 connections with ADB. For
+ * instance, this may happen if there are more than 64 processes running on a
+ * device, or there are multiple devices connected (including the emulator) with
+ * the combined number of running processes greater than 64. In this case using
+ * WaitForMultipleObjects to wait on connection events simply wouldn't cut it,
+ * because of the API limitations (64 handles max). So, we need to provide a way
+ * to scale WaitForMultipleObjects to accept an arbitrary number of handles. The
+ * easiest (and "Microsoft recommended") way to do that would be dividing the
+ * handle array into chunks with the chunk size less than 64, and fire up as many
+ * waiting threads as there are chunks. Then each thread would wait on a chunk of
+ * handles, and will report back to the caller which handle has been set.
+ * Here is the implementation of that algorithm.
+ */
+
+/* Number of handles to wait on in each waiting thread. */
+#define WAIT_ALL_CHUNK_SIZE 63
+
+/* Descriptor for a waiting thread */
+typedef struct WaitForAllParam {
+    /* A handle to an event to signal when waiting is over. This handle is shared
+     * across all the waiting threads, so each waiting thread knows when any
+     * other thread has exited, so it can exit too. */
+    HANDLE          main_event;
+    /* Upon exit from a waiting thread contains the index of the handle that has
+     * been signaled. The index is an absolute index of the signaled handle in
+     * the original array. This pointer is shared across all the waiting threads
+     * and it's not guaranteed (due to a race condition) that when all the
+     * waiting threads exit, the value contained here would indicate the first
+     * handle that was signaled. This is fine, because the caller cares only
+     * about any handle being signaled. It doesn't care about the order, nor
+     * about the whole list of handles that were signaled. */
+    LONG volatile   *signaled_index;
+    /* Array of handles to wait on in a waiting thread. */
+    HANDLE*         handles;
+    /* Number of handles in 'handles' array to wait on. */
+    int             handles_count;
+    /* Index inside the main array of the first handle in the 'handles' array. */
+    int             first_handle_index;
+    /* Waiting thread handle. */
+    HANDLE          thread;
+} WaitForAllParam;
+
+/* Waiting thread routine; 'arg' points to this thread's WaitForAllParam. */
+static unsigned __stdcall
+_in_waiter_thread(void*  arg)
+{
+    HANDLE wait_on[WAIT_ALL_CHUNK_SIZE + 1];
+    int res;
+    WaitForAllParam* const param = (WaitForAllParam*)arg;
+
+    /* We have to wait on the main_event in order to be notified when any of the
+     * sibling threads is exiting. */
+    wait_on[0] = param->main_event;
+    /* The rest of the handles go behind the main event handle. */
+    memcpy(wait_on + 1, param->handles, param->handles_count * sizeof(HANDLE));
+
+    res = WaitForMultipleObjects(param->handles_count + 1, wait_on, FALSE, INFINITE);
+    if (res > 0 && res < (param->handles_count + 1)) {
+        /* One of the chunk's handles (slot 0 is main_event) got signaled. Store
+         * its absolute index, but only if the shared value is still -1. */
+        InterlockedCompareExchange(param->signaled_index,
+                                   res - 1L + param->first_handle_index, -1L);
+    }
+
+    /* Notify the caller (and the siblings) that the wait is over. */
+    SetEvent(param->main_event);
+
+    _endthreadex(0);
+    return 0;
+}
+
+/* WaitForMultipleObjects fixer routine.
+ * Param:
+ *  handles Array of handles to wait on.
+ *  handles_count Number of handles in the array.
+ * Return:
+ *  (>= 0 && < handles_count) - Index of the signaled handle in the array, or
+ *  WAIT_FAILED on an error.
+ */
+static int
+_wait_for_all(HANDLE* handles, int handles_count)
+{
+    WaitForAllParam* threads;
+    HANDLE main_event;
+    int chunks, chunk, remains;
+
+    /* This variable is going to be accessed by several threads at the same time;
+     * this is bound to fail randomly when the code is run on multi-core machines.
+     * To solve this, we need to do the following (1 _and_ 2):
+     * 1. Use the "volatile" qualifier to ensure the compiler doesn't optimize
+     *    out the reads/writes in this function unexpectedly.
+     * 2. Ensure correct memory ordering. The "simple" way to do that is to wrap
+     *    all accesses inside a critical section. But we can also use
+     *    InterlockedCompareExchange() which always provides a full memory barrier
+     *    on Win32.
+     */
+    volatile LONG sig_index = -1;
+
+    /* Calculate number of chunks, and allocate thread param array. */
+    chunks = handles_count / WAIT_ALL_CHUNK_SIZE;
+    remains = handles_count % WAIT_ALL_CHUNK_SIZE;
+    threads = (WaitForAllParam*)malloc((chunks + (remains ? 1 : 0)) *
+                                        sizeof(WaitForAllParam));
+    if (threads == NULL) {
+        D("Unable to allocate thread array for %d handles.", handles_count);
+        return (int)WAIT_FAILED;
+    }
+
+    /* Create main event to wait on for all waiting threads. This is a "manual
+     * reset" event that will remain set once it was set. */
+    main_event = CreateEvent(NULL, TRUE, FALSE, NULL);
+    if (main_event == NULL) {
+        D("Unable to create main event. Error: %d", GetLastError());
+        free(threads);
+        return (int)WAIT_FAILED;
+    }
+
+    /*
+     * Initialize waiting thread parameters.
+     */
+
+    for (chunk = 0; chunk < chunks; chunk++) {
+        threads[chunk].main_event = main_event;
+        threads[chunk].signaled_index = &sig_index;
+        threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk;
+        threads[chunk].handles = handles + threads[chunk].first_handle_index;
+        threads[chunk].handles_count = WAIT_ALL_CHUNK_SIZE;
+    }
+    if (remains) {
+        threads[chunk].main_event = main_event;
+        threads[chunk].signaled_index = &sig_index;
+        threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk;
+        threads[chunk].handles = handles + threads[chunk].first_handle_index;
+        threads[chunk].handles_count = remains;
+        chunks++;
+    }
+
+    /* Start the waiting threads. */
+    for (chunk = 0; chunk < chunks; chunk++) {
+        /* Note that using adb_thread_create is not appropriate here, since we
+         * need a handle to wait on for thread termination. */
+        threads[chunk].thread = (HANDLE)_beginthreadex(NULL, 0, _in_waiter_thread,
+                                                       &threads[chunk], 0, NULL);
+        if (threads[chunk].thread == NULL) {
+            /* Unable to create a waiter thread. Collapse. */
+            D("Unable to create a waiting thread %d of %d. errno=%d",
+              chunk, chunks, errno);
+            chunks = chunk;
+            SetEvent(main_event);
+            break;
+        }
+    }
+
+    /* Wait on any of the threads to get signaled. */
+    WaitForSingleObject(main_event, INFINITE);
+
+    /* Wait on all the waiting threads to exit. */
+    for (chunk = 0; chunk < chunks; chunk++) {
+        WaitForSingleObject(threads[chunk].thread, INFINITE);
+        CloseHandle(threads[chunk].thread);
+    }
+
+    CloseHandle(main_event);
+    free(threads);
+
+    /* Atomic read: swapping -1 for -1 leaves sig_index unchanged and returns it. */
+    const int ret = (int)InterlockedCompareExchange(&sig_index, -1, -1);
+    return (ret >= 0) ? ret : (int)WAIT_FAILED;
+}
+
 static EventLooperRec  win32_looper;
 
 static void fdevent_init(void)
@@ -1494,7 +1668,7 @@
     {
         looper->htab_count = 0;
 
-        for (hook = looper->hooks; hook; hook = hook->next) 
+        for (hook = looper->hooks; hook; hook = hook->next)
         {
             if (hook->start && !hook->start(hook)) {
                 D( "fdevent_process: error when starting a hook\n" );
@@ -1525,10 +1699,11 @@
 
             D( "adb_win32: waiting for %d events\n", looper->htab_count );
             if (looper->htab_count > MAXIMUM_WAIT_OBJECTS) {
-                D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS, aborting!\n", looper->htab_count);
-                abort();
+                D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS.\n", looper->htab_count);
+                wait_ret = _wait_for_all(looper->htab, looper->htab_count);
+            } else {
+                wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE );
             }
-            wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE );
             if (wait_ret == (int)WAIT_FAILED) {
                 D( "adb_win32: wait failed, error %ld\n", GetLastError() );
             } else {
@@ -1669,7 +1844,7 @@
     fdevent_remove(fde);
 }
 
-void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg) 
+void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg)
 {
     memset(fde, 0, sizeof(fdevent));
     fde->state = FDE_ACTIVE;
@@ -1691,7 +1866,7 @@
 
     if(fde->state & FDE_ACTIVE) {
         fdevent_disconnect(fde);
-        dump_fde(fde, "disconnect");    
+        dump_fde(fde, "disconnect");
         fdevent_unregister(fde);
     }
 
@@ -1917,7 +2092,7 @@
     if (hook->wanted & FDE_READ && rbip->can_read)
         hook->ready |= FDE_READ;
 
-    if (hook->wanted & FDE_WRITE && wbip->can_write) 
+    if (hook->wanted & FDE_WRITE && wbip->can_write)
         hook->ready |= FDE_WRITE;
  }
 
@@ -1938,7 +2113,7 @@
         D("_event_socketpair_start: can't handle FDE_READ+FDE_WRITE\n" );
         return 0;
     }
-    D( "_event_socketpair_start: hook %s for %x wanted=%x\n", 
+    D( "_event_socketpair_start: hook %s for %x wanted=%x\n",
        hook->fh->name, _fh_to_int(fh), hook->wanted);
     return 1;
 }