traced_perf: rework reading granularity, add unwind queue

The implementation is kept single-threaded, but the organization roughly
follows the intended unwinder-on-a-dedicated-thread approach.

There's a mix of smaller changes, summarizing in no particular order:
* Reading now consumes individual records instead of chunks of the ring buffer.
  (The records will be either a sample, or a PERF_RECORD_LOST if the kernel has
  lost samples due to ring buffer capacity). I didn't want to bubble up the
  record type decision all the way to the PerfProducer, proposing the
  callback+return format of ReadUntilSample to abstract most of it away (though
  noting that it ends up hiding a nested loop).
* Reading parses the event directly out of the ring buffer (copying the
  populated stack bytes onto the heap, etc).
* The wrapped ringbuffer case is handled by reconstructing (with a pair of
  memcpy's) the event in a dedicated buffer, and returning a pointer to that.
* Added boot clock timestamps to samples.
* Added proc-fd interface, and direct/signal-based implementations.
* Added per-datasource unwinder queues (for now simply std::deque while we're
  single-threaded, imagine a ring buffer in the future).

On unwinding queues specifically - the slots are filled in order (from the
perspective of a given per-cpu buffer reader). The parsing however can be out
of order, as samples are kept in the queue until their proc-fds are ready (or
assumed to not be obtainable for the remainder of the data source's lifetime).
The "unwind" tick keep walking the queue in order, only releasing the completed
entries once they reach the "oldest" end of the queue.

Bug: 144281346
Change-Id: Ic59c153c1c80e04b5e5bfb25656c10bc5e80dd11
diff --git a/Android.bp b/Android.bp
index 0b1651b..dc0bc6c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -5696,6 +5696,14 @@
   ],
 }
 
+// GN: //src/profiling/perf:proc_descriptors
+filegroup {
+  name: "perfetto_src_profiling_perf_proc_descriptors",
+  srcs: [
+    "src/profiling/perf/proc_descriptors.cc",
+  ],
+}
+
 // GN: //src/profiling/perf:producer
 filegroup {
   name: "perfetto_src_profiling_perf_producer",
@@ -5713,6 +5721,14 @@
   ],
 }
 
+// GN: //src/profiling/perf:regs_parsing
+filegroup {
+  name: "perfetto_src_profiling_perf_regs_parsing",
+  srcs: [
+    "src/profiling/perf/regs_parsing.cc",
+  ],
+}
+
 // GN: //src/profiling/perf:traced_perf_main
 filegroup {
   name: "perfetto_src_profiling_perf_traced_perf_main",
@@ -5721,14 +5737,6 @@
   ],
 }
 
-// GN: //src/profiling/perf:unwind_support
-filegroup {
-  name: "perfetto_src_profiling_perf_unwind_support",
-  srcs: [
-    "src/profiling/perf/unwind_support.cc",
-  ],
-}
-
 // GN: //src/profiling/symbolizer:symbolize_database
 filegroup {
   name: "perfetto_src_profiling_symbolizer_symbolize_database",
@@ -6962,9 +6970,10 @@
     ":perfetto_src_profiling_memory_scoped_spinlock",
     ":perfetto_src_profiling_memory_unittests",
     ":perfetto_src_profiling_memory_wire_protocol",
+    ":perfetto_src_profiling_perf_proc_descriptors",
     ":perfetto_src_profiling_perf_producer",
     ":perfetto_src_profiling_perf_producer_unittests",
-    ":perfetto_src_profiling_perf_unwind_support",
+    ":perfetto_src_profiling_perf_regs_parsing",
     ":perfetto_src_profiling_unittests",
     ":perfetto_src_protozero_protozero",
     ":perfetto_src_protozero_testing_messages_cpp_gen",
@@ -7447,9 +7456,10 @@
     ":perfetto_src_base_unix_socket",
     ":perfetto_src_ipc_client",
     ":perfetto_src_ipc_common",
+    ":perfetto_src_profiling_perf_proc_descriptors",
     ":perfetto_src_profiling_perf_producer",
+    ":perfetto_src_profiling_perf_regs_parsing",
     ":perfetto_src_profiling_perf_traced_perf_main",
-    ":perfetto_src_profiling_perf_unwind_support",
     ":perfetto_src_protozero_protozero",
     ":perfetto_src_tracing_common",
     ":perfetto_src_tracing_core_core",
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index 80b0cb8..0c209ed 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -28,6 +28,7 @@
 
 source_set("traced_perf_main") {
   deps = [
+    ":proc_descriptors",
     ":producer",
     "../../../gn:default_deps",
     "../../../src/base",
@@ -41,10 +42,11 @@
 
 source_set("producer") {
   public_deps = [
-    ":unwind_support",
+    ":regs_parsing",
     "../../../include/perfetto/tracing/core",
   ]
   deps = [
+    ":proc_descriptors",
     "../../../gn:default_deps",
     "../../../protos/perfetto/config:cpp",
     "../../../protos/perfetto/config/profiling:zero",
@@ -62,7 +64,7 @@
   ]
 }
 
-source_set("unwind_support") {
+source_set("regs_parsing") {
   public_deps = [ "../../../gn:libunwindstack" ]
   deps = [
     "../../../gn:bionic_kernel_uapi_headers",
@@ -70,10 +72,22 @@
     "../../../src/base",
   ]
   sources = [
-    "unwind_support.cc",
-    "unwind_support.h",
+    "regs_parsing.cc",
+    "regs_parsing.h",
   ]
 }
+
+source_set("proc_descriptors") {
+  deps = [
+    "../../../gn:default_deps",
+    "../../../src/base",
+  ]
+  sources = [
+    "proc_descriptors.cc",
+    "proc_descriptors.h",
+  ]
+}
+
 source_set("producer_unittests") {
   testonly = true
   deps = [
diff --git a/src/profiling/perf/event_config.h b/src/profiling/perf/event_config.h
index cc750ff..86123b7 100644
--- a/src/profiling/perf/event_config.h
+++ b/src/profiling/perf/event_config.h
@@ -20,10 +20,13 @@
 #include <linux/perf_event.h>
 #include <stdint.h>
 #include <sys/types.h>
+#include <time.h>
+
+#include <unwindstack/Regs.h>
 
 #include "perfetto/ext/base/optional.h"
 #include "perfetto/tracing/core/data_source_config.h"
-#include "src/profiling/perf/unwind_support.h"
+#include "src/profiling/perf/regs_parsing.h"
 
 #include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
 
@@ -33,10 +36,10 @@
 // Describes a single profiling configuration. Bridges the gap between the data
 // source config proto, and the raw "perf_event_attr" structs to pass to the
 // perf_event_open syscall.
-// TODO(rsavitski): make sampling conditional? Or should we always go through
-// the sampling interface for simplicity? Reads can be done on-demand even if
-// sampling is on. So the question becomes whether we need *only* on-demand
-// reads.
+// TODO(rsavitski): instead of allowing arbitrary sampling flags, nail down a
+// specific set, and simplify parsing at the same time?
+// Also, for non-sample events (if they're possible), union of structs is
+// interesting.
 class EventConfig {
  public:
   static base::Optional<EventConfig> Create(const DataSourceConfig& ds_config) {
@@ -55,7 +58,6 @@
  private:
   EventConfig(const protos::pbzero::PerfEventConfig::Decoder&) {
     auto& pe = perf_event_attr_;
-    memset(&pe, 0, sizeof(perf_event_attr));
     pe.size = sizeof(perf_event_attr);
 
     pe.exclude_kernel = true;
@@ -67,11 +69,16 @@
     pe.sample_freq = 100;
     pe.freq = true;
 
-    pe.sample_type =
-        PERF_SAMPLE_TID | PERF_SAMPLE_STACK_USER | PERF_SAMPLE_REGS_USER;
+    pe.sample_type = PERF_SAMPLE_TID | PERF_SAMPLE_TIME |
+                     PERF_SAMPLE_STACK_USER | PERF_SAMPLE_REGS_USER;
     // Needs to be < ((u16)(~0u)), and have bottom 8 bits clear.
     pe.sample_stack_user = (1u << 15);
-    pe.sample_regs_user = PerfUserRegsMaskForCurrentArch();
+    pe.sample_regs_user =
+        PerfUserRegsMaskForArch(unwindstack::Regs::CurrentArch());
+
+    // for PERF_SAMPLE_TIME
+    pe.clockid = CLOCK_BOOTTIME;
+    pe.use_clockid = true;
   }
 
   // TODO(rsavitski): for now hardcode each session to be for a single cpu's
@@ -81,7 +88,7 @@
   // TODO(rsavitski): if we allow for event groups containing multiple sampled
   // counters, we'll need to vary the .type & .config fields per
   // perf_event_open.
-  perf_event_attr perf_event_attr_;
+  perf_event_attr perf_event_attr_ = {};
 };
 
 }  // namespace profiling
diff --git a/src/profiling/perf/event_reader.cc b/src/profiling/perf/event_reader.cc
index f7b297f..a996fe5 100644
--- a/src/profiling/perf/event_reader.cc
+++ b/src/profiling/perf/event_reader.cc
@@ -23,13 +23,15 @@
 #include <unistd.h>
 
 #include "perfetto/ext/base/utils.h"
-#include "src/profiling/perf/unwind_support.h"
+#include "src/profiling/perf/regs_parsing.h"
 
 namespace perfetto {
 namespace profiling {
 
 namespace {
 
+constexpr size_t kDataPagesPerRingBuffer = 256;  // 1 MB (256 x 4k pages)
+
 template <typename T>
 const char* ReadValue(T* value_out, const char* ptr) {
   memcpy(value_out, reinterpret_cast<const void*>(ptr), sizeof(T));
@@ -121,42 +123,59 @@
   return base::make_optional(std::move(ret));
 }
 
-// TODO(rsavitski): look into more specific barrier builtins. Copying simpleperf
-// for now. See |perf_output_put_handle| in the kernel for the barrier
-// requirements.
+// TODO(rsavitski): should be possible to use address-specific atomics, see
+// libperf's explanation on why the kernel uses barriers.
 #pragma GCC diagnostic push
 #if defined(__clang__)
 #pragma GCC diagnostic ignored "-Watomic-implicit-seq-cst"
 #endif
-std::vector<char> PerfRingBuffer::ReadAvailable() {
-  if (!valid())
-    return {};
+char* PerfRingBuffer::ReadRecordNonconsuming() {
+  PERFETTO_CHECK(valid());
 
   uint64_t write_offset = metadata_page_->data_head;
   uint64_t read_offset = metadata_page_->data_tail;
   __sync_synchronize();  // needs to be rmb()
 
+  PERFETTO_DCHECK(read_offset <= write_offset);
+  if (write_offset == read_offset)
+    return nullptr;  // no new data
+
   size_t read_pos = static_cast<size_t>(read_offset & (data_buf_sz_ - 1));
-  size_t data_sz = static_cast<size_t>(write_offset - read_offset);
 
-  if (data_sz == 0) {
-    return {};
+  // event header (64 bits) guaranteed to be contiguous
+  PERFETTO_DCHECK(read_pos <= data_buf_sz_ - sizeof(perf_event_header));
+  PERFETTO_DCHECK(0 == reinterpret_cast<size_t>(data_buf_ + read_pos) %
+                           alignof(perf_event_header));
+
+  perf_event_header* evt_header =
+      reinterpret_cast<perf_event_header*>(data_buf_ + read_pos);
+  uint16_t evt_size = evt_header->size;
+
+  // event wrapped - reconstruct it, and return a pointer to the buffer
+  if (read_pos + evt_size > data_buf_sz_) {
+    PERFETTO_DCHECK(read_pos + evt_size !=
+                    ((read_pos + evt_size) & (data_buf_sz_ - 1)));
+    PERFETTO_DLOG("PerfRingBuffer: returning reconstructed event");
+
+    size_t prefix_sz = data_buf_sz_ - read_pos;
+    memcpy(&reconstructed_record_[0], data_buf_ + read_pos, prefix_sz);
+    memcpy(&reconstructed_record_[0] + prefix_sz, data_buf_,
+           evt_size - prefix_sz);
+    return &reconstructed_record_[0];
+  } else {
+    // usual case - contiguous sample
+    PERFETTO_DCHECK(read_pos + evt_size ==
+                    ((read_pos + evt_size) & (data_buf_sz_ - 1)));
+
+    return data_buf_ + read_pos;
   }
+}
 
-  // memcpy accounting for wrapping
-  std::vector<char> data(data_sz);
-  size_t copy_sz = std::min(data_sz, data_buf_sz_ - read_pos);
-  memcpy(data.data(), data_buf_ + read_pos, copy_sz);
-  if (copy_sz < data_sz) {
-    memcpy(data.data() + copy_sz, data_buf_, data_sz - copy_sz);
-  }
+void PerfRingBuffer::Consume(size_t bytes) {
+  PERFETTO_CHECK(valid());
 
-  // consume the data
   __sync_synchronize();  // needs to be mb()
-  metadata_page_->data_tail += data_sz;
-
-  PERFETTO_LOG("WIP: consumed [%zu] bytes from ring buffer", data_sz);
-  return data;
+  metadata_page_->data_tail += bytes;
 }
 #pragma GCC diagnostic pop
 
@@ -190,7 +209,7 @@
   }
 
   auto ring_buffer =
-      PerfRingBuffer::Allocate(perf_fd.get(), /*data_page_count=*/128);
+      PerfRingBuffer::Allocate(perf_fd.get(), kDataPagesPerRingBuffer);
   if (!ring_buffer.has_value()) {
     return base::nullopt;
   }
@@ -199,89 +218,95 @@
                                           std::move(ring_buffer.value()));
 }
 
-void EventReader::ParseNextSampleBatch() {
-  std::vector<char> data = ring_buffer_.ReadAvailable();
-  if (data.size() == 0) {
-    PERFETTO_LOG("no samples (work in progress)");
-    return;
-  }
+base::Optional<ParsedSample> EventReader::ReadUntilSample(
+    std::function<void(uint64_t)> lost_events_callback) {
+  for (;;) {
+    char* event = ring_buffer_.ReadRecordNonconsuming();
+    if (!event)
+      return base::nullopt;  // caught up with the writer
 
-  for (const char* ptr = data.data(); ptr < data.data() + data.size();) {
-    if (!ParseSampleAndAdvance(&ptr))
-      break;
+    auto* event_hdr = reinterpret_cast<const perf_event_header*>(event);
+    PERFETTO_DLOG("record header: [%zu][%zu][%zu]",
+                  static_cast<size_t>(event_hdr->type),
+                  static_cast<size_t>(event_hdr->misc),
+                  static_cast<size_t>(event_hdr->size));
+
+    if (event_hdr->type == PERF_RECORD_SAMPLE) {
+      ParsedSample sample = ParseSampleRecord(event, event_hdr->size);
+      ring_buffer_.Consume(event_hdr->size);
+      return base::make_optional(std::move(sample));
+    }
+
+    if (event_hdr->type == PERF_RECORD_LOST) {
+      uint64_t lost_events = *reinterpret_cast<const uint64_t*>(
+          event + sizeof(perf_event_header) + sizeof(uint64_t));
+
+      lost_events_callback(lost_events);
+      // advance ring buffer position and keep looking for a sample
+      ring_buffer_.Consume(event_hdr->size);
+      continue;
+    }
+
+    PERFETTO_FATAL("Unsupported event type");
   }
 }
 
-bool EventReader::ParseSampleAndAdvance(const char** ptr) {
-  const char* sample_start = *ptr;
-  auto* event_hdr = reinterpret_cast<const perf_event_header*>(sample_start);
-
-  PERFETTO_LOG("WIP: event_header[%zu][%zu][%zu]",
-               static_cast<size_t>(event_hdr->type),
-               static_cast<size_t>(event_hdr->misc),
-               static_cast<size_t>(event_hdr->size));
-
-  if (event_hdr->type == PERF_RECORD_SAMPLE) {
-    ParsePerfRecordSample(sample_start, event_hdr->size);
-  } else {
-    PERFETTO_ELOG("Unsupported event type (work in progress)");
-  }
-
-  *ptr = sample_start + event_hdr->size;
-  return true;
-}
-
-// TODO(rsavitski): actually handle the samples instead of logging.
-void EventReader::ParsePerfRecordSample(const char* sample_start,
-                                        size_t sample_size) {
+ParsedSample EventReader::ParseSampleRecord(const char* sample_start,
+                                            size_t sample_size) {
   const perf_event_attr* cfg = event_cfg_.perf_attr();
 
-  if (cfg->sample_type & (~uint64_t(PERF_SAMPLE_TID | PERF_SAMPLE_STACK_USER |
-                                    PERF_SAMPLE_REGS_USER))) {
-    PERFETTO_ELOG("Unsupported sampling option (work in progress)");
-    return;
+  if (cfg->sample_type &
+      (~uint64_t(PERF_SAMPLE_TID | PERF_SAMPLE_TIME | PERF_SAMPLE_STACK_USER |
+                 PERF_SAMPLE_REGS_USER))) {
+    PERFETTO_FATAL("Unsupported sampling option");
   }
 
   // Parse the payload, which consists of concatenated data for each
   // |attr.sample_type| flag.
+  ParsedSample sample = {};
   const char* parse_pos = sample_start + sizeof(perf_event_header);
 
   if (cfg->sample_type & PERF_SAMPLE_TID) {
-    uint32_t pid;
+    uint32_t pid = 0;
+    uint32_t tid = 0;
     parse_pos = ReadValue(&pid, parse_pos);
-    PERFETTO_LOG("pid: %" PRIu32 "", pid);
-
-    uint32_t tid;
     parse_pos = ReadValue(&tid, parse_pos);
-    PERFETTO_LOG("tid: %" PRIu32 "", tid);
+    sample.pid = static_cast<pid_t>(pid);
+    sample.tid = static_cast<pid_t>(tid);
+  }
+
+  if (cfg->sample_type & PERF_SAMPLE_TIME) {
+    parse_pos = ReadValue(&sample.timestamp, parse_pos);
   }
 
   if (cfg->sample_type & PERF_SAMPLE_REGS_USER) {
-    auto parsed_regs = ReadPerfUserRegsData(&parse_pos);
-
-    if (parsed_regs) {
-      parsed_regs->IterateRegisters([](const char* name, uint64_t value) {
-        PERFETTO_LOG("reg[%s]: %" PRIx64 "", name, value);
-      });
-    }
+    // Can be empty, e.g. if we sampled a kernel thread.
+    sample.regs = ReadPerfUserRegsData(&parse_pos);
   }
 
   if (cfg->sample_type & PERF_SAMPLE_STACK_USER) {
     uint64_t max_stack_size;  // the requested size
     parse_pos = ReadValue(&max_stack_size, parse_pos);
-    PERFETTO_LOG("max_stack_size: %" PRIu64 "", max_stack_size);
+    PERFETTO_DLOG("max_stack_size: %" PRIu64 "", max_stack_size);
 
-    parse_pos += max_stack_size;  // skip raw data
+    const char* stack_start = parse_pos;
+    parse_pos += max_stack_size;  // skip to dyn_size
 
-    // not written if requested stack sampling size is zero
+    // written only if requested stack sampling size is nonzero
     if (max_stack_size > 0) {
       uint64_t filled_stack_size;
       parse_pos = ReadValue(&filled_stack_size, parse_pos);
-      PERFETTO_LOG("filled_stack_size: %" PRIu64 "", filled_stack_size);
+      PERFETTO_DLOG("filled_stack_size: %" PRIu64 "", filled_stack_size);
+
+      // copy stack bytes into a vector
+      size_t payload_sz = static_cast<size_t>(filled_stack_size);
+      sample.stack.resize(payload_sz);
+      memcpy(sample.stack.data(), stack_start, payload_sz);
     }
   }
 
   PERFETTO_CHECK(parse_pos == sample_start + sample_size);
+  return sample;
 }
 
 }  // namespace profiling
diff --git a/src/profiling/perf/event_reader.h b/src/profiling/perf/event_reader.h
index fa9b96b..a5d16c0 100644
--- a/src/profiling/perf/event_reader.h
+++ b/src/profiling/perf/event_reader.h
@@ -24,19 +24,12 @@
 
 #include "perfetto/ext/base/optional.h"
 #include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/tracing/core/basic_types.h"
 #include "src/profiling/perf/event_config.h"
 
 namespace perfetto {
 namespace profiling {
 
-// TODO(rsavitski): currently written for the non-overwriting ring buffer mode
-// (PROT_WRITE). Decide on whether there are use-cases for supporting the other.
-// TODO(rsavitski): given perf_event_mlock_kb limit, can we afford a ring buffer
-// per data source, or will we be forced to multiplex everything onto a single
-// ring buffer in the worst case? Alternatively, obtain CAP_IPC_LOCK (and do own
-// limiting)? Or get an adjusted RLIMIT_MEMLOCK?
-// TODO(rsavitski): polling for now, look into supporting the notification
-// mechanisms (such as epoll) later.
 class PerfRingBuffer {
  public:
   static base::Optional<PerfRingBuffer> Allocate(int perf_fd,
@@ -50,7 +43,8 @@
   PerfRingBuffer(PerfRingBuffer&& other) noexcept;
   PerfRingBuffer& operator=(PerfRingBuffer&& other) noexcept;
 
-  std::vector<char> ReadAvailable();
+  char* ReadRecordNonconsuming();
+  void Consume(size_t bytes);
 
  private:
   PerfRingBuffer() = default;
@@ -58,15 +52,29 @@
   bool valid() const { return metadata_page_ != nullptr; }
 
   // TODO(rsavitski): volatile?
-  // Is exactly the start of the mmap'd region.
+  // Points at the start of the mmap'd region.
   perf_event_mmap_page* metadata_page_ = nullptr;
 
-  // size of the mmap'd region (1 metadata page + data_buf_sz_)
+  // Size of the mmap'd region (1 metadata page + data_buf_sz_).
   size_t mmap_sz_ = 0;
 
   // mmap'd ring buffer
   char* data_buf_ = nullptr;
   size_t data_buf_sz_ = 0;
+
+  // When a record wraps around the ring buffer boundary, it is reconstructed in
+  // a contiguous form in this buffer. This allows us to always return a pointer
+  // to a contiguous record.
+  constexpr static size_t kMaxPerfRecordSize = 1 << 16;  // max size 64k
+  alignas(uint64_t) char reconstructed_record_[kMaxPerfRecordSize];
+};
+
+struct ParsedSample {
+  pid_t pid = 0;
+  pid_t tid = 0;
+  uint64_t timestamp = 0;
+  std::unique_ptr<unwindstack::Regs> regs;
+  std::vector<char> stack;
 };
 
 class EventReader {
@@ -78,6 +86,12 @@
   static base::Optional<EventReader> ConfigureEvents(
       const EventConfig& event_cfg);
 
+  // Consumes records from the ring buffer until either encountering a sample,
+  // or catching up to the writer. The other record of interest
+  // (PERF_RECORD_LOST) is handled via the given callback.
+  base::Optional<ParsedSample> ReadUntilSample(
+      std::function<void(uint64_t)> lost_events_callback);
+
   ~EventReader() = default;
 
   // move-only
@@ -86,16 +100,13 @@
   EventReader(EventReader&&) noexcept;
   EventReader& operator=(EventReader&&) noexcept;
 
-  // TODO(rsavitski): temporary one-shot parser for development purposes.
-  void ParseNextSampleBatch();
-
  private:
   EventReader(const EventConfig& event_cfg,
               base::ScopedFile perf_fd,
               PerfRingBuffer ring_buffer);
 
-  bool ParseSampleAndAdvance(const char** ptr);
-  void ParsePerfRecordSample(const char* sample_payload, size_t sample_size);
+  ParsedSample ParseSampleRecord(const char* sample_payload,
+                                 size_t sample_size);
 
   const EventConfig event_cfg_;
   base::ScopedFile perf_fd_;
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index e544f86..f4935ed 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -16,9 +16,10 @@
 
 #include "src/profiling/perf/perf_producer.h"
 
-#include <unistd.h>
 #include <utility>
 
+#include <unistd.h>
+
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
 #include "perfetto/ext/base/weak_ptr.h"
@@ -35,6 +36,13 @@
 namespace profiling {
 namespace {
 
+// TODO(rsavitski): for low sampling rates, look into epoll to detect samples.
+constexpr uint32_t kReadTickPeriodMs = 200;
+constexpr uint32_t kUnwindTickPeriodMs = 200;
+// TODO(rsavitski): this is better calculated (at setup) from the buffer and
+// sample sizes.
+constexpr size_t kMaxSamplesPerTick = 32;
+
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
 
@@ -43,66 +51,275 @@
 
 }  // namespace
 
-PerfProducer::PerfProducer(base::TaskRunner* task_runner)
-    : task_runner_(task_runner), weak_factory_(this) {}
+PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
+                           base::TaskRunner* task_runner)
+    : task_runner_(task_runner),
+      proc_fd_getter_(proc_fd_getter),
+      weak_factory_(this) {
+  proc_fd_getter->SetDelegate(this);
+}
 
-// TODO(rsavitski): configure at setup + enable at start, or do everything on
-// start? Also, do we try to work around the old(?) cpu hotplug bugs as
-// simpleperf does?
+// TODO(rsavitski): consider configure at setup + enable at start instead.
 void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                    const DataSourceConfig&) {}
 
 void PerfProducer::StartDataSource(DataSourceInstanceID instance_id,
                                    const DataSourceConfig& config) {
-  PERFETTO_LOG("StartDataSource(id=%" PRIu64 ", name=%s)", instance_id,
-               config.name().c_str());
+  PERFETTO_DLOG("StartDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+                config.name().c_str());
 
   if (config.name() != kDataSourceName)
     return;
 
   base::Optional<EventConfig> event_config = EventConfig::Create(config);
   if (!event_config.has_value()) {
-    PERFETTO_LOG("PerfEventConfig rejected.");
+    PERFETTO_ELOG("PerfEventConfig rejected.");
     return;
   }
 
   base::Optional<EventReader> event_reader =
       EventReader::ConfigureEvents(event_config.value());
   if (!event_reader.has_value()) {
-    PERFETTO_LOG("Failed to set up perf events.");
+    PERFETTO_ELOG("Failed to set up perf events.");
     return;
   }
 
-  // Build the DataSource instance.
+  // Construct the data source instance.
   auto it_inserted = data_sources_.emplace(
       std::piecewise_construct, std::forward_as_tuple(instance_id),
       std::forward_as_tuple(std::move(event_reader.value())));
 
-  PERFETTO_DCHECK(it_inserted.second);
+  PERFETTO_CHECK(it_inserted.second);
+
+  // Kick off periodic read task.
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, instance_id] {
+        if (weak_this)
+          weak_this->TickDataSourceRead(instance_id);
+      },
+      kReadTickPeriodMs);
+
+  // Set up unwind queue and kick off a periodic task to process it.
+  unwind_queues_.emplace(instance_id, std::deque<UnwindEntry>{});
+  task_runner_->PostDelayedTask(
+      [weak_this, instance_id] {
+        if (weak_this)
+          weak_this->TickDataSourceUnwind(instance_id);
+      },
+      kUnwindTickPeriodMs);
 }
 
+// TODO(rsavitski): stop perf_event before draining ring buffer and internal
+// queues (more aggressive flush).
 void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
-  PERFETTO_LOG("StopDataSource(id=%" PRIu64 ")", instance_id);
-
+  PERFETTO_DLOG("StopDataSource(id=%" PRIu64 ")", instance_id);
   data_sources_.erase(instance_id);
+  unwind_queues_.erase(instance_id);
 }
 
-void PerfProducer::Flush(FlushRequestID,
+void PerfProducer::Flush(FlushRequestID flush_id,
                          const DataSourceInstanceID* data_source_ids,
                          size_t num_data_sources) {
   for (size_t i = 0; i < num_data_sources; i++) {
-    PERFETTO_LOG("Flush(id=%" PRIu64 ")", data_source_ids[i]);
+    auto ds_id = data_source_ids[i];
+    PERFETTO_DLOG("Flush(id=%" PRIu64 ")", ds_id);
 
-    auto ds_it = data_sources_.find(data_source_ids[i]);
+    auto ds_it = data_sources_.find(ds_id);
     if (ds_it != data_sources_.end()) {
-      auto& ds = ds_it->second;
+      auto unwind_it = unwind_queues_.find(ds_id);
+      PERFETTO_CHECK(unwind_it != unwind_queues_.end());
 
-      // For now, parse whatever's been accumulated in the ring buffer.
-      ds.event_reader.ParseNextSampleBatch();
+      ProcessUnwindQueue(&unwind_it->second, ds_it->second);
+      endpoint_->NotifyFlushComplete(flush_id);
     }
   }
 }
 
+void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
+  using Status = DataSource::ProcDescriptors::Status;
+  auto it = data_sources_.find(ds_id);
+  if (it == data_sources_.end()) {
+    PERFETTO_DLOG("Stopping TickDataSourceRead(%zu)",
+                  static_cast<size_t>(ds_id));
+    return;
+  }
+  DataSource& ds = it->second;
+
+  // TODO(rsavitski): record the loss in the trace.
+  auto lost_events_callback = [ds_id](uint64_t lost_events) {
+    PERFETTO_ELOG("DataSource instance [%zu] lost [%" PRIu64 "] events",
+                  static_cast<size_t>(ds_id), lost_events);
+  };
+
+  for (size_t i = 0; i < kMaxSamplesPerTick; i++) {
+    base::Optional<ParsedSample> sample =
+        ds.event_reader.ReadUntilSample(lost_events_callback);
+    if (!sample)
+      break;  // caught up to the writer
+
+    // Request proc-fds for the process if this is the first time we see it yet.
+    pid_t pid = sample->pid;
+    auto& fd_entry = ds.proc_fds[pid];  // created if absent
+
+    if (fd_entry.status == Status::kInitial) {
+      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));
+      fd_entry.status = Status::kResolving;
+      proc_fd_getter_->GetDescriptorsForPid(pid);  // response is async
+      PostDescriptorLookupTimeout(ds_id, pid, /*timeout_ms=*/1000);
+    }
+
+    if (fd_entry.status == Status::kSkip) {
+      PERFETTO_DLOG("Skipping sample for previously poisoned pid [%d]",
+                    static_cast<int>(pid));
+      continue;
+    }
+
+    // Push the sample into a dedicated unwinding queue.
+    unwind_queues_[ds_id].emplace_back(std::move(sample.value()));
+  }
+
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, ds_id] {
+        if (weak_this)
+          weak_this->TickDataSourceRead(ds_id);
+      },
+      kReadTickPeriodMs);
+}
+
+// TODO(rsavitski): first-fit makes descriptor request fulfillment not true
+// FIFO.
+void PerfProducer::OnProcDescriptors(pid_t pid,
+                                     base::ScopedFile maps_fd,
+                                     base::ScopedFile mem_fd) {
+  using Status = DataSource::ProcDescriptors::Status;
+  PERFETTO_DLOG("PerfProducer::OnProcDescriptors [%d]->{%d, %d}",
+                static_cast<int>(pid), maps_fd.get(), mem_fd.get());
+
+  // Find first fit data source that is waiting on descriptors for the process.
+  for (auto& it : data_sources_) {
+    DataSource& ds = it.second;
+    auto proc_fd_it = ds.proc_fds.find(pid);
+    if (proc_fd_it != ds.proc_fds.end() &&
+        proc_fd_it->second.status == Status::kResolving) {
+      proc_fd_it->second.status = Status::kResolved;
+      proc_fd_it->second.maps_fd = std::move(maps_fd);
+      proc_fd_it->second.mem_fd = std::move(mem_fd);
+      PERFETTO_DLOG("Handed off proc-fds for pid [%d] to DS [%zu]",
+                    static_cast<int>(pid), static_cast<size_t>(it.first));
+      return;  // done
+    }
+  }
+  PERFETTO_DLOG(
+      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
+      static_cast<int>(pid));
+}
+
+void PerfProducer::PostDescriptorLookupTimeout(DataSourceInstanceID ds_id,
+                                               pid_t pid,
+                                               uint32_t timeout_ms) {
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, ds_id, pid] {
+        if (weak_this)
+          weak_this->HandleDescriptorLookupTimeout(ds_id, pid);
+      },
+      timeout_ms);
+}
+
+void PerfProducer::HandleDescriptorLookupTimeout(DataSourceInstanceID ds_id,
+                                                 pid_t pid) {
+  using Status = DataSource::ProcDescriptors::Status;
+  auto ds_it = data_sources_.find(ds_id);
+  if (ds_it == data_sources_.end())
+    return;
+
+  // If the request is still outstanding, poison the pid for this source.
+  DataSource& ds = ds_it->second;
+  auto proc_fd_it = ds.proc_fds.find(pid);
+  if (proc_fd_it != ds.proc_fds.end() &&
+      proc_fd_it->second.status == Status::kResolving) {
+    proc_fd_it->second.status = Status::kSkip;
+    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
+                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));
+  }
+}
+
+void PerfProducer::TickDataSourceUnwind(DataSourceInstanceID ds_id) {
+  auto q_it = unwind_queues_.find(ds_id);
+  auto ds_it = data_sources_.find(ds_id);
+  if (q_it == unwind_queues_.end() || ds_it == data_sources_.end()) {
+    PERFETTO_DLOG("Stopping TickDataSourceUnwind(%zu)",
+                  static_cast<size_t>(ds_id));
+    return;
+  }
+
+  ProcessUnwindQueue(&q_it->second, ds_it->second);
+
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, ds_id] {
+        if (weak_this)
+          weak_this->TickDataSourceUnwind(ds_id);
+      },
+      kUnwindTickPeriodMs);
+}
+
+// TODO(rsavitski): reader can purge kResolving entries from the start once the
+// queue grows too large.
+void PerfProducer::ProcessUnwindQueue(std::deque<UnwindEntry>* queue_ptr,
+                                      const DataSource& ds) {
+  using Status = DataSource::ProcDescriptors::Status;
+  auto& queue = *queue_ptr;
+
+  // Iterate over the queue, handling unwindable samples, and then marking them
+  // as processed.
+  size_t num_samples = queue.size();
+  for (size_t i = 0; i < num_samples; i++) {
+    UnwindEntry& entry = queue[i];
+    if (!entry.valid)
+      continue;  // already processed
+
+    ParsedSample& sample = entry.sample;
+    auto proc_fd_it = ds.proc_fds.find(sample.pid);
+    PERFETTO_CHECK(proc_fd_it != ds.proc_fds.end());  // must be present
+
+    auto fd_status = proc_fd_it->second.status;
+    PERFETTO_CHECK(fd_status != Status::kInitial);
+
+    if (fd_status == Status::kSkip) {
+      PERFETTO_DLOG("Skipping sample for pid [%d]",
+                    static_cast<int>(sample.pid));
+      entry.valid = false;
+      continue;
+    }
+
+    if (fd_status == Status::kResolving) {
+      PERFETTO_DLOG("Still resolving sample for pid [%d]",
+                    static_cast<int>(sample.pid));
+      continue;
+    }
+
+    if (fd_status == Status::kResolved) {
+      PERFETTO_DLOG("Accepting sample: pid:[%d], ts:[%" PRIu64 "]",
+                    static_cast<int>(sample.pid), sample.timestamp);
+      entry.valid = false;
+      continue;
+    }
+  }
+
+  // Pop all leading processed entries.
+  for (size_t i = 0; i < num_samples; i++) {
+    PERFETTO_DCHECK(queue.size() > 0);
+    if (queue.front().valid)
+      break;
+    queue.pop_front();
+  }
+
+  PERFETTO_DLOG("Unwind queue drain: [%zu]->[%zu]", num_samples, queue.size());
+}
+
 void PerfProducer::ConnectWithRetries(const char* socket_name) {
   PERFETTO_DCHECK(state_ == kNotStarted);
   state_ = kNotConnected;
@@ -168,10 +385,11 @@
   // recreate it again.
   base::TaskRunner* task_runner = task_runner_;
   const char* socket_name = producer_socket_name_;
+  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;
 
   // Invoke destructor and then the constructor again.
   this->~PerfProducer();
-  new (this) PerfProducer(task_runner);
+  new (this) PerfProducer(proc_fd_getter, task_runner);
 
   ConnectWithRetries(socket_name);
 }
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index e373836..426ad70 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -17,24 +17,33 @@
 #ifndef SRC_PROFILING_PERF_PERF_PRODUCER_H_
 #define SRC_PROFILING_PERF_PERF_PRODUCER_H_
 
+#include <deque>
 #include <map>
 
+#include <unistd.h>
+
+#include <unwindstack/Regs.h>
+
 #include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/optional.h"
 #include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/unix_socket.h"
 #include "perfetto/ext/base/weak_ptr.h"
 #include "perfetto/ext/tracing/core/basic_types.h"
 #include "perfetto/ext/tracing/core/producer.h"
 #include "perfetto/ext/tracing/core/tracing_service.h"
 #include "src/profiling/perf/event_config.h"
 #include "src/profiling/perf/event_reader.h"
+#include "src/profiling/perf/proc_descriptors.h"
 
 namespace perfetto {
 namespace profiling {
 
 // TODO(b/144281346): work in progress. Do not use.
-class PerfProducer : public Producer {
+class PerfProducer : public Producer, public ProcDescriptorDelegate {
  public:
-  PerfProducer(base::TaskRunner* task_runner);
+  PerfProducer(ProcDescriptorGetter* proc_fd_getter,
+               base::TaskRunner* task_runner);
   ~PerfProducer() override = default;
 
   PerfProducer(const PerfProducer&) = delete;
@@ -44,7 +53,7 @@
 
   void ConnectWithRetries(const char* socket_name);
 
-  // Producer Impl:
+  // Producer impl:
   void OnConnect() override;
   void OnDisconnect() override;
   void OnTracingSetup() override {}
@@ -57,6 +66,11 @@
   void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
                              size_t /*num_data_sources*/) override {}
 
+  // ProcDescriptorDelegate impl:
+  void OnProcDescriptors(pid_t pid,
+                         base::ScopedFile maps_fd,
+                         base::ScopedFile mem_fd) override;
+
  private:
   // State of the connection to tracing service (traced).
   enum State {
@@ -70,9 +84,31 @@
     DataSource(EventReader _event_reader)
         : event_reader(std::move(_event_reader)) {}
 
-    // TODO(rsavitski): current thinking is an EventReader per cpu-scoped ring
-    // buffer. And a central bookkeeper.
+    // TODO(rsavitski): event reader per kernel buffer.
     EventReader event_reader;
+
+    // TODO(rsavitski): under a single-threaded model, directly shared between
+    // the reader and the "unwinder". If/when lifting unwinding into a separate
+    // thread(s), the FDs will become owned by the unwinder, but the tracking
+    // will need to be done by both sides (frontend needs to know whether to
+    // resolve the pid, and the unwinder needs to know whether the fd is
+    // ready/poisoned).
+    struct ProcDescriptors {
+      enum class Status { kInitial, kResolving, kResolved, kSkip };
+      Status status = Status::kInitial;
+      base::ScopedFile maps_fd;
+      base::ScopedFile mem_fd;
+    };
+    std::map<pid_t, ProcDescriptors> proc_fds;  // keyed by pid
+  };
+
+  struct UnwindEntry {
+   public:
+    UnwindEntry(ParsedSample _sample)
+        : valid(true), sample(std::move(_sample)) {}
+
+    bool valid = false;
+    ParsedSample sample;
   };
 
   void ConnectService();
@@ -80,13 +116,31 @@
   void ResetConnectionBackoff();
   void IncreaseConnectionBackoff();
 
+  void TickDataSourceRead(DataSourceInstanceID ds_id);
+
+  void PostDescriptorLookupTimeout(DataSourceInstanceID ds_id,
+                                   pid_t pid,
+                                   uint32_t timeout_ms);
+  void HandleDescriptorLookupTimeout(DataSourceInstanceID ds_id, pid_t pid);
+
+  void TickDataSourceUnwind(DataSourceInstanceID ds_id);
+  void ProcessUnwindQueue(std::deque<UnwindEntry>* queue_ptr,
+                          const DataSource& ds);
+
+  // Task runner owned by the main thread.
   base::TaskRunner* const task_runner_;
   State state_ = kNotStarted;
   const char* producer_socket_name_ = nullptr;
   uint32_t connection_backoff_ms_ = 0;
 
+  // Valid and stable for the lifetime of this class.
+  ProcDescriptorGetter* const proc_fd_getter_;
+
+  // Owns shared memory, must outlive trace writing.
   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
+
   std::map<DataSourceInstanceID, DataSource> data_sources_;
+  std::map<DataSourceInstanceID, std::deque<UnwindEntry>> unwind_queues_;
 
   base::WeakPtrFactory<PerfProducer> weak_factory_;  // keep last
 };
diff --git a/src/profiling/perf/proc_descriptors.cc b/src/profiling/perf/proc_descriptors.cc
new file mode 100644
index 0000000..3a98bfb
--- /dev/null
+++ b/src/profiling/perf/proc_descriptors.cc
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/perf/proc_descriptors.h"
+
+#include <signal.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+namespace perfetto {
+
+ProcDescriptorDelegate::~ProcDescriptorDelegate() {}
+
+ProcDescriptorGetter::~ProcDescriptorGetter() {}
+
+// DirectDescriptorGetter:
+
+DirectDescriptorGetter::~DirectDescriptorGetter() {}
+
+void DirectDescriptorGetter::SetDelegate(ProcDescriptorDelegate* delegate) {
+  delegate_ = delegate;
+}
+
+void DirectDescriptorGetter::GetDescriptorsForPid(pid_t pid) {
+  char buf[128] = {};
+  snprintf(buf, sizeof(buf), "/proc/%d/mem", pid);
+  auto maps_fd = base::ScopedFile{open(buf, O_RDONLY | O_CLOEXEC)};
+  if (!maps_fd) {
+    PERFETTO_PLOG("Failed to open [%s]", buf);
+    return;
+  }
+
+  snprintf(buf, sizeof(buf), "/proc/%d/mem", pid);
+  auto mem_fd = base::ScopedFile{open(buf, O_RDONLY | O_CLOEXEC)};
+  if (!mem_fd) {
+    PERFETTO_PLOG("Failed to open [%s]", buf);
+    return;
+  }
+
+  delegate_->OnProcDescriptors(pid, std::move(maps_fd), std::move(mem_fd));
+}
+
+// AndroidRemoteDescriptorGetter:
+
+AndroidRemoteDescriptorGetter::~AndroidRemoteDescriptorGetter() = default;
+
+void AndroidRemoteDescriptorGetter::SetDelegate(
+    ProcDescriptorDelegate* delegate) {
+  delegate_ = delegate;
+}
+
+#if !PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
+void AndroidRemoteDescriptorGetter::GetDescriptorsForPid(pid_t) {
+  PERFETTO_FATAL("Unexpected build type for AndroidRemoteDescriptorGetter");
+}
+#else
+void AndroidRemoteDescriptorGetter::GetDescriptorsForPid(pid_t pid) {
+  constexpr static int kPerfProfilerSignalValue = 1;
+  constexpr static int kProfilerSignal = __SIGRTMIN + 4;
+
+  PERFETTO_DLOG("Sending signal to pid [%d]", pid);
+  union sigval signal_value;
+  signal_value.sival_int = kPerfProfilerSignalValue;
+  if (sigqueue(pid, kProfilerSignal, signal_value) != 0) {
+    PERFETTO_DPLOG("Failed sigqueue(%d)", pid);
+  }
+}
+#endif
+
+void AndroidRemoteDescriptorGetter::OnNewIncomingConnection(
+    base::UnixSocket*,
+    std::unique_ptr<base::UnixSocket> new_connection) {
+  PERFETTO_DLOG("remote fds: new connection from pid [%d]",
+                static_cast<int>(new_connection->peer_pid()));
+
+  active_connections_.emplace(new_connection.get(), std::move(new_connection));
+}
+
+void AndroidRemoteDescriptorGetter::OnDisconnect(base::UnixSocket* self) {
+  PERFETTO_DLOG("remote fds: disconnect from pid [%d]",
+                static_cast<int>(self->peer_pid()));
+
+  auto it = active_connections_.find(self);
+  PERFETTO_CHECK(it != active_connections_.end());
+  active_connections_.erase(it);
+}
+
+void AndroidRemoteDescriptorGetter::OnDataAvailable(base::UnixSocket* self) {
+  // Expect two file descriptors (maps, followed by mem).
+  base::ScopedFile fds[2];
+  char buf[1];
+  size_t received_bytes =
+      self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
+
+  PERFETTO_DLOG("remote fds: received %zu bytes", received_bytes);
+  if (!received_bytes)
+    return;
+
+  delegate_->OnProcDescriptors(self->peer_pid(), std::move(fds[0]),
+                               std::move(fds[1]));
+}
+
+}  // namespace perfetto
diff --git a/src/profiling/perf/proc_descriptors.h b/src/profiling/perf/proc_descriptors.h
new file mode 100644
index 0000000..d4a85b5
--- /dev/null
+++ b/src/profiling/perf/proc_descriptors.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_PERF_PROC_DESCRIPTORS_H_
+#define SRC_PROFILING_PERF_PROC_DESCRIPTORS_H_
+
+#include <sys/types.h>
+
+#include <map>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/unix_socket.h"
+
+namespace perfetto {
+
+// Callback interface for receiving /proc/<pid>/ file descriptors (proc-fds)
+// from |ProcDescriptorGetter|.
+class ProcDescriptorDelegate {
+ public:
+  virtual void OnProcDescriptors(pid_t pid,
+                                 base::ScopedFile maps_fd,
+                                 base::ScopedFile mem_fd) = 0;
+
+  virtual ~ProcDescriptorDelegate();
+};
+
+class ProcDescriptorGetter {
+ public:
+  virtual void GetDescriptorsForPid(pid_t pid) = 0;
+  virtual void SetDelegate(ProcDescriptorDelegate* delegate) = 0;
+
+  virtual ~ProcDescriptorGetter();
+};
+
+// Directly opens /proc/<pid>/{maps,mem} files. Used when the daemon is running
+// with sufficient privileges to do so.
+class DirectDescriptorGetter : public ProcDescriptorGetter {
+ public:
+  void GetDescriptorsForPid(pid_t pid) override;
+  void SetDelegate(ProcDescriptorDelegate* delegate) override;
+
+  ~DirectDescriptorGetter() override;
+
+ private:
+  ProcDescriptorDelegate* delegate_ = nullptr;
+};
+
+// Implementation of |ProcDescriptorGetter| used when running as a system daemon
+// on Android. Uses a socket inherited from |init| and platform signal handlers
+// to obtain the proc-fds.
+class AndroidRemoteDescriptorGetter : public ProcDescriptorGetter,
+                                      public base::UnixSocket::EventListener {
+ public:
+  AndroidRemoteDescriptorGetter(int listening_raw_socket,
+                                base::TaskRunner* task_runner) {
+    listening_socket_ = base::UnixSocket::Listen(
+        base::ScopedFile(listening_raw_socket), this, task_runner,
+        base::SockFamily::kUnix, base::SockType::kStream);
+  }
+
+  // ProcDescriptorGetter impl:
+  void GetDescriptorsForPid(pid_t pid) override;
+  void SetDelegate(ProcDescriptorDelegate* delegate) override;
+
+  // UnixSocket::EventListener impl:
+  void OnNewIncomingConnection(
+      base::UnixSocket*,
+      std::unique_ptr<base::UnixSocket> new_connection) override;
+  void OnDataAvailable(base::UnixSocket* self) override;
+  void OnDisconnect(base::UnixSocket* self) override;
+
+  ~AndroidRemoteDescriptorGetter() override;
+
+ private:
+  ProcDescriptorDelegate* delegate_ = nullptr;
+  std::unique_ptr<base::UnixSocket> listening_socket_;
+  // Holds onto connections until we receive the file descriptors (keyed by
+  // their raw addresses, which the map keeps stable).
+  std::map<base::UnixSocket*, std::unique_ptr<base::UnixSocket>>
+      active_connections_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_PERF_PROC_DESCRIPTORS_H_
diff --git a/src/profiling/perf/unwind_support.cc b/src/profiling/perf/regs_parsing.cc
similarity index 96%
rename from src/profiling/perf/unwind_support.cc
rename to src/profiling/perf/regs_parsing.cc
index a3cef1b..0c90edc 100644
--- a/src/profiling/perf/unwind_support.cc
+++ b/src/profiling/perf/regs_parsing.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/profiling/perf/unwind_support.h"
+#include "src/profiling/perf/regs_parsing.h"
 
 #include <inttypes.h>
 #include <linux/perf_event.h>
@@ -35,6 +35,7 @@
 #include <unwindstack/UserX86.h>
 #include <unwindstack/UserX86_64.h>
 
+// kernel uapi headers
 #include <uapi/asm-arm/asm/perf_regs.h>
 #include <uapi/asm-x86/asm/perf_regs.h>
 #define perf_event_arm_regs perf_event_arm64_regs
@@ -201,12 +202,12 @@
 
 }  // namespace
 
-uint64_t PerfUserRegsMaskForCurrentArch() {
-  return PerfUserRegsMask(unwindstack::Regs::CurrentArch());
+uint64_t PerfUserRegsMaskForArch(unwindstack::ArchEnum arch) {
+  return PerfUserRegsMask(arch);
 }
 
 // Assumes that the sampling was configured with
-// |PerfUserRegsMaskForCurrentArch|.
+// |PerfUserRegsMaskForArch(unwindstack::Regs::CurrentArch())|.
 std::unique_ptr<unwindstack::Regs> ReadPerfUserRegsData(const char** data) {
   unwindstack::ArchEnum requested_arch = unwindstack::Regs::CurrentArch();
 
@@ -215,12 +216,12 @@
   const char* parse_pos = *data;
   uint64_t sampled_abi;
   parse_pos = ReadValue(&sampled_abi, parse_pos);
-  PERFETTO_LOG("WIP: abi: %" PRIu64 "", sampled_abi);
+  PERFETTO_DLOG("sampled abi: %" PRIu64 "", sampled_abi);
 
   // Unpack the densely-packed register values into |RawRegisterData|, which has
   // a value for every register (unsampled registers will be left at zero).
   RawRegisterData raw_regs{};
-  uint64_t regs_mask = PerfUserRegsMaskForCurrentArch();
+  uint64_t regs_mask = PerfUserRegsMaskForArch(requested_arch);
   for (size_t i = 0; regs_mask && (i < RawRegisterData::kMaxSize); i++) {
     if (regs_mask & (1ULL << i)) {
       parse_pos = ReadValue(&raw_regs.regs[i], parse_pos);
diff --git a/src/profiling/perf/unwind_support.h b/src/profiling/perf/regs_parsing.h
similarity index 87%
rename from src/profiling/perf/unwind_support.h
rename to src/profiling/perf/regs_parsing.h
index f0764d2..6dc86dd 100644
--- a/src/profiling/perf/unwind_support.h
+++ b/src/profiling/perf/regs_parsing.h
@@ -14,8 +14,8 @@
  * limitations under the License.
  */
 
-#ifndef SRC_PROFILING_PERF_UNWIND_SUPPORT_H_
-#define SRC_PROFILING_PERF_UNWIND_SUPPORT_H_
+#ifndef SRC_PROFILING_PERF_REGS_PARSING_H_
+#define SRC_PROFILING_PERF_REGS_PARSING_H_
 
 #include <stdint.h>
 #include <unwindstack/Regs.h>
@@ -29,7 +29,7 @@
 
 // Returns a bitmask for sampling the userspace register set, used when
 // configuring perf events.
-uint64_t PerfUserRegsMaskForCurrentArch();
+uint64_t PerfUserRegsMaskForArch(unwindstack::ArchEnum arch);
 
 // Converts the raw sampled register bytes to libunwindstack's representation
 // (correct arch-dependent subclass). Advances |data| pointer to past the
@@ -42,4 +42,4 @@
 }  // namespace profiling
 }  // namespace perfetto
 
-#endif  // SRC_PROFILING_PERF_UNWIND_SUPPORT_H_
+#endif  // SRC_PROFILING_PERF_REGS_PARSING_H_
diff --git a/src/profiling/perf/traced_perf.cc b/src/profiling/perf/traced_perf.cc
index c165b20..77ce7c4 100644
--- a/src/profiling/perf/traced_perf.cc
+++ b/src/profiling/perf/traced_perf.cc
@@ -18,13 +18,40 @@
 #include "perfetto/ext/base/unix_task_runner.h"
 #include "perfetto/ext/tracing/ipc/default_socket.h"
 #include "src/profiling/perf/perf_producer.h"
+#include "src/profiling/perf/proc_descriptors.h"
 
 namespace perfetto {
 
+namespace {
+#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
+static constexpr char kTracedPerfSocketEnvVar[] = "ANDROID_SOCKET_traced_perf";
+
+int GetRawInheritedListeningSocket() {
+  const char* sock_fd = getenv(kTracedPerfSocketEnvVar);
+  if (sock_fd == nullptr)
+    PERFETTO_FATAL("Did not inherit socket from init.");
+  char* end;
+  int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
+  if (*end != '\0')
+    PERFETTO_FATAL("Invalid env variable format. Expected decimal integer.");
+  return raw_fd;
+}
+#endif
+}  // namespace
+
 // TODO(rsavitski): watchdog.
 int TracedPerfMain(int, char**) {
   base::UnixTaskRunner task_runner;
-  profiling::PerfProducer producer(&task_runner);
+
+// TODO(rsavitski): support standalone --root or similar on android.
+#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
+  AndroidRemoteDescriptorGetter proc_fd_getter{GetRawInheritedListeningSocket(),
+                                               &task_runner};
+#else
+  DirectDescriptorGetter proc_fd_getter;
+#endif
+
+  profiling::PerfProducer producer(&proc_fd_getter, &task_runner);
   producer.ConnectWithRetries(GetProducerSocket());
   task_runner.Run();
   return 0;