profiling: Add heapprofd producer.

Test: m
Test: flashed sailfish
Test: profile adbd

Change-Id: I2780570469cfa7dd2533dffa8cc810cb8914d118
diff --git a/Android.bp b/Android.bp
index 51dfcc0..811700e 100644
--- a/Android.bp
+++ b/Android.bp
@@ -52,6 +52,7 @@
     "src/ipc/service_proxy.cc",
     "src/ipc/virtual_destructors.cc",
     "src/profiling/memory/bookkeeping.cc",
+    "src/profiling/memory/heapprofd_producer.cc",
     "src/profiling/memory/main.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/socket_listener.cc",
@@ -4393,6 +4394,7 @@
     "src/profiling/memory/client.cc",
     "src/profiling/memory/client_unittest.cc",
     "src/profiling/memory/heapprofd_integrationtest.cc",
+    "src/profiling/memory/heapprofd_producer.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
     "src/profiling/memory/sampler.cc",
diff --git a/heapprofd.rc b/heapprofd.rc
index 4791103..10bc845 100644
--- a/heapprofd.rc
+++ b/heapprofd.rc
@@ -18,3 +18,4 @@
     user nobody
     group nobody
     writepid /dev/cpuset/system-background/tasks
+    capabilities KILL
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 44fbf2f..8fc7552 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -45,10 +45,14 @@
     "../../../gn:default_deps",
     "../../base",
     "../../base:unix_socket",
+    "../../tracing",
+    "../../tracing:ipc",
   ]
   sources = [
     "bookkeeping.cc",
     "bookkeeping.h",
+    "heapprofd_producer.cc",
+    "heapprofd_producer.h",
     "queue_messages.h",
     "record_reader.cc",
     "record_reader.h",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 5b8c280..51e0d30 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -173,9 +173,16 @@
   }
 
   if (rec->record_type == BookkeepingRecord::Type::Dump) {
+    DumpRecord& dump_rec = rec->dump_record;
+    std::shared_ptr<TraceWriter> trace_writer = dump_rec.trace_writer.lock();
+    if (!trace_writer)
+      return;
     PERFETTO_LOG("Dumping heaps");
-    auto it = bookkeeping_data_.begin();
-    while (it != bookkeeping_data_.end()) {
+    for (const pid_t pid : dump_rec.pids) {
+      auto it = bookkeeping_data_.find(pid);
+      if (it == bookkeeping_data_.end())
+        continue;
+
       std::string dump_file_name = file_name_ + "." + std::to_string(it->first);
       PERFETTO_LOG("Dumping %d to %s", it->first, dump_file_name.c_str());
       base::ScopedFile fd =
@@ -188,10 +195,9 @@
       if (it->second.ref_count == 0) {
         std::lock_guard<std::mutex> l(bookkeeping_mutex_);
         it = bookkeeping_data_.erase(it);
-      } else {
-        ++it;
       }
     }
+    dump_rec.callback();
   } else if (rec->record_type == BookkeepingRecord::Type::Free) {
     FreeRecord& free_rec = rec->free_record;
     FreePageEntry* entries = free_rec.metadata->entries;
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
index 5850d1f..44e4292 100644
--- a/src/profiling/memory/heapprofd_integrationtest.cc
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -59,7 +59,6 @@
   auto done = task_runner.CreateCheckpoint("done");
   constexpr uint64_t kSamplingInterval = 123;
   SocketListener listener(
-      {kSamplingInterval},
       [&done, &bookkeeping_thread](UnwindingRecord r) {
         // TODO(fmayer): Test symbolization and result of unwinding.
         // This check will only work on in-tree builds as out-of-tree
@@ -71,6 +70,7 @@
       },
       &bookkeeping_thread);
 
+  listener.ExpectPID(getpid(), {kSamplingInterval});
   auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
   if (!sock->is_listening()) {
     PERFETTO_ELOG("Socket not listening.");
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
new file mode 100644
index 0000000..912c8c1
--- /dev/null
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -0,0 +1,415 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/heapprofd_producer.h"
+
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/types.h>
+
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "perfetto/tracing/ipc/producer_ipc_client.h"
+
+namespace perfetto {
+namespace profiling {
+namespace {
+constexpr char kHeapprofdDataSource[] = "android.heapprofd";
+constexpr size_t kUnwinderQueueSize = 1000;
+constexpr size_t kBookkeepingQueueSize = 1000;
+constexpr size_t kUnwinderThreads = 5;
+constexpr const char* kDumpOutput = "/data/misc/perfetto-traces/heap_dump";
+constexpr int kHeapprofdSignal = 36;
+
+constexpr uint32_t kInitialConnectionBackoffMs = 100;
+constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
+  ClientConfiguration client_config;
+  client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
+  return client_config;
+}
+
+void FindPidsForBinaries(std::vector<std::string> binaries,
+                         std::vector<pid_t>* pids) {
+  base::ScopedDir proc_dir(opendir("/proc"));
+  if (!proc_dir) {
+    PERFETTO_DFATAL("Failed to open /proc");
+    return;
+  }
+  struct dirent* entry;
+  while ((entry = readdir(*proc_dir))) {
+    char* end;
+    long int pid = strtol(entry->d_name, &end, 10);
+    if (*end != '\0') {
+      continue;
+    }
+
+    char link_buf[128];
+    char binary_buf[128];
+
+    if (snprintf(link_buf, sizeof(link_buf), "/proc/%lu/exe", pid) < 0) {
+      PERFETTO_DFATAL("Failed to create exe filename for %lu", pid);
+      continue;
+    }
+    ssize_t link_size = readlink(link_buf, binary_buf, sizeof(binary_buf));
+    if (link_size < 0) {
+      continue;
+    }
+    if (link_size == sizeof(binary_buf)) {
+      PERFETTO_DFATAL("Potential overflow in binary name.");
+      continue;
+    }
+    binary_buf[link_size] = '\0';
+    for (const std::string& binary : binaries) {
+      if (binary == binary_buf)
+        pids->emplace_back(static_cast<pid_t>(pid));
+    }
+  }
+}
+
+}  // namespace
+
+// We create kUnwinderThreads unwinding threads and one bookeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+//             +--------------+
+//             |SocketListener|
+//             +------+-------+
+//                    |
+//          +--UnwindingRecord -+
+//          |                   |
+// +--------v-------+   +-------v--------+
+// |Unwinding Thread|   |Unwinding Thread|
+// +--------+-------+   +-------+--------+
+//          |                   |
+//          +-BookkeepingRecord +
+//                    |
+//           +--------v---------+
+//           |Bookkeeping Thread|
+//           +------------------+
+
+HeapprofdProducer::HeapprofdProducer(base::TaskRunner* task_runner)
+    : task_runner_(task_runner),
+      bookkeeping_queue_(kBookkeepingQueueSize),
+      bookkeeping_thread_(kDumpOutput),
+      bookkeeping_th_([this] { bookkeeping_thread_.Run(&bookkeeping_queue_); }),
+      unwinder_queues_(MakeUnwinderQueues(kUnwinderThreads)),
+      unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
+      socket_listener_(MakeSocketListenerCallback(), &bookkeeping_thread_),
+      socket_(MakeSocket()),
+      weak_factory_(this) {}
+
+HeapprofdProducer::~HeapprofdProducer() {
+  bookkeeping_queue_.Shutdown();
+  for (auto& queue : unwinder_queues_) {
+    queue.Shutdown();
+  }
+  bookkeeping_th_.join();
+  for (std::thread& th : unwinding_threads_) {
+    th.join();
+  }
+}
+
+void HeapprofdProducer::OnConnect() {
+  // TODO(fmayer): Delete once we have generic reconnect logic.
+  PERFETTO_DCHECK(state_ == kConnecting);
+  state_ = kConnected;
+  ResetConnectionBackoff();
+  PERFETTO_LOG("Connected to the service");
+
+  DataSourceDescriptor desc;
+  desc.set_name(kHeapprofdDataSource);
+  endpoint_->RegisterDataSource(desc);
+}
+
+// TODO(fmayer): Delete once we have generic reconnect logic.
+void HeapprofdProducer::OnDisconnect() {
+  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
+  PERFETTO_LOG("Disconnected from tracing service");
+  if (state_ == kConnected)
+    return task_runner_->PostTask([this] { this->Restart(); });
+
+  state_ = kNotConnected;
+  IncreaseConnectionBackoff();
+  task_runner_->PostDelayedTask([this] { this->Connect(); },
+                                connection_backoff_ms_);
+}
+
+void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
+                                        const DataSourceConfig& cfg) {
+  PERFETTO_DLOG("Setting up data source.");
+  if (cfg.name() != kHeapprofdDataSource) {
+    PERFETTO_DLOG("Invalid data source name.");
+    return;
+  }
+
+  auto it = data_sources_.find(id);
+  if (it != data_sources_.end()) {
+    PERFETTO_DFATAL("Received duplicated data source instance id: %" PRIu64,
+                    id);
+    return;
+  }
+
+  DataSource data_source;
+
+  ClientConfiguration client_config = MakeClientConfiguration(cfg);
+
+  for (uint64_t pid : cfg.heapprofd_config().pid())
+    data_source.pids.emplace_back(static_cast<pid_t>(pid));
+
+  FindPidsForBinaries(cfg.heapprofd_config().native_binary_name(),
+                      &data_source.pids);
+
+  auto pid_it = data_source.pids.begin();
+  while (pid_it != data_source.pids.end()) {
+    auto profiling_session = socket_listener_.ExpectPID(*pid_it, client_config);
+    if (!profiling_session) {
+      PERFETTO_DFATAL("No enabling duplicate profiling session for %d",
+                      *pid_it);
+      pid_it = data_source.pids.erase(pid_it);
+      continue;
+    }
+    data_source.sessions.emplace_back(std::move(profiling_session));
+    ++pid_it;
+  }
+
+  if (data_source.pids.empty()) {
+    // TODO(fmayer): Whole system profiling.
+    PERFETTO_DLOG("No valid pids given. Not setting up data source.");
+    return;
+  }
+
+  auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
+  data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
+
+  data_sources_.emplace(id, std::move(data_source));
+  PERFETTO_DLOG("Set up data source.");
+}
+
+void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
+                                         uint32_t dump_interval) {
+  if (!Dump(id, 0 /* flush_id */, false /* is_flush */))
+    return;
+  auto weak_producer = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_producer, id, dump_interval] {
+        if (!weak_producer)
+          return;
+        weak_producer->DoContinuousDump(id, dump_interval);
+      },
+      dump_interval);
+}
+
+void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
+                                        const DataSourceConfig& cfg) {
+  PERFETTO_DLOG("Start DataSource");
+  auto it = data_sources_.find(id);
+  if (it == data_sources_.end()) {
+    PERFETTO_DFATAL("Received invalid data source instance to start: %" PRIu64,
+                    id);
+    return;
+  }
+
+  const DataSource& data_source = it->second;
+
+  for (pid_t pid : data_source.pids) {
+    PERFETTO_DLOG("Sending %d to %d", kHeapprofdSignal, pid);
+    if (kill(pid, kHeapprofdSignal) != 0) {
+      PERFETTO_DPLOG("kill");
+    }
+  }
+
+  const auto continuous_dump_config =
+      cfg.heapprofd_config().continuous_dump_config();
+  uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
+  if (dump_interval) {
+    auto weak_producer = weak_factory_.GetWeakPtr();
+    task_runner_->PostDelayedTask(
+        [weak_producer, id, dump_interval] {
+          if (!weak_producer)
+            return;
+          weak_producer->DoContinuousDump(id, dump_interval);
+        },
+        continuous_dump_config.dump_phase_ms());
+  }
+  PERFETTO_DLOG("Started DataSource");
+}
+
+void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
+  // DataSource holds ProfilingSession handles which on being destructed tear
+  // down the profiling on the client.
+  if (data_sources_.erase(id) != 1)
+    PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
+}
+
+void HeapprofdProducer::OnTracingSetup() {}
+
+bool HeapprofdProducer::Dump(DataSourceInstanceID id,
+                             FlushRequestID flush_id,
+                             bool has_flush_id) {
+  PERFETTO_DLOG("Dumping %" PRIu64 ", flush: %d", id, has_flush_id);
+  auto it = data_sources_.find(id);
+  if (it == data_sources_.end()) {
+    return false;
+  }
+
+  const DataSource& data_source = it->second;
+  BookkeepingRecord record{};
+  record.record_type = BookkeepingRecord::Type::Dump;
+  DumpRecord& dump_record = record.dump_record;
+  dump_record.pids = data_source.pids;
+  dump_record.trace_writer = data_source.trace_writer;
+
+  auto weak_producer = weak_factory_.GetWeakPtr();
+  base::TaskRunner* task_runner = task_runner_;
+  if (has_flush_id) {
+    dump_record.callback = [task_runner, weak_producer, flush_id] {
+      task_runner->PostTask([weak_producer, flush_id] {
+        if (!weak_producer)
+          return weak_producer->FinishDataSourceFlush(flush_id);
+      });
+    };
+  } else {
+    dump_record.callback = [] {};
+  }
+
+  bookkeeping_queue_.Add(std::move(record));
+  return true;
+}
+
+void HeapprofdProducer::Flush(FlushRequestID flush_id,
+                              const DataSourceInstanceID* ids,
+                              size_t num_ids) {
+  if (num_ids == 0)
+    return;
+
+  size_t& flush_in_progress = flushes_in_progress_[flush_id];
+  PERFETTO_DCHECK(flush_in_progress == 0);
+  flush_in_progress = num_ids;
+  for (size_t i = 0; i < num_ids; ++i)
+    Dump(ids[i], flush_id, true);
+}
+
+void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
+  auto it = flushes_in_progress_.find(flush_id);
+  if (it == flushes_in_progress_.end()) {
+    PERFETTO_DFATAL("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
+    return;
+  }
+  size_t& flush_in_progress = it->second;
+  if (--flush_in_progress == 0) {
+    endpoint_->NotifyFlushComplete(flush_id);
+    flushes_in_progress_.erase(flush_id);
+  }
+}
+
+std::function<void(UnwindingRecord)>
+HeapprofdProducer::MakeSocketListenerCallback() {
+  return [this](UnwindingRecord record) {
+    unwinder_queues_[static_cast<size_t>(record.pid) % kUnwinderThreads].Add(
+        std::move(record));
+  };
+}
+
+std::vector<BoundedQueue<UnwindingRecord>>
+HeapprofdProducer::MakeUnwinderQueues(size_t n) {
+  std::vector<BoundedQueue<UnwindingRecord>> ret(n);
+  for (size_t i = 0; i < n; ++i)
+    ret[i].SetCapacity(kUnwinderQueueSize);
+  return ret;
+}
+
+std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
+  std::vector<std::thread> ret;
+  for (size_t i = 0; i < n; ++i) {
+    ret.emplace_back([this, i] {
+      UnwindingMainLoop(&unwinder_queues_[i], &bookkeeping_queue_);
+    });
+  }
+  return ret;
+}
+
+std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeSocket() {
+  const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
+  if (sock_fd == nullptr) {
+    unlink(kHeapprofdSocketFile);
+    return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_listener_,
+                                    task_runner_);
+  }
+  char* end;
+  int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
+  if (*end != '\0')
+    PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
+                   kHeapprofdSocketEnvVar);
+  return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_listener_,
+                                  task_runner_);
+}
+
+// TODO(fmayer): Delete these and used ReconnectingProducer once submitted
+void HeapprofdProducer::Restart() {
+  // We lost the connection with the tracing service. At this point we need
+  // to reset all the data sources. Trying to handle that manually is going to
+  // be error prone. What we do here is simply desroying the instance and
+  // recreating it again.
+  // TODO(hjd): Add e2e test for this.
+
+  base::TaskRunner* task_runner = task_runner_;
+  const char* socket_name = socket_name_;
+
+  // Invoke destructor and then the constructor again.
+  this->~HeapprofdProducer();
+  new (this) HeapprofdProducer(task_runner);
+
+  ConnectWithRetries(socket_name);
+}
+
+void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
+  PERFETTO_DCHECK(state_ == kNotStarted);
+  state_ = kNotConnected;
+
+  ResetConnectionBackoff();
+  socket_name_ = socket_name;
+  Connect();
+}
+
+void HeapprofdProducer::Connect() {
+  PERFETTO_DCHECK(state_ == kNotConnected);
+  state_ = kConnecting;
+  endpoint_ = ProducerIPCClient::Connect(socket_name_, this,
+                                         "android.heapprofd", task_runner_);
+}
+
+void HeapprofdProducer::IncreaseConnectionBackoff() {
+  connection_backoff_ms_ *= 2;
+  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
+    connection_backoff_ms_ = kMaxConnectionBackoffMs;
+}
+
+void HeapprofdProducer::ResetConnectionBackoff() {
+  connection_backoff_ms_ = kInitialConnectionBackoffMs;
+}
+
+}  // namespace profiling
+}  // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
new file mode 100644
index 0000000..f1dd639
--- /dev/null
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
+#define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
+
+#include <map>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/tracing_service.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/socket_listener.h"
+
+namespace perfetto {
+namespace profiling {
+
+class HeapprofdProducer : public Producer {
+ public:
+  HeapprofdProducer(base::TaskRunner* task_runner);
+  ~HeapprofdProducer() override;
+
+  // Producer Impl:
+  void OnConnect() override;
+  void OnDisconnect() override;
+  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
+  void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
+  void StopDataSource(DataSourceInstanceID) override;
+  void OnTracingSetup() override;
+  void Flush(FlushRequestID,
+             const DataSourceInstanceID* data_source_ids,
+             size_t num_data_sources) override;
+
+  // TODO(fmayer): Delete once we have generic reconnect logic.
+  void ConnectWithRetries(const char* socket_name);
+
+ private:
+  // TODO(fmayer): Delete once we have generic reconnect logic.
+  enum State {
+    kNotStarted = 0,
+    kNotConnected,
+    kConnecting,
+    kConnected,
+  };
+  void Connect();
+  void Restart();
+  void ResetConnectionBackoff();
+  void IncreaseConnectionBackoff();
+
+  // TODO(fmayer): Delete once we have generic reconnect logic.
+  State state_ = kNotStarted;
+  uint32_t connection_backoff_ms_ = 0;
+  const char* socket_name_ = nullptr;
+
+  std::function<void(UnwindingRecord)> MakeSocketListenerCallback();
+  std::vector<BoundedQueue<UnwindingRecord>> MakeUnwinderQueues(size_t n);
+  std::vector<std::thread> MakeUnwindingThreads(size_t n);
+  std::unique_ptr<base::UnixSocket> MakeSocket();
+
+  void FinishDataSourceFlush(FlushRequestID flush_id);
+  bool Dump(DataSourceInstanceID id,
+            FlushRequestID flush_id,
+            bool has_flush_id);
+  void DoContinuousDump(DataSourceInstanceID id, uint32_t dump_interval);
+
+  struct DataSource {
+    std::vector<pid_t> pids;
+    // This is a shared ptr so we can lend a weak_ptr to the bookkeeping
+    // thread for unwinding.
+    std::shared_ptr<TraceWriter> trace_writer;
+    // These are opaque handles that shut down the sockets in SocketListener
+    // once they go away.
+    std::vector<SocketListener::ProfilingSession> sessions;
+  };
+
+  std::map<DataSourceInstanceID, DataSource> data_sources_;
+  std::map<FlushRequestID, size_t> flushes_in_progress_;
+
+  // These two are borrowed from the caller.
+  base::TaskRunner* const task_runner_;
+  std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
+
+  BoundedQueue<BookkeepingRecord> bookkeeping_queue_;
+  BookkeepingThread bookkeeping_thread_;
+  std::thread bookkeeping_th_;
+  std::vector<BoundedQueue<UnwindingRecord>> unwinder_queues_;
+  std::vector<std::thread> unwinding_threads_;
+  SocketListener socket_listener_;
+  std::unique_ptr<base::UnixSocket> socket_;
+
+  base::WeakPtrFactory<HeapprofdProducer> weak_factory_;
+};
+
+}  // namespace profiling
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index 2f5c487..5648095 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -25,8 +25,10 @@
 #include "perfetto/base/event.h"
 #include "perfetto/base/unix_socket.h"
 #include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/heapprofd_producer.h"
 #include "src/profiling/memory/socket_listener.h"
 #include "src/profiling/memory/wire_protocol.h"
+#include "src/tracing/ipc/default_socket.h"
 
 #include "perfetto/base/unix_task_runner.h"
 
@@ -34,139 +36,10 @@
 namespace profiling {
 namespace {
 
-constexpr size_t kUnwinderQueueSize = 1000;
-constexpr size_t kBookkeepingQueueSize = 1000;
-constexpr size_t kUnwinderThreads = 5;
-constexpr uint64_t kDefaultSamplingInterval = 1;
-
-base::Event* g_dump_evt = nullptr;
-
-void DumpSignalHandler(int) {
-  g_dump_evt->Notify();
-}
-
-// We create kUnwinderThreads unwinding threads and one bookeeping thread.
-// The bookkeeping thread is singleton in order to avoid expensive and
-// complicated synchronisation in the bookkeeping.
-//
-// We wire up the system by creating BoundedQueues between the threads. The main
-// thread runs the TaskRunner driving the SocketListener. The unwinding thread
-// takes the data received by the SocketListener and if it is a malloc does
-// stack unwinding, and if it is a free just forwards the content of the record
-// to the bookkeeping thread.
-//
-//             +--------------+
-//             |SocketListener|
-//             +------+-------+
-//                    |
-//          +--UnwindingRecord -+
-//          |                   |
-// +--------v-------+   +-------v--------+
-// |Unwinding Thread|   |Unwinding Thread|
-// +--------+-------+   +-------+--------+
-//          |                   |
-//          +-BookkeepingRecord +
-//                    |
-//           +--------v---------+
-//           |Bookkeeping Thread|
-//           +------------------+
-int HeapprofdMain(int argc, char** argv) {
-  // TODO(fmayer): This is temporary until heapprofd is integrated with Perfetto
-  // and receives its configuration via that.
-  uint64_t sampling_interval = kDefaultSamplingInterval;
-  bool standalone = false;
-  int opt;
-  while ((opt = getopt(argc, argv, "i:s")) != -1) {
-    switch (opt) {
-      case 'i': {
-        char* end;
-        long long sampling_interval_arg = strtoll(optarg, &end, 10);
-        if (*end != '\0' || *optarg == '\0')
-          PERFETTO_FATAL("Invalid sampling interval: %s", optarg);
-        PERFETTO_CHECK(sampling_interval > 0);
-        sampling_interval = static_cast<uint64_t>(sampling_interval_arg);
-        break;
-      }
-      case 's':
-        standalone = true;
-        break;
-      default:
-        PERFETTO_FATAL("%s [-i interval] [-s]", argv[0]);
-    }
-  }
-
+int HeapprofdMain(int, char**) {
   base::UnixTaskRunner task_runner;
-  BoundedQueue<BookkeepingRecord> bookkeeping_queue(kBookkeepingQueueSize);
-  // We set this up before launching any threads, so we do not have to use a
-  // std::atomic for g_dump_evt.
-  g_dump_evt = new base::Event();
-
-  struct sigaction action = {};
-  action.sa_handler = DumpSignalHandler;
-  PERFETTO_CHECK(sigaction(SIGUSR1, &action, nullptr) == 0);
-  task_runner.AddFileDescriptorWatch(g_dump_evt->fd(), [&bookkeeping_queue] {
-    g_dump_evt->Clear();
-
-    BookkeepingRecord rec = {};
-    rec.record_type = BookkeepingRecord::Type::Dump;
-    bookkeeping_queue.Add(std::move(rec));
-  });
-
-  std::unique_ptr<base::UnixSocket> sock;
-
-  BookkeepingThread bookkeeping_thread("/data/local/tmp/heap_dump");
-  std::thread bookkeeping_th([&bookkeeping_thread, &bookkeeping_queue] {
-    bookkeeping_thread.Run(&bookkeeping_queue);
-  });
-
-  std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
-  for (size_t i = 0; i < kUnwinderThreads; ++i)
-    unwinder_queues[i].SetCapacity(kUnwinderQueueSize);
-  std::vector<std::thread> unwinding_threads;
-  unwinding_threads.reserve(kUnwinderThreads);
-  for (size_t i = 0; i < kUnwinderThreads; ++i) {
-    unwinding_threads.emplace_back([&unwinder_queues, &bookkeeping_queue, i] {
-      UnwindingMainLoop(&unwinder_queues[i], &bookkeeping_queue);
-    });
-  }
-
-  auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
-    unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
-        std::move(r));
-  };
-  SocketListener listener({sampling_interval}, std::move(on_record_received),
-                          &bookkeeping_thread);
-
-  if (optind != argc)
-    PERFETTO_FATAL("%s [-i interval] [-s]", argv[0]);
-
-  if (standalone) {
-    // Allow to be able to manually specify the socket to listen on
-    // for testing and sideloading purposes.
-    unlink(kHeapprofdSocketFile);
-    sock =
-        base::UnixSocket::Listen(kHeapprofdSocketFile, &listener, &task_runner);
-  } else {
-    // When running as a service launched by init on Android, the socket
-    // is created by init and passed to the application using an environment
-    // variable.
-    const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
-    if (sock_fd == nullptr)
-      PERFETTO_FATAL("No argument given and environment variable %s is unset.",
-                     kHeapprofdSocketEnvVar);
-    char* end;
-    int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
-    if (*end != '\0')
-      PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
-                     kHeapprofdSocketEnvVar);
-    sock = base::UnixSocket::Listen(base::ScopedFile(raw_fd), &listener,
-                                    &task_runner);
-  }
-
-  if (sock->last_error() != 0)
-    PERFETTO_FATAL("Failed to initialize socket: %s",
-                   strerror(sock->last_error()));
-
+  HeapprofdProducer producer(&task_runner);
+  producer.ConnectWithRetries(GetProducerSocket());
   task_runner.Run();
   return 0;
 }
diff --git a/src/profiling/memory/queue_messages.h b/src/profiling/memory/queue_messages.h
index b37c227..b09fbdc 100644
--- a/src/profiling/memory/queue_messages.h
+++ b/src/profiling/memory/queue_messages.h
@@ -21,18 +21,20 @@
 
 #include <unwindstack/Maps.h>
 #include <unwindstack/Unwinder.h>
+
+#include "perfetto/tracing/core/trace_writer.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 namespace profiling {
 
-struct ProcessMetadata;
+struct UnwindingMetadata;
 
 struct UnwindingRecord {
   pid_t pid;
   size_t size;
   std::unique_ptr<uint8_t[]> data;
-  std::weak_ptr<ProcessMetadata> metadata;
+  std::weak_ptr<UnwindingMetadata> metadata;
 };
 
 struct FreeRecord {
@@ -46,6 +48,12 @@
   std::vector<unwindstack::FrameData> frames;
 };
 
+struct DumpRecord {
+  std::vector<pid_t> pids;
+  std::weak_ptr<TraceWriter> trace_writer;
+  std::function<void()> callback;
+};
+
 struct BookkeepingRecord {
   enum class Type {
     Dump = 0,
@@ -57,6 +65,7 @@
   Type record_type;
   AllocRecord alloc_record;
   FreeRecord free_record;
+  DumpRecord dump_record;
 };
 
 }  // namespace profiling
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 9009bb0..733c6ea 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -22,6 +22,13 @@
 
 void SocketListener::OnDisconnect(base::UnixSocket* self) {
   bookkeeping_thread_->NotifyClientDisconnected(self->peer_pid());
+  auto it = process_info_.find(self->peer_pid());
+  if (it != process_info_.end()) {
+    ProcessInfo& process_info = it->second;
+    process_info.sockets.erase(self);
+  } else {
+    PERFETTO_DFATAL("Disconnect from socket without ProcessInfo.");
+  }
   sockets_.erase(self);
 }
 
@@ -29,8 +36,20 @@
     base::UnixSocket*,
     std::unique_ptr<base::UnixSocket> new_connection) {
   base::UnixSocket* new_connection_raw = new_connection.get();
+  pid_t pid = new_connection_raw->peer_pid();
+
+  auto it = process_info_.find(pid);
+  if (it == process_info_.end()) {
+    PERFETTO_DFATAL("Unexpected connection.");
+    return;
+  }
+  ProcessInfo& process_info = it->second;
+
   sockets_.emplace(new_connection_raw, std::move(new_connection));
-  bookkeeping_thread_->NotifyClientConnected(new_connection_raw->peer_pid());
+  process_info.sockets.emplace(new_connection_raw);
+  // TODO(fmayer): Move destruction of bookkeeping data to
+  // HeapprofdProducer.
+  bookkeeping_thread_->NotifyClientConnected(pid);
 }
 
 void SocketListener::OnDataAvailable(base::UnixSocket* self) {
@@ -42,32 +61,42 @@
 
   Entry& entry = socket_it->second;
   RecordReader::ReceiveBuffer buf = entry.record_reader.BeginReceive();
+
+  auto process_info_it = process_info_.find(peer_pid);
+  if (process_info_it == process_info_.end()) {
+    PERFETTO_DFATAL("This should not happen.");
+    return;
+  }
+  ProcessInfo& process_info = process_info_it->second;
+
   size_t rd;
   if (PERFETTO_LIKELY(entry.recv_fds)) {
     rd = self->Receive(buf.data, buf.size);
   } else {
-    auto it = process_metadata_.find(peer_pid);
-    if (it != process_metadata_.end() && !it->second.expired()) {
+    auto it = unwinding_metadata_.find(peer_pid);
+    if (it != unwinding_metadata_.end() && !it->second.expired()) {
       entry.recv_fds = true;
       // If the process already has metadata, this is an additional socket for
       // an existing process. Reuse existing metadata and close the received
       // file descriptors.
-      entry.process_metadata = std::shared_ptr<ProcessMetadata>(it->second);
+      entry.unwinding_metadata = std::shared_ptr<UnwindingMetadata>(it->second);
       rd = self->Receive(buf.data, buf.size);
     } else {
       base::ScopedFile fds[2];
       rd = self->Receive(buf.data, buf.size, fds, base::ArraySize(fds));
       if (fds[0] && fds[1]) {
+        PERFETTO_DLOG("%d: Received FDs.", peer_pid);
         entry.recv_fds = true;
-        entry.process_metadata = std::make_shared<ProcessMetadata>(
+        entry.unwinding_metadata = std::make_shared<UnwindingMetadata>(
             peer_pid, std::move(fds[0]), std::move(fds[1]));
-        process_metadata_[peer_pid] = entry.process_metadata;
-        self->Send(&client_config_, sizeof(client_config_), -1,
+        unwinding_metadata_[peer_pid] = entry.unwinding_metadata;
+        self->Send(&process_info.client_config,
+                   sizeof(process_info.client_config), -1,
                    base::UnixSocket::BlockingMode::kBlocking);
       } else if (fds[0] || fds[1]) {
-        PERFETTO_DLOG("Received partial FDs.");
+        PERFETTO_DLOG("%d: Received partial FDs.", peer_pid);
       } else {
-        PERFETTO_DLOG("Received no FDs.");
+        PERFETTO_DLOG("%d: Received no FDs.", peer_pid);
       }
     }
   }
@@ -86,6 +115,30 @@
   }
 }
 
+SocketListener::ProfilingSession SocketListener::ExpectPID(
+    pid_t pid,
+    ClientConfiguration cfg) {
+  PERFETTO_DLOG("Expecting connection from %d", pid);
+  bool inserted;
+  std::tie(std::ignore, inserted) = process_info_.emplace(pid, std::move(cfg));
+  if (!inserted)
+    return ProfilingSession(0, nullptr);
+  return ProfilingSession(pid, this);
+}
+
+void SocketListener::ShutdownPID(pid_t pid) {
+  PERFETTO_DLOG("Shutting down connecting from %d", pid);
+  auto it = process_info_.find(pid);
+  if (it == process_info_.end()) {
+    PERFETTO_DFATAL("Shutting down nonexistant pid.");
+    return;
+  }
+  ProcessInfo& process_info = it->second;
+  // Disconnect all sockets for process.
+  for (base::UnixSocket* socket : process_info.sockets)
+    socket->Shutdown(true);
+}
+
 void SocketListener::RecordReceived(base::UnixSocket* self,
                                     size_t size,
                                     std::unique_ptr<uint8_t[]> buf) {
@@ -97,7 +150,7 @@
     return;
   }
   Entry& entry = it->second;
-  if (!entry.process_metadata) {
+  if (!entry.unwinding_metadata) {
     PERFETTO_DLOG("Received record without process metadata.");
     return;
   }
@@ -107,12 +160,12 @@
     return;
   }
   // This needs to be a weak_ptr for two reasons:
-  // 1) most importantly, the weak_ptr in process_metadata_ should expire as
+  // 1) most importantly, the weak_ptr in unwinding_metadata_ should expire as
   // soon as the last socket for a process goes away. Otherwise, a recycled
   // PID might reuse incorrect metadata.
   // 2) it is a waste to unwind for a process that had already gone away.
-  std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
-  callback_function_({entry.process_metadata->pid, size, std::move(buf),
+  std::weak_ptr<UnwindingMetadata> weak_metadata(entry.unwinding_metadata);
+  callback_function_({entry.unwinding_metadata->pid, size, std::move(buf),
                       std::move(weak_metadata)});
 }
 
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index 69738b0..8c2c3e8 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -32,11 +32,43 @@
 
 class SocketListener : public base::UnixSocket::EventListener {
  public:
-  SocketListener(ClientConfiguration client_config,
-                 std::function<void(UnwindingRecord)> fn,
+  friend class ProfilingSession;
+  class ProfilingSession {
+   public:
+    friend class SocketListener;
+
+    ProfilingSession(ProfilingSession&& other)
+        : pid_(other.pid_), listener_(other.listener_) {
+      other.listener_ = nullptr;
+    }
+
+    ~ProfilingSession() {
+      if (listener_)
+        listener_->ShutdownPID(pid_);
+    }
+    ProfilingSession& operator=(ProfilingSession&& other) {
+      pid_ = other.pid_;
+      listener_ = other.listener_;
+      other.listener_ = nullptr;
+      return *this;
+    }
+
+    operator bool() const { return listener_ != nullptr; }
+
+    ProfilingSession(const ProfilingSession&) = delete;
+    ProfilingSession& operator=(const ProfilingSession&) = delete;
+
+   private:
+    ProfilingSession(pid_t pid, SocketListener* listener)
+        : pid_(pid), listener_(listener) {}
+
+    pid_t pid_;
+    SocketListener* listener_ = nullptr;
+  };
+
+  SocketListener(std::function<void(UnwindingRecord)> fn,
                  BookkeepingThread* bookkeeping_thread)
-      : client_config_(client_config),
-        callback_function_(std::move(fn)),
+      : callback_function_(std::move(fn)),
         bookkeeping_thread_(bookkeeping_thread) {}
   void OnDisconnect(base::UnixSocket* self) override;
   void OnNewIncomingConnection(
@@ -44,7 +76,15 @@
       std::unique_ptr<base::UnixSocket> new_connection) override;
   void OnDataAvailable(base::UnixSocket* self) override;
 
+  ProfilingSession ExpectPID(pid_t pid, ClientConfiguration cfg);
+
  private:
+  struct ProcessInfo {
+    ProcessInfo(ClientConfiguration cfg) : client_config(std::move(cfg)) {}
+    ClientConfiguration client_config;
+    std::set<base::UnixSocket*> sockets;
+  };
+
   struct Entry {
     Entry(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
     // Only here for ownership of the object.
@@ -58,14 +98,15 @@
     //
     // This does not get initialized in the ctor because the file descriptors
     // only get received after the first Receive call of the socket.
-    std::shared_ptr<ProcessMetadata> process_metadata;
+    std::shared_ptr<UnwindingMetadata> unwinding_metadata;
   };
 
   void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
+  void ShutdownPID(pid_t pid);
 
-  ClientConfiguration client_config_;
   std::map<base::UnixSocket*, Entry> sockets_;
-  std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
+  std::map<pid_t, std::weak_ptr<UnwindingMetadata>> unwinding_metadata_;
+  std::map<pid_t, ProcessInfo> process_info_;
   std::function<void(UnwindingRecord)> callback_function_;
   BookkeepingThread* const bookkeeping_thread_;
 };
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 225572e..21d9caf 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -55,8 +55,8 @@
   };
 
   BookkeepingThread actor("");
-  SocketListener listener({},  // We do not care about the sampling rate.
-                          std::move(callback_fn), &actor);
+  SocketListener listener(std::move(callback_fn), &actor);
+  auto handle = listener.ExpectPID(getpid(), {});
   MockEventListener client_listener;
   EXPECT_CALL(client_listener, OnConnect(_, _))
       .WillOnce(InvokeWithoutArgs(connected));
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 4f085fd..4f2bbae 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -135,7 +135,7 @@
   maps_.clear();
 }
 
-bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+bool DoUnwind(WireMessage* msg, UnwindingMetadata* metadata, AllocRecord* out) {
   AllocMetadata* alloc_metadata = msg->alloc_header;
   std::unique_ptr<unwindstack::Regs> regs(
       CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
@@ -175,7 +175,7 @@
                           &msg))
     return false;
   if (msg.record_type == RecordType::Malloc) {
-    std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+    std::shared_ptr<UnwindingMetadata> metadata = rec->metadata.lock();
     if (!metadata) {
       // Process has already gone away.
       return false;
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index aef2918..f9d0b33 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -40,8 +40,8 @@
   base::ScopedFile fd_;
 };
 
-struct ProcessMetadata {
-  ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
+struct UnwindingMetadata {
+  UnwindingMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
       : pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
     PERFETTO_CHECK(maps.Parse());
   }
@@ -65,7 +65,7 @@
   uint8_t* stack_;
 };
 
-bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+bool DoUnwind(WireMessage*, UnwindingMetadata* metadata, AllocRecord* out);
 
 bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
 
diff --git a/src/profiling/memory/unwinding_fuzzer.cc b/src/profiling/memory/unwinding_fuzzer.cc
index e1fe6ee..a52d673 100644
--- a/src/profiling/memory/unwinding_fuzzer.cc
+++ b/src/profiling/memory/unwinding_fuzzer.cc
@@ -27,7 +27,7 @@
 
 int FuzzUnwinding(const uint8_t* data, size_t size) {
   UnwindingRecord record;
-  auto process_metadata = std::make_shared<ProcessMetadata>(
+  auto unwinding_metadata = std::make_shared<UnwindingMetadata>(
       getpid(), base::OpenFile("/proc/self/maps", O_RDONLY),
       base::OpenFile("/proc/self/mem", O_RDONLY));
 
@@ -35,7 +35,7 @@
   record.size = size;
   record.data.reset(new uint8_t[size]);
   memcpy(record.data.get(), data, size);
-  record.metadata = process_metadata;
+  record.metadata = unwinding_metadata;
 
   BookkeepingRecord out;
   HandleUnwindingRecord(&record, &out);
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 25c6f48..c38ba00 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -123,7 +123,8 @@
   base::ScopedFile proc_maps(base::OpenFile("/proc/self/maps", O_RDONLY));
   base::ScopedFile proc_mem(base::OpenFile("/proc/self/mem", O_RDONLY));
   GlobalCallstackTrie callsites;
-  ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
+  UnwindingMetadata metadata(getpid(), std::move(proc_maps),
+                             std::move(proc_mem));
   WireMessage msg;
   auto record = GetRecord(&msg);
   AllocRecord out;