Use the SharedRingBuffer for sending payload from client to service.

This simplifies the code, as we no longer have to deal with the
complexity of quickly draining the client socket in order to unblock
the client.

We now use standard base::TaskRunner for our threads, including the
unwinding threads, as we no longer need to provide backpressure.

Remove code providing support for multiple DataSources for the same
process. We now only accept one DataSource per process, and we pick the
earliest one we receive from traced. This simplifies the matching between
DataSource and process.

Change-Id: I9f91f7d4993a37eb8e92a43108b1cd8883b229c6
Bug: 126724929
Bug: 125891203
diff --git a/Android.bp b/Android.bp
index 8317acd..d68cdc0 100644
--- a/Android.bp
+++ b/Android.bp
@@ -62,9 +62,9 @@
     "src/profiling/memory/heapprofd_producer.cc",
     "src/profiling/memory/main.cc",
     "src/profiling/memory/proc_utils.cc",
-    "src/profiling/memory/process_matcher.cc",
     "src/profiling/memory/record_reader.cc",
-    "src/profiling/memory/socket_listener.cc",
+    "src/profiling/memory/scoped_spinlock.cc",
+    "src/profiling/memory/shared_ring_buffer.cc",
     "src/profiling/memory/system_property.cc",
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/wire_protocol.cc",
@@ -171,6 +171,7 @@
     "src/profiling/memory/malloc_hooks.cc",
     "src/profiling/memory/proc_utils.cc",
     "src/profiling/memory/scoped_spinlock.cc",
+    "src/profiling/memory/shared_ring_buffer.cc",
     "src/profiling/memory/wire_protocol.cc",
   ],
   shared_libs: [
@@ -591,10 +592,9 @@
     "src/profiling/memory/heapprofd_end_to_end_test.cc",
     "src/profiling/memory/heapprofd_producer.cc",
     "src/profiling/memory/proc_utils.cc",
-    "src/profiling/memory/process_matcher.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/scoped_spinlock.cc",
-    "src/profiling/memory/socket_listener.cc",
+    "src/profiling/memory/shared_ring_buffer.cc",
     "src/profiling/memory/system_property.cc",
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/wire_protocol.cc",
@@ -2849,24 +2849,18 @@
     "src/perfetto_cmd/rate_limiter_unittest.cc",
     "src/profiling/memory/bookkeeping.cc",
     "src/profiling/memory/bookkeeping_unittest.cc",
-    "src/profiling/memory/bounded_queue_unittest.cc",
     "src/profiling/memory/client.cc",
     "src/profiling/memory/client_unittest.cc",
-    "src/profiling/memory/heapprofd_integrationtest.cc",
     "src/profiling/memory/heapprofd_producer.cc",
     "src/profiling/memory/interner_unittest.cc",
     "src/profiling/memory/proc_utils.cc",
     "src/profiling/memory/proc_utils_unittest.cc",
-    "src/profiling/memory/process_matcher.cc",
-    "src/profiling/memory/process_matcher_unittest.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
     "src/profiling/memory/sampler_unittest.cc",
     "src/profiling/memory/scoped_spinlock.cc",
     "src/profiling/memory/shared_ring_buffer.cc",
     "src/profiling/memory/shared_ring_buffer_unittest.cc",
-    "src/profiling/memory/socket_listener.cc",
-    "src/profiling/memory/socket_listener_unittest.cc",
     "src/profiling/memory/system_property.cc",
     "src/profiling/memory/system_property_unittest.cc",
     "src/profiling/memory/unwinding.cc",
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index d75d87f..63e964e 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -26,6 +26,7 @@
 source_set("wire_protocol") {
   public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
+    ":ring_buffer",
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
     "../../base",
@@ -88,6 +89,8 @@
   public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
     ":proc_utils",
+    ":ring_buffer",
+    ":scoped_spinlock",
     ":wire_protocol",
     "../../../buildtools:libunwindstack",
     "../../../gn:default_deps",
@@ -106,13 +109,9 @@
     "heapprofd_producer.cc",
     "heapprofd_producer.h",
     "interner.h",
-    "process_matcher.cc",
-    "process_matcher.h",
     "queue_messages.h",
     "record_reader.cc",
     "record_reader.h",
-    "socket_listener.cc",
-    "socket_listener.h",
     "system_property.cc",
     "system_property.h",
     "unwinding.cc",
@@ -124,6 +123,7 @@
   public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
     ":proc_utils",
+    ":ring_buffer",
     ":scoped_spinlock",
     ":wire_protocol",
     "../../../buildtools:libunwindstack",
@@ -153,15 +153,11 @@
   ]
   sources = [
     "bookkeeping_unittest.cc",
-    "bounded_queue_unittest.cc",
     "client_unittest.cc",
-    "heapprofd_integrationtest.cc",
     "interner_unittest.cc",
     "proc_utils_unittest.cc",
-    "process_matcher_unittest.cc",
     "record_reader_unittest.cc",
     "sampler_unittest.cc",
-    "socket_listener_unittest.cc",
     "system_property_unittest.cc",
     "unwinding_unittest.cc",
     "wire_protocol_unittest.cc",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 642fc54..7dec95c 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -302,180 +302,5 @@
   }
 }
 
-void BookkeepingThread::HandleDumpRecord(BookkeepingRecord* rec) {
-  DumpRecord& dump_rec = rec->dump_record;
-  std::shared_ptr<TraceWriter> trace_writer = dump_rec.trace_writer.lock();
-  if (!trace_writer) {
-    PERFETTO_LOG("Not dumping heaps");
-    return;
-  }
-  PERFETTO_LOG("Dumping heaps");
-  DumpState dump_state(trace_writer.get(), &next_index);
-
-  for (const pid_t pid : dump_rec.pids) {
-    auto it = bookkeeping_data_.find(pid);
-    if (it == bookkeeping_data_.end())
-      continue;
-
-    PERFETTO_LOG("Dumping %d ", it->first);
-    it->second.heap_tracker.Dump(pid, &dump_state);
-  }
-
-  for (GlobalCallstackTrie::Node* node : dump_state.callstacks_to_dump) {
-    // There need to be two separate loops over built_callstack because
-    // protozero cannot interleave different messages.
-    auto built_callstack = callsites_.BuildCallstack(node);
-    for (const Interned<Frame>& frame : built_callstack)
-      dump_state.WriteFrame(frame);
-    ProfilePacket::Callstack* callstack =
-        dump_state.current_profile_packet->add_callstacks();
-    callstack->set_id(node->id());
-    for (const Interned<Frame>& frame : built_callstack)
-      callstack->add_frame_ids(frame.id());
-  }
-
-  // We cannot garbage collect until we have finished dumping, as the state
-  // in DumpState points into the GlobalCallstackTrie.
-  for (const pid_t pid : dump_rec.pids) {
-    auto it = bookkeeping_data_.find(pid);
-    if (it == bookkeeping_data_.end())
-      continue;
-
-    if (it->second.ref_count == 0) {
-      std::lock_guard<std::mutex> l(bookkeeping_mutex_);
-      it = bookkeeping_data_.erase(it);
-    }
-  }
-  dump_state.current_trace_packet->Finalize();
-  trace_writer->Flush(dump_rec.callback);
-}
-
-void BookkeepingThread::HandleFreeRecord(BookkeepingData* bookkeeping_data,
-                                         BookkeepingRecord* rec) {
-  FreeRecord& free_rec = rec->free_record;
-  FreeMetadata& free_metadata = *free_rec.metadata;
-
-  if (bookkeeping_data->client_generation < free_metadata.client_generation) {
-    bookkeeping_data->heap_tracker = HeapTracker(&callsites_);
-    bookkeeping_data->client_generation = free_metadata.client_generation;
-  } else if (bookkeeping_data->client_generation >
-             free_metadata.client_generation) {
-    return;
-  }
-
-  FreePageEntry* entries = free_metadata.entries;
-  uint64_t num_entries = free_metadata.num_entries;
-  if (num_entries > kFreePageSize)
-    return;
-  for (size_t i = 0; i < num_entries; ++i) {
-    const FreePageEntry& entry = entries[i];
-    bookkeeping_data->heap_tracker.RecordFree(entry.addr,
-                                              entry.sequence_number);
-  }
-}
-
-void BookkeepingThread::HandleMallocRecord(BookkeepingData* bookkeeping_data,
-                                           BookkeepingRecord* rec) {
-  AllocRecord& alloc_rec = rec->alloc_record;
-  AllocMetadata& alloc_metadata = alloc_rec.alloc_metadata;
-
-  if (bookkeeping_data->client_generation < alloc_metadata.client_generation) {
-    bookkeeping_data->heap_tracker = HeapTracker(&callsites_);
-    bookkeeping_data->client_generation = alloc_metadata.client_generation;
-  } else if (bookkeeping_data->client_generation >
-             alloc_metadata.client_generation) {
-    return;
-  }
-
-  bookkeeping_data->heap_tracker.RecordMalloc(
-      alloc_rec.frames, alloc_rec.alloc_metadata.alloc_address,
-      alloc_rec.alloc_metadata.total_size,
-      alloc_rec.alloc_metadata.sequence_number);
-}
-
-void BookkeepingThread::HandleBookkeepingRecord(BookkeepingRecord* rec) {
-  BookkeepingData* bookkeeping_data = nullptr;
-  if (rec->pid != 0) {
-    std::lock_guard<std::mutex> l(bookkeeping_mutex_);
-    auto it = bookkeeping_data_.find(rec->pid);
-    if (it == bookkeeping_data_.end()) {
-      PERFETTO_DFATAL("Invalid pid: %d", rec->pid);
-      return;
-    }
-    bookkeeping_data = &it->second;
-  }
-
-  if (rec->record_type == BookkeepingRecord::Type::Dump)
-    HandleDumpRecord(rec);
-  else if (rec->record_type == BookkeepingRecord::Type::Free)
-    HandleFreeRecord(bookkeeping_data, rec);
-  else if (rec->record_type == BookkeepingRecord::Type::Malloc)
-    HandleMallocRecord(bookkeeping_data, rec);
-  else
-    PERFETTO_DFATAL("Invalid record type");
-}
-
-BookkeepingThread::ProcessHandle BookkeepingThread::NotifyProcessConnected(
-    pid_t pid) {
-  std::lock_guard<std::mutex> l(bookkeeping_mutex_);
-  // emplace gives the existing BookkeepingData for pid if it already exists
-  // or creates a new one.
-  auto it_and_inserted = bookkeeping_data_.emplace(pid, &callsites_);
-  BookkeepingData& bk = it_and_inserted.first->second;
-  bk.ref_count++;
-  return {this, pid};
-}
-
-void BookkeepingThread::NotifyProcessDisconnected(pid_t pid) {
-  std::lock_guard<std::mutex> l(bookkeeping_mutex_);
-  auto it = bookkeeping_data_.find(pid);
-  if (it == bookkeeping_data_.end()) {
-    PERFETTO_DFATAL("Client for %d not found", pid);
-    return;
-  }
-  it->second.ref_count--;
-}
-
-void BookkeepingThread::Run(BoundedQueue<BookkeepingRecord>* input_queue) {
-  for (;;) {
-    BookkeepingRecord rec;
-    if (!input_queue->Get(&rec))
-      return;
-    HandleBookkeepingRecord(&rec);
-  }
-}
-
-BookkeepingThread::ProcessHandle::ProcessHandle(
-    BookkeepingThread* bookkeeping_thread,
-    pid_t pid)
-    : bookkeeping_thread_(bookkeeping_thread), pid_(pid) {}
-
-BookkeepingThread::ProcessHandle::~ProcessHandle() {
-  if (bookkeeping_thread_)
-    bookkeeping_thread_->NotifyProcessDisconnected(pid_);
-}
-
-BookkeepingThread::ProcessHandle::ProcessHandle(ProcessHandle&& other) noexcept
-    : bookkeeping_thread_(other.bookkeeping_thread_), pid_(other.pid_) {
-  other.bookkeeping_thread_ = nullptr;
-}
-
-BookkeepingThread::ProcessHandle& BookkeepingThread::ProcessHandle::operator=(
-    ProcessHandle&& other) noexcept {
-  // Construct this temporary because the RHS could be an lvalue cast to an
-  // rvalue whose lifetime we do not know.
-  ProcessHandle tmp(std::move(other));
-  using std::swap;
-  swap(*this, tmp);
-  return *this;
-}
-
-void swap(BookkeepingThread::ProcessHandle& a,
-          BookkeepingThread::ProcessHandle& b) {
-  using std::swap;
-  swap(a.bookkeeping_thread_, b.bookkeeping_thread_);
-  swap(a.pid_, b.pid_);
-}
-
 }  // namespace profiling
 }  // namespace perfetto
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index d6e9eb8..9fd3b14 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -25,7 +25,6 @@
 #include "perfetto/base/string_splitter.h"
 #include "perfetto/trace/profiling/profile_packet.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
-#include "src/profiling/memory/bounded_queue.h"
 #include "src/profiling/memory/interner.h"
 #include "src/profiling/memory/queue_messages.h"
 
@@ -360,75 +359,6 @@
   GlobalCallstackTrie* callsites_;
 };
 
-struct BookkeepingData {
-  // Ownership of callsites remains with caller and has to outlive this
-  // object.
-  explicit BookkeepingData(GlobalCallstackTrie* callsites)
-      : heap_tracker(callsites) {}
-
-  // This is different to a shared_ptr to HeapTracker, because we want to keep
-  // it around until the first dump after the last socket for the PID has
-  // disconnected
-  uint64_t ref_count = 0;
-  uint64_t client_generation = 0;
-  HeapTracker heap_tracker;
-};
-
-// BookkeepingThread owns the BookkeepingData for all processes. The Run()
-// method receives messages on the input_queue and does the bookkeeping.
-class BookkeepingThread {
- public:
-  friend class ProcessHandle;
-  class ProcessHandle {
-   public:
-    friend class BookkeepingThread;
-    friend void swap(ProcessHandle&, ProcessHandle&);
-
-    ProcessHandle() = default;
-
-    ~ProcessHandle();
-    ProcessHandle(const ProcessHandle&) = delete;
-    ProcessHandle& operator=(const ProcessHandle&) = delete;
-    ProcessHandle(ProcessHandle&&) noexcept;
-    ProcessHandle& operator=(ProcessHandle&&) noexcept;
-
-   private:
-    ProcessHandle(BookkeepingThread* matcher, pid_t pid);
-
-    BookkeepingThread* bookkeeping_thread_ = nullptr;
-    pid_t pid_;
-  };
-  void Run(BoundedQueue<BookkeepingRecord>* input_queue);
-
-  // Inform the bookkeeping thread that a socket for this pid connected.
-  //
-  // This can be called from arbitrary threads.
-  ProcessHandle NotifyProcessConnected(pid_t pid);
-  void HandleBookkeepingRecord(BookkeepingRecord* rec);
-
- private:
-  void HandleDumpRecord(BookkeepingRecord* rec);
-  void HandleFreeRecord(BookkeepingData* bookkeeping_data,
-                        BookkeepingRecord* rec);
-  void HandleMallocRecord(BookkeepingData* bookkeeping_data,
-                          BookkeepingRecord* rec);
-
-  // Inform the bookkeeping thread that a socket for this pid disconnected.
-  // After the last client for a PID disconnects, the BookkeepingData is
-  // retained until the next dump, upon which it gets garbage collected.
-  //
-  // This can be called from arbitrary threads.
-  void NotifyProcessDisconnected(pid_t pid);
-
-  GlobalCallstackTrie callsites_;
-
-  std::map<pid_t, BookkeepingData> bookkeeping_data_;
-  std::mutex bookkeeping_mutex_;
-  uint64_t next_index = 0;
-};
-
-void swap(BookkeepingThread::ProcessHandle&, BookkeepingThread::ProcessHandle&);
-
 }  // namespace profiling
 }  // namespace perfetto
 
diff --git a/src/profiling/memory/bounded_queue.h b/src/profiling/memory/bounded_queue.h
deleted file mode 100644
index ca2f182..0000000
--- a/src/profiling/memory/bounded_queue.h
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
-#define SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
-
-#include <condition_variable>
-#include <deque>
-#include <mutex>
-#include <string>
-
-#include "perfetto/base/logging.h"
-
-namespace perfetto {
-namespace profiling {
-
-// Transport messages between threads. Multiple-producer / single-consumer.
-//
-// This has to outlive both the consumer and the producer. The Shutdown method
-// can be used to unblock both producers and consumers blocked on the queue.
-// The general shutdown logic is:
-// q.Shutdown()
-// Join all producer and consumer threads
-// destruct q
-template <typename T>
-class BoundedQueue {
- public:
-  BoundedQueue() : BoundedQueue("unknown") {}
-  BoundedQueue(std::string name) : BoundedQueue(std::move(name), 1) {}
-  BoundedQueue(std::string name, size_t capacity)
-      : name_(std::move(name)), capacity_(capacity) {
-    PERFETTO_CHECK(capacity > 0);
-  }
-
-  void SetName(std::string name) { name_ = std::move(name); }
-
-  void Shutdown() {
-    {
-      std::lock_guard<std::mutex> l(mutex_);
-      shutdown_ = true;
-    }
-    full_cv_.notify_all();
-    empty_cv_.notify_all();
-  }
-
-  ~BoundedQueue() { PERFETTO_DCHECK(shutdown_); }
-
-  bool Add(T item) {
-    std::unique_lock<std::mutex> l(mutex_);
-    if (deque_.size() == capacity_) {
-      if (!logged.load(std::memory_order_relaxed)) {
-        PERFETTO_ELOG("heapprofd queue %s at capacity (%zu). Blocking!",
-                      name_.c_str(), capacity_);
-        logged.store(true, std::memory_order_relaxed);
-      }
-      full_cv_.wait(l,
-                    [this] { return deque_.size() < capacity_ || shutdown_; });
-    }
-
-    if (shutdown_)
-      return false;
-
-    deque_.emplace_back(std::move(item));
-    if (deque_.size() == 1)
-      empty_cv_.notify_all();
-    return true;
-  }
-
-  bool Get(T* out) {
-    std::unique_lock<std::mutex> l(mutex_);
-    if (deque_.empty())
-      empty_cv_.wait(l, [this] { return !deque_.empty() || shutdown_; });
-
-    if (shutdown_)
-      return false;
-
-    *out = std::move(deque_.front());
-    deque_.pop_front();
-    if (deque_.size() == capacity_ - 1) {
-      l.unlock();
-      full_cv_.notify_all();
-    }
-    return true;
-  }
-
-  void SetCapacity(size_t capacity) {
-    PERFETTO_CHECK(capacity > 0);
-    {
-      std::lock_guard<std::mutex> l(mutex_);
-      capacity_ = capacity;
-    }
-    full_cv_.notify_all();
-  }
-
- private:
-  std::string name_;
-  size_t capacity_;
-  std::atomic<bool> logged{false};
-  bool shutdown_ = false;
-  size_t elements_ = 0;
-  std::deque<T> deque_;
-  std::condition_variable full_cv_;
-  std::condition_variable empty_cv_;
-  std::mutex mutex_;
-};
-
-}  // namespace profiling
-}  // namespace perfetto
-
-#endif  // SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
diff --git a/src/profiling/memory/bounded_queue_unittest.cc b/src/profiling/memory/bounded_queue_unittest.cc
deleted file mode 100644
index 02265f8..0000000
--- a/src/profiling/memory/bounded_queue_unittest.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/profiling/memory/bounded_queue.h"
-
-#include "gtest/gtest.h"
-
-#include <thread>
-
-namespace perfetto {
-namespace profiling {
-namespace {
-
-TEST(BoundedQueueTest, IsFIFO) {
-  BoundedQueue<int> q("test", 2);
-  q.Add(1);
-  q.Add(2);
-  int out;
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 1);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 2);
-  q.Shutdown();
-}
-
-TEST(BoundedQueueTest, BlockingAdd) {
-  BoundedQueue<int> q("test", 2);
-  q.Add(1);
-  q.Add(2);
-  std::thread th([&q] { q.Add(3); });
-  int out;
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 1);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 2);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 3);
-  th.join();
-  q.Shutdown();
-}
-
-TEST(BoundedQueueTest, BlockingGet) {
-  BoundedQueue<int> q("test", 2);
-  std::thread th([&q] {
-    int out;
-    EXPECT_TRUE(q.Get(&out));
-    EXPECT_EQ(out, 1);
-  });
-  q.Add(1);
-  th.join();
-  q.Shutdown();
-}
-
-TEST(BoundedQueueTest, Resize) {
-  BoundedQueue<int> q("test", 2);
-  q.Add(1);
-  q.Add(2);
-  q.SetCapacity(3);
-  q.Add(3);
-  int out;
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 1);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 2);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 3);
-  q.Shutdown();
-}
-
-TEST(BoundedQueueTest, Shutdown) {
-  BoundedQueue<int> q("test", 3);
-  q.Add(1);
-  q.Add(2);
-  q.Add(3);
-  int out;
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 1);
-  EXPECT_TRUE(q.Get(&out));
-  EXPECT_EQ(out, 2);
-  q.Shutdown();
-  EXPECT_FALSE(q.Get(&out));
-}
-
-TEST(BoundedQueueTest, ShutdownBlockingAdd) {
-  BoundedQueue<int> q("test", 2);
-  q.Add(1);
-  q.Add(2);
-  std::thread th([&q] { EXPECT_FALSE(q.Add(3)); });
-  q.Shutdown();
-  th.join();
-}
-
-TEST(BoundedQueueTest, ShutdownBlockingGet) {
-  BoundedQueue<int> q("test", 1);
-  std::thread th([&q] {
-    int out;
-    EXPECT_FALSE(q.Get(&out));
-  });
-
-  q.Shutdown();
-  th.join();
-}
-
-}  // namespace
-}  // namespace profiling
-}  // namespace perfetto
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index e194f59..dc80eb7 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -46,29 +46,24 @@
 namespace profiling {
 namespace {
 
+const char kSingleByte[1] = {'x'};
 constexpr std::chrono::seconds kLockTimeout{1};
 
-std::vector<base::UnixSocketRaw> ConnectPool(const std::string& sock_name,
-                                             size_t n) {
-  std::vector<base::UnixSocketRaw> res;
-  res.reserve(n);
-  for (size_t i = 0; i < n; ++i) {
-    auto sock = base::UnixSocketRaw::CreateMayFail(base::SockType::kStream);
-    if (!sock || !sock.Connect(sock_name)) {
-      PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
-      continue;
-    }
-    if (!sock.SetTxTimeout(kClientSockTimeoutMs)) {
-      PERFETTO_PLOG("Failed to set send timeout for %s", sock_name.c_str());
-      continue;
-    }
-    if (!sock.SetRxTimeout(kClientSockTimeoutMs)) {
-      PERFETTO_PLOG("Failed to set receive timeout for %s", sock_name.c_str());
-      continue;
-    }
-    res.emplace_back(std::move(sock));
+base::Optional<base::UnixSocketRaw> Connect(const std::string& sock_name) {
+  auto sock = base::UnixSocketRaw::CreateMayFail(base::SockType::kStream);
+  if (!sock || !sock.Connect(sock_name)) {
+    PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
+    return base::nullopt;
   }
-  return res;
+  if (!sock.SetTxTimeout(kClientSockTimeoutMs)) {
+    PERFETTO_PLOG("Failed to set send timeout for %s", sock_name.c_str());
+    return base::nullopt;
+  }
+  if (!sock.SetRxTimeout(kClientSockTimeoutMs)) {
+    PERFETTO_PLOG("Failed to set receive timeout for %s", sock_name.c_str());
+    return base::nullopt;
+  }
+  return std::move(sock);
 }
 
 inline bool IsMainThread() {
@@ -105,89 +100,38 @@
 
 bool FreePage::Add(const uint64_t addr,
                    const uint64_t sequence_number,
-                   SocketPool* pool) {
+                   Client* client) {
   std::unique_lock<std::timed_mutex> l(mutex_, kLockTimeout);
   if (!l.owns_lock())
     return false;
-  if (offset_ == kFreePageSize) {
-    if (!FlushLocked(pool))
+  if (free_page_.num_entries == kFreePageSize) {
+    if (!client->FlushFrees(&free_page_))
       return false;
     // Now that we have flushed, reset to after the header.
-    offset_ = 0;
+    free_page_.num_entries = 0;
   }
-  FreePageEntry& current_entry = free_page_.entries[offset_++];
+  FreePageEntry& current_entry = free_page_.entries[free_page_.num_entries++];
   current_entry.sequence_number = sequence_number;
   current_entry.addr = addr;
   return true;
 }
 
-bool FreePage::FlushLocked(SocketPool* pool) {
+bool Client::FlushFrees(FreeMetadata* free_metadata) {
   WireMessage msg = {};
   msg.record_type = RecordType::Free;
-  free_page_.num_entries = offset_;
-  msg.free_header = &free_page_;
-  BorrowedSocket sock(pool->Borrow());
-  if (!sock || !SendWireMessage(sock.get(), msg)) {
+  msg.free_header = free_metadata;
+  if (!SendWireMessage(&shmem_, msg)) {
     PERFETTO_PLOG("Failed to send wire message");
-    sock.Shutdown();
+    Shutdown();
+    return false;
+  }
+  if (!sock_.Send(kSingleByte, sizeof(kSingleByte))) {
+    Shutdown();
     return false;
   }
   return true;
 }
 
-SocketPool::SocketPool(std::vector<base::UnixSocketRaw> sockets)
-    : sockets_(std::move(sockets)), available_sockets_(sockets_.size()) {}
-
-BorrowedSocket SocketPool::Borrow() {
-  std::unique_lock<std::timed_mutex> l(mutex_, kLockTimeout);
-  if (!l.owns_lock())
-    return {base::UnixSocketRaw(), nullptr};
-  cv_.wait(l, [this] {
-    return available_sockets_ > 0 || dead_sockets_ == sockets_.size() ||
-           shutdown_;
-  });
-
-  if (dead_sockets_ == sockets_.size() || shutdown_) {
-    return {base::UnixSocketRaw(), nullptr};
-  }
-
-  PERFETTO_CHECK(available_sockets_ > 0);
-  return {std::move(sockets_[--available_sockets_]), this};
-}
-
-void SocketPool::Return(base::UnixSocketRaw sock) {
-  std::unique_lock<std::timed_mutex> l(mutex_, kLockTimeout);
-  if (!l.owns_lock())
-    return;
-  PERFETTO_CHECK(dead_sockets_ + available_sockets_ < sockets_.size());
-  if (sock && !shutdown_) {
-    PERFETTO_CHECK(available_sockets_ < sockets_.size());
-    sockets_[available_sockets_++] = std::move(sock);
-    l.unlock();
-    cv_.notify_one();
-  } else {
-    dead_sockets_++;
-    if (dead_sockets_ == sockets_.size()) {
-      l.unlock();
-      cv_.notify_all();
-    }
-  }
-}
-
-void SocketPool::Shutdown() {
-  {
-    std::unique_lock<std::timed_mutex> l(mutex_, kLockTimeout);
-    if (!l.owns_lock())
-      return;
-    for (size_t i = 0; i < available_sockets_; ++i)
-      sockets_[i].Shutdown();
-    dead_sockets_ += available_sockets_;
-    available_sockets_ = 0;
-    shutdown_ = true;
-  }
-  cv_.notify_all();
-}
-
 const char* GetThreadStackBase() {
   pthread_attr_t attr;
   if (pthread_getattr_np(pthread_self(), &attr) != 0)
@@ -203,14 +147,15 @@
   return stackaddr + stacksize;
 }
 
-std::atomic<uint64_t> Client::max_generation_{0};
-
-Client::Client(std::vector<base::UnixSocketRaw> socks)
-    : generation_(++max_generation_),
-      sampler_(8192),  // placeholder until we receive the config (within ctor)
-      socket_pool_(std::move(socks)),
-      free_page_(generation_),
+Client::Client(base::Optional<base::UnixSocketRaw> sock)
+    : sampler_(8192),  // placeholder until we receive the config (within ctor)
       main_thread_stack_base_(FindMainThreadStack()) {
+  if (!sock || !sock.value()) {
+    PERFETTO_DFATAL("Socket not connected.");
+    return;
+  }
+  sock_ = std::move(sock.value());
+
   // We might be running in a process that is not dumpable (such as app
   // processes on user builds), in which case the /proc/self/mem will be chown'd
   // to root:root, and will not be accessible even to the process itself (see
@@ -238,24 +183,32 @@
   // Restore original dumpability value if we overrode it.
   unset_dumpable.reset();
 
-  int fds[2];
-  fds[0] = *maps;
-  fds[1] = *mem;
-  auto sock = socket_pool_.Borrow();
-  if (!sock)
-    return;
+  int fds[kHandshakeSize];
+  fds[kHandshakeMaps] = *maps;
+  fds[kHandshakeMem] = *mem;
+
   // Send an empty record to transfer fds for /proc/self/maps and
   // /proc/self/mem.
-  uint64_t size = 0;
-  if (sock->Send(&size, sizeof(size), fds, 2) != sizeof(size)) {
+  if (sock_.Send(kSingleByte, sizeof(kSingleByte), fds, kHandshakeSize) !=
+      sizeof(kSingleByte)) {
     PERFETTO_DFATAL("Failed to send file descriptors.");
     return;
   }
-  if (sock->Receive(&client_config_, sizeof(client_config_)) !=
+
+  base::ScopedFile shmem_fd;
+  if (sock_.Receive(&client_config_, sizeof(client_config_), &shmem_fd, 1) !=
       sizeof(client_config_)) {
     PERFETTO_DFATAL("Failed to receive client config.");
     return;
   }
+
+  auto shmem = SharedRingBuffer::Attach(std::move(shmem_fd));
+  if (!shmem || !shmem->is_valid()) {
+    PERFETTO_DFATAL("Failed to attach to shmem.");
+    return;
+  }
+  shmem_ = std::move(shmem.value());
+
   PERFETTO_DCHECK(client_config_.interval >= 1);
   sampler_ = Sampler(client_config_.interval);
 
@@ -263,8 +216,7 @@
   inited_.store(true, std::memory_order_release);
 }
 
-Client::Client(const std::string& sock_name, size_t conns)
-    : Client(ConnectPool(sock_name, conns)) {}
+Client::Client(const std::string& sock_name) : Client(Connect(sock_name)) {}
 
 const char* Client::GetStackBase() {
   if (IsMainThread()) {
@@ -307,7 +259,6 @@
   }
 
   uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
-  metadata.client_generation = generation_;
   metadata.total_size = total_size;
   metadata.alloc_size = alloc_size;
   metadata.alloc_address = alloc_address;
@@ -323,10 +274,13 @@
   msg.payload = const_cast<char*>(stacktop);
   msg.payload_size = static_cast<size_t>(stack_size);
 
-  BorrowedSocket sock = socket_pool_.Borrow();
-  if (!sock || !SendWireMessage(sock.get(), msg)) {
+  if (!SendWireMessage(&shmem_, msg)) {
     PERFETTO_PLOG("Failed to send wire message.");
-    sock.Shutdown();
+    Shutdown();
+    return false;
+  }
+  if (!sock_.Send(kSingleByte, sizeof(kSingleByte))) {
+    PERFETTO_PLOG("Failed to send wire message.");
     Shutdown();
     return false;
   }
@@ -338,15 +292,13 @@
     return false;
   bool success = free_page_.Add(
       alloc_address,
-      1 + sequence_number_.fetch_add(1, std::memory_order_acq_rel),
-      &socket_pool_);
+      1 + sequence_number_.fetch_add(1, std::memory_order_acq_rel), this);
   if (!success)
     Shutdown();
   return success;
 }
 
 void Client::Shutdown() {
-  socket_pool_.Shutdown();
   inited_.store(false, std::memory_order_release);
 }
 
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 5bcfff5..adf9e54 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -26,80 +26,29 @@
 
 #include "perfetto/base/unix_socket.h"
 #include "src/profiling/memory/sampler.h"
+#include "src/profiling/memory/shared_ring_buffer.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 namespace profiling {
 
-class BorrowedSocket;
-
-class SocketPool {
- public:
-  friend class BorrowedSocket;
-  SocketPool(std::vector<base::UnixSocketRaw> sockets);
-
-  BorrowedSocket Borrow();
-  void Shutdown();
-
- private:
-  bool shutdown_ = false;
-
-  void Return(base::UnixSocketRaw);
-  std::timed_mutex mutex_;
-  std::condition_variable_any cv_;
-  std::vector<base::UnixSocketRaw> sockets_;
-  size_t available_sockets_;
-  size_t dead_sockets_ = 0;
-};
-
-// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
-class BorrowedSocket {
- public:
-  BorrowedSocket(const BorrowedSocket&) = delete;
-  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
-  BorrowedSocket(BorrowedSocket&& other) noexcept
-      : sock_(std::move(other.sock_)), socket_pool_(other.socket_pool_) {
-    other.socket_pool_ = nullptr;
-  }
-
-  BorrowedSocket(base::UnixSocketRaw sock, SocketPool* socket_pool)
-      : sock_(std::move(sock)), socket_pool_(socket_pool) {}
-
-  ~BorrowedSocket() {
-    if (socket_pool_ != nullptr)
-      socket_pool_->Return(std::move(sock_));
-  }
-
-  base::UnixSocketRaw* operator->() { return &sock_; }
-  base::UnixSocketRaw* get() { return &sock_; }
-  void Shutdown() { sock_.Shutdown(); }
-  explicit operator bool() const { return !!sock_; }
-
- private:
-  base::UnixSocketRaw sock_;
-  SocketPool* socket_pool_ = nullptr;
-};
+class Client;
 
 // Cache for frees that have been observed. It is infeasible to send every
 // free separately, so we batch and send the whole buffer once it is full.
 class FreePage {
  public:
-  FreePage(uint64_t client_generation) {
-    free_page_.client_generation = client_generation;
-  }
+  FreePage() { free_page_.num_entries = 0; }
 
-  // Add address to buffer. Flush if necessary using a socket borrowed from
-  // pool.
+  // Add address to buffer. Flush if necessary.
   // Can be called from any thread. Must not hold mutex_.
-  bool Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+  bool Add(const uint64_t addr, uint64_t sequence_number, Client* client);
 
  private:
-  // Needs to be called holding mutex_.
-  bool FlushLocked(SocketPool* pool);
-
+  // TODO(fmayer): Sort out naming. It's confusing that FreePage has a member
+  // called free_page_ that is of type FreeMetadata.
   FreeMetadata free_page_;
   std::timed_mutex mutex_;
-  size_t offset_ = 0;
 };
 
 const char* GetThreadStackBase();
@@ -115,13 +64,14 @@
 // the caller needs to synchronize calls behind a mutex or similar.
 class Client {
  public:
-  Client(std::vector<base::UnixSocketRaw> sockets);
-  Client(const std::string& sock_name, size_t conns);
+  Client(base::Optional<base::UnixSocketRaw> sock);
+  Client(const std::string& sock_name);
   bool RecordMalloc(uint64_t alloc_size,
                     uint64_t total_size,
                     uint64_t alloc_address);
   bool RecordFree(uint64_t alloc_address);
   void Shutdown();
+  bool FlushFrees(FreeMetadata* free_metadata);
 
   // Returns the number of bytes to assign to an allocation with the given
   // |alloc_size|, based on the current sampling rate. A return value of zero
@@ -141,9 +91,6 @@
  private:
   const char* GetStackBase();
 
-  static std::atomic<uint64_t> max_generation_;
-  const uint64_t generation_;
-
   // TODO(rsavitski): used to check if the client is completely initialized
   // after construction. The reads in RecordFree & GetSampleSizeLocked are no
   // longer necessary (was an optimization to not do redundant work after
@@ -153,10 +100,11 @@
   ClientConfiguration client_config_;
   // sampler_ operations are not thread-safe.
   Sampler sampler_;
-  SocketPool socket_pool_;
+  base::UnixSocketRaw sock_;
   FreePage free_page_;
   const char* main_thread_stack_base_ = nullptr;
   std::atomic<uint64_t> sequence_number_{0};
+  SharedRingBuffer shmem_;
 };
 
 }  // namespace profiling
diff --git a/src/profiling/memory/client_unittest.cc b/src/profiling/memory/client_unittest.cc
index 9d97c6d..e5ed201 100644
--- a/src/profiling/memory/client_unittest.cc
+++ b/src/profiling/memory/client_unittest.cc
@@ -26,103 +26,6 @@
 namespace profiling {
 namespace {
 
-base::UnixSocketRaw CreateSocket() {
-  auto sock = base::UnixSocketRaw::CreateMayFail(base::SockType::kStream);
-  PERFETTO_CHECK(sock);
-  return sock;
-}
-
-TEST(SocketPoolTest, Basic) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-}
-
-TEST(SocketPoolTest, Close) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-  sock.Shutdown();
-}
-
-TEST(SocketPoolTest, Multiple) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-  BorrowedSocket sock_2 = pool.Borrow();
-}
-
-TEST(SocketPoolTest, Blocked) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();  // Takes the socket above.
-  std::thread t([&pool] { pool.Borrow(); });
-  {
-    // Return fd to unblock thread.
-    BorrowedSocket temp = std::move(sock);
-  }
-  t.join();
-}
-
-TEST(SocketPoolTest, BlockedClose) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-  std::thread t([&pool] { pool.Borrow(); });
-  {
-    // Return fd to unblock thread.
-    BorrowedSocket temp = std::move(sock);
-    temp.Shutdown();
-  }
-  t.join();
-}
-
-TEST(SocketPoolTest, MultipleBlocked) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-  std::thread t([&pool] { pool.Borrow(); });
-  std::thread t2([&pool] { pool.Borrow(); });
-  {
-    // Return fd to unblock thread.
-    BorrowedSocket temp = std::move(sock);
-  }
-  t.join();
-  t2.join();
-}
-
-TEST(SocketPoolTest, MultipleBlockedClose) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  BorrowedSocket sock = pool.Borrow();
-  std::thread t([&pool] { pool.Borrow(); });
-  std::thread t2([&pool] { pool.Borrow(); });
-  {
-    // Return fd to unblock thread.
-    BorrowedSocket temp = std::move(sock);
-    temp.Shutdown();
-  }
-  t.join();
-  t2.join();
-}
-
-TEST(FreePageTest, ShutdownSocketPool) {
-  std::vector<base::UnixSocketRaw> socks;
-  socks.emplace_back(CreateSocket());
-  SocketPool pool(std::move(socks));
-  pool.Shutdown();
-  FreePage p{0};
-  p.Add(0, 1, &pool);
-}
-
 TEST(ClientTest, GetThreadStackBase) {
   std::thread th([] {
     const char* stackbase = GetThreadStackBase();
diff --git a/src/profiling/memory/heapprofd_end_to_end_test.cc b/src/profiling/memory/heapprofd_end_to_end_test.cc
index a4fe2f9..014ea82 100644
--- a/src/profiling/memory/heapprofd_end_to_end_test.cc
+++ b/src/profiling/memory/heapprofd_end_to_end_test.cc
@@ -193,7 +193,7 @@
     auto helper = GetHelper(&task_runner);
 
     helper->StartTracing(trace_config);
-    helper->WaitForTracingDisabled(10000);
+    helper->WaitForTracingDisabled(20000);
 
     helper->ReadData();
     helper->WaitForReadData();
@@ -410,7 +410,7 @@
 
     TraceConfig trace_config;
     trace_config.add_buffers()->set_size_kb(10 * 1024);
-    trace_config.set_duration_ms(2000);
+    trace_config.set_duration_ms(5000);
     trace_config.set_flush_timeout_ms(10000);
 
     auto* ds_config = trace_config.add_data_sources()->mutable_config();
@@ -432,7 +432,7 @@
     // TODO(rsavitski): this sleep is to compensate for the heapprofd delaying
     // in closing the sockets (and therefore the client noticing that the
     // session is over). Clarify where the delays are coming from.
-    usleep(100 * kMsToUs);
+    usleep(5000 * kMsToUs);
 
     PERFETTO_LOG("HeapprofdEndToEnd::Reinit: Starting second");
     TraceAndValidate(trace_config, pid, kSecondIterationBytes);
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
deleted file mode 100644
index 87c065a..0000000
--- a/src/profiling/memory/heapprofd_integrationtest.cc
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/base/test/test_task_runner.h"
-#include "src/ipc/test/test_socket.h"
-#include "src/profiling/memory/client.h"
-#include "src/profiling/memory/heapprofd_producer.h"
-#include "src/profiling/memory/socket_listener.h"
-#include "src/profiling/memory/unwinding.h"
-
-#include "gmock/gmock.h"
-#include "gtest/gtest.h"
-
-namespace perfetto {
-namespace profiling {
-namespace {
-
-constexpr char kSocketName[] = TEST_SOCK_NAME("heapprofd_integrationtest");
-
-void __attribute__((noinline)) OtherFunction(Client* client) {
-  client->RecordMalloc(10, 10, 0xf00);
-}
-
-void __attribute__((noinline)) SomeFunction(Client* client) {
-  OtherFunction(client);
-}
-
-class HeapprofdIntegrationTest : public ::testing::Test {
- protected:
-  void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
-  void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
-};
-
-// ASAN does not like sendmsg of the stack.
-// TODO(fmayer): Try to fix this more properly.
-#if !defined(ADDRESS_SANITIZER) && !defined(MEMORY_SANITIZER)
-#define MAYBE_EndToEnd EndToEnd
-#define MAYBE_MultiSession MultiSession
-#else
-#define MAYBE_EndToEnd DISABLED_EndToEnd
-#define MAYBE_MultiSession DISABLED_MultiSession
-#endif
-
-TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
-  GlobalCallstackTrie callsites;
-  // TODO(fmayer): Actually test the dump.
-  BookkeepingThread bookkeeping_thread;
-
-  base::TestTaskRunner task_runner;
-  auto done = task_runner.CreateCheckpoint("done");
-  constexpr uint64_t kSamplingInterval = 123;
-  SocketListener listener(
-      [&done, &bookkeeping_thread](UnwindingRecord r) {
-        // TODO(fmayer): Test symbolization and result of unwinding.
-        // This check will only work on in-tree builds as out-of-tree
-        // libunwindstack is behaving a bit weirdly.
-// TODO(fmayer): Fix out of tree integration test.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-        BookkeepingRecord bookkeeping_record;
-        ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
-        bookkeeping_thread.HandleBookkeepingRecord(&bookkeeping_record);
-#endif
-        base::ignore_result(r);
-        base::ignore_result(bookkeeping_thread);
-        done();
-      },
-      &bookkeeping_thread);
-
-  ProcessSetSpec spec{};
-  spec.pids.emplace(getpid());
-  spec.client_configuration.interval = kSamplingInterval;
-  auto session = listener.process_matcher().AwaitProcessSetSpec(spec);
-  auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
-  if (!sock->is_listening())
-    PERFETTO_FATAL("Socket not listening");
-  std::thread th([kSamplingInterval] {
-    Client client(kSocketName, 1);
-    SomeFunction(&client);
-    EXPECT_EQ(client.client_config_for_testing().interval, kSamplingInterval);
-  });
-
-  task_runner.RunUntilCheckpoint("done");
-  th.join();
-}
-
-TEST_F(HeapprofdIntegrationTest, MAYBE_MultiSession) {
-  GlobalCallstackTrie callsites;
-  // TODO(fmayer): Actually test the dump.
-  BookkeepingThread bookkeeping_thread;
-
-  base::TestTaskRunner task_runner;
-  auto done = task_runner.CreateCheckpoint("done");
-  constexpr uint64_t kSamplingInterval = 123;
-  SocketListener listener([&done](UnwindingRecord) { done(); },
-                          &bookkeeping_thread);
-
-  ProcessSetSpec spec{};
-  spec.pids.emplace(getpid());
-  spec.client_configuration.interval = kSamplingInterval;
-  auto session = listener.process_matcher().AwaitProcessSetSpec(spec);
-  // Allow to get a second session, but it will still use the previous
-  // sampling rate.
-  spec.client_configuration.interval = kSamplingInterval + 1;
-  auto session2 = listener.process_matcher().AwaitProcessSetSpec(spec);
-  auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
-  if (!sock->is_listening())
-    PERFETTO_FATAL("Socket not listening");
-  std::thread th([kSamplingInterval] {
-    Client client(kSocketName, 1);
-    SomeFunction(&client);
-    EXPECT_EQ(client.client_config_for_testing().interval, kSamplingInterval);
-  });
-
-  task_runner.RunUntilCheckpoint("done");
-  th.join();
-}
-
-}  // namespace
-}  // namespace profiling
-}  // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index 8118445..2a46ff6 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -31,15 +31,18 @@
 namespace perfetto {
 namespace profiling {
 namespace {
+using ::perfetto::protos::pbzero::ProfilePacket;
+
 constexpr char kHeapprofdDataSource[] = "android.heapprofd";
-constexpr size_t kUnwinderQueueSize = 1000;
-constexpr size_t kBookkeepingQueueSize = 1000;
 constexpr size_t kUnwinderThreads = 5;
 constexpr int kHeapprofdSignal = 36;
 
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
 
+// TODO(fmayer): Add to HeapprofdConfig.
+constexpr uint64_t kShmemSize = 8 * 1048576;  // ~8 MB
+
 ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
   ClientConfiguration client_config;
   client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
@@ -48,41 +51,18 @@
 
 }  // namespace
 
-// We create kUnwinderThreads unwinding threads and one bookeeping thread.
-// The bookkeeping thread is singleton in order to avoid expensive and
-// complicated synchronisation in the bookkeeping.
-//
-// We wire up the system by creating BoundedQueues between the threads. The main
-// thread runs the TaskRunner driving the SocketListener. The unwinding thread
-// takes the data received by the SocketListener and if it is a malloc does
-// stack unwinding, and if it is a free just forwards the content of the record
-// to the bookkeeping thread.
-//
-//             +--------------+
-//             |SocketListener|
-//             +------+-------+
-//                    |
-//          +--UnwindingRecord -+
-//          |                   |
-// +--------v-------+   +-------v--------+
-// |Unwinding Thread|   |Unwinding Thread|
-// +--------+-------+   +-------+--------+
-//          |                   |
-//          +-BookkeepingRecord +
-//                    |
-//           +--------v---------+
-//           |Bookkeeping Thread|
-//           +------------------+
+// We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
+// thread.
+// TODO(fmayer): Summarize threading document here.
 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
                                      base::TaskRunner* task_runner)
     : mode_(mode),
       task_runner_(task_runner),
-      bookkeeping_queue_("Bookkeeping", kBookkeepingQueueSize),
-      bookkeeping_th_([this] { bookkeeping_thread_.Run(&bookkeeping_queue_); }),
-      unwinder_queues_(MakeUnwinderQueues(kUnwinderThreads)),
+      unwinding_task_runners_(kUnwinderThreads),
       unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
-      socket_listener_(MakeSocketListenerCallback(), &bookkeeping_thread_),
+      unwinding_workers_(MakeUnwindingWorkers(kUnwinderThreads)),
       target_pid_(base::kInvalidPid),
+      socket_delegate_(this),
       weak_factory_(this) {
   if (mode == HeapprofdMode::kCentral) {
     listening_socket_ = MakeListeningSocket();
@@ -90,20 +70,18 @@
 }
 
 HeapprofdProducer::~HeapprofdProducer() {
-  bookkeeping_queue_.Shutdown();
-  for (auto& queue : unwinder_queues_) {
-    queue.Shutdown();
-  }
-  bookkeeping_th_.join();
-  for (std::thread& th : unwinding_threads_) {
+  for (auto& task_runner : unwinding_task_runners_)
+    task_runner.Quit();
+  for (std::thread& th : unwinding_threads_)
     th.join();
-  }
 }
 
 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
-                                         std::string target_cmdline) {
+                                         std::string target_cmdline,
+                                         base::ScopedFile inherited_socket) {
   target_pid_ = target_pid;
   target_cmdline_ = target_cmdline;
+  inherited_fd_ = std::move(inherited_socket);
 }
 
 bool HeapprofdProducer::SourceMatchesTarget(const HeapprofdConfig& cfg) {
@@ -123,32 +101,14 @@
   return false;
 }
 
-void HeapprofdProducer::AdoptConnectedSockets(
-    std::vector<base::ScopedFile> inherited_sockets) {
+void HeapprofdProducer::AdoptTargetProcessSocket() {
   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
+  Process process{target_pid_, target_cmdline_};
+  auto socket = base::UnixSocket::AdoptConnected(
+      std::move(inherited_fd_), &socket_delegate_, task_runner_,
+      base::SockType::kStream);
 
-  auto weak_producer = weak_factory_.GetWeakPtr();
-  for (auto& scoped_fd : inherited_sockets) {
-    // Manually enqueue the on-connection callback. Pass the raw fd into the
-    // closure as we cannot easily move-capture in c++11.
-    int fd = scoped_fd.release();
-    task_runner_->PostTask([weak_producer, fd] {
-      if (!weak_producer)
-        return;
-
-      auto socket = base::UnixSocket::AdoptConnected(
-          base::ScopedFile(fd), &weak_producer->socket_listener_,
-          weak_producer->task_runner_, base::SockType::kStream);
-
-      // The forked heapprofd will not normally be able to read the target's
-      // cmdline under procfs, so pass peer's description explicitly.
-      Process process{weak_producer->target_pid_,
-                      weak_producer->target_cmdline_};
-
-      weak_producer->socket_listener_.HandleClientConnection(
-          std::move(socket), std::move(process));
-    });
-  }
+  HandleClientConnection(std::move(socket), std::move(process));
 }
 
 // TODO(fmayer): Delete once we have generic reconnect logic.
@@ -223,25 +183,17 @@
   }
 
   DataSource data_source;
-
-  ProcessSetSpec process_set_spec{};
-  process_set_spec.all = heapprofd_config.all();
-  process_set_spec.client_configuration = MakeClientConfiguration(cfg);
-  process_set_spec.pids.insert(heapprofd_config.pid().cbegin(),
-                               heapprofd_config.pid().cend());
-  process_set_spec.process_cmdline.insert(
-      heapprofd_config.process_cmdline().cbegin(),
-      heapprofd_config.process_cmdline().cend());
-
-  data_source.processes =
-      socket_listener_.process_matcher().AwaitProcessSetSpec(
-          std::move(process_set_spec));
-
+  data_source.id = id;
+  data_source.client_configuration = MakeClientConfiguration(cfg);
+  data_source.config = heapprofd_config;
   auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
   data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
 
   data_sources_.emplace(id, std::move(data_source));
   PERFETTO_DLOG("Set up data source.");
+
+  if (mode_ == HeapprofdMode::kChild)
+    AdoptTargetProcessSocket();
 }
 
 void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
@@ -329,19 +281,23 @@
 // child mode heapprofd needs to distinguish between causes of the client
 // reference being torn down.
 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
-  // DataSource holds ProfilingSession handles which on being destructed tear
-  // down the profiling on the client.
-
-  if (mode_ == HeapprofdMode::kChild) {
-    if (data_sources_.erase(id) == 1) {
-      PERFETTO_DLOG("Child mode exiting due to stopped data source.");
-      TerminateProcess(/*exit_status=*/0);  // does not return
-    }
-
-  } else {  // kCentral
-    if (data_sources_.erase(id) != 1)
+  auto it = data_sources_.find(id);
+  if (it == data_sources_.end()) {
+    if (mode_ == HeapprofdMode::kCentral)
       PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
+    return;
   }
+
+  DataSource& data_source = it->second;
+  for (const auto& pid_and_heap_tracker : data_source.heap_trackers) {
+    pid_t pid = pid_and_heap_tracker.first;
+    UnwinderForPID(pid).PostDisconnectSocket(pid);
+  }
+
+  data_sources_.erase(it);
+
+  if (mode_ == HeapprofdMode::kChild)
+    TerminateProcess(/*exit_status=*/0);  // does not return
 }
 
 void HeapprofdProducer::OnTracingSetup() {}
@@ -349,34 +305,47 @@
 bool HeapprofdProducer::Dump(DataSourceInstanceID id,
                              FlushRequestID flush_id,
                              bool has_flush_id) {
-  PERFETTO_DLOG("Dumping %" PRIu64 ", flush: %d", id, has_flush_id);
   auto it = data_sources_.find(id);
   if (it == data_sources_.end()) {
+    PERFETTO_LOG("Invalid data source.");
     return false;
   }
+  DataSource& data_source = it->second;
 
-  const DataSource& data_source = it->second;
-  BookkeepingRecord record{};
-  record.record_type = BookkeepingRecord::Type::Dump;
-  DumpRecord& dump_record = record.dump_record;
-  std::set<pid_t> pids = data_source.processes.GetPIDs();
-  dump_record.pids.insert(dump_record.pids.begin(), pids.cbegin(), pids.cend());
-  dump_record.trace_writer = data_source.trace_writer;
+  DumpState dump_state(data_source.trace_writer.get(), &next_index_);
 
-  auto weak_producer = weak_factory_.GetWeakPtr();
-  base::TaskRunner* task_runner = task_runner_;
-  if (has_flush_id) {
-    dump_record.callback = [task_runner, weak_producer, flush_id] {
-      task_runner->PostTask([weak_producer, flush_id] {
-        if (weak_producer)
-          return weak_producer->FinishDataSourceFlush(flush_id);
-      });
-    };
-  } else {
-    dump_record.callback = [] {};
+  for (std::pair<const pid_t, HeapTracker>& pid_and_heap_tracker :
+       data_source.heap_trackers) {
+    pid_t pid = pid_and_heap_tracker.first;
+    HeapTracker& heap_tracker = pid_and_heap_tracker.second;
+    heap_tracker.Dump(pid, &dump_state);
   }
 
-  bookkeeping_queue_.Add(std::move(record));
+  for (GlobalCallstackTrie::Node* node : dump_state.callstacks_to_dump) {
+    // There need to be two separate loops over built_callstack because
+    // protozero cannot interleave different messages.
+    auto built_callstack = callsites_.BuildCallstack(node);
+    for (const Interned<Frame>& frame : built_callstack)
+      dump_state.WriteFrame(frame);
+    ProfilePacket::Callstack* callstack =
+        dump_state.current_profile_packet->add_callstacks();
+    callstack->set_id(node->id());
+    for (const Interned<Frame>& frame : built_callstack)
+      callstack->add_frame_ids(frame.id());
+  }
+
+  dump_state.current_trace_packet->Finalize();
+  if (has_flush_id) {
+    auto weak_producer = weak_factory_.GetWeakPtr();
+    auto callback = [weak_producer, flush_id] {
+      if (weak_producer)
+        return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
+          if (weak_producer)
+            return weak_producer->FinishDataSourceFlush(flush_id);
+        });
+    };
+    data_source.trace_writer->Flush(std::move(callback));
+  }
   return true;
 }
 
@@ -406,30 +375,18 @@
   }
 }
 
-std::function<void(UnwindingRecord)>
-HeapprofdProducer::MakeSocketListenerCallback() {
-  return [this](UnwindingRecord record) {
-    unwinder_queues_[static_cast<size_t>(record.pid) % kUnwinderThreads].Add(
-        std::move(record));
-  };
-}
-
-std::vector<BoundedQueue<UnwindingRecord>>
-HeapprofdProducer::MakeUnwinderQueues(size_t n) {
-  std::vector<BoundedQueue<UnwindingRecord>> ret(n);
+std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
+  std::vector<std::thread> ret;
   for (size_t i = 0; i < n; ++i) {
-    ret[i].SetCapacity(kUnwinderQueueSize);
-    ret[i].SetName("Unwinder " + std::to_string(n));
+    ret.emplace_back([this, i] { unwinding_task_runners_[i].Run(); });
   }
   return ret;
 }
 
-std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
-  std::vector<std::thread> ret;
+std::vector<UnwindingWorker> HeapprofdProducer::MakeUnwindingWorkers(size_t n) {
+  std::vector<UnwindingWorker> ret;
   for (size_t i = 0; i < n; ++i) {
-    ret.emplace_back([this, i] {
-      UnwindingMainLoop(&unwinder_queues_[i], &bookkeeping_queue_);
-    });
+    ret.emplace_back(this, &unwinding_task_runners_[i]);
   }
   return ret;
 }
@@ -438,7 +395,7 @@
   const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
   if (sock_fd == nullptr) {
     unlink(kHeapprofdSocketFile);
-    return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_listener_,
+    return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_delegate_,
                                     task_runner_);
   }
   char* end;
@@ -446,7 +403,7 @@
   if (*end != '\0')
     PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
                    kHeapprofdSocketEnvVar);
-  return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_listener_,
+  return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_delegate_,
                                   task_runner_);
 }
 
@@ -515,5 +472,218 @@
   exit(exit_status);
 }
 
+void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
+  auto it = producer_->pending_processes_.find(self->peer_pid());
+  if (it == producer_->pending_processes_.end()) {
+    PERFETTO_DFATAL("Unexpected disconnect.");
+    return;
+  }
+
+  if (self == it->second.sock.get())
+    producer_->pending_processes_.erase(it);
+}
+
+void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
+    base::UnixSocket*,
+    std::unique_ptr<base::UnixSocket> new_connection) {
+  Process peer_process;
+  peer_process.pid = new_connection->peer_pid();
+  if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
+    PERFETTO_ELOG("Failed to get cmdline for %d", peer_process.pid);
+
+  producer_->HandleClientConnection(std::move(new_connection), peer_process);
+}
+
+UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
+  return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
+}
+
+void HeapprofdProducer::SocketDelegate::OnDataAvailable(
+    base::UnixSocket* self) {
+  auto it = producer_->pending_processes_.find(self->peer_pid());
+  if (it == producer_->pending_processes_.end()) {
+    PERFETTO_DFATAL("Unexpected data.");
+    return;
+  }
+
+  PendingProcess& pending_process = it->second;
+
+  base::ScopedFile fds[kHandshakeSize];
+  char buf[1];
+  self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
+
+  static_assert(kHandshakeSize == 2, "change if below.");
+  if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
+    auto ds_it =
+        producer_->data_sources_.find(pending_process.data_source_instance_id);
+    if (ds_it == producer_->data_sources_.end()) {
+      producer_->pending_processes_.erase(it);
+      return;
+    }
+
+    DataSource& data_source = ds_it->second;
+    data_source.heap_trackers.emplace(self->peer_pid(), &producer_->callsites_);
+
+    PERFETTO_DLOG("%d: Received FDs.", self->peer_pid());
+    int raw_fd = pending_process.shmem.fd();
+    // TODO(fmayer): Full buffer could deadlock us here.
+    self->Send(&data_source.client_configuration,
+               sizeof(data_source.client_configuration), &raw_fd, 1,
+               base::UnixSocket::BlockingMode::kBlocking);
+
+    UnwindingWorker::HandoffData handoff_data;
+    handoff_data.data_source_instance_id =
+        pending_process.data_source_instance_id;
+    handoff_data.sock = self->ReleaseSocket();
+    for (size_t i = 0; i < kHandshakeSize; ++i)
+      handoff_data.fds[i] = std::move(fds[i]);
+    handoff_data.shmem = std::move(pending_process.shmem);
+
+    producer_->UnwinderForPID(self->peer_pid())
+        .PostHandoffSocket(std::move(handoff_data));
+    producer_->pending_processes_.erase(it);
+  } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
+    PERFETTO_DFATAL("%d: Received partial FDs.", self->peer_pid());
+    producer_->pending_processes_.erase(it);
+  } else {
+    PERFETTO_DLOG("%d: Received no FDs.", self->peer_pid());
+  }
+}
+
+HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
+    const Process& proc) {
+  for (auto& ds_id_and_datasource : data_sources_) {
+    DataSource& ds = ds_id_and_datasource.second;
+    if (ds.config.all())
+      return &ds;
+
+    for (uint64_t pid : ds.config.pid()) {
+      if (static_cast<pid_t>(pid) == proc.pid)
+        return &ds;
+    }
+    for (const std::string& cmdline : ds.config.process_cmdline()) {
+      if (cmdline == proc.cmdline)
+        return &ds;
+    }
+  }
+  return nullptr;
+}
+
+void HeapprofdProducer::HandleClientConnection(
+    std::unique_ptr<base::UnixSocket> new_connection,
+    Process process) {
+  DataSource* data_source = GetDataSourceForProcess(process);
+  if (!data_source) {
+    PERFETTO_LOG("No data source found.");
+    return;
+  }
+
+  auto shmem = SharedRingBuffer::Create(kShmemSize);
+  if (!shmem || !shmem->is_valid()) {
+    PERFETTO_LOG("Failed to create shared memory.");
+    return;
+  }
+
+  pid_t peer_pid = new_connection->peer_pid();
+  if (peer_pid != process.pid) {
+    PERFETTO_DFATAL("Invalid PID connected.");
+    return;
+  }
+
+  PendingProcess pending_process;
+  pending_process.sock = std::move(new_connection);
+  pending_process.data_source_instance_id = data_source->id;
+  pending_process.shmem = std::move(*shmem);
+  pending_processes_.emplace(peer_pid, std::move(pending_process));
+}
+
+void HeapprofdProducer::PostAllocRecord(AllocRecord alloc_rec) {
+  // Once we can use C++14, this should be std::moved into the lambda instead.
+  AllocRecord* raw_alloc_rec = new AllocRecord(std::move(alloc_rec));
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, raw_alloc_rec] {
+    if (weak_this)
+      weak_this->HandleAllocRecord(std::move(*raw_alloc_rec));
+    delete raw_alloc_rec;
+  });
+}
+
+void HeapprofdProducer::PostFreeRecord(FreeRecord free_rec) {
+  // Once we can use C++14, this should be std::moved into the lambda instead.
+  FreeRecord* raw_free_rec = new FreeRecord(std::move(free_rec));
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, raw_free_rec] {
+    if (weak_this)
+      weak_this->HandleFreeRecord(std::move(*raw_free_rec));
+    delete raw_free_rec;
+  });
+}
+
+void HeapprofdProducer::PostSocketDisconnected(DataSourceInstanceID ds_id,
+                                               pid_t pid) {
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, ds_id, pid] {
+    if (weak_this)
+      weak_this->HandleSocketDisconnected(ds_id, pid);
+  });
+}
+
+void HeapprofdProducer::HandleAllocRecord(AllocRecord alloc_rec) {
+  const AllocMetadata& alloc_metadata = alloc_rec.alloc_metadata;
+  auto it = data_sources_.find(alloc_rec.data_source_instance_id);
+  if (it == data_sources_.end()) {
+    PERFETTO_LOG("Invalid data source in alloc record.");
+    return;
+  }
+
+  DataSource& ds = it->second;
+  auto heap_tracker_it = ds.heap_trackers.find(alloc_rec.pid);
+  if (heap_tracker_it == ds.heap_trackers.end()) {
+    PERFETTO_LOG("Invalid PID in alloc record.");
+    return;
+  }
+
+  HeapTracker& heap_tracker = heap_tracker_it->second;
+
+  heap_tracker.RecordMalloc(alloc_rec.frames, alloc_metadata.alloc_address,
+                            alloc_metadata.total_size,
+                            alloc_metadata.sequence_number);
+}
+
+void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
+  const FreeMetadata& free_metadata = free_rec.metadata;
+  auto it = data_sources_.find(free_rec.data_source_instance_id);
+  if (it == data_sources_.end()) {
+    PERFETTO_LOG("Invalid data source in free record.");
+    return;
+  }
+
+  DataSource& ds = it->second;
+  auto heap_tracker_it = ds.heap_trackers.find(free_rec.pid);
+  if (heap_tracker_it == ds.heap_trackers.end()) {
+    PERFETTO_LOG("Invalid PID in free record.");
+    return;
+  }
+
+  HeapTracker& heap_tracker = heap_tracker_it->second;
+
+  const FreePageEntry* entries = free_metadata.entries;
+  uint64_t num_entries = free_metadata.num_entries;
+  if (num_entries > kFreePageSize) {
+    PERFETTO_DFATAL("Malformed free page.");
+    return;
+  }
+  for (size_t i = 0; i < num_entries; ++i) {
+    const FreePageEntry& entry = entries[i];
+    heap_tracker.RecordFree(entry.addr, entry.sequence_number);
+  }
+}
+
+void HeapprofdProducer::HandleSocketDisconnected(DataSourceInstanceID, pid_t) {
+  // TODO(fmayer): Dump on process disconnect rather than data source
+  // destruction. This prevents us needing to hold onto the bookkeeping data
+  // after the process disconnected.
+}
+
 }  // namespace profiling
 }  // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
index d00486d..a95cd19 100644
--- a/src/profiling/memory/heapprofd_producer.h
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -20,29 +20,55 @@
 #include <functional>
 #include <map>
 
+#include "perfetto/base/optional.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/base/unix_socket.h"
+#include "perfetto/base/unix_task_runner.h"
 
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/producer.h"
 #include "perfetto/tracing/core/tracing_service.h"
 
-#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/bookkeeping.h"
 #include "src/profiling/memory/proc_utils.h"
-#include "src/profiling/memory/process_matcher.h"
-#include "src/profiling/memory/socket_listener.h"
 #include "src/profiling/memory/system_property.h"
+#include "src/profiling/memory/unwinding.h"
 
 namespace perfetto {
 namespace profiling {
 
+struct Process {
+  pid_t pid;
+  std::string cmdline;
+};
+
 // TODO(rsavitski): central daemon can do less work if it knows that the global
 // operating mode is fork-based, as it then will not be interacting with the
 // clients. This can be implemented as an additional mode here.
 enum class HeapprofdMode { kCentral, kChild };
 
-class HeapprofdProducer : public Producer {
+class HeapprofdProducer : public Producer, public UnwindingWorker::Delegate {
  public:
+  friend class SocketDelegate;
+
+  // TODO(fmayer): Split into two delegates for the listening socket in kCentral
+  // and for the per-client sockets to make this easier to understand?
+  // Alternatively, find a better name for this.
+  class SocketDelegate : public base::UnixSocket::EventListener {
+   public:
+    SocketDelegate(HeapprofdProducer* producer) : producer_(producer) {}
+
+    void OnDisconnect(base::UnixSocket* self) override;
+    void OnNewIncomingConnection(
+        base::UnixSocket* self,
+        std::unique_ptr<base::UnixSocket> new_connection) override;
+    void OnDataAvailable(base::UnixSocket* self) override;
+
+   private:
+    HeapprofdProducer* producer_;
+  };
+
   HeapprofdProducer(HeapprofdMode mode, base::TaskRunner* task_runner);
   ~HeapprofdProducer() override;
 
@@ -61,14 +87,24 @@
   void ConnectWithRetries(const char* socket_name);
   void DumpAll();
 
-  // Valid only if mode_ == kChild. Adopts the (connected) sockets inherited
-  // from the target process, invoking the on-connection callback.
-  void AdoptConnectedSockets(std::vector<base::ScopedFile> inherited_sockets);
+  // UnwindingWorker::Delegate impl:
+  void PostAllocRecord(AllocRecord) override;
+  void PostFreeRecord(FreeRecord) override;
+  void PostSocketDisconnected(DataSourceInstanceID, pid_t) override;
+
+  void HandleAllocRecord(AllocRecord);
+  void HandleFreeRecord(FreeRecord);
+  void HandleSocketDisconnected(DataSourceInstanceID, pid_t);
 
   // Valid only if mode_ == kChild.
-  void SetTargetProcess(pid_t target_pid, std::string target_cmdline);
+  void SetTargetProcess(pid_t target_pid,
+                        std::string target_cmdline,
+                        base::ScopedFile inherited_socket);
 
  private:
+  void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
+                              Process process);
+
   // TODO(fmayer): Delete once we have generic reconnect logic.
   enum State {
     kNotStarted = 0,
@@ -88,15 +124,15 @@
 
   const HeapprofdMode mode_;
 
-  std::function<void(UnwindingRecord)> MakeSocketListenerCallback();
-  std::vector<BoundedQueue<UnwindingRecord>> MakeUnwinderQueues(size_t n);
   std::vector<std::thread> MakeUnwindingThreads(size_t n);
+  std::vector<UnwindingWorker> MakeUnwindingWorkers(size_t n);
 
   void FinishDataSourceFlush(FlushRequestID flush_id);
   bool Dump(DataSourceInstanceID id,
             FlushRequestID flush_id,
             bool has_flush_id);
   void DoContinuousDump(DataSourceInstanceID id, uint32_t dump_interval);
+  UnwindingWorker& UnwinderForPID(pid_t);
 
   // functionality specific to mode_ == kCentral
   std::unique_ptr<base::UnixSocket> MakeListeningSocket();
@@ -105,16 +141,29 @@
   void TerminateProcess(int exit_status);
   bool SourceMatchesTarget(const HeapprofdConfig& cfg);
 
+  // Valid only if mode_ == kChild. Adopts the (connected) socket inherited
+  // from the target process, invoking the on-connection callback.
+  void AdoptTargetProcessSocket();
+
   struct DataSource {
-    // This is a shared ptr so we can lend a weak_ptr to the bookkeeping
-    // thread for unwinding.
-    std::shared_ptr<TraceWriter> trace_writer;
-    // These are opaque handles that shut down the sockets in SocketListener
-    // once they go away.
-    ProcessMatcher::ProcessSetSpecHandle processes;
+    DataSourceInstanceID id;
+    std::unique_ptr<TraceWriter> trace_writer;
+    HeapprofdConfig config;
+    ClientConfiguration client_configuration;
     std::vector<SystemProperties::Handle> properties;
+    std::map<pid_t, HeapTracker> heap_trackers;
   };
 
+  struct PendingProcess {
+    std::unique_ptr<base::UnixSocket> sock;
+    DataSourceInstanceID data_source_instance_id;
+    SharedRingBuffer shmem;
+  };
+
+  std::map<pid_t, PendingProcess> pending_processes_;
+
+  DataSource* GetDataSourceForProcess(const Process& proc);
+
   std::map<DataSourceInstanceID, DataSource> data_sources_;
   std::map<FlushRequestID, size_t> flushes_in_progress_;
 
@@ -122,12 +171,17 @@
   base::TaskRunner* const task_runner_;
   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
 
-  BoundedQueue<BookkeepingRecord> bookkeeping_queue_;
-  BookkeepingThread bookkeeping_thread_;
-  std::thread bookkeeping_th_;
-  std::vector<BoundedQueue<UnwindingRecord>> unwinder_queues_;
-  std::vector<std::thread> unwinding_threads_;
-  SocketListener socket_listener_;
+  GlobalCallstackTrie callsites_;
+  // Sequence number for ProfilePackets, so the consumer can assert that none
+  // of them were dropped.
+  uint64_t next_index_ = 0;
+
+  // These are not fields in UnwindingWorker as the task runner is not movable
+  // and that would make UnwindingWorker a very unwieldy object (e.g. we cannot
+  // emplace_back into a vector as that requires movability.)
+  std::vector<base::UnixTaskRunner> unwinding_task_runners_;
+  std::vector<std::thread> unwinding_threads_;  // Only for ownership.
+  std::vector<UnwindingWorker> unwinding_workers_;
 
   // state specific to mode_ == kCentral
   std::unique_ptr<base::UnixSocket> listening_socket_;
@@ -136,6 +190,11 @@
   // state specific to mode_ == kChild
   pid_t target_pid_ = base::kInvalidPid;
   std::string target_cmdline_;
+  // This is a valid FD between SetTargetProcess and AdoptTargetProcessSocket
+  // only.
+  base::ScopedFile inherited_fd_;
+
+  SocketDelegate socket_delegate_;
 
   base::WeakPtrFactory<HeapprofdProducer> weak_factory_;
 };
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index 8aea48c..4ac8242 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -26,9 +26,7 @@
 #include "perfetto/base/event.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/unix_socket.h"
-#include "src/profiling/memory/bounded_queue.h"
 #include "src/profiling/memory/heapprofd_producer.h"
-#include "src/profiling/memory/socket_listener.h"
 #include "src/profiling/memory/wire_protocol.h"
 #include "src/tracing/ipc/default_socket.h"
 
@@ -40,7 +38,7 @@
 
 int StartChildHeapprofd(pid_t target_pid,
                         std::string target_cmdline,
-                        std::vector<base::ScopedFile> inherited_sock_fds);
+                        base::ScopedFile inherited_sock_fd);
 int StartCentralHeapprofd();
 
 base::Event* g_dump_evt = nullptr;
@@ -49,7 +47,7 @@
   bool cleanup_crash = false;
   pid_t target_pid = base::kInvalidPid;
   std::string target_cmdline;
-  std::vector<base::ScopedFile> inherited_sock_fds;
+  base::ScopedFile inherited_sock_fd;
 
   enum { kCleanupCrash = 256, kTargetPid, kTargetCmd, kInheritFd };
   static struct option long_options[] = {
@@ -66,13 +64,19 @@
         cleanup_crash = true;
         break;
       case kTargetPid:
+        if (target_pid != base::kInvalidPid)
+          PERFETTO_FATAL("Duplicate exclusive-for-pid");
         target_pid = static_cast<pid_t>(atoi(optarg));
         break;
       case kTargetCmd:  // assumed to be already normalized
+        if (!target_cmdline.empty())
+          PERFETTO_FATAL("Duplicate exclusive-for-cmdline");
         target_cmdline = std::string(optarg);
         break;
-      case kInheritFd:  // repetition supported
-        inherited_sock_fds.emplace_back(atoi(optarg));
+      case kInheritFd:  // repetition not supported
+        if (inherited_sock_fd)
+          PERFETTO_FATAL("Duplicate inherit-socket-fd");
+        inherited_sock_fd = base::ScopedFile(atoi(optarg));
         break;
       default:
         PERFETTO_ELOG("Usage: %s [--cleanup-after-crash]", argv[0]);
@@ -90,7 +94,7 @@
   // reparenting.
   bool tpid_set = target_pid != base::kInvalidPid;
   bool tcmd_set = !target_cmdline.empty();
-  bool fds_set = !inherited_sock_fds.empty();
+  bool fds_set = !!inherited_sock_fd;
   if (tpid_set || tcmd_set || fds_set) {
     if (!tpid_set || !tcmd_set || !fds_set) {
       PERFETTO_ELOG(
@@ -100,7 +104,7 @@
     }
 
     return StartChildHeapprofd(target_pid, target_cmdline,
-                               std::move(inherited_sock_fds));
+                               std::move(inherited_sock_fd));
   }
 
   // Otherwise start as a central daemon.
@@ -109,12 +113,12 @@
 
 int StartChildHeapprofd(pid_t target_pid,
                         std::string target_cmdline,
-                        std::vector<base::ScopedFile> inherited_sock_fds) {
+                        base::ScopedFile inherited_sock_fd) {
   base::UnixTaskRunner task_runner;
   HeapprofdProducer producer(HeapprofdMode::kChild, &task_runner);
-  producer.SetTargetProcess(target_pid, target_cmdline);
+  producer.SetTargetProcess(target_pid, target_cmdline,
+                            std::move(inherited_sock_fd));
   producer.ConnectWithRetries(GetProducerSocket());
-  producer.AdoptConnectedSockets(std::move(inherited_sock_fds));
   task_runner.Run();
   return 0;
 }
diff --git a/src/profiling/memory/malloc_hooks.cc b/src/profiling/memory/malloc_hooks.cc
index 531fccf..122b87b 100644
--- a/src/profiling/memory/malloc_hooks.cc
+++ b/src/profiling/memory/malloc_hooks.cc
@@ -123,7 +123,6 @@
 // is tied together).
 std::atomic<bool> g_client_lock{false};
 
-constexpr size_t kNumConnections = 2;
 constexpr char kHeapprofdBinPath[] = "/system/bin/heapprofd";
 
 const MallocDispatch* GetDispatch() {
@@ -184,7 +183,7 @@
   PERFETTO_DLOG("Constructing client for central daemon.");
 
   return std::make_shared<perfetto::profiling::Client>(
-      perfetto::profiling::kHeapprofdSocketFile, kNumConnections);
+      perfetto::profiling::kHeapprofdSocketFile);
 }
 
 std::shared_ptr<perfetto::profiling::Client> CreateClientAndPrivateDaemon() {
@@ -256,10 +255,7 @@
     return nullptr;
   }
 
-  std::vector<perfetto::base::UnixSocketRaw> client_sockets;
-  client_sockets.emplace_back(std::move(parent_sock));
-  return std::make_shared<perfetto::profiling::Client>(
-      std::move(client_sockets));
+  return std::make_shared<perfetto::profiling::Client>(std::move(parent_sock));
 }
 
 }  // namespace
diff --git a/src/profiling/memory/process_matcher.cc b/src/profiling/memory/process_matcher.cc
deleted file mode 100644
index c87e40c..0000000
--- a/src/profiling/memory/process_matcher.cc
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/profiling/memory/process_matcher.h"
-
-#include "perfetto/base/logging.h"
-
-namespace perfetto {
-namespace profiling {
-
-ProcessMatcher::Delegate::~Delegate() = default;
-
-ProcessMatcher::ProcessHandle::ProcessHandle(ProcessMatcher* matcher, pid_t pid)
-    : matcher_(matcher), pid_(pid) {}
-
-ProcessMatcher::ProcessHandle::ProcessHandle(ProcessHandle&& other) noexcept
-    : matcher_(other.matcher_), pid_(other.pid_) {
-  other.matcher_ = nullptr;
-}
-
-ProcessMatcher::ProcessHandle& ProcessMatcher::ProcessHandle::operator=(
-    ProcessHandle&& other) noexcept {
-  // Construct this temporary because the RHS could be an lvalue cast to an
-  // rvalue reference whose lifetime we do not know.
-  ProcessHandle tmp(std::move(other));
-  using std::swap;
-  swap(*this, tmp);
-  return *this;
-}
-
-ProcessMatcher::ProcessHandle::~ProcessHandle() {
-  if (matcher_)
-    matcher_->RemoveProcess(pid_);
-}
-
-ProcessMatcher::ProcessSetSpecHandle::ProcessSetSpecHandle(
-    ProcessMatcher* matcher,
-    std::multiset<ProcessSetSpecItem>::iterator iterator)
-    : matcher_(matcher), iterator_(iterator) {}
-
-ProcessMatcher::ProcessSetSpecHandle::ProcessSetSpecHandle(
-    ProcessSetSpecHandle&& other) noexcept
-    : matcher_(other.matcher_), iterator_(other.iterator_) {
-  other.matcher_ = nullptr;
-}
-
-ProcessMatcher::ProcessSetSpecHandle& ProcessMatcher::ProcessSetSpecHandle::
-operator=(ProcessSetSpecHandle&& other) noexcept {
-  // Construct this temporary because the RHS could be an lvalue cast to an
-  // rvalue reference whose lifetime we do not know.
-  ProcessSetSpecHandle tmp(std::move(other));
-  using std::swap;
-  swap(*this, tmp);
-  return *this;
-}
-
-std::set<pid_t> ProcessMatcher::ProcessSetSpecHandle::GetPIDs() const {
-  std::set<pid_t> result;
-  for (const ProcessItem* process_item : iterator_->process_items)
-    result.emplace(process_item->process.pid);
-  return result;
-}
-
-ProcessMatcher::ProcessSetSpecHandle::~ProcessSetSpecHandle() {
-  if (matcher_)
-    matcher_->UnwaitProcessSetSpec(iterator_);
-}
-
-ProcessMatcher::ProcessMatcher(Delegate* delegate) : delegate_(delegate) {}
-
-ProcessMatcher::ProcessHandle ProcessMatcher::ProcessConnected(
-    Process process) {
-  pid_t pid = process.pid;
-  decltype(pid_to_process_)::iterator it;
-  bool inserted;
-  std::tie(it, inserted) = pid_to_process_.emplace(pid, std::move(process));
-  if (!inserted) {
-    PERFETTO_DFATAL("Duplicated PID");
-    return ProcessHandle(nullptr, 0);
-  }
-
-  ProcessItem* new_process_item = &(it->second);
-  const std::string& cmdline = new_process_item->process.cmdline;
-  cmdline_to_process_.emplace(cmdline, new_process_item);
-
-  // Go through existing ProcessSetSpecs to find ones containing the newly
-  // connected process.
-  std::set<ProcessSetSpecItem*> matching_process_set_items =
-      process_set_for_all_;
-  auto pid_range = pid_to_process_set_.equal_range(pid);
-  for (auto i = pid_range.first; i != pid_range.second; ++i) {
-    ProcessSetSpec& ps = const_cast<ProcessSetSpec&>(i->second->process_set);
-    if (ps.pids.find(pid) != ps.pids.end())
-      matching_process_set_items.emplace(i->second);
-  }
-  auto cmdline_range = cmdline_to_process_set_.equal_range(cmdline);
-  for (auto i = cmdline_range.first; i != cmdline_range.second; ++i) {
-    ProcessSetSpec& ps = const_cast<ProcessSetSpec&>(i->second->process_set);
-    if (ps.process_cmdline.find(cmdline) != ps.process_cmdline.end())
-      matching_process_set_items.emplace(i->second);
-  }
-
-  for (ProcessSetSpecItem* process_set_item : matching_process_set_items) {
-    process_set_item->process_items.emplace(new_process_item);
-    new_process_item->references.emplace(process_set_item);
-  }
-
-  if (!matching_process_set_items.empty())
-    RunMatchFn(new_process_item);
-
-  return ProcessHandle(this, pid);
-}
-
-void ProcessMatcher::RemoveProcess(pid_t pid) {
-  auto it = pid_to_process_.find(pid);
-  if (it == pid_to_process_.end()) {
-    PERFETTO_DFATAL("Could not find process.");
-    return;
-  }
-  ProcessItem& process_item = it->second;
-  auto range = cmdline_to_process_.equal_range(process_item.process.cmdline);
-  for (auto process_it = range.first; process_it != range.second;
-       ++process_it) {
-    if (process_it->second == &process_item) {
-      size_t erased = cmdline_to_process_.erase(process_item.process.cmdline);
-      PERFETTO_DCHECK(erased);
-      break;
-    }
-  }
-  pid_to_process_.erase(it);
-}
-
-ProcessMatcher::ProcessSetSpecHandle ProcessMatcher::AwaitProcessSetSpec(
-    ProcessSetSpec process_set) {
-  auto it = process_sets_.emplace(this, std::move(process_set));
-  ProcessSetSpecItem* new_process_set_item =
-      const_cast<ProcessSetSpecItem*>(&*it);
-  const ProcessSetSpec& new_process_set = new_process_set_item->process_set;
-
-  // Go through currently active processes to find ones matching the new
-  // ProcessSetSpec.
-  std::set<ProcessItem*> matching_process_items;
-  if (new_process_set.all) {
-    process_set_for_all_.emplace(new_process_set_item);
-    for (auto& p : pid_to_process_) {
-      ProcessItem& process_item = p.second;
-      matching_process_items.emplace(&process_item);
-    }
-  } else {
-    for (pid_t pid : new_process_set.pids) {
-      pid_to_process_set_.emplace(pid, new_process_set_item);
-      auto process_it = pid_to_process_.find(pid);
-      if (process_it != pid_to_process_.end())
-        matching_process_items.emplace(&(process_it->second));
-    }
-    for (std::string cmdline : new_process_set.process_cmdline) {
-      cmdline_to_process_set_.emplace(cmdline, new_process_set_item);
-      auto range = cmdline_to_process_.equal_range(cmdline);
-      for (auto process_it = range.first; process_it != range.second;
-           ++process_it)
-        matching_process_items.emplace(process_it->second);
-    }
-  }
-
-  for (ProcessItem* process_item : matching_process_items) {
-    new_process_set_item->process_items.emplace(process_item);
-    process_item->references.emplace(new_process_set_item);
-    RunMatchFn(process_item);
-  }
-
-  return ProcessSetSpecHandle(this, it);
-}
-
-void ProcessMatcher::UnwaitProcessSetSpec(
-    std::multiset<ProcessSetSpecItem>::iterator iterator) {
-  ProcessSetSpecItem& process_set_item =
-      const_cast<ProcessSetSpecItem&>(*iterator);
-  const ProcessSetSpec& process_set = process_set_item.process_set;
-
-  for (pid_t pid : process_set.pids) {
-    auto pid_range = pid_to_process_set_.equal_range(pid);
-    for (auto i = pid_range.first; i != pid_range.second;) {
-      if (i->second == &process_set_item)
-        i = pid_to_process_set_.erase(i);
-      else
-        ++i;
-    }
-  }
-  for (const std::string& cmdline : process_set.process_cmdline) {
-    auto cmdline_range = cmdline_to_process_set_.equal_range(cmdline);
-    for (auto i = cmdline_range.first; i != cmdline_range.second;) {
-      if (i->second == &process_set_item)
-        i = cmdline_to_process_set_.erase(i);
-      else
-        ++i;
-    }
-  }
-
-  if (process_set.all)
-    process_set_for_all_.erase(&process_set_item);
-  process_sets_.erase(iterator);
-}
-
-ProcessMatcher::ProcessItem::~ProcessItem() {
-  for (ProcessSetSpecItem* process_set_item : references) {
-    size_t erased = process_set_item->process_items.erase(this);
-    PERFETTO_DCHECK(erased);
-  }
-}
-
-bool ProcessMatcher::ProcessSetSpecItem::operator<(
-    const ProcessSetSpecItem& other) const {
-  return std::tie(process_set.pids, process_set.process_cmdline,
-                  process_set.all) < std::tie(other.process_set.pids,
-                                              other.process_set.process_cmdline,
-                                              other.process_set.all);
-}
-
-ProcessMatcher::ProcessSetSpecItem::~ProcessSetSpecItem() {
-  for (ProcessItem* process_item : process_items) {
-    size_t erased = process_item->references.erase(this);
-    PERFETTO_DCHECK(erased);
-    if (process_item->references.empty())
-      matcher->ShutdownProcess(process_item->process.pid);
-  }
-}
-
-void ProcessMatcher::ShutdownProcess(pid_t pid) {
-  delegate_->Disconnect(pid);
-}
-
-void ProcessMatcher::RunMatchFn(ProcessItem* process_item) {
-  std::vector<const ProcessSetSpec*> process_sets;
-  for (ProcessSetSpecItem* process_set_item : process_item->references)
-    process_sets.emplace_back(&(process_set_item->process_set));
-  delegate_->Match(process_item->process, process_sets);
-}
-
-void swap(ProcessMatcher::ProcessHandle& a, ProcessMatcher::ProcessHandle& b) {
-  using std::swap;
-  swap(a.matcher_, b.matcher_);
-  swap(a.pid_, b.pid_);
-}
-
-void swap(ProcessMatcher::ProcessSetSpecHandle& a,
-          ProcessMatcher::ProcessSetSpecHandle& b) {
-  using std::swap;
-  swap(a.matcher_, b.matcher_);
-  swap(a.iterator_, b.iterator_);
-}
-
-}  // namespace profiling
-}  // namespace perfetto
diff --git a/src/profiling/memory/process_matcher.h b/src/profiling/memory/process_matcher.h
deleted file mode 100644
index e98d5ee..0000000
--- a/src/profiling/memory/process_matcher.h
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_PROFILING_MEMORY_PROCESS_MATCHER_H_
-#define SRC_PROFILING_MEMORY_PROCESS_MATCHER_H_
-
-#include <map>
-#include <set>
-#include <string>
-#include <vector>
-
-#include "src/profiling/memory/wire_protocol.h"
-
-namespace perfetto {
-namespace profiling {
-
-struct Process {
-  pid_t pid;
-  std::string cmdline;
-};
-
-struct ProcessSetSpec {
-  std::set<pid_t> pids;
-  std::set<std::string> process_cmdline;
-  bool all = false;
-
-  ClientConfiguration client_configuration{};
-};
-
-// The Matcher allows DataSources to wait for ProcessSetSpecs, and the
-// SocketListener to notify connection of a new Process. Both of these
-// operations return an opaque handle that should be held on to by the caller.
-//
-// If the ProcessHandle gets destroyed, it signals to the Matcher that the
-// process disconnected. If the ProcessSetSpecHandle goes away, it signals to
-// the Matcher that the ProcessSetSpec has been torn down. When the last
-// ProcessSetSpec referring to a Process gets torn down, the Process has to be
-// shut down.
-//
-// In the constructor, a match_fn and a shutdown_fn are supplied.
-// match_fn is called when the set of ProcessSetSpecs for a given process
-// changes, so that the SocketListener can compute and send the appropriate
-// ClientConfiguration.
-// shutdown_fn is called when the last DataSource for a process gets torn
-// down.
-class ProcessMatcher {
- private:
-  struct ProcessItem;
-  struct ProcessSetSpecItem;
-
- public:
-  class Delegate {
-   public:
-    virtual void Match(
-        const Process& process,
-        const std::vector<const ProcessSetSpec*>& process_sets) = 0;
-    virtual void Disconnect(pid_t pid) = 0;
-    virtual ~Delegate();
-  };
-
-  class ProcessHandle {
-   public:
-    friend class ProcessMatcher;
-    friend void swap(ProcessHandle&, ProcessHandle&);
-    ProcessHandle() = default;
-
-    ~ProcessHandle();
-    ProcessHandle(const ProcessHandle&) = delete;
-    ProcessHandle& operator=(const ProcessHandle&) = delete;
-    ProcessHandle(ProcessHandle&&) noexcept;
-    ProcessHandle& operator=(ProcessHandle&&) noexcept;
-
-   private:
-    ProcessHandle(ProcessMatcher* matcher, pid_t pid);
-
-    ProcessMatcher* matcher_ = nullptr;
-    pid_t pid_;
-  };
-
-  class ProcessSetSpecHandle {
-   public:
-    friend class ProcessMatcher;
-    friend void swap(ProcessSetSpecHandle&, ProcessSetSpecHandle&);
-    ProcessSetSpecHandle() = default;
-
-    ~ProcessSetSpecHandle();
-    ProcessSetSpecHandle(const ProcessSetSpecHandle&) = delete;
-    ProcessSetSpecHandle& operator=(const ProcessSetSpecHandle&) = delete;
-    ProcessSetSpecHandle(ProcessSetSpecHandle&&) noexcept;
-    ProcessSetSpecHandle& operator=(ProcessSetSpecHandle&&) noexcept;
-
-    std::set<pid_t> GetPIDs() const;
-
-   private:
-    ProcessSetSpecHandle(ProcessMatcher* matcher,
-                         std::multiset<ProcessSetSpecItem>::iterator iterator);
-
-    ProcessMatcher* matcher_ = nullptr;
-    std::multiset<ProcessSetSpecItem>::iterator iterator_;
-  };
-
-  ProcessMatcher(Delegate* delegate);
-
-  // Notify that a process has connected. This will determine which
-  // ProcessSetSpecs it matches, and call match_fn with that set.
-  // This is called by the SocketListener.
-  ProcessHandle ProcessConnected(Process process);
-
-  // Wait for connection of a set of processes as specified in ProcessSetSpec.
-  // When a process matching that specificaton connects, match_fn will be called
-  // with this and other ProcessSetSpecs that have called this function
-  // previously.
-  // This is called by HeapprofdProducer.
-  ProcessSetSpecHandle AwaitProcessSetSpec(ProcessSetSpec process_set);
-
- private:
-  // ProcessItem and ProcessSetSpecItem are held internally in the Matcher for
-  // each Process and ProcessSetSpec. Matched Processes and ProcessSetSpecs have
-  // pointers to each other in their ProcessItem and ProcessSetSpecItem structs,
-  // which are automatically kept up to date in the destructors.
-  struct ProcessItem {
-    // No copy or move as we rely on pointer stability in ProcessSetSpecItem.
-    ProcessItem(const ProcessItem&) = delete;
-    ProcessItem& operator=(const ProcessItem&) = delete;
-    ProcessItem(ProcessItem&&) = delete;
-    ProcessItem& operator=(ProcessItem&&) = delete;
-
-    ProcessItem(Process p) : process(std::move(p)) {}
-
-    Process process;
-    std::set<ProcessSetSpecItem*> references;
-
-    ~ProcessItem();
-  };
-
-  struct ProcessSetSpecItem {
-    // No copy or move as we rely on pointer stability in ProcessSetSpec.
-    ProcessSetSpecItem(const ProcessSetSpecItem&) = delete;
-    ProcessSetSpecItem& operator=(const ProcessSetSpecItem&) = delete;
-    ProcessSetSpecItem(ProcessSetSpecItem&&) = delete;
-    ProcessSetSpecItem& operator=(ProcessSetSpecItem&&) = delete;
-
-    ProcessSetSpecItem(ProcessMatcher* m, ProcessSetSpec ps)
-        : matcher(m), process_set(std::move(ps)) {}
-
-    ~ProcessSetSpecItem();
-    bool operator<(const ProcessSetSpecItem& other) const;
-
-    ProcessMatcher* matcher;
-    const ProcessSetSpec process_set;
-    std::set<ProcessItem*> process_items;
-  };
-
-  void UnwaitProcessSetSpec(
-      std::multiset<ProcessSetSpecItem>::iterator iterator);
-  void RemoveProcess(pid_t pid);
-  void ShutdownProcess(pid_t pid);
-  void RunMatchFn(ProcessItem* process_item);
-
-  Delegate* delegate_;
-
-  std::map<pid_t, ProcessItem> pid_to_process_;
-  std::multimap<std::string, ProcessItem*> cmdline_to_process_;
-
-  std::multiset<ProcessSetSpecItem> process_sets_;
-  std::multimap<pid_t, ProcessSetSpecItem*> pid_to_process_set_;
-  std::multimap<std::string, ProcessSetSpecItem*> cmdline_to_process_set_;
-  std::set<ProcessSetSpecItem*> process_set_for_all_;
-};
-
-void swap(ProcessMatcher::ProcessHandle& a, ProcessMatcher::ProcessHandle& b);
-void swap(ProcessMatcher::ProcessSetSpecHandle& a,
-          ProcessMatcher::ProcessSetSpecHandle& b);
-
-}  // namespace profiling
-}  // namespace perfetto
-
-#endif  // SRC_PROFILING_MEMORY_PROCESS_MATCHER_H_
diff --git a/src/profiling/memory/process_matcher_unittest.cc b/src/profiling/memory/process_matcher_unittest.cc
deleted file mode 100644
index 07601fc..0000000
--- a/src/profiling/memory/process_matcher_unittest.cc
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/profiling/memory/process_matcher.h"
-
-#include "gtest/gtest.h"
-
-namespace perfetto {
-namespace profiling {
-namespace {
-
-class DummyDelegate : public ProcessMatcher::Delegate {
- public:
-  void Match(const Process&,
-             const std::vector<const ProcessSetSpec*>&) override {
-    match = true;
-  }
-
-  void Disconnect(pid_t) override { shutdown = true; }
-
-  bool match = false;
-  bool shutdown = false;
-};
-
-TEST(MatcherTest, MatchPIDProcessSetSpecFirst) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.pids.emplace(1);
-
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  auto handle = m.ProcessConnected({1, "init"});
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-}
-
-TEST(MatcherTest, MatchPIDProcessSetSpecSecond) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.pids.emplace(1);
-
-  auto handle = m.ProcessConnected({1, "init"});
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-}
-
-TEST(MatcherTest, MatchCmdlineProcessSetSpecFirst) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.process_cmdline.emplace("init");
-
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  auto handle = m.ProcessConnected({1, "init"});
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-}
-
-TEST(MatcherTest, MatchCmdlineProcessSetSpecSecond) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.process_cmdline.emplace("init");
-
-  auto handle = m.ProcessConnected({1, "init"});
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-}
-
-TEST(MatcherTest, ExpiredProcessSetSpecHandle) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.pids.emplace(1);
-
-  { auto ps_handle = m.AwaitProcessSetSpec(std::move(ps)); }
-  auto handle = m.ProcessConnected({1, "init"});
-  EXPECT_FALSE(delegate.match);
-}
-
-TEST(MatcherTest, ExpiredProcessHandle) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.pids.emplace(1);
-
-  { auto handle = m.ProcessConnected({1, "init"}); }
-  EXPECT_FALSE(delegate.shutdown);
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  EXPECT_FALSE(delegate.match);
-}
-
-TEST(MatcherTest, MatchCmdlineProcessSetSpecFirstMultiple) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.process_cmdline.emplace("init");
-  ProcessSetSpec ps2;
-  ps2.process_cmdline.emplace("init");
-
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  auto ps2_handle = m.AwaitProcessSetSpec(std::move(ps2));
-  auto handle = m.ProcessConnected({1, "init"});
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-  { auto destroy = std::move(ps2_handle); }
-  EXPECT_FALSE(delegate.shutdown);
-  { auto destroy = std::move(ps_handle); }
-  EXPECT_TRUE(delegate.shutdown);
-}
-
-TEST(MatcherTest, GetPIDs) {
-  DummyDelegate delegate;
-
-  ProcessMatcher m(&delegate);
-  ProcessSetSpec ps;
-  ps.process_cmdline.emplace("init");
-
-  auto init_handle = m.ProcessConnected({1, "init"});
-  auto second_init_handle = m.ProcessConnected({2, "init"});
-  auto ps_handle = m.AwaitProcessSetSpec(std::move(ps));
-  std::set<pid_t> expected_pids{1, 2};
-  EXPECT_EQ(ps_handle.GetPIDs(), expected_pids);
-  EXPECT_TRUE(delegate.match);
-  EXPECT_FALSE(delegate.shutdown);
-}
-
-}  // namespace
-}  // namespace profiling
-}  // namespace perfetto
diff --git a/src/profiling/memory/queue_messages.h b/src/profiling/memory/queue_messages.h
index be3e845..e1346ac 100644
--- a/src/profiling/memory/queue_messages.h
+++ b/src/profiling/memory/queue_messages.h
@@ -14,8 +14,6 @@
  * limitations under the License.
  */
 
-// This file contains messages sent between the threads over BoundedQueue.
-
 #ifndef SRC_PROFILING_MEMORY_QUEUE_MESSAGES_H_
 #define SRC_PROFILING_MEMORY_QUEUE_MESSAGES_H_
 
@@ -25,6 +23,8 @@
 #include "perfetto/tracing/core/trace_writer.h"
 #include "src/profiling/memory/wire_protocol.h"
 
+// TODO(fmayer): Find better places to put these structs.
+
 namespace perfetto {
 namespace profiling {
 
@@ -38,9 +38,9 @@
 };
 
 struct FreeRecord {
-  std::unique_ptr<uint8_t[]> free_data;
-  // This is a pointer into free_data.
-  FreeMetadata* metadata;
+  pid_t pid;  // Pid of the client that reported the frees.
+  uint64_t data_source_instance_id;  // Id of the client's data source.
+  FreeMetadata metadata;  // Stored by value (copied out of the ring buffer).
 };
 
 // A wrapper of libunwindstack FrameData that also includes the build_id.
@@ -53,6 +53,8 @@
 };
 
 struct AllocRecord {
+  pid_t pid;  // Pid of the client that made the allocation.
+  uint64_t data_source_instance_id;  // Id of the client's data source.
   AllocMetadata alloc_metadata;
   std::vector<FrameData> frames;
 };
@@ -70,7 +72,6 @@
     Free = 2,
   };
   pid_t pid;
-  uint64_t client_generation;
   // TODO(fmayer): Use a union.
   Type record_type;
   AllocRecord alloc_record;
diff --git a/src/profiling/memory/shared_ring_buffer.cc b/src/profiling/memory/shared_ring_buffer.cc
index 52d87a0..8d31616 100644
--- a/src/profiling/memory/shared_ring_buffer.cc
+++ b/src/profiling/memory/shared_ring_buffer.cc
@@ -166,13 +166,16 @@
   Buffer result;
 
   base::Optional<PointerPositions> opt_pos = GetPointerPositions(spinlock);
-  if (!opt_pos)
+  if (!opt_pos) {
+    errno = EBADFD;
     return result;
+  }
   auto pos = opt_pos.value();
 
   const uint64_t size_with_header =
       base::AlignUp<kAlignment>(size + kHeaderSize);
   if (size_with_header > write_avail(pos)) {
+    errno = EAGAIN;
     meta_->num_writes_failed++;
     return result;
   }
@@ -192,6 +195,8 @@
 }
 
 void SharedRingBuffer::EndWrite(Buffer buf) {
+  if (!buf)
+    return;
   uint8_t* wr_ptr = buf.data - kHeaderSize;
   PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(wr_ptr) % kAlignment == 0);
   reinterpret_cast<std::atomic<uint32_t>*>(wr_ptr)->store(
diff --git a/src/profiling/memory/shared_ring_buffer.h b/src/profiling/memory/shared_ring_buffer.h
index 9ec09e9..28e0c5c 100644
--- a/src/profiling/memory/shared_ring_buffer.h
+++ b/src/profiling/memory/shared_ring_buffer.h
@@ -75,6 +75,8 @@
   static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);
 
   ~SharedRingBuffer();
+  SharedRingBuffer() = default;
+
   SharedRingBuffer(SharedRingBuffer&&) noexcept;
   SharedRingBuffer& operator=(SharedRingBuffer&&);
 
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
deleted file mode 100644
index 568b663..0000000
--- a/src/profiling/memory/socket_listener.cc
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/profiling/memory/socket_listener.h"
-
-#include "perfetto/base/utils.h"
-#include "src/profiling/memory/proc_utils.h"
-
-namespace perfetto {
-namespace profiling {
-namespace {
-
-ClientConfiguration MergeProcessSetSpecs(
-    const std::vector<const ProcessSetSpec*>& process_sets) {
-  ClientConfiguration result{};
-  for (const ProcessSetSpec* process_set : process_sets) {
-    const ClientConfiguration& cfg = process_set->client_configuration;
-    if (result.interval == 0 || result.interval > cfg.interval)
-      result.interval = cfg.interval;
-  }
-  PERFETTO_DCHECK(result.interval > 0);
-  if (result.interval < 1)
-    result.interval = 1;
-  return result;
-}
-
-}  // namespace
-
-SocketListener::ProcessInfo::ProcessInfo(Process p) : process(std::move(p)) {}
-
-void SocketListener::ProcessInfo::Connected(
-    ProcessMatcher* process_matcher,
-    BookkeepingThread* bookkeeping_thread) {
-  if (!connected) {
-    matcher_handle = process_matcher->ProcessConnected(process);
-    bookkeeping_handle =
-        bookkeeping_thread->NotifyProcessConnected(process.pid);
-  }
-  connected = true;
-}
-
-void SocketListener::OnDisconnect(base::UnixSocket* self) {
-  pid_t peer_pid = self->peer_pid();
-  Disconnect(peer_pid);
-}
-
-void SocketListener::Disconnect(pid_t pid) {
-  process_info_.erase(pid);
-}
-
-void SocketListener::Match(
-    const Process& process,
-    const std::vector<const ProcessSetSpec*>& process_sets) {
-  pid_t pid = process.pid;
-  auto process_info_it = process_info_.find(pid);
-  if (process_info_it == process_info_.end()) {
-    PERFETTO_DFATAL("This should not happen.");
-    return;
-  }
-
-  ProcessInfo& process_info = process_info_it->second;
-  if (process_info.set_up) {
-    // TODO(fmayer): Allow to change sampling rate.
-    return;
-  }
-
-  ClientConfiguration cfg = MergeProcessSetSpecs(process_sets);
-  for (auto& raw_sock_and_sockinfo : process_info.sockets) {
-    SocketInfo& sock_info = raw_sock_and_sockinfo.second;
-    // TODO(fmayer): Send on one and poll(2) on the other end.
-    sock_info.sock->Send(&cfg, sizeof(cfg), -1,
-                         base::UnixSocket::BlockingMode::kBlocking);
-  }
-  process_info.client_config = std::move(cfg);
-  process_info.set_up = true;
-}
-
-// Implementation warning: if adding a use for the first argument, consider that
-// HeapprofdProducer can supply a pre-connected socket (for which there is no
-// "listening" socket to give a pointer to).
-void SocketListener::OnNewIncomingConnection(
-    base::UnixSocket*,
-    std::unique_ptr<base::UnixSocket> new_connection) {
-  Process peer_process;
-  peer_process.pid = new_connection->peer_pid();
-  if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
-    PERFETTO_ELOG("Failed to get cmdline for %d", peer_process.pid);
-
-  HandleClientConnection(std::move(new_connection), peer_process);
-}
-
-void SocketListener::HandleClientConnection(
-    std::unique_ptr<base::UnixSocket> new_connection,
-    Process peer_process) {
-  PERFETTO_DCHECK(peer_process.pid == new_connection->peer_pid());
-
-  base::UnixSocket* new_connection_raw = new_connection.get();
-
-  decltype(process_info_)::iterator it;
-  std::tie(it, std::ignore) =
-      process_info_.emplace(peer_process.pid, peer_process);
-  ProcessInfo& process_info = it->second;
-  process_info.Connected(&process_matcher_, bookkeeping_thread_);
-  process_info.sockets.emplace(new_connection_raw, std::move(new_connection));
-  if (process_info.set_up) {
-    new_connection_raw->Send(&process_info.client_config,
-                             sizeof(process_info.client_config), -1,
-                             base::UnixSocket::BlockingMode::kBlocking);
-  }
-}
-
-void SocketListener::OnDataAvailable(base::UnixSocket* self) {
-  pid_t peer_pid = self->peer_pid();
-
-  auto process_info_it = process_info_.find(peer_pid);
-  if (process_info_it == process_info_.end()) {
-    PERFETTO_DFATAL("This should not happen.");
-    return;
-  }
-  ProcessInfo& process_info = process_info_it->second;
-
-  auto socket_it = process_info.sockets.find(self);
-  if (socket_it == process_info.sockets.end()) {
-    PERFETTO_DFATAL("Unexpected data received.");
-    return;
-  }
-  SocketInfo& socket_info = socket_it->second;
-
-  RecordReader::ReceiveBuffer buf = socket_info.record_reader.BeginReceive();
-
-  size_t rd;
-  if (PERFETTO_LIKELY(process_info.unwinding_metadata)) {
-    rd = self->Receive(buf.data, buf.size);
-  } else {
-    base::ScopedFile fds[2];
-    rd = self->Receive(buf.data, buf.size, fds, base::ArraySize(fds));
-    if (fds[0] && fds[1]) {
-      PERFETTO_DLOG("%d: Received FDs.", peer_pid);
-      process_info.unwinding_metadata = std::make_shared<UnwindingMetadata>(
-          peer_pid, std::move(fds[0]), std::move(fds[1]));
-    } else if (fds[0] || fds[1]) {
-      PERFETTO_DLOG("%d: Received partial FDs.", peer_pid);
-    } else {
-      PERFETTO_DLOG("%d: Received no FDs.", peer_pid);
-    }
-  }
-
-  RecordReader::Record record;
-  auto status = socket_info.record_reader.EndReceive(rd, &record);
-  switch (status) {
-    case (RecordReader::Result::Noop):
-      break;
-    case (RecordReader::Result::RecordReceived):
-      RecordReceived(self, static_cast<size_t>(record.size),
-                     std::move(record.data));
-      break;
-    case (RecordReader::Result::KillConnection):
-      self->Shutdown(true);
-      break;
-  }
-}
-
-void SocketListener::RecordReceived(base::UnixSocket* self,
-                                    size_t size,
-                                    std::unique_ptr<uint8_t[]> buf) {
-  pid_t peer_pid = self->peer_pid();
-
-  if (size == 0) {
-    PERFETTO_DLOG("Dropping empty record.");
-    return;
-  }
-
-  auto it = process_info_.find(peer_pid);
-  if (it == process_info_.end()) {
-    return;
-  }
-  ProcessInfo& process_info = it->second;
-
-  // This needs to be a weak_ptr for two reasons:
-  // 1) most importantly, the weak_ptr in unwinding_metadata_ should expire as
-  // soon as the last socket for a process goes away. Otherwise, a recycled
-  // PID might reuse incorrect metadata.
-  // 2) it is a waste to unwind for a process that had already gone away.
-  std::weak_ptr<UnwindingMetadata> weak_metadata(
-      process_info.unwinding_metadata);
-  callback_function_(
-      {peer_pid, size, std::move(buf), std::move(weak_metadata)});
-}
-
-}  // namespace profiling
-}  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
deleted file mode 100644
index 37484e0..0000000
--- a/src/profiling/memory/socket_listener.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
-#define SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
-
-#include "perfetto/base/unix_socket.h"
-
-#include "src/profiling/memory/bookkeeping.h"
-#include "src/profiling/memory/process_matcher.h"
-#include "src/profiling/memory/queue_messages.h"
-#include "src/profiling/memory/record_reader.h"
-#include "src/profiling/memory/unwinding.h"
-#include "src/profiling/memory/wire_protocol.h"
-
-#include <map>
-#include <memory>
-
-namespace perfetto {
-namespace profiling {
-
-class SocketListener : public base::UnixSocket::EventListener,
-                       public ProcessMatcher::Delegate {
- public:
-  SocketListener(std::function<void(UnwindingRecord)> fn,
-                 BookkeepingThread* bookkeeping_thread)
-      : callback_function_(std::move(fn)),
-        bookkeeping_thread_(bookkeeping_thread),
-        process_matcher_(this) {}
-  void OnDisconnect(base::UnixSocket* self) override;
-  void OnNewIncomingConnection(
-      base::UnixSocket* self,
-      std::unique_ptr<base::UnixSocket> new_connection) override;
-  void OnDataAvailable(base::UnixSocket* self) override;
-
-  void Match(const Process& process,
-             const std::vector<const ProcessSetSpec*>& process_sets) override;
-  void Disconnect(pid_t pid) override;
-
-  // Delegate for OnNewIncomingConnection.
-  void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
-                              Process peer_process);
-
-  ProcessMatcher& process_matcher() { return process_matcher_; }
-
- private:
-  struct SocketInfo {
-    SocketInfo(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
-
-    const std::unique_ptr<base::UnixSocket> sock;
-    RecordReader record_reader;
-  };
-
-  struct ProcessInfo {
-    ProcessInfo(Process p);
-
-    void Connected(ProcessMatcher* process_matcher,
-                   BookkeepingThread* bookkeeping_thread);
-
-    Process process;
-    ProcessMatcher::ProcessHandle matcher_handle;
-    BookkeepingThread::ProcessHandle bookkeeping_handle;
-    bool connected = false;
-    bool set_up = false;
-
-    ClientConfiguration client_config{};
-    std::map<base::UnixSocket*, SocketInfo> sockets;
-    std::shared_ptr<UnwindingMetadata> unwinding_metadata;
-  };
-
-  void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
-
-  std::map<pid_t, ProcessInfo> process_info_;
-  std::function<void(UnwindingRecord)> callback_function_;
-  BookkeepingThread* const bookkeeping_thread_;
-  ProcessMatcher process_matcher_;
-};
-
-}  // namespace profiling
-}  // namespace perfetto
-
-#endif  // SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 5b77eb3..809deb2 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -48,6 +48,7 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/string_utils.h"
+#include "perfetto/base/task_runner.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
@@ -231,49 +232,109 @@
   return true;
 }
 
-bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out) {
-  WireMessage msg;
-  if (!ReceiveWireMessage(reinterpret_cast<char*>(rec->data.get()), rec->size,
-                          &msg))
-    return false;
-  if (msg.record_type == RecordType::Malloc) {
-    std::shared_ptr<UnwindingMetadata> metadata = rec->metadata.lock();
-    if (!metadata) {
-      // Process has already gone away.
-      return false;
-    }
+void UnwindingWorker::OnDisconnect(base::UnixSocket* self) {
+  // TODO(fmayer): Maybe try to drain shmem one last time.
+  const pid_t peer_pid = self->peer_pid();
+  auto it = client_data_.find(peer_pid);
+  if (it == client_data_.end()) {
+    PERFETTO_DFATAL("Disconnected unexpecter socket.");
+    return;
+  }
+  DataSourceInstanceID ds_id = it->second.data_source_instance_id;
+  client_data_.erase(it);  // Destroys the entry's socket, i.e. |self|.
+  delegate_->PostSocketDisconnected(ds_id, peer_pid);
+}
 
-    out->alloc_record.alloc_metadata = *msg.alloc_header;
-    out->pid = rec->pid;
-    out->client_generation = msg.alloc_header->client_generation;
-    out->record_type = BookkeepingRecord::Type::Malloc;
-    DoUnwind(&msg, metadata.get(), &out->alloc_record);
-    return true;
-  } else if (msg.record_type == RecordType::Free) {
-    out->record_type = BookkeepingRecord::Type::Free;
-    out->pid = rec->pid;
-    out->client_generation = msg.free_header->client_generation;
-    // We need to keep this alive, because msg.free_header is a pointer into
-    // this.
-    out->free_record.free_data = std::move(rec->data);
-    out->free_record.metadata = msg.free_header;
-    return true;
-  } else {
-    PERFETTO_DFATAL("Invalid record type.");
-    return false;
+void UnwindingWorker::OnDataAvailable(base::UnixSocket* self) {
+  auto it = client_data_.find(self->peer_pid());
+  if (it == client_data_.end()) {
+    PERFETTO_DFATAL("Unexpected data.");
+    return;
+  }
+  // |self| is only used to find the client; the records live in its shmem.
+  ClientData& socket_data = it->second;
+  SharedRingBuffer& shmem = socket_data.shmem;
+  SharedRingBuffer::Buffer buf;
+  // Process records until BeginRead() returns a buffer that tests false.
+  for (;;) {
+    // TODO(fmayer): Allow spinlock acquisition to fail and repost Task if it
+    // did.
+    buf = shmem.BeginRead();
+    if (!buf)
+      break;
+    HandleBuffer(&buf, &socket_data);
+    shmem.EndRead(std::move(buf));
   }
 }
 
-void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
-                       BoundedQueue<BookkeepingRecord>* output_queue) {
-  for (;;) {
-    UnwindingRecord rec;
-    if (!input_queue->Get(&rec))
-      return;
-    BookkeepingRecord out;
-    if (HandleUnwindingRecord(&rec, &out))
-      output_queue->Add(std::move(out));
+void UnwindingWorker::HandleBuffer(SharedRingBuffer::Buffer* buf,
+                                   ClientData* socket_data) {
+  WireMessage msg;
+  // TODO(fmayer): standardise on char* or uint8_t*.
+  // char* has stronger guarantees regarding aliasing.
+  // see https://timsong-cpp.github.io/cppwp/n3337/basic.lval#10.8
+  if (!ReceiveWireMessage(reinterpret_cast<char*>(buf->data), buf->size,
+                          &msg)) {
+    PERFETTO_DFATAL("Failed to receive wire message.");
+    return;
   }
+  // Tag the parsed record with the client's pid / data source and post it.
+  if (msg.record_type == RecordType::Malloc) {
+    AllocRecord rec;
+    rec.alloc_metadata = *msg.alloc_header;
+    rec.pid = socket_data->sock->peer_pid();
+    rec.data_source_instance_id = socket_data->data_source_instance_id;
+    DoUnwind(&msg, &socket_data->metadata, &rec);  // Result is ignored.
+    delegate_->PostAllocRecord(std::move(rec));
+  } else if (msg.record_type == RecordType::Free) {
+    FreeRecord rec;
+    rec.pid = socket_data->sock->peer_pid();
+    rec.data_source_instance_id = socket_data->data_source_instance_id;
+    // We need to copy this, so we can return the memory to the shmem buffer.
+    memcpy(&rec.metadata, msg.free_header, sizeof(*msg.free_header));
+    delegate_->PostFreeRecord(std::move(rec));
+  } else {
+    PERFETTO_DFATAL("Invalid record type.");
+  }
+}
+
+void UnwindingWorker::PostHandoffSocket(HandoffData handoff_data) {
+  // Even with C++14, this cannot be moved into the lambda: std::function has
+  // to be copyable, which HandoffData is not. The task deletes |raw_data|.
+  HandoffData* raw_data = new HandoffData(std::move(handoff_data));
+  // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
+  // before this object gets destructed.
+  task_runner_->PostTask([this, raw_data] {
+    HandoffData data = std::move(*raw_data);
+    delete raw_data;
+    HandleHandoffSocket(std::move(data));
+  });
+}
+
+void UnwindingWorker::HandleHandoffSocket(HandoffData handoff_data) {
+  auto sock = base::UnixSocket::AdoptConnected(handoff_data.sock.ReleaseFd(),
+                                               this, this->task_runner_,
+                                               base::SockType::kStream);
+  pid_t peer_pid = sock->peer_pid();
+  // NOTE: if |peer_pid| is already tracked, emplace() below is a no-op.
+  UnwindingMetadata metadata(peer_pid,
+                             std::move(handoff_data.fds[kHandshakeMaps]),
+                             std::move(handoff_data.fds[kHandshakeMem]));
+  ClientData client_data{
+      handoff_data.data_source_instance_id, std::move(sock),
+      std::move(metadata), std::move(handoff_data.shmem),
+  };
+  client_data_.emplace(peer_pid, std::move(client_data));
+}
+
+void UnwindingWorker::PostDisconnectSocket(pid_t pid) {
+  // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
+  // before this object gets destructed.
+  task_runner_->PostTask([this, pid] { HandleDisconnectSocket(pid); });
+}
+
+void UnwindingWorker::HandleDisconnectSocket(pid_t pid) {
+  client_data_.erase(pid);  // No-op if |pid| is not in the map.
 }
 
 }  // namespace profiling
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index d327441..b508311 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -29,7 +29,6 @@
 
 #include "perfetto/base/scoped_file.h"
 #include "src/profiling/memory/bookkeeping.h"
-#include "src/profiling/memory/bounded_queue.h"
 #include "src/profiling/memory/queue_messages.h"
 #include "src/profiling/memory/wire_protocol.h"
 
@@ -41,6 +40,23 @@
 class FileDescriptorMaps : public unwindstack::Maps {
  public:
   FileDescriptorMaps(base::ScopedFile fd);
+
+  FileDescriptorMaps(const FileDescriptorMaps&) = delete;
+  FileDescriptorMaps& operator=(const FileDescriptorMaps&) = delete;
+
+  FileDescriptorMaps(FileDescriptorMaps&& m) : Maps(std::move(m)) {
+    fd_ = std::move(m.fd_);
+  }
+
+  FileDescriptorMaps& operator=(FileDescriptorMaps&& m) {
+    if (&m != this)
+      fd_ = std::move(m.fd_);
+    Maps::operator=(std::move(m));
+    return *this;
+  }
+
+  virtual ~FileDescriptorMaps() override = default;
+
   bool Parse() override;
   void Reset();
 
@@ -112,10 +128,55 @@
 
 bool DoUnwind(WireMessage*, UnwindingMetadata* metadata, AllocRecord* out);
 
-bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
+class UnwindingWorker : public base::UnixSocket::EventListener {
+ public:
+  class Delegate {  // Receives records and disconnect notifications.
+   public:
+    virtual void PostAllocRecord(AllocRecord);  // NOTE(review): pure (= 0)?
+    virtual void PostFreeRecord(FreeRecord);
+    virtual void PostSocketDisconnected(DataSourceInstanceID, pid_t pid);
+    virtual ~Delegate() = default;
+  };
 
-void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
-                       BoundedQueue<BookkeepingRecord>* output_queue);
+  struct HandoffData {  // Argument of PostHandoffSocket().
+    DataSourceInstanceID data_source_instance_id;
+    base::UnixSocketRaw sock;
+    base::ScopedFile fds[kHandshakeSize];
+    SharedRingBuffer shmem;
+  };
+
+  UnwindingWorker(Delegate* delegate, base::TaskRunner* task_runner)
+      : delegate_(delegate), task_runner_(task_runner) {}
+
+  // Public API safe to call from other threads.
+  void PostDisconnectSocket(pid_t pid);
+  void PostHandoffSocket(HandoffData);
+
+  // Implementation of UnixSocket::EventListener.
+  // Do not call explicitly.
+  void OnDisconnect(base::UnixSocket* self) override;
+  void OnNewIncomingConnection(base::UnixSocket*,
+                               std::unique_ptr<base::UnixSocket>) override {
+    PERFETTO_DFATAL("This should not happen.");
+  }
+  void OnDataAvailable(base::UnixSocket* self) override;
+
+ private:
+  struct ClientData {  // One entry per client in client_data_.
+    DataSourceInstanceID data_source_instance_id;
+    std::unique_ptr<base::UnixSocket> sock;
+    UnwindingMetadata metadata;
+    SharedRingBuffer shmem;
+  };
+
+  void HandleBuffer(SharedRingBuffer::Buffer* buf, ClientData* socket_data);
+  void HandleHandoffSocket(HandoffData data);
+  void HandleDisconnectSocket(pid_t pid);
+
+  std::map<pid_t, ClientData> client_data_;  // Keyed by peer pid.
+  Delegate* delegate_;
+  base::TaskRunner* task_runner_;
+};
 
 }  // namespace profiling
 }  // namespace perfetto
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 43ae7de..22065fc 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -35,60 +35,92 @@
   *ptr += sizeof(T);
   return true;
 }
+
+// Hand-rolled byte copy; needed to prevent crashes due to FORTIFY_SOURCE.
+void UnsafeMemcpy(char* dest, const char* src, size_t n)
+    __attribute__((no_sanitize("address"))) {
+  const char* const src_end = src + n;
+  while (src != src_end)
+    *dest++ = *src++;
+}
 }  // namespace
 
-bool SendWireMessage(base::UnixSocketRaw* sock, const WireMessage& msg) {
+bool SendWireMessage(SharedRingBuffer* shmem, const WireMessage& msg) {
   uint64_t total_size;
-  struct iovec iovecs[4] = {};
+  struct iovec iovecs[3] = {};  // {record type, header, payload}
   // TODO(fmayer): Maye pack these two.
-  iovecs[0].iov_base = &total_size;
-  iovecs[0].iov_len = sizeof(total_size);
-  iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
-  iovecs[1].iov_len = sizeof(msg.record_type);
+  iovecs[0].iov_base = const_cast<RecordType*>(&msg.record_type);
+  iovecs[0].iov_len = sizeof(msg.record_type);
   if (msg.alloc_header) {
     PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
-    iovecs[2].iov_base = msg.alloc_header;
-    iovecs[2].iov_len = sizeof(*msg.alloc_header);
+    iovecs[1].iov_base = msg.alloc_header;
+    iovecs[1].iov_len = sizeof(*msg.alloc_header);
   } else if (msg.free_header) {
     PERFETTO_DCHECK(msg.record_type == RecordType::Free);
-    iovecs[2].iov_base = msg.free_header;
-    iovecs[2].iov_len = sizeof(*msg.free_header);
+    iovecs[1].iov_base = msg.free_header;
+    iovecs[1].iov_len = sizeof(*msg.free_header);
   } else {
     PERFETTO_DFATAL("Neither alloc_header nor free_header set.");
     return false;
   }
 
-  iovecs[3].iov_base = msg.payload;
-  iovecs[3].iov_len = msg.payload_size;
+  iovecs[2].iov_base = msg.payload;
+  iovecs[2].iov_len = msg.payload_size;
 
   struct msghdr hdr = {};
   hdr.msg_iov = iovecs;
   if (msg.payload) {
     hdr.msg_iovlen = base::ArraySize(iovecs);
-    total_size = iovecs[1].iov_len + iovecs[2].iov_len + iovecs[3].iov_len;
+    total_size = iovecs[0].iov_len + iovecs[1].iov_len + iovecs[2].iov_len;
   } else {
     // If we are not sending payload, just ignore that iovec.
     hdr.msg_iovlen = base::ArraySize(iovecs) - 1;
-    total_size = iovecs[1].iov_len + iovecs[2].iov_len;
+    total_size = iovecs[0].iov_len + iovecs[1].iov_len;
   }
 
-  ssize_t sent = sock->SendMsgAll(&hdr);
-  return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+  SharedRingBuffer::Buffer buf;
+  {  // |lock| goes out of scope before the payload is copied below.
+    ScopedSpinlock lock = shmem->AcquireLock(ScopedSpinlock::Mode::Try);
+    if (!lock.locked()) {
+      PERFETTO_DLOG("Failed to acquire spinlock.");
+      return false;
+    }
+    buf = shmem->BeginWrite(lock, total_size);
+  }
+  if (!buf) {
+    PERFETTO_DFATAL("Buffer overflow.");
+    shmem->EndWrite(std::move(buf));  // Returns immediately for !buf.
+    return false;
+  }
+  // Copy the iovecs back to back into the reserved buffer.
+  size_t offset = 0;
+  for (size_t i = 0; i < hdr.msg_iovlen; ++i) {
+    UnsafeMemcpy(reinterpret_cast<char*>(buf.data + offset),
+                 reinterpret_cast<const char*>(hdr.msg_iov[i].iov_base),
+                 hdr.msg_iov[i].iov_len);
+    offset += hdr.msg_iov[i].iov_len;
+  }
+  shmem->EndWrite(std::move(buf));
+  return true;
 }
 
 bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
   RecordType* record_type;
   char* end = buf + size;
-  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end)) {
+    PERFETTO_DFATAL("Cannot read record type.");
     return false;
+  }
 
   out->payload = nullptr;
   out->payload_size = 0;
   out->record_type = *record_type;
 
   if (*record_type == RecordType::Malloc) {
-    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end)) {
+      PERFETTO_DFATAL("Cannot read alloc header.");
       return false;
+    }
     out->payload = buf;
     if (buf > end) {
       PERFETTO_DFATAL("Buffer overflowed");
@@ -96,8 +128,10 @@
     }
     out->payload_size = static_cast<size_t>(end - buf);
   } else if (*record_type == RecordType::Free) {
-    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end)) {
+      PERFETTO_DFATAL("Cannot read free header.");
       return false;
+    }
   } else {
     PERFETTO_DFATAL("Invalid record type.");
     return false;
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
index a6fde3e..422a9d4 100644
--- a/src/profiling/memory/wire_protocol.h
+++ b/src/profiling/memory/wire_protocol.h
@@ -29,6 +29,8 @@
 #include <unwindstack/UserX86.h>
 #include <unwindstack/UserX86_64.h>
 
+#include "src/profiling/memory/shared_ring_buffer.h"
+
 namespace perfetto {
 
 namespace base {
@@ -76,7 +78,6 @@
 };
 
 struct AllocMetadata {
-  uint64_t client_generation;
   uint64_t sequence_number;
   // Size of the allocation that was made.
   uint64_t alloc_size;
@@ -107,12 +108,16 @@
 };
 
 struct FreeMetadata {
-  uint64_t client_generation;
-
   uint64_t num_entries;
   FreePageEntry entries[kFreePageSize];
 };
 
+enum HandshakeFDs : size_t {  // Indices into HandoffData::fds.
+  kHandshakeMaps = 0,  // 2nd UnwindingMetadata ctor arg (maps fd, presumably).
+  kHandshakeMem = 1,   // 3rd UnwindingMetadata ctor arg (mem fd, presumably).
+  kHandshakeSize = 2,  // Number of fds; used as the array size.
+};
+
 struct WireMessage {
   RecordType record_type;
 
@@ -123,7 +128,7 @@
   size_t payload_size;
 };
 
-bool SendWireMessage(base::UnixSocketRaw*, const WireMessage& msg);
+bool SendWireMessage(SharedRingBuffer* buf, const WireMessage& msg);
 
 // Parse message received over the wire.
 // |buf| has to outlive |out|.
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
index 4182fa0..91de1af 100644
--- a/src/profiling/memory/wire_protocol_unittest.cc
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -54,29 +54,22 @@
 
 namespace {
 
-RecordReader::Record ReceiveAll(base::UnixSocketRaw* sock) {
-  RecordReader record_reader;
-  RecordReader::Record record;
-  bool received = false;
-  while (!received) {
-    RecordReader::ReceiveBuffer buf = record_reader.BeginReceive();
-    ssize_t rd = sock->Receive(buf.data, buf.size);
-    PERFETTO_CHECK(rd > 0);
-    auto status = record_reader.EndReceive(static_cast<size_t>(rd), &record);
-    switch (status) {
-      case (RecordReader::Result::Noop):
-        break;
-      case (RecordReader::Result::RecordReceived):
-        received = true;
-        break;
-      case (RecordReader::Result::KillConnection):
-        PERFETTO_FATAL("Unexpected KillConnection");
-        break;
-    }
-  }
-  return record;
+base::ScopedFile CopyFD(int fd) {  // Copies |fd| by passing it over a socket.
+  int sv[2];
+  PERFETTO_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+  base::UnixSocketRaw send_sock(base::ScopedFile(sv[0]),
+                                base::SockType::kStream);
+  base::UnixSocketRaw recv_sock(base::ScopedFile(sv[1]),
+                                base::SockType::kStream);
+  char msg[] = "a";  // Checks below use > 0: a negative result is truthy.
+  PERFETTO_CHECK(send_sock.Send(msg, sizeof(msg), &fd, 1) > 0);
+  base::ScopedFile res;
+  PERFETTO_CHECK(recv_sock.Receive(msg, sizeof(msg), &res, 1) > 0);
+  return res;
 }
 
+constexpr auto kShmemSize = 1048576;
+
 TEST(WireProtocolTest, AllocMessage) {
   char payload[] = {0x77, 0x77, 0x77, 0x00};
   WireMessage msg = {};
@@ -94,19 +87,20 @@
   msg.payload = payload;
   msg.payload_size = sizeof(payload);
 
-  base::UnixSocketRaw send_sock;
-  base::UnixSocketRaw recv_sock;
-  std::tie(send_sock, recv_sock) =
-      base::UnixSocketRaw::CreatePair(base::SockType::kStream);
-  ASSERT_TRUE(send_sock);
-  ASSERT_TRUE(recv_sock);
-  ASSERT_TRUE(SendWireMessage(&send_sock, msg));
+  auto shmem_client = SharedRingBuffer::Create(kShmemSize);
+  ASSERT_TRUE(shmem_client);
+  ASSERT_TRUE(shmem_client->is_valid());
+  auto shmem_server = SharedRingBuffer::Attach(CopyFD(shmem_client->fd()));
 
-  RecordReader::Record record = ReceiveAll(&recv_sock);
+  ASSERT_TRUE(SendWireMessage(&shmem_client.value(), msg));
 
+  auto buf = shmem_server->BeginRead();
+  ASSERT_TRUE(buf);
   WireMessage recv_msg;
-  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
-                                 record.size, &recv_msg));
+  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(buf.data), buf.size,
+                                 &recv_msg));
+  shmem_server->EndRead(std::move(buf));
+
   ASSERT_EQ(recv_msg.record_type, msg.record_type);
   ASSERT_EQ(*recv_msg.alloc_header, *msg.alloc_header);
   ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
@@ -124,19 +118,20 @@
   }
   msg.free_header = &metadata;
 
-  int sv[2];
-  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
-  base::UnixSocketRaw send_sock(base::ScopedFile(sv[0]),
-                                base::SockType::kStream);
-  base::UnixSocketRaw recv_sock(base::ScopedFile(sv[1]),
-                                base::SockType::kStream);
-  ASSERT_TRUE(SendWireMessage(&send_sock, msg));
+  auto shmem_client = SharedRingBuffer::Create(kShmemSize);
+  ASSERT_TRUE(shmem_client);
+  ASSERT_TRUE(shmem_client->is_valid());
+  auto shmem_server = SharedRingBuffer::Attach(CopyFD(shmem_client->fd()));
 
-  RecordReader::Record record = ReceiveAll(&recv_sock);
+  ASSERT_TRUE(SendWireMessage(&shmem_client.value(), msg));
 
+  auto buf = shmem_server->BeginRead();
+  ASSERT_TRUE(buf);
   WireMessage recv_msg;
-  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
-                                 record.size, &recv_msg));
+  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(buf.data), buf.size,
+                                 &recv_msg));
+  shmem_server->EndRead(std::move(buf));
+
   ASSERT_EQ(recv_msg.record_type, msg.record_type);
   ASSERT_EQ(*recv_msg.free_header, *msg.free_header);
   ASSERT_EQ(recv_msg.payload_size, msg.payload_size);