traced_perf: add callstack trie, allow it to be purged manually

Since the perf profiler doesn't have to keep track of individual callsites,
there's no need for external refcounting (i.e. node increment/decrement).
Instead, we purge the whole trie once we're done with it (i.e. when no data
sources remain). Purging it when it grows too big is left for future work.

Bug: 144281346
Change-Id: I9f951bae6c0f9bc58c0d26fc8378cd270ef1312f
diff --git a/Android.bp b/Android.bp
index 12f93ef..83ac1b6 100644
--- a/Android.bp
+++ b/Android.bp
@@ -7493,6 +7493,8 @@
     ":perfetto_src_base_unix_socket",
     ":perfetto_src_ipc_client",
     ":perfetto_src_ipc_common",
+    ":perfetto_src_profiling_common_callstack_trie",
+    ":perfetto_src_profiling_common_interner",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_perf_proc_descriptors",
     ":perfetto_src_profiling_perf_producer",
diff --git a/include/perfetto/ext/base/lookup_set.h b/include/perfetto/ext/base/lookup_set.h
index 11bff8e..2e31a52 100644
--- a/include/perfetto/ext/base/lookup_set.h
+++ b/include/perfetto/ext/base/lookup_set.h
@@ -47,6 +47,8 @@
 
   bool Remove(const T& child) { return set_.erase(child); }
 
+  void Clear() { set_.clear(); }
+
   static_assert(std::is_const<U>::value, "key must be const");
 
  private:
diff --git a/src/profiling/common/callstack_trie.h b/src/profiling/common/callstack_trie.h
index b9bbf78..45b2786 100644
--- a/src/profiling/common/callstack_trie.h
+++ b/src/profiling/common/callstack_trie.h
@@ -102,22 +102,29 @@
 // nested callchain fragment at the leaf, which is called from many places).
 class GlobalCallstackTrie {
  public:
+  // Optionally, Nodes can be externally refcounted via |IncrementNode| and
+  // |DecrementNode|. In that case, orphaned nodes are deleted when the last
+  // reference is decremented.
   class Node {
    public:
     // This is opaque except to GlobalCallstackTrie.
     friend class GlobalCallstackTrie;
 
     // Allow building a node out of a frame for base::LookupSet.
-    Node(Interned<Frame> frame) : Node(std::move(frame), 0, nullptr) {}
+    Node(Interned<Frame> frame) : Node(frame, 0, nullptr) {}
     Node(Interned<Frame> frame, uint64_t id)
         : Node(std::move(frame), id, nullptr) {}
     Node(Interned<Frame> frame, uint64_t id, Node* parent)
-        : id_(id), parent_(parent), location_(std::move(frame)) {}
+        : id_(id), parent_(parent), location_(frame) {}
+
+    ~Node() { PERFETTO_DCHECK(!ref_count_); }
 
     uint64_t id() const { return id_; }
 
    private:
     Node* GetOrCreateChild(const Interned<Frame>& loc);
+    // Deletes all descendant nodes, regardless of |ref_count_|.
+    void DeleteChildren() { children_.Clear(); }
 
     uint64_t ref_count_ = 0;
     uint64_t id_;
@@ -127,6 +134,7 @@
   };
 
   GlobalCallstackTrie() = default;
+  ~GlobalCallstackTrie() = default;
   GlobalCallstackTrie(const GlobalCallstackTrie&) = delete;
   GlobalCallstackTrie& operator=(const GlobalCallstackTrie&) = delete;
 
@@ -135,11 +143,19 @@
   GlobalCallstackTrie& operator=(GlobalCallstackTrie&&) = delete;
 
   Node* CreateCallsite(const std::vector<FrameData>& callstack);
-  static void DecrementNode(Node* node);
   static void IncrementNode(Node* node);
+  static void DecrementNode(Node* node);
 
   std::vector<Interned<Frame>> BuildCallstack(const Node* node) const;
 
+  // Purges all interned callstacks (and the associated internings), without
+  // restarting any interning sequences. Incompatible with external refcounting
+  // of nodes (Node.ref_count_).
+  void ClearTrie() {
+    PERFETTO_DLOG("Clearing trie");
+    root_.DeleteChildren();
+  }
+
  private:
   Node* GetOrCreateChild(Node* self, const Interned<Frame>& loc);
 
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index 9495971..6ca1259 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -54,6 +54,8 @@
     "../../../src/base",
     "../../../src/base:unix_socket",
     "../../../src/tracing/ipc/producer",
+    "../common:callstack_trie",
+    "../common:interner",
     "../common:unwind_support",
   ]
   sources = [
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index cb85212..4ad89aa 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -20,6 +20,8 @@
 
 #include <unistd.h>
 
+#include <unwindstack/Unwinder.h>
+
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
 #include "perfetto/ext/base/weak_ptr.h"
@@ -29,11 +31,12 @@
 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
-#include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
+#include "src/profiling/common/callstack_trie.h"
 #include "src/profiling/common/unwind_support.h"
 #include "src/profiling/perf/event_reader.h"
 
-#include <unwindstack/Unwinder.h>
+#include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
 namespace profiling {
@@ -90,10 +93,14 @@
     return;
   }
 
+  auto buffer_id = static_cast<BufferID>(config.target_buffer());
+  auto writer = endpoint_->CreateTraceWriter(buffer_id);
+
   // Construct the data source instance.
   auto it_inserted = data_sources_.emplace(
       std::piecewise_construct, std::forward_as_tuple(instance_id),
-      std::forward_as_tuple(std::move(event_reader.value())));
+      std::forward_as_tuple(std::move(writer),
+                            std::move(event_reader.value())));
 
   PERFETTO_CHECK(it_inserted.second);
 
@@ -126,15 +133,32 @@
       kBookkeepTickPeriodMs);
 }
 
-// TODO(rsavitski): stop perf_event before draining ring buffer and internal
-// queues (more aggressive flush).
+// TODO(rsavitski): stop perf_event, then drain ring buffer and internal
+// queues.
 void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
   PERFETTO_DLOG("StopDataSource(id=%" PRIu64 ")", instance_id);
+  auto ds_it = data_sources_.find(instance_id);
+  if (ds_it == data_sources_.end())
+    return;
+
+  DataSource& ds = ds_it->second;
+
+  ds.trace_writer->Flush();
+
   data_sources_.erase(instance_id);
   unwind_queues_.erase(instance_id);
   bookkeping_queues_.erase(instance_id);
+
+  // If there are no more data sources, purge internings.
+  if (data_sources_.empty()) {
+    callstack_trie_.ClearTrie();
+  }
 }
 
+// TODO(rsavitski): flushes are ignored for now, as handling them is involved
+// given out-of-order unwinding and proc-fd timeouts. Instead of responding to
+// explicit flushes, we can ensure that we're otherwise well-behaved (do not
+// reorder packets too much). Final flush will be handled separately (on stop).
 void PerfProducer::Flush(FlushRequestID flush_id,
                          const DataSourceInstanceID* data_source_ids,
                          size_t num_data_sources) {
@@ -144,13 +168,6 @@
 
     auto ds_it = data_sources_.find(ds_id);
     if (ds_it != data_sources_.end()) {
-      auto unwind_it = unwind_queues_.find(ds_id);
-      auto book_it = bookkeping_queues_.find(ds_id);
-      PERFETTO_CHECK(unwind_it != unwind_queues_.end());
-      PERFETTO_CHECK(book_it != bookkeping_queues_.end());
-
-      ProcessUnwindQueue(&unwind_it->second, &book_it->second, &ds_it->second);
-      // TODO(rsavitski): also flush the bookkeeping queue.
       endpoint_->NotifyFlushComplete(flush_id);
     }
   }
@@ -405,6 +422,8 @@
     ret.frames.emplace_back(unwind_state.AnnotateFrame(std::move(frame)));
   }
 
+  // TODO(rsavitski): we still get a partially-unwound stack on most errors,
+  // consider adding a synthetic "error frame" like heapprofd.
   if (error_code != unwindstack::ERROR_NONE)
     ret.unwind_error = true;
 
@@ -423,11 +442,27 @@
     return;
   }
 
+  auto ds_it = data_sources_.find(ds_id);
+  PERFETTO_CHECK(ds_it != data_sources_.end());
+  DataSource& ds = ds_it->second;
+
   auto& queue = q_it->second;
   while (!queue.empty()) {
     BookkeepingEntry& entry = queue.front();
-    PERFETTO_DLOG("Bookkeeping sample: pid:[%d], ts:[%" PRIu64 "]",
-                  static_cast<int>(entry.pid), entry.timestamp);
+
+    GlobalCallstackTrie::Node* node =
+        callstack_trie_.CreateCallsite(entry.frames);
+    std::vector<Interned<Frame>> extracted =
+        callstack_trie_.BuildCallstack(node);
+
+    PERFETTO_DLOG("Extracted from interner:");
+    for (const auto& f : extracted) {
+      PERFETTO_DLOG("%u -> %s", static_cast<unsigned>(f.id()),
+                    f->function_name->c_str());
+    }
+
+    auto packet = ds.trace_writer->NewTracePacket();
+    packet->set_timestamp(entry.timestamp);
 
     queue.pop();
   }
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index ecd6be0..1f72ee2 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -32,7 +32,9 @@
 #include "perfetto/ext/base/weak_ptr.h"
 #include "perfetto/ext/tracing/core/basic_types.h"
 #include "perfetto/ext/tracing/core/producer.h"
+#include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/ext/tracing/core/tracing_service.h"
+#include "src/profiling/common/callstack_trie.h"
 #include "src/profiling/common/unwind_support.h"
 #include "src/profiling/perf/event_config.h"
 #include "src/profiling/perf/event_reader.h"
@@ -83,8 +85,12 @@
   };
 
   struct DataSource {
-    DataSource(EventReader _event_reader)
-        : event_reader(std::move(_event_reader)) {}
+    DataSource(std::unique_ptr<TraceWriter> _trace_writer,
+               EventReader _event_reader)
+        : trace_writer(std::move(_trace_writer)),
+          event_reader(std::move(_event_reader)) {}
+
+    std::unique_ptr<TraceWriter> trace_writer;
 
     // TODO(rsavitski): event reader per kernel buffer.
     EventReader event_reader;
@@ -153,6 +159,7 @@
   BookkeepingEntry UnwindSample(ParsedSample sample,
                                 DataSource::ProcDescriptors* process_state);
 
+  // TODO(rsavitski): bookkeeping is not an entirely accurate name for this.
   void TickDataSourceBookkeep(DataSourceInstanceID ds_id);
 
   // Task runner owned by the main thread.
@@ -167,6 +174,15 @@
   // Owns shared memory, must outlive trace writing.
   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
 
+  // Interns callstacks across all data sources.
+  // TODO(rsavitski): for long profiling sessions, consider purging trie when it
+  // grows too large (at the moment purged only when no sources are active).
+  // TODO(rsavitski): interning sequences are monotonic for the lifetime of the
+  // daemon. Consider resetting them at safe points - possible when no sources
+  // are active, and tricky otherwise, as it'll require emitting incremental
+  // sequence invalidation packets on all relevant sequences.
+  GlobalCallstackTrie callstack_trie_;
+
   std::map<DataSourceInstanceID, DataSource> data_sources_;
   std::map<DataSourceInstanceID, std::deque<UnwindEntry>> unwind_queues_;
   std::map<DataSourceInstanceID, std::queue<BookkeepingEntry>>