probe_producer: Post metadata to helper sources

ftrace_reader needs a way to signal to the inode/ps metadata sources
with vectors of intresting inodes/pids. It should signal only data
sources which belong to the same tracing session. These sources may not
exist. Rather than keeping many parallel maps up to date and having
lookups in the fast path instead everytime a new datasource is created
we do a pass though all the ftrace datasources and try to set weak
pointers on them leading to the corresponding inode/ps datasources.
Then when we see metadata we post task to relevent datasource via the
weak pointer.

Bug: 73873362
Bug: 73625715
Bug: 73625480
Change-Id: I8371067f0e948733ed8d321479031deaa4ca7db9
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 27064fd..854abf1 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/weak_ptr.h"
 #include "perfetto/traced/traced.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
@@ -41,7 +42,7 @@
 uint64_t kMaxConnectionBackoffMs = 30 * 1000;
 const char* kFtraceSourceName = "com.google.perfetto.ftrace";
 const char* kProcessStatsSourceName = "com.google.perfetto.process_stats";
-const char* kInodeFileMapSourceName = "com.google.perfetto.inode_file_map";
+const char* kInodeMapSourceName = "com.google.perfetto.inode_file_map";
 
 }  // namespace.
 
@@ -72,7 +73,7 @@
                                 [](DataSourceInstanceID) {});
 
   DataSourceDescriptor inode_map_descriptor;
-  inode_map_descriptor.set_name(kInodeFileMapSourceName);
+  inode_map_descriptor.set_name(kInodeMapSourceName);
   endpoint_->RegisterDataSource(inode_map_descriptor,
                                 [](DataSourceInstanceID) {});
 }
@@ -88,31 +89,54 @@
                                 connection_backoff_ms_);
 }
 
-void ProbesProducer::CreateDataSourceInstance(
-    DataSourceInstanceID id,
-    const DataSourceConfig& source_config) {
-  if (source_config.name() == kFtraceSourceName) {
-    CreateFtraceDataSourceInstance(id, source_config);
-  } else if (source_config.name() == kInodeFileMapSourceName) {
-    CreateInodeFileDataSourceInstance(id, source_config);
-  } else if (source_config.name() == kProcessStatsSourceName) {
-    CreateProcessStatsDataSourceInstance(id, source_config);
+void ProbesProducer::CreateDataSourceInstance(DataSourceInstanceID instance_id,
+                                              const DataSourceConfig& config) {
+  // TODO(hjd): This a hack since we don't actually know the session id. For
+  // now we'll assume anything wit hthe same target buffer is in the same
+  // session.
+  TracingSessionID session_id = config.target_buffer();
+
+  if (config.name() == kFtraceSourceName) {
+    CreateFtraceDataSourceInstance(session_id, instance_id, config);
+  } else if (config.name() == kInodeMapSourceName) {
+    CreateInodeFileDataSourceInstance(session_id, instance_id, config);
+  } else if (config.name() == kProcessStatsSourceName) {
+    CreateProcessStatsDataSourceInstance(session_id, instance_id, config);
   } else {
     PERFETTO_ELOG("Data source name: %s not recognised.",
-                  source_config.name().c_str());
+                  config.name().c_str());
+    return;
+  }
+
+  std::map<TracingSessionID, InodeFileDataSource*> file_sources;
+  std::map<TracingSessionID, ProcessStatsDataSource*> ps_sources;
+  for (const auto& pair : file_map_sources_)
+    file_sources[pair.second->session_id()] = pair.second.get();
+  for (const auto& pair : process_stats_sources_)
+    ps_sources[pair.second->session_id()] = pair.second.get();
+
+  for (const auto& id_to_source : delegates_) {
+    const std::unique_ptr<SinkDelegate>& source = id_to_source.second;
+    if (session_id != source->session_id())
+      continue;
+    if (!source->ps_source() && ps_sources.count(session_id))
+      source->set_ps_source(ps_sources[session_id]->GetWeakPtr());
+    if (!source->file_source() && file_sources.count(session_id))
+      source->set_file_source(file_sources[session_id]->GetWeakPtr());
   }
 }
 
 void ProbesProducer::AddWatchdogsTimer(DataSourceInstanceID id,
-                                       const DataSourceConfig& source_config) {
-  if (source_config.trace_duration_ms() != 0)
+                                       const DataSourceConfig& config) {
+  if (config.trace_duration_ms() != 0)
     watchdogs_.emplace(id, base::Watchdog::GetInstance()->CreateFatalTimer(
-                               5000 + 2 * source_config.trace_duration_ms()));
+                               5000 + 2 * config.trace_duration_ms()));
 }
 
 void ProbesProducer::CreateFtraceDataSourceInstance(
+    TracingSessionID session_id,
     DataSourceInstanceID id,
-    const DataSourceConfig& source_config) {
+    const DataSourceConfig& config) {
   // Don't retry if FtraceController::Create() failed once.
   // This can legitimately happen on user builds where we cannot access the
   // debug paths, e.g., because of SELinux rules.
@@ -134,15 +158,15 @@
   }
 
   PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
-               source_config.target_buffer());
+               config.target_buffer());
 
-  FtraceConfig proto_config = source_config.ftrace_config();
+  FtraceConfig proto_config = config.ftrace_config();
 
   // TODO(hjd): Static cast is bad, target_buffer() should return a BufferID.
   auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(source_config.target_buffer()));
+      static_cast<BufferID>(config.target_buffer()));
   auto delegate = std::unique_ptr<SinkDelegate>(
-      new SinkDelegate(task_runner_, std::move(trace_writer)));
+      new SinkDelegate(session_id, task_runner_, std::move(trace_writer)));
   auto sink = ftrace_->CreateSink(std::move(proto_config), delegate.get());
   if (!sink) {
     PERFETTO_ELOG("Failed to start tracing (maybe someone else is using it?)");
@@ -150,10 +174,11 @@
   }
   delegate->set_sink(std::move(sink));
   delegates_.emplace(id, std::move(delegate));
-  AddWatchdogsTimer(id, source_config);
+  AddWatchdogsTimer(id, config);
 }
 
 void ProbesProducer::CreateInodeFileDataSourceInstance(
+    TracingSessionID session_id,
     DataSourceInstanceID id,
     const DataSourceConfig& source_config) {
   PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
@@ -162,20 +187,22 @@
       static_cast<BufferID>(source_config.target_buffer()));
   if (system_inodes_.empty())
     CreateDeviceToInodeMap("/system/", &system_inodes_);
-  auto file_map_source = std::unique_ptr<InodeFileDataSource>(
-      new InodeFileDataSource(&system_inodes_, std::move(trace_writer)));
+  auto file_map_source =
+      std::unique_ptr<InodeFileDataSource>(new InodeFileDataSource(
+          session_id, &system_inodes_, std::move(trace_writer)));
   file_map_sources_.emplace(id, std::move(file_map_source));
   AddWatchdogsTimer(id, source_config);
 }
 
 void ProbesProducer::CreateProcessStatsDataSourceInstance(
+    TracingSessionID session_id,
     DataSourceInstanceID id,
-    const DataSourceConfig& source_config) {
+    const DataSourceConfig& config) {
   PERFETTO_DCHECK(process_stats_sources_.count(id) == 0);
   auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(source_config.target_buffer()));
+      static_cast<BufferID>(config.target_buffer()));
   auto source = std::unique_ptr<ProcessStatsDataSource>(
-      new ProcessStatsDataSource(std::move(trace_writer)));
+      new ProcessStatsDataSource(session_id, std::move(trace_writer)));
   auto it_and_inserted = process_stats_sources_.emplace(id, std::move(source));
   PERFETTO_DCHECK(it_and_inserted.second);
   it_and_inserted.first->second->WriteAllProcesses();
@@ -219,16 +246,17 @@
   connection_backoff_ms_ = kInitialConnectionBackoffMs;
 }
 
-ProbesProducer::SinkDelegate::SinkDelegate(base::TaskRunner* task_runner,
+ProbesProducer::SinkDelegate::SinkDelegate(TracingSessionID id,
+                                           base::TaskRunner* task_runner,
                                            std::unique_ptr<TraceWriter> writer)
-    : task_runner_(task_runner),
+    : session_id_(id),
+      task_runner_(task_runner),
       writer_(std::move(writer)),
       weak_factory_(this) {}
 
 ProbesProducer::SinkDelegate::~SinkDelegate() = default;
 
 ProbesProducer::FtraceBundleHandle
-
 ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
   trace_packet_ = writer_->NewTracePacket();
   return FtraceBundleHandle(trace_packet_->set_ftrace_events());
@@ -239,19 +267,15 @@
     FtraceBundleHandle,
     const FtraceMetadata& metadata) {
   trace_packet_->Finalize();
-  if (!metadata.inodes.empty()) {
-    auto weak_this = weak_factory_.GetWeakPtr();
+
+  if (file_source_ && !metadata.inodes.empty()) {
     auto inodes = metadata.inodes;
-    task_runner_->PostTask([weak_this, inodes] {
-      if (weak_this)
-        weak_this->OnInodes(inodes);
+    auto weak_file_source = file_source_;
+    task_runner_->PostTask([weak_file_source, inodes] {
+      if (weak_file_source)
+        weak_file_source->OnInodes(inodes);
     });
   }
 }
 
-void ProbesProducer::SinkDelegate::OnInodes(
-    const std::vector<std::pair<Inode, uint32_t>>& inodes) {
-  PERFETTO_LOG("Saw FtraceBundle with %zu inodes.", inodes.size());
-}
-
 }  // namespace perfetto