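traced_perf: add basic metatracing

Register a metatrace data source in the perf producer, and emit scoped
metatrace events (tagged TAG_PRODUCER) for the read/unwind ticks, plus a
counter for the unwind queue size.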

Bug: 144281346
Change-Id: I0cd9a59c4e2f85bf7082d2d0abc7e027c05aa57a
diff --git a/Android.bp b/Android.bp
index 4c7c85c..fd0c8c8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -7685,6 +7685,7 @@
     ":perfetto_src_protozero_protozero",
     ":perfetto_src_tracing_common",
     ":perfetto_src_tracing_core_core",
+    ":perfetto_src_tracing_core_service",
     ":perfetto_src_tracing_ipc_common",
     ":perfetto_src_tracing_ipc_producer_producer",
     "src/profiling/perf/main.cc",
diff --git a/include/perfetto/ext/base/metatrace_events.h b/include/perfetto/ext/base/metatrace_events.h
index e3cfb77..fd6a5dd 100644
--- a/include/perfetto/ext/base/metatrace_events.h
+++ b/include/perfetto/ext/base/metatrace_events.h
@@ -29,6 +29,7 @@
   TAG_PROC_POLLERS = 1 << 1,
   TAG_TRACE_WRITER = 1 << 2,
   TAG_TRACE_SERVICE = 1 << 3,
+  TAG_PRODUCER = 1 << 4,
 };
 
 // The macros below generate matching enums and arrays of string literals.
@@ -60,7 +61,11 @@
   F(FTRACE_READ_TICK), \
   F(FTRACE_CPU_READ_CYCLE), \
   F(FTRACE_CPU_READ_BATCH), \
-  F(KALLSYMS_PARSE)
+  F(KALLSYMS_PARSE), \
+  F(PROFILER_READ_TICK), \
+  F(PROFILER_READ_CPU), \
+  F(PROFILER_UNWIND_TICK), \
+  F(PROFILER_UNWIND_SAMPLE)
 
 // Append only, see above.
 //
@@ -71,7 +76,8 @@
   F(COUNTER_ZERO_UNUSED),\
   F(FTRACE_PAGES_DRAINED), \
   F(PS_PIDS_SCANNED), \
-  F(TRACE_SERVICE_COMMIT_DATA)
+  F(TRACE_SERVICE_COMMIT_DATA), \
+  F(PROFILER_UNWIND_QUEUE_SZ)
 
 // clang-format on
 
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index 9b4e2d1..5f36377 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -44,6 +44,7 @@
   public_deps = [
     ":regs_parsing",
     "../../../include/perfetto/tracing/core",
+    "../../../src/tracing/core:service",  # for metatrace
   ]
   deps = [
     ":proc_descriptors",
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index 0437254..c6fd0db 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -25,6 +25,7 @@
 
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/metatrace.h"
 #include "perfetto/ext/base/weak_ptr.h"
 #include "perfetto/ext/tracing/core/basic_types.h"
 #include "perfetto/ext/tracing/core/producer.h"
@@ -128,6 +129,13 @@
   PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(instance_id),
                config.name().c_str());
 
+  if (config.name() == MetatraceWriter::kDataSourceName) {
+    StartMetatraceSource(instance_id,
+                         static_cast<BufferID>(config.target_buffer()));
+    return;
+  }
+
+  // linux.perf data source
   if (config.name() != kDataSourceName)
     return;
 
@@ -192,6 +200,16 @@
 
 void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
   PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(instance_id));
+
+  // Metatrace: stop immediately (will miss the events from the
+  // asynchronous shutdown of the primary data source).
+  auto meta_it = metatrace_writers_.find(instance_id);
+  if (meta_it != metatrace_writers_.end()) {
+    meta_it->second.WriteAllAndFlushTraceWriter([] {});
+    metatrace_writers_.erase(meta_it);
+    return;
+  }
+
   auto ds_it = data_sources_.find(instance_id);
   if (ds_it == data_sources_.end())
     return;
@@ -209,14 +227,22 @@
 void PerfProducer::Flush(FlushRequestID flush_id,
                          const DataSourceInstanceID* data_source_ids,
                          size_t num_data_sources) {
+  bool should_ack_flush = false;
   for (size_t i = 0; i < num_data_sources; i++) {
     auto ds_id = data_source_ids[i];
     PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));
-    auto ds_it = data_sources_.find(ds_id);
-    if (ds_it != data_sources_.end()) {
-      endpoint_->NotifyFlushComplete(flush_id);
+    // Metatrace source: flush its writer now (the passed callback is a no-op).
+    auto meta_it = metatrace_writers_.find(ds_id);
+    if (meta_it != metatrace_writers_.end()) {
+      meta_it->second.WriteAllAndFlushTraceWriter([] {});
+      should_ack_flush = true;
+    }
+    if (data_sources_.find(ds_id) != data_sources_.end()) {
+      should_ack_flush = true;
     }
   }
+  if (should_ack_flush)
+    endpoint_->NotifyFlushComplete(flush_id);
 }
 
 void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
@@ -228,6 +254,8 @@
   }
   DataSource& ds = it->second;
 
+  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);
+
   // Make a pass over all per-cpu readers.
   bool more_records_available = false;
   for (EventReader& reader : ds.per_cpu_readers) {
@@ -256,6 +284,7 @@
                                             DataSourceInstanceID ds_id,
                                             DataSource* ds) {
   using Status = DataSource::ProcDescriptors::Status;
+  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);
 
   // If the kernel ring buffer dropped data, record it in the trace.
   size_t cpu = reader->cpu();
@@ -279,7 +308,7 @@
       continue;
     }
 
-    // Request proc-fds for the process if this is the first time we see it yet.
+    // Request proc-fds for the process if this is the first time we see it.
     pid_t pid = sample->pid;
     auto& fd_entry = ds->proc_fds[pid];  // created if absent
 
@@ -299,10 +328,14 @@
 
     // Push the sample into a dedicated unwinding queue.
     unwind_queues_[ds_id].emplace_back(std::move(sample.value()));
+
+    // Metatrace: counter sensible only when there's a single active source.
+    PERFETTO_METATRACE_COUNTER(TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
+                               unwind_queues_[ds_id].size());
   }
 
-  // Most likely more events in the buffer - technically, max_samples can stop
-  // us right at the boundary.
+  // Most likely more events in the buffer. Though we might be exactly on the
+  // boundary due to |max_samples|.
   return true;
 }
 
@@ -371,6 +404,8 @@
   auto unwind_it = unwind_queues_.find(ds_id);
   PERFETTO_CHECK(unwind_it != unwind_queues_.end());
 
+  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_TICK);
+
   bool queue_active =
       ProcessUnwindQueue(ds_id, &unwind_it->second, &ds_it->second);
 
@@ -439,11 +474,12 @@
 
     // Sample ready - process it.
     if (fd_status == Status::kResolved) {
+      PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_SAMPLE);
+
       PerfProducer::CompletedSample unwound_sample =
           UnwindSample(std::move(sample), &proc_fd_it->second);
 
       PostEmitSample(ds_id, std::move(unwound_sample));
-
       entry.valid = false;
       continue;
     }
@@ -457,6 +493,9 @@
     queue.pop_front();
   }
 
+  // Metatrace: counter sensible only when there's a single active source.
+  PERFETTO_METATRACE_COUNTER(TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
+                             queue.size());
   PERFETTO_DLOG("Unwind queue drain: [%zu]->[%zu]", num_samples, queue.size());
 
   // Return whether we're done with unwindings for this source.
@@ -638,6 +677,18 @@
   }
 }
 
+// Registers a MetatraceWriter under |ds_id| and enables it on |target_buffer|.
+void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
+                                        BufferID target_buffer) {
+  auto writer = endpoint_->CreateTraceWriter(target_buffer);
+  auto it_and_inserted = metatrace_writers_.emplace(
+      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
+  PERFETTO_DCHECK(it_and_inserted.second);
+  // Note: only the first concurrent writer will actually be active.
+  it_and_inserted.first->second.Enable(task_runner_, std::move(writer),
+                                       metatrace::TAG_ANY);
+}
+
 void PerfProducer::ConnectWithRetries(const char* socket_name) {
   PERFETTO_DCHECK(state_ == kNotStarted);
   state_ = kNotConnected;
@@ -670,10 +721,19 @@
   ResetConnectionBackoff();
   PERFETTO_LOG("Connected to the service");
 
-  DataSourceDescriptor desc;
-  desc.set_name(kDataSourceName);
-  desc.set_will_notify_on_stop(true);
-  endpoint_->RegisterDataSource(desc);
+  {
+    // linux.perf
+    DataSourceDescriptor desc;
+    desc.set_name(kDataSourceName);
+    desc.set_will_notify_on_stop(true);
+    endpoint_->RegisterDataSource(desc);
+  }
+  {
+    // metatrace
+    DataSourceDescriptor desc;
+    desc.set_name(MetatraceWriter::kDataSourceName);
+    endpoint_->RegisterDataSource(desc);
+  }
 }
 
 void PerfProducer::OnDisconnect() {
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index ee2a661..1b127c7 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -41,6 +41,7 @@
 #include "src/profiling/perf/event_config.h"
 #include "src/profiling/perf/event_reader.h"
 #include "src/profiling/perf/proc_descriptors.h"
+#include "src/tracing/core/metatrace_writer.h"
 
 namespace perfetto {
 namespace profiling {
@@ -197,6 +198,8 @@
   // service of the stop.
   void FinishDataSourceStop(DataSourceInstanceID ds_id);
 
+  void StartMetatraceSource(DataSourceInstanceID ds_id, BufferID target_buffer);
+
   // Task runner owned by the main thread.
   base::TaskRunner* const task_runner_;
   State state_ = kNotStarted;
@@ -209,6 +212,10 @@
   // Owns shared memory, must outlive trace writing.
   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
 
+  // If multiple metatrace sources are enabled concurrently,
+  // only the first one becomes active.
+  std::map<DataSourceInstanceID, MetatraceWriter> metatrace_writers_;
+
   // Interns callstacks across all data sources.
   // TODO(rsavitski): for long profiling sessions, consider purging trie when it
   // grows too large (at the moment purged only when no sources are active).