Simplify ftrace architecture and integration with traced_probes

Historically the ftrace reader code has been strongly decoupled
from the rest of the codebase. The use case that was justifying
it (fall back into a library for other perf tools) is no more,
and we are left with extra layers that are unneeded and hurt
code readability. This CL removes the glue layers between
ftrace and probes_producer, in preparation of upcoming behavioral
changes (Flush). The main changes introduced by this CL are:
- Introduce a base class with hand-rolled RTTI for probes_producer.
  This simplifies the bookkeeping logic within the traced_probes
  binary.
- Collapse Ftrace's Sink and SinkDelegate into a FtraceDataSource
  class. FDS keeps track of all the state of ftrace for a given
  tracing session.
- Remove ftrace/end_to_end_integrationtest.cc, it had just two
  tests and they were disabled.
- Minor cleanups: introduce PERFETTO_WARN_UNUSED_RESULT; move
  stats to a dedicated header.

Change-Id: I7047fc07bbaf9f9bf862cdb81c87e567ffbc6779
diff --git a/Android.bp b/Android.bp
index 0cd32cc..cf7502e 100644
--- a/Android.bp
+++ b/Android.bp
@@ -70,11 +70,13 @@
     "src/traced/probes/ftrace/ftrace_config.cc",
     "src/traced/probes/ftrace/ftrace_config_muxer.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
+    "src/traced/probes/ftrace/ftrace_data_source.cc",
     "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
-    "src/traced/probes/ftrace/ftrace_sink.cc",
+    "src/traced/probes/ftrace/ftrace_stats.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/probes.cc",
+    "src/traced/probes/probes_data_source.cc",
     "src/traced/probes/probes_producer.cc",
     "src/traced/probes/ps/process_stats_data_source.cc",
     "src/traced/service/service.cc",
@@ -320,19 +322,20 @@
     "src/traced/probes/ftrace/atrace_wrapper.cc",
     "src/traced/probes/ftrace/cpu_reader.cc",
     "src/traced/probes/ftrace/cpu_stats_parser.cc",
-    "src/traced/probes/ftrace/end_to_end_integrationtest.cc",
     "src/traced/probes/ftrace/event_info.cc",
     "src/traced/probes/ftrace/event_info_constants.cc",
     "src/traced/probes/ftrace/format_parser.cc",
     "src/traced/probes/ftrace/ftrace_config.cc",
     "src/traced/probes/ftrace/ftrace_config_muxer.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
+    "src/traced/probes/ftrace/ftrace_data_source.cc",
     "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_integrationtest.cc",
-    "src/traced/probes/ftrace/ftrace_sink.cc",
+    "src/traced/probes/ftrace/ftrace_stats.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
+    "src/traced/probes/probes_data_source.cc",
     "src/traced/probes/probes_producer.cc",
     "src/traced/probes/ps/process_stats_data_source.cc",
     "src/tracing/core/chrome_config.cc",
@@ -3729,13 +3732,15 @@
     "src/traced/probes/ftrace/ftrace_config_unittest.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
     "src/traced/probes/ftrace/ftrace_controller_unittest.cc",
+    "src/traced/probes/ftrace/ftrace_data_source.cc",
     "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_unittest.cc",
-    "src/traced/probes/ftrace/ftrace_sink.cc",
+    "src/traced/probes/ftrace/ftrace_stats.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/proto_translation_table_unittest.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
+    "src/traced/probes/probes_data_source.cc",
     "src/traced/probes/probes_producer.cc",
     "src/traced/probes/ps/process_stats_data_source.cc",
     "src/traced/probes/ps/process_stats_data_source_unittest.cc",
diff --git a/include/perfetto/base/thread_checker.h b/include/perfetto/base/thread_checker.h
index c4da2ca..a6d0ed9 100644
--- a/include/perfetto/base/thread_checker.h
+++ b/include/perfetto/base/thread_checker.h
@@ -25,6 +25,7 @@
 #include <atomic>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
 
 namespace perfetto {
 namespace base {
@@ -41,7 +42,7 @@
   ~ThreadChecker();
   ThreadChecker(const ThreadChecker&);
   ThreadChecker& operator=(const ThreadChecker&);
-  bool CalledOnValidThread() const __attribute__((warn_unused_result));
+  bool CalledOnValidThread() const PERFETTO_WARN_UNUSED_RESULT;
   void DetachFromThread();
 
  private:
diff --git a/include/perfetto/base/utils.h b/include/perfetto/base/utils.h
index c1b5476..3279114 100644
--- a/include/perfetto/base/utils.h
+++ b/include/perfetto/base/utils.h
@@ -44,6 +44,12 @@
 #endif
 #endif
 
+#if defined(__GNUC__) || defined(__clang__)
+#define PERFETTO_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
+#else
+#define PERFETTO_WARN_UNUSED_RESULT
+#endif
+
 namespace perfetto {
 namespace base {
 
diff --git a/src/ipc/buffered_frame_deserializer.h b/src/ipc/buffered_frame_deserializer.h
index 52a5f5f..00f1a75 100644
--- a/src/ipc/buffered_frame_deserializer.h
+++ b/src/ipc/buffered_frame_deserializer.h
@@ -25,6 +25,7 @@
 #include <sys/mman.h>
 
 #include "perfetto/base/page_allocator.h"
+#include "perfetto/base/utils.h"
 #include "perfetto/ipc/basic_types.h"
 
 namespace perfetto {
@@ -100,7 +101,7 @@
   // buffer previously returned by BeginReceive() (the return value of recv()).
   // Returns false if a header > |max_capacity| is received, in which case the
   // caller is expected to shutdown the socket and terminate the ipc.
-  bool EndReceive(size_t recv_size) __attribute__((warn_unused_result));
+  bool EndReceive(size_t recv_size) PERFETTO_WARN_UNUSED_RESULT;
 
   // Decodes and returns the next decoded frame in the buffer if any, nullptr
   // if no further frames have been decoded.
diff --git a/src/traced/probes/BUILD.gn b/src/traced/probes/BUILD.gn
index 6f33405..babc591 100644
--- a/src/traced/probes/BUILD.gn
+++ b/src/traced/probes/BUILD.gn
@@ -31,6 +31,7 @@
     "ftrace",
   ]
   deps = [
+    ":data_source",
     "../../../gn:default_deps",
     "../../../include/perfetto/traced",
     "../../../protos/perfetto/trace/ps:zero",
@@ -46,6 +47,19 @@
   ]
 }
 
+# Base class for data sources in traced_probes.
+# Needs to be a separate target to avoid cyclical deps.
+source_set("data_source") {
+  deps = [
+    "../../../gn:default_deps",
+    "../../tracing",
+  ]
+  sources = [
+    "probes_data_source.cc",
+    "probes_data_source.h",
+  ]
+}
+
 source_set("unittests") {
   testonly = true
   deps = [
diff --git a/src/traced/probes/filesystem/BUILD.gn b/src/traced/probes/filesystem/BUILD.gn
index ecea7ee..45415e0 100644
--- a/src/traced/probes/filesystem/BUILD.gn
+++ b/src/traced/probes/filesystem/BUILD.gn
@@ -18,6 +18,7 @@
     "../../../tracing",
   ]
   deps = [
+    "..:data_source",
     "../../../../gn:default_deps",
     "../../../../include/perfetto/traced",
     "../../../base",
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index dac9bf1..02fcbf8 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -87,6 +87,9 @@
 
 }  // namespace
 
+// static
+constexpr int InodeFileDataSource::kTypeId;
+
 void CreateStaticDeviceToInodeMap(
     const std::string& root_directory,
     std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>*
@@ -109,18 +112,18 @@
 InodeFileDataSource::InodeFileDataSource(
     DataSourceConfig source_config,
     base::TaskRunner* task_runner,
-    TracingSessionID id,
+    TracingSessionID session_id,
     std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>*
         static_file_map,
     LRUInodeCache* cache,
     std::unique_ptr<TraceWriter> writer)
-    : source_config_(std::move(source_config)),
+    : ProbesDataSource(session_id, kTypeId),
+      source_config_(std::move(source_config)),
       scan_mount_points_(
           source_config_.inode_file_config().scan_mount_points().cbegin(),
           source_config_.inode_file_config().scan_mount_points().cend()),
       mount_point_mapping_(BuildMountpointMapping(source_config_)),
       task_runner_(task_runner),
-      session_id_(id),
       static_file_map_(static_file_map),
       cache_(cache),
       writer_(std::move(writer)),
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 926031c..181dfda 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -34,6 +34,7 @@
 #include "src/traced/probes/filesystem/file_scanner.h"
 #include "src/traced/probes/filesystem/fs_mount.h"
 #include "src/traced/probes/filesystem/lru_inode_cache.h"
+#include "src/traced/probes/probes_data_source.h"
 
 #include "perfetto/trace/filesystem/inode_file_map.pbzero.h"
 
@@ -48,8 +49,11 @@
     std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>*
         static_file_map);
 
-class InodeFileDataSource : public FileScanner::Delegate {
+class InodeFileDataSource : public ProbesDataSource,
+                            public FileScanner::Delegate {
  public:
+  static constexpr int kTypeId = 2;
+
   InodeFileDataSource(
       DataSourceConfig,
       base::TaskRunner*,
@@ -59,7 +63,8 @@
       LRUInodeCache* cache,
       std::unique_ptr<TraceWriter> writer);
 
-  TracingSessionID session_id() const { return session_id_; }
+  ~InodeFileDataSource() override;
+
   base::WeakPtr<InodeFileDataSource> GetWeakPtr() const;
 
   // Called when Inodes are seen in the FtraceEventBundle
@@ -74,9 +79,7 @@
   void AddInodesFromLRUCache(BlockDeviceID block_device_id,
                              std::set<Inode>* inode_numbers);
 
-  void Flush();
-
-  virtual ~InodeFileDataSource();
+  void Flush() override;
 
   virtual void FillInodeEntry(InodeFileMap* destination,
                               Inode inode_number,
@@ -94,8 +97,8 @@
   bool OnInodeFound(BlockDeviceID block_device_id,
                     Inode inode_number,
                     const std::string& path,
-                    protos::pbzero::InodeFileMap_Entry_Type type);
-  void OnInodeScanDone();
+                    protos::pbzero::InodeFileMap_Entry_Type type) override;
+  void OnInodeScanDone() override;
 
   void AddRootsForBlockDevice(BlockDeviceID block_device_id,
                               std::vector<std::string>* roots);
@@ -111,7 +114,6 @@
   std::map<std::string, std::vector<std::string>> mount_point_mapping_;
 
   base::TaskRunner* task_runner_;
-  const TracingSessionID session_id_;
   std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>*
       static_file_map_;
   LRUInodeCache* cache_;
diff --git a/src/traced/probes/ftrace/BUILD.gn b/src/traced/probes/ftrace/BUILD.gn
index bdbf3f9..09e9ac4 100644
--- a/src/traced/probes/ftrace/BUILD.gn
+++ b/src/traced/probes/ftrace/BUILD.gn
@@ -96,7 +96,6 @@
     "../../../tracing",
   ]
   sources = [
-    "end_to_end_integrationtest.cc",
     "ftrace_procfs_integrationtest.cc",
   ]
 }
@@ -108,6 +107,7 @@
     "../../../tracing",
   ]
   deps = [
+    "..:data_source",
     "../../../../gn:default_deps",
     "../../../../include/perfetto/traced",
     "../../../base",
@@ -132,12 +132,14 @@
     "ftrace_config_muxer.h",
     "ftrace_controller.cc",
     "ftrace_controller.h",
+    "ftrace_data_source.cc",
+    "ftrace_data_source.h",
     "ftrace_metadata.cc",
     "ftrace_metadata.h",
     "ftrace_procfs.cc",
     "ftrace_procfs.h",
-    "ftrace_sink.cc",
-    "ftrace_sink.h",
+    "ftrace_stats.cc",
+    "ftrace_stats.h",
     "proto_translation_table.cc",
     "proto_translation_table.h",
   ]
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index 875bb63..c73f3d5 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -28,10 +28,12 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/base/metatrace.h"
 #include "perfetto/base/utils.h"
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 #include "perfetto/trace/ftrace/ftrace_event.pbzero.h"
 #include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
 
@@ -78,9 +80,6 @@
   return true;
 }
 
-using BundleHandle =
-    protozero::MessageHandle<protos::pbzero::FtraceEventBundle>;
-
 const std::vector<bool> BuildEnabledVector(const ProtoTranslationTable& table,
                                            const std::set<std::string>& names) {
   std::vector<bool> enabled(table.largest_id() + 1);
@@ -267,9 +266,7 @@
 #endif
 }
 
-bool CpuReader::Drain(const std::array<const EventFilter*, kMaxSinks>& filters,
-                      const std::array<BundleHandle, kMaxSinks>& bundles,
-                      const std::array<FtraceMetadata*, kMaxSinks>& metadatas) {
+bool CpuReader::Drain(const std::set<FtraceDataSource*>& data_sources) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   while (true) {
     uint8_t* buffer = GetBuffer();
@@ -279,23 +276,18 @@
       break;
     PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);
 
-    size_t evt_size = 0;
-    for (size_t i = 0; i < kMaxSinks; i++) {
-      if (!filters[i])
-        break;
-      evt_size =
-          ParsePage(buffer, filters[i], &*bundles[i], table_, metadatas[i]);
+    for (FtraceDataSource* data_source : data_sources) {
+      auto packet = data_source->trace_writer()->NewTracePacket();
+      auto* bundle = packet->set_ftrace_events();
+      auto* metadata = data_source->mutable_metadata();
+      auto* filter = data_source->event_filter();
+      size_t evt_size = ParsePage(buffer, filter, bundle, table_, metadata);
       PERFETTO_DCHECK(evt_size);
+      bundle->set_cpu(static_cast<uint32_t>(cpu_));
+      bundle->set_overwrite_count(metadata->overwrite_count);
     }
   }
 
-  for (size_t i = 0; i < kMaxSinks; i++) {
-    if (!filters[i])
-      break;
-    bundles[i]->set_cpu(static_cast<uint32_t>(cpu_));
-    bundles[i]->set_overwrite_count(metadatas[i]->overwrite_count);
-  }
-
   return true;
 }
 
@@ -316,7 +308,7 @@
 // This method is deliberately static so it can be tested independently.
 size_t CpuReader::ParsePage(const uint8_t* ptr,
                             const EventFilter* filter,
-                            protos::pbzero::FtraceEventBundle* bundle,
+                            FtraceEventBundle* bundle,
                             const ProtoTranslationTable* table,
                             FtraceMetadata* metadata) {
   const uint8_t* const start_of_page = ptr;
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index 77cdd46..61f0fb5 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -27,17 +27,19 @@
 #include <thread>
 
 #include "gtest/gtest_prod.h"
-#include "perfetto/base/build_config.h"
 #include "perfetto/base/page_allocator.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/thread_checker.h"
 #include "perfetto/protozero/message.h"
+#include "perfetto/protozero/message_handle.h"
 #include "perfetto/traced/data_source_types.h"
-#include "src/traced/probes/ftrace/ftrace_controller.h"
+#include "src/traced/probes/ftrace/ftrace_config.h"
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 namespace perfetto {
 
+class FtraceDataSource;
 class ProtoTranslationTable;
 
 namespace protos {
@@ -46,34 +48,14 @@
 }  // namespace pbzero
 }  // namespace protos
 
-// Class for efficient 'is event with id x enabled?' tests.
-// Mirrors the data in a FtraceConfig but in a format better suited
-// to be consumed by CpuReader.
-class EventFilter {
- public:
-  EventFilter(const ProtoTranslationTable&, std::set<std::string>);
-  ~EventFilter();
+class EventFilter;  // Declared down below.
 
-  bool IsEventEnabled(size_t ftrace_event_id) const {
-    if (ftrace_event_id == 0 || ftrace_event_id > enabled_ids_.size()) {
-      return false;
-    }
-    return enabled_ids_[ftrace_event_id];
-  }
-
-  const std::set<std::string>& enabled_names() const { return enabled_names_; }
-
- private:
-  EventFilter(const EventFilter&) = delete;
-  EventFilter& operator=(const EventFilter&) = delete;
-
-  const std::vector<bool> enabled_ids_;
-  std::set<std::string> enabled_names_;
-};
-
-// Processes raw ftrace data for a logical CPU core.
+// Reads raw ftrace data for a cpu and writes that into the perfetto userspace
+// buffer.
 class CpuReader {
  public:
+  using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
+
   // |on_data_available| will be called on an arbitrary thread when at least one
   // page of ftrace data is available for draining on this CPU.
   CpuReader(const ProtoTranslationTable*,
@@ -82,13 +64,10 @@
             std::function<void()> on_data_available);
   ~CpuReader();
 
-  // Drains all available data from the staging pipe into the given sinks.
+  // Drains all available data from the staging pipe into the buffer of the
+  // passed data sources.
   // Should be called in response to the |on_data_available| callback.
-  bool Drain(const std::array<const EventFilter*, kMaxSinks>&,
-             const std::array<
-                 protozero::MessageHandle<protos::pbzero::FtraceEventBundle>,
-                 kMaxSinks>&,
-             const std::array<FtraceMetadata*, kMaxSinks>& metadatas);
+  bool Drain(const std::set<FtraceDataSource*>&);
 
   template <typename T>
   static bool ReadAndAdvance(const uint8_t** ptr, const uint8_t* end, T* out) {
@@ -211,7 +190,7 @@
   CpuReader(const CpuReader&) = delete;
   CpuReader& operator=(const CpuReader&) = delete;
 
-  const ProtoTranslationTable* table_;
+  const ProtoTranslationTable* const table_;
   const size_t cpu_;
   base::ScopedFile trace_fd_;
   base::ScopedFile staging_read_fd_;
@@ -222,6 +201,31 @@
   PERFETTO_THREAD_CHECKER(thread_checker_)
 };
 
+// Class for efficient 'is event with id x enabled?' tests.
+// Mirrors the data in a FtraceConfig but in a format better suited
+// to be consumed by CpuReader.
+class EventFilter {
+ public:
+  EventFilter(const ProtoTranslationTable&, std::set<std::string>);
+  ~EventFilter();
+
+  bool IsEventEnabled(size_t ftrace_event_id) const {
+    if (ftrace_event_id == 0 || ftrace_event_id > enabled_ids_.size()) {
+      return false;
+    }
+    return enabled_ids_[ftrace_event_id];
+  }
+
+  const std::set<std::string>& enabled_names() const { return enabled_names_; }
+
+ private:
+  EventFilter(const EventFilter&) = delete;
+  EventFilter& operator=(const EventFilter&) = delete;
+
+  const std::vector<bool> enabled_ids_;
+  std::set<std::string> enabled_names_;
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_TRACED_PROBES_FTRACE_CPU_READER_H_
diff --git a/src/traced/probes/ftrace/cpu_stats_parser.cc b/src/traced/probes/ftrace/cpu_stats_parser.cc
index 46f020c..8048ee1 100644
--- a/src/traced/probes/ftrace/cpu_stats_parser.cc
+++ b/src/traced/probes/ftrace/cpu_stats_parser.cc
@@ -20,6 +20,7 @@
 #include "perfetto/base/string_utils.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
+#include "src/traced/probes/ftrace/ftrace_stats.h"
 
 namespace perfetto {
 namespace {
diff --git a/src/traced/probes/ftrace/cpu_stats_parser_unittest.cc b/src/traced/probes/ftrace/cpu_stats_parser_unittest.cc
index 35f0f72..f016f19 100644
--- a/src/traced/probes/ftrace/cpu_stats_parser_unittest.cc
+++ b/src/traced/probes/ftrace/cpu_stats_parser_unittest.cc
@@ -4,6 +4,7 @@
 #include "gtest/gtest.h"
 
 #include "src/traced/probes/ftrace/ftrace_controller.h"
+#include "src/traced/probes/ftrace/ftrace_stats.h"
 
 namespace perfetto {
 namespace {
diff --git a/src/traced/probes/ftrace/end_to_end_integrationtest.cc b/src/traced/probes/ftrace/end_to_end_integrationtest.cc
deleted file mode 100644
index f4c3daa..0000000
--- a/src/traced/probes/ftrace/end_to_end_integrationtest.cc
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2017 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fstream>
-#include <sstream>
-
-#include "gmock/gmock.h"
-#include "google/protobuf/text_format.h"
-#include "gtest/gtest.h"
-
-#include "perfetto/base/build_config.h"
-#include "perfetto/base/unix_task_runner.h"
-#include "perfetto/base/utils.h"
-#include "perfetto/protozero/scattered_stream_writer.h"
-#include "src/protozero/scattered_stream_delegate_for_testing.h"
-#include "src/traced/probes/ftrace/ftrace_controller.h"
-#include "src/traced/probes/ftrace/ftrace_procfs.h"
-
-#include "perfetto/trace/ftrace/ftrace_event_bundle.pb.h"
-#include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
-#include "perfetto/trace/ftrace/test_bundle_wrapper.pb.h"
-#include "perfetto/trace/ftrace/test_bundle_wrapper.pbzero.h"
-
-using testing::HasSubstr;
-using testing::Not;
-
-namespace perfetto {
-namespace {
-
-constexpr char kTracingPath[] = "/sys/kernel/debug/tracing/";
-
-using FtraceBundleHandle =
-    protozero::MessageHandle<protos::pbzero::FtraceEventBundle>;
-
-class EndToEndIntegrationTest : public ::testing::Test,
-                                public FtraceSink::Delegate {
- public:
-  void Finalize(protos::TestBundleWrapper* wrapper) {
-    message->set_after("--- Bundle wrapper after ---");
-    PERFETTO_CHECK(message);
-    size_t msg_size = message->Finalize();
-    std::unique_ptr<uint8_t[]> buffer = writer_delegate->StitchChunks(msg_size);
-    wrapper->ParseFromArray(buffer.get(), static_cast<int>(msg_size));
-    message.reset();
-  }
-
- protected:
-  virtual void SetUp() {
-    writer_delegate = std::unique_ptr<ScatteredStreamDelegateForTesting>(
-        new ScatteredStreamDelegateForTesting(base::kPageSize * 100));
-    writer = std::unique_ptr<protozero::ScatteredStreamWriter>(
-        new protozero::ScatteredStreamWriter(writer_delegate.get()));
-    writer_delegate->set_writer(writer.get());
-    message = std::unique_ptr<protos::pbzero::TestBundleWrapper>(
-        new protos::pbzero::TestBundleWrapper);
-    message->Reset(writer.get());
-    message->set_before("--- Bundle wrapper before ---");
-  }
-
-  virtual FtraceBundleHandle GetBundleForCpu(size_t cpu) {
-    PERFETTO_CHECK(!currently_writing_);
-    currently_writing_ = true;
-    cpu_being_written_ = cpu;
-    return FtraceBundleHandle(message->add_bundle());
-  }
-
-  virtual void OnBundleComplete(size_t cpu,
-                                FtraceBundleHandle,
-                                const FtraceMetadata&) {
-    PERFETTO_CHECK(currently_writing_);
-    currently_writing_ = false;
-    EXPECT_NE(cpu_being_written_, 9999ul);
-    EXPECT_EQ(cpu_being_written_, cpu);
-    if (!count--)
-      runner_.Quit();
-  }
-
-  base::UnixTaskRunner* runner() { return &runner_; }
-
- private:
-  size_t count = 3;
-  base::UnixTaskRunner runner_;
-  bool currently_writing_ = false;
-  size_t cpu_being_written_ = 9999;
-  std::unique_ptr<ScatteredStreamDelegateForTesting> writer_delegate = nullptr;
-  std::unique_ptr<protozero::ScatteredStreamWriter> writer = nullptr;
-  std::unique_ptr<protos::pbzero::TestBundleWrapper> message = nullptr;
-};
-
-}  // namespace
-
-TEST_F(EndToEndIntegrationTest, DISABLED_SchedSwitchAndPrint) {
-  FtraceProcfs procfs(kTracingPath);
-  procfs.ClearTrace();
-  procfs.WriteTraceMarker("Hello, World!");
-
-  // Create a sink listening for our favorite events:
-  std::unique_ptr<FtraceController> ftrace = FtraceController::Create(runner());
-  FtraceConfig config;
-  *config.add_ftrace_events() = "print";
-  *config.add_ftrace_events() = "sched_switch";
-  std::unique_ptr<FtraceSink> sink = ftrace->CreateSink(config, this);
-
-  // Let some events build up.
-  sleep(1);
-
-  // Start processing the tasks (OnBundleComplete will quit the task runner).
-  runner()->Run();
-
-  // Disable events.
-  sink.reset();
-
-  // Read the output into a full proto so we can use reflection.
-  protos::TestBundleWrapper output;
-  Finalize(&output);
-
-  // Check we can see the guards:
-  EXPECT_THAT(output.before(), HasSubstr("before"));
-  EXPECT_THAT(output.after(), HasSubstr("after"));
-
-  std::string output_as_text;
-  // TODO(hjd): Use reflection print code.
-  printf("%s\n", output_as_text.c_str());
-}
-
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-TEST_F(EndToEndIntegrationTest, DISABLED_Atrace) {
-  FtraceProcfs procfs(kTracingPath);
-  procfs.ClearTrace();
-
-  // Create a sink listening for our favorite events:
-  std::unique_ptr<FtraceController> ftrace = FtraceController::Create(runner());
-  FtraceConfig config;
-  *config.add_ftrace_events() = "print";
-  *config.add_ftrace_events() = "sched_switch";
-  std::unique_ptr<FtraceSink> sink = ftrace->CreateSink(config, this);
-
-  // Let some events build up.
-  sleep(1);
-
-  // Start processing the tasks (OnBundleComplete will quit the task runner).
-  runner()->Run();
-
-  // Disable events.
-  sink.reset();
-
-  // Read the output into a full proto so we can use reflection.
-  protos::TestBundleWrapper output;
-  Finalize(&output);
-
-  // Check we can see the guards:
-  EXPECT_THAT(output.before(), HasSubstr("before"));
-  EXPECT_THAT(output.after(), HasSubstr("after"));
-
-  std::string output_as_text;
-  printf("%s\n", output_as_text.c_str());
-}
-#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-
-}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.h b/src/traced/probes/ftrace/ftrace_config_muxer.h
index eb4627d..e8164b9 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.h
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.h
@@ -17,6 +17,7 @@
 #ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
 #define SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
 
+#include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
 
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index cdb6b49..68f11f3 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -32,17 +32,17 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/base/time.h"
 #include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/trace_writer.h"
 #include "src/traced/probes/ftrace/cpu_reader.h"
 #include "src/traced/probes/ftrace/cpu_stats_parser.h"
 #include "src/traced/probes/ftrace/event_info.h"
-#include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_config_muxer.h"
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
+#include "src/traced/probes/ftrace/ftrace_stats.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
-#include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
-#include "perfetto/trace/ftrace/ftrace_stats.pbzero.h"
-
 namespace perfetto {
 namespace {
 
@@ -108,7 +108,8 @@
 // static
 // TODO(taylori): Add a test for tracing paths in integration tests.
 std::unique_ptr<FtraceController> FtraceController::Create(
-    base::TaskRunner* runner) {
+    base::TaskRunner* runner,
+    Observer* observer) {
   size_t index = 0;
   std::unique_ptr<FtraceProcfs> ftrace_procfs = nullptr;
   while (!ftrace_procfs && kTracingPaths[index]) {
@@ -123,25 +124,28 @@
 
   std::unique_ptr<FtraceConfigMuxer> model = std::unique_ptr<FtraceConfigMuxer>(
       new FtraceConfigMuxer(ftrace_procfs.get(), table.get()));
-  return std::unique_ptr<FtraceController>(new FtraceController(
-      std::move(ftrace_procfs), std::move(table), std::move(model), runner));
+  return std::unique_ptr<FtraceController>(
+      new FtraceController(std::move(ftrace_procfs), std::move(table),
+                           std::move(model), runner, observer));
 }
 
 FtraceController::FtraceController(std::unique_ptr<FtraceProcfs> ftrace_procfs,
                                    std::unique_ptr<ProtoTranslationTable> table,
                                    std::unique_ptr<FtraceConfigMuxer> model,
-                                   base::TaskRunner* task_runner)
-    : ftrace_procfs_(std::move(ftrace_procfs)),
+                                   base::TaskRunner* task_runner,
+                                   Observer* observer)
+    : task_runner_(task_runner),
+      observer_(observer),
+      ftrace_procfs_(std::move(ftrace_procfs)),
       table_(std::move(table)),
       ftrace_config_muxer_(std::move(model)),
-      task_runner_(task_runner),
       weak_factory_(this) {}
 
 FtraceController::~FtraceController() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  for (const auto* sink : sinks_)
-    ftrace_config_muxer_->RemoveConfig(sink->id_);
-  sinks_.clear();
+  for (const auto* data_source : data_sources_)
+    ftrace_config_muxer_->RemoveConfig(data_source->config_id());
+  data_sources_.clear();
   StopIfNeeded();
 }
 
@@ -175,7 +179,10 @@
   for (size_t cpu = 0; cpu < ctrl->ftrace_procfs_->NumberOfCpus(); cpu++) {
     if (!cpus_to_drain[cpu])
       continue;
-    ctrl->OnRawFtraceDataAvailable(cpu);
+    // This method reads the pipe and converts the raw ftrace data into
+    // protobufs using the |data_source|'s TraceWriter.
+    ctrl->cpu_readers_[cpu]->Drain(ctrl->data_sources_);
+    ctrl->OnDrainCpuForTesting(cpu);
   }
 
   // If we filled up any SHM pages while draining the data, we will have posted
@@ -184,6 +191,8 @@
   // from traced.
   ctrl->task_runner_->PostTask(
       std::bind(&FtraceController::UnblockReaders, weak_this));
+
+  ctrl->observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
 }
 
 // static
@@ -198,9 +207,9 @@
 }
 
 void FtraceController::StartIfNeeded() {
-  if (sinks_.size() > 1)
+  if (data_sources_.size() > 1)
     return;
-  PERFETTO_CHECK(!sinks_.empty());
+  PERFETTO_CHECK(!data_sources_.empty());
   {
     std::unique_lock<std::mutex> lock(lock_);
     PERFETTO_CHECK(!listening_for_raw_trace_data_);
@@ -218,12 +227,12 @@
 }
 
 uint32_t FtraceController::GetDrainPeriodMs() {
-  if (sinks_.empty())
+  if (data_sources_.empty())
     return kDefaultDrainPeriodMs;
   uint32_t min_drain_period_ms = kMaxDrainPeriodMs + 1;
-  for (const FtraceSink* sink : sinks_) {
-    if (sink->config().drain_period_ms() < min_drain_period_ms)
-      min_drain_period_ms = sink->config().drain_period_ms();
+  for (const FtraceDataSource* data_source : data_sources_) {
+    if (data_source->config().drain_period_ms() < min_drain_period_ms)
+      min_drain_period_ms = data_source->config().drain_period_ms();
   }
   return ClampDrainPeriodMs(min_drain_period_ms);
 }
@@ -241,7 +250,7 @@
 }
 
 void FtraceController::StopIfNeeded() {
-  if (!sinks_.empty())
+  if (!data_sources_.empty())
     return;
   {
     // Unblock any readers that are waiting for us to drain data.
@@ -253,59 +262,15 @@
   cpu_readers_.clear();
 }
 
-void FtraceController::OnRawFtraceDataAvailable(size_t cpu) {
-  PERFETTO_CHECK(cpu < ftrace_procfs_->NumberOfCpus());
-  CpuReader* reader = cpu_readers_[cpu].get();
-  using BundleHandle =
-      protozero::MessageHandle<protos::pbzero::FtraceEventBundle>;
-  std::array<const EventFilter*, kMaxSinks> filters{};
-  std::array<BundleHandle, kMaxSinks> bundles{};
-  std::array<FtraceMetadata*, kMaxSinks> metadatas{};
-  size_t sink_count = sinks_.size();
-  size_t i = 0;
-  for (FtraceSink* sink : sinks_) {
-    filters[i] = sink->event_filter();
-    metadatas[i] = sink->metadata_mutable();
-    bundles[i++] = sink->GetBundleForCpu(cpu);
-  }
-  reader->Drain(filters, bundles, metadatas);
-  i = 0;
-  for (FtraceSink* sink : sinks_)
-    sink->OnBundleComplete(cpu, std::move(bundles[i++]));
-  PERFETTO_DCHECK(sinks_.size() == sink_count);
-}
-
-std::unique_ptr<FtraceSink> FtraceController::CreateSink(
-    FtraceConfig config,
-    FtraceSink::Delegate* delegate) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  if (sinks_.size() >= kMaxSinks)
-    return nullptr;
-  if (!ValidConfig(config))
-    return nullptr;
-
-  FtraceConfigId id = ftrace_config_muxer_->RequestConfig(config);
-  if (!id)
-    return nullptr;
-
-  auto controller_weak = weak_factory_.GetWeakPtr();
-  auto filter = std::unique_ptr<EventFilter>(new EventFilter(
-      *table_, FtraceEventsAsSet(*ftrace_config_muxer_->GetConfig(id))));
-
-  auto sink = std::unique_ptr<FtraceSink>(
-      new FtraceSink(std::move(controller_weak), id, std::move(config),
-                     std::move(filter), delegate));
-  Register(sink.get());
-  delegate->OnCreate(sink.get());
-  return sink;
-}
-
+// This method is called on the worker thread. Lifetime is guaranteed to be
+// valid, because the FtraceController dtor (that happens on the main thread)
+// joins the worker threads. |weak_this| is passed and not derived, because the
+// WeakPtrFactory is accessible only on the main thread.
 void FtraceController::OnDataAvailable(
     base::WeakPtr<FtraceController> weak_this,
     size_t generation,
     size_t cpu,
     uint32_t drain_period_ms) {
-  // Called on the worker thread.
   PERFETTO_DCHECK(cpu < ftrace_procfs_->NumberOfCpus());
   std::unique_lock<std::mutex> lock(lock_);
   if (!listening_for_raw_trace_data_)
@@ -328,21 +293,30 @@
   });
 }
 
-void FtraceController::Register(FtraceSink* sink) {
+bool FtraceController::AddDataSource(FtraceDataSource* data_source) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  auto it_and_inserted = sinks_.insert(sink);
+  if (!ValidConfig(data_source->config()))
+    return false;
+
+  auto config_id = ftrace_config_muxer_->RequestConfig(data_source->config());
+  if (!config_id)
+    return false;
+
+  std::unique_ptr<EventFilter> filter(new EventFilter(
+      *table_, FtraceEventsAsSet(*ftrace_config_muxer_->GetConfig(config_id))));
+  auto it_and_inserted = data_sources_.insert(data_source);
   PERFETTO_DCHECK(it_and_inserted.second);
   StartIfNeeded();
+  data_source->Initialize(config_id, std::move(filter));
+  return true;
 }
 
-void FtraceController::Unregister(FtraceSink* sink) {
+void FtraceController::RemoveDataSource(FtraceDataSource* data_source) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-
-  size_t removed = sinks_.erase(sink);
-  PERFETTO_DCHECK(removed == 1);
-
-  ftrace_config_muxer_->RemoveConfig(sink->id_);
-
+  size_t removed = data_sources_.erase(data_source);
+  if (!removed)
+    return;  // Can happen if AddDataSource failed (e.g. too many sessions).
+  ftrace_config_muxer_->RemoveConfig(data_source->config_id());
   StopIfNeeded();
 }
 
@@ -350,22 +324,6 @@
   DumpAllCpuStats(ftrace_procfs_.get(), stats);
 }
 
-void FtraceStats::Write(protos::pbzero::FtraceStats* writer) const {
-  for (const FtraceCpuStats& cpu_specific_stats : cpu_stats) {
-    cpu_specific_stats.Write(writer->add_cpu_stats());
-  }
-}
-
-void FtraceCpuStats::Write(protos::pbzero::FtraceCpuStats* writer) const {
-  writer->set_cpu(cpu);
-  writer->set_entries(entries);
-  writer->set_overrun(overrun);
-  writer->set_commit_overrun(commit_overrun);
-  writer->set_bytes_read(bytes_read);
-  writer->set_oldest_event_ts(oldest_event_ts);
-  writer->set_now_ts(now_ts);
-  writer->set_dropped_events(dropped_events);
-  writer->set_read_events(read_events);
-}
+FtraceController::Observer::~Observer() = default;
 
 }  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index 3a73771..d3f9e3d 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -21,99 +21,76 @@
 
 #include <bitset>
 #include <condition_variable>
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <set>
 #include <string>
-#include <vector>
 
 #include "gtest/gtest_prod.h"
-#include "perfetto/base/scoped_file.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
-#include "perfetto/protozero/message_handle.h"
-#include "perfetto/traced/data_source_types.h"
-#include "src/traced/probes/ftrace/ftrace_sink.h"
+#include "src/traced/probes/ftrace/ftrace_config.h"
 
 namespace perfetto {
 
-namespace protos {
-namespace pbzero {
-class FtraceEventBundle;
-class FtraceStats;
-class FtraceCpuStats;
-}  // namespace pbzero
-}  // namespace protos
-
-struct FtraceCpuStats {
-  uint64_t cpu;
-  uint64_t entries;
-  uint64_t overrun;
-  uint64_t commit_overrun;
-  uint64_t bytes_read;
-  double oldest_event_ts;
-  double now_ts;
-  uint64_t dropped_events;
-  uint64_t read_events;
-
-  void Write(protos::pbzero::FtraceCpuStats*) const;
-};
-
-struct FtraceStats {
-  std::vector<FtraceCpuStats> cpu_stats;
-
-  void Write(protos::pbzero::FtraceStats*) const;
-};
-
-constexpr size_t kMaxSinks = 32;
-constexpr size_t kMaxCpus = 64;
+class CpuReader;
+class FtraceConfigMuxer;
+class FtraceDataSource;
+class FtraceProcfs;
+class ProtoTranslationTable;
+struct FtraceStats;
 
 // Method of last resort to reset ftrace state.
 void HardResetFtraceState();
 
-class CpuReader;
-class EventFilter;
-class FtraceConfig;
-class FtraceController;
-class FtraceConfigMuxer;
-class FtraceProcfs;
-class ProtoTranslationTable;
-
 // Utility class for controlling ftrace.
 class FtraceController {
  public:
-  static std::unique_ptr<FtraceController> Create(base::TaskRunner*);
-  virtual ~FtraceController();
+  class Observer {
+   public:
+    virtual ~Observer();
+    virtual void OnFtraceDataWrittenIntoDataSourceBuffers() = 0;
+  };
 
-  std::unique_ptr<FtraceSink> CreateSink(FtraceConfig, FtraceSink::Delegate*);
+  // The passed Observer must outlive the returned FtraceController instance.
+  static std::unique_ptr<FtraceController> Create(base::TaskRunner*, Observer*);
+  virtual ~FtraceController();
 
   void DisableAllEvents();
   void WriteTraceMarker(const std::string& s);
   void ClearTrace();
 
+  bool AddDataSource(FtraceDataSource*) PERFETTO_WARN_UNUSED_RESULT;
+  void RemoveDataSource(FtraceDataSource*);
+
+  void DumpFtraceStats(FtraceStats*);
+
+  base::WeakPtr<FtraceController> GetWeakPtr() {
+    return weak_factory_.GetWeakPtr();
+  }
+
  protected:
   // Protected for testing.
   FtraceController(std::unique_ptr<FtraceProcfs>,
                    std::unique_ptr<ProtoTranslationTable>,
                    std::unique_ptr<FtraceConfigMuxer>,
-                   base::TaskRunner*);
+                   base::TaskRunner*,
+                   Observer*);
 
-  // Write
-  void DumpFtraceStats(FtraceStats*);
-
-  // Called to read data from the staging pipe for the given |cpu| and parse it
-  // into the sinks. Protected and virtual for testing.
-  virtual void OnRawFtraceDataAvailable(size_t cpu);
+  virtual void OnDrainCpuForTesting(size_t /*cpu*/) {}
 
   // Protected and virtual for testing.
   virtual uint64_t NowMs() const;
 
  private:
-  friend FtraceSink;
   friend class TestFtraceController;
   FRIEND_TEST(FtraceControllerIntegrationTest, EnableDisableEvent);
 
+  static constexpr size_t kMaxCpus = 64;
+
   FtraceController(const FtraceController&) = delete;
   FtraceController& operator=(const FtraceController&) = delete;
 
@@ -129,9 +106,6 @@
 
   uint32_t GetDrainPeriodMs();
 
-  void Register(FtraceSink*);
-  void Unregister(FtraceSink*);
-
   void StartIfNeeded();
   void StopIfNeeded();
 
@@ -142,15 +116,16 @@
   bool listening_for_raw_trace_data_ = false;
   // End lock-protected members.
 
+  base::TaskRunner* const task_runner_;
+  Observer* const observer_;
   std::unique_ptr<FtraceProcfs> ftrace_procfs_;
   std::unique_ptr<ProtoTranslationTable> table_;
   std::unique_ptr<FtraceConfigMuxer> ftrace_config_muxer_;
   size_t generation_ = 0;
   bool atrace_running_ = false;
-  base::TaskRunner* task_runner_ = nullptr;
   std::map<size_t, std::unique_ptr<CpuReader>> cpu_readers_;
-  std::set<FtraceSink*> sinks_;
-  base::WeakPtrFactory<FtraceController> weak_factory_;
+  std::set<FtraceDataSource*> data_sources_;
+  base::WeakPtrFactory<FtraceController> weak_factory_;  // Keep last.
   PERFETTO_THREAD_CHECKER(thread_checker_)
 };
 
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index 62934aa..a441b2e 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -26,6 +26,7 @@
 #include "src/traced/probes/ftrace/cpu_reader.h"
 #include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_config_muxer.h"
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 #include "src/tracing/core/trace_writer_for_testing.h"
@@ -92,22 +93,6 @@
   std::function<void()> task_;
 };
 
-class MockDelegate : public perfetto::FtraceSink::Delegate {
- public:
-  MOCK_METHOD1(GetBundleForCpu,
-               protozero::MessageHandle<FtraceEventBundle>(size_t));
-  MOCK_METHOD3(OnBundleComplete_,
-               void(size_t,
-                    protozero::MessageHandle<FtraceEventBundle>&,
-                    const FtraceMetadata& metadata));
-
-  void OnBundleComplete(size_t cpu,
-                        protozero::MessageHandle<FtraceEventBundle> bundle,
-                        const FtraceMetadata& metadata) override {
-    OnBundleComplete_(cpu, bundle, metadata);
-  }
-};
-
 std::unique_ptr<Table> FakeTable() {
   std::vector<Field> common_fields;
   std::vector<Event> events;
@@ -151,6 +136,11 @@
     EXPECT_CALL(*this, ReadFileIntoString("/root/trace_clock"))
         .Times(AnyNumber());
 
+    ON_CALL(*this, ReadFileIntoString("/root/per_cpu/cpu0/stats"))
+        .WillByDefault(Return(""));
+    EXPECT_CALL(*this, ReadFileIntoString("/root/per_cpu/cpu0/stats"))
+        .Times(AnyNumber());
+
     ON_CALL(*this, WriteToFile(_, _)).WillByDefault(Return(true));
     ON_CALL(*this, ClearFile(_)).WillByDefault(Return(true));
 
@@ -191,7 +181,8 @@
 
 }  // namespace
 
-class TestFtraceController : public FtraceController {
+class TestFtraceController : public FtraceController,
+                             public FtraceController::Observer {
  public:
   TestFtraceController(std::unique_ptr<MockFtraceProcfs> ftrace_procfs,
                        std::unique_ptr<Table> table,
@@ -201,11 +192,12 @@
       : FtraceController(std::move(ftrace_procfs),
                          std::move(table),
                          std::move(model),
-                         runner.get()),
+                         runner.get(),
+                         /*observer=*/this),
         runner_(std::move(runner)),
         procfs_(raw_procfs) {}
 
-  MOCK_METHOD1(OnRawFtraceDataAvailable, void(size_t cpu));
+  MOCK_METHOD1(OnDrainCpuForTesting, void(size_t cpu));
 
   MockTaskRunner* runner() { return runner_.get(); }
   MockFtraceProcfs* procfs() { return procfs_; }
@@ -233,6 +225,16 @@
     }
   }
 
+  std::unique_ptr<FtraceDataSource> AddFakeDataSource(const FtraceConfig& cfg) {
+    std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
+        GetWeakPtr(), 0 /* session id */, cfg, nullptr /* trace_writer */));
+    if (!AddDataSource(data_source.get()))
+      return nullptr;
+    return data_source;
+  }
+
+  void OnFtraceDataWrittenIntoDataSourceBuffers() override {}
+
   uint64_t now_ms = 0;
 
  private:
@@ -281,19 +283,16 @@
   auto controller =
       CreateTestController(true /* nice runner */, true /* nice procfs */);
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"not_an_event"});
-
-  std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
+  EXPECT_TRUE(controller->AddFakeDataSource(config));
 }
 
 TEST(FtraceControllerTest, RejectsBadEventNames) {
   auto controller =
       CreateTestController(true /* nice runner */, true /* nice procfs */);
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"../try/to/escape"});
-  EXPECT_FALSE(controller->CreateSink(config, &delegate));
+  EXPECT_FALSE(controller->AddFakeDataSource(config));
   EXPECT_FALSE(controller->procfs()->is_tracing_on());
 }
 
@@ -301,13 +300,13 @@
   auto controller =
       CreateTestController(true /* nice runner */, false /* nice procfs */);
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
-  std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
+  auto data_source = controller->AddFakeDataSource(config);
+  ASSERT_TRUE(data_source);
 
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
@@ -320,7 +319,7 @@
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/events/enable", "0"));
   EXPECT_TRUE(controller->procfs()->is_tracing_on());
 
-  sink.reset();
+  data_source.reset();
   EXPECT_FALSE(controller->procfs()->is_tracing_on());
 }
 
@@ -328,22 +327,16 @@
   auto controller =
       CreateTestController(false /* nice runner */, false /* nice procfs */);
 
-  MockDelegate delegate;
-
   FtraceConfig configA = CreateFtraceConfig({"foo"});
   FtraceConfig configB = CreateFtraceConfig({"foo", "bar"});
 
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
-  std::unique_ptr<FtraceSink> sinkA =
-      controller->CreateSink(configA, &delegate);
-
+  auto data_sourceA = controller->AddFakeDataSource(configA);
   EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "1"));
-  std::unique_ptr<FtraceSink> sinkB =
-      controller->CreateSink(configB, &delegate);
-
-  sinkA.reset();
+  auto data_sourceB = controller->AddFakeDataSource(configB);
+  data_sourceA.reset();
 
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "0"));
@@ -353,20 +346,19 @@
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"));
   EXPECT_CALL(*controller->procfs(),
               ClearFile(MatchesRegex("/root/per_cpu/cpu[0-9]/trace")));
-  sinkB.reset();
+  data_sourceB.reset();
 }
 
 TEST(FtraceControllerTest, ControllerMayDieFirst) {
   auto controller =
       CreateTestController(false /* nice runner */, false /* nice procfs */);
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
-  std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
+  auto data_source = controller->AddFakeDataSource(config);
 
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
@@ -378,8 +370,7 @@
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/events/enable", "0"));
   controller.reset();
-
-  sink.reset();
+  data_source.reset();
 }
 
 TEST(FtraceControllerTest, TaskScheduling) {
@@ -390,16 +381,14 @@
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
   EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
-
-  std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
+  auto data_source = controller->AddFakeDataSource(config);
 
   // Only one call to drain should be scheduled for the next drain period.
   EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100));
 
   // However both CPUs should be drained.
-  EXPECT_CALL(*controller, OnRawFtraceDataAvailable(_)).Times(2);
+  EXPECT_CALL(*controller, OnDrainCpuForTesting(_)).Times(2);
 
   // Finally, another task should be posted to unblock the workers.
   EXPECT_CALL(*controller->runner(), PostTask(_));
@@ -424,7 +413,7 @@
   worker0.join();
   worker1.join();
 
-  sink.reset();
+  data_source.reset();
 }
 
 // TODO(b/73452932): Fix and reenable this test.
@@ -436,18 +425,16 @@
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
   EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
 
-  MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
+  auto data_source = controller->AddFakeDataSource(config);
 
   // Test several cycles of a worker producing data and make sure the drain
   // delay is consistent with the drain period.
-  std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
-
   const int kCycles = 50;
   EXPECT_CALL(*controller->runner(),
               PostDelayedTask(_, controller->drain_period_ms()))
       .Times(kCycles);
-  EXPECT_CALL(*controller, OnRawFtraceDataAvailable(_)).Times(kCycles);
+  EXPECT_CALL(*controller, OnDrainCpuForTesting(_)).Times(kCycles);
   EXPECT_CALL(*controller->runner(), PostTask(_)).Times(kCycles);
 
   // Simulate a worker thread continually reporting pages of available data.
@@ -466,7 +453,7 @@
   }
 
   worker.join();
-  sink.reset();
+  data_source.reset();
 }
 
 TEST(FtraceControllerTest, BackToBackEnableDisable) {
@@ -479,31 +466,27 @@
   EXPECT_CALL(*controller->procfs(), ReadOneCharFromFile("/root/tracing_on"))
       .Times(AnyNumber());
 
-  MockDelegate delegate;
-  FtraceConfig config = CreateFtraceConfig({"foo"});
-
   EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
-  std::unique_ptr<FtraceSink> sink_a =
-      controller->CreateSink(config, &delegate);
+  FtraceConfig config = CreateFtraceConfig({"foo"});
+  auto data_source = controller->AddFakeDataSource(config);
 
   auto on_data_available = controller->GetDataAvailableCallback(0u);
   std::thread worker([on_data_available] { on_data_available(); });
   controller->WaitForData(0u);
 
-  // Disable the first sink and run the delayed task that it generated. It
-  // should be a no-op.
-  sink_a.reset();
+  // Disable the first data source and run the delayed task that it generated.
+  // It should be a no-op.
+  data_source.reset();
   controller->runner()->RunLastTask();
   worker.join();
 
-  // Register another sink and wait for it to generate data.
-  std::unique_ptr<FtraceSink> sink_b =
-      controller->CreateSink(config, &delegate);
+  // Register another data source and wait for it to generate data.
+  data_source = controller->AddFakeDataSource(config);
   std::thread worker2([on_data_available] { on_data_available(); });
   controller->WaitForData(0u);
 
-  // This drain should also be a no-op after the sink is unregistered.
-  sink_b.reset();
+  // This drain should also be a no-op after the data source is unregistered.
+  data_source.reset();
   controller->runner()->RunLastTask();
   worker2.join();
 }
@@ -515,7 +498,6 @@
   // For this test we don't care about most calls to WriteToFile/ClearFile.
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
   EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
-  MockDelegate delegate;
 
   {
     // No buffer size -> good default.
@@ -523,7 +505,7 @@
     EXPECT_CALL(*controller->procfs(),
                 WriteToFile("/root/buffer_size_kb", "512"));
     FtraceConfig config = CreateFtraceConfig({"foo"});
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 
   {
@@ -532,7 +514,7 @@
                 WriteToFile("/root/buffer_size_kb", "65536"));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(10 * 1024 * 1024);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 
   {
@@ -542,7 +524,7 @@
     FtraceConfig config = CreateFtraceConfig({"foo"});
     ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
     config.set_buffer_size_kb(65 * 1024);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 
   {
@@ -551,7 +533,7 @@
                 WriteToFile("/root/buffer_size_kb", "4"));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(1);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 
   {
@@ -560,7 +542,7 @@
                 WriteToFile("/root/buffer_size_kb", "40"));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(42);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 
   {
@@ -570,7 +552,7 @@
     FtraceConfig config = CreateFtraceConfig({"foo"});
     ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
     config.set_buffer_size_kb(42);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
   }
 }
 
@@ -581,12 +563,11 @@
   // For this test we don't care about calls to WriteToFile/ClearFile.
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
   EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
-  MockDelegate delegate;
 
   {
     // No period -> good default.
     FtraceConfig config = CreateFtraceConfig({"foo"});
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
     EXPECT_EQ(100u, controller->drain_period_ms());
   }
 
@@ -594,7 +575,7 @@
     // Pick a tiny value -> good default.
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(0);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
     EXPECT_EQ(100u, controller->drain_period_ms());
   }
 
@@ -602,7 +583,7 @@
     // Pick a huge value -> good default.
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(1000 * 60 * 60);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
     EXPECT_EQ(100u, controller->drain_period_ms());
   }
 
@@ -610,7 +591,7 @@
     // Pick a resonable value -> get that value.
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(200);
-    auto sink = controller->CreateSink(config, &delegate);
+    auto data_source = controller->AddFakeDataSource(config);
     EXPECT_EQ(200u, controller->drain_period_ms());
   }
 }
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
new file mode 100644
index 0000000..da5391e
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -0,0 +1,86 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
+
+#include "src/traced/probes/ftrace/cpu_reader.h"
+#include "src/traced/probes/ftrace/ftrace_controller.h"
+
+#include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
+#include "perfetto/trace/ftrace/ftrace_stats.pbzero.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+
+namespace perfetto {
+
+// static
+constexpr int FtraceDataSource::kTypeId;
+
+FtraceDataSource::FtraceDataSource(
+    base::WeakPtr<FtraceController> controller_weak,
+    TracingSessionID session_id,
+    const FtraceConfig& config,
+    std::unique_ptr<TraceWriter> writer)
+    : ProbesDataSource(session_id, kTypeId),
+      config_(config),
+      writer_(std::move(writer)),
+      controller_weak_(std::move(controller_weak)){};
+
+FtraceDataSource::~FtraceDataSource() {
+  if (controller_weak_)
+    controller_weak_->RemoveDataSource(this);
+};
+
+void FtraceDataSource::Initialize(FtraceConfigId config_id,
+                                  std::unique_ptr<EventFilter> event_filter) {
+  config_id_ = config_id;
+  event_filter_ = std::move(event_filter);
+  DumpFtraceStats(&stats_before_);
+}
+
+void FtraceDataSource::DumpFtraceStats(FtraceStats* stats) {
+  if (controller_weak_)
+    controller_weak_->DumpFtraceStats(stats);
+}
+
+void FtraceDataSource::Flush() {
+  // TODO(primiano): this still doesn't flush data from the kernel ftrace
+  // buffers (see b/73886018). We should do that and delay the
+  // NotifyFlushComplete() until the ftrace data has been drained from the
+  // kernel ftrace buffer and written in the SMB.
+  if (!writer_)
+    return;
+  WriteStats();
+  writer_->Flush();
+}
+
+void FtraceDataSource::WriteStats() {
+  {
+    auto before_packet = writer_->NewTracePacket();
+    auto out = before_packet->set_ftrace_stats();
+    out->set_phase(protos::pbzero::FtraceStats_Phase_START_OF_TRACE);
+    stats_before_.Write(out);
+  }
+  {
+    FtraceStats stats_after{};
+    DumpFtraceStats(&stats_after);
+    auto after_packet = writer_->NewTracePacket();
+    auto out = after_packet->set_ftrace_stats();
+    out->set_phase(protos::pbzero::FtraceStats_Phase_END_OF_TRACE);
+    stats_after.Write(out);
+  }
+}
+
+}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
new file mode 100644
index 0000000..3def5fd
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_DATA_SOURCE_H_
+#define SRC_TRACED_PROBES_FTRACE_FTRACE_DATA_SOURCE_H_
+
+#include <memory>
+#include <set>
+#include <string>
+
+#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/protozero/message_handle.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/traced/probes/ftrace/ftrace_config.h"
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
+#include "src/traced/probes/ftrace/ftrace_stats.h"
+#include "src/traced/probes/probes_data_source.h"
+
+namespace perfetto {
+
+class EventFilter;
+class FtraceController;
+class ProcessStatsDataSource;
+class InodeFileDataSource;
+
+namespace protos {
+namespace pbzero {
+class FtraceEventBundle;
+}  // namespace pbzero
+}  // namespace protos
+
+// This class handles the state for one particular tracing session involving
+// ftrace. There can be several concurrent tracing sessions involving ftrace
+// and this class is essentially the building block used to multiplex them.
+// This class is instantiated by ProbesProducer. ProbesProducer also owns the
+// FtraceController.
+class FtraceDataSource : public ProbesDataSource {
+ public:
+  static constexpr int kTypeId = 1;
+  FtraceDataSource(base::WeakPtr<FtraceController>,
+                   TracingSessionID,
+                   const FtraceConfig&,
+                   std::unique_ptr<TraceWriter>);
+  ~FtraceDataSource() override;
+
+  // Called by FtraceController soon after ProbesProducer creates the data
+  // source, to inject ftrace dependencies.
+  void Initialize(FtraceConfigId, std::unique_ptr<EventFilter>);
+
+  // Flushes the ftrace buffers into the userspace trace buffers and writes
+  // also ftrace stats.
+  void Flush() override;
+
+  FtraceConfigId config_id() const { return config_id_; }
+  const FtraceConfig& config() const { return config_; }
+  EventFilter* event_filter() { return event_filter_.get(); }
+  FtraceMetadata* mutable_metadata() { return &metadata_; }
+  TraceWriter* trace_writer() { return writer_.get(); }
+
+ private:
+  FtraceDataSource(const FtraceDataSource&) = delete;
+  FtraceDataSource& operator=(const FtraceDataSource&) = delete;
+
+  void WriteStats();
+  void DumpFtraceStats(FtraceStats*);
+
+  const FtraceConfig config_;
+  FtraceMetadata metadata_;
+  FtraceStats stats_before_ = {};
+
+  // Initialized by the Initialize() call.
+  FtraceConfigId config_id_ = 0;
+  std::unique_ptr<TraceWriter> writer_;
+  base::WeakPtr<FtraceController> controller_weak_;
+  std::unique_ptr<EventFilter> event_filter_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_DATA_SOURCE_H_
diff --git a/src/traced/probes/ftrace/ftrace_metadata.h b/src/traced/probes/ftrace/ftrace_metadata.h
index eab94ef..4e2e04a 100644
--- a/src/traced/probes/ftrace/ftrace_metadata.h
+++ b/src/traced/probes/ftrace/ftrace_metadata.h
@@ -34,7 +34,7 @@
 struct FtraceMetadata {
   FtraceMetadata();
 
-  uint32_t overwrite_count;
+  uint32_t overwrite_count = 0;
   BlockDeviceID last_seen_device_id = 0;
 #if PERFETTO_DCHECK_IS_ON()
   bool seen_device_id = false;
diff --git a/src/traced/probes/ftrace/ftrace_sink.cc b/src/traced/probes/ftrace/ftrace_sink.cc
deleted file mode 100644
index 70908c0..0000000
--- a/src/traced/probes/ftrace/ftrace_sink.cc
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2017 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/traced/probes/ftrace/ftrace_sink.h"
-
-#include "src/traced/probes/ftrace/cpu_reader.h"
-#include "src/traced/probes/ftrace/ftrace_controller.h"
-
-namespace perfetto {
-
-FtraceSink::FtraceSink(base::WeakPtr<FtraceController> controller_weak,
-                       FtraceConfigId id,
-                       FtraceConfig config,
-                       std::unique_ptr<EventFilter> filter,
-                       Delegate* delegate)
-    : controller_weak_(std::move(controller_weak)),
-      id_(id),
-      config_(std::move(config)),
-      filter_(std::move(filter)),
-      delegate_(delegate){};
-
-FtraceSink::~FtraceSink() {
-  if (controller_weak_)
-    controller_weak_->Unregister(this);
-};
-
-const std::set<std::string>& FtraceSink::enabled_events() {
-  return filter_->enabled_names();
-}
-
-void FtraceSink::DumpFtraceStats(FtraceStats* stats) {
-  if (controller_weak_)
-    controller_weak_->DumpFtraceStats(stats);
-}
-
-FtraceSink::Delegate::~Delegate() = default;
-
-}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_sink.h b/src/traced/probes/ftrace/ftrace_sink.h
deleted file mode 100644
index a95b4c7..0000000
--- a/src/traced/probes/ftrace/ftrace_sink.h
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (C) 2017 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
-#define SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
-
-#include <memory>
-#include <set>
-#include <string>
-
-#include "perfetto/base/scoped_file.h"
-#include "perfetto/base/weak_ptr.h"
-#include "perfetto/protozero/message_handle.h"
-#include "src/traced/probes/ftrace/ftrace_config.h"
-#include "src/traced/probes/ftrace/ftrace_metadata.h"
-
-namespace perfetto {
-
-class EventFilter;
-class FtraceController;
-struct FtraceStats;
-
-namespace protos {
-namespace pbzero {
-class FtraceEventBundle;
-}  // namespace pbzero
-}  // namespace protos
-
-// To consume ftrace data clients implement a |FtraceSink::Delegate| and use it
-// to create a |FtraceSink|. While the FtraceSink lives FtraceController will
-// call |GetBundleForCpu|, write data into the bundle then call
-// |OnBundleComplete| allowing the client to perform finalization.
-class FtraceSink {
- public:
-  using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
-  class Delegate {
-   public:
-    virtual void OnCreate(FtraceSink*) {}
-    virtual protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(
-        size_t) = 0;
-    virtual void OnBundleComplete(size_t,
-                                  protozero::MessageHandle<FtraceEventBundle>,
-                                  const FtraceMetadata&) = 0;
-    virtual ~Delegate();
-  };
-
-  FtraceSink(base::WeakPtr<FtraceController>,
-             FtraceConfigId id,
-             FtraceConfig config,
-             std::unique_ptr<EventFilter>,
-             Delegate*);
-  ~FtraceSink();
-
-  void DumpFtraceStats(FtraceStats*);
-
-  const FtraceConfig& config() const { return config_; }
-
- private:
-  friend FtraceController;
-
-  FtraceSink(const FtraceSink&) = delete;
-  FtraceSink& operator=(const FtraceSink&) = delete;
-
-  EventFilter* event_filter() { return filter_.get(); }
-  FtraceMetadata* metadata_mutable() { return &metadata_; }
-
-  protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(size_t cpu) {
-    return delegate_->GetBundleForCpu(cpu);
-  }
-  void OnBundleComplete(size_t cpu,
-                        protozero::MessageHandle<FtraceEventBundle> bundle) {
-    delegate_->OnBundleComplete(cpu, std::move(bundle), metadata_);
-    metadata_.Clear();
-  }
-
-  const std::set<std::string>& enabled_events();
-
-  base::WeakPtr<FtraceController> controller_weak_;
-  const FtraceConfigId id_;
-  const FtraceConfig config_;
-  std::unique_ptr<EventFilter> filter_;
-  FtraceMetadata metadata_;
-  FtraceSink::Delegate* delegate_;
-};
-
-}  // namespace perfetto
-
-#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
diff --git a/src/traced/probes/ftrace/ftrace_stats.cc b/src/traced/probes/ftrace/ftrace_stats.cc
new file mode 100644
index 0000000..eff118d
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_stats.cc
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/ftrace/ftrace_stats.h"
+
+#include "perfetto/trace/ftrace/ftrace_stats.pbzero.h"
+
+namespace perfetto {
+
+void FtraceStats::Write(protos::pbzero::FtraceStats* writer) const {
+  for (const FtraceCpuStats& cpu_specific_stats : cpu_stats) {
+    cpu_specific_stats.Write(writer->add_cpu_stats());
+  }
+}
+
+void FtraceCpuStats::Write(protos::pbzero::FtraceCpuStats* writer) const {
+  writer->set_cpu(cpu);
+  writer->set_entries(entries);
+  writer->set_overrun(overrun);
+  writer->set_commit_overrun(commit_overrun);
+  writer->set_bytes_read(bytes_read);
+  writer->set_oldest_event_ts(oldest_event_ts);
+  writer->set_now_ts(now_ts);
+  writer->set_dropped_events(dropped_events);
+  writer->set_read_events(read_events);
+}
+
+}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_stats.h b/src/traced/probes/ftrace/ftrace_stats.h
new file mode 100644
index 0000000..f7e6b94
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_stats.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_STATS_H_
+#define SRC_TRACED_PROBES_FTRACE_FTRACE_STATS_H_
+
+#include <inttypes.h>
+
+#include <vector>
+
+namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class FtraceStats;
+class FtraceCpuStats;
+}  // namespace pbzero
+}  // namespace protos
+
+struct FtraceCpuStats {
+  uint64_t cpu;
+  uint64_t entries;
+  uint64_t overrun;
+  uint64_t commit_overrun;
+  uint64_t bytes_read;
+  double oldest_event_ts;
+  double now_ts;
+  uint64_t dropped_events;
+  uint64_t read_events;
+
+  void Write(protos::pbzero::FtraceCpuStats*) const;
+};
+
+struct FtraceStats {
+  std::vector<FtraceCpuStats> cpu_stats;
+
+  void Write(protos::pbzero::FtraceStats*) const;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_STATS_H_
diff --git a/src/traced/probes/probes_data_source.cc b/src/traced/probes/probes_data_source.cc
new file mode 100644
index 0000000..f051f8d
--- /dev/null
+++ b/src/traced/probes/probes_data_source.cc
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/probes_data_source.h"
+
+namespace perfetto {
+
+ProbesDataSource::ProbesDataSource(TracingSessionID session_id, int id)
+    : tracing_session_id(session_id), type_id(id) {}
+ProbesDataSource::~ProbesDataSource() = default;
+
+}  // namespace perfetto
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
new file mode 100644
index 0000000..3e823e0
--- /dev/null
+++ b/src/traced/probes/probes_data_source.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
+#define SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
+
+#include "perfetto/tracing/core/basic_types.h"
+
+namespace perfetto {
+
+// Base class for all data sources in traced_probes.
+class ProbesDataSource {
+ public:
+  // |type_id| is a home-brewed RTTI, e.g. InodeFileDataSource::kTypeId.
+  ProbesDataSource(TracingSessionID, int type_id);
+  virtual ~ProbesDataSource();
+
+  virtual void Flush() = 0;
+
+  const TracingSessionID tracing_session_id;
+  const int type_id;
+
+ private:
+  ProbesDataSource(const ProbesDataSource&) = delete;
+  ProbesDataSource& operator=(const ProbesDataSource&) = delete;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 1e100c4..1fd3ad5 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -24,6 +24,7 @@
 #include <string>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/traced/traced.h"
 #include "perfetto/tracing/core/data_source_config.h"
@@ -33,6 +34,8 @@
 #include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/ipc/producer_ipc_client.h"
 #include "src/traced/probes/filesystem/inode_file_data_source.h"
+#include "src/traced/probes/ftrace/ftrace_data_source.h"
+#include "src/traced/probes/probes_data_source.h"
 
 #include "perfetto/trace/filesystem/inode_file_map.pbzero.h"
 #include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
@@ -58,8 +61,12 @@
 //                    +--------------+
 //
 
-ProbesProducer::ProbesProducer() {}
-ProbesProducer::~ProbesProducer() = default;
+ProbesProducer::ProbesProducer() : weak_factory_(this) {}
+ProbesProducer::~ProbesProducer() {
+  // The ftrace data sources must be deleted before the ftrace controller.
+  data_sources_.clear();
+  ftrace_.reset();
+}
 
 void ProbesProducer::OnConnect() {
   PERFETTO_DCHECK(state_ == kConnecting);
@@ -111,50 +118,38 @@
 
 void ProbesProducer::CreateDataSourceInstance(DataSourceInstanceID instance_id,
                                               const DataSourceConfig& config) {
+  PERFETTO_DCHECK(data_sources_.count(instance_id) == 0);
+
   // TODO(hjd): This a hack since we don't actually know the session id. For
   // now we'll assume anything wit hthe same target buffer is in the same
   // session.
   TracingSessionID session_id = config.target_buffer();
 
+  std::unique_ptr<ProbesDataSource> data_source;
   if (config.name() == kFtraceSourceName) {
-    if (!CreateFtraceDataSourceInstance(session_id, instance_id, config))
-      failed_sources_.insert(instance_id);
+    data_source = CreateFtraceDataSource(session_id, instance_id, config);
   } else if (config.name() == kInodeMapSourceName) {
-    CreateInodeFileDataSourceInstance(session_id, instance_id, config);
+    data_source = CreateInodeFileDataSource(session_id, instance_id, config);
   } else if (config.name() == kProcessStatsSourceName) {
-    CreateProcessStatsDataSourceInstance(session_id, instance_id, config);
-  } else {
-    PERFETTO_ELOG("Data source name: %s not recognised.",
-                  config.name().c_str());
+    data_source = CreateProcessStatsDataSource(session_id, instance_id, config);
+  }
+
+  if (!data_source) {
+    PERFETTO_ELOG("Failed to create data source '%s'", config.name().c_str());
     return;
   }
 
-  std::map<TracingSessionID, InodeFileDataSource*> file_sources;
-  std::map<TracingSessionID, ProcessStatsDataSource*> ps_sources;
-  for (const auto& pair : file_map_sources_)
-    file_sources[pair.second->session_id()] = pair.second.get();
-  for (const auto& pair : process_stats_sources_)
-    ps_sources[pair.second->session_id()] = pair.second.get();
+  session_data_sources_.emplace(session_id, data_source.get());
+  data_sources_[instance_id] = std::move(data_source);
 
-  for (const auto& id_to_source : delegates_) {
-    const std::unique_ptr<SinkDelegate>& source = id_to_source.second;
-    if (session_id != source->session_id())
-      continue;
-    if (!source->ps_source() && ps_sources.count(session_id))
-      source->set_ps_source(ps_sources[session_id]->GetWeakPtr());
-    if (!source->file_source() && file_sources.count(session_id))
-      source->set_file_source(file_sources[session_id]->GetWeakPtr());
+  if (config.trace_duration_ms() != 0) {
+    uint32_t timeout = 5000 + 2 * config.trace_duration_ms();
+    watchdogs_.emplace(
+        instance_id, base::Watchdog::GetInstance()->CreateFatalTimer(timeout));
   }
 }
 
-void ProbesProducer::AddWatchdogsTimer(DataSourceInstanceID id,
-                                       const DataSourceConfig& config) {
-  if (config.trace_duration_ms() != 0)
-    watchdogs_.emplace(id, base::Watchdog::GetInstance()->CreateFatalTimer(
-                               5000 + 2 * config.trace_duration_ms()));
-}
-
-bool ProbesProducer::CreateFtraceDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateFtraceDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     const DataSourceConfig& config) {
@@ -162,16 +157,16 @@
   // This can legitimately happen on user builds where we cannot access the
   // debug paths, e.g., because of SELinux rules.
   if (ftrace_creation_failed_)
-    return false;
+    return nullptr;
 
   // Lazily create on the first instance.
   if (!ftrace_) {
-    ftrace_ = FtraceController::Create(task_runner_);
+    ftrace_ = FtraceController::Create(task_runner_, this);
 
     if (!ftrace_) {
       PERFETTO_ELOG("Failed to create FtraceController");
       ftrace_creation_failed_ = true;
-      return false;
+      return nullptr;
     }
 
     ftrace_->DisableAllEvents();
@@ -180,72 +175,65 @@
 
   PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
                config.target_buffer());
-
-  FtraceConfig proto_config = config.ftrace_config();
-
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(config.target_buffer()));
-  auto delegate = std::unique_ptr<SinkDelegate>(
-      new SinkDelegate(session_id, task_runner_, std::move(trace_writer)));
-  auto sink = ftrace_->CreateSink(std::move(proto_config), delegate.get());
-  if (!sink) {
-    PERFETTO_ELOG("Failed to start tracing (maybe someone else is using it?)");
-    return false;
+  const BufferID buffer_id = static_cast<BufferID>(config.target_buffer());
+  std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
+      ftrace_->GetWeakPtr(), session_id, config.ftrace_config(),
+      endpoint_->CreateTraceWriter(buffer_id)));
+  if (!ftrace_->AddDataSource(data_source.get())) {
+    PERFETTO_ELOG(
+        "Failed to start tracing (too many concurrent sessions or ftrace is "
+        "already in use)");
+    return nullptr;
   }
-  delegate->set_sink(std::move(sink));
-  delegates_.emplace(id, std::move(delegate));
-  AddWatchdogsTimer(id, config);
-  return true;
+  return std::move(data_source);
 }
 
-void ProbesProducer::CreateInodeFileDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateInodeFileDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     DataSourceConfig source_config) {
   PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
                id, source_config.target_buffer());
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(source_config.target_buffer()));
+  auto buffer_id = static_cast<BufferID>(source_config.target_buffer());
   if (system_inodes_.empty())
     CreateStaticDeviceToInodeMap("/system", &system_inodes_);
-  auto file_map_source =
-      std::unique_ptr<InodeFileDataSource>(new InodeFileDataSource(
-          std::move(source_config), task_runner_, session_id, &system_inodes_,
-          &cache_, std::move(trace_writer)));
-  file_map_sources_.emplace(id, std::move(file_map_source));
-  AddWatchdogsTimer(id, source_config);
+  return std::unique_ptr<InodeFileDataSource>(new InodeFileDataSource(
+      std::move(source_config), task_runner_, session_id, &system_inodes_,
+      &cache_, endpoint_->CreateTraceWriter(buffer_id)));
 }
 
-void ProbesProducer::CreateProcessStatsDataSourceInstance(
+std::unique_ptr<ProbesDataSource> ProbesProducer::CreateProcessStatsDataSource(
     TracingSessionID session_id,
     DataSourceInstanceID id,
     const DataSourceConfig& config) {
-  PERFETTO_DCHECK(process_stats_sources_.count(id) == 0);
-  auto trace_writer = endpoint_->CreateTraceWriter(
-      static_cast<BufferID>(config.target_buffer()));
-  auto source = std::unique_ptr<ProcessStatsDataSource>(
-      new ProcessStatsDataSource(session_id, std::move(trace_writer), config));
-  auto it_and_inserted = process_stats_sources_.emplace(id, std::move(source));
-  if (!it_and_inserted.second) {
-    PERFETTO_DCHECK(false);
-    return;
-  }
-  ProcessStatsDataSource* ps_data_source = it_and_inserted.first->second.get();
+  base::ignore_result(id);
+  auto buffer_id = static_cast<BufferID>(config.target_buffer());
+  auto data_source =
+      std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
+          session_id, endpoint_->CreateTraceWriter(buffer_id), config));
   if (config.process_stats_config().scan_all_processes_on_start()) {
-    ps_data_source->WriteAllProcesses();
+    data_source->WriteAllProcesses();
   }
+  return std::move(data_source);
 }
 
 void ProbesProducer::TearDownDataSourceInstance(DataSourceInstanceID id) {
   PERFETTO_LOG("Producer stop (id=%" PRIu64 ")", id);
-  // |id| could be the id of any of the datasources we handle:
-  PERFETTO_DCHECK((failed_sources_.count(id) + delegates_.count(id) +
-                   process_stats_sources_.count(id) +
-                   file_map_sources_.count(id)) == 1);
-  failed_sources_.erase(id);
-  delegates_.erase(id);
-  process_stats_sources_.erase(id);
-  file_map_sources_.erase(id);
+  auto it = data_sources_.find(id);
+  if (it == data_sources_.end()) {
+    PERFETTO_ELOG("Cannot stop data source id=%" PRIu64 ", not found", id);
+    return;
+  }
+  ProbesDataSource* data_source = it->second.get();
+  TracingSessionID session_id = data_source->tracing_session_id;
+  auto range = session_data_sources_.equal_range(session_id);
+  for (auto kv = range.first; kv != range.second; kv++) {
+    if (kv->second != data_source)
+      continue;
+    session_data_sources_.erase(kv);
+    break;
+  }
+  data_sources_.erase(it);
   watchdogs_.erase(id);
 }
 
@@ -255,26 +243,62 @@
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
   for (size_t i = 0; i < num_data_sources; i++) {
-    DataSourceInstanceID ds_id = data_source_ids[i];
-    {
-      auto it = process_stats_sources_.find(ds_id);
-      if (it != process_stats_sources_.end())
-        it->second->Flush();
-    }
-    {
-      auto it = file_map_sources_.find(ds_id);
-      if (it != file_map_sources_.end())
-        it->second->Flush();
-    }
-    {
-      auto it = delegates_.find(ds_id);
-      if (it != delegates_.end())
-        it->second->Flush();
-    }
+    auto it = data_sources_.find(data_source_ids[i]);
+    if (it == data_sources_.end())
+      continue;
+    it->second->Flush();
   }
   endpoint_->NotifyFlushComplete(flush_request_id);
 }
 
+// This function is called by the FtraceController in batches, whenever it has
+// read one or more pages from one or more cpus and written that into the
+// userspace tracing buffer. If more than one ftrace data sources are active,
+// this call typically happens after writing for all session has been handled.
+void ProbesProducer::OnFtraceDataWrittenIntoDataSourceBuffers() {
+  TracingSessionID last_session_id = 0;
+  FtraceMetadata* metadata = nullptr;
+  InodeFileDataSource* inode_data_source = nullptr;
+  ProcessStatsDataSource* ps_data_source = nullptr;
+
+  // unordered_multimap guarantees that entries with the same key are contiguous
+  // in the iteration.
+  for (auto it = session_data_sources_.begin(); /* check below*/; it++) {
+    // If this is the last iteration or this is the session id has changed,
+    // dispatch the metadata update to the linked data sources, if any.
+    if (it == session_data_sources_.end() || it->first != last_session_id) {
+      bool has_inodes = metadata && !metadata->inode_and_device.empty();
+      bool has_pids = metadata && !metadata->pids.empty();
+      if (has_inodes && inode_data_source)
+        inode_data_source->OnInodes(metadata->inode_and_device);
+      if (has_pids && ps_data_source)
+        ps_data_source->OnPids(metadata->pids);
+      if (metadata)
+        metadata->Clear();
+      metadata = nullptr;
+      inode_data_source = nullptr;
+      ps_data_source = nullptr;
+      if (it == session_data_sources_.end())
+        break;
+      last_session_id = it->first;
+    }
+    ProbesDataSource* ds = it->second;
+    switch (ds->type_id) {
+      case FtraceDataSource::kTypeId:
+        metadata = static_cast<FtraceDataSource*>(ds)->mutable_metadata();
+        break;
+      case InodeFileDataSource::kTypeId:
+        inode_data_source = static_cast<InodeFileDataSource*>(ds);
+        break;
+      case ProcessStatsDataSource::kTypeId:
+        ps_data_source = static_cast<ProcessStatsDataSource*>(ds);
+        break;
+      default:
+        PERFETTO_DCHECK(false);
+    }  // switch (type_id)
+  }    // for (session_data_sources_)
+}
+
 void ProbesProducer::ConnectWithRetries(const char* socket_name,
                                         base::TaskRunner* task_runner) {
   PERFETTO_DCHECK(state_ == kNotStarted);
@@ -303,81 +327,4 @@
   connection_backoff_ms_ = kInitialConnectionBackoffMs;
 }
 
-ProbesProducer::SinkDelegate::SinkDelegate(TracingSessionID id,
-                                           base::TaskRunner* task_runner,
-                                           std::unique_ptr<TraceWriter> writer)
-    : session_id_(id),
-      task_runner_(task_runner),
-      writer_(std::move(writer)),
-      weak_factory_(this) {}
-
-ProbesProducer::SinkDelegate::~SinkDelegate() = default;
-
-void ProbesProducer::SinkDelegate::OnCreate(FtraceSink* sink) {
-  sink->DumpFtraceStats(&stats_before_);
-}
-
-void ProbesProducer::SinkDelegate::Flush() {
-  // TODO(primiano): this still doesn't flush data from the kernel ftrace
-  // buffers (see b/73886018). We should do that and delay the
-  // NotifyFlushComplete() until the ftrace data has been drained from the
-  // kernel ftrace buffer and written in the SMB.
-  if (writer_ && (!trace_packet_ || trace_packet_->is_finalized())) {
-    WriteStats();
-    writer_->Flush();
-  }
-}
-
-void ProbesProducer::SinkDelegate::WriteStats() {
-  {
-    auto before_packet = writer_->NewTracePacket();
-    auto out = before_packet->set_ftrace_stats();
-    out->set_phase(protos::pbzero::FtraceStats_Phase_START_OF_TRACE);
-    stats_before_.Write(out);
-  }
-  {
-    FtraceStats stats_after{};
-    sink_->DumpFtraceStats(&stats_after);
-    auto after_packet = writer_->NewTracePacket();
-    auto out = after_packet->set_ftrace_stats();
-    out->set_phase(protos::pbzero::FtraceStats_Phase_END_OF_TRACE);
-    stats_after.Write(out);
-  }
-}
-
-ProbesProducer::FtraceBundleHandle
-ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
-  trace_packet_ = writer_->NewTracePacket();
-  return FtraceBundleHandle(trace_packet_->set_ftrace_events());
-}
-
-void ProbesProducer::SinkDelegate::OnBundleComplete(
-    size_t,
-    FtraceBundleHandle,
-    const FtraceMetadata& metadata) {
-  trace_packet_->Finalize();
-
-  if (file_source_ && !metadata.inode_and_device.empty()) {
-    auto inodes = metadata.inode_and_device;
-    auto weak_file_source = file_source_;
-    task_runner_->PostTask([weak_file_source, inodes] {
-      if (weak_file_source)
-        weak_file_source->OnInodes(inodes);
-    });
-  }
-  if (ps_source_ && !metadata.pids.empty()) {
-    const auto& quirks = ps_source_->config().process_stats_config().quirks();
-    if (std::find(quirks.begin(), quirks.end(),
-                  ProcessStatsConfig::DISABLE_ON_DEMAND) != quirks.end()) {
-      return;
-    }
-    const auto& pids = metadata.pids;
-    auto weak_ps_source = ps_source_;
-    task_runner_->PostTask([weak_ps_source, pids] {
-      if (weak_ps_source)
-        weak_ps_source->OnPids(pids);
-    });
-  }
-}
-
 }  // namespace perfetto
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 049d58a..46e0f57 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -17,26 +17,30 @@
 #ifndef SRC_TRACED_PROBES_PROBES_PRODUCER_H_
 #define SRC_TRACED_PROBES_PROBES_PRODUCER_H_
 
-#include <map>
 #include <memory>
+#include <unordered_map>
 #include <utility>
 
 #include "perfetto/base/task_runner.h"
 #include "perfetto/base/watchdog.h"
+#include "perfetto/base/weak_ptr.h"
 #include "perfetto/tracing/core/producer.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "perfetto/tracing/core/tracing_service.h"
 #include "src/traced/probes/filesystem/inode_file_data_source.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
 #include "src/traced/probes/ps/process_stats_data_source.h"
 
 #include "perfetto/trace/filesystem/inode_file_map.pbzero.h"
 
 namespace perfetto {
 
+class ProbesDataSource;
+
 const uint64_t kLRUInodeCacheSize = 1000;
 
-class ProbesProducer : public Producer {
+class ProbesProducer : public Producer, public FtraceController::Observer {
  public:
   ProbesProducer();
   ~ProbesProducer() override;
@@ -52,81 +56,26 @@
              const DataSourceInstanceID* data_source_ids,
              size_t num_data_sources) override;
 
+  // FtraceController::Observer implementation.
+  void OnFtraceDataWrittenIntoDataSourceBuffers() override;
+
   // Our Impl
   void ConnectWithRetries(const char* socket_name,
                           base::TaskRunner* task_runner);
-  bool CreateFtraceDataSourceInstance(TracingSessionID session_id,
-                                      DataSourceInstanceID id,
-                                      const DataSourceConfig& config);
-  void CreateProcessStatsDataSourceInstance(TracingSessionID session_id,
-                                            DataSourceInstanceID id,
-                                            const DataSourceConfig& config);
-  void CreateInodeFileDataSourceInstance(TracingSessionID session_id,
-                                         DataSourceInstanceID id,
-                                         DataSourceConfig config);
-
-  void OnMetadata(const FtraceMetadata& metadata);
+  std::unique_ptr<ProbesDataSource> CreateFtraceDataSource(
+      TracingSessionID session_id,
+      DataSourceInstanceID id,
+      const DataSourceConfig& config);
+  std::unique_ptr<ProbesDataSource> CreateProcessStatsDataSource(
+      TracingSessionID session_id,
+      DataSourceInstanceID id,
+      const DataSourceConfig& config);
+  std::unique_ptr<ProbesDataSource> CreateInodeFileDataSource(
+      TracingSessionID session_id,
+      DataSourceInstanceID id,
+      DataSourceConfig config);
 
  private:
-  using FtraceBundleHandle =
-      protozero::MessageHandle<protos::pbzero::FtraceEventBundle>;
-  using FtraceStatsHandle =
-      protozero::MessageHandle<protos::pbzero::FtraceStats>;
-
-  class SinkDelegate : public FtraceSink::Delegate {
-   public:
-    SinkDelegate(TracingSessionID,
-                 base::TaskRunner*,
-                 std::unique_ptr<TraceWriter>);
-    ~SinkDelegate() override;
-
-    TracingSessionID session_id() const { return session_id_; }
-
-    void Flush();
-
-    // FtraceDelegateImpl
-    FtraceBundleHandle GetBundleForCpu(size_t cpu) override;
-    void OnBundleComplete(size_t cpu,
-                          FtraceBundleHandle bundle,
-                          const FtraceMetadata& metadata) override;
-    void OnCreate(FtraceSink*) override;
-
-    void WriteStats();
-
-    void set_sink(std::unique_ptr<FtraceSink> sink) { sink_ = std::move(sink); }
-
-    void set_ps_source(base::WeakPtr<ProcessStatsDataSource> ptr) {
-      ps_source_ = std::move(ptr);
-    }
-    const base::WeakPtr<ProcessStatsDataSource>& ps_source() const {
-      return ps_source_;
-    }
-
-    void set_file_source(base::WeakPtr<InodeFileDataSource> ptr) {
-      file_source_ = std::move(ptr);
-    }
-    const base::WeakPtr<InodeFileDataSource>& file_source() const {
-      return file_source_;
-    }
-
-   private:
-    const TracingSessionID session_id_;
-    base::TaskRunner* task_runner_;
-    std::unique_ptr<FtraceSink> sink_ = nullptr;
-    std::unique_ptr<TraceWriter> writer_;
-    FtraceStats stats_before_ = {};
-
-    base::WeakPtr<ProcessStatsDataSource> ps_source_;
-    base::WeakPtr<InodeFileDataSource> file_source_;
-
-    // Keep this after the TraceWriter because TracePackets must not outlive
-    // their originating writer.
-    TraceWriter::TracePacketHandle trace_packet_;
-
-    // Keep this last.
-    base::WeakPtrFactory<SinkDelegate> weak_factory_;
-  };
-
   enum State {
     kNotStarted = 0,
     kNotConnected,
@@ -141,26 +90,29 @@
   void Restart();
   void ResetConnectionBackoff();
   void IncreaseConnectionBackoff();
-  void AddWatchdogsTimer(DataSourceInstanceID id,
-                         const DataSourceConfig& source_config);
 
   State state_ = kNotStarted;
   base::TaskRunner* task_runner_ = nullptr;
-  std::unique_ptr<TracingService::ProducerEndpoint> endpoint_ = nullptr;
-  std::unique_ptr<FtraceController> ftrace_ = nullptr;
+  std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
+  std::unique_ptr<FtraceController> ftrace_;
   bool ftrace_creation_failed_ = false;
   uint32_t connection_backoff_ms_ = 0;
   const char* socket_name_ = nullptr;
-  std::set<DataSourceInstanceID> failed_sources_;
-  std::map<DataSourceInstanceID, std::unique_ptr<ProcessStatsDataSource>>
-      process_stats_sources_;
-  std::map<DataSourceInstanceID, std::unique_ptr<SinkDelegate>> delegates_;
-  std::map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
-  std::map<DataSourceInstanceID, std::unique_ptr<InodeFileDataSource>>
-      file_map_sources_;
+
+  // Owning map for all active data sources.
+  std::unordered_map<DataSourceInstanceID, std::unique_ptr<ProbesDataSource>>
+      data_sources_;
+
+  // Keeps (pointers to) data sources ordered by session id.
+  std::unordered_multimap<TracingSessionID, ProbesDataSource*>
+      session_data_sources_;
+
+  std::unordered_map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
   LRUInodeCache cache_{kLRUInodeCacheSize};
   std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>
       system_inodes_;
+
+  base::WeakPtrFactory<ProbesProducer> weak_factory_;  // Keep last.
 };
 
 }  // namespace perfetto
diff --git a/src/traced/probes/ps/BUILD.gn b/src/traced/probes/ps/BUILD.gn
index a4b1a2c..902d850 100644
--- a/src/traced/probes/ps/BUILD.gn
+++ b/src/traced/probes/ps/BUILD.gn
@@ -17,6 +17,7 @@
     "../../../tracing",
   ]
   deps = [
+    "..:data_source",
     "../../../../gn:default_deps",
     "../../../../include/perfetto/traced",
     "../../../../protos/perfetto/trace/ps:zero",
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index 93e94ba..80d125b 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -18,6 +18,7 @@
 
 #include <stdlib.h>
 
+#include <algorithm>
 #include <utility>
 
 #include "perfetto/base/file_utils.h"
@@ -65,15 +66,23 @@
 
 }  // namespace
 
+// static
+constexpr int ProcessStatsDataSource::kTypeId;
+
 ProcessStatsDataSource::ProcessStatsDataSource(
-    TracingSessionID id,
+    TracingSessionID session_id,
     std::unique_ptr<TraceWriter> writer,
     const DataSourceConfig& config)
-    : session_id_(id),
+    : ProbesDataSource(session_id, kTypeId),
       writer_(std::move(writer)),
       config_(config),
       record_thread_names_(config.process_stats_config().record_thread_names()),
-      weak_factory_(this) {}
+      weak_factory_(this) {
+  const auto& quirks = config_.process_stats_config().quirks();
+  enable_on_demand_dumps_ =
+      (std::find(quirks.begin(), quirks.end(),
+                 ProcessStatsConfig::DISABLE_ON_DEMAND) == quirks.end());
+}
 
 ProcessStatsDataSource::~ProcessStatsDataSource() = default;
 
@@ -106,6 +115,8 @@
 }
 
 void ProcessStatsDataSource::OnPids(const std::vector<int32_t>& pids) {
+  if (!enable_on_demand_dumps_)
+    return;
   PERFETTO_DCHECK(!cur_ps_tree_);
   for (int32_t pid : pids) {
     if (seen_pids_.count(pid) || pid == 0)
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 32a86bd..866d2cf 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -25,6 +25,7 @@
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/trace_writer.h"
+#include "src/traced/probes/probes_data_source.h"
 
 namespace perfetto {
 
@@ -34,20 +35,21 @@
 }  // namespace pbzero
 }  // namespace protos
 
-class ProcessStatsDataSource {
+class ProcessStatsDataSource : public ProbesDataSource {
  public:
+  static constexpr int kTypeId = 3;
+
   ProcessStatsDataSource(TracingSessionID,
                          std::unique_ptr<TraceWriter> writer,
                          const DataSourceConfig&);
-  virtual ~ProcessStatsDataSource();
+  ~ProcessStatsDataSource() override;
 
-  TracingSessionID session_id() const { return session_id_; }
   const DataSourceConfig& config() const { return config_; }
 
   base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
   void WriteAllProcesses();
   void OnPids(const std::vector<int32_t>& pids);
-  void Flush();
+  void Flush() override;
 
   // Virtual for testing.
   virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
@@ -64,12 +66,12 @@
   protos::pbzero::ProcessTree* GetOrCreatePsTree();
   void FinalizeCurPsTree();
 
-  const TracingSessionID session_id_;
   std::unique_ptr<TraceWriter> writer_;
   const DataSourceConfig config_;
   TraceWriter::TracePacketHandle cur_packet_;
   protos::pbzero::ProcessTree* cur_ps_tree_ = nullptr;
   bool record_thread_names_ = false;
+  bool enable_on_demand_dumps_ = true;
 
   // This set contains PIDs as per the Linux kernel notion of a PID (which is
   // really a TID). In practice this set will contain all TIDs for all processes
diff --git a/tools/tmux b/tools/tmux
index bb88bc7..d5a8452 100755
--- a/tools/tmux
+++ b/tools/tmux
@@ -44,7 +44,7 @@
 }
 
 function adb_supports_push_sync {
-  adb --help | grep 'push.*\[--sync\]' 2>&1 >/dev/null
+  adb --help 2>&1 | grep 'push.*\[--sync\]' 2>&1 >/dev/null
 }
 
 function push {