profiling: Add missing pieces to heapprofd.

* Allow several heap dumps using the same call graph tree.
* Add main loops for worker threads.
* Add queue between worker threads.
* Add client functions for sending the stack.

Change-Id: I5d66b6199ab72467e43335df257c08825c2d2228
diff --git a/Android.bp b/Android.bp
index 9365d34..8edd08a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3849,8 +3849,10 @@
     "src/perfetto_cmd/rate_limiter_unittest.cc",
     "src/profiling/memory/bookkeeping.cc",
     "src/profiling/memory/bookkeeping_unittest.cc",
+    "src/profiling/memory/bounded_queue_unittest.cc",
     "src/profiling/memory/client.cc",
     "src/profiling/memory/client_unittest.cc",
+    "src/profiling/memory/heapprofd_integrationtest.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
     "src/profiling/memory/socket_listener.cc",
@@ -3859,6 +3861,7 @@
     "src/profiling/memory/string_interner_unittest.cc",
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/unwinding_unittest.cc",
+    "src/profiling/memory/wire_protocol.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
     "src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/sock_utils.h b/include/perfetto/base/sock_utils.h
index 0008bf3..8f3a8b3 100644
--- a/include/perfetto/base/sock_utils.h
+++ b/include/perfetto/base/sock_utils.h
@@ -19,6 +19,9 @@
 
 #include "perfetto/base/scoped_file.h"
 
+#include <sys/socket.h>
+#include <sys/un.h>
+
 namespace perfetto {
 namespace base {
 
@@ -33,6 +36,13 @@
                 size_t len,
                 base::ScopedFile* fd_vec,
                 size_t max_files);
+
+bool MakeSockAddr(const std::string& socket_name,
+                  sockaddr_un* addr,
+                  socklen_t* addr_size);
+
+base::ScopedFile CreateSocket();
+
 }  // namespace base
 }  // namespace perfetto
 
diff --git a/src/base/sock_utils.cc b/src/base/sock_utils.cc
index d643d8d..bbe00fd 100644
--- a/src/base/sock_utils.cc
+++ b/src/base/sock_utils.cc
@@ -131,5 +131,27 @@
 
 #pragma GCC diagnostic pop
 
+bool MakeSockAddr(const std::string& socket_name,
+                  sockaddr_un* addr,
+                  socklen_t* addr_size) {
+  memset(addr, 0, sizeof(*addr));
+  const size_t name_len = socket_name.size();
+  if (name_len >= sizeof(addr->sun_path)) {
+    errno = ENAMETOOLONG;
+    return false;
+  }
+  memcpy(addr->sun_path, socket_name.data(), name_len);
+  if (addr->sun_path[0] == '@')
+    addr->sun_path[0] = '\0';
+  addr->sun_family = AF_UNIX;
+  *addr_size = static_cast<socklen_t>(
+      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+  return true;
+}
+
+base::ScopedFile CreateSocket() {
+  return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
+}
+
 }  // namespace base
 }  // namespace perfetto
diff --git a/src/ipc/unix_socket.cc b/src/ipc/unix_socket.cc
index 4acc59f..3f49b8d 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/ipc/unix_socket.cc
@@ -44,41 +44,15 @@
 
 // TODO(primiano): Add ThreadChecker to methods of this class.
 
-namespace {
-
-bool MakeSockAddr(const std::string& socket_name,
-                  sockaddr_un* addr,
-                  socklen_t* addr_size) {
-  memset(addr, 0, sizeof(*addr));
-  const size_t name_len = socket_name.size();
-  if (name_len >= sizeof(addr->sun_path)) {
-    errno = ENAMETOOLONG;
-    return false;
-  }
-  memcpy(addr->sun_path, socket_name.data(), name_len);
-  if (addr->sun_path[0] == '@')
-    addr->sun_path[0] = '\0';
-  addr->sun_family = AF_UNIX;
-  *addr_size = static_cast<socklen_t>(
-      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
-  return true;
-}
-
-base::ScopedFile CreateSocket() {
-  return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
-}
-
-}  // namespace
-
 // static
 base::ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
-  base::ScopedFile fd = CreateSocket();
+  base::ScopedFile fd = base::CreateSocket();
   if (!fd)
     return fd;
 
   sockaddr_un addr;
   socklen_t addr_size;
-  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+  if (!base::MakeSockAddr(socket_name, &addr, &addr_size)) {
     return base::ScopedFile();
   }
 
@@ -134,7 +108,7 @@
   if (adopt_state == State::kDisconnected) {
     // We get here from the default ctor().
     PERFETTO_DCHECK(!adopt_fd);
-    fd_ = CreateSocket();
+    fd_ = base::CreateSocket();
     if (!fd_) {
       last_error_ = errno;
       return;
@@ -201,7 +175,7 @@
 
   sockaddr_un addr;
   socklen_t addr_size;
-  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+  if (!base::MakeSockAddr(socket_name, &addr, &addr_size)) {
     last_error_ = errno;
     return NotifyConnectionState(false);
   }
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 0ccc573..78e57c9 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -15,83 +15,46 @@
 import("../../../gn/perfetto.gni")
 import("//build_overrides/build.gni")
 
-source_set("record_reader") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
-    "record_reader.cc",
-    "record_reader.h",
-  ]
-}
-
-source_set("unwinding") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+source_set("wire_protocol") {
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
   ]
   sources = [
-    "unwinding.cc",
-    "unwinding.h",
+    "wire_protocol.cc",
+    "wire_protocol.h",
   ]
 }
 
-source_set("socket_listener") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+source_set("daemon") {
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
-    ":record_reader",
-    ":unwinding",
+    ":wire_protocol",
+    "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
     "../../ipc",
   ]
   sources = [
-    "socket_listener.cc",
-    "socket_listener.h",
-  ]
-}
-
-source_set("string_interner") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
-    "string_interner.cc",
-    "string_interner.h",
-  ]
-}
-
-source_set("bookkeeping") {
-  public_configs = [ "../../../gn:default_config" ]
-  deps = [
-    ":string_interner",
-    "../../../gn:default_deps",
-    "../../base",
-  ]
-  sources = [
     "bookkeeping.cc",
     "bookkeeping.h",
+    "record_reader.cc",
+    "record_reader.h",
+    "socket_listener.cc",
+    "socket_listener.h",
+    "string_interner.cc",
+    "string_interner.h",
+    "unwinding.cc",
+    "unwinding.h",
   ]
 }
 
 source_set("client") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
+    ":wire_protocol",
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
@@ -103,25 +66,22 @@
 }
 
 source_set("unittests") {
-  public_configs = [
-    "../../../gn:default_config",
-    "../../../buildtools:libunwindstack_config",
-  ]
+  public_configs = [ "../../../buildtools:libunwindstack_config" ]
   testonly = true
   deps = [
-    ":bookkeeping",
     ":client",
-    ":record_reader",
-    ":socket_listener",
-    ":string_interner",
-    ":unwinding",
+    ":daemon",
+    ":wire_protocol",
+    "../../../gn:default_deps",
     "../../../gn:gtest_deps",
     "../../base",
     "../../base:test_support",
   ]
   sources = [
     "bookkeeping_unittest.cc",
+    "bounded_queue_unittest.cc",
     "client_unittest.cc",
+    "heapprofd_integrationtest.cc",
     "record_reader_unittest.cc",
     "socket_listener_unittest.cc",
     "string_interner_unittest.cc",
@@ -130,10 +90,8 @@
 }
 
 executable("heapprofd") {
-  public_configs = [ "../../../gn:default_config" ]
   deps = [
-    ":socket_listener",
-    ":unwinding",
+    ":daemon",
     "../../../gn:default_deps",
     "../../base",
     "../../ipc",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 1624cee..7a31502 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -16,37 +16,100 @@
 
 #include "src/profiling/memory/bookkeeping.h"
 
+#include "perfetto/base/logging.h"
+
 namespace perfetto {
 
-MemoryBookkeeping::Node* MemoryBookkeeping::Node::GetOrCreateChild(
-    const MemoryBookkeeping::InternedCodeLocation& loc) {
+GlobalCallstackTrie::Node* GlobalCallstackTrie::Node::GetOrCreateChild(
+    const InternedCodeLocation& loc) {
   Node* child = children_.Get(loc);
   if (!child)
     child = children_.Emplace(loc, this);
   return child;
 }
 
-void MemoryBookkeeping::RecordMalloc(const std::vector<CodeLocation>& locs,
-                                     uint64_t address,
-                                     uint64_t size) {
-  Node* node = &root_;
-  node->cum_size_ += size;
-  for (const MemoryBookkeeping::CodeLocation& loc : locs) {
-    node = node->GetOrCreateChild(InternCodeLocation(loc));
-    node->cum_size_ += size;
+void HeapTracker::RecordMalloc(const std::vector<CodeLocation>& callstack,
+                               uint64_t address,
+                               uint64_t size,
+                               uint64_t sequence_number) {
+  auto it = allocations_.find(address);
+  if (it != allocations_.end()) {
+    if (it->second.sequence_number > sequence_number) {
+      return;
+    } else {
+      // Clean up previous allocation by pretending a free happened just after
+      // it.
+      // CommitFree only uses the sequence number to check whether the
+      // currently active allocation is newer than the free, so we can make
+      // up a sequence_number here.
+      CommitFree(it->second.sequence_number + 1, address);
+    }
   }
 
-  allocations_.emplace(address, std::make_pair(size, node));
+  GlobalCallstackTrie::Node* node =
+      callsites_->IncrementCallsite(callstack, size);
+  allocations_.emplace(address, Allocation(size, sequence_number, node));
+
+  // Keep the sequence tracker consistent.
+  RecordFree(kNoopFree, sequence_number);
 }
 
-void MemoryBookkeeping::RecordFree(uint64_t address) {
+void HeapTracker::RecordFree(uint64_t address, uint64_t sequence_number) {
+  if (sequence_number != sequence_number_ + 1) {
+    pending_frees_.emplace(sequence_number, address);
+    return;
+  }
+
+  if (address != kNoopFree)
+    CommitFree(sequence_number, address);
+  sequence_number_++;
+
+  // At this point some other pending frees might be eligible to be committed.
+  auto it = pending_frees_.begin();
+  while (it != pending_frees_.end() && it->first == sequence_number_ + 1) {
+    if (it->second != kNoopFree)
+      CommitFree(it->first, it->second);
+    sequence_number_++;
+    it = pending_frees_.erase(it);
+  }
+}
+
+void HeapTracker::CommitFree(uint64_t sequence_number, uint64_t address) {
   auto leaf_it = allocations_.find(address);
   if (leaf_it == allocations_.end())
     return;
 
-  std::pair<uint64_t, Node*> value = leaf_it->second;
-  uint64_t size = value.first;
-  Node* node = value.second;
+  const Allocation& value = leaf_it->second;
+  if (value.sequence_number > sequence_number)
+    return;
+  allocations_.erase(leaf_it);
+}
+
+uint64_t GlobalCallstackTrie::GetCumSizeForTesting(
+    const std::vector<CodeLocation>& callstack) {
+  Node* node = &root_;
+  for (const CodeLocation& loc : callstack) {
+    node = node->children_.Get(InternCodeLocation(loc));
+    if (node == nullptr)
+      return 0;
+  }
+  return node->cum_size_;
+}
+
+GlobalCallstackTrie::Node* GlobalCallstackTrie::IncrementCallsite(
+    const std::vector<CodeLocation>& callstack,
+    uint64_t size) {
+  Node* node = &root_;
+  node->cum_size_ += size;
+  for (const CodeLocation& loc : callstack) {
+    node = node->GetOrCreateChild(InternCodeLocation(loc));
+    node->cum_size_ += size;
+  }
+  return node;
+}
+
+void GlobalCallstackTrie::DecrementNode(Node* node, uint64_t size) {
+  PERFETTO_DCHECK(node->cum_size_ >= size);
 
   bool delete_prev = false;
   Node* prev = nullptr;
@@ -58,19 +121,6 @@
     prev = node;
     node = node->parent_;
   }
-
-  allocations_.erase(leaf_it);
-}
-
-uint64_t MemoryBookkeeping::GetCumSizeForTesting(
-    const std::vector<CodeLocation>& locs) {
-  Node* node = &root_;
-  for (const MemoryBookkeeping::CodeLocation& loc : locs) {
-    node = node->children_.Get(InternCodeLocation(loc));
-    if (node == nullptr)
-      return 0;
-  }
-  return node->cum_size_;
 }
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index 7e5f705..bd56dd8 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -26,39 +26,37 @@
 
 namespace perfetto {
 
-class MemoryBookkeeping {
- public:
-  struct CodeLocation {
-    CodeLocation(std::string map_n, std::string function_n)
-        : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
+class HeapTracker;
 
-    std::string map_name;
-    std::string function_name;
-  };
+struct CodeLocation {
+  CodeLocation(std::string map_n, std::string function_n)
+      : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
 
-  void RecordMalloc(const std::vector<CodeLocation>& stack,
-                    uint64_t address,
-                    uint64_t size);
-  void RecordFree(uint64_t address);
-  uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+  std::string map_name;
+  std::string function_name;
+};
 
- private:
-  struct InternedCodeLocation {
-    StringInterner::InternedString map_name;
-    StringInterner::InternedString function_name;
+// Internal data-structure for GlobalCallstackTrie to save memory if the same
+// function is named multiple times.
+struct InternedCodeLocation {
+  StringInterner::InternedString map_name;
+  StringInterner::InternedString function_name;
 
-    bool operator<(const InternedCodeLocation& other) const {
-      if (map_name.id() == other.map_name.id())
-        return function_name.id() < other.function_name.id();
-      return map_name.id() < other.map_name.id();
-    }
-  };
-
-  InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
-    return {interner_.Intern(loc.map_name),
-            interner_.Intern(loc.function_name)};
+  bool operator<(const InternedCodeLocation& other) const {
+    if (map_name.id() == other.map_name.id())
+      return function_name.id() < other.function_name.id();
+    return map_name.id() < other.map_name.id();
   }
+};
 
+// Graph of function callsites. This is shared between heap dumps for
+// different processes. Each call site is represented by a
+// GlobalCallstackTrie::Node that is owned by the parent (i.e. calling)
+// callsite. It has a pointer to its parent, which means the function call-graph
+// can be reconstructed from a GlobalCallstackTrie::Node by walking down the
+// pointers to the parents.
+class GlobalCallstackTrie {
+ public:
   // Node in a tree of function traces that resulted in an allocation. For
   // instance, if alloc_buf is called from foo and bar, which are called from
   // main, the tree looks as following.
@@ -77,25 +75,119 @@
   // alloc_buf to the leafs of this tree.
   class Node {
    public:
+    // This is opaque except to GlobalCallstackTrie.
+    friend class GlobalCallstackTrie;
+
     Node(InternedCodeLocation location) : Node(std::move(location), nullptr) {}
     Node(InternedCodeLocation location, Node* parent)
         : parent_(parent), location_(std::move(location)) {}
 
+   private:
     Node* GetOrCreateChild(const InternedCodeLocation& loc);
 
     uint64_t cum_size_ = 0;
-    Node* parent_;
+    Node* const parent_;
     const InternedCodeLocation location_;
     base::LookupSet<Node, const InternedCodeLocation, &Node::location_>
         children_;
   };
 
-  // Address -> (size, code location)
-  std::map<uint64_t, std::pair<uint64_t, Node*>> allocations_;
+  GlobalCallstackTrie() = default;
+  GlobalCallstackTrie(const GlobalCallstackTrie&) = delete;
+  GlobalCallstackTrie& operator=(const GlobalCallstackTrie&) = delete;
+
+  uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+  Node* IncrementCallsite(const std::vector<CodeLocation>& locs, uint64_t size);
+  static void DecrementNode(Node* node, uint64_t size);
+
+ private:
+  InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
+    return {interner_.Intern(loc.map_name),
+            interner_.Intern(loc.function_name)};
+  }
+
   StringInterner interner_;
   Node root_{{interner_.Intern(""), interner_.Intern("")}};
 };
 
+// Snapshot for memory allocations of a particular process. Shares callsites
+// with other processes.
+class HeapTracker {
+ public:
+  // Caller needs to ensure that callsites outlives the HeapTracker.
+  explicit HeapTracker(GlobalCallstackTrie* callsites)
+      : callsites_(callsites) {}
+
+  void RecordMalloc(const std::vector<CodeLocation>& stack,
+                    uint64_t address,
+                    uint64_t size,
+                    uint64_t sequence_number);
+  void RecordFree(uint64_t address, uint64_t sequence_number);
+
+ private:
+  static constexpr uint64_t kNoopFree = 0;
+  struct Allocation {
+    Allocation(uint64_t size, uint64_t seq, GlobalCallstackTrie::Node* n)
+        : alloc_size(size), sequence_number(seq), node(n) {}
+
+    Allocation() = default;
+    Allocation(const Allocation&) = delete;
+    Allocation(Allocation&& other) noexcept {
+      alloc_size = other.alloc_size;
+      sequence_number = other.sequence_number;
+      node = other.node;
+      other.node = nullptr;
+    }
+
+    ~Allocation() {
+      if (node)
+        GlobalCallstackTrie::DecrementNode(node, alloc_size);
+    }
+
+    uint64_t alloc_size;
+    uint64_t sequence_number;
+    GlobalCallstackTrie::Node* node;
+  };
+
+  // Sequencing logic works as following:
+  // * mallocs are immediately commited to |allocations_|. They are rejected if
+  //   the current malloc for the address has a higher sequence number.
+  //
+  //   If all operations with sequence numbers lower than the malloc have been
+  //   commited to |allocations_|, sequence_number_ is advanced and all
+  //   unblocked pending operations after the current id are commited to
+  //   |allocations_|. Otherwise, a no-op record is added to the pending
+  //   operations queue to maintain the contiguity of the sequence.
+
+  // * for frees:
+  //   if all operations with sequence numbers lower than the free have
+  //     been commited to |allocations_| (i.e sequence_number_ ==
+  //     sequence_number - 1) the free is commited to |allocations_| and
+  //     sequence_number_ is advanced. All unblocked pending operations are
+  //     commited to |allocations_|.
+  //   otherwise: the free is added to the queue of pending operations.
+
+  // Commits a free operation into |allocations_|.
+  // This must be  called after all operations up to sequence_number have been
+  // commited to |allocations_|.
+  void CommitFree(uint64_t sequence_number, uint64_t address);
+
+  // Address -> (size, sequence_number, code location)
+  std::map<uint64_t, Allocation> allocations_;
+
+  // if allocation address != 0, there is pending free of the address.
+  // if == 0, the pending operation is a no-op.
+  // No-op operations come from allocs that have already been commited to
+  // |allocations_|. It is important to keep track of them in the list of
+  // pending to maintain the contiguity of the sequence.
+  std::map<uint64_t /* seq_id */, uint64_t /* allocation address */>
+      pending_frees_;
+
+  // The sequence number all mallocs and frees have been handled up to.
+  uint64_t sequence_number_ = 0;
+  GlobalCallstackTrie* const callsites_;
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_PROFILING_MEMORY_BOOKKEEPING_H_
diff --git a/src/profiling/memory/bookkeeping_unittest.cc b/src/profiling/memory/bookkeeping_unittest.cc
index 23da7cf..1e7b369 100644
--- a/src/profiling/memory/bookkeeping_unittest.cc
+++ b/src/profiling/memory/bookkeeping_unittest.cc
@@ -22,22 +22,65 @@
 namespace perfetto {
 namespace {
 
-TEST(BookkeepingTest, Basic) {
-  std::vector<MemoryBookkeeping::CodeLocation> stack{
+std::vector<CodeLocation> stack() {
+  return {
       {"map1", "fun1"}, {"map2", "fun2"},
   };
+}
 
-  std::vector<MemoryBookkeeping::CodeLocation> stack2{
+std::vector<CodeLocation> stack2() {
+  return {
       {"map1", "fun1"}, {"map3", "fun3"},
   };
-  MemoryBookkeeping mb;
-  mb.RecordMalloc(stack, 1, 5);
-  mb.RecordMalloc(stack2, 2, 2);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
-  mb.RecordFree(2);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
-  mb.RecordFree(1);
-  ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, Basic) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+  hd.RecordMalloc(stack2(), 2, 2, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+  hd.RecordFree(2, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+  hd.RecordFree(1, sequence_number++);
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, TwoHeapTrackers) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+  {
+    HeapTracker hd2(&c);
+
+    hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+    hd2.RecordMalloc(stack2(), 2, 2, sequence_number++);
+    ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+  }
+  ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+}
+
+TEST(BookkeepingTest, ReplaceAlloc) {
+  uint64_t sequence_number = 1;
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+  hd.RecordMalloc(stack2(), 1, 2, sequence_number++);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack()), 0);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 2);
+}
+
+TEST(BookkeepingTest, OutOfOrder) {
+  GlobalCallstackTrie c;
+  HeapTracker hd(&c);
+
+  hd.RecordMalloc(stack(), 1, 5, 1);
+  hd.RecordMalloc(stack2(), 1, 2, 0);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack()), 5);
+  EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 0);
 }
 
 }  // namespace
diff --git a/src/profiling/memory/bounded_queue.h b/src/profiling/memory/bounded_queue.h
new file mode 100644
index 0000000..521ec5d
--- /dev/null
+++ b/src/profiling/memory/bounded_queue.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+#define SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "perfetto/base/logging.h"
+
+// Transport messages between threads. Multiple-producer / single-consumer.
+//
+// This has to outlive both the consumer and the producer who have to
+// negotiate termination separately, if needed. This is currently only used
+// in a scenario where the producer and consumer both are loops that never
+// terminate.
+template <typename T>
+class BoundedQueue {
+ public:
+  BoundedQueue() : BoundedQueue(1) {}
+  BoundedQueue(size_t capacity) : capacity_(capacity) {
+    PERFETTO_CHECK(capacity > 0);
+  }
+
+  void Add(T item) {
+    std::unique_lock<std::mutex> l(mutex_);
+    if (deque_.size() == capacity_)
+      full_cv_.wait(l, [this] { return deque_.size() < capacity_; });
+    deque_.emplace_back(std::move(item));
+    if (deque_.size() == 1)
+      empty_cv_.notify_all();
+  }
+
+  T Get() {
+    std::unique_lock<std::mutex> l(mutex_);
+    if (elements_ == 0)
+      empty_cv_.wait(l, [this] { return !deque_.empty(); });
+    T item(std::move(deque_.front()));
+    deque_.pop_front();
+    if (deque_.size() == capacity_ - 1) {
+      l.unlock();
+      full_cv_.notify_all();
+    }
+    return item;
+  }
+
+  void SetCapacity(size_t capacity) {
+    PERFETTO_CHECK(capacity > 0);
+    {
+      std::lock_guard<std::mutex> l(mutex_);
+      capacity_ = capacity;
+    }
+    full_cv_.notify_all();
+  }
+
+ private:
+  size_t capacity_;
+  size_t elements_ = 0;
+  std::deque<T> deque_;
+  std::condition_variable full_cv_;
+  std::condition_variable empty_cv_;
+  std::mutex mutex_;
+};
+
+#endif  // SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
diff --git a/src/profiling/memory/bounded_queue_unittest.cc b/src/profiling/memory/bounded_queue_unittest.cc
new file mode 100644
index 0000000..63127d1
--- /dev/null
+++ b/src/profiling/memory/bounded_queue_unittest.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/bounded_queue.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(BoundedQueueTest, IsFIFO) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+}
+
+TEST(BoundedQueueTest, Blocking) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  std::thread th([&q] { q.Add(3); });
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+  EXPECT_EQ(q.Get(), 3);
+  th.join();
+}
+
+TEST(BoundedQueueTest, Resize) {
+  BoundedQueue<int> q(2);
+  q.Add(1);
+  q.Add(2);
+  q.SetCapacity(3);
+  q.Add(3);
+  EXPECT_EQ(q.Get(), 1);
+  EXPECT_EQ(q.Get(), 2);
+  EXPECT_EQ(q.Get(), 3);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index f8d57f7..86f2063 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -18,82 +18,90 @@
 
 #include <inttypes.h>
 #include <sys/socket.h>
+#include <sys/syscall.h>
+#include <sys/un.h>
+#include <unistd.h>
 
 #include <atomic>
+#include <new>
+
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+#include <unwindstack/Regs.h>
+#include <unwindstack/RegsGetLocal.h>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/sock_utils.h"
 #include "perfetto/base/utils.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 namespace {
 
-std::atomic<uint64_t> global_sequence_number(0);
-constexpr size_t kFreePageBytes = base::kPageSize;
-constexpr size_t kFreePageSize = kFreePageBytes / sizeof(uint64_t);
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// glibc does not define a wrapper around gettid, bionic does.
+pid_t gettid() {
+  return static_cast<pid_t>(syscall(__NR_gettid));
+}
+#endif
+
+std::vector<base::ScopedFile> ConnectPool(const std::string& sock_name,
+                                          size_t n) {
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!base::MakeSockAddr(sock_name, &addr, &addr_size))
+    return {};
+
+  std::vector<base::ScopedFile> res;
+  res.reserve(n);
+  for (size_t i = 0; i < n; ++i) {
+    auto sock = base::CreateSocket();
+    if (connect(*sock, reinterpret_cast<sockaddr*>(&addr), addr_size) == -1) {
+      PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
+      continue;
+    }
+    res.emplace_back(std::move(sock));
+  }
+  return res;
+}
+
+inline bool IsMainThread() {
+  return getpid() == gettid();
+}
 
 }  // namespace
 
-FreePage::FreePage() : free_page_(kFreePageSize) {
-  free_page_[0] = static_cast<uint64_t>(kFreePageBytes);
-  free_page_[1] = static_cast<uint64_t>(RecordType::Free);
-  offset_ = 2;
-  // Code in Add assumes that offset is aligned to 2.
-  PERFETTO_DCHECK(offset_ % 2 == 0);
+void FreePage::Add(const uint64_t addr,
+                   const uint64_t sequence_number,
+                   SocketPool* pool) {
+  std::lock_guard<std::mutex> l(mutex_);
+  if (offset_ == kFreePageSize) {
+    FlushLocked(pool);
+    // Now that we have flushed, reset to after the header.
+    offset_ = 0;
+  }
+  FreePageEntry& current_entry = free_page_.entries[offset_++];
+  current_entry.sequence_number = sequence_number;
+  current_entry.addr = addr;
 }
 
-void FreePage::Add(const uint64_t addr, SocketPool* pool) {
-  std::lock_guard<std::mutex> l(mtx_);
-  if (offset_ == kFreePageSize)
-    Flush(pool);
-  static_assert(kFreePageSize % 2 == 0,
-                "free page size needs to be divisible by two");
-  free_page_[offset_++] = ++global_sequence_number;
-  free_page_[offset_++] = addr;
-  PERFETTO_DCHECK(offset_ % 2 == 0);
-}
-
-void FreePage::Flush(SocketPool* pool) {
+void FreePage::FlushLocked(SocketPool* pool) {
+  WireMessage msg = {};
+  msg.record_type = RecordType::Free;
+  msg.free_header = &free_page_;
   BorrowedSocket fd(pool->Borrow());
-  size_t written = 0;
-  do {
-    ssize_t wr = PERFETTO_EINTR(send(*fd, &free_page_[0] + written,
-                                     kFreePageBytes - written, MSG_NOSIGNAL));
-    if (wr == -1) {
-      fd.Close();
-      return;
-    }
-    written += static_cast<size_t>(wr);
-  } while (written < kFreePageBytes);
-  // Now that we have flushed, reset to after the header.
-  offset_ = 2;
-}
-
-BorrowedSocket::BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
-    : fd_(std::move(fd)), socket_pool_(socket_pool) {}
-
-int BorrowedSocket::operator*() {
-  return get();
-}
-
-int BorrowedSocket::get() {
-  return *fd_;
-}
-
-void BorrowedSocket::Close() {
-  fd_.reset();
-}
-
-BorrowedSocket::~BorrowedSocket() {
-  if (socket_pool_ != nullptr)
-    socket_pool_->Return(std::move(fd_));
+  SendWireMessage(*fd, msg);
 }
 
 SocketPool::SocketPool(std::vector<base::ScopedFile> sockets)
     : sockets_(std::move(sockets)), available_sockets_(sockets_.size()) {}
 
 BorrowedSocket SocketPool::Borrow() {
-  std::unique_lock<std::mutex> lck_(mtx_);
+  std::unique_lock<std::mutex> lck_(mutex_);
   if (available_sockets_ == 0)
     cv_.wait(lck_, [this] { return available_sockets_ > 0; });
   PERFETTO_CHECK(available_sockets_ > 0);
@@ -101,13 +109,14 @@
 }
 
 void SocketPool::Return(base::ScopedFile sock) {
+  PERFETTO_CHECK(dead_sockets_ + available_sockets_ < sockets_.size());
   if (!sock) {
     // TODO(fmayer): Handle reconnect or similar.
     // This is just to prevent a deadlock.
     PERFETTO_CHECK(++dead_sockets_ != sockets_.size());
     return;
   }
-  std::unique_lock<std::mutex> lck_(mtx_);
+  std::unique_lock<std::mutex> lck_(mutex_);
   PERFETTO_CHECK(available_sockets_ < sockets_.size());
   sockets_[available_sockets_++] = std::move(sock);
   if (available_sockets_ == 1) {
@@ -116,4 +125,87 @@
   }
 }
 
+const char* GetThreadStackBase() {
+  pthread_attr_t attr;
+  if (pthread_getattr_np(pthread_self(), &attr) != 0)
+    return nullptr;
+  base::ScopedResource<pthread_attr_t*, pthread_attr_destroy, nullptr> cleanup(
+      &attr);
+
+  char* stackaddr;
+  size_t stacksize;
+  if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&stackaddr),
+                            &stacksize) != 0)
+    return nullptr;
+  return stackaddr + stacksize;
+}
+
+Client::Client(std::vector<base::ScopedFile> socks)
+    : socket_pool_(std::move(socks)) {
+  uint64_t size = 0;
+  int fds[2];
+  fds[0] = open("/proc/self/maps", O_RDONLY | O_CLOEXEC);
+  fds[1] = open("/proc/self/mem", O_RDONLY | O_CLOEXEC);
+  base::Send(*socket_pool_.Borrow(), &size, sizeof(size), fds, 2);
+}
+
+Client::Client(const std::string& sock_name, size_t conns)
+    : Client(ConnectPool(sock_name, conns)) {}
+
+const char* Client::GetStackBase() {
+  if (IsMainThread()) {
+    if (!main_thread_stack_base_)
+      // Because pthread_attr_getstack reads and parses /proc/self/maps and
+      // /proc/self/stat, we have to cache the result here.
+      main_thread_stack_base_ = GetThreadStackBase();
+    return main_thread_stack_base_;
+  }
+  return GetThreadStackBase();
+}
+
+// The stack grows towards numerically smaller addresses, so the stack layout
+// of main calling malloc is as follows.
+//
+//               +------------+
+//               |SendWireMsg |
+// stacktop +--> +------------+ 0x1000
+//               |RecordMalloc|    +
+//               +------------+    |
+//               | malloc     |    |
+//               +------------+    |
+//               |  main      |    v
+// stackbase +-> +------------+ 0xffff
+void Client::RecordMalloc(uint64_t alloc_size, uint64_t alloc_address) {
+  AllocMetadata metadata;
+  const char* stackbase = GetStackBase();
+  const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+  unwindstack::AsmGetRegs(metadata.register_data);
+
+  if (stackbase < stacktop) {
+    PERFETTO_DCHECK(false);
+    return;
+  }
+
+  uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+  metadata.alloc_size = alloc_size;
+  metadata.alloc_address = alloc_address;
+  metadata.stack_pointer = reinterpret_cast<uint64_t>(stacktop);
+  metadata.stack_pointer_offset = sizeof(AllocMetadata);
+  metadata.arch = unwindstack::Regs::CurrentArch();
+  metadata.sequence_number = ++sequence_number_;
+
+  WireMessage msg{};
+  msg.alloc_header = &metadata;
+  msg.payload = const_cast<char*>(stacktop);
+  msg.payload_size = static_cast<size_t>(stack_size);
+
+  BorrowedSocket sockfd = socket_pool_.Borrow();
+  // TODO(fmayer): Handle failure.
+  PERFETTO_CHECK(SendWireMessage(*sockfd, msg));
+}
+
+void Client::RecordFree(uint64_t alloc_address) {
+  free_page_.Add(alloc_address, ++sequence_number_, &socket_pool_);
+}
+
 }  // namespace perfetto
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 7a4e459..46a571d 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -23,47 +23,11 @@
 #include <vector>
 
 #include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
-class SocketPool;
-
-class FreePage {
- public:
-  FreePage();
-
-  // Can be called from any thread. Must not hold mtx_.`
-  void Add(const uint64_t addr, SocketPool* pool);
-
- private:
-  // Needs to be called holding mtx_.
-  void Flush(SocketPool* pool);
-
-  std::vector<uint64_t> free_page_;
-  std::mutex mtx_;
-  size_t offset_;
-};
-
-class BorrowedSocket {
- public:
-  BorrowedSocket(const BorrowedSocket&) = delete;
-  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
-  BorrowedSocket(BorrowedSocket&& other) {
-    fd_ = std::move(other.fd_);
-    socket_pool_ = other.socket_pool_;
-    other.socket_pool_ = nullptr;
-  }
-
-  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool);
-  int operator*();
-  int get();
-  void Close();
-  ~BorrowedSocket();
-
- private:
-  base::ScopedFile fd_;
-  SocketPool* socket_pool_ = nullptr;
-};
+class BorrowedSocket;
 
 class SocketPool {
  public:
@@ -74,13 +38,80 @@
 
  private:
   void Return(base::ScopedFile fd);
-  std::mutex mtx_;
+  std::mutex mutex_;
   std::condition_variable cv_;
   std::vector<base::ScopedFile> sockets_;
   size_t available_sockets_;
   size_t dead_sockets_ = 0;
 };
 
+// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
+class BorrowedSocket {
+ public:
+  BorrowedSocket(const BorrowedSocket&) = delete;
+  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
+  BorrowedSocket(BorrowedSocket&& other) noexcept {
+    fd_ = std::move(other.fd_);
+    socket_pool_ = other.socket_pool_;
+    other.socket_pool_ = nullptr;
+  }
+
+  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
+      : fd_(std::move(fd)), socket_pool_(socket_pool) {}
+
+  ~BorrowedSocket() {
+    if (socket_pool_ != nullptr)
+      socket_pool_->Return(std::move(fd_));
+  }
+
+  int operator*() { return get(); }
+
+  int get() { return *fd_; }
+
+  void Close() { fd_.reset(); }
+
+ private:
+  base::ScopedFile fd_;
+  SocketPool* socket_pool_ = nullptr;
+};
+
+// Cache for frees that have been observed. It is infeasible to send every
+// free separately, so we batch and send the whole buffer once it is full.
+class FreePage {
+ public:
+  // Add address to buffer. Flush if necessary using a socket borrowed from
+  // pool.
+  // Can be called from any thread. Must not hold mutex_.`
+  void Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+
+ private:
+  // Needs to be called holding mutex_.
+  void FlushLocked(SocketPool* pool);
+
+  FreeMetadata free_page_;
+  std::mutex mutex_;
+  size_t offset_ = 0;
+};
+
+const char* GetThreadStackBase();
+
+// This is created and owned by the malloc hooks.
+class Client {
+ public:
+  Client(std::vector<base::ScopedFile> sockets);
+  Client(const std::string& sock_name, size_t conns);
+  void RecordMalloc(uint64_t alloc_size, uint64_t alloc_address);
+  void RecordFree(uint64_t alloc_address);
+
+ private:
+  const char* GetStackBase();
+
+  SocketPool socket_pool_;
+  FreePage free_page_;
+  const char* main_thread_stack_base_ = nullptr;
+  std::atomic<uint64_t> sequence_number_{0};
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_PROFILING_MEMORY_CLIENT_H_
diff --git a/src/profiling/memory/client_unittest.cc b/src/profiling/memory/client_unittest.cc
index a69bae3..df848da 100644
--- a/src/profiling/memory/client_unittest.cc
+++ b/src/profiling/memory/client_unittest.cc
@@ -67,5 +67,17 @@
   t2.join();
 }
 
+TEST(ClientTest, GetThreadStackBase) {
+  std::thread th([] {
+    const char* stackbase = GetThreadStackBase();
+    ASSERT_NE(stackbase, nullptr);
+    // The implementation assumes the stack grows from higher addresses to
+    // lower. We will need to rework once we encounter architectures where the
+    // stack grows the other way.
+    EXPECT_GT(stackbase, __builtin_frame_address(0));
+  });
+  th.join();
+}
+
 }  // namespace
 }  // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
new file mode 100644
index 0000000..c9b6219
--- /dev/null
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/socket_listener.h"
+#include "src/profiling/memory/unwinding.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace {
+
+constexpr char kSocketName[] = TEST_SOCK_NAME("heapprofd_integrationtest");
+
+void __attribute__((noinline)) OtherFunction(Client* client) {
+  client->RecordMalloc(10, 0xf00);
+}
+
+void __attribute__((noinline)) SomeFunction(Client* client) {
+  OtherFunction(client);
+}
+
+class HeapprofdIntegrationTest : public ::testing::Test {
+ protected:
+  void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
+  void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
+};
+
+// TODO(fmayer): Fix out of tree integration test.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#define MAYBE_EndToEnd EndToEnd
+#else
+#define MAYBE_EndToEnd DISABLED_EndToEnd
+#endif
+
+TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
+  GlobalCallstackTrie callsites;
+
+  base::TestTaskRunner task_runner;
+  auto done = task_runner.CreateCheckpoint("done");
+  SocketListener listener(
+      [&done](UnwindingRecord r) {
+        // TODO(fmayer): Test symbolization and result of unwinding.
+        BookkeepingRecord bookkeeping_record;
+        ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
+        HandleBookkeepingRecord(&bookkeeping_record);
+        done();
+      },
+      &callsites);
+
+  auto sock = ipc::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+  if (!sock->is_listening()) {
+    PERFETTO_ELOG("Socket not listening.");
+    PERFETTO_CHECK(false);
+  }
+  Client client(kSocketName, 1);
+  SomeFunction(&client);
+  task_runner.RunUntilCheckpoint("done");
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index ea53977..991006e 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -15,9 +15,12 @@
  */
 
 #include <stdlib.h>
+#include <array>
 #include <memory>
+#include <vector>
 
 #include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bounded_queue.h"
 #include "src/profiling/memory/socket_listener.h"
 
 #include "perfetto/base/unix_task_runner.h"
@@ -25,15 +28,60 @@
 namespace perfetto {
 namespace {
 
+constexpr size_t kUnwinderQueueSize = 1000;
+constexpr size_t kBookkeepingQueueSize = 1000;
+constexpr size_t kUnwinderThreads = 5;
+
+// We create kUnwinderThreads unwinding threads and one bookeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+//             +--------------+
+//             |SocketListener|
+//             +------+-------+
+//                    |
+//          +--UnwindingRecord -+
+//          |                   |
+// +--------v-------+   +-------v--------+
+// |Unwinding Thread|   |Unwinding Thread|
+// +--------+-------+   +-------+--------+
+//          |                   |
+//          +-BookkeepingRecord +
+//                    |
+//           +--------v---------+
+//           |Bookkeeping Thread|
+//           +------------------+
 int HeapprofdMain(int argc, char** argv) {
+  GlobalCallstackTrie callsites;
   std::unique_ptr<ipc::UnixSocket> sock;
 
-  SocketListener listener(
-      [](size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>) {
-        // TODO(fmayer): Wire this up to a worker thread that does the
-        // unwinding.
-        PERFETTO_LOG("Record received.");
-      });
+  BoundedQueue<BookkeepingRecord> callsites_queue(kBookkeepingQueueSize);
+  std::thread bookkeeping_thread(
+      [&callsites_queue] { BookkeepingMainLoop(&callsites_queue); });
+
+  std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
+  for (size_t i = 0; i < kUnwinderThreads; ++i)
+    unwinder_queues[i].SetSize(kUnwinderQueueSize);
+  std::vector<std::thread> unwinding_threads;
+  unwinding_threads.reserve(kUnwinderThreads);
+  for (size_t i = 0; i < kUnwinderThreads; ++i) {
+    unwinding_threads.emplace_back([&unwinder_queues, &callsites_queue, i] {
+      UnwindingMainLoop(&unwinder_queues[i], &callsites_queue);
+    });
+  }
+
+  auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
+    unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
+        std::move(r));
+  };
+  SocketListener listener(std::move(on_record_received), &callsites);
+
   base::UnixTaskRunner read_task_runner;
   if (argc == 2) {
     // Allow to be able to manually specify the socket to listen on
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 81b7390..f857a42 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -82,7 +82,7 @@
   if (it == process_metadata_.end() || it->second.expired()) {
     // We have not seen the PID yet or the PID is being recycled.
     entry->process_metadata = std::make_shared<ProcessMetadata>(
-        peer_pid, std::move(maps_fd), std::move(mem_fd));
+        peer_pid, std::move(maps_fd), std::move(mem_fd), callsites_);
     process_metadata_[peer_pid] = entry->process_metadata;
   } else {
     // If the process already has metadata, this is an additional socket for
@@ -96,19 +96,30 @@
                                     size_t size,
                                     std::unique_ptr<uint8_t[]> buf) {
   auto it = sockets_.find(self);
-  if (it == sockets_.end())
+  if (it == sockets_.end()) {
     // This happens for zero-length records, because the callback gets called
     // in the first call to Read, before InitProcess is called. Because zero
     // length records are useless anyway, this is not a problem.
     return;
+  }
   Entry& entry = it->second;
+  if (!entry.process_metadata) {
+    PERFETTO_DLOG("Received record without process metadata.");
+    return;
+  }
+
+  if (size == 0) {
+    PERFETTO_DLOG("Dropping empty record.");
+    return;
+  }
   // This needs to be a weak_ptr for two reasons:
   // 1) most importantly, the weak_ptr in process_metadata_ should expire as
   // soon as the last socket for a process goes away. Otherwise, a recycled
   // PID might reuse incorrect metadata.
   // 2) it is a waste to unwind for a process that had already gone away.
   std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
-  callback_function_(size, std::move(buf), std::move(weak_metadata));
+  callback_function_({entry.process_metadata->pid, size, std::move(buf),
+                      std::move(weak_metadata)});
 }
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index ba5d92e..8b1609d 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -18,6 +18,7 @@
 #define SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
 
 #include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bookkeeping.h"
 #include "src/profiling/memory/record_reader.h"
 #include "src/profiling/memory/unwinding.h"
 
@@ -28,10 +29,9 @@
 
 class SocketListener : public ipc::UnixSocket::EventListener {
  public:
-  SocketListener(std::function<void(size_t,
-                                    std::unique_ptr<uint8_t[]>,
-                                    std::weak_ptr<ProcessMetadata>)> fn)
-      : callback_function_(std::move(fn)) {}
+  SocketListener(std::function<void(UnwindingRecord)> fn,
+                 GlobalCallstackTrie* callsites)
+      : callback_function_(std::move(fn)), callsites_(callsites) {}
   void OnDisconnect(ipc::UnixSocket* self) override;
   void OnNewIncomingConnection(
       ipc::UnixSocket* self,
@@ -63,9 +63,8 @@
 
   std::map<ipc::UnixSocket*, Entry> sockets_;
   std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
-  std::function<
-      void(size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>)>
-      callback_function_;
+  std::function<void(UnwindingRecord)> callback_function_;
+  GlobalCallstackTrie* callsites_;
 };
 
 }  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index da7efdb..2dd6310 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -46,16 +46,15 @@
   base::TestTaskRunner task_runner;
   auto callback_called = task_runner.CreateCheckpoint("callback.called");
   auto connected = task_runner.CreateCheckpoint("connected");
-  auto callback_fn = [&callback_called](
-                         size_t size, std::unique_ptr<uint8_t[]> buf,
-                         std::weak_ptr<ProcessMetadata> metadata) {
-    ASSERT_EQ(size, 1u);
-    ASSERT_EQ(buf[0], '1');
-    ASSERT_FALSE(metadata.expired());
+  auto callback_fn = [&callback_called](UnwindingRecord r) {
+    ASSERT_EQ(r.size, 1u);
+    ASSERT_EQ(r.data[0], '1');
+    ASSERT_FALSE(r.metadata.expired());
     callback_called();
   };
 
-  SocketListener listener(std::move(callback_fn));
+  GlobalCallstackTrie bookkeeping;
+  SocketListener listener(std::move(callback_fn), &bookkeeping);
   MockEventListener client_listener;
   EXPECT_CALL(client_listener, OnConnect(_, _))
       .WillOnce(InvokeWithoutArgs(connected));
diff --git a/src/profiling/memory/transport_data.h b/src/profiling/memory/transport_data.h
deleted file mode 100644
index c0ef3ef..0000000
--- a/src/profiling/memory/transport_data.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// The data types used for communication between heapprofd and the client
-// embedded in processes that are being profiled.
-
-#ifndef SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-#define SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-
-#include <inttypes.h>
-#include <unwindstack/Elf.h>
-
-namespace perfetto {
-
-// Use uint64_t to make sure the following data is aligned as 64bit is the
-// strongest alignment requirement.
-enum class RecordType : uint64_t {
-  Free = 0,
-  Malloc = 1,
-};
-
-struct AllocMetadata {
-  // Size of the allocation that was made.
-  uint64_t alloc_size;
-  // Pointer returned by malloc(2) for this allocation.
-  uint64_t alloc_address;
-  // Current value of the stack pointer.
-  uint64_t stack_pointer;
-  // Offset of the data at stack_pointer from the start of this record.
-  uint64_t stack_pointer_offset;
-  // CPU architecture of the client. This determines the size of the
-  // register data that follows this struct.
-  unwindstack::ArchEnum arch;
-};
-
-}  // namespace perfetto
-
-#endif  // SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index a317600..1dcc8c0 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -42,8 +42,8 @@
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
 #include "src/profiling/memory/unwinding.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
@@ -155,34 +155,19 @@
   maps_.clear();
 }
 
-bool DoUnwind(void* mem,
-              size_t sz,
-              ProcessMetadata* metadata,
-              std::vector<unwindstack::FrameData>* out) {
-  if (sz < sizeof(AllocMetadata)) {
-    PERFETTO_ELOG("size");
-    return false;
-  }
-  AllocMetadata* alloc_metadata = reinterpret_cast<AllocMetadata*>(mem);
-  if (sizeof(AllocMetadata) + RegSize(alloc_metadata->arch) > sz)
-    return false;
-  void* reg_data = static_cast<uint8_t*>(mem) + sizeof(AllocMetadata);
+bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+  AllocMetadata* alloc_metadata = msg->alloc_header;
   std::unique_ptr<unwindstack::Regs> regs(
-      CreateFromRawData(alloc_metadata->arch, reg_data));
+      CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
   if (regs == nullptr) {
     PERFETTO_ELOG("regs");
     return false;
   }
-  if (alloc_metadata->stack_pointer_offset < sizeof(AllocMetadata) ||
-      alloc_metadata->stack_pointer_offset > sz) {
-    PERFETTO_ELOG("out-of-bound stack_pointer_offset");
-    return false;
-  }
-  uint8_t* stack =
-      reinterpret_cast<uint8_t*>(mem) + alloc_metadata->stack_pointer_offset;
+  out->alloc_metadata = *alloc_metadata;
+  uint8_t* stack = reinterpret_cast<uint8_t*>(msg->payload);
   std::shared_ptr<unwindstack::Memory> mems = std::make_shared<StackMemory>(
       *metadata->mem_fd, alloc_metadata->stack_pointer, stack,
-      sz - alloc_metadata->stack_pointer_offset);
+      msg->payload_size);
   unwindstack::Unwinder unwinder(kMaxFrames, &metadata->maps, regs.get(), mems);
   // Surpress incorrect "variable may be uninitialized" error for if condition
   // after this loop. error_code = LastErrorCode gets run at least once.
@@ -198,10 +183,83 @@
       break;
   }
   if (error_code == 0)
-    *out = unwinder.frames();
+    out->frames = unwinder.frames();
   else
     PERFETTO_DLOG("unwinding failed %" PRIu8, error_code);
   return error_code == 0;
 }
 
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out) {
+  WireMessage msg;
+  if (!ReceiveWireMessage(reinterpret_cast<char*>(rec->data.get()), rec->size,
+                          &msg))
+    return false;
+  switch (msg.record_type) {
+    case RecordType::Malloc: {
+      std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+      if (!metadata)
+        // Process has already gone away.
+        return false;
+
+      out->metadata = std::move(rec->metadata);
+      out->free_record = {};
+      return DoUnwind(&msg, metadata.get(), &out->alloc_record);
+    }
+    case RecordType::Free: {
+      // We need to keep this alive, because msg.free_header is a pointer into
+      // this.
+      out->metadata = std::move(rec->metadata);
+      out->free_record.free_data = std::move(rec->data);
+      out->free_record.metadata = msg.free_header;
+      out->alloc_record = {};
+      return true;
+    }
+  }
+}
+
+__attribute__((noreturn)) void UnwindingMainLoop(
+    BoundedQueue<UnwindingRecord>* input_queue,
+    BoundedQueue<BookkeepingRecord>* output_queue) {
+  for (;;) {
+    UnwindingRecord rec = input_queue->Get();
+    BookkeepingRecord out;
+    if (HandleUnwindingRecord(&rec, &out))
+      output_queue->Add(std::move(out));
+  }
+}
+
+void HandleBookkeepingRecord(BookkeepingRecord* rec) {
+  std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+  if (!metadata)
+    // Process has already gone away.
+    return;
+
+  if (rec->free_record.free_data) {
+    FreeRecord& free_rec = rec->free_record;
+    FreePageEntry* entries = free_rec.metadata->entries;
+    uint64_t num_entries = free_rec.metadata->num_entries;
+    for (size_t i = 0; i < num_entries; ++i) {
+      const FreePageEntry& entry = entries[i];
+      metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
+    }
+  } else {
+    AllocRecord& alloc_rec = rec->alloc_record;
+    std::vector<CodeLocation> code_locations;
+    for (unwindstack::FrameData& frame : alloc_rec.frames)
+      code_locations.emplace_back(frame.map_name, frame.function_name);
+    metadata->heap_dump.RecordMalloc(code_locations,
+                                     alloc_rec.alloc_metadata.alloc_address,
+                                     alloc_rec.alloc_metadata.alloc_size,
+                                     alloc_rec.alloc_metadata.sequence_number);
+  }
+}
+
+__attribute__((noreturn)) void BookkeepingMainLoop(
+    BoundedQueue<BookkeepingRecord>* input_queue) {
+  for (;;) {
+    BookkeepingRecord rec = input_queue->Get();
+    HandleBookkeepingRecord(&rec);
+  }
+}
+
 }  // namespace perfetto
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 455be43..1bdcf0d 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -20,6 +20,9 @@
 #include <unwindstack/Maps.h>
 #include <unwindstack/Unwinder.h>
 #include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/bookkeeping.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
@@ -36,13 +39,20 @@
 };
 
 struct ProcessMetadata {
-  ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
-      : pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
+  ProcessMetadata(pid_t p,
+                  base::ScopedFile maps_fd,
+                  base::ScopedFile mem,
+                  GlobalCallstackTrie* callsites)
+      : pid(p),
+        maps(std::move(maps_fd)),
+        mem_fd(std::move(mem)),
+        heap_dump(callsites) {
     PERFETTO_CHECK(maps.Parse());
   }
   pid_t pid;
   FileDescriptorMaps maps;
   base::ScopedFile mem_fd;
+  HeapTracker heap_dump;
 };
 
 // Overlays size bytes pointed to by stack for addresses in [sp, sp + size).
@@ -62,10 +72,39 @@
 
 size_t RegSize(unwindstack::ArchEnum arch);
 
-bool DoUnwind(void* mem,
-              size_t sz,
-              ProcessMetadata* metadata,
-              std::vector<unwindstack::FrameData>* out);
+struct UnwindingRecord {
+  pid_t pid;
+  size_t size;
+  std::unique_ptr<uint8_t[]> data;
+  std::weak_ptr<ProcessMetadata> metadata;
+};
+
+struct FreeRecord {
+  std::unique_ptr<uint8_t[]> free_data;
+  FreeMetadata* metadata;
+};
+
+struct AllocRecord {
+  AllocMetadata alloc_metadata;
+  std::vector<unwindstack::FrameData> frames;
+};
+
+struct BookkeepingRecord {
+  // TODO(fmayer): Use a union.
+  std::weak_ptr<ProcessMetadata> metadata;
+  AllocRecord alloc_record;
+  FreeRecord free_record;
+};
+
+bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
+void HandleBookkeepingRecord(BookkeepingRecord* rec);
+
+void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
+                       BoundedQueue<BookkeepingRecord>* output_queue);
+
+void BookkeepingMainLoop(BoundedQueue<BookkeepingRecord>* input_queue);
 
 }  // namespace perfetto
 
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 06f3ba9..f80a0ee 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -16,7 +16,8 @@
 
 #include "src/profiling/memory/unwinding.h"
 #include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -70,20 +71,6 @@
 #define MAYBE_DoUnwind DISABLED_DoUnwind
 #endif
 
-uint8_t* GetStackBase() {
-  pthread_t t = pthread_self();
-  pthread_attr_t attr;
-  if (pthread_getattr_np(t, &attr) != 0) {
-    return nullptr;
-  }
-  uint8_t* x;
-  size_t s;
-  if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&x), &s) != 0)
-    return nullptr;
-  pthread_attr_destroy(&attr);
-  return x + s;
-}
-
 // This is needed because ASAN thinks copying the whole stack is a buffer
 // underrun.
 void __attribute__((noinline))
@@ -95,50 +82,59 @@
     to[i] = from[i];
 }
 
-std::pair<std::unique_ptr<uint8_t[]>, size_t> __attribute__((noinline))
-GetRecord() {
-  const uint8_t* stackbase = GetStackBase();
-  PERFETTO_CHECK(stackbase != nullptr);
-  const uint8_t* stacktop =
-      reinterpret_cast<uint8_t*>(__builtin_frame_address(0));
-  PERFETTO_CHECK(stacktop != nullptr);
-  PERFETTO_CHECK(stacktop < stackbase);
-  const size_t stack_size = static_cast<size_t>(stackbase - stacktop);
-  std::unique_ptr<unwindstack::Regs> regs(unwindstack::Regs::CreateFromLocal());
-  const unwindstack::ArchEnum arch = regs->CurrentArch();
-  const size_t reg_size = RegSize(arch);
-  const size_t total_size = sizeof(AllocMetadata) + reg_size + stack_size;
-  std::unique_ptr<uint8_t[]> buf(new uint8_t[total_size]);
-  AllocMetadata* metadata = reinterpret_cast<AllocMetadata*>(buf.get());
-  metadata->alloc_size = 0;
-  metadata->alloc_address = 0;
+struct RecordMemory {
+  std::unique_ptr<uint8_t[]> payload;
+  std::unique_ptr<AllocMetadata> metadata;
+};
+
+RecordMemory __attribute__((noinline)) GetRecord(WireMessage* msg) {
+  std::unique_ptr<AllocMetadata> metadata(new AllocMetadata);
+
+  const char* stackbase = GetThreadStackBase();
+  const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+  unwindstack::AsmGetRegs(metadata->register_data);
+
+  if (stackbase < stacktop) {
+    PERFETTO_DCHECK(false);
+    return {nullptr, nullptr};
+  }
+  uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+
+  metadata->alloc_size = 10;
+  metadata->alloc_address = 0x10;
   metadata->stack_pointer = reinterpret_cast<uint64_t>(stacktop);
-  metadata->stack_pointer_offset = sizeof(AllocMetadata) + reg_size;
-  metadata->arch = arch;
-  unwindstack::RegsGetLocal(regs.get());
-  // Make sure nothing above has changed the stack pointer, just for extra
-  // paranoia.
-  PERFETTO_CHECK(stacktop ==
-                 reinterpret_cast<uint8_t*>(__builtin_frame_address(0)));
-  memcpy(buf.get() + sizeof(AllocMetadata), regs->RawData(), reg_size);
-  UnsafeMemcpy(buf.get() + sizeof(AllocMetadata) + reg_size, stacktop,
-               stack_size);
-  return {std::move(buf), total_size};
+  metadata->stack_pointer_offset = sizeof(AllocMetadata);
+  metadata->arch = unwindstack::Regs::CurrentArch();
+  metadata->sequence_number = 1;
+
+  std::unique_ptr<uint8_t[]> payload(new uint8_t[stack_size]);
+  UnsafeMemcpy(&payload[0], stacktop, stack_size);
+
+  *msg = {};
+  msg->alloc_header = metadata.get();
+  msg->payload = reinterpret_cast<char*>(payload.get());
+  msg->payload_size = static_cast<size_t>(stack_size);
+  return {std::move(payload), std::move(metadata)};
 }
 
 // TODO(fmayer): Investigate why this fails out of tree.
 TEST(UnwindingTest, MAYBE_DoUnwind) {
   base::ScopedFile proc_maps(open("/proc/self/maps", O_RDONLY));
   base::ScopedFile proc_mem(open("/proc/self/mem", O_RDONLY));
-  ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
-  auto record = GetRecord();
-  std::vector<unwindstack::FrameData> out;
-  ASSERT_TRUE(DoUnwind(record.first.get(), record.second, &metadata, &out));
+  GlobalCallstackTrie callsites;
+  ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem),
+                           &callsites);
+  WireMessage msg;
+  auto record = GetRecord(&msg);
+  AllocRecord out;
+  ASSERT_TRUE(DoUnwind(&msg, &metadata, &out));
   int st;
-  std::unique_ptr<char> demangled(
-      abi::__cxa_demangle(out[0].function_name.c_str(), nullptr, nullptr, &st));
+  std::unique_ptr<char> demangled(abi::__cxa_demangle(
+      out.frames[0].function_name.c_str(), nullptr, nullptr, &st));
   ASSERT_EQ(st, 0);
-  ASSERT_STREQ(demangled.get(), "perfetto::(anonymous namespace)::GetRecord()");
+  ASSERT_STREQ(
+      demangled.get(),
+      "perfetto::(anonymous namespace)::GetRecord(perfetto::WireMessage*)");
 }
 
 }  // namespace
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
new file mode 100644
index 0000000..39373f9
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.cc
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+namespace perfetto {
+
+namespace {
+template <typename T>
+bool ViewAndAdvance(char** ptr, T** out, const char* end) {
+  if (end - sizeof(T) < *ptr)
+    return false;
+  *out = reinterpret_cast<T*>(ptr);
+  ptr += sizeof(T);
+  return true;
+}
+}  // namespace
+
+bool SendWireMessage(int sock, const WireMessage& msg) {
+  uint64_t total_size;
+  struct iovec iovecs[4] = {};
+  // TODO(fmayer): Maye pack these two.
+  iovecs[0].iov_base = &total_size;
+  iovecs[0].iov_len = sizeof(total_size);
+  iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
+  iovecs[1].iov_len = sizeof(msg.record_type);
+  if (msg.alloc_header) {
+    iovecs[2].iov_base = msg.alloc_header;
+    iovecs[2].iov_len = sizeof(*msg.alloc_header);
+  } else if (msg.free_header) {
+    iovecs[2].iov_base = msg.free_header;
+    iovecs[2].iov_len = sizeof(*msg.free_header);
+  } else {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+
+  iovecs[3].iov_base = msg.payload;
+  iovecs[3].iov_len = msg.payload_size;
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iovecs;
+  if (msg.payload) {
+    hdr.msg_iovlen = base::ArraySize(iovecs);
+    total_size = iovecs[1].iov_len + iovecs[2].iov_len + iovecs[3].iov_len;
+  } else {
+    // If we are not sending payload, just ignore that iovec.
+    hdr.msg_iovlen = base::ArraySize(iovecs) - 1;
+    total_size = iovecs[1].iov_len + iovecs[2].iov_len;
+  }
+
+  ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+  return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+}
+
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
+  RecordType* record_type;
+  char* end = buf + size;
+  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+    return false;
+  switch (*record_type) {
+    case RecordType::Malloc:
+      if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+        return false;
+      out->payload = buf;
+      if (buf > end) {
+        PERFETTO_DCHECK(false);
+        return false;
+      }
+      out->payload_size = static_cast<size_t>(end - buf);
+      break;
+    case RecordType::Free:
+      if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+        return false;
+      break;
+  }
+  out->record_type = *record_type;
+  return true;
+}
+
+}  // namespace perfetto
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
new file mode 100644
index 0000000..2810872
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The data types used for communication between heapprofd and the client
+// embedded in processes that are being profiled.
+
+#ifndef SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+#define SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+
+#include <inttypes.h>
+#include <unwindstack/Elf.h>
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+
+namespace perfetto {
+
+// Types needed for the wire format used for communication between the client
+// and heapprofd. The basic format of a record is
+// record size (uint64_t) | record type (RecordType = uint64_t) | record
+// If record type is malloc, the record format is AllocMetdata | raw stack.
+// If the record type is free, the record is a sequence of FreePageEntry.
+
+// Use uint64_t to make sure the following data is aligned as 64bit is the
+// strongest alignment requirement.
+
+// C++11 std::max is not constexpr.
+constexpr size_t constexpr_max(size_t x, size_t y) {
+  return x > y ? x : y;
+}
+
+constexpr size_t kMaxRegisterDataSize = constexpr_max(
+    constexpr_max(constexpr_max(unwindstack::ARM_REG_LAST * sizeof(uint32_t),
+                                unwindstack::ARM64_REG_LAST * sizeof(uint64_t)),
+                  unwindstack::X86_REG_LAST * sizeof(uint32_t)),
+    unwindstack::X86_64_REG_LAST * sizeof(uint64_t));
+
+constexpr size_t kFreePageSize = 1024;
+
+enum class RecordType : uint64_t {
+  Free = 0,
+  Malloc = 1,
+};
+
+struct AllocMetadata {
+  uint64_t sequence_number;
+  // Size of the allocation that was made.
+  uint64_t alloc_size;
+  // Pointer returned by malloc(2) for this allocation.
+  uint64_t alloc_address;
+  // Current value of the stack pointer.
+  uint64_t stack_pointer;
+  // Offset of the data at stack_pointer from the start of this record.
+  uint64_t stack_pointer_offset;
+  // CPU architecture of the client. This determines the size of the
+  // register data that follows this struct.
+  unwindstack::ArchEnum arch;
+  char register_data[kMaxRegisterDataSize];
+};
+
+struct FreePageEntry {
+  uint64_t sequence_number;
+  uint64_t addr;
+};
+
+struct FreeMetadata {
+  uint64_t num_entries;
+  FreePageEntry entries[kFreePageSize];
+};
+
+struct WireMessage {
+  RecordType record_type;
+
+  AllocMetadata* alloc_header;
+  FreeMetadata* free_header;
+
+  char* payload;
+  size_t payload_size;
+};
+
+bool SendWireMessage(int sock, const WireMessage& msg);
+
+// Parse message received over the wire.
+// |buf| has to outlive |out|.
+// If buf is not a valid message, return false.
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out);
+
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_