Non-functional refactorings of ftrace controller/sink/metadata

This CL mainly moves classes defined in ftrace_controller.{cc,h} into
their own files. This is in preparation of future refactorings to
clean that up. Specifically this CL:
- Move process_stats_data_source into its own ps/ folder
  (no code changes).
- Move FtraceSink and FtraceMetadata into their own .cc/.h files
  (no code changes).
- Avoid re-derefering the WeakPtr in ftrace_controller and cache the pointer
  after we establish it's still alive.

Bug: 73886018
Change-Id: Idbb214d277e4fd03d65678da413c168d0a3deb8a
diff --git a/Android.bp b/Android.bp
index e3e4fbe..0cd32cc 100644
--- a/Android.bp
+++ b/Android.bp
@@ -70,11 +70,13 @@
     "src/traced/probes/ftrace/ftrace_config.cc",
     "src/traced/probes/ftrace/ftrace_config_muxer.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
+    "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
+    "src/traced/probes/ftrace/ftrace_sink.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/probes.cc",
     "src/traced/probes/probes_producer.cc",
-    "src/traced/probes/process_stats_data_source.cc",
+    "src/traced/probes/ps/process_stats_data_source.cc",
     "src/traced/service/service.cc",
     "src/tracing/core/chrome_config.cc",
     "src/tracing/core/commit_data_request.cc",
@@ -325,12 +327,14 @@
     "src/traced/probes/ftrace/ftrace_config.cc",
     "src/traced/probes/ftrace/ftrace_config_muxer.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
+    "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_integrationtest.cc",
+    "src/traced/probes/ftrace/ftrace_sink.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
     "src/traced/probes/probes_producer.cc",
-    "src/traced/probes/process_stats_data_source.cc",
+    "src/traced/probes/ps/process_stats_data_source.cc",
     "src/tracing/core/chrome_config.cc",
     "src/tracing/core/commit_data_request.cc",
     "src/tracing/core/data_source_config.cc",
@@ -3725,14 +3729,16 @@
     "src/traced/probes/ftrace/ftrace_config_unittest.cc",
     "src/traced/probes/ftrace/ftrace_controller.cc",
     "src/traced/probes/ftrace/ftrace_controller_unittest.cc",
+    "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_unittest.cc",
+    "src/traced/probes/ftrace/ftrace_sink.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/proto_translation_table_unittest.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
     "src/traced/probes/probes_producer.cc",
-    "src/traced/probes/process_stats_data_source.cc",
-    "src/traced/probes/process_stats_data_source_unittest.cc",
+    "src/traced/probes/ps/process_stats_data_source.cc",
+    "src/traced/probes/ps/process_stats_data_source_unittest.cc",
     "src/tracing/core/chrome_config.cc",
     "src/tracing/core/commit_data_request.cc",
     "src/tracing/core/data_source_config.cc",
diff --git a/src/traced/probes/BUILD.gn b/src/traced/probes/BUILD.gn
index 18cd02f..6f33405 100644
--- a/src/traced/probes/BUILD.gn
+++ b/src/traced/probes/BUILD.gn
@@ -38,12 +38,11 @@
     "../../tracing:ipc",
     "../../tracing:tracing",
     "filesystem",
+    "ps",
   ]
   sources = [
     "probes_producer.cc",
     "probes_producer.h",
-    "process_stats_data_source.cc",
-    "process_stats_data_source.h",
   ]
 }
 
@@ -54,8 +53,7 @@
     "../../../gn:default_deps",
     "../../../gn:gtest_deps",
     "../../tracing:test_support",
-  ]
-  sources = [
-    "process_stats_data_source_unittest.cc",
+    "filesystem:unittests",
+    "ps:unittests",
   ]
 }
diff --git a/src/traced/probes/ftrace/BUILD.gn b/src/traced/probes/ftrace/BUILD.gn
index 7e8e298..bdbf3f9 100644
--- a/src/traced/probes/ftrace/BUILD.gn
+++ b/src/traced/probes/ftrace/BUILD.gn
@@ -132,8 +132,12 @@
     "ftrace_config_muxer.h",
     "ftrace_controller.cc",
     "ftrace_controller.h",
+    "ftrace_metadata.cc",
+    "ftrace_metadata.h",
     "ftrace_procfs.cc",
     "ftrace_procfs.h",
+    "ftrace_sink.cc",
+    "ftrace_sink.h",
     "proto_translation_table.cc",
     "proto_translation_table.h",
   ]
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index a81921d..875bb63 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -173,7 +173,7 @@
 
   worker_thread_ =
       std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
-                            *staging_write_fd_, on_data_available, &exiting_));
+                            *staging_write_fd_, on_data_available, &cmd_));
 }
 
 CpuReader::~CpuReader() {
@@ -183,7 +183,7 @@
   // trace fd (which prevents another splice from starting), raise SIGPIPE and
   // wait for the worker to exit (i.e., to guarantee no splice is in progress)
   // and only then close the staging pipe.
-  exiting_ = true;
+  cmd_ = ThreadCtl::kExit;
   trace_fd_.reset();
   pthread_kill(worker_thread_.native_handle(), SIGPIPE);
   worker_thread_.join();
@@ -194,7 +194,7 @@
                                 int trace_fd,
                                 int staging_write_fd,
                                 const std::function<void()>& on_data_available,
-                                std::atomic<bool>* exiting) {
+                                std::atomic<ThreadCtl>* cmd_atomic) {
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
   // This thread is responsible for moving data from the trace pipe into the
@@ -221,14 +221,19 @@
       // The kernel ftrace code has its own splice() implementation that can
       // occasionally fail with transient errors not reported in man 2 splice.
       // Just try again if we see these.
-      if (errno == ENOMEM || errno == EBUSY || (errno == EINTR && !*exiting)) {
+      ThreadCtl cmd = *cmd_atomic;
+      if (errno == ENOMEM || errno == EBUSY ||
+          (errno == EINTR && cmd == ThreadCtl::kRun)) {
         PERFETTO_DPLOG("Transient splice failure -- retrying");
         usleep(100 * 1000);
         continue;
       }
-      PERFETTO_DPLOG("Stopping CPUReader loop for CPU %zd.", cpu);
-      PERFETTO_DCHECK(errno == EPIPE || errno == EINTR || errno == EBADF);
-      break;  // ~CpuReader is waiting to join this thread.
+      if (cmd == ThreadCtl::kExit) {
+        PERFETTO_DPLOG("Stopping CPUReader loop for CPU %zd.", cpu);
+        PERFETTO_DCHECK(errno == EPIPE || errno == EINTR || errno == EBADF);
+        break;  // ~CpuReader is waiting to join this thread.
+      }
+      PERFETTO_FATAL("Unexpected ThreadCtl value: %d", int(cmd));
     }
 
     // Then do as many non-blocking splices as we can. This moves any full
@@ -257,7 +262,7 @@
   base::ignore_result(trace_fd);
   base::ignore_result(staging_write_fd);
   base::ignore_result(on_data_available);
-  base::ignore_result(exiting);
+  base::ignore_result(cmd_atomic);
   PERFETTO_ELOG("Supported only on Linux/Android");
 #endif
 }
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index a8aa748..77cdd46 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -200,11 +200,12 @@
                          FtraceMetadata* metadata);
 
  private:
+  enum ThreadCtl : uint32_t { kRun = 0, kExit };
   static void RunWorkerThread(size_t cpu,
                               int trace_fd,
                               int staging_write_fd,
                               const std::function<void()>& on_data_available,
-                              std::atomic<bool>* exiting);
+                              std::atomic<ThreadCtl>* cmd_atomic);
 
   uint8_t* GetBuffer();
   CpuReader(const CpuReader&) = delete;
@@ -217,7 +218,7 @@
   base::ScopedFile staging_write_fd_;
   base::PageAllocator::UniquePtr buffer_;
   std::thread worker_thread_;
-  std::atomic<bool> exiting_{false};
+  std::atomic<ThreadCtl> cmd_{kRun};
   PERFETTO_THREAD_CHECKER(thread_checker_)
 };
 
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 39ba477..cdb6b49 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -35,6 +35,7 @@
 #include "src/traced/probes/ftrace/cpu_reader.h"
 #include "src/traced/probes/ftrace/cpu_stats_parser.h"
 #include "src/traced/probes/ftrace/event_info.h"
+#include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_config_muxer.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
@@ -152,45 +153,48 @@
 void FtraceController::DrainCPUs(base::WeakPtr<FtraceController> weak_this,
                                  size_t generation) {
   // The controller might be gone.
-  if (!weak_this)
+  FtraceController* ctrl = weak_this.get();
+  if (!ctrl)
     return;
+
   // We might have stopped tracing then quickly re-enabled it, in this case
   // we don't want to end up with two periodic tasks for each CPU:
-  if (weak_this->generation_ != generation)
+  if (ctrl->generation_ != generation)
     return;
 
-  PERFETTO_DCHECK_THREAD(weak_this->thread_checker_);
+  PERFETTO_DCHECK_THREAD(ctrl->thread_checker_);
   std::bitset<kMaxCpus> cpus_to_drain;
   {
-    std::unique_lock<std::mutex> lock(weak_this->lock_);
+    std::unique_lock<std::mutex> lock(ctrl->lock_);
     // We might have stopped caring about events.
-    if (!weak_this->listening_for_raw_trace_data_)
+    if (!ctrl->listening_for_raw_trace_data_)
       return;
-    std::swap(cpus_to_drain, weak_this->cpus_to_drain_);
+    std::swap(cpus_to_drain, ctrl->cpus_to_drain_);
   }
 
-  for (size_t cpu = 0; cpu < weak_this->ftrace_procfs_->NumberOfCpus(); cpu++) {
+  for (size_t cpu = 0; cpu < ctrl->ftrace_procfs_->NumberOfCpus(); cpu++) {
     if (!cpus_to_drain[cpu])
       continue;
-    weak_this->OnRawFtraceDataAvailable(cpu);
+    ctrl->OnRawFtraceDataAvailable(cpu);
   }
 
   // If we filled up any SHM pages while draining the data, we will have posted
   // a task to notify traced about this. Only unblock the readers after this
   // notification is sent to make it less likely that they steal CPU time away
   // from traced.
-  weak_this->task_runner_->PostTask(
+  ctrl->task_runner_->PostTask(
       std::bind(&FtraceController::UnblockReaders, weak_this));
 }
 
 // static
 void FtraceController::UnblockReaders(
     const base::WeakPtr<FtraceController>& weak_this) {
-  if (!weak_this)
+  FtraceController* ctrl = weak_this.get();
+  if (!ctrl)
     return;
   // Unblock all waiting readers to start moving more data into their
   // respective staging pipes.
-  weak_this->data_drained_.notify_all();
+  ctrl->data_drained_.notify_all();
 }
 
 void FtraceController::StartIfNeeded() {
@@ -205,7 +209,7 @@
   generation_++;
   base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
   for (size_t cpu = 0; cpu < ftrace_procfs_->NumberOfCpus(); cpu++) {
-    readers_.emplace(
+    cpu_readers_.emplace(
         cpu, std::unique_ptr<CpuReader>(new CpuReader(
                  table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu),
                  std::bind(&FtraceController::OnDataAvailable, this, weak_this,
@@ -246,12 +250,12 @@
     cpus_to_drain_.reset();
   }
   data_drained_.notify_all();
-  readers_.clear();
+  cpu_readers_.clear();
 }
 
 void FtraceController::OnRawFtraceDataAvailable(size_t cpu) {
   PERFETTO_CHECK(cpu < ftrace_procfs_->NumberOfCpus());
-  CpuReader* reader = readers_[cpu].get();
+  CpuReader* reader = cpu_readers_[cpu].get();
   using BundleHandle =
       protozero::MessageHandle<protos::pbzero::FtraceEventBundle>;
   std::array<const EventFilter*, kMaxSinks> filters{};
@@ -346,31 +350,6 @@
   DumpAllCpuStats(ftrace_procfs_.get(), stats);
 }
 
-FtraceSink::FtraceSink(base::WeakPtr<FtraceController> controller_weak,
-                       FtraceConfigId id,
-                       FtraceConfig config,
-                       std::unique_ptr<EventFilter> filter,
-                       Delegate* delegate)
-    : controller_weak_(std::move(controller_weak)),
-      id_(id),
-      config_(std::move(config)),
-      filter_(std::move(filter)),
-      delegate_(delegate){};
-
-FtraceSink::~FtraceSink() {
-  if (controller_weak_)
-    controller_weak_->Unregister(this);
-};
-
-const std::set<std::string>& FtraceSink::enabled_events() {
-  return filter_->enabled_names();
-}
-
-void FtraceSink::DumpFtraceStats(FtraceStats* stats) {
-  if (controller_weak_)
-    controller_weak_->DumpFtraceStats(stats);
-}
-
 void FtraceStats::Write(protos::pbzero::FtraceStats* writer) const {
   for (const FtraceCpuStats& cpu_specific_stats : cpu_stats) {
     cpu_specific_stats.Write(writer->add_cpu_stats());
@@ -389,63 +368,4 @@
   writer->set_read_events(read_events);
 }
 
-FtraceMetadata::FtraceMetadata() {
-  // A lot of the time there will only be a small number of inodes.
-  inode_and_device.reserve(10);
-  pids.reserve(10);
-}
-
-void FtraceMetadata::AddDevice(BlockDeviceID device_id) {
-  last_seen_device_id = device_id;
-#if PERFETTO_DCHECK_IS_ON()
-  seen_device_id = true;
-#endif
-}
-
-void FtraceMetadata::AddInode(Inode inode_number) {
-#if PERFETTO_DCHECK_IS_ON()
-  PERFETTO_DCHECK(seen_device_id);
-#endif
-  static int32_t cached_pid = 0;
-  if (!cached_pid)
-    cached_pid = getpid();
-
-  PERFETTO_DCHECK(last_seen_common_pid);
-  PERFETTO_DCHECK(cached_pid == getpid());
-  // Ignore own scanning activity.
-  if (cached_pid != last_seen_common_pid) {
-    inode_and_device.push_back(
-        std::make_pair(inode_number, last_seen_device_id));
-  }
-}
-
-void FtraceMetadata::AddCommonPid(int32_t pid) {
-  last_seen_common_pid = pid;
-}
-
-void FtraceMetadata::AddPid(int32_t pid) {
-  // Speculative optimization aginst repated pid's while keeping
-  // faster insertion than a set.
-  if (!pids.empty() && pids.back() == pid)
-    return;
-  pids.push_back(pid);
-}
-
-void FtraceMetadata::FinishEvent() {
-  last_seen_device_id = 0;
-#if PERFETTO_DCHECK_IS_ON()
-  seen_device_id = false;
-#endif
-  last_seen_common_pid = 0;
-}
-
-void FtraceMetadata::Clear() {
-  inode_and_device.clear();
-  pids.clear();
-  overwrite_count = 0;
-  FinishEvent();
-}
-
-FtraceSink::Delegate::~Delegate() = default;
-
 }  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index 381ec83..3a73771 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -19,7 +19,6 @@
 
 #include <unistd.h>
 
-#include <sys/stat.h>
 #include <bitset>
 #include <condition_variable>
 #include <map>
@@ -35,7 +34,7 @@
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/protozero/message_handle.h"
 #include "perfetto/traced/data_source_types.h"
-#include "src/traced/probes/ftrace/ftrace_config.h"
+#include "src/traced/probes/ftrace/ftrace_sink.h"
 
 namespace perfetto {
 
@@ -47,9 +46,6 @@
 }  // namespace pbzero
 }  // namespace protos
 
-using BlockDeviceID = decltype(stat::st_dev);
-using Inode = decltype(stat::st_ino);
-
 struct FtraceCpuStats {
   uint64_t cpu;
   uint64_t entries;
@@ -70,28 +66,6 @@
   void Write(protos::pbzero::FtraceStats*) const;
 };
 
-struct FtraceMetadata {
-  FtraceMetadata();
-
-  uint32_t overwrite_count;
-  BlockDeviceID last_seen_device_id = 0;
-#if PERFETTO_DCHECK_IS_ON()
-  bool seen_device_id = false;
-#endif
-  int32_t last_seen_common_pid = 0;
-
-  // A vector not a set to keep the writer_fast.
-  std::vector<std::pair<Inode, BlockDeviceID>> inode_and_device;
-  std::vector<int32_t> pids;
-
-  void AddDevice(BlockDeviceID);
-  void AddInode(Inode);
-  void AddPid(int32_t);
-  void AddCommonPid(int32_t);
-  void Clear();
-  void FinishEvent();
-};
-
 constexpr size_t kMaxSinks = 32;
 constexpr size_t kMaxCpus = 64;
 
@@ -100,68 +74,12 @@
 
 class CpuReader;
 class EventFilter;
+class FtraceConfig;
 class FtraceController;
 class FtraceConfigMuxer;
 class FtraceProcfs;
 class ProtoTranslationTable;
 
-// To consume ftrace data clients implement a |FtraceSink::Delegate| and use it
-// to create a |FtraceSink|. While the FtraceSink lives FtraceController will
-// call |GetBundleForCpu|, write data into the bundle then call
-// |OnBundleComplete| allowing the client to perform finalization.
-class FtraceSink {
- public:
-  using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
-  class Delegate {
-   public:
-    virtual void OnCreate(FtraceSink*) {}
-    virtual protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(
-        size_t) = 0;
-    virtual void OnBundleComplete(size_t,
-                                  protozero::MessageHandle<FtraceEventBundle>,
-                                  const FtraceMetadata&) = 0;
-    virtual ~Delegate();
-  };
-
-  FtraceSink(base::WeakPtr<FtraceController>,
-             FtraceConfigId id,
-             FtraceConfig config,
-             std::unique_ptr<EventFilter>,
-             Delegate*);
-  ~FtraceSink();
-
-  void DumpFtraceStats(FtraceStats*);
-
-  const FtraceConfig& config() const { return config_; }
-
- private:
-  friend FtraceController;
-
-  FtraceSink(const FtraceSink&) = delete;
-  FtraceSink& operator=(const FtraceSink&) = delete;
-
-  EventFilter* event_filter() { return filter_.get(); }
-  FtraceMetadata* metadata_mutable() { return &metadata_; }
-
-  protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(size_t cpu) {
-    return delegate_->GetBundleForCpu(cpu);
-  }
-  void OnBundleComplete(size_t cpu,
-                        protozero::MessageHandle<FtraceEventBundle> bundle) {
-    delegate_->OnBundleComplete(cpu, std::move(bundle), metadata_);
-    metadata_.Clear();
-  }
-
-  const std::set<std::string>& enabled_events();
-
-  base::WeakPtr<FtraceController> controller_weak_;
-  const FtraceConfigId id_;
-  const FtraceConfig config_;
-  std::unique_ptr<EventFilter> filter_;
-  FtraceMetadata metadata_;
-  FtraceSink::Delegate* delegate_;
-};
-
 // Utility class for controlling ftrace.
 class FtraceController {
  public:
@@ -230,7 +148,7 @@
   size_t generation_ = 0;
   bool atrace_running_ = false;
   base::TaskRunner* task_runner_ = nullptr;
-  std::map<size_t, std::unique_ptr<CpuReader>> readers_;
+  std::map<size_t, std::unique_ptr<CpuReader>> cpu_readers_;
   std::set<FtraceSink*> sinks_;
   base::WeakPtrFactory<FtraceController> weak_factory_;
   PERFETTO_THREAD_CHECKER(thread_checker_)
diff --git a/src/traced/probes/ftrace/ftrace_metadata.cc b/src/traced/probes/ftrace/ftrace_metadata.cc
new file mode 100644
index 0000000..36c4287
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_metadata.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
+
+namespace perfetto {
+
+FtraceMetadata::FtraceMetadata() {
+  // A lot of the time there will only be a small number of inodes.
+  inode_and_device.reserve(10);
+  pids.reserve(10);
+}
+
+void FtraceMetadata::AddDevice(BlockDeviceID device_id) {
+  last_seen_device_id = device_id;
+#if PERFETTO_DCHECK_IS_ON()
+  seen_device_id = true;
+#endif
+}
+
+void FtraceMetadata::AddInode(Inode inode_number) {
+#if PERFETTO_DCHECK_IS_ON()
+  PERFETTO_DCHECK(seen_device_id);
+#endif
+  static int32_t cached_pid = 0;
+  if (!cached_pid)
+    cached_pid = getpid();
+
+  PERFETTO_DCHECK(last_seen_common_pid);
+  PERFETTO_DCHECK(cached_pid == getpid());
+  // Ignore own scanning activity.
+  if (cached_pid != last_seen_common_pid) {
+    inode_and_device.push_back(
+        std::make_pair(inode_number, last_seen_device_id));
+  }
+}
+
+void FtraceMetadata::AddCommonPid(int32_t pid) {
+  last_seen_common_pid = pid;
+}
+
+void FtraceMetadata::AddPid(int32_t pid) {
+  // Speculative optimization aginst repated pid's while keeping
+  // faster insertion than a set.
+  if (!pids.empty() && pids.back() == pid)
+    return;
+  pids.push_back(pid);
+}
+
+void FtraceMetadata::FinishEvent() {
+  last_seen_device_id = 0;
+#if PERFETTO_DCHECK_IS_ON()
+  seen_device_id = false;
+#endif
+  last_seen_common_pid = 0;
+}
+
+void FtraceMetadata::Clear() {
+  inode_and_device.clear();
+  pids.clear();
+  overwrite_count = 0;
+  FinishEvent();
+}
+
+}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_metadata.h b/src/traced/probes/ftrace/ftrace_metadata.h
new file mode 100644
index 0000000..eab94ef
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_metadata.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_METADATA_H_
+#define SRC_TRACED_PROBES_FTRACE_FTRACE_METADATA_H_
+
+#include <stdint.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+
+using BlockDeviceID = decltype(stat::st_dev);
+using Inode = decltype(stat::st_ino);
+
+struct FtraceMetadata {
+  FtraceMetadata();
+
+  uint32_t overwrite_count;
+  BlockDeviceID last_seen_device_id = 0;
+#if PERFETTO_DCHECK_IS_ON()
+  bool seen_device_id = false;
+#endif
+  int32_t last_seen_common_pid = 0;
+
+  // A vector not a set to keep the writer_fast.
+  std::vector<std::pair<Inode, BlockDeviceID>> inode_and_device;
+  std::vector<int32_t> pids;
+
+  void AddDevice(BlockDeviceID);
+  void AddInode(Inode);
+  void AddPid(int32_t);
+  void AddCommonPid(int32_t);
+  void Clear();
+  void FinishEvent();
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_METADATA_H_
diff --git a/src/traced/probes/ftrace/ftrace_sink.cc b/src/traced/probes/ftrace/ftrace_sink.cc
new file mode 100644
index 0000000..70908c0
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_sink.cc
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/ftrace/ftrace_sink.h"
+
+#include "src/traced/probes/ftrace/cpu_reader.h"
+#include "src/traced/probes/ftrace/ftrace_controller.h"
+
+namespace perfetto {
+
+FtraceSink::FtraceSink(base::WeakPtr<FtraceController> controller_weak,
+                       FtraceConfigId id,
+                       FtraceConfig config,
+                       std::unique_ptr<EventFilter> filter,
+                       Delegate* delegate)
+    : controller_weak_(std::move(controller_weak)),
+      id_(id),
+      config_(std::move(config)),
+      filter_(std::move(filter)),
+      delegate_(delegate){};
+
+FtraceSink::~FtraceSink() {
+  if (controller_weak_)
+    controller_weak_->Unregister(this);
+};
+
+const std::set<std::string>& FtraceSink::enabled_events() {
+  return filter_->enabled_names();
+}
+
+void FtraceSink::DumpFtraceStats(FtraceStats* stats) {
+  if (controller_weak_)
+    controller_weak_->DumpFtraceStats(stats);
+}
+
+FtraceSink::Delegate::~Delegate() = default;
+
+}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_sink.h b/src/traced/probes/ftrace/ftrace_sink.h
new file mode 100644
index 0000000..a95b4c7
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_sink.h
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
+#define SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
+
+#include <memory>
+#include <set>
+#include <string>
+
+#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/protozero/message_handle.h"
+#include "src/traced/probes/ftrace/ftrace_config.h"
+#include "src/traced/probes/ftrace/ftrace_metadata.h"
+
+namespace perfetto {
+
+class EventFilter;
+class FtraceController;
+struct FtraceStats;
+
+namespace protos {
+namespace pbzero {
+class FtraceEventBundle;
+}  // namespace pbzero
+}  // namespace protos
+
+// To consume ftrace data clients implement a |FtraceSink::Delegate| and use it
+// to create a |FtraceSink|. While the FtraceSink lives FtraceController will
+// call |GetBundleForCpu|, write data into the bundle then call
+// |OnBundleComplete| allowing the client to perform finalization.
+class FtraceSink {
+ public:
+  using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
+  class Delegate {
+   public:
+    virtual void OnCreate(FtraceSink*) {}
+    virtual protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(
+        size_t) = 0;
+    virtual void OnBundleComplete(size_t,
+                                  protozero::MessageHandle<FtraceEventBundle>,
+                                  const FtraceMetadata&) = 0;
+    virtual ~Delegate();
+  };
+
+  FtraceSink(base::WeakPtr<FtraceController>,
+             FtraceConfigId id,
+             FtraceConfig config,
+             std::unique_ptr<EventFilter>,
+             Delegate*);
+  ~FtraceSink();
+
+  void DumpFtraceStats(FtraceStats*);
+
+  const FtraceConfig& config() const { return config_; }
+
+ private:
+  friend FtraceController;
+
+  FtraceSink(const FtraceSink&) = delete;
+  FtraceSink& operator=(const FtraceSink&) = delete;
+
+  EventFilter* event_filter() { return filter_.get(); }
+  FtraceMetadata* metadata_mutable() { return &metadata_; }
+
+  protozero::MessageHandle<FtraceEventBundle> GetBundleForCpu(size_t cpu) {
+    return delegate_->GetBundleForCpu(cpu);
+  }
+  void OnBundleComplete(size_t cpu,
+                        protozero::MessageHandle<FtraceEventBundle> bundle) {
+    delegate_->OnBundleComplete(cpu, std::move(bundle), metadata_);
+    metadata_.Clear();
+  }
+
+  const std::set<std::string>& enabled_events();
+
+  base::WeakPtr<FtraceController> controller_weak_;
+  const FtraceConfigId id_;
+  const FtraceConfig config_;
+  std::unique_ptr<EventFilter> filter_;
+  FtraceMetadata metadata_;
+  FtraceSink::Delegate* delegate_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_SINK_H_
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 210aece..049d58a 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -28,7 +28,7 @@
 #include "perfetto/tracing/core/tracing_service.h"
 #include "src/traced/probes/filesystem/inode_file_data_source.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
-#include "src/traced/probes/process_stats_data_source.h"
+#include "src/traced/probes/ps/process_stats_data_source.h"
 
 #include "perfetto/trace/filesystem/inode_file_map.pbzero.h"
 
diff --git a/src/traced/probes/ps/BUILD.gn b/src/traced/probes/ps/BUILD.gn
new file mode 100644
index 0000000..a4b1a2c
--- /dev/null
+++ b/src/traced/probes/ps/BUILD.gn
@@ -0,0 +1,44 @@
+# Copyright (C) 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source_set("ps") {
+  public_deps = [
+    "../../../tracing",
+  ]
+  deps = [
+    "../../../../gn:default_deps",
+    "../../../../include/perfetto/traced",
+    "../../../../protos/perfetto/trace/ps:zero",
+    "../../../base",
+  ]
+  sources = [
+    "process_stats_data_source.cc",
+    "process_stats_data_source.h",
+  ]
+}
+
+source_set("unittests") {
+  testonly = true
+  deps = [
+    ":ps",
+    "../../../../gn:default_deps",
+    "../../../../gn:gtest_deps",
+    "../../../../protos/perfetto/trace:lite",
+    "../../../../src/base:test_support",
+    "../../../../src/tracing:test_support",
+  ]
+  sources = [
+    "process_stats_data_source_unittest.cc",
+  ]
+}
diff --git a/src/traced/probes/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
similarity index 98%
rename from src/traced/probes/process_stats_data_source.cc
rename to src/traced/probes/ps/process_stats_data_source.cc
index 882676a..93e94ba 100644
--- a/src/traced/probes/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/traced/probes/process_stats_data_source.h"
+#include "src/traced/probes/ps/process_stats_data_source.h"
 
 #include <stdlib.h>
 
@@ -23,6 +23,8 @@
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/string_splitter.h"
+
+#include "perfetto/trace/ps/process_tree.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
 // TODO(primiano): the code in this file assumes that PIDs are never recycled
diff --git a/src/traced/probes/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
similarity index 90%
rename from src/traced/probes/process_stats_data_source.h
rename to src/traced/probes/ps/process_stats_data_source.h
index 0274487..32a86bd 100644
--- a/src/traced/probes/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -14,21 +14,26 @@
  * limitations under the License.
  */
 
-#ifndef SRC_TRACED_PROBES_PROCESS_STATS_DATA_SOURCE_H_
-#define SRC_TRACED_PROBES_PROCESS_STATS_DATA_SOURCE_H_
+#ifndef SRC_TRACED_PROBES_PS_PROCESS_STATS_DATA_SOURCE_H_
+#define SRC_TRACED_PROBES_PS_PROCESS_STATS_DATA_SOURCE_H_
 
 #include <memory>
 #include <set>
 #include <vector>
 
 #include "perfetto/base/weak_ptr.h"
-#include "perfetto/trace/ps/process_tree.pbzero.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/trace_writer.h"
 
 namespace perfetto {
 
+namespace protos {
+namespace pbzero {
+class ProcessTree;
+}  // namespace pbzero
+}  // namespace protos
+
 class ProcessStatsDataSource {
  public:
   ProcessStatsDataSource(TracingSessionID,
@@ -77,4 +82,4 @@
 
 }  // namespace perfetto
 
-#endif  // SRC_TRACED_PROBES_PROCESS_STATS_DATA_SOURCE_H_
+#endif  // SRC_TRACED_PROBES_PS_PROCESS_STATS_DATA_SOURCE_H_
diff --git a/src/traced/probes/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
similarity index 98%
rename from src/traced/probes/process_stats_data_source_unittest.cc
rename to src/traced/probes/ps/process_stats_data_source_unittest.cc
index ce3db7b..98e0191 100644
--- a/src/traced/probes/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/traced/probes/process_stats_data_source.h"
+#include "src/traced/probes/ps/process_stats_data_source.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "perfetto/trace/trace_packet.pb.h"