[IPC 1] Add class to handle UNIX sockets and shared memory

Adds a foundational class to listen / connect / send / receive
data over a UNIX socket, either named or abstract.
The UnixSocket class also allows to move file descriptors across
process boundaries using POSIX SCM_RIGHTS.

Bug: 68854111
Test: ipc_unittests --gtest_filter=UnixSocketTest.*
Change-Id: I073ccdc29a327395943129a861620071b85d18af
diff --git a/BUILD.gn b/BUILD.gn
index fec540a..3211ebd 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -31,6 +31,7 @@
     "//base:base_unittests",
     "//ftrace_reader:ftrace_reader_integrationtests",
     "//ftrace_reader:ftrace_reader_unittests",
+    "//ipc:ipc_unittests",
     "//protozero:protozero_unittests",
     "//tools/sanitizers_unittests",
     "//tracing:tracing_benchmarks",
diff --git a/base/test/test_task_runner.cc b/base/test/test_task_runner.cc
index 7efd300..2b5c92b 100644
--- a/base/test/test_task_runner.cc
+++ b/base/test/test_task_runner.cc
@@ -19,6 +19,8 @@
 #include <stdio.h>
 #include <unistd.h>
 
+#include <chrono>
+
 #include "base/logging.h"
 
 // TODO: the current implementation quite hacky as it keeps waking up every 1ms.
@@ -26,54 +28,92 @@
 namespace perfetto {
 namespace base {
 
+namespace {
+constexpr int kFileDescriptorWatchTimeoutMs = 100;
+}  // namespace
+
 TestTaskRunner::TestTaskRunner() = default;
 
 TestTaskRunner::~TestTaskRunner() = default;
 
 void TestTaskRunner::Run() {
-  while (RunUntilIdle()) {
+  for (;;)
+    RunUntilIdle();
+}
+
+void TestTaskRunner::RunUntilIdle() {
+  do {
+    QueueFileDescriptorWatches(/* blocking = */ task_queue_.empty());
+  } while (RunOneTask());
+}
+
+void TestTaskRunner::RunUntilCheckpoint(const std::string& checkpoint,
+                                        int timeout_ms) {
+  PERFETTO_DCHECK(checkpoints_.count(checkpoint) == 1);
+  auto tstart = std::chrono::system_clock::now();
+  auto deadline = tstart + std::chrono::milliseconds(timeout_ms);
+  while (!checkpoints_[checkpoint]) {
+    QueueFileDescriptorWatches(/* blocking = */ task_queue_.empty());
+    RunOneTask();
+    if (std::chrono::system_clock::now() > deadline) {
+      fprintf(stderr, "[TestTaskRunner] Failed to reach checkpoint \"%s\"\n",
+              checkpoint.c_str());
+      abort();
+    }
   }
 }
 
-bool TestTaskRunner::RunUntilIdle() {
-  while (!task_queue_.empty()) {
-    std::function<void()> closure = std::move(task_queue_.front());
-    task_queue_.pop_front();
-    closure();
-  }
-
-  int res = RunFileDescriptorWatches(100);
-  if (res < 0)
+bool TestTaskRunner::RunOneTask() {
+  if (task_queue_.empty())
     return false;
+  std::function<void()> closure = std::move(task_queue_.front());
+  task_queue_.pop_front();
+  closure();
   return true;
 }
 
-bool TestTaskRunner::RunFileDescriptorWatches(int timeout_ms) {
+std::function<void()> TestTaskRunner::CreateCheckpoint(
+    const std::string& checkpoint) {
+  PERFETTO_DCHECK(checkpoints_.count(checkpoint) == 0);
+  auto checkpoint_iter = checkpoints_.emplace(checkpoint, false);
+  return [checkpoint_iter] { checkpoint_iter.first->second = true; };
+}
+
+void TestTaskRunner::QueueFileDescriptorWatches(bool blocking) {
+  uint32_t timeout_ms = blocking ? kFileDescriptorWatchTimeoutMs : 0;
   struct timeval timeout;
   timeout.tv_usec = (timeout_ms % 1000) * 1000L;
   timeout.tv_sec = static_cast<time_t>(timeout_ms / 1000);
   int max_fd = 0;
-  fd_set fds = {};
+  fd_set fds_in = {};
+  fd_set fds_err = {};
   for (const auto& it : watched_fds_) {
-    FD_SET(it.first, &fds);
+    FD_SET(it.first, &fds_in);
+    FD_SET(it.first, &fds_err);
     max_fd = std::max(max_fd, it.first);
   }
-  int res = select(max_fd + 1, &fds, nullptr, nullptr, &timeout);
-
+  int res = select(max_fd + 1, &fds_in, nullptr, &fds_err, &timeout);
   if (res < 0) {
     perror("select() failed");
-    return false;
+    abort();
   }
   if (res == 0)
-    return true;  // timeout
+    return;  // timeout
   for (int fd = 0; fd <= max_fd; ++fd) {
-    if (!FD_ISSET(fd, &fds))
+    if (!FD_ISSET(fd, &fds_in) && !FD_ISSET(fd, &fds_err)) {
       continue;
+    }
     auto fd_and_callback = watched_fds_.find(fd);
     PERFETTO_DCHECK(fd_and_callback != watched_fds_.end());
-    fd_and_callback->second();
+    if (fd_watch_task_queued_[fd])
+      continue;
+    auto callback = fd_and_callback->second;
+    task_queue_.emplace_back([this, callback, fd]() {
+      fd_watch_task_queued_[fd] = false;
+      callback();
+    });
+    fd_watch_task_queued_[fd] = true;
   }
-  return true;
 }
 
 // TaskRunner implementation.
@@ -86,12 +126,14 @@
   PERFETTO_DCHECK(fd >= 0);
   PERFETTO_DCHECK(watched_fds_.count(fd) == 0);
   watched_fds_.emplace(fd, std::move(callback));
+  fd_watch_task_queued_[fd] = false;
 }
 
 void TestTaskRunner::RemoveFileDescriptorWatch(int fd) {
   PERFETTO_DCHECK(fd >= 0);
   PERFETTO_DCHECK(watched_fds_.count(fd) == 1);
   watched_fds_.erase(fd);
+  fd_watch_task_queued_.erase(fd);
 }
 
 }  // namespace base
diff --git a/base/test/test_task_runner.h b/base/test/test_task_runner.h
index 33b37d2..92f9197 100644
--- a/base/test/test_task_runner.h
+++ b/base/test/test_task_runner.h
@@ -22,6 +22,7 @@
 #include <functional>
 #include <list>
 #include <map>
+#include <string>
 
 #include "base/task_runner.h"
 
@@ -33,10 +34,11 @@
   TestTaskRunner();
   ~TestTaskRunner() override;
 
-  void Run();
+  void RunUntilIdle();
+  void __attribute__((__noreturn__)) Run();
 
-  // Returns false in case of errors.
-  bool RunUntilIdle();
+  std::function<void()> CreateCheckpoint(const std::string& checkpoint);
+  void RunUntilCheckpoint(const std::string& checkpoint, int timeout_ms = 5000);
 
   // TaskRunner implementation.
   void PostTask(std::function<void()> closure) override;
@@ -47,11 +49,13 @@
   TestTaskRunner(const TestTaskRunner&) = delete;
   TestTaskRunner& operator=(const TestTaskRunner&) = delete;
 
-  // Returns false in case of errors.
-  bool RunFileDescriptorWatches(int timeout_ms);
+  bool RunOneTask();
+  void QueueFileDescriptorWatches(bool blocking);
 
   std::list<std::function<void()>> task_queue_;
   std::map<int, std::function<void()>> watched_fds_;
+  std::map<int, bool> fd_watch_task_queued_;
+  std::map<std::string, bool> checkpoints_;
 };
 
 }  // namespace base
diff --git a/ipc/BUILD.gn b/ipc/BUILD.gn
new file mode 100644
index 0000000..da81b42
--- /dev/null
+++ b/ipc/BUILD.gn
@@ -0,0 +1,35 @@
+# Copyright (C) 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source_set("ipc") {
+  deps += [ "//base" ]
+  sources = [
+    "src/unix_socket.cc",
+  ]
+}
+
+executable("ipc_unittests") {
+  testonly = true
+  deps += [
+    ":ipc",
+    "//base",
+    "//base:test_support",
+    "//buildtools:gmock",
+    "//buildtools:gtest",
+    "//buildtools:gtest_main",
+  ]
+  sources = [
+    "src/unix_socket_unittest.cc",
+  ]
+}
diff --git a/ipc/src/unix_socket.cc b/ipc/src/unix_socket.cc
new file mode 100644
index 0000000..a9e5f13
--- /dev/null
+++ b/ipc/src/unix_socket.cc
@@ -0,0 +1,428 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/unix_socket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "base/build_config.h"
+#include "base/logging.h"
+#include "base/task_runner.h"
+#include "base/utils.h"
+
+namespace perfetto {
+namespace ipc {
+
+// TODO(primiano): Add ThreadChecker to methods of this class.
+
+namespace {
+
+// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+// created with SO_NOSIGPIPE (See InitializeSocket()).
+#if BUILDFLAG(OS_MACOSX)
+constexpr int kNoSigPipe = 0;
+#else
+constexpr int kNoSigPipe = MSG_NOSIGNAL;
+#endif
+
+// Android takes an int instead of socklen_t for the control buffer size.
+#if BUILDFLAG(OS_ANDROID)
+using CBufLenType = size_t;
+#else
+using CBufLenType = socklen_t;
+#endif
+
+bool MakeSockAddr(const std::string& socket_name,
+                  sockaddr_un* addr,
+                  socklen_t* addr_size) {
+  memset(addr, 0, sizeof(*addr));
+  const size_t name_len = socket_name.size();
+  if (name_len >= sizeof(addr->sun_path)) {
+    errno = ENAMETOOLONG;
+    return false;
+  }
+  memcpy(addr->sun_path, socket_name.data(), name_len);
+  if (addr->sun_path[0] == '@')
+    addr->sun_path[0] = '\0';
+  addr->sun_family = AF_UNIX;
+  *addr_size = static_cast<socklen_t>(
+      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+  return true;
+}
+
+}  // namespace
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
+                                               EventListener* event_listener,
+                                               base::TaskRunner* task_runner) {
+  std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
+  sock->DoListen(socket_name);
+  return sock;
+}
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
+                                                EventListener* event_listener,
+                                                base::TaskRunner* task_runner) {
+  std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
+  sock->DoConnect(socket_name);
+  return sock;
+}
+
+UnixSocket::UnixSocket(EventListener* event_listener,
+                       base::TaskRunner* task_runner)
+    : UnixSocket(event_listener, task_runner, base::ScopedFile()) {}
+
+UnixSocket::UnixSocket(EventListener* event_listener,
+                       base::TaskRunner* task_runner,
+                       base::ScopedFile adopt_fd)
+    : event_listener_(event_listener),
+      task_runner_(task_runner),
+      weak_ref_(new WeakRef(this)) {
+  if (adopt_fd) {
+    // Only in the case of OnNewIncomingConnection().
+    fd_ = std::move(adopt_fd);
+    state_ = State::kConnected;
+  } else {
+    fd_.reset(socket(AF_UNIX, SOCK_STREAM, 0));
+  }
+  if (!fd_) {
+    last_error_ = errno;
+    return;
+  }
+
+#if BUILDFLAG(OS_MACOSX)
+  const int no_sigpipe = 1;
+  setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
+#endif
+  // There is no reason why a socket should outlive the process in case of
+  // exec() by default, this is just working around a broken unix design.
+  int fcntl_res = fcntl(*fd_, FD_CLOEXEC);
+  PERFETTO_DCHECK(fcntl_res == 0);
+
+  // Set non-blocking mode.
+  int flags = fcntl(*fd_, F_GETFL, 0);
+  flags |= O_NONBLOCK;
+  fcntl_res = fcntl(fd(), F_SETFL, flags);
+  PERFETTO_CHECK(fcntl_res == 0);
+
+  std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+  task_runner_->AddFileDescriptorWatch(*fd_, [weak_ref]() {
+    if (weak_ref->sock)
+      weak_ref->sock->OnEvent();
+  });
+}
+
+UnixSocket::~UnixSocket() {
+  weak_ref_->sock = nullptr;  // This will no-op any future callback.
+  Shutdown();
+}
+
+// Called only by the Listen() static constructor.
+void UnixSocket::DoListen(const std::string& socket_name) {
+  PERFETTO_DCHECK(state_ == State::kDisconnected);
+  if (!fd_)
+    return;  // This is the only thing that can gracefully fail in the ctor.
+
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+    last_error_ = errno;
+    return;
+  }
+
+// Android takes an int as 3rd argument of bind() instead of socklen_t.
+#if BUILDFLAG(OS_ANDROID)
+  const int bind_size = static_cast<int>(addr_size);
+#else
+  const socklen_t bind_size = addr_size;
+#endif
+
+  if (bind(*fd_, reinterpret_cast<sockaddr*>(&addr), bind_size)) {
+    last_error_ = errno;
+    PERFETTO_DPLOG("bind()");
+    return;
+  }
+  if (listen(*fd_, SOMAXCONN)) {
+    last_error_ = errno;
+    PERFETTO_DPLOG("listen()");
+    return;
+  }
+
+  last_error_ = 0;
+  state_ = State::kListening;
+}
+
+// Called only by the Connect() static constructor.
+void UnixSocket::DoConnect(const std::string& socket_name) {
+  PERFETTO_DCHECK(state_ == State::kDisconnected);
+
+  // This is the only thing that can gracefully fail in the ctor.
+  if (!fd_)
+    return NotifyConnectionState(false);
+
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+    last_error_ = errno;
+    return NotifyConnectionState(false);
+  }
+
+  int res = PERFETTO_EINTR(
+      connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+  if (res && errno != EINPROGRESS) {
+    last_error_ = errno;
+    return NotifyConnectionState(false);
+  }
+
+  // At this point either |res| == 0 (the connect() succeeded) or started
+  // asynchronously (EINPROGRESS).
+  last_error_ = 0;
+  state_ = State::kConnecting;
+
+  // Even if the socket is non-blocking, connecting to a UNIX socket can be
+  // acknowledged straight away rather than returning EINPROGRESS. In this case
+  // just trigger an OnEvent without waiting for the FD watch. That will poll
+  // the SO_ERROR and evolve the state into either kConnected or kDisconnected.
+  if (res == 0) {
+    std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+    task_runner_->PostTask([weak_ref]() {
+      if (weak_ref->sock)
+        weak_ref->sock->OnEvent();
+    });
+  }
+}
+
+void UnixSocket::OnEvent() {
+  if (state_ == State::kDisconnected)
+    return;  // Some spurious event, typically queued just before Shutdown().
+
+  if (state_ == State::kConnected)
+    return event_listener_->OnDataAvailable(this);
+
+  if (state_ == State::kConnecting) {
+    PERFETTO_DCHECK(fd_);
+    int sock_err = EINVAL;
+    socklen_t err_len = sizeof(sock_err);
+    int res = getsockopt(*fd_, SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+    if (res == 0 && sock_err == EINPROGRESS)
+      return;  // Not connected yet, just a spurious FD watch wakeup.
+    if (res == 0 && sock_err == 0) {
+      state_ = State::kConnected;
+      return event_listener_->OnConnect(this, true /* connected */);
+    }
+    last_error_ = sock_err;
+    return event_listener_->OnConnect(this, false /* connected */);
+  }
+
+  // New incoming connection.
+  if (state_ == State::kListening) {
+    // There could be more than one incoming connection behind each FD watch
+    // notification. Drain'em all.
+    for (;;) {
+      sockaddr_un cli_addr = {};
+      socklen_t size = sizeof(cli_addr);
+      base::ScopedFile new_fd(PERFETTO_EINTR(
+          accept(*fd_, reinterpret_cast<sockaddr*>(&cli_addr), &size)));
+      if (!new_fd)
+        return;
+      std::unique_ptr<UnixSocket> new_sock(
+          new UnixSocket(event_listener_, task_runner_, std::move(new_fd)));
+      event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
+    }
+  }
+}
+
+bool UnixSocket::Send(const std::string& msg) {
+  return Send(msg.c_str(), msg.size() + 1);
+}
+
+bool UnixSocket::Send(const void* msg, size_t len, int send_fd) {
+  if (state_ != State::kConnected) {
+    last_error_ = ENOTCONN;
+    return false;
+  }
+
+  msghdr msg_hdr = {};
+  iovec iov = {const_cast<void*>(msg), len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (send_fd > -1) {
+    const CBufLenType control_buf_len =
+        static_cast<CBufLenType>(CMSG_SPACE(sizeof(int)));
+    PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
+    memset(control_buf, 0, sizeof(control_buf));
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen = control_buf_len;
+    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+    memcpy(CMSG_DATA(cmsg), &send_fd, sizeof(int));
+    msg_hdr.msg_controllen = cmsg->cmsg_len;
+  }
+
+  const ssize_t sz = PERFETTO_EINTR(sendmsg(*fd_, &msg_hdr, kNoSigPipe));
+  if (sz >= 0) {
+    // There should be no way a non-blocking socket returns < |len|.
+    // If the queueing fails, sendmsg() must return -1 + errno = EWOULDBLOCK.
+    PERFETTO_CHECK(static_cast<size_t>(sz) == len);
+    last_error_ = 0;
+    return true;
+  }
+
+  if (errno == EAGAIN || errno == EWOULDBLOCK) {
+    // A genuine out-of-buffer. The client should retry or give up.
+    // Man pages specify that EAGAIN and EWOULDBLOCK have the same semantic here
+    // and clients should check for both.
+    last_error_ = EAGAIN;
+    return false;
+  }
+
+  // Either the the other endpoint disconnect (ECONNRESET) or some other error
+  // happened.
+  last_error_ = errno;
+  PERFETTO_DPLOG("sendmsg() failed");
+  Shutdown();
+  return false;
+}
+
+void UnixSocket::Shutdown() {
+  std::shared_ptr<WeakRef>& weak_ref = weak_ref_;
+  if (state_ == State::kConnected) {
+    task_runner_->PostTask([weak_ref]() {
+      if (weak_ref->sock)
+        weak_ref->sock->event_listener_->OnDisconnect(weak_ref->sock);
+    });
+  } else if (state_ == State::kConnecting) {
+    task_runner_->PostTask([weak_ref]() {
+      if (weak_ref->sock)
+        weak_ref->sock->event_listener_->OnConnect(weak_ref->sock, false);
+    });
+  }
+  if (fd_) {
+    shutdown(*fd_, SHUT_RDWR);
+    task_runner_->RemoveFileDescriptorWatch(*fd_);
+    fd_.reset();
+  }
+  state_ = State::kDisconnected;
+}
+
+size_t UnixSocket::Receive(void* msg, size_t len, base::ScopedFile* recv_fd) {
+  if (state_ != State::kConnected) {
+    last_error_ = ENOTCONN;
+    return 0;
+  }
+
+  msghdr msg_hdr = {};
+  iovec iov = {msg, len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (recv_fd) {
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen = static_cast<CBufLenType>(CMSG_SPACE(sizeof(int)));
+    PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
+  }
+  const ssize_t sz = PERFETTO_EINTR(recvmsg(*fd_, &msg_hdr, kNoSigPipe));
+  if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+    last_error_ = EAGAIN;
+    return 0;
+  }
+  if (sz == 0) {
+    last_error_ = errno;
+    Shutdown();
+    return 0;
+  }
+  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+
+  int* fds = nullptr;
+  uint32_t fds_len = 0;
+
+  if (msg_hdr.msg_controllen > 0) {
+    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
+         cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
+      const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
+      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+        PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
+        PERFETTO_DCHECK(fds == nullptr);
+        fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+        fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
+      }
+    }
+  }
+
+  if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
+    for (size_t i = 0; fds && i < fds_len; ++i)
+      close(fds[i]);
+    last_error_ = EMSGSIZE;
+    Shutdown();
+    return 0;
+  }
+
+  for (size_t i = 0; fds && i < fds_len; ++i) {
+    if (recv_fd && i == 0) {
+      recv_fd->reset(fds[i]);
+    } else {
+      close(fds[i]);
+    }
+  }
+
+  last_error_ = 0;
+  return static_cast<size_t>(sz);
+}
+
+std::string UnixSocket::ReceiveString(size_t max_length) {
+  std::unique_ptr<char[]> buf(new char[max_length + 1]);
+  size_t rsize = Receive(buf.get(), max_length);
+  PERFETTO_CHECK(static_cast<size_t>(rsize) <= max_length);
+  buf[static_cast<size_t>(rsize)] = '\0';
+  return std::string(buf.get());
+}
+
+void UnixSocket::NotifyConnectionState(bool success) {
+  std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+  task_runner_->PostTask([weak_ref, success]() {
+    if (weak_ref->sock)
+      weak_ref->sock->event_listener_->OnConnect(weak_ref->sock, success);
+  });
+}
+
+UnixSocket::EventListener::~EventListener() {}
+void UnixSocket::EventListener::OnNewIncomingConnection(
+    UnixSocket*,
+    std::unique_ptr<UnixSocket>) {}
+void UnixSocket::EventListener::OnConnect(UnixSocket*, bool) {}
+void UnixSocket::EventListener::OnDisconnect(UnixSocket*) {}
+void UnixSocket::EventListener::OnDataAvailable(UnixSocket*) {}
+
+}  // namespace ipc
+}  // namespace perfetto
diff --git a/ipc/src/unix_socket.h b/ipc/src/unix_socket.h
new file mode 100644
index 0000000..b91db29
--- /dev/null
+++ b/ipc/src/unix_socket.h
@@ -0,0 +1,200 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IPC_SRC_UNIX_SOCKET_H_
+#define IPC_SRC_UNIX_SOCKET_H_
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#include <memory>
+#include <string>
+
+#include "base/logging.h"
+#include "base/scoped_file.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+}  // namespace base.
+
+namespace ipc {
+
+// A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
+// transfer file descriptors. None of the methods in this class are blocking.
+// The main design goal is API simplicity and strong guarantees on the
+// EventListener callbacks, in order to avoid ending in some undefined state.
+// In case of any error it will aggressively just shut down the socket and
+// notify the failure with OnConnect(false) or OnDisconnect() depending on the
+// state of the socket (see below).
+// EventListener callbacks stop happening as soon as the instance is destroyed.
+//
+// Lifecycle of a client socket:
+//
+//                           Connect()
+//                               |
+//            +------------------+------------------+
+//            | (success)                           | (failure or Shutdown())
+//            V                                     V
+//     OnConnect(true)                         OnConnect(false)
+//            |
+//            V
+//    OnDataAvailable()
+//            |
+//            V
+//     OnDisconnect()  (failure or shutdown)
+//
+//
+// Lifecycle of a server socket:
+//
+//                          Listen()  --> returns false in case of errors.
+//                             |
+//                             V
+//              OnNewIncomingConnection(new_socket)
+//
+//          (|new_socket| inherits the same EventListener)
+//                             |
+//                             V
+//                     OnDataAvailable()
+//                             | (failure or Shutdown())
+//                             V
+//                       OnDisconnect()
+class UnixSocket {
+ public:
+  class EventListener {
+   public:
+    virtual ~EventListener();
+
+    // After Listen().
+    virtual void OnNewIncomingConnection(
+        UnixSocket* self,
+        std::unique_ptr<UnixSocket> new_connection);
+
+    // After Connect(), whether successful or not.
+    virtual void OnConnect(UnixSocket* self, bool connected);
+
+    // After a successful Connect() or OnNewIncomingConnection(). Either the
+    // other endpoint did disconnect or some other error happened.
+    virtual void OnDisconnect(UnixSocket* self);
+
+    // Whenever there is data available to Receive(). Note that spurious FD
+    // watch events are possible, so it is possible that Receive() soon after
+    // OnDataAvailable() returns 0 (just ignore those).
+    virtual void OnDataAvailable(UnixSocket* self);
+  };
+
+  enum class State {
+    kDisconnected = 0,  // Failed connection, peer disconnection or Shutdown().
+    kConnecting,  // Soon after Connect(), before it either succeeds or fails.
+    kConnected,   // After a successful Connect().
+    kListening    // After Listen(), until Shutdown().
+  };
+
+  // Creates a Unix domain socket and starts listening. If |socket_name|
+  // starts with a '@', an abstract socket will be created (Linux/Android only).
+  // Returns always an instance. In case of failure (e.g., another socket
+  // with the same name is  already listening) the returned socket will have
+  // is_listening() == false and last_error() will contain the failure reason.
+  static std::unique_ptr<UnixSocket> Listen(const std::string& socket_name,
+                                            EventListener*,
+                                            base::TaskRunner*);
+
+  // Creates a Unix domain socket and connects to the listening endpoint.
+  // Returns always an instance. EventListener::OnConnect(bool success) will
+  // be called always, whether the connection succeeded or not.
+  static std::unique_ptr<UnixSocket> Connect(const std::string& socket_name,
+                                             EventListener*,
+                                             base::TaskRunner*);
+
+  // This class gives the hard guarantee that no callback is called on the
+  // passed EventListener immediately after the object has been destroyed.
+  // Any queued callback will be silently dropped.
+  ~UnixSocket();
+
+  // Shuts down the current connection, if any. If the socket was Listen()-ing,
+  // stops listening. The socket goes back to kNotInitialized state, so it can
+  // be reused with Listen() or Connect().
+  void Shutdown();
+
+  // Returns true is the message was queued, false if there was no space in the
+  // output buffer, in which case the client should retry or give up.
+  // If any other error happens the socket will be shutdown and
+  // EventListener::OnDisconnect() will be called.
+  // If the socket is not connected, Send() will just return false.
+  // Does not append a null string terminator to msg in any case.
+  bool Send(const void* msg, size_t len, int send_fd = -1);
+  bool Send(const std::string& msg);
+
+  // Returns the number of bytes (<= |len|) written in |msg| or 0 if there
+  // is no data in the buffer to read or an error occurs (in which case a
+  // EventListener::OnDisconnect() will follow).
+  // If the ScopedFile pointer is not null and a FD is received, it moves the
+  // received FD into that. If a FD is received but the ScopedFile pointer is
+  // null, the FD will be automatically closed.
+  size_t Receive(void* msg, size_t len, base::ScopedFile* = nullptr);
+
+  // Only for tests. This is slower than Receive() as it requires a heap
+  // allocation and a copy for the std::string. Guarantees that the returned
+  // string is null terminated even if the underlying message sent by the peer
+  // is not.
+  std::string ReceiveString(size_t max_length = 1024);
+
+  bool is_connected() const { return state_ == State::kConnected; }
+  bool is_listening() const { return state_ == State::kListening; }
+  int fd() const { return fd_.get(); }
+  int last_error() const { return last_error_; }
+
+ private:
+  // Used to decouple the lifetime of the UnixSocket from the callbacks
+  // registered on the TaskRunner, which might happen after UnixSocket has been
+  // destroyed. This is essentially a single-instance weak_ptr<UnixSocket>.
+  // Unfortunately C++11's weak_ptr would require UnixSocket to be a shared_ptr,
+  // which is undesirable here. The |sock| pointer is invalidated by the dtor
+  // of UnixSocket.
+  struct WeakRef {
+    explicit WeakRef(UnixSocket* s) : sock(s) {}
+    ~WeakRef() = default;
+    WeakRef(const WeakRef&) = delete;
+    WeakRef& operator=(const WeakRef&) = delete;
+
+    UnixSocket* sock;
+  };
+
+  UnixSocket(EventListener*, base::TaskRunner*);
+  UnixSocket(EventListener*, base::TaskRunner*, base::ScopedFile);
+  UnixSocket(const UnixSocket&) = delete;
+  UnixSocket& operator=(const UnixSocket&) = delete;
+
+  // Called once by the corresponding public static factory methods.
+  void DoConnect(const std::string& socket_name);
+  void DoListen(const std::string& socket_name);
+
+  void OnEvent();
+  void NotifyConnectionState(bool success);
+
+  base::ScopedFile fd_;
+  State state_ = State::kDisconnected;
+  int last_error_ = 0;
+  EventListener* event_listener_;
+  base::TaskRunner* task_runner_;
+  std::shared_ptr<WeakRef> weak_ref_;
+};
+
+}  // namespace ipc
+}  // namespace perfetto
+
+#endif  // IPC_SRC_UNIX_SOCKET_H_
diff --git a/ipc/src/unix_socket_unittest.cc b/ipc/src/unix_socket_unittest.cc
new file mode 100644
index 0000000..555f2f5
--- /dev/null
+++ b/ipc/src/unix_socket_unittest.cc
@@ -0,0 +1,386 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/unix_socket.h"
+
+#include <sys/mman.h>
+
+#include <list>
+
+#include "base/build_config.h"
+#include "base/logging.h"
+#include "base/test/test_task_runner.h"
+#include "base/utils.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace ipc {
+namespace {
+
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Mock;
+
+// Mac OS X doesn't support abstract (i.e. unnamed) sockets.
+#if BUILDFLAG(OS_MACOSX)
+static const char kSocketName[] = "/tmp/test_socket";
+void UnlinkSocket() {
+  unlink(kSocketName);
+}
+#else
+static const char kSocketName[] = "@test_socket";
+void UnlinkSocket() {}
+#endif
+
+class MockEventListener : public UnixSocket::EventListener {
+ public:
+  MOCK_METHOD2(OnNewIncomingConnection, void(UnixSocket*, UnixSocket*));
+  MOCK_METHOD2(OnConnect, void(UnixSocket*, bool));
+  MOCK_METHOD1(OnDisconnect, void(UnixSocket*));
+  MOCK_METHOD1(OnDataAvailable, void(UnixSocket*));
+
+  // GMock doesn't support mocking methods with non-copiable args.
+  void OnNewIncomingConnection(
+      UnixSocket* self,
+      std::unique_ptr<UnixSocket> new_connection) override {
+    incoming_connections_.emplace_back(std::move(new_connection));
+    OnNewIncomingConnection(self, incoming_connections_.back().get());
+  }
+
+  std::unique_ptr<UnixSocket> GetIncomingConnection() {
+    if (incoming_connections_.empty())
+      return nullptr;
+    std::unique_ptr<UnixSocket> sock = std::move(incoming_connections_.front());
+    incoming_connections_.pop_front();
+    return sock;
+  }
+
+ private:
+  std::list<std::unique_ptr<UnixSocket>> incoming_connections_;
+};
+
+class UnixSocketTest : public ::testing::Test {
+ protected:
+  void SetUp() override { UnlinkSocket(); }
+  void TearDown() override { UnlinkSocket(); }
+
+  base::TestTaskRunner task_runner_;
+  MockEventListener event_listener_;
+};
+
+TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_FALSE(cli->is_connected());
+  auto checkpoint = task_runner_.CreateCheckpoint("failure");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
+      .WillOnce(Invoke([checkpoint](UnixSocket*, bool) { checkpoint(); }));
+  task_runner_.RunUntilCheckpoint("failure");
+}
+
+// Both server and client should see an OnDisconnect() if the server drops
+// incoming connections immediately as they are created.
+TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  // The server will immediately shutdown the connection upon
+  // OnNewIncomingConnection().
+  auto srv_did_shutdown = task_runner_.CreateCheckpoint("srv_did_shutdown");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(
+          Invoke([this, srv_did_shutdown](UnixSocket*, UnixSocket* new_conn) {
+            EXPECT_CALL(event_listener_, OnDisconnect(new_conn));
+            new_conn->Shutdown();
+            srv_did_shutdown();
+          }));
+
+  auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(Invoke([checkpoint](UnixSocket*, bool) { checkpoint(); }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  task_runner_.RunUntilCheckpoint("srv_did_shutdown");
+
+  // Trying to send something will trigger the disconnection notification.
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+      .WillOnce(
+          Invoke([cli_disconnected](UnixSocket*) { cli_disconnected(); }));
+  EXPECT_FALSE(cli->Send("whatever"));
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+}
+
+TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+            .WillOnce(Invoke(
+                [srv_disconnected](UnixSocket*) { srv_disconnected(); }));
+        cli_connected();
+      }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+
+  auto srv_conn = event_listener_.GetIncomingConnection();
+  ASSERT_TRUE(srv_conn);
+  ASSERT_TRUE(cli->is_connected());
+
+  auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+      .WillOnce(Invoke([cli_did_recv](UnixSocket* s) {
+        ASSERT_EQ("srv>cli", s->ReceiveString());
+        cli_did_recv();
+      }));
+
+  auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
+      .WillOnce(Invoke([srv_did_recv](UnixSocket* s) {
+        ASSERT_EQ("cli>srv", s->ReceiveString());
+        srv_did_recv();
+      }));
+  ASSERT_TRUE(cli->Send("cli>srv"));
+  ASSERT_TRUE(srv_conn->Send("srv>cli"));
+  task_runner_.RunUntilCheckpoint("cli_did_recv");
+  task_runner_.RunUntilCheckpoint("srv_did_recv");
+
+  // Check that Send/Receive() fails gracefully once the socket is closed.
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+      .WillOnce(
+          Invoke([cli_disconnected](UnixSocket*) { cli_disconnected(); }));
+  cli->Shutdown();
+  char msg[4];
+  ASSERT_EQ(0u, cli->Receive(&msg, sizeof(msg)));
+  ASSERT_EQ("", cli->ReceiveString());
+  ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
+  ASSERT_EQ("", srv_conn->ReceiveString());
+  ASSERT_FALSE(cli->Send("foo"));
+  ASSERT_FALSE(srv_conn->Send("bar"));
+  srv->Shutdown();
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+  task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+// Mostly a stress tests. Connects kNumClients clients to the same server and
+// tests that all can exchange data and can see the expected sequence of events.
+TEST_F(UnixSocketTest, SeveralClients) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  constexpr size_t kNumClients = 32;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)
+      .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillOnce(Invoke([](UnixSocket* t) {
+              ASSERT_EQ("PING", t->ReceiveString());
+              ASSERT_TRUE(t->Send("PONG"));
+            }));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
+        .WillOnce(Invoke([](UnixSocket* s, bool success) {
+          ASSERT_TRUE(success);
+          ASSERT_TRUE(s->Send("PING"));
+        }));
+
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+          ASSERT_EQ("PONG", s->ReceiveString());
+          checkpoint();
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+  }
+}
+
+// Creates two processes. The server process creates a file and passes it over
+// the socket to the client. Both processes mmap the file in shared mode and
+// check that they see the same contents.
+TEST_F(UnixSocketTest, SharedMemory) {
+  int pipes[2];
+  ASSERT_EQ(0, pipe(pipes));
+
+  pid_t pid = fork();
+  ASSERT_GE(pid, 0);
+  constexpr size_t kTmpSize = 4096;
+
+  if (pid == 0) {
+    // Child process.
+    FILE* tmp = tmpfile();
+    ASSERT_NE(nullptr, tmp);
+    int tmp_fd = fileno(tmp);
+    ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
+    char* mem = reinterpret_cast<char*>(
+        mmap(nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, tmp_fd, 0));
+    ASSERT_NE(nullptr, mem);
+    memcpy(mem, "shm rocks", 10);
+
+    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+    ASSERT_TRUE(srv->is_listening());
+    // Signal the other process that it can connect.
+    ASSERT_EQ(1, PERFETTO_EINTR(write(pipes[1], ".", 1)));
+    auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
+    EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+        .WillOnce(Invoke(
+            [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
+              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+              // Wait for the client to change this again.
+              EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
+                  .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
+                    ASSERT_EQ("change notify", s->ReceiveString());
+                    ASSERT_STREQ("rock more", mem);
+                    checkpoint();
+                  }));
+            }));
+    task_runner_.RunUntilCheckpoint("change_seen_by_server");
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
+    _exit(0);
+  } else {
+    char sync_cmd = '\0';
+    ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
+    ASSERT_EQ('.', sync_cmd);
+    auto cli =
+        UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+    auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+          char msg[32];
+          base::ScopedFile fd;
+          ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
+          ASSERT_STREQ("txfd", msg);
+          ASSERT_TRUE(fd);
+          char* mem = reinterpret_cast<char*>(mmap(
+              nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0));
+          ASSERT_NE(nullptr, mem);
+          mem[9] = '\0';  // Just to get a clean error in case of test failure.
+          ASSERT_STREQ("shm rocks", mem);
+
+          // Now change the shared memory and ping the other process.
+          memcpy(mem, "rock more", 10);
+          ASSERT_TRUE(s->Send("change notify"));
+          checkpoint();
+        }));
+    task_runner_.RunUntilCheckpoint("change_seen_by_client");
+    int st = 0;
+    PERFETTO_EINTR(waitpid(pid, &st, 0));
+    ASSERT_FALSE(WIFSIGNALED(st)) << "Server died with signal " << WTERMSIG(st);
+    EXPECT_TRUE(WIFEXITED(st));
+    ASSERT_EQ(0, WEXITSTATUS(st));
+  }
+}
+
+constexpr size_t kAtomicWrites_FrameSize = 1123;
+bool AtomicWrites_SendAttempt(UnixSocket* s,
+                              base::TaskRunner* task_runner,
+                              int num_frame) {
+  char buf[kAtomicWrites_FrameSize];
+  memset(buf, static_cast<char>(num_frame), sizeof(buf));
+  if (s->Send(buf, sizeof(buf)))
+    return true;
+  task_runner->PostTask(
+      std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
+  return false;
+}
+
+// Creates a client-server pair. The client sends continuously data to the
+// server. Upon each Send() attempt, the client sends a buffer which is memset()
+// with a unique number (0 to kNumFrames). We are deliberately trying to fill
+// the socket output buffer, so we expect some of these Send()s to fail.
+// The client is extremely aggressive and, when a Send() fails, just keeps
+// re-posting it with the same unique number. The server verifies that we
+// receive one and exactly one of each buffers, without any gaps or truncation.
+TEST_F(UnixSocketTest, SendIsAtomic) {
+  static constexpr int kNumFrames = 127;
+
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+
+  auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+  std::set<int> received_iterations;
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, &received_iterations, all_frames_done](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillRepeatedly(
+                Invoke([&received_iterations, all_frames_done](UnixSocket* s) {
+                  char buf[kAtomicWrites_FrameSize];
+                  size_t res = s->Receive(buf, sizeof(buf));
+                  if (res == 0)
+                    return;  // Spurious select(), could happen.
+                  ASSERT_EQ(kAtomicWrites_FrameSize, res);
+                  // Check that we didn't get two truncated frames.
+                  for (size_t i = 0; i < sizeof(buf); i++)
+                    ASSERT_EQ(buf[0], buf[i]);
+                  ASSERT_EQ(0u, received_iterations.count(buf[0]));
+                  received_iterations.insert(buf[0]);
+                  if (received_iterations.size() == kNumFrames)
+                    all_frames_done();
+                }));
+      }));
+
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(
+          Invoke([cli_connected](UnixSocket*, bool) { cli_connected(); }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  ASSERT_TRUE(cli->is_connected());
+
+  bool did_requeue = false;
+  for (int i = 0; i < kNumFrames; i++)
+    did_requeue |= !AtomicWrites_SendAttempt(cli.get(), &task_runner_, i);
+
+  // We expect that at least one of the kNumFrames didn't fit in the socket
+  // buffer and was re-posted, otherwise this entire test would be pointless.
+  ASSERT_TRUE(did_requeue);
+
+  task_runner_.RunUntilCheckpoint("all_frames_done");
+}
+
+// TODO(primiano): add a test to check that in the case of a peer sending a fd
+// and the other end just doing a recv (without taking it), the fd is closed and
+// not left around.
+
+// TODO(primiano); add a test to check that a socket can be reused after
+// Shutdown(),
+
+// TODO(primiano): add a test to check that OnDisconnect() is called in all
+// possible cases.
+
+// TODO(primiano): add tests that destroy the socket in all possible stages and
+// verify that no spurious EventListener callback is received.
+
+}  // namespace
+}  // namespace ipc
+}  // namespace perfetto