adb: make fdevent_test, socket_test compile on Windows.

Replace direct pthread_* and signal usage with the adb_thread_* abstractions
so that the fdevent and socket tests compile on Win32.

Bug: http://b/27105824
Change-Id: I6541bb1398780b999837e701837d7f86a5eee8ca
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index 03cab64..471ca09 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -18,119 +18,89 @@
 
 #include <gtest/gtest.h>
 
+#include <array>
 #include <limits>
 #include <queue>
 #include <string>
 #include <vector>
 
-#include <pthread.h>
-#include <signal.h>
 #include <unistd.h>
 
 #include "adb.h"
 #include "adb_io.h"
+#include "fdevent_test.h"
 #include "socket.h"
 #include "sysdeps.h"
 
-static void signal_handler(int) {
-    ASSERT_EQ(1u, fdevent_installed_count());
-    pthread_exit(nullptr);
-}
-
-// On host, register a dummy socket, so fdevet_loop() will not abort when previously
-// registered local sockets are all closed. On device, fdevent_subproc_setup() installs
-// one fdevent which can be considered as dummy socket.
-static void InstallDummySocket() {
-#if ADB_HOST
-    int dummy_fds[2];
-    ASSERT_EQ(0, pipe(dummy_fds));
-    asocket* dummy_socket = create_local_socket(dummy_fds[0]);
-    ASSERT_TRUE(dummy_socket != nullptr);
-    dummy_socket->ready(dummy_socket);
-#endif
-}
-
 struct ThreadArg {
     int first_read_fd;
     int last_write_fd;
     size_t middle_pipe_count;
 };
 
-static void FdEventThreadFunc(ThreadArg* arg) {
-    std::vector<int> read_fds;
-    std::vector<int> write_fds;
+class LocalSocketTest : public FdeventTest {};
 
-    read_fds.push_back(arg->first_read_fd);
-    for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
-        int fds[2];
-        ASSERT_EQ(0, adb_socketpair(fds));
-        read_fds.push_back(fds[0]);
-        write_fds.push_back(fds[1]);
-    }
-    write_fds.push_back(arg->last_write_fd);
-
-    for (size_t i = 0; i < read_fds.size(); ++i) {
-        asocket* reader = create_local_socket(read_fds[i]);
-        ASSERT_TRUE(reader != nullptr);
-        asocket* writer = create_local_socket(write_fds[i]);
-        ASSERT_TRUE(writer != nullptr);
-        reader->peer = writer;
-        writer->peer = reader;
-        reader->ready(reader);
-    }
-
-    InstallDummySocket();
+static void FdEventThreadFunc(void*) {
     fdevent_loop();
 }
 
-class LocalSocketTest : public ::testing::Test {
-  protected:
-    static void SetUpTestCase() {
-        ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler));
-        ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN));
-    }
-
-    virtual void SetUp() {
-        fdevent_reset();
-        ASSERT_EQ(0u, fdevent_installed_count());
-    }
-};
-
 TEST_F(LocalSocketTest, smoke) {
-    const size_t PIPE_COUNT = 100;
-    const size_t MESSAGE_LOOP_COUNT = 100;
+    // Join two socketpairs with a chain of intermediate socketpairs.
+    int first[2];
+    std::vector<std::array<int, 2>> intermediates;
+    int last[2];
+
+    constexpr size_t INTERMEDIATE_COUNT = 50;
+    constexpr size_t MESSAGE_LOOP_COUNT = 100;
     const std::string MESSAGE = "socket_test";
-    int fd_pair1[2];
-    int fd_pair2[2];
-    ASSERT_EQ(0, adb_socketpair(fd_pair1));
-    ASSERT_EQ(0, adb_socketpair(fd_pair2));
-    pthread_t thread;
-    ThreadArg thread_arg;
-    thread_arg.first_read_fd = fd_pair1[0];
-    thread_arg.last_write_fd = fd_pair2[1];
-    thread_arg.middle_pipe_count = PIPE_COUNT;
-    int writer = fd_pair1[1];
-    int reader = fd_pair2[0];
 
-    ASSERT_EQ(0, pthread_create(&thread, nullptr,
-                                reinterpret_cast<void* (*)(void*)>(FdEventThreadFunc),
-                                &thread_arg));
+    intermediates.resize(INTERMEDIATE_COUNT);
+    ASSERT_EQ(0, adb_socketpair(first)) << strerror(errno);
+    ASSERT_EQ(0, adb_socketpair(last)) << strerror(errno);
+    asocket* prev_tail = create_local_socket(first[1]);
+    ASSERT_NE(nullptr, prev_tail);
 
-    usleep(1000);
+    auto connect = [](asocket* tail, asocket* head) {
+        tail->peer = head;
+        head->peer = tail;
+        tail->ready(tail);
+    };
+
+    for (auto& intermediate : intermediates) {
+        ASSERT_EQ(0, adb_socketpair(intermediate.data())) << strerror(errno);
+
+        asocket* head = create_local_socket(intermediate[0]);
+        ASSERT_NE(nullptr, head);
+
+        asocket* tail = create_local_socket(intermediate[1]);
+        ASSERT_NE(nullptr, tail);
+
+        connect(prev_tail, head);
+        prev_tail = tail;
+    }
+
+    asocket* end = create_local_socket(last[0]);
+    ASSERT_NE(nullptr, end);
+    connect(prev_tail, end);
+
+    PrepareThread();
+    adb_thread_t thread;
+    ASSERT_TRUE(adb_thread_create(FdEventThreadFunc, nullptr, &thread));
+
     for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
         std::string read_buffer = MESSAGE;
         std::string write_buffer(MESSAGE.size(), 'a');
-        ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
-        ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
+        ASSERT_TRUE(WriteFdExactly(first[0], &read_buffer[0], read_buffer.size()));
+        ASSERT_TRUE(ReadFdExactly(last[1], &write_buffer[0], write_buffer.size()));
         ASSERT_EQ(read_buffer, write_buffer);
     }
-    ASSERT_EQ(0, adb_close(writer));
-    ASSERT_EQ(0, adb_close(reader));
-    // Wait until the local sockets are closed.
-    sleep(1);
 
-    ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
-    ASSERT_EQ(0, pthread_join(thread, nullptr));
+    ASSERT_EQ(0, adb_close(first[0]));
+    ASSERT_EQ(0, adb_close(last[1]));
+
+    // Wait until the local sockets are closed.
+    adb_sleep_ms(100);
+    TerminateThread(thread);
 }
 
 struct CloseWithPacketArg {
@@ -160,7 +130,6 @@
     s->peer = cause_close_s;
     cause_close_s->ready(cause_close_s);
 
-    InstallDummySocket();
     fdevent_loop();
 }
 
@@ -176,21 +145,19 @@
     CloseWithPacketArg arg;
     arg.socket_fd = socket_fd[1];
     arg.cause_close_fd = cause_close_fd[1];
-    pthread_t thread;
-    ASSERT_EQ(0, pthread_create(&thread, nullptr,
-                                reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
-                                &arg));
-    // Wait until the fdevent_loop() starts.
-    sleep(1);
-    ASSERT_EQ(0, adb_close(cause_close_fd[0]));
-    sleep(1);
-    ASSERT_EQ(2u, fdevent_installed_count());
-    ASSERT_EQ(0, adb_close(socket_fd[0]));
-    // Wait until the socket is closed.
-    sleep(1);
 
-    ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
-    ASSERT_EQ(0, pthread_join(thread, nullptr));
+    PrepareThread();
+    adb_thread_t thread;
+    ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+                                  &arg, &thread));
+    // Wait until the fdevent_loop() starts.
+    adb_sleep_ms(100);
+    ASSERT_EQ(0, adb_close(cause_close_fd[0]));
+    adb_sleep_ms(100);
+    EXPECT_EQ(2u, fdevent_installed_count());
+    ASSERT_EQ(0, adb_close(socket_fd[0]));
+
+    TerminateThread(thread);
 }
 
 // This test checks if we can read packets from a closing local socket.
@@ -203,26 +170,23 @@
     arg.socket_fd = socket_fd[1];
     arg.cause_close_fd = cause_close_fd[1];
 
-    pthread_t thread;
-    ASSERT_EQ(0, pthread_create(&thread, nullptr,
-                                reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
-                                &arg));
+    PrepareThread();
+    adb_thread_t thread;
+    ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+                                  &arg, &thread));
     // Wait until the fdevent_loop() starts.
-    sleep(1);
+    adb_sleep_ms(100);
     ASSERT_EQ(0, adb_close(cause_close_fd[0]));
-    sleep(1);
-    ASSERT_EQ(2u, fdevent_installed_count());
+    adb_sleep_ms(100);
+    EXPECT_EQ(2u, fdevent_installed_count());
 
     // Verify if we can read successfully.
     std::vector<char> buf(arg.bytes_written);
+    ASSERT_NE(0u, arg.bytes_written);
     ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size()));
     ASSERT_EQ(0, adb_close(socket_fd[0]));
 
-    // Wait until the socket is closed.
-    sleep(1);
-
-    ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
-    ASSERT_EQ(0, pthread_join(thread, nullptr));
+    TerminateThread(thread);
 }
 
 // This test checks if we can close local socket in the following situation:
@@ -238,20 +202,17 @@
     arg.socket_fd = socket_fd[1];
     arg.cause_close_fd = cause_close_fd[1];
 
-    pthread_t thread;
-    ASSERT_EQ(0, pthread_create(&thread, nullptr,
-                                reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
-                                &arg));
+    PrepareThread();
+    adb_thread_t thread;
+    ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+                                  &arg, &thread));
+
     // Wait until the fdevent_loop() starts.
-    sleep(1);
-    ASSERT_EQ(3u, fdevent_installed_count());
+    adb_sleep_ms(100);
+    EXPECT_EQ(3u, fdevent_installed_count());
     ASSERT_EQ(0, adb_close(socket_fd[0]));
 
-    // Wait until the socket is closed.
-    sleep(1);
-
-    ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
-    ASSERT_EQ(0, pthread_join(thread, nullptr));
+    TerminateThread(thread);
 }
 
 #if defined(__linux__)
@@ -260,50 +221,52 @@
     std::string error;
     int fd = network_loopback_client(5038, SOCK_STREAM, &error);
     ASSERT_GE(fd, 0) << error;
-    sleep(2);
+    adb_sleep_ms(200);
     ASSERT_EQ(0, adb_close(fd));
 }
 
 struct CloseRdHupSocketArg {
-  int socket_fd;
+    int socket_fd;
 };
 
 static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
-  asocket* s = create_local_socket(arg->socket_fd);
-  ASSERT_TRUE(s != nullptr);
+    asocket* s = create_local_socket(arg->socket_fd);
+    ASSERT_TRUE(s != nullptr);
 
-  InstallDummySocket();
-  fdevent_loop();
+    fdevent_loop();
 }
 
 // This test checks if we can close sockets in CLOSE_WAIT state.
 TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
-  std::string error;
-  int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
-  ASSERT_GE(listen_fd, 0);
-  pthread_t client_thread;
-  ASSERT_EQ(0, pthread_create(&client_thread, nullptr,
-                              reinterpret_cast<void* (*)(void*)>(ClientThreadFunc), nullptr));
+    std::string error;
+    int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
+    ASSERT_GE(listen_fd, 0);
 
-  struct sockaddr addr;
-  socklen_t alen;
-  alen = sizeof(addr);
-  int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
-  ASSERT_GE(accept_fd, 0);
-  CloseRdHupSocketArg arg;
-  arg.socket_fd = accept_fd;
-  pthread_t thread;
-  ASSERT_EQ(0, pthread_create(&thread, nullptr,
-                              reinterpret_cast<void* (*)(void*)>(CloseRdHupSocketThreadFunc),
-                              &arg));
-  // Wait until the fdevent_loop() starts.
-  sleep(1);
-  ASSERT_EQ(2u, fdevent_installed_count());
-  // Wait until the client closes its socket.
-  ASSERT_EQ(0, pthread_join(client_thread, nullptr));
-  sleep(2);
-  ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
-  ASSERT_EQ(0, pthread_join(thread, nullptr));
+    adb_thread_t client_thread;
+    ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(ClientThreadFunc), nullptr,
+                                  &client_thread));
+
+    struct sockaddr addr;
+    socklen_t alen;
+    alen = sizeof(addr);
+    int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
+    ASSERT_GE(accept_fd, 0);
+    CloseRdHupSocketArg arg;
+    arg.socket_fd = accept_fd;
+
+    PrepareThread();
+    adb_thread_t thread;
+    ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseRdHupSocketThreadFunc),
+                                  &arg, &thread));
+
+    // Wait until the fdevent_loop() starts.
+    adb_sleep_ms(100);
+    EXPECT_EQ(2u, fdevent_installed_count());
+
+    // Wait until the client closes its socket.
+    ASSERT_TRUE(adb_thread_join(client_thread));
+
+    TerminateThread(thread);
 }
 
 #endif  // defined(__linux__)