Simplify ftrace architecture and integration with traced_probes

Historically the ftrace reader code has been strongly decoupled
from the rest of the codebase. The use case that justified this
(reuse as a library by other perf tools) no longer exists,
and we are left with extra layers that are unneeded and hurt
code readability. This CL removes the glue layers between
ftrace and probes_producer, in preparation for upcoming behavioral
changes (Flush). The main changes introduced by this CL are:
- Introduce a base class with hand-rolled RTTI for probes_producer.
  This simplifies the bookkeeping logic within the traced_probes
  binary.
- Collapse Ftrace's Sink and SinkDelegate into a single
  FtraceDataSource (FDS) class. FDS keeps track of all the ftrace
  state for a given tracing session.
- Remove ftrace/end_to_end_integrationtest.cc; it had just two
  tests and both were disabled.
- Minor cleanups: introduce PERFETTO_WARN_UNUSED_RESULT; move
  stats to a dedicated header.

Change-Id: I7047fc07bbaf9f9bf862cdb81c87e567ffbc6779
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 1e100c4..1fd3ad5 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -24,6 +24,7 @@
 #include <string>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/traced/traced.h"
 #include "perfetto/tracing/core/data_source_config.h"
@@ -33,6 +34,8 @@
 #include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/ipc/producer_ipc_client.h"
 #include "src/traced/probes/filesystem/inode_file_data_source.h"
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
+#include "src/traced/probes/probes_data_source.h"
 
 #include "perfetto/trace/filesystem/inode_file_map.pbzero.h"
 #include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
@@ -58,8 +61,12 @@
 //                    +--------------+
 //
 
-ProbesProducer::ProbesProducer() {}
-ProbesProducer::~ProbesProducer() = default;
+ProbesProducer::ProbesProducer() : weak_factory_(this) {}
+ProbesProducer::~ProbesProducer() {
+  // The ftrace data sources must be deleted before the ftrace controller.
+  data_sources_.clear();
+  ftrace_.reset();
+}
 
 void ProbesProducer::OnConnect() {
   PERFETTO_DCHECK(state_ == kConnecting);
@@ -111,50 +118,38 @@
 
 void ProbesProducer::CreateDataSourceInstance(DataSourceInstanceID instance_id,
                                               const DataSourceConfig& config) {
+  PERFETTO_DCHECK(data_sources_.count(instance_id) == 0);
+
   // TODO(hjd): This a hack since we don't actually know the session id. For
   // now we'll assume anything wit hthe same target buffer is in the same
   // session.
   TracingSessionID session_id = config.target_buffer();
 
+  std::unique_ptr<ProbesDataSource> data_source;
   if (config.name() == kFtraceSourceName) {
-    if (!CreateFtraceDataSourceInstance(session_id, instance_id, config))
-      failed_sources_.insert(instance_id);
+    data_source = CreateFtraceDataSource(session_id, instance_id, config);
   } else if (config.name() == kInodeMapSourceName) {
-    CreateInodeFileDataSourceInstance(session_id, instance_id, config);
+    data_source = CreateInodeFileDataSource(session_id, instance_id, config);
   } else if (config.name() == kProcessStatsSourceName) {
-    CreateProcessStatsDataSourceInstance(session_id, instance_id, config);
-  } else {
-    PERFETTO_ELOG("Data source name: %s not recognised.",
-                  config.name().c_str());
+    data_source = CreateProcessStatsDataSource(session_id, instance_id, config);
+  }
+
+  if (!data_source) {
+    PERFETTO_ELOG("Failed to create data source '%s'", config.name().c_str());
     return;
   }
 
-  std::map<TracingSessionID, InodeFileDataSource*> file_sources;
-  std::map<TracingSessionID, ProcessStatsDataSource*> ps_sources;
-  for (const auto& pair : file_map_sources_)
-    file_sources[pair.second->session_id()] = pair.second.get();
-  for (const auto& pair : process_stats_sources_)
-    ps_sources[pair.second->session_id()] = pair.second.get();
+  session_data_sources_.emplace(session_id, data_source.get());
+  data_sources_[instance_id] = std::move(data_source);
 
-  for (const auto& id_to_source : delegates_) {
-    const std::unique_ptr<SinkDelegate>& source = id_to_source.second;
-    if (session_id != source->session_id())
-      continue;
-    if (!source->ps_source() && ps_sources.count(session_id))
-      source->set_ps_source(ps_sources[session_id]->GetWeakPtr());
-    if (!source->file_source() && file_sources.count(session_id))
-      source->set_file_source(file_sources[session_id]->GetWeakPtr());
+  if (config.trace_duration_ms() != 0) {
+    uint32_t timeout = 5000 + 2 * config.trace_duration_ms();
+    watchdogs_.emplace(
+        instance_id, base::Watchdog::GetInstance()->CreateFatalTimer(timeout));
   }
 }
 
-void ProbesProducer::AddWatchdogsTimer(DataSourceInstanceID id,
-                                       const DataSourceConfig& config) {
-  if (config.trace_duration_ms() != 0)
-    watchdogs_.emplace(id, base::Watchdog::GetInstance()->CreateFatalTimer(
-                               5000 + 2 * config.trace_duration_ms()));
-}
-
-bool ProbesProducer::CreateFtraceDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateFtraceDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     const DataSourceConfig& config) {
@@ -162,16 +157,16 @@
   // This can legitimately happen on user builds where we cannot access the
   // debug paths, e.g., because of SELinux rules.
   if (ftrace_creation_failed_)
-    return false;
+    return nullptr;
 
   // Lazily create on the first instance.
   if (!ftrace_) {
-    ftrace_ = FtraceController::Create(task_runner_);
+    ftrace_ = FtraceController::Create(task_runner_, this);
 
     if (!ftrace_) {
       PERFETTO_ELOG("Failed to create FtraceController");
       ftrace_creation_failed_ = true;
-      return false;
+      return nullptr;
     }
 
     ftrace_->DisableAllEvents();
@@ -180,72 +175,65 @@
 
   PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
                config.target_buffer());
-
-  FtraceConfig proto_config = config.ftrace_config();
-
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(config.target_buffer()));
-  auto delegate = std::unique_ptr<SinkDelegate>(
-      new SinkDelegate(session_id, task_runner_, std::move(trace_writer)));
-  auto sink = ftrace_->CreateSink(std::move(proto_config), delegate.get());
-  if (!sink) {
-    PERFETTO_ELOG("Failed to start tracing (maybe someone else is using it?)");
-    return false;
+  const BufferID buffer_id = static_cast<BufferID>(config.target_buffer());
+  std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
+      ftrace_->GetWeakPtr(), session_id, config.ftrace_config(),
+      endpoint_->CreateTraceWriter(buffer_id)));
+  if (!ftrace_->AddDataSource(data_source.get())) {
+    PERFETTO_ELOG(
+        "Failed to start tracing (too many concurrent sessions or ftrace is "
+        "already in use)");
+    return nullptr;
   }
-  delegate->set_sink(std::move(sink));
-  delegates_.emplace(id, std::move(delegate));
-  AddWatchdogsTimer(id, config);
-  return true;
+  return std::move(data_source);
 }
 
-void ProbesProducer::CreateInodeFileDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateInodeFileDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     DataSourceConfig source_config) {
   PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
                id, source_config.target_buffer());
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(source_config.target_buffer()));
+  auto buffer_id = static_cast<BufferID>(source_config.target_buffer());
   if (system_inodes_.empty())
     CreateStaticDeviceToInodeMap("/system", &system_inodes_);
-  auto file_map_source =
-      std::unique_ptr<InodeFileDataSource>(new InodeFileDataSource(
-          std::move(source_config), task_runner_, session_id, &system_inodes_,
-          &cache_, std::move(trace_writer)));
-  file_map_sources_.emplace(id, std::move(file_map_source));
-  AddWatchdogsTimer(id, source_config);
+  return std::unique_ptr<InodeFileDataSource>(new InodeFileDataSource(
+      std::move(source_config), task_runner_, session_id, &system_inodes_,
+      &cache_, endpoint_->CreateTraceWriter(buffer_id)));
 }
 
-void ProbesProducer::CreateProcessStatsDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateProcessStatsDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     const DataSourceConfig& config) {
-  PERFETTO_DCHECK(process_stats_sources_.count(id) == 0);
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(config.target_buffer()));
-  auto source = std::unique_ptr<ProcessStatsDataSource>(
-      new ProcessStatsDataSource(session_id, std::move(trace_writer), config));
-  auto it_and_inserted = process_stats_sources_.emplace(id, std::move(source));
-  if (!it_and_inserted.second) {
-    PERFETTO_DCHECK(false);
-    return;
-  }
-  ProcessStatsDataSource* ps_data_source = it_and_inserted.first->second.get();
+  base::ignore_result(id);
+  auto buffer_id = static_cast<BufferID>(config.target_buffer());
+  auto data_source =
+      std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
+          session_id, endpoint_->CreateTraceWriter(buffer_id), config));
   if (config.process_stats_config().scan_all_processes_on_start()) {
-    ps_data_source->WriteAllProcesses();
+    data_source->WriteAllProcesses();
   }
+  return std::move(data_source);
 }
 
 void ProbesProducer::TearDownDataSourceInstance(DataSourceInstanceID id) {
   PERFETTO_LOG("Producer stop (id=%" PRIu64 ")", id);
-  // |id| could be the id of any of the datasources we handle:
-  PERFETTO_DCHECK((failed_sources_.count(id) + delegates_.count(id) +
-                   process_stats_sources_.count(id) +
-                   file_map_sources_.count(id)) == 1);
-  failed_sources_.erase(id);
-  delegates_.erase(id);
-  process_stats_sources_.erase(id);
-  file_map_sources_.erase(id);
+  auto it = data_sources_.find(id);
+  if (it == data_sources_.end()) {
+    PERFETTO_ELOG("Cannot stop data source id=%" PRIu64 ", not found", id);
+    return;
+  }
+  ProbesDataSource* data_source = it->second.get();
+  TracingSessionID session_id = data_source->tracing_session_id;
+  auto range = session_data_sources_.equal_range(session_id);
+  for (auto kv = range.first; kv != range.second; kv++) {
+    if (kv->second != data_source)
+      continue;
+    session_data_sources_.erase(kv);
+    break;
+  }
+  data_sources_.erase(it);
   watchdogs_.erase(id);
 }
 
@@ -255,26 +243,62 @@
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
   for (size_t i = 0; i < num_data_sources; i++) {
-    DataSourceInstanceID ds_id = data_source_ids[i];
-    {
-      auto it = process_stats_sources_.find(ds_id);
-      if (it != process_stats_sources_.end())
-        it->second->Flush();
-    }
-    {
-      auto it = file_map_sources_.find(ds_id);
-      if (it != file_map_sources_.end())
-        it->second->Flush();
-    }
-    {
-      auto it = delegates_.find(ds_id);
-      if (it != delegates_.end())
-        it->second->Flush();
-    }
+    auto it = data_sources_.find(data_source_ids[i]);
+    if (it == data_sources_.end())
+      continue;
+    it->second->Flush();
   }
   endpoint_->NotifyFlushComplete(flush_request_id);
 }
 
+// This function is called by the FtraceController in batches, whenever it has
+// read one or more pages from one or more cpus and written that into the
+// userspace tracing buffer. If more than one ftrace data source is active,
+// this call typically happens after writing for all sessions has been handled.
+void ProbesProducer::OnFtraceDataWrittenIntoDataSourceBuffers() {
+  TracingSessionID last_session_id = 0;
+  FtraceMetadata* metadata = nullptr;
+  InodeFileDataSource* inode_data_source = nullptr;
+  ProcessStatsDataSource* ps_data_source = nullptr;
+
+  // unordered_multimap guarantees that entries with the same key are contiguous
+  // in the iteration.
+  for (auto it = session_data_sources_.begin(); /* check below*/; it++) {
+    // If this is the last iteration or the session id has changed,
+    // dispatch the metadata update to the linked data sources, if any.
+    if (it == session_data_sources_.end() || it->first != last_session_id) {
+      bool has_inodes = metadata && !metadata->inode_and_device.empty();
+      bool has_pids = metadata && !metadata->pids.empty();
+      if (has_inodes && inode_data_source)
+        inode_data_source->OnInodes(metadata->inode_and_device);
+      if (has_pids && ps_data_source)
+        ps_data_source->OnPids(metadata->pids);
+      if (metadata)
+        metadata->Clear();
+      metadata = nullptr;
+      inode_data_source = nullptr;
+      ps_data_source = nullptr;
+      if (it == session_data_sources_.end())
+        break;
+      last_session_id = it->first;
+    }
+    ProbesDataSource* ds = it->second;
+    switch (ds->type_id) {
+      case FtraceDataSource::kTypeId:
+        metadata = static_cast<FtraceDataSource*>(ds)->mutable_metadata();
+        break;
+      case InodeFileDataSource::kTypeId:
+        inode_data_source = static_cast<InodeFileDataSource*>(ds);
+        break;
+      case ProcessStatsDataSource::kTypeId:
+        ps_data_source = static_cast<ProcessStatsDataSource*>(ds);
+        break;
+      default:
+        PERFETTO_DCHECK(false);
+    }  // switch (type_id)
+  }    // for (session_data_sources_)
+}
+
 void ProbesProducer::ConnectWithRetries(const char* socket_name,
                                         base::TaskRunner* task_runner) {
   PERFETTO_DCHECK(state_ == kNotStarted);
@@ -303,81 +327,4 @@
   connection_backoff_ms_ = kInitialConnectionBackoffMs;
 }
 
-ProbesProducer::SinkDelegate::SinkDelegate(TracingSessionID id,
-                                           base::TaskRunner* task_runner,
-                                           std::unique_ptr<TraceWriter> writer)
-    : session_id_(id),
-      task_runner_(task_runner),
-      writer_(std::move(writer)),
-      weak_factory_(this) {}
-
-ProbesProducer::SinkDelegate::~SinkDelegate() = default;
-
-void ProbesProducer::SinkDelegate::OnCreate(FtraceSink* sink) {
-  sink->DumpFtraceStats(&stats_before_);
-}
-
-void ProbesProducer::SinkDelegate::Flush() {
-  // TODO(primiano): this still doesn't flush data from the kernel ftrace
-  // buffers (see b/73886018). We should do that and delay the
-  // NotifyFlushComplete() until the ftrace data has been drained from the
-  // kernel ftrace buffer and written in the SMB.
-  if (writer_ && (!trace_packet_ || trace_packet_->is_finalized())) {
-    WriteStats();
-    writer_->Flush();
-  }
-}
-
-void ProbesProducer::SinkDelegate::WriteStats() {
-  {
-    auto before_packet = writer_->NewTracePacket();
-    auto out = before_packet->set_ftrace_stats();
-    out->set_phase(protos::pbzero::FtraceStats_Phase_START_OF_TRACE);
-    stats_before_.Write(out);
-  }
-  {
-    FtraceStats stats_after{};
-    sink_->DumpFtraceStats(&stats_after);
-    auto after_packet = writer_->NewTracePacket();
-    auto out = after_packet->set_ftrace_stats();
-    out->set_phase(protos::pbzero::FtraceStats_Phase_END_OF_TRACE);
-    stats_after.Write(out);
-  }
-}
-
-ProbesProducer::FtraceBundleHandle
-ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
-  trace_packet_ = writer_->NewTracePacket();
-  return FtraceBundleHandle(trace_packet_->set_ftrace_events());
-}
-
-void ProbesProducer::SinkDelegate::OnBundleComplete(
-    size_t,
-    FtraceBundleHandle,
-    const FtraceMetadata& metadata) {
-  trace_packet_->Finalize();
-
-  if (file_source_ && !metadata.inode_and_device.empty()) {
-    auto inodes = metadata.inode_and_device;
-    auto weak_file_source = file_source_;
-    task_runner_->PostTask([weak_file_source, inodes] {
-      if (weak_file_source)
-        weak_file_source->OnInodes(inodes);
-    });
-  }
-  if (ps_source_ && !metadata.pids.empty()) {
-    const auto& quirks = ps_source_->config().process_stats_config().quirks();
-    if (std::find(quirks.begin(), quirks.end(),
-                  ProcessStatsConfig::DISABLE_ON_DEMAND) != quirks.end()) {
-      return;
-    }
-    const auto& pids = metadata.pids;
-    auto weak_ps_source = ps_source_;
-    task_runner_->PostTask([weak_ps_source, pids] {
-      if (weak_ps_source)
-        weak_ps_source->OnPids(pids);
-    });
-  }
-}
-
 }  // namespace perfetto