adb: make fdevent_test, socket_test compile on Windows.
Replace direct pthread_* calls with the adb_thread_* abstractions so that
the fdevent and socket tests compile on Win32.
Bug: http://b/27105824
Change-Id: I6541bb1398780b999837e701837d7f86a5eee8ca
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index 03cab64..471ca09 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -18,119 +18,89 @@
#include <gtest/gtest.h>
+#include <array>
#include <limits>
#include <queue>
#include <string>
#include <vector>
-#include <pthread.h>
-#include <signal.h>
#include <unistd.h>
#include "adb.h"
#include "adb_io.h"
+#include "fdevent_test.h"
#include "socket.h"
#include "sysdeps.h"
-static void signal_handler(int) {
- ASSERT_EQ(1u, fdevent_installed_count());
- pthread_exit(nullptr);
-}
-
-// On host, register a dummy socket, so fdevet_loop() will not abort when previously
-// registered local sockets are all closed. On device, fdevent_subproc_setup() installs
-// one fdevent which can be considered as dummy socket.
-static void InstallDummySocket() {
-#if ADB_HOST
- int dummy_fds[2];
- ASSERT_EQ(0, pipe(dummy_fds));
- asocket* dummy_socket = create_local_socket(dummy_fds[0]);
- ASSERT_TRUE(dummy_socket != nullptr);
- dummy_socket->ready(dummy_socket);
-#endif
-}
-
struct ThreadArg {
int first_read_fd;
int last_write_fd;
size_t middle_pipe_count;
};
-static void FdEventThreadFunc(ThreadArg* arg) {
- std::vector<int> read_fds;
- std::vector<int> write_fds;
+class LocalSocketTest : public FdeventTest {};
- read_fds.push_back(arg->first_read_fd);
- for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
- int fds[2];
- ASSERT_EQ(0, adb_socketpair(fds));
- read_fds.push_back(fds[0]);
- write_fds.push_back(fds[1]);
- }
- write_fds.push_back(arg->last_write_fd);
-
- for (size_t i = 0; i < read_fds.size(); ++i) {
- asocket* reader = create_local_socket(read_fds[i]);
- ASSERT_TRUE(reader != nullptr);
- asocket* writer = create_local_socket(write_fds[i]);
- ASSERT_TRUE(writer != nullptr);
- reader->peer = writer;
- writer->peer = reader;
- reader->ready(reader);
- }
-
- InstallDummySocket();
+static void FdEventThreadFunc(void*) {
fdevent_loop();
}
-class LocalSocketTest : public ::testing::Test {
- protected:
- static void SetUpTestCase() {
- ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler));
- ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN));
- }
-
- virtual void SetUp() {
- fdevent_reset();
- ASSERT_EQ(0u, fdevent_installed_count());
- }
-};
-
TEST_F(LocalSocketTest, smoke) {
- const size_t PIPE_COUNT = 100;
- const size_t MESSAGE_LOOP_COUNT = 100;
+ // Join two socketpairs with a chain of intermediate socketpairs.
+ int first[2];
+ std::vector<std::array<int, 2>> intermediates;
+ int last[2];
+
+ constexpr size_t INTERMEDIATE_COUNT = 50;
+ constexpr size_t MESSAGE_LOOP_COUNT = 100;
const std::string MESSAGE = "socket_test";
- int fd_pair1[2];
- int fd_pair2[2];
- ASSERT_EQ(0, adb_socketpair(fd_pair1));
- ASSERT_EQ(0, adb_socketpair(fd_pair2));
- pthread_t thread;
- ThreadArg thread_arg;
- thread_arg.first_read_fd = fd_pair1[0];
- thread_arg.last_write_fd = fd_pair2[1];
- thread_arg.middle_pipe_count = PIPE_COUNT;
- int writer = fd_pair1[1];
- int reader = fd_pair2[0];
- ASSERT_EQ(0, pthread_create(&thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(FdEventThreadFunc),
- &thread_arg));
+ intermediates.resize(INTERMEDIATE_COUNT);
+ ASSERT_EQ(0, adb_socketpair(first)) << strerror(errno);
+ ASSERT_EQ(0, adb_socketpair(last)) << strerror(errno);
+ asocket* prev_tail = create_local_socket(first[1]);
+ ASSERT_NE(nullptr, prev_tail);
- usleep(1000);
+ auto connect = [](asocket* tail, asocket* head) {
+ tail->peer = head;
+ head->peer = tail;
+ tail->ready(tail);
+ };
+
+ for (auto& intermediate : intermediates) {
+ ASSERT_EQ(0, adb_socketpair(intermediate.data())) << strerror(errno);
+
+ asocket* head = create_local_socket(intermediate[0]);
+ ASSERT_NE(nullptr, head);
+
+ asocket* tail = create_local_socket(intermediate[1]);
+ ASSERT_NE(nullptr, tail);
+
+ connect(prev_tail, head);
+ prev_tail = tail;
+ }
+
+ asocket* end = create_local_socket(last[0]);
+ ASSERT_NE(nullptr, end);
+ connect(prev_tail, end);
+
+ PrepareThread();
+ adb_thread_t thread;
+ ASSERT_TRUE(adb_thread_create(FdEventThreadFunc, nullptr, &thread));
+
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
std::string write_buffer(MESSAGE.size(), 'a');
- ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
- ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
+ ASSERT_TRUE(WriteFdExactly(first[0], &read_buffer[0], read_buffer.size()));
+ ASSERT_TRUE(ReadFdExactly(last[1], &write_buffer[0], write_buffer.size()));
ASSERT_EQ(read_buffer, write_buffer);
}
- ASSERT_EQ(0, adb_close(writer));
- ASSERT_EQ(0, adb_close(reader));
- // Wait until the local sockets are closed.
- sleep(1);
- ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
- ASSERT_EQ(0, pthread_join(thread, nullptr));
+ ASSERT_EQ(0, adb_close(first[0]));
+ ASSERT_EQ(0, adb_close(last[1]));
+
+ // Wait until the local sockets are closed.
+ adb_sleep_ms(100);
+ TerminateThread(thread);
}
struct CloseWithPacketArg {
@@ -160,7 +130,6 @@
s->peer = cause_close_s;
cause_close_s->ready(cause_close_s);
- InstallDummySocket();
fdevent_loop();
}
@@ -176,21 +145,19 @@
CloseWithPacketArg arg;
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
- pthread_t thread;
- ASSERT_EQ(0, pthread_create(&thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
- &arg));
- // Wait until the fdevent_loop() starts.
- sleep(1);
- ASSERT_EQ(0, adb_close(cause_close_fd[0]));
- sleep(1);
- ASSERT_EQ(2u, fdevent_installed_count());
- ASSERT_EQ(0, adb_close(socket_fd[0]));
- // Wait until the socket is closed.
- sleep(1);
- ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
- ASSERT_EQ(0, pthread_join(thread, nullptr));
+ PrepareThread();
+ adb_thread_t thread;
+ ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+ &arg, &thread));
+ // Wait until the fdevent_loop() starts.
+ adb_sleep_ms(100);
+ ASSERT_EQ(0, adb_close(cause_close_fd[0]));
+ adb_sleep_ms(100);
+ EXPECT_EQ(2u, fdevent_installed_count());
+ ASSERT_EQ(0, adb_close(socket_fd[0]));
+
+ TerminateThread(thread);
}
// This test checks if we can read packets from a closing local socket.
@@ -203,26 +170,23 @@
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
- pthread_t thread;
- ASSERT_EQ(0, pthread_create(&thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
- &arg));
+ PrepareThread();
+ adb_thread_t thread;
+ ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+ &arg, &thread));
// Wait until the fdevent_loop() starts.
- sleep(1);
+ adb_sleep_ms(100);
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
- sleep(1);
- ASSERT_EQ(2u, fdevent_installed_count());
+ adb_sleep_ms(100);
+ EXPECT_EQ(2u, fdevent_installed_count());
// Verify if we can read successfully.
std::vector<char> buf(arg.bytes_written);
+ ASSERT_NE(0u, arg.bytes_written);
ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size()));
ASSERT_EQ(0, adb_close(socket_fd[0]));
- // Wait until the socket is closed.
- sleep(1);
-
- ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
- ASSERT_EQ(0, pthread_join(thread, nullptr));
+ TerminateThread(thread);
}
// This test checks if we can close local socket in the following situation:
@@ -238,20 +202,17 @@
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
- pthread_t thread;
- ASSERT_EQ(0, pthread_create(&thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
- &arg));
+ PrepareThread();
+ adb_thread_t thread;
+ ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
+ &arg, &thread));
+
// Wait until the fdevent_loop() starts.
- sleep(1);
- ASSERT_EQ(3u, fdevent_installed_count());
+ adb_sleep_ms(100);
+ EXPECT_EQ(3u, fdevent_installed_count());
ASSERT_EQ(0, adb_close(socket_fd[0]));
- // Wait until the socket is closed.
- sleep(1);
-
- ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
- ASSERT_EQ(0, pthread_join(thread, nullptr));
+ TerminateThread(thread);
}
#if defined(__linux__)
@@ -260,50 +221,52 @@
std::string error;
int fd = network_loopback_client(5038, SOCK_STREAM, &error);
ASSERT_GE(fd, 0) << error;
- sleep(2);
+ adb_sleep_ms(200);
ASSERT_EQ(0, adb_close(fd));
}
struct CloseRdHupSocketArg {
- int socket_fd;
+ int socket_fd;
};
static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
- asocket* s = create_local_socket(arg->socket_fd);
- ASSERT_TRUE(s != nullptr);
+ asocket* s = create_local_socket(arg->socket_fd);
+ ASSERT_TRUE(s != nullptr);
- InstallDummySocket();
- fdevent_loop();
+ fdevent_loop();
}
// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
- std::string error;
- int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
- ASSERT_GE(listen_fd, 0);
- pthread_t client_thread;
- ASSERT_EQ(0, pthread_create(&client_thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(ClientThreadFunc), nullptr));
+ std::string error;
+ int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
+ ASSERT_GE(listen_fd, 0);
- struct sockaddr addr;
- socklen_t alen;
- alen = sizeof(addr);
- int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
- ASSERT_GE(accept_fd, 0);
- CloseRdHupSocketArg arg;
- arg.socket_fd = accept_fd;
- pthread_t thread;
- ASSERT_EQ(0, pthread_create(&thread, nullptr,
- reinterpret_cast<void* (*)(void*)>(CloseRdHupSocketThreadFunc),
- &arg));
- // Wait until the fdevent_loop() starts.
- sleep(1);
- ASSERT_EQ(2u, fdevent_installed_count());
- // Wait until the client closes its socket.
- ASSERT_EQ(0, pthread_join(client_thread, nullptr));
- sleep(2);
- ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
- ASSERT_EQ(0, pthread_join(thread, nullptr));
+ adb_thread_t client_thread;
+ ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(ClientThreadFunc), nullptr,
+ &client_thread));
+
+ struct sockaddr addr;
+ socklen_t alen;
+ alen = sizeof(addr);
+ int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
+ ASSERT_GE(accept_fd, 0);
+ CloseRdHupSocketArg arg;
+ arg.socket_fd = accept_fd;
+
+ PrepareThread();
+ adb_thread_t thread;
+ ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseRdHupSocketThreadFunc),
+ &arg, &thread));
+
+ // Wait until the fdevent_loop() starts.
+ adb_sleep_ms(100);
+ EXPECT_EQ(2u, fdevent_installed_count());
+
+ // Wait until the client closes its socket.
+ ASSERT_TRUE(adb_thread_join(client_thread));
+
+ TerminateThread(thread);
}
#endif // defined(__linux__)