Merge "trace_processor: change cursor class to be scoped to a filter operation"
diff --git a/Android.bp b/Android.bp
index 9365d34..e7f0d6c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -35,12 +35,12 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/page_allocator.cc",
-    "src/base/sock_utils.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
     "src/base/time.cc",
+    "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
     "src/base/virtual_destructors.cc",
     "src/base/watchdog_posix.cc",
@@ -49,7 +49,6 @@
     "src/ipc/deferred.cc",
     "src/ipc/host_impl.cc",
     "src/ipc/service_proxy.cc",
-    "src/ipc/unix_socket.cc",
     "src/ipc/virtual_destructors.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
@@ -161,12 +160,12 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/page_allocator.cc",
-    "src/base/sock_utils.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
     "src/base/time.cc",
+    "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
     "src/base/virtual_destructors.cc",
     "src/base/watchdog_posix.cc",
@@ -175,7 +174,6 @@
     "src/ipc/deferred.cc",
     "src/ipc/host_impl.cc",
     "src/ipc/service_proxy.cc",
-    "src/ipc/unix_socket.cc",
     "src/ipc/virtual_destructors.cc",
     "src/perfetto_cmd/main.cc",
     "src/perfetto_cmd/perfetto_cmd.cc",
@@ -302,7 +300,6 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/page_allocator.cc",
-    "src/base/sock_utils.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -311,6 +308,7 @@
     "src/base/test/vm_test_utils.cc",
     "src/base/thread_checker.cc",
     "src/base/time.cc",
+    "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
     "src/base/virtual_destructors.cc",
     "src/base/watchdog_posix.cc",
@@ -319,7 +317,6 @@
     "src/ipc/deferred.cc",
     "src/ipc/host_impl.cc",
     "src/ipc/service_proxy.cc",
-    "src/ipc/unix_socket.cc",
     "src/ipc/virtual_destructors.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
@@ -3618,12 +3615,12 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/page_allocator.cc",
-    "src/base/sock_utils.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
     "src/base/time.cc",
+    "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
     "src/base/virtual_destructors.cc",
     "src/base/watchdog_posix.cc",
@@ -3632,7 +3629,6 @@
     "src/ipc/deferred.cc",
     "src/ipc/host_impl.cc",
     "src/ipc/service_proxy.cc",
-    "src/ipc/unix_socket.cc",
     "src/ipc/virtual_destructors.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
@@ -3809,7 +3805,6 @@
     "src/base/page_allocator.cc",
     "src/base/page_allocator_unittest.cc",
     "src/base/scoped_file_unittest.cc",
-    "src/base/sock_utils.cc",
     "src/base/string_splitter.cc",
     "src/base/string_splitter_unittest.cc",
     "src/base/string_utils.cc",
@@ -3825,6 +3820,8 @@
     "src/base/thread_checker_unittest.cc",
     "src/base/time.cc",
     "src/base/time_unittest.cc",
+    "src/base/unix_socket.cc",
+    "src/base/unix_socket_unittest.cc",
     "src/base/unix_task_runner.cc",
     "src/base/utils_unittest.cc",
     "src/base/virtual_destructors.cc",
@@ -3841,16 +3838,16 @@
     "src/ipc/host_impl_unittest.cc",
     "src/ipc/service_proxy.cc",
     "src/ipc/test/ipc_integrationtest.cc",
-    "src/ipc/unix_socket.cc",
-    "src/ipc/unix_socket_unittest.cc",
     "src/ipc/virtual_destructors.cc",
     "src/perfetto_cmd/perfetto_cmd.cc",
     "src/perfetto_cmd/rate_limiter.cc",
     "src/perfetto_cmd/rate_limiter_unittest.cc",
     "src/profiling/memory/bookkeeping.cc",
     "src/profiling/memory/bookkeeping_unittest.cc",
+    "src/profiling/memory/bounded_queue_unittest.cc",
     "src/profiling/memory/client.cc",
     "src/profiling/memory/client_unittest.cc",
+    "src/profiling/memory/heapprofd_integrationtest.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
     "src/profiling/memory/socket_listener.cc",
@@ -3859,6 +3856,7 @@
     "src/profiling/memory/string_interner_unittest.cc",
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/unwinding_unittest.cc",
+    "src/profiling/memory/wire_protocol.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
     "src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index ea5fa38..10da3d0 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import("../../../gn/perfetto.gni")
+
 source_set("base") {
   sources = [
     "build_config.h",
@@ -39,4 +41,7 @@
   if (is_android) {
     sources += [ "android_task_runner.h" ]
   }
+  if (!build_with_chromium) {
+    sources += [ "unix_socket.h" ]
+  }
 }
diff --git a/include/perfetto/base/sock_utils.h b/include/perfetto/base/sock_utils.h
deleted file mode 100644
index 0008bf3..0000000
--- a/include/perfetto/base/sock_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
-#define INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
-
-#include "perfetto/base/scoped_file.h"
-
-namespace perfetto {
-namespace base {
-
-ssize_t Send(int fd,
-             const void* msg,
-             size_t len,
-             const int* send_fds,
-             size_t num_fds);
-
-ssize_t Receive(int fd,
-                void* msg,
-                size_t len,
-                base::ScopedFile* fd_vec,
-                size_t max_files);
-}  // namespace base
-}  // namespace perfetto
-
-#endif  // INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
diff --git a/src/ipc/unix_socket.h b/include/perfetto/base/unix_socket.h
similarity index 92%
rename from src/ipc/unix_socket.h
rename to include/perfetto/base/unix_socket.h
index 32839d3..7daf5a2 100644
--- a/src/ipc/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -14,8 +14,8 @@
  * limitations under the License.
  */
 
-#ifndef SRC_IPC_UNIX_SOCKET_H_
-#define SRC_IPC_UNIX_SOCKET_H_
+#ifndef INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
+#define INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
 
 #include <stdint.h>
 #include <sys/types.h>
@@ -25,16 +25,34 @@
 
 #include "perfetto/base/logging.h"
 #include "perfetto/base/scoped_file.h"
+#include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
-#include "perfetto/ipc/basic_types.h"
+
+#include <sys/socket.h>
+#include <sys/un.h>
 
 namespace perfetto {
-
 namespace base {
-class TaskRunner;
-}  // namespace base.
 
-namespace ipc {
+class TaskRunner;
+
+ssize_t SockSend(int fd,
+                 const void* msg,
+                 size_t len,
+                 const int* send_fds,
+                 size_t num_fds);
+
+ssize_t SockReceive(int fd,
+                    void* msg,
+                    size_t len,
+                    base::ScopedFile* fd_vec,
+                    size_t max_files);
+
+bool MakeSockAddr(const std::string& socket_name,
+                  sockaddr_un* addr,
+                  socklen_t* addr_size);
+
+base::ScopedFile CreateSocket();
 
 // A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
 // transfer file descriptors. None of the methods in this class are blocking.
@@ -234,7 +252,7 @@
   base::WeakPtrFactory<UnixSocket> weak_ptr_factory_;
 };
 
-}  // namespace ipc
+}  // namespace base
 }  // namespace perfetto
 
-#endif  // SRC_IPC_UNIX_SOCKET_H_
+#endif  // INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
diff --git a/include/perfetto/base/utils.h b/include/perfetto/base/utils.h
index 9b18214..4d2f439 100644
--- a/include/perfetto/base/utils.h
+++ b/include/perfetto/base/utils.h
@@ -22,6 +22,9 @@
 #include <errno.h>
 #include <stddef.h>
 #include <stdlib.h>
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+#include <sys/types.h>
+#endif
 
 #define PERFETTO_EINTR(x)                                   \
   ({                                                        \
@@ -61,6 +64,11 @@
 namespace perfetto {
 namespace base {
 
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+constexpr uid_t kInvalidUid = static_cast<uid_t>(-1);
+constexpr pid_t kInvalidPid = static_cast<pid_t>(-1);
+#endif
+
 constexpr size_t kPageSize = 4096;
 constexpr size_t kMaxCpus = 128;
 
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 68939a9..019cd7b 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -88,13 +88,13 @@
 
 if (!build_with_chromium) {
   # This cannot be in :base as it does not build on WASM.
-  source_set("sock_utils") {
+  source_set("unix_socket") {
     deps = [
       "../../gn:default_deps",
       "../../include/perfetto/base",
     ]
     sources = [
-      "sock_utils.cc",
+      "unix_socket.cc",
     ]
   }
 }
@@ -153,7 +153,10 @@
       "utils_unittest.cc",
     ]
   }
-  if (!build_with_chromium && (is_linux || is_android)) {
-    sources += [ "watchdog_unittest.cc" ]
+  if (!build_with_chromium) {
+    sources += [ "unix_socket_unittest.cc" ]
+    if (is_linux || is_android) {
+      sources += [ "watchdog_unittest.cc" ]
+    }
   }
 }
diff --git a/src/base/sock_utils.cc b/src/base/sock_utils.cc
deleted file mode 100644
index d643d8d..0000000
--- a/src/base/sock_utils.cc
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "perfetto/base/sock_utils.h"
-
-#include <sys/socket.h>
-#include <sys/un.h>
-
-namespace perfetto {
-namespace base {
-namespace {
-// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
-// created with SO_NOSIGPIPE (See InitializeSocket()).
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-constexpr int kNoSigPipe = 0;
-#else
-constexpr int kNoSigPipe = MSG_NOSIGNAL;
-#endif
-
-// Android takes an int instead of socklen_t for the control buffer size.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-using CBufLenType = size_t;
-#else
-using CBufLenType = socklen_t;
-#endif
-}
-
-// The CMSG_* macros use NULL instead of nullptr.
-#pragma GCC diagnostic push
-#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
-#endif
-
-ssize_t Send(int fd,
-             const void* msg,
-             size_t len,
-             const int* send_fds,
-             size_t num_fds) {
-  msghdr msg_hdr = {};
-  iovec iov = {const_cast<void*>(msg), len};
-  msg_hdr.msg_iov = &iov;
-  msg_hdr.msg_iovlen = 1;
-  alignas(cmsghdr) char control_buf[256];
-
-  if (num_fds > 0) {
-    const CBufLenType control_buf_len =
-        static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
-    PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
-    memset(control_buf, 0, sizeof(control_buf));
-    msg_hdr.msg_control = control_buf;
-    msg_hdr.msg_controllen = control_buf_len;
-    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
-    memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
-    msg_hdr.msg_controllen = cmsg->cmsg_len;
-  }
-
-  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
-}
-
-ssize_t Receive(int fd,
-                void* msg,
-                size_t len,
-                base::ScopedFile* fd_vec,
-                size_t max_files) {
-  msghdr msg_hdr = {};
-  iovec iov = {msg, len};
-  msg_hdr.msg_iov = &iov;
-  msg_hdr.msg_iovlen = 1;
-  alignas(cmsghdr) char control_buf[256];
-
-  if (max_files > 0) {
-    msg_hdr.msg_control = control_buf;
-    msg_hdr.msg_controllen =
-        static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
-    PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
-  }
-  const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
-  if (sz <= 0) {
-    return sz;
-  }
-  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
-
-  int* fds = nullptr;
-  uint32_t fds_len = 0;
-
-  if (max_files > 0) {
-    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
-         cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
-      const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
-      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
-        PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
-        PERFETTO_CHECK(fds == nullptr);
-        fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
-        fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
-      }
-    }
-  }
-
-  if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
-    for (size_t i = 0; fds && i < fds_len; ++i)
-      close(fds[i]);
-    errno = EMSGSIZE;
-    return -1;
-  }
-
-  for (size_t i = 0; fds && i < fds_len; ++i) {
-    if (i < max_files)
-      fd_vec[i].reset(fds[i]);
-    else
-      close(fds[i]);
-  }
-
-  return sz;
-}
-
-#pragma GCC diagnostic pop
-
-}  // namespace base
-}  // namespace perfetto
diff --git a/src/ipc/unix_socket.cc b/src/base/unix_socket.cc
similarity index 72%
rename from src/ipc/unix_socket.cc
rename to src/base/unix_socket.cc
index 4acc59f..e3ed5f2 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
 
 #include <errno.h>
 #include <fcntl.h>
@@ -31,7 +31,6 @@
 
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
-#include "perfetto/base/sock_utils.h"
 #include "perfetto/base/task_runner.h"
 #include "perfetto/base/utils.h"
 
@@ -40,11 +39,117 @@
 #endif
 
 namespace perfetto {
-namespace ipc {
-
-// TODO(primiano): Add ThreadChecker to methods of this class.
+namespace base {
 
 namespace {
+// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+// created with SO_NOSIGPIPE (See InitializeSocket()).
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+constexpr int kNoSigPipe = 0;
+#else
+constexpr int kNoSigPipe = MSG_NOSIGNAL;
+#endif
+
+// Android takes a size_t instead of socklen_t for the control buffer size.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+using CBufLenType = size_t;
+#else
+using CBufLenType = socklen_t;
+#endif
+}
+
+// The CMSG_* macros use NULL instead of nullptr.
+#pragma GCC diagnostic push
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
+#endif
+
+ssize_t SockSend(int fd,
+                 const void* msg,
+                 size_t len,
+                 const int* send_fds,
+                 size_t num_fds) {
+  msghdr msg_hdr = {};
+  iovec iov = {const_cast<void*>(msg), len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (num_fds > 0) {
+    const CBufLenType control_buf_len =
+        static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
+    PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
+    memset(control_buf, 0, sizeof(control_buf));
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen = control_buf_len;
+    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
+    memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
+    msg_hdr.msg_controllen = cmsg->cmsg_len;
+  }
+
+  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+}
+
+ssize_t SockReceive(int fd,
+                    void* msg,
+                    size_t len,
+                    ScopedFile* fd_vec,
+                    size_t max_files) {
+  msghdr msg_hdr = {};
+  iovec iov = {msg, len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (max_files > 0) {
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen =
+        static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
+    PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
+  }
+  const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
+  if (sz <= 0) {
+    return sz;
+  }
+  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+
+  int* fds = nullptr;
+  uint32_t fds_len = 0;
+
+  if (max_files > 0) {
+    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
+         cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
+      const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
+      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+        PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
+        PERFETTO_CHECK(fds == nullptr);
+        fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+        fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
+      }
+    }
+  }
+
+  if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
+    for (size_t i = 0; fds && i < fds_len; ++i)
+      close(fds[i]);
+    errno = EMSGSIZE;
+    return -1;
+  }
+
+  for (size_t i = 0; fds && i < fds_len; ++i) {
+    if (i < max_files)
+      fd_vec[i].reset(fds[i]);
+    else
+      close(fds[i]);
+  }
+
+  return sz;
+}
+
+#pragma GCC diagnostic pop
 
 bool MakeSockAddr(const std::string& socket_name,
                   sockaddr_un* addr,
@@ -64,27 +169,27 @@
   return true;
 }
 
-base::ScopedFile CreateSocket() {
-  return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
+ScopedFile CreateSocket() {
+  return ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
 }
 
-}  // namespace
+// TODO(primiano): Add ThreadChecker to methods of this class.
 
 // static
-base::ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
-  base::ScopedFile fd = CreateSocket();
+ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
+  ScopedFile fd = CreateSocket();
   if (!fd)
     return fd;
 
   sockaddr_un addr;
   socklen_t addr_size;
   if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
-    return base::ScopedFile();
+    return ScopedFile();
   }
 
   if (bind(*fd, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
     PERFETTO_DPLOG("bind()");
-    return base::ScopedFile();
+    return ScopedFile();
   }
 
   return fd;
@@ -93,15 +198,15 @@
 // static
 std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
                                                EventListener* event_listener,
-                                               base::TaskRunner* task_runner) {
+                                               TaskRunner* task_runner) {
   // Forward the call to the Listen() overload below.
   return Listen(CreateAndBind(socket_name), event_listener, task_runner);
 }
 
 // static
-std::unique_ptr<UnixSocket> UnixSocket::Listen(base::ScopedFile socket_fd,
+std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile socket_fd,
                                                EventListener* event_listener,
-                                               base::TaskRunner* task_runner) {
+                                               TaskRunner* task_runner) {
   std::unique_ptr<UnixSocket> sock(new UnixSocket(
       event_listener, task_runner, std::move(socket_fd), State::kListening));
   return sock;
@@ -110,22 +215,21 @@
 // static
 std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
                                                 EventListener* event_listener,
-                                                base::TaskRunner* task_runner) {
+                                                TaskRunner* task_runner) {
   std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
   sock->DoConnect(socket_name);
   return sock;
 }
 
-UnixSocket::UnixSocket(EventListener* event_listener,
-                       base::TaskRunner* task_runner)
+UnixSocket::UnixSocket(EventListener* event_listener, TaskRunner* task_runner)
     : UnixSocket(event_listener,
                  task_runner,
-                 base::ScopedFile(),
+                 ScopedFile(),
                  State::kDisconnected) {}
 
 UnixSocket::UnixSocket(EventListener* event_listener,
-                       base::TaskRunner* task_runner,
-                       base::ScopedFile adopt_fd,
+                       TaskRunner* task_runner,
+                       ScopedFile adopt_fd,
                        State adopt_state)
     : event_listener_(event_listener),
       task_runner_(task_runner),
@@ -179,7 +283,7 @@
 
   SetBlockingIO(false);
 
-  base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
   task_runner_->AddFileDescriptorWatch(*fd_, [weak_ptr]() {
     if (weak_ptr)
       weak_ptr->OnEvent();
@@ -223,7 +327,7 @@
   // just trigger an OnEvent without waiting for the FD watch. That will poll
   // the SO_ERROR and evolve the state into either kConnected or kDisconnected.
   if (res == 0) {
-    base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+    WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
     task_runner_->PostTask([weak_ptr]() {
       if (weak_ptr)
         weak_ptr->OnEvent();
@@ -281,7 +385,7 @@
     for (;;) {
       sockaddr_un cli_addr = {};
       socklen_t size = sizeof(cli_addr);
-      base::ScopedFile new_fd(PERFETTO_EINTR(
+      ScopedFile new_fd(PERFETTO_EINTR(
           accept(*fd_, reinterpret_cast<sockaddr*>(&cli_addr), &size)));
       if (!new_fd)
         return;
@@ -317,7 +421,7 @@
 
   if (blocking_mode == BlockingMode::kBlocking)
     SetBlockingIO(true);
-  const ssize_t sz = base::Send(*fd_, msg, len, send_fds, num_fds);
+  const ssize_t sz = SockSend(*fd_, msg, len, send_fds, num_fds);
   if (blocking_mode == BlockingMode::kBlocking)
     SetBlockingIO(false);
 
@@ -347,7 +451,7 @@
 }
 
 void UnixSocket::Shutdown(bool notify) {
-  base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
   if (notify) {
     if (state_ == State::kConnected) {
       task_runner_->PostTask([weak_ptr]() {
@@ -376,14 +480,14 @@
 
 size_t UnixSocket::Receive(void* msg,
                            size_t len,
-                           base::ScopedFile* fd_vec,
+                           ScopedFile* fd_vec,
                            size_t max_files) {
   if (state_ != State::kConnected) {
     last_error_ = ENOTCONN;
     return 0;
   }
 
-  const ssize_t sz = base::Receive(*fd_, msg, len, fd_vec, max_files);
+  const ssize_t sz = SockReceive(*fd_, msg, len, fd_vec, max_files);
   if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     last_error_ = EAGAIN;
     return 0;
@@ -409,7 +513,7 @@
   if (!success)
     Shutdown(false);
 
-  base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
   task_runner_->PostTask([weak_ptr, success]() {
     if (weak_ptr)
       weak_ptr->event_listener_->OnConnect(weak_ptr.get(), success);
@@ -435,5 +539,5 @@
 void UnixSocket::EventListener::OnDisconnect(UnixSocket*) {}
 void UnixSocket::EventListener::OnDataAvailable(UnixSocket*) {}
 
-}  // namespace ipc
+}  // namespace base
 }  // namespace perfetto
diff --git a/src/ipc/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
similarity index 97%
rename from src/ipc/unix_socket_unittest.cc
rename to src/base/unix_socket_unittest.cc
index 024072f..5cdd6ab 100644
--- a/src/ipc/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
 
 #include <sys/mman.h>
 
@@ -31,7 +31,7 @@
 #include "src/ipc/test/test_socket.h"
 
 namespace perfetto {
-namespace ipc {
+namespace base {
 namespace {
 
 using ::testing::_;
@@ -74,7 +74,7 @@
   void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
   void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
 
-  base::TestTaskRunner task_runner_;
+  TestTaskRunner task_runner_;
   MockEventListener event_listener_;
 };
 
@@ -199,15 +199,15 @@
   ASSERT_TRUE(srv_conn);
   ASSERT_TRUE(cli->is_connected());
 
-  base::ScopedFile null_fd(open("/dev/null", O_RDONLY));
-  base::ScopedFile zero_fd(open("/dev/zero", O_RDONLY));
+  ScopedFile null_fd(open("/dev/null", O_RDONLY));
+  ScopedFile zero_fd(open("/dev/zero", O_RDONLY));
 
   auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
   EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
       .WillRepeatedly(Invoke([cli_did_recv](UnixSocket* s) {
-        base::ScopedFile fd_buf[3];
+        ScopedFile fd_buf[3];
         char buf[sizeof(cli_str)];
-        if (!s->Receive(buf, sizeof(buf), fd_buf, base::ArraySize(fd_buf)))
+        if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
           return;
         ASSERT_STREQ(srv_str, buf);
         ASSERT_NE(*fd_buf[0], -1);
@@ -225,9 +225,9 @@
   auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
   EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
       .WillRepeatedly(Invoke([srv_did_recv](UnixSocket* s) {
-        base::ScopedFile fd_buf[3];
+        ScopedFile fd_buf[3];
         char buf[sizeof(srv_str)];
-        if (!s->Receive(buf, sizeof(buf), fd_buf, base::ArraySize(fd_buf)))
+        if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
           return;
         ASSERT_STREQ(cli_str, buf);
         ASSERT_NE(*fd_buf[0], -1);
@@ -339,7 +339,7 @@
 
   if (pid == 0) {
     // Child process.
-    base::TempFile scoped_tmp = base::TempFile::CreateUnlinked();
+    TempFile scoped_tmp = TempFile::CreateUnlinked();
     int tmp_fd = scoped_tmp.fd();
     ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
     char* mem = reinterpret_cast<char*>(
@@ -379,7 +379,7 @@
     EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
         .WillOnce(Invoke([checkpoint](UnixSocket* s) {
           char msg[32];
-          base::ScopedFile fd;
+          ScopedFile fd;
           ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
           ASSERT_STREQ("txfd", msg);
           ASSERT_TRUE(fd);
@@ -405,7 +405,7 @@
 
 constexpr size_t kAtomicWrites_FrameSize = 1123;
 bool AtomicWrites_SendAttempt(UnixSocket* s,
-                              base::TaskRunner* task_runner,
+                              TaskRunner* task_runner,
                               int num_frame) {
   char buf[kAtomicWrites_FrameSize];
   memset(buf, static_cast<char>(num_frame), sizeof(buf));
@@ -547,7 +547,7 @@
 
   // Perform the blocking send form another thread.
   std::thread tx_thread([] {
-    base::TestTaskRunner tx_task_runner;
+    TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
     auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
 
@@ -594,7 +594,7 @@
 
   // Perform the blocking send form another thread.
   std::thread tx_thread([] {
-    base::TestTaskRunner tx_task_runner;
+    TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
     auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
 
@@ -634,5 +634,5 @@
 // verify that no spurious EventListener callback is received.
 
 }  // namespace
-}  // namespace ipc
+}  // namespace base
 }  // namespace perfetto
diff --git a/src/ipc/BUILD.gn b/src/ipc/BUILD.gn
index 4dffcf7..8a7e4e1 100644
--- a/src/ipc/BUILD.gn
+++ b/src/ipc/BUILD.gn
@@ -30,12 +30,12 @@
   public_configs = [ "../../gn:default_config" ]
   public_deps = [
     "../../include/perfetto/ipc",
+    "../base:unix_socket",
   ]
   deps = [
     ":wire_protocol",
     "../../gn:default_deps",
     "../base",
-    "../base:sock_utils",
   ]
   sources = [
     "buffered_frame_deserializer.cc",
@@ -44,8 +44,6 @@
     "host_impl.cc",
     "host_impl.h",
     "service_proxy.cc",
-    "unix_socket.cc",
-    "unix_socket.h",
     "virtual_destructors.cc",
   ]
 }
@@ -78,7 +76,6 @@
     "deferred_unittest.cc",
     "host_impl_unittest.cc",
     "test/ipc_integrationtest.cc",
-    "unix_socket_unittest.cc",
   ]
 }
 
diff --git a/src/ipc/client_impl.cc b/src/ipc/client_impl.cc
index cc58bf5..d648df5 100644
--- a/src/ipc/client_impl.cc
+++ b/src/ipc/client_impl.cc
@@ -44,13 +44,14 @@
 ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
     : task_runner_(task_runner), weak_ptr_factory_(this) {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
-  sock_ = UnixSocket::Connect(socket_name, this, task_runner);
+  sock_ = base::UnixSocket::Connect(socket_name, this, task_runner);
 }
 
 ClientImpl::~ClientImpl() {
   // Ensure we are not destroyed in the middle of invoking a reply.
   PERFETTO_DCHECK(!invoking_method_reply_);
-  OnDisconnect(nullptr);  // The UnixSocket* ptr is not used in OnDisconnect().
+  OnDisconnect(
+      nullptr);  // The base::UnixSocket* ptr is not used in OnDisconnect().
 }
 
 void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
@@ -122,12 +123,12 @@
   // the send and PostTask the reply later? Right now we are making Send()
   // blocking as a workaround. Propagate bakpressure to the caller instead.
   bool res = sock_->Send(buf.data(), buf.size(), fd,
-                         UnixSocket::BlockingMode::kBlocking);
+                         base::UnixSocket::BlockingMode::kBlocking);
   PERFETTO_CHECK(res || !sock_->is_connected());
   return res;
 }
 
-void ClientImpl::OnConnect(UnixSocket*, bool connected) {
+void ClientImpl::OnConnect(base::UnixSocket*, bool connected) {
   // Drain the BindService() calls that were queued before establishig the
   // connection with the host.
   for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings_) {
@@ -140,7 +141,7 @@
   queued_bindings_.clear();
 }
 
-void ClientImpl::OnDisconnect(UnixSocket*) {
+void ClientImpl::OnDisconnect(base::UnixSocket*) {
   for (auto it : service_bindings_) {
     base::WeakPtr<ServiceProxy>& service_proxy = it.second;
     task_runner_->PostTask([service_proxy] {
@@ -152,7 +153,7 @@
   queued_bindings_.clear();
 }
 
-void ClientImpl::OnDataAvailable(UnixSocket*) {
+void ClientImpl::OnDataAvailable(base::UnixSocket*) {
   size_t rsize;
   do {
     auto buf = frame_deserializer_.BeginReceive();
diff --git a/src/ipc/client_impl.h b/src/ipc/client_impl.h
index 2402d17..d53c05b 100644
--- a/src/ipc/client_impl.h
+++ b/src/ipc/client_impl.h
@@ -19,9 +19,9 @@
 
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/ipc/client.h"
 #include "src/ipc/buffered_frame_deserializer.h"
-#include "src/ipc/unix_socket.h"
 
 #include "src/ipc/wire_protocol.pb.h"
 
@@ -39,7 +39,7 @@
 
 class ServiceDescriptor;
 
-class ClientImpl : public Client, public UnixSocket::EventListener {
+class ClientImpl : public Client, public base::UnixSocket::EventListener {
  public:
   ClientImpl(const char* socket_name, base::TaskRunner*);
   ~ClientImpl() override;
@@ -49,10 +49,10 @@
   void UnbindService(ServiceID) override;
   base::ScopedFile TakeReceivedFD() override;
 
-  // UnixSocket::EventListener implementation.
-  void OnConnect(UnixSocket*, bool connected) override;
-  void OnDisconnect(UnixSocket*) override;
-  void OnDataAvailable(UnixSocket*) override;
+  // base::UnixSocket::EventListener implementation.
+  void OnConnect(base::UnixSocket*, bool connected) override;
+  void OnDisconnect(base::UnixSocket*) override;
+  void OnDataAvailable(base::UnixSocket*) override;
 
   RequestID BeginInvoke(ServiceID,
                         const std::string& method_name,
@@ -82,7 +82,7 @@
   void OnInvokeMethodReply(QueuedRequest, const Frame::InvokeMethodReply&);
 
   bool invoking_method_reply_ = false;
-  std::unique_ptr<UnixSocket> sock_;
+  std::unique_ptr<base::UnixSocket> sock_;
   base::TaskRunner* const task_runner_;
   RequestID last_request_id_ = 0;
   BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 9922cb8..445e3d0 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -24,13 +24,13 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "perfetto/base/temp_file.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/ipc/service_descriptor.h"
 #include "perfetto/ipc/service_proxy.h"
 #include "src/base/test/test_task_runner.h"
 #include "src/ipc/buffered_frame_deserializer.h"
 #include "src/ipc/test/test_socket.h"
-#include "src/ipc/unix_socket.h"
 
 #include "src/ipc/test/client_unittest_messages.pb.h"
 
@@ -78,7 +78,7 @@
 
 // A fake host implementation. Listens on |kSockName| and replies to IPC
 // metohds like a real one.
-class FakeHost : public UnixSocket::EventListener {
+class FakeHost : public base::UnixSocket::EventListener {
  public:
   struct FakeMethod {
     MethodID id;
@@ -103,7 +103,7 @@
 
   explicit FakeHost(base::TaskRunner* task_runner) {
     DESTROY_TEST_SOCK(kSockName);
-    listening_sock = UnixSocket::Listen(kSockName, this, task_runner);
+    listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner);
     EXPECT_TRUE(listening_sock->is_listening());
   }
   ~FakeHost() override { DESTROY_TEST_SOCK(kSockName); }
@@ -117,15 +117,15 @@
     return svc;
   }
 
-  // UnixSocket::EventListener implementation.
+  // base::UnixSocket::EventListener implementation.
   void OnNewIncomingConnection(
-      UnixSocket*,
-      std::unique_ptr<UnixSocket> new_connection) override {
+      base::UnixSocket*,
+      std::unique_ptr<base::UnixSocket> new_connection) override {
     ASSERT_FALSE(client_sock);
     client_sock = std::move(new_connection);
   }
 
-  void OnDataAvailable(UnixSocket* sock) override {
+  void OnDataAvailable(base::UnixSocket* sock) override {
     if (sock != client_sock.get())
       return;
     auto buf = frame_deserializer.BeginReceive();
@@ -187,8 +187,8 @@
   }
 
   BufferedFrameDeserializer frame_deserializer;
-  std::unique_ptr<UnixSocket> listening_sock;
-  std::unique_ptr<UnixSocket> client_sock;
+  std::unique_ptr<base::UnixSocket> listening_sock;
+  std::unique_ptr<base::UnixSocket> client_sock;
   std::map<std::string, std::unique_ptr<FakeService>> services;
   ServiceID last_service_id = 0;
   int next_reply_fd = -1;
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 2e9fe79..02d0a68 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -56,14 +56,14 @@
     : task_runner_(task_runner), weak_ptr_factory_(this) {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  sock_ = UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
+  sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
 }
 
 HostImpl::HostImpl(const char* socket_name, base::TaskRunner* task_runner)
     : task_runner_(task_runner), weak_ptr_factory_(this) {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  sock_ = UnixSocket::Listen(socket_name, this, task_runner_);
+  sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_);
 }
 
 HostImpl::~HostImpl() = default;
@@ -81,8 +81,9 @@
   return true;
 }
 
-void HostImpl::OnNewIncomingConnection(UnixSocket*,
-                                       std::unique_ptr<UnixSocket> new_conn) {
+void HostImpl::OnNewIncomingConnection(
+    base::UnixSocket*,
+    std::unique_ptr<base::UnixSocket> new_conn) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   std::unique_ptr<ClientConnection> client(new ClientConnection());
   ClientID client_id = ++last_client_id_;
@@ -92,7 +93,7 @@
   clients_[client_id] = std::move(client);
 }
 
-void HostImpl::OnDataAvailable(UnixSocket* sock) {
+void HostImpl::OnDataAvailable(base::UnixSocket* sock) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   auto it = clients_by_socket_.find(sock);
   if (it == clients_by_socket_.end())
@@ -237,11 +238,11 @@
   // the send and PostTask the reply later? Right now we are making Send()
   // blocking as a workaround. Propagate bakpressure to the caller instead.
   bool res = client->sock->Send(buf.data(), buf.size(), fd,
-                                UnixSocket::BlockingMode::kBlocking);
+                                base::UnixSocket::BlockingMode::kBlocking);
   PERFETTO_CHECK(res || !client->sock->is_connected());
 }
 
-void HostImpl::OnDisconnect(UnixSocket* sock) {
+void HostImpl::OnDisconnect(base::UnixSocket* sock) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   auto it = clients_by_socket_.find(sock);
   if (it == clients_by_socket_.end())
diff --git a/src/ipc/host_impl.h b/src/ipc/host_impl.h
index bef1d9e..5a062d4 100644
--- a/src/ipc/host_impl.h
+++ b/src/ipc/host_impl.h
@@ -24,17 +24,17 @@
 
 #include "perfetto/base/task_runner.h"
 #include "perfetto/base/thread_checker.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/ipc/deferred.h"
 #include "perfetto/ipc/host.h"
 #include "src/ipc/buffered_frame_deserializer.h"
-#include "src/ipc/unix_socket.h"
 
 namespace perfetto {
 namespace ipc {
 
 class Frame;
 
-class HostImpl : public Host, public UnixSocket::EventListener {
+class HostImpl : public Host, public base::UnixSocket::EventListener {
  public:
   HostImpl(const char* socket_name, base::TaskRunner*);
   HostImpl(base::ScopedFile socket_fd, base::TaskRunner*);
@@ -43,20 +43,20 @@
   // Host implementation.
   bool ExposeService(std::unique_ptr<Service>) override;
 
-  // UnixSocket::EventListener implementation.
-  void OnNewIncomingConnection(UnixSocket*,
-                               std::unique_ptr<UnixSocket>) override;
-  void OnDisconnect(UnixSocket*) override;
-  void OnDataAvailable(UnixSocket*) override;
+  // base::UnixSocket::EventListener implementation.
+  void OnNewIncomingConnection(base::UnixSocket*,
+                               std::unique_ptr<base::UnixSocket>) override;
+  void OnDisconnect(base::UnixSocket*) override;
+  void OnDataAvailable(base::UnixSocket*) override;
 
-  const UnixSocket* sock() const { return sock_.get(); }
+  const base::UnixSocket* sock() const { return sock_.get(); }
 
  private:
   // Owns the per-client receive buffer (BufferedFrameDeserializer).
   struct ClientConnection {
     ~ClientConnection();
     ClientID id;
-    std::unique_ptr<UnixSocket> sock;
+    std::unique_ptr<base::UnixSocket> sock;
     BufferedFrameDeserializer frame_deserializer;
     base::ScopedFile received_fd;
   };
@@ -85,9 +85,9 @@
 
   base::TaskRunner* const task_runner_;
   std::map<ServiceID, ExposedService> services_;
-  std::unique_ptr<UnixSocket> sock_;  // The listening socket.
+  std::unique_ptr<base::UnixSocket> sock_;  // The listening socket.
   std::map<ClientID, std::unique_ptr<ClientConnection>> clients_;
-  std::map<UnixSocket*, ClientConnection*> clients_by_socket_;
+  std::map<base::UnixSocket*, ClientConnection*> clients_by_socket_;
   ServiceID last_service_id_ = 0;
   ClientID last_client_id_ = 0;
   base::WeakPtrFactory<HostImpl> weak_ptr_factory_;
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index f48775d..3ff1f99 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -22,13 +22,13 @@
 #include "gtest/gtest.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/temp_file.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/ipc/service.h"
 #include "perfetto/ipc/service_descriptor.h"
 #include "src/base/test/test_task_runner.h"
 #include "src/ipc/buffered_frame_deserializer.h"
 #include "src/ipc/test/test_socket.h"
-#include "src/ipc/unix_socket.h"
 
 #include "src/ipc/test/client_unittest_messages.pb.h"
 #include "src/ipc/wire_protocol.pb.h"
@@ -78,7 +78,7 @@
   ServiceDescriptor descriptor_;
 };
 
-class FakeClient : public UnixSocket::EventListener {
+class FakeClient : public base::UnixSocket::EventListener {
  public:
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
@@ -88,7 +88,7 @@
   MOCK_METHOD0(OnRequestError, void());
 
   explicit FakeClient(base::TaskRunner* task_runner) {
-    sock_ = UnixSocket::Connect(kSockName, this, task_runner);
+    sock_ = base::UnixSocket::Connect(kSockName, this, task_runner);
   }
 
   ~FakeClient() override = default;
@@ -118,15 +118,15 @@
     SendFrame(frame, fd);
   }
 
-  // UnixSocket::EventListener implementation.
-  void OnConnect(UnixSocket*, bool success) override {
+  // base::UnixSocket::EventListener implementation.
+  void OnConnect(base::UnixSocket*, bool success) override {
     ASSERT_TRUE(success);
     OnConnect();
   }
 
-  void OnDisconnect(UnixSocket*) override { OnDisconnect(); }
+  void OnDisconnect(base::UnixSocket*) override { OnDisconnect(); }
 
-  void OnDataAvailable(UnixSocket* sock) override {
+  void OnDataAvailable(base::UnixSocket* sock) override {
     ASSERT_EQ(sock_.get(), sock);
     auto buf = frame_deserializer_.BeginReceive();
     base::ScopedFile fd;
@@ -156,7 +156,7 @@
   }
 
   BufferedFrameDeserializer frame_deserializer_;
-  std::unique_ptr<UnixSocket> sock_;
+  std::unique_ptr<base::UnixSocket> sock_;
   std::map<uint64_t /* request_id */, int /* num_replies_received */> requests_;
   ServiceID last_bound_service_id_;
 };
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 0ccc573..78e57c9 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -15,83 +15,46 @@
 import("../../../gn/perfetto.gni")
 import("//build_overrides/build.gni")
 
-source_set("record_reader") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
-    "record_reader.cc",
-    "record_reader.h",
-  ]
-}
-
-source_set("unwinding") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+source_set("wire_protocol") {
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
   ]
   sources = [
-    "unwinding.cc",
-    "unwinding.h",
+    "wire_protocol.cc",
+    "wire_protocol.h",
   ]
 }
 
-source_set("socket_listener") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+source_set("daemon") {
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
-    ":record_reader",
-    ":unwinding",
+    ":wire_protocol",
+    "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
     "../../ipc",
   ]
   sources = [
-    "socket_listener.cc",
-    "socket_listener.h",
-  ]
-}
-
-source_set("string_interner") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
-    "string_interner.cc",
-    "string_interner.h",
-  ]
-}
-
-source_set("bookkeeping") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    ":string_interner",
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
     "bookkeeping.cc",
     "bookkeeping.h",
+    "record_reader.cc",
+    "record_reader.h",
+    "socket_listener.cc",
+    "socket_listener.h",
+    "string_interner.cc",
+    "string_interner.h",
+    "unwinding.cc",
+    "unwinding.h",
   ]
 }
 
 source_set("client") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
+    ":wire_protocol",
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
@@ -103,25 +66,22 @@
 }
 
 source_set("unittests") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   testonly = true
   deps = [
-    ":bookkeeping",
     ":client",
-    ":record_reader",
-    ":socket_listener",
-    ":string_interner",
-    ":unwinding",
+    ":daemon",
+    ":wire_protocol",
+    "../../../gn:default_deps",
     "../../../gn:gtest_deps",
     "../../base",
     "../../base:test_support",
   ]
   sources = [
     "bookkeeping_unittest.cc",
+    "bounded_queue_unittest.cc",
     "client_unittest.cc",
+    "heapprofd_integrationtest.cc",
     "record_reader_unittest.cc",
     "socket_listener_unittest.cc",
     "string_interner_unittest.cc",
@@ -130,10 +90,8 @@
 }
 
 executable("heapprofd") {
-  public_configs = [ "../../../gn:default_config" ]
   deps = [
-    ":socket_listener",
-    ":unwinding",
+    ":daemon",
     "../../../gn:default_deps",
     "../../base",
     "../../ipc",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 1624cee..7a31502 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -16,37 +16,100 @@
 
 #include "src/profiling/memory/bookkeeping.h"
 
+#include "perfetto/base/logging.h"
+
 namespace perfetto {
 
-MemoryBookkeeping::Node* MemoryBookkeeping::Node::GetOrCreateChild(
-    const MemoryBookkeeping::InternedCodeLocation& loc) {
+GlobalCallstackTrie::Node* GlobalCallstackTrie::Node::GetOrCreateChild(
+    const InternedCodeLocation& loc) {
   Node* child = children_.Get(loc);
   if (!child)
     child = children_.Emplace(loc, this);
   return child;
 }
 
-void MemoryBookkeeping::RecordMalloc(const std::vector<CodeLocation>& locs,
-                                     uint64_t address,
-                                     uint64_t size) {
-  Node* node = &root_;
-  node->cum_size_ += size;
-  for (const MemoryBookkeeping::CodeLocation& loc : locs) {
-    node = node->GetOrCreateChild(InternCodeLocation(loc));
-    node->cum_size_ += size;
+void HeapTracker::RecordMalloc(const std::vector<CodeLocation>& callstack,
+                               uint64_t address,
+                               uint64_t size,
+                               uint64_t sequence_number) {
+  auto it = allocations_.find(address);
+  if (it != allocations_.end()) {
+    if (it->second.sequence_number > sequence_number) {
+      // Stale malloc: a newer one owns |address|. Drop it, but still consume
+      // its sequence number so that later operations are not queued forever.
+      if (sequence_number > sequence_number_)
+        RecordFree(kNoopFree, sequence_number);
+      return;
+    }
+    // Replace the previous allocation by pretending a free happened right
+    // after it; CommitFree only compares sequence numbers, so +1 is enough.
+    CommitFree(it->second.sequence_number + 1, address);
   }
 
-  allocations_.emplace(address, std::make_pair(size, node));
+  GlobalCallstackTrie::Node* node =
+      callsites_->IncrementCallsite(callstack, size);
+  allocations_.emplace(address, Allocation(size, sequence_number, node));
+
+  // Keep the sequence tracker consistent.
+  RecordFree(kNoopFree, sequence_number);
 }
 
-void MemoryBookkeeping::RecordFree(uint64_t address) {
+void HeapTracker::RecordFree(uint64_t address, uint64_t sequence_number) {
+  if (sequence_number != sequence_number_ + 1) {
+    pending_frees_.emplace(sequence_number, address);
+    return;
+  }
+
+  if (address != kNoopFree)
+    CommitFree(sequence_number, address);
+  sequence_number_++;
+
+  // At this point some other pending frees might be eligible to be committed.
+  auto it = pending_frees_.begin();
+  while (it != pending_frees_.end() && it->first == sequence_number_ + 1) {
+    if (it->second != kNoopFree)
+      CommitFree(it->first, it->second);
+    sequence_number_++;
+    it = pending_frees_.erase(it);
+  }
+}
+
+void HeapTracker::CommitFree(uint64_t sequence_number, uint64_t address) {
   auto leaf_it = allocations_.find(address);
   if (leaf_it == allocations_.end())
     return;
 
-  std::pair<uint64_t, Node*> value = leaf_it->second;
-  uint64_t size = value.first;
-  Node* node = value.second;
+  const Allocation& value = leaf_it->second;
+  if (value.sequence_number > sequence_number)
+    return;
+  allocations_.erase(leaf_it);
+}
+
+uint64_t GlobalCallstackTrie::GetCumSizeForTesting(
+    const std::vector<CodeLocation>& callstack) {
+  Node* node = &root_;
+  for (const CodeLocation& loc : callstack) {
+    node = node->children_.Get(InternCodeLocation(loc));
+    if (node == nullptr)
+      return 0;
+  }
+  return node->cum_size_;
+}
+
+GlobalCallstackTrie::Node* GlobalCallstackTrie::IncrementCallsite(
+    const std::vector<CodeLocation>& callstack,
+    uint64_t size) {
+  Node* node = &root_;
+  node->cum_size_ += size;
+  for (const CodeLocation& loc : callstack) {
+    node = node->GetOrCreateChild(InternCodeLocation(loc));
+    node->cum_size_ += size;
+  }
+  return node;
+}
+
+void GlobalCallstackTrie::DecrementNode(Node* node, uint64_t size) {
+  PERFETTO_DCHECK(node->cum_size_ >= size);
 
   bool delete_prev = false;
   Node* prev = nullptr;
@@ -58,19 +121,6 @@
     prev = node;
     node = node->parent_;
   }
-
-  allocations_.erase(leaf_it);
-}
-
-uint64_t MemoryBookkeeping::GetCumSizeForTesting(
-    const std::vector<CodeLocation>& locs) {
-  Node* node = &root_;
-  for (const MemoryBookkeeping::CodeLocation& loc : locs) {
-    node = node->children_.Get(InternCodeLocation(loc));
-    if (node == nullptr)
-      return 0;
-  }
-  return node->cum_size_;
 }
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index 7e5f705..bd56dd8 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -26,39 +26,37 @@
 
 namespace perfetto {
 
-class MemoryBookkeeping {
- public:
-  struct CodeLocation {
-    CodeLocation(std::string map_n, std::string function_n)
-        : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
+class HeapTracker;
 
-    std::string map_name;
-    std::string function_name;
-  };
+struct CodeLocation {
+  CodeLocation(std::string map_n, std::string function_n)
+      : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
 
-  void RecordMalloc(const std::vector<CodeLocation>& stack,
-                    uint64_t address,
-                    uint64_t size);
-  void RecordFree(uint64_t address);
-  uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+  std::string map_name;
+  std::string function_name;
+};
 
- private:
-  struct InternedCodeLocation {
-    StringInterner::InternedString map_name;
-    StringInterner::InternedString function_name;
+// Internal data-structure for GlobalCallstackTrie to save memory if the same
+// function is named multiple times.
+struct InternedCodeLocation {
+  StringInterner::InternedString map_name;
+  StringInterner::InternedString function_name;
 
-    bool operator<(const InternedCodeLocation& other) const {
-      if (map_name.id() == other.map_name.id())
-        return function_name.id() < other.function_name.id();
-      return map_name.id() < other.map_name.id();
-    }
-  };
-
-  InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
-    return {interner_.Intern(loc.map_name),
-            interner_.Intern(loc.function_name)};
+  bool operator<(const InternedCodeLocation& other) const {
+    if (map_name.id() == other.map_name.id())
+      return function_name.id() < other.function_name.id();
+    return map_name.id() < other.map_name.id();
   }
+};
 
+// Graph of function callsites. This is shared between heap dumps for
+// different processes. Each call site is represented by a
+// GlobalCallstackTrie::Node that is owned by the parent (i.e. calling)
+// callsite. It has a pointer to its parent, which means the function call-graph
+// can be reconstructed from a GlobalCallstackTrie::Node by walking down the
+// pointers to the parents.
+class GlobalCallstackTrie {
+ public:
   // Node in a tree of function traces that resulted in an allocation. For
   // instance, if alloc_buf is called from foo and bar, which are called from
   // main, the tree looks as following.
@@ -77,25 +75,119 @@
   // alloc_buf to the leafs of this tree.
   class Node {
    public:
+    // This is opaque except to GlobalCallstackTrie.
+    friend class GlobalCallstackTrie;
+
     Node(InternedCodeLocation location) : Node(std::move(location), nullptr) {}
     Node(InternedCodeLocation location, Node* parent)
         : parent_(parent), location_(std::move(location)) {}
 
+   private:
     Node* GetOrCreateChild(const InternedCodeLocation& loc);
 
     uint64_t cum_size_ = 0;
-    Node* parent_;
+    Node* const parent_;
     const InternedCodeLocation location_;
     base::LookupSet<Node, const InternedCodeLocation, &Node::location_>
         children_;
   };
 
-  // Address -> (size, code location)
-  std::map<uint64_t, std::pair<uint64_t, Node*>> allocations_;
+  GlobalCallstackTrie() = default;
+  GlobalCallstackTrie(const GlobalCallstackTrie&) = delete;
+  GlobalCallstackTrie& operator=(const GlobalCallstackTrie&) = delete;
+
+  uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+  Node* IncrementCallsite(const std::vector<CodeLocation>& locs, uint64_t size);
+  static void DecrementNode(Node* node, uint64_t size);
+
+ private:
+  InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
+    return {interner_.Intern(loc.map_name),
+            interner_.Intern(loc.function_name)};
+  }
+
   StringInterner interner_;
   Node root_{{interner_.Intern(""), interner_.Intern("")}};
 };
 
+// Snapshot for memory allocations of a particular process. Shares callsites
+// with other processes.
+class HeapTracker {
+ public:
+  // Caller needs to ensure that callsites outlives the HeapTracker.
+  explicit HeapTracker(GlobalCallstackTrie* callsites)
+      : callsites_(callsites) {}
+
+  void RecordMalloc(const std::vector<CodeLocation>& stack,
+                    uint64_t address,
+                    uint64_t size,
+                    uint64_t sequence_number);
+  void RecordFree(uint64_t address, uint64_t sequence_number);
+
+ private:
+  static constexpr uint64_t kNoopFree = 0;
+  struct Allocation {
+    Allocation(uint64_t size, uint64_t seq, GlobalCallstackTrie::Node* n)
+        : alloc_size(size), sequence_number(seq), node(n) {}
+
+    Allocation() = default;
+    Allocation(const Allocation&) = delete;
+    Allocation(Allocation&& other) noexcept
+        : alloc_size(other.alloc_size),
+          sequence_number(other.sequence_number),
+          node(other.node) {
+      other.node = nullptr;  // The moved-from object must not decrement it.
+    }
+
+    ~Allocation() {
+      if (node)
+        GlobalCallstackTrie::DecrementNode(node, alloc_size);
+    }
+
+    // Default-initialized so that destroying a default-constructed Allocation
+    // never reads an indeterminate |node|.
+    uint64_t alloc_size = 0;
+    uint64_t sequence_number = 0;
+    GlobalCallstackTrie::Node* node = nullptr;
+  };
+
+  // Sequencing logic works as follows:
+  // * mallocs are immediately committed to |allocations_|. They are rejected
+  //   if the current malloc for the address has a higher sequence number.
+  //
+  //   If all operations with sequence numbers lower than the malloc have been
+  //   committed to |allocations_|, sequence_number_ is advanced and all
+  //   unblocked pending operations after the current id are committed to
+  //   |allocations_|. Otherwise, a no-op record is added to the pending
+  //   operations queue to maintain the contiguity of the sequence.
+  //
+  // * for frees:
+  //   if all operations with sequence numbers lower than the free have
+  //     been committed to |allocations_| (i.e. sequence_number_ ==
+  //     sequence_number - 1) the free is committed to |allocations_| and
+  //     sequence_number_ is advanced. All unblocked pending operations are
+  //     committed to |allocations_|.
+  //   otherwise: the free is added to the queue of pending operations.
+
+  // Commits a free operation into |allocations_|.
+  // This must be called after all operations up to sequence_number have been
+  // committed to |allocations_|.
+  void CommitFree(uint64_t sequence_number, uint64_t address);
+
+  // Address -> (size, sequence_number, code location)
+  std::map<uint64_t, Allocation> allocations_;
+
+  // Maps sequence number -> address of a free waiting to be committed, or
+  // kNoopFree for a no-op. No-ops stand in for mallocs, which are committed
+  // to |allocations_| right away, so that the sequence stays contiguous.
+  std::map<uint64_t /* seq_id */, uint64_t /* allocation address */>
+      pending_frees_;
+
+  // The sequence number all mallocs and frees have been handled up to.
+  uint64_t sequence_number_ = 0;
+  GlobalCallstackTrie* const callsites_;
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_PROFILING_MEMORY_BOOKKEEPING_H_
diff --git a/src/profiling/memory/bookkeeping_unittest.cc b/src/profiling/memory/bookkeeping_unittest.cc
index 23da7cf..1e7b369 100644
--- a/src/profiling/memory/bookkeeping_unittest.cc
+++ b/src/profiling/memory/bookkeeping_unittest.cc
@@ -22,22 +22,65 @@
 namespace perfetto {
 namespace {
 
-TEST(BookkeepingTest, Basic) {
-  std::vector<MemoryBookkeeping::CodeLocation> stack{
+std::vector<CodeLocation> stack() {
+  return {
       {"map1", "fun1"}, {"map2", "fun2"},
   };
+}
 
-  std::vector<MemoryBookkeeping::CodeLocation> stack2{
+std::vector<CodeLocation> stack2() {
+  return {
       {"map1", "fun1"}, {"map3", "fun3"},
   };
-  MemoryBookkeeping mb;
-  mb.RecordMalloc(stack, 1, 5);
-  mb.RecordMalloc(stack2, 2, 2);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
-  mb.RecordFree(2);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
-  mb.RecordFree(1);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, Basic) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+  hd.RecordMalloc(stack2(), 2, 2, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+  hd.RecordFree(2, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+  hd.RecordFree(1, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, TwoHeapTrackers) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+  {
+    HeapTracker hd2(&c);
+
+    hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+    hd2.RecordMalloc(stack2(), 2, 2, sequence_number++);
+    ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+  }
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+}
+
+TEST(BookkeepingTest, ReplaceAlloc) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+  hd.RecordMalloc(stack2(), 1, 2, sequence_number++);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack()), 0);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 2);
+}
+
+TEST(BookkeepingTest, OutOfOrder) {
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, 1);
+  hd.RecordMalloc(stack2(), 1, 2, 0);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack()), 5);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 0);
 }
 
 }  // namespace
diff --git a/src/profiling/memory/bounded_queue.h b/src/profiling/memory/bounded_queue.h
new file mode 100644
index 0000000..521ec5d
--- /dev/null
+++ b/src/profiling/memory/bounded_queue.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+#define SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "perfetto/base/logging.h"
+
+// Transport messages between threads. Multiple-producer / single-consumer.
+//
+// This has to outlive both the consumer and the producer who have to
+// negotiate termination separately, if needed. This is currently only used
+// in a scenario where the producer and consumer both are loops that never
+// terminate.
+template <typename T>
+class BoundedQueue {
+ public:
+  BoundedQueue() : BoundedQueue(1) {}
+  BoundedQueue(size_t capacity) : capacity_(capacity) {
+    PERFETTO_CHECK(capacity > 0);
+  }
+
+  // Blocks while the queue is at (or, after a shrink, above) its capacity.
+  void Add(T item) {
+    std::unique_lock<std::mutex> l(mutex_);
+    full_cv_.wait(l, [this] { return deque_.size() < capacity_; });
+    deque_.emplace_back(std::move(item));
+    if (deque_.size() == 1)
+      empty_cv_.notify_all();
+  }
+
+  // Blocks until an item is available, then returns the oldest one.
+  T Get() {
+    std::unique_lock<std::mutex> l(mutex_);
+    empty_cv_.wait(l, [this] { return !deque_.empty(); });
+    T item(std::move(deque_.front()));
+    deque_.pop_front();
+    if (deque_.size() == capacity_ - 1) {
+      l.unlock();
+      full_cv_.notify_all();
+    }
+    return item;
+  }
+
+  // Changes the capacity; producers blocked in Add() re-check against it.
+  void SetCapacity(size_t capacity) {
+    PERFETTO_CHECK(capacity > 0);
+    {
+      std::lock_guard<std::mutex> l(mutex_);
+      capacity_ = capacity;
+    }
+    full_cv_.notify_all();
+  }
+
+ private:
+  size_t capacity_;
+  std::deque<T> deque_;
+  std::condition_variable full_cv_;
+  std::condition_variable empty_cv_;
+  std::mutex mutex_;
+};
+
+#endif  // SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
diff --git a/src/profiling/memory/bounded_queue_unittest.cc b/src/profiling/memory/bounded_queue_unittest.cc
new file mode 100644
index 0000000..63127d1
--- /dev/null
+++ b/src/profiling/memory/bounded_queue_unittest.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/bounded_queue.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(BoundedQueueTest, IsFIFO) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+}
+
+TEST(BoundedQueueTest, Blocking) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  std::thread th([&q] { q.Add(3); });
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+  EXPECT_EQ(q.Get(), 3);
+  th.join();
+}
+
+TEST(BoundedQueueTest, Resize) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  q.SetCapacity(3);
+  q.Add(3);
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+  EXPECT_EQ(q.Get(), 3);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index f8d57f7..d00f451 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -18,82 +18,90 @@
 
 #include <inttypes.h>
 #include <sys/socket.h>
+#include <sys/syscall.h>
+#include <sys/un.h>
+#include <unistd.h>
 
 #include <atomic>
+#include <new>
+
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+#include <unwindstack/Regs.h>
+#include <unwindstack/RegsGetLocal.h>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/base/utils.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 namespace {
 
-std::atomic<uint64_t> global_sequence_number(0);
-constexpr size_t kFreePageBytes = base::kPageSize;
-constexpr size_t kFreePageSize = kFreePageBytes / sizeof(uint64_t);
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// glibc does not define a wrapper around gettid, bionic does.
+pid_t gettid() {
+  return static_cast<pid_t>(syscall(__NR_gettid));
+}
+#endif
+
+std::vector<base::ScopedFile> ConnectPool(const std::string& sock_name,
+                                          size_t n) {
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!base::MakeSockAddr(sock_name, &addr, &addr_size))
+    return {};  // No usable address: hand back an empty pool.
+  // Attempt n connections; failed attempts are logged and skipped.
+  std::vector<base::ScopedFile> res;
+  res.reserve(n);
+  for (size_t i = 0; i < n; ++i) {
+    auto sock = base::CreateSocket();
+    if (connect(*sock, reinterpret_cast<sockaddr*>(&addr), addr_size) == -1) {
+      PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
+      continue;  // The result may therefore hold fewer than n sockets.
+    }
+    res.emplace_back(std::move(sock));
+  }
+  return res;
+}
+
+inline bool IsMainThread() {
+  return getpid() == gettid();
+}
 
 }  // namespace
 
-FreePage::FreePage() : free_page_(kFreePageSize) {
-  free_page_[0] = static_cast<uint64_t>(kFreePageBytes);
-  free_page_[1] = static_cast<uint64_t>(RecordType::Free);
-  offset_ = 2;
-  // Code in Add assumes that offset is aligned to 2.
-  PERFETTO_DCHECK(offset_ % 2 == 0);
+void FreePage::Add(const uint64_t addr,
+                   const uint64_t sequence_number,
+                   SocketPool* pool) {
+  std::lock_guard<std::mutex> l(mutex_);  // Held across the flush, if any.
+  if (offset_ == kFreePageSize) {
+    FlushLocked(pool);
+    // Reuse the buffer from its first entry now that a flush was attempted.
+    offset_ = 0;
+  }
+  FreePageEntry& current_entry = free_page_.entries[offset_++];
+  current_entry.sequence_number = sequence_number;
+  current_entry.addr = addr;
 }
 
-void FreePage::Add(const uint64_t addr, SocketPool* pool) {
-  std::lock_guard<std::mutex> l(mtx_);
-  if (offset_ == kFreePageSize)
-    Flush(pool);
-  static_assert(kFreePageSize % 2 == 0,
-                "free page size needs to be divisible by two");
-  free_page_[offset_++] = ++global_sequence_number;
-  free_page_[offset_++] = addr;
-  PERFETTO_DCHECK(offset_ % 2 == 0);
-}
-
-void FreePage::Flush(SocketPool* pool) {
+void FreePage::FlushLocked(SocketPool* pool) {
+  WireMessage msg = {};
+  msg.record_type = RecordType::Free;
+  msg.free_header = &free_page_;
   BorrowedSocket fd(pool->Borrow());
-  size_t written = 0;
-  do {
-    ssize_t wr = PERFETTO_EINTR(send(*fd, &free_page_[0] + written,
-                                     kFreePageBytes - written, MSG_NOSIGNAL));
-    if (wr == -1) {
-      fd.Close();
-      return;
-    }
-    written += static_cast<size_t>(wr);
-  } while (written < kFreePageBytes);
-  // Now that we have flushed, reset to after the header.
-  offset_ = 2;
-}
-
-BorrowedSocket::BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
-    : fd_(std::move(fd)), socket_pool_(socket_pool) {}
-
-int BorrowedSocket::operator*() {
-  return get();
-}
-
-int BorrowedSocket::get() {
-  return *fd_;
-}
-
-void BorrowedSocket::Close() {
-  fd_.reset();
-}
-
-BorrowedSocket::~BorrowedSocket() {
-  if (socket_pool_ != nullptr)
-    socket_pool_->Return(std::move(fd_));
+  SendWireMessage(*fd, msg);
 }
 
 SocketPool::SocketPool(std::vector<base::ScopedFile> sockets)
     : sockets_(std::move(sockets)), available_sockets_(sockets_.size()) {}
 
 BorrowedSocket SocketPool::Borrow() {
-  std::unique_lock<std::mutex> lck_(mtx_);
+  std::unique_lock<std::mutex> lck_(mutex_);
   if (available_sockets_ == 0)
     cv_.wait(lck_, [this] { return available_sockets_ > 0; });
   PERFETTO_CHECK(available_sockets_ > 0);
@@ -101,13 +109,14 @@
 }
 
 void SocketPool::Return(base::ScopedFile sock) {
+  PERFETTO_CHECK(dead_sockets_ + available_sockets_ < sockets_.size());
   if (!sock) {
     // TODO(fmayer): Handle reconnect or similar.
     // This is just to prevent a deadlock.
     PERFETTO_CHECK(++dead_sockets_ != sockets_.size());
     return;
   }
-  std::unique_lock<std::mutex> lck_(mtx_);
+  std::unique_lock<std::mutex> lck_(mutex_);
   PERFETTO_CHECK(available_sockets_ < sockets_.size());
   sockets_[available_sockets_++] = std::move(sock);
   if (available_sockets_ == 1) {
@@ -116,4 +125,87 @@
   }
 }
 
+const char* GetThreadStackBase() {
+  pthread_attr_t attr;
+  if (pthread_getattr_np(pthread_self(), &attr) != 0)
+    return nullptr;
+  base::ScopedResource<pthread_attr_t*, pthread_attr_destroy, nullptr> cleanup(
+      &attr);
+  // pthread_attr_getstack yields the lowest stack address and the size.
+  char* stackaddr;
+  size_t stacksize;
+  if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&stackaddr),
+                            &stacksize) != 0)
+    return nullptr;
+  return stackaddr + stacksize;  // High end of the calling thread's stack.
+}
+
+Client::Client(std::vector<base::ScopedFile> socks)
+    : socket_pool_(std::move(socks)) {
+  uint64_t size = 0;  // Sent as the data part alongside the two fds.
+  base::ScopedFile maps(open("/proc/self/maps", O_RDONLY | O_CLOEXEC));
+  base::ScopedFile mem(open("/proc/self/mem", O_RDONLY | O_CLOEXEC));
+  int fds[2] = {*maps, *mem};  // Local copies are closed by the ScopedFiles.
+  base::SockSend(*socket_pool_.Borrow(), &size, sizeof(size), fds, 2);
+}
+
+Client::Client(const std::string& sock_name, size_t conns)
+    : Client(ConnectPool(sock_name, conns)) {}
+
+const char* Client::GetStackBase() {
+  if (IsMainThread()) {
+    if (!main_thread_stack_base_)
+      // Because pthread_attr_getstack reads and parses /proc/self/maps and
+      // /proc/self/stat, we have to cache the result here.
+      main_thread_stack_base_ = GetThreadStackBase();
+    return main_thread_stack_base_;
+  }
+  return GetThreadStackBase();
+}
+
+// The stack grows towards numerically smaller addresses, so the stack layout
+// of main calling malloc is as follows.
+//
+//               +------------+
+//               |SendWireMsg |
+// stacktop +--> +------------+ 0x1000
+//               |RecordMalloc|    +
+//               +------------+    |
+//               | malloc     |    |
+//               +------------+    |
+//               |  main      |    v
+// stackbase +-> +------------+ 0xffff
+void Client::RecordMalloc(uint64_t alloc_size, uint64_t alloc_address) {
+  AllocMetadata metadata;
+  const char* stackbase = GetStackBase();
+  const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+  unwindstack::AsmGetRegs(metadata.register_data);
+
+  if (stackbase < stacktop) {  // Also taken when GetStackBase() is nullptr.
+    PERFETTO_DCHECK(false);
+    return;
+  }
+
+  uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+  metadata.alloc_size = alloc_size;
+  metadata.alloc_address = alloc_address;
+  metadata.stack_pointer = reinterpret_cast<uint64_t>(stacktop);
+  metadata.stack_pointer_offset = sizeof(AllocMetadata);
+  metadata.arch = unwindstack::Regs::CurrentArch();
+  metadata.sequence_number = ++sequence_number_;
+
+  WireMessage msg{};
+  msg.record_type = RecordType::Malloc;  // As FlushLocked does for Free.
+  msg.alloc_header = &metadata;
+  msg.payload = const_cast<char*>(stacktop);  // Raw stack, top to base.
+  msg.payload_size = static_cast<size_t>(stack_size);
+  BorrowedSocket sockfd = socket_pool_.Borrow();
+  // TODO(fmayer): Handle failure.
+  PERFETTO_CHECK(SendWireMessage(*sockfd, msg));
+}
+
+void Client::RecordFree(uint64_t alloc_address) {
+  free_page_.Add(alloc_address, ++sequence_number_, &socket_pool_);
+}
+
 }  // namespace perfetto
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 7a4e459..46a571d 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -23,47 +23,11 @@
 #include <vector>
 
 #include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
-class SocketPool;
-
-class FreePage {
- public:
-  FreePage();
-
-  // Can be called from any thread. Must not hold mtx_.`
-  void Add(const uint64_t addr, SocketPool* pool);
-
- private:
-  // Needs to be called holding mtx_.
-  void Flush(SocketPool* pool);
-
-  std::vector<uint64_t> free_page_;
-  std::mutex mtx_;
-  size_t offset_;
-};
-
-class BorrowedSocket {
- public:
-  BorrowedSocket(const BorrowedSocket&) = delete;
-  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
-  BorrowedSocket(BorrowedSocket&& other) {
-    fd_ = std::move(other.fd_);
-    socket_pool_ = other.socket_pool_;
-    other.socket_pool_ = nullptr;
-  }
-
-  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool);
-  int operator*();
-  int get();
-  void Close();
-  ~BorrowedSocket();
-
- private:
-  base::ScopedFile fd_;
-  SocketPool* socket_pool_ = nullptr;
-};
+class BorrowedSocket;
 
 class SocketPool {
  public:
@@ -74,13 +38,80 @@
 
  private:
   void Return(base::ScopedFile fd);
-  std::mutex mtx_;
+  std::mutex mutex_;
   std::condition_variable cv_;
   std::vector<base::ScopedFile> sockets_;
   size_t available_sockets_;
   size_t dead_sockets_ = 0;
 };
 
+// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
+class BorrowedSocket {
+ public:
+  BorrowedSocket(const BorrowedSocket&) = delete;
+  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
+  BorrowedSocket(BorrowedSocket&& other) noexcept {
+    fd_ = std::move(other.fd_);
+    socket_pool_ = other.socket_pool_;
+    other.socket_pool_ = nullptr;
+  }
+
+  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
+      : fd_(std::move(fd)), socket_pool_(socket_pool) {}
+
+  ~BorrowedSocket() {
+    if (socket_pool_ != nullptr)
+      socket_pool_->Return(std::move(fd_));
+  }
+
+  int operator*() { return get(); }
+
+  int get() { return *fd_; }
+
+  void Close() { fd_.reset(); }
+
+ private:
+  base::ScopedFile fd_;
+  SocketPool* socket_pool_ = nullptr;
+};
+
+// Cache for frees that have been observed. It is infeasible to send every
+// free separately, so we batch and send the whole buffer once it is full.
+class FreePage {
+ public:
+  // Records a freed address. When the buffer is already full it is flushed
+  // first, using a socket borrowed from pool.
+  // Can be called from any thread. The caller must not hold mutex_.
+  void Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+
+ private:
+  // Sends the buffered entries. Needs to be called holding mutex_.
+  void FlushLocked(SocketPool* pool);
+
+  FreeMetadata free_page_;  // Batched entries, sent as one Free record.
+  std::mutex mutex_;        // Guards free_page_ and offset_.
+  size_t offset_ = 0;       // Index of the next unused entry in free_page_.
+};
+
+const char* GetThreadStackBase();
+
+// This is created and owned by the malloc hooks.
+class Client {
+ public:
+  Client(std::vector<base::ScopedFile> sockets);
+  Client(const std::string& sock_name, size_t conns);
+  void RecordMalloc(uint64_t alloc_size, uint64_t alloc_address);
+  void RecordFree(uint64_t alloc_address);
+
+ private:
+  const char* GetStackBase();
+
+  SocketPool socket_pool_;
+  FreePage free_page_;
+  const char* main_thread_stack_base_ = nullptr;
+  std::atomic<uint64_t> sequence_number_{0};
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_PROFILING_MEMORY_CLIENT_H_
diff --git a/src/profiling/memory/client_unittest.cc b/src/profiling/memory/client_unittest.cc
index a69bae3..df848da 100644
--- a/src/profiling/memory/client_unittest.cc
+++ b/src/profiling/memory/client_unittest.cc
@@ -67,5 +67,17 @@
   t2.join();
 }
 
+TEST(ClientTest, GetThreadStackBase) {
+  std::thread th([] {
+    const char* stackbase = GetThreadStackBase();
+    ASSERT_NE(stackbase, nullptr);
+    // The implementation assumes the stack grows from higher addresses to
+    // lower. We will need to rework once we encounter architectures where the
+    // stack grows the other way.
+    EXPECT_GT(stackbase, __builtin_frame_address(0));
+  });
+  th.join();
+}
+
 }  // namespace
 }  // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
new file mode 100644
index 0000000..1dc10aa
--- /dev/null
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/socket_listener.h"
+#include "src/profiling/memory/unwinding.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace {
+
+constexpr char kSocketName[] = TEST_SOCK_NAME("heapprofd_integrationtest");
+
+void __attribute__((noinline)) OtherFunction(Client* client) {
+  client->RecordMalloc(10, 0xf00);
+}
+
+void __attribute__((noinline)) SomeFunction(Client* client) {
+  OtherFunction(client);
+}
+
+class HeapprofdIntegrationTest : public ::testing::Test {
+ protected:
+  void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
+  void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
+};
+
+// TODO(fmayer): Fix out of tree integration test.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#define MAYBE_EndToEnd EndToEnd
+#else
+#define MAYBE_EndToEnd DISABLED_EndToEnd
+#endif
+
+TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
+  GlobalCallstackTrie callsites;
+
+  base::TestTaskRunner task_runner;
+  auto done = task_runner.CreateCheckpoint("done");
+  SocketListener listener(
+      [&done](UnwindingRecord r) {
+        // TODO(fmayer): Test symbolization and result of unwinding.
+        BookkeepingRecord bookkeeping_record;
+        ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
+        HandleBookkeepingRecord(&bookkeeping_record);
+        done();
+      },
+      &callsites);
+
+  auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+  if (!sock->is_listening()) {
+    PERFETTO_ELOG("Socket not listening.");
+    PERFETTO_CHECK(false);
+  }
+  Client client(kSocketName, 1);
+  SomeFunction(&client);
+  task_runner.RunUntilCheckpoint("done");
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index ea53977..991006e 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -15,9 +15,12 @@
  */
 
 #include <stdlib.h>
+#include <array>
 #include <memory>
+#include <vector>
 
 #include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bounded_queue.h"
 #include "src/profiling/memory/socket_listener.h"
 
 #include "perfetto/base/unix_task_runner.h"
@@ -25,15 +28,60 @@
 namespace perfetto {
 namespace {
 
+constexpr size_t kUnwinderQueueSize = 1000;
+constexpr size_t kBookkeepingQueueSize = 1000;
+constexpr size_t kUnwinderThreads = 5;
+
+// We create kUnwinderThreads unwinding threads and one bookkeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+//             +--------------+
+//             |SocketListener|
+//             +------+-------+
+//                    |
+//          +--UnwindingRecord -+
+//          |                   |
+// +--------v-------+   +-------v--------+
+// |Unwinding Thread|   |Unwinding Thread|
+// +--------+-------+   +-------+--------+
+//          |                   |
+//          +-BookkeepingRecord +
+//                    |
+//           +--------v---------+
+//           |Bookkeeping Thread|
+//           +------------------+
 int HeapprofdMain(int argc, char** argv) {
+  GlobalCallstackTrie callsites;
   std::unique_ptr<ipc::UnixSocket> sock;
 
-  SocketListener listener(
-      [](size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>) {
-        // TODO(fmayer): Wire this up to a worker thread that does the
-        // unwinding.
-        PERFETTO_LOG("Record received.");
-      });
+  BoundedQueue<BookkeepingRecord> callsites_queue(kBookkeepingQueueSize);
+  std::thread bookkeeping_thread(
+      [&callsites_queue] { BookkeepingMainLoop(&callsites_queue); });
+
+  std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
+  for (size_t i = 0; i < kUnwinderThreads; ++i)
+    unwinder_queues[i].SetCapacity(kUnwinderQueueSize);  // Default is 1.
+  std::vector<std::thread> unwinding_threads;
+  unwinding_threads.reserve(kUnwinderThreads);
+  for (size_t i = 0; i < kUnwinderThreads; ++i) {
+    unwinding_threads.emplace_back([&unwinder_queues, &callsites_queue, i] {
+      UnwindingMainLoop(&unwinder_queues[i], &callsites_queue);
+    });
+  }
+
+  auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
+    unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
+        std::move(r));
+  };
+  SocketListener listener(std::move(on_record_received), &callsites);
+
   base::UnixTaskRunner read_task_runner;
   if (argc == 2) {
     // Allow to be able to manually specify the socket to listen on
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 81b7390..1a26e01 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -19,18 +19,18 @@
 
 namespace perfetto {
 
-void SocketListener::OnDisconnect(ipc::UnixSocket* self) {
+void SocketListener::OnDisconnect(base::UnixSocket* self) {
   sockets_.erase(self);
 }
 
 void SocketListener::OnNewIncomingConnection(
-    ipc::UnixSocket*,
-    std::unique_ptr<ipc::UnixSocket> new_connection) {
-  ipc::UnixSocket* new_connection_raw = new_connection.get();
+    base::UnixSocket*,
+    std::unique_ptr<base::UnixSocket> new_connection) {
+  base::UnixSocket* new_connection_raw = new_connection.get();
   sockets_.emplace(new_connection_raw, std::move(new_connection));
 }
 
-void SocketListener::OnDataAvailable(ipc::UnixSocket* self) {
+void SocketListener::OnDataAvailable(base::UnixSocket* self) {
   auto it = sockets_.find(self);
   if (it == sockets_.end())
     return;
@@ -82,7 +82,7 @@
   if (it == process_metadata_.end() || it->second.expired()) {
     // We have not seen the PID yet or the PID is being recycled.
     entry->process_metadata = std::make_shared<ProcessMetadata>(
-        peer_pid, std::move(maps_fd), std::move(mem_fd));
+        peer_pid, std::move(maps_fd), std::move(mem_fd), callsites_);
     process_metadata_[peer_pid] = entry->process_metadata;
   } else {
     // If the process already has metadata, this is an additional socket for
@@ -92,23 +92,34 @@
   }
 }
 
-void SocketListener::RecordReceived(ipc::UnixSocket* self,
+void SocketListener::RecordReceived(base::UnixSocket* self,
                                     size_t size,
                                     std::unique_ptr<uint8_t[]> buf) {
   auto it = sockets_.find(self);
-  if (it == sockets_.end())
+  if (it == sockets_.end()) {
     // This happens for zero-length records, because the callback gets called
     // in the first call to Read, before InitProcess is called. Because zero
     // length records are useless anyway, this is not a problem.
     return;
+  }
   Entry& entry = it->second;
+  if (!entry.process_metadata) {
+    PERFETTO_DLOG("Received record without process metadata.");
+    return;
+  }
+
+  if (size == 0) {
+    PERFETTO_DLOG("Dropping empty record.");
+    return;
+  }
   // This needs to be a weak_ptr for two reasons:
   // 1) most importantly, the weak_ptr in process_metadata_ should expire as
   // soon as the last socket for a process goes away. Otherwise, a recycled
   // PID might reuse incorrect metadata.
   // 2) it is a waste to unwind for a process that had already gone away.
   std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
-  callback_function_(size, std::move(buf), std::move(weak_metadata));
+  callback_function_({entry.process_metadata->pid, size, std::move(buf),
+                      std::move(weak_metadata)});
 }
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index ba5d92e..6d83d9e 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -17,7 +17,8 @@
 #ifndef SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
 #define SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
 
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
+#include "src/profiling/memory/bookkeeping.h"
 #include "src/profiling/memory/record_reader.h"
 #include "src/profiling/memory/unwinding.h"
 
@@ -26,23 +27,22 @@
 
 namespace perfetto {
 
-class SocketListener : public ipc::UnixSocket::EventListener {
+class SocketListener : public base::UnixSocket::EventListener {
  public:
-  SocketListener(std::function<void(size_t,
-                                    std::unique_ptr<uint8_t[]>,
-                                    std::weak_ptr<ProcessMetadata>)> fn)
-      : callback_function_(std::move(fn)) {}
-  void OnDisconnect(ipc::UnixSocket* self) override;
+  SocketListener(std::function<void(UnwindingRecord)> fn,
+                 GlobalCallstackTrie* callsites)
+      : callback_function_(std::move(fn)), callsites_(callsites) {}
+  void OnDisconnect(base::UnixSocket* self) override;
   void OnNewIncomingConnection(
-      ipc::UnixSocket* self,
-      std::unique_ptr<ipc::UnixSocket> new_connection) override;
-  void OnDataAvailable(ipc::UnixSocket* self) override;
+      base::UnixSocket* self,
+      std::unique_ptr<base::UnixSocket> new_connection) override;
+  void OnDataAvailable(base::UnixSocket* self) override;
 
  private:
   struct Entry {
-    Entry(std::unique_ptr<ipc::UnixSocket> s) : sock(std::move(s)) {}
+    Entry(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
     // Only here for ownership of the object.
-    const std::unique_ptr<ipc::UnixSocket> sock;
+    const std::unique_ptr<base::UnixSocket> sock;
     RecordReader record_reader;
     bool recv_fds = false;
     // The sockets own the metadata for a particular PID. When the last socket
@@ -55,17 +55,16 @@
     std::shared_ptr<ProcessMetadata> process_metadata;
   };
 
-  void RecordReceived(ipc::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
+  void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
   void InitProcess(Entry* entry,
                    pid_t peer_pid,
                    base::ScopedFile maps_fd,
                    base::ScopedFile mem_fd);
 
-  std::map<ipc::UnixSocket*, Entry> sockets_;
+  std::map<base::UnixSocket*, Entry> sockets_;
   std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
-  std::function<
-      void(size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>)>
-      callback_function_;
+  std::function<void(UnwindingRecord)> callback_function_;
+  GlobalCallstackTrie* callsites_;
 };
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index da7efdb..58d5e8f 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -37,34 +37,33 @@
   void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
 };
 
-class MockEventListener : public ipc::UnixSocket::EventListener {
+class MockEventListener : public base::UnixSocket::EventListener {
  public:
-  MOCK_METHOD2(OnConnect, void(ipc::UnixSocket*, bool));
+  MOCK_METHOD2(OnConnect, void(base::UnixSocket*, bool));
 };
 
 TEST_F(SocketListenerTest, ReceiveRecord) {
   base::TestTaskRunner task_runner;
   auto callback_called = task_runner.CreateCheckpoint("callback.called");
   auto connected = task_runner.CreateCheckpoint("connected");
-  auto callback_fn = [&callback_called](
-                         size_t size, std::unique_ptr<uint8_t[]> buf,
-                         std::weak_ptr<ProcessMetadata> metadata) {
-    ASSERT_EQ(size, 1u);
-    ASSERT_EQ(buf[0], '1');
-    ASSERT_FALSE(metadata.expired());
+  auto callback_fn = [&callback_called](UnwindingRecord r) {
+    ASSERT_EQ(r.size, 1u);
+    ASSERT_EQ(r.data[0], '1');
+    ASSERT_FALSE(r.metadata.expired());
     callback_called();
   };
 
-  SocketListener listener(std::move(callback_fn));
+  GlobalCallstackTrie bookkeeping;
+  SocketListener listener(std::move(callback_fn), &bookkeeping);
   MockEventListener client_listener;
   EXPECT_CALL(client_listener, OnConnect(_, _))
       .WillOnce(InvokeWithoutArgs(connected));
 
-  std::unique_ptr<ipc::UnixSocket> recv_socket =
-      ipc::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+  std::unique_ptr<base::UnixSocket> recv_socket =
+      base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
 
-  std::unique_ptr<ipc::UnixSocket> client_socket =
-      ipc::UnixSocket::Connect(kSocketName, &client_listener, &task_runner);
+  std::unique_ptr<base::UnixSocket> client_socket =
+      base::UnixSocket::Connect(kSocketName, &client_listener, &task_runner);
 
   task_runner.RunUntilCheckpoint("connected");
   uint64_t size = 1;
diff --git a/src/profiling/memory/transport_data.h b/src/profiling/memory/transport_data.h
deleted file mode 100644
index c0ef3ef..0000000
--- a/src/profiling/memory/transport_data.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// The data types used for communication between heapprofd and the client
-// embedded in processes that are being profiled.
-
-#ifndef SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-#define SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-
-#include <inttypes.h>
-#include <unwindstack/Elf.h>
-
-namespace perfetto {
-
-// Use uint64_t to make sure the following data is aligned as 64bit is the
-// strongest alignment requirement.
-enum class RecordType : uint64_t {
-  Free = 0,
-  Malloc = 1,
-};
-
-struct AllocMetadata {
-  // Size of the allocation that was made.
-  uint64_t alloc_size;
-  // Pointer returned by malloc(2) for this allocation.
-  uint64_t alloc_address;
-  // Current value of the stack pointer.
-  uint64_t stack_pointer;
-  // Offset of the data at stack_pointer from the start of this record.
-  uint64_t stack_pointer_offset;
-  // CPU architecture of the client. This determines the size of the
-  // register data that follows this struct.
-  unwindstack::ArchEnum arch;
-};
-
-}  // namespace perfetto
-
-#endif  // SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index a317600..1dcc8c0 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -42,8 +42,8 @@
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
 #include "src/profiling/memory/unwinding.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
@@ -155,34 +155,19 @@
   maps_.clear();
 }
 
-bool DoUnwind(void* mem,
-              size_t sz,
-              ProcessMetadata* metadata,
-              std::vector<unwindstack::FrameData>* out) {
-  if (sz < sizeof(AllocMetadata)) {
-    PERFETTO_ELOG("size");
-    return false;
-  }
-  AllocMetadata* alloc_metadata = reinterpret_cast<AllocMetadata*>(mem);
-  if (sizeof(AllocMetadata) + RegSize(alloc_metadata->arch) > sz)
-    return false;
-  void* reg_data = static_cast<uint8_t*>(mem) + sizeof(AllocMetadata);
+bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+  AllocMetadata* alloc_metadata = msg->alloc_header;
   std::unique_ptr<unwindstack::Regs> regs(
-      CreateFromRawData(alloc_metadata->arch, reg_data));
+      CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
   if (regs == nullptr) {
     PERFETTO_ELOG("regs");
     return false;
   }
-  if (alloc_metadata->stack_pointer_offset < sizeof(AllocMetadata) ||
-      alloc_metadata->stack_pointer_offset > sz) {
-    PERFETTO_ELOG("out-of-bound stack_pointer_offset");
-    return false;
-  }
-  uint8_t* stack =
-      reinterpret_cast<uint8_t*>(mem) + alloc_metadata->stack_pointer_offset;
+  out->alloc_metadata = *alloc_metadata;
+  uint8_t* stack = reinterpret_cast<uint8_t*>(msg->payload);
   std::shared_ptr<unwindstack::Memory> mems = std::make_shared<StackMemory>(
       *metadata->mem_fd, alloc_metadata->stack_pointer, stack,
-      sz - alloc_metadata->stack_pointer_offset);
+      msg->payload_size);
   unwindstack::Unwinder unwinder(kMaxFrames, &metadata->maps, regs.get(), mems);
   // Surpress incorrect "variable may be uninitialized" error for if condition
   // after this loop. error_code = LastErrorCode gets run at least once.
@@ -198,10 +183,83 @@
       break;
   }
   if (error_code == 0)
-    *out = unwinder.frames();
+    out->frames = unwinder.frames();
   else
     PERFETTO_DLOG("unwinding failed %" PRIu8, error_code);
   return error_code == 0;
 }
 
+// Turns a raw wire record into |out|; false means there is nothing to emit.
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out) {
+  WireMessage msg = {};
+  if (!ReceiveWireMessage(reinterpret_cast<char*>(rec->data.get()), rec->size,
+                          &msg))
+    return false;
+  switch (msg.record_type) {
+    case RecordType::Malloc: {
+      std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+      if (!metadata)
+        return false;  // Process has already gone away.
+      out->metadata = std::move(rec->metadata);
+      out->free_record = {};
+      return DoUnwind(&msg, metadata.get(), &out->alloc_record);
+    }
+    case RecordType::Free: {
+      // Hand the buffer over to |out|: msg.free_header points into rec->data.
+      out->metadata = std::move(rec->metadata);
+      out->free_record.free_data = std::move(rec->data);
+      out->free_record.metadata = msg.free_header;
+      out->alloc_record = {};
+      return true;
+    }
+  }
+  // Wire-supplied record_type may match no case; never fall off the end.
+  return false;
+}
+
+__attribute__((noreturn)) void UnwindingMainLoop(
+    BoundedQueue<UnwindingRecord>* input_queue,
+    BoundedQueue<BookkeepingRecord>* output_queue) {
+  for (;;) {  // Never exits: takes one record per iteration.
+    UnwindingRecord rec = input_queue->Get();
+    BookkeepingRecord out;
+    if (HandleUnwindingRecord(&rec, &out))  // Failed records are dropped.
+      output_queue->Add(std::move(out));
+  }
+}
+
+// Applies one record (a malloc or a batch of frees) to the process heap_dump.
+void HandleBookkeepingRecord(BookkeepingRecord* rec) {
+  std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+  if (!metadata)
+    return;  // Process has already gone away.
+  if (rec->free_record.free_data) {
+    FreeMetadata* free_md = rec->free_record.metadata;
+    // num_entries comes off the wire; drop records that overrun |entries|.
+    if (free_md->num_entries > kFreePageSize)
+      return;
+    for (uint64_t i = 0; i < free_md->num_entries; ++i) {
+      const FreePageEntry& entry = free_md->entries[i];
+      metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
+    }
+  } else {
+    AllocRecord& alloc_rec = rec->alloc_record;
+    std::vector<CodeLocation> code_locations;
+    for (unwindstack::FrameData& frame : alloc_rec.frames)
+      code_locations.emplace_back(frame.map_name, frame.function_name);
+    metadata->heap_dump.RecordMalloc(code_locations,
+                                     alloc_rec.alloc_metadata.alloc_address,
+                                     alloc_rec.alloc_metadata.alloc_size,
+                                     alloc_rec.alloc_metadata.sequence_number);
+  }
+}
+
+__attribute__((noreturn)) void BookkeepingMainLoop(
+    BoundedQueue<BookkeepingRecord>* input_queue) {
+  for (;;) {  // Never exits: applies one queued record per iteration.
+    BookkeepingRecord rec = input_queue->Get();
+    HandleBookkeepingRecord(&rec);
+  }
+}
+
 }  // namespace perfetto
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 455be43..1bdcf0d 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -20,6 +20,9 @@
 #include <unwindstack/Maps.h>
 #include <unwindstack/Unwinder.h>
 #include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/bookkeeping.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
@@ -36,13 +39,20 @@
 };
 
 struct ProcessMetadata {
-  ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
-      : pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
+  ProcessMetadata(pid_t p,
+                  base::ScopedFile maps_fd,
+                  base::ScopedFile mem,
+                  GlobalCallstackTrie* callsites)
+      : pid(p),
+        maps(std::move(maps_fd)),
+        mem_fd(std::move(mem)),
+        heap_dump(callsites) {
     PERFETTO_CHECK(maps.Parse());
   }
   pid_t pid;
   FileDescriptorMaps maps;
   base::ScopedFile mem_fd;
+  HeapTracker heap_dump;
 };
 
 // Overlays size bytes pointed to by stack for addresses in [sp, sp + size).
@@ -62,10 +72,39 @@
 
 size_t RegSize(unwindstack::ArchEnum arch);
 
-bool DoUnwind(void* mem,
-              size_t sz,
-              ProcessMetadata* metadata,
-              std::vector<unwindstack::FrameData>* out);
+struct UnwindingRecord {
+  pid_t pid;
+  size_t size;  // Number of bytes in |data|.
+  std::unique_ptr<uint8_t[]> data;  // Raw record, parsed as a WireMessage.
+  std::weak_ptr<ProcessMetadata> metadata;  // Expired if process is gone.
+};
+
+struct FreeRecord {
+  std::unique_ptr<uint8_t[]> free_data;  // Owns the buffer |metadata| is in.
+  FreeMetadata* metadata;  // Not owned; points into |free_data|.
+};
+
+struct AllocRecord {
+  AllocMetadata alloc_metadata;  // Copy of the message's alloc header.
+  std::vector<unwindstack::FrameData> frames;  // Set when unwinding succeeds.
+};
+
+struct BookkeepingRecord {
+  // TODO(fmayer): Use a union.
+  std::weak_ptr<ProcessMetadata> metadata;
+  AllocRecord alloc_record;  // Used when free_record.free_data is null.
+  FreeRecord free_record;    // Used when free_record.free_data is set.
+};
+
+bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
+void HandleBookkeepingRecord(BookkeepingRecord* rec);
+
+void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
+                       BoundedQueue<BookkeepingRecord>* output_queue);
+
+void BookkeepingMainLoop(BoundedQueue<BookkeepingRecord>* input_queue);
 
 }  // namespace perfetto
 
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 06f3ba9..f80a0ee 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -16,7 +16,8 @@
 
 #include "src/profiling/memory/unwinding.h"
 #include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -70,20 +71,6 @@
 #define MAYBE_DoUnwind DISABLED_DoUnwind
 #endif
 
-uint8_t* GetStackBase() {
-  pthread_t t = pthread_self();
-  pthread_attr_t attr;
-  if (pthread_getattr_np(t, &attr) != 0) {
-    return nullptr;
-  }
-  uint8_t* x;
-  size_t s;
-  if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&x), &s) != 0)
-    return nullptr;
-  pthread_attr_destroy(&attr);
-  return x + s;
-}
-
 // This is needed because ASAN thinks copying the whole stack is a buffer
 // underrun.
 void __attribute__((noinline))
@@ -95,50 +82,59 @@
     to[i] = from[i];
 }
 
-std::pair<std::unique_ptr<uint8_t[]>, size_t> __attribute__((noinline))
-GetRecord() {
-  const uint8_t* stackbase = GetStackBase();
-  PERFETTO_CHECK(stackbase != nullptr);
-  const uint8_t* stacktop =
-      reinterpret_cast<uint8_t*>(__builtin_frame_address(0));
-  PERFETTO_CHECK(stacktop != nullptr);
-  PERFETTO_CHECK(stacktop < stackbase);
-  const size_t stack_size = static_cast<size_t>(stackbase - stacktop);
-  std::unique_ptr<unwindstack::Regs> regs(unwindstack::Regs::CreateFromLocal());
-  const unwindstack::ArchEnum arch = regs->CurrentArch();
-  const size_t reg_size = RegSize(arch);
-  const size_t total_size = sizeof(AllocMetadata) + reg_size + stack_size;
-  std::unique_ptr<uint8_t[]> buf(new uint8_t[total_size]);
-  AllocMetadata* metadata = reinterpret_cast<AllocMetadata*>(buf.get());
-  metadata->alloc_size = 0;
-  metadata->alloc_address = 0;
+struct RecordMemory {
+  std::unique_ptr<uint8_t[]> payload;
+  std::unique_ptr<AllocMetadata> metadata;
+};
+
+RecordMemory __attribute__((noinline)) GetRecord(WireMessage* msg) {
+  std::unique_ptr<AllocMetadata> metadata(new AllocMetadata);
+
+  const char* stackbase = GetThreadStackBase();
+  const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+  unwindstack::AsmGetRegs(metadata->register_data);
+
+  if (stackbase < stacktop) {
+    PERFETTO_DCHECK(false);
+    return {nullptr, nullptr};
+  }
+  uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+
+  metadata->alloc_size = 10;
+  metadata->alloc_address = 0x10;
   metadata->stack_pointer = reinterpret_cast<uint64_t>(stacktop);
-  metadata->stack_pointer_offset = sizeof(AllocMetadata) + reg_size;
-  metadata->arch = arch;
-  unwindstack::RegsGetLocal(regs.get());
-  // Make sure nothing above has changed the stack pointer, just for extra
-  // paranoia.
-  PERFETTO_CHECK(stacktop ==
-                 reinterpret_cast<uint8_t*>(__builtin_frame_address(0)));
-  memcpy(buf.get() + sizeof(AllocMetadata), regs->RawData(), reg_size);
-  UnsafeMemcpy(buf.get() + sizeof(AllocMetadata) + reg_size, stacktop,
-               stack_size);
-  return {std::move(buf), total_size};
+  metadata->stack_pointer_offset = sizeof(AllocMetadata);
+  metadata->arch = unwindstack::Regs::CurrentArch();
+  metadata->sequence_number = 1;
+
+  std::unique_ptr<uint8_t[]> payload(new uint8_t[stack_size]);
+  UnsafeMemcpy(&payload[0], stacktop, stack_size);
+
+  *msg = {};
+  msg->alloc_header = metadata.get();
+  msg->payload = reinterpret_cast<char*>(payload.get());
+  msg->payload_size = static_cast<size_t>(stack_size);
+  return {std::move(payload), std::move(metadata)};
 }
 
 // TODO(fmayer): Investigate why this fails out of tree.
 TEST(UnwindingTest, MAYBE_DoUnwind) {
   base::ScopedFile proc_maps(open("/proc/self/maps", O_RDONLY));
   base::ScopedFile proc_mem(open("/proc/self/mem", O_RDONLY));
-  ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
-  auto record = GetRecord();
-  std::vector<unwindstack::FrameData> out;
-  ASSERT_TRUE(DoUnwind(record.first.get(), record.second, &metadata, &out));
+  GlobalCallstackTrie callsites;
+  ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem),
+                           &callsites);
+  WireMessage msg;
+  auto record = GetRecord(&msg);
+  AllocRecord out;
+  ASSERT_TRUE(DoUnwind(&msg, &metadata, &out));
   int st;
-  std::unique_ptr<char> demangled(
-      abi::__cxa_demangle(out[0].function_name.c_str(), nullptr, nullptr, &st));
+  std::unique_ptr<char> demangled(abi::__cxa_demangle(
+      out.frames[0].function_name.c_str(), nullptr, nullptr, &st));
   ASSERT_EQ(st, 0);
-  ASSERT_STREQ(demangled.get(), "perfetto::(anonymous namespace)::GetRecord()");
+  ASSERT_STREQ(
+      demangled.get(),
+      "perfetto::(anonymous namespace)::GetRecord(perfetto::WireMessage*)");
 }
 
 }  // namespace
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
new file mode 100644
index 0000000..39373f9
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.cc
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+namespace perfetto {
+
+namespace {
+template <typename T>
+bool ViewAndAdvance(char** ptr, T** out, const char* end) {
+  if (*ptr > end || static_cast<size_t>(end - *ptr) < sizeof(T))
+    return false;  // Fewer than sizeof(T) bytes left before |end|.
+  *out = reinterpret_cast<T*>(*ptr);  // View into the buffer itself.
+  *ptr += sizeof(T);  // Advance the caller's cursor, not our local copy.
+  return true;
+}
+}  // namespace
+
+bool SendWireMessage(int sock, const WireMessage& msg) {
+  uint64_t total_size;  // Bytes following this length prefix; set below.
+  struct iovec iovecs[4] = {};
+  // TODO(fmayer): Maybe pack these two.
+  iovecs[0].iov_base = &total_size;
+  iovecs[0].iov_len = sizeof(total_size);
+  iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
+  iovecs[1].iov_len = sizeof(msg.record_type);
+  if (msg.alloc_header) {
+    iovecs[2].iov_base = msg.alloc_header;
+    iovecs[2].iov_len = sizeof(*msg.alloc_header);
+  } else if (msg.free_header) {
+    iovecs[2].iov_base = msg.free_header;
+    iovecs[2].iov_len = sizeof(*msg.free_header);
+  } else {  // Neither header is set: there is nothing to send.
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+
+  iovecs[3].iov_base = msg.payload;
+  iovecs[3].iov_len = msg.payload_size;
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iovecs;
+  if (msg.payload) {
+    hdr.msg_iovlen = base::ArraySize(iovecs);
+    total_size = iovecs[1].iov_len + iovecs[2].iov_len + iovecs[3].iov_len;
+  } else {
+    // If we are not sending payload, just ignore that iovec.
+    hdr.msg_iovlen = base::ArraySize(iovecs) - 1;
+    total_size = iovecs[1].iov_len + iovecs[2].iov_len;
+  }
+
+  ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);  // Must be a full write.
+  return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+}
+
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
+  RecordType* record_type;
+  char* end = buf + size;
+  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+    return false;
+  // Clear stale fields; only the ones for this record type get filled in.
+  *out = {};
+  out->record_type = *record_type;
+  if (*record_type == RecordType::Free)
+    return ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end);
+  // The type comes off the wire: anything else unknown is a bad message.
+  if (*record_type != RecordType::Malloc)
+    return false;
+  if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+    return false;
+  if (buf > end) {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+  // Everything after the header is the payload.
+  out->payload = buf;
+  out->payload_size = static_cast<size_t>(end - buf);
+  return true;
+}
+
+}  // namespace perfetto
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
new file mode 100644
index 0000000..2810872
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The data types used for communication between heapprofd and the client
+// embedded in processes that are being profiled.
+
+#ifndef SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+#define SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+
+#include <inttypes.h>
+#include <unwindstack/Elf.h>
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+
+namespace perfetto {
+
+// Types needed for the wire format used for communication between the client
+// and heapprofd. The basic format of a record is
+// record size (uint64_t) | record type (RecordType = uint64_t) | record
+// If record type is malloc, the record format is AllocMetadata | raw stack.
+// If the record type is free, the record is a FreeMetadata.
+
+// Use uint64_t to make sure the following data is aligned as 64bit is the
+// strongest alignment requirement.
+
+// C++11 std::max is not constexpr.
+constexpr size_t constexpr_max(size_t x, size_t y) {
+  return x > y ? x : y;
+}
+
+constexpr size_t kMaxRegisterDataSize = constexpr_max(
+    constexpr_max(constexpr_max(unwindstack::ARM_REG_LAST * sizeof(uint32_t),
+                                unwindstack::ARM64_REG_LAST * sizeof(uint64_t)),
+                  unwindstack::X86_REG_LAST * sizeof(uint32_t)),
+    unwindstack::X86_64_REG_LAST * sizeof(uint64_t));
+
+constexpr size_t kFreePageSize = 1024;
+
+enum class RecordType : uint64_t {
+  Free = 0,
+  Malloc = 1,
+};
+
+struct AllocMetadata {
+  uint64_t sequence_number;
+  // Size of the allocation that was made.
+  uint64_t alloc_size;
+  // Pointer returned by malloc(3) for this allocation.
+  uint64_t alloc_address;
+  // Current value of the stack pointer.
+  uint64_t stack_pointer;
+  // Offset of the data at stack_pointer from the start of this record.
+  uint64_t stack_pointer_offset;
+  // CPU architecture of the client. This selects how register_data below
+  // is interpreted.
+  unwindstack::ArchEnum arch;
+  char register_data[kMaxRegisterDataSize];
+};
+
+struct FreePageEntry {
+  uint64_t sequence_number;
+  uint64_t addr;
+};
+
+struct FreeMetadata {
+  uint64_t num_entries;  // How many leading slots of |entries| are in use.
+  FreePageEntry entries[kFreePageSize];
+};
+
+struct WireMessage {
+  RecordType record_type;
+
+  AllocMetadata* alloc_header;  // Filled in for RecordType::Malloc.
+  FreeMetadata* free_header;    // Filled in for RecordType::Free.
+
+  char* payload;        // Bytes after the header; may be null.
+  size_t payload_size;  // Length of |payload| in bytes.
+};
+
+bool SendWireMessage(int sock, const WireMessage& msg);
+
+// Parse message received over the wire.
+// |buf| has to outlive |out|.
+// If buf is not a valid message, return false.
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out);
+
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_