probes + trace_processor: Polling of /proc/[pid]/status mem counters

This CL adds support for periodically polling a set of memory counters
(VmSize, VmLck, VmHWM, VmRSS, RssAnon, RssFile, RssShmem, VmSwap) from
/proc/[pid]/status at a configurable rate (proc_stats_poll_ms).
It also adds the trace_processor support to ingest those counters.


Bug: 117644900
Test: perfetto_unittests
Change-Id: Iad7874d62b3e2144dccfaea4d95eea977476315d
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 65c0890..c309180 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -239,7 +239,8 @@
   base::ignore_result(id);
   auto buffer_id = static_cast<BufferID>(config.target_buffer());
   return std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
-      session_id, endpoint_->CreateTraceWriter(buffer_id), config));
+      task_runner_, session_id, endpoint_->CreateTraceWriter(buffer_id),
+      config));
 }
 
 std::unique_ptr<SysStatsDataSource> ProbesProducer::CreateSysStatsDataSource(
@@ -329,9 +330,17 @@
       case InodeFileDataSource::kTypeId:
         inode_data_source = static_cast<InodeFileDataSource*>(ds);
         break;
-      case ProcessStatsDataSource::kTypeId:
-        ps_data_source = static_cast<ProcessStatsDataSource*>(ds);
+      case ProcessStatsDataSource::kTypeId: {
+        // A trace session might have declared more than one ps data source.
+        // In those cases we often use one for a full dump on startup (
+        // targeting a dedicated buffer) and another one for on-demand dumps
+        // targeting the main buffer.
+        // Only use the one that has on-demand dumps enabled, if any.
+        auto ps = static_cast<ProcessStatsDataSource*>(ds);
+        if (ps->on_demand_dumps_enabled())
+          ps_data_source = ps;
         break;
+      }
       case SysStatsDataSource::kTypeId:
         break;
       default:
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index 160484f..9bf4538 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -22,9 +22,13 @@
 #include <utility>
 
 #include "perfetto/base/file_utils.h"
+#include "perfetto/base/metatrace.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/string_splitter.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/base/time.h"
 
+#include "perfetto/trace/ps/process_stats.pbzero.h"
 #include "perfetto/trace/ps/process_tree.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
@@ -64,25 +68,33 @@
   return atoi(str.c_str());
 }
 
+// Parses the leading decimal number in |str| (e.g. "1234 kB" -> 1234),
+// stopping at the first non-numeric character (leading blanks are skipped).
+// Values that don't fit in 32 bits are clamped to 0xffffffff instead of
+// silently wrapping around, which would report a small, bogus number.
+inline uint32_t ToU32(const char* str) {
+  const unsigned long long kMaxU32 = 0xffffffffULL;
+  const unsigned long long parsed = strtoull(str, nullptr, 10);
+  return static_cast<uint32_t>(parsed > kMaxU32 ? kMaxU32 : parsed);
+}
+
 }  // namespace
 
 // static
+// Out-of-line definition of the in-class constexpr static member (whose
+// initializer lives in the class declaration). Before C++17 this definition is
+// required whenever kTypeId is odr-used, e.g. bound to a const reference.
 constexpr int ProcessStatsDataSource::kTypeId;
 
+// |task_runner| is stored and later used by Start()/Tick() to schedule the
+// periodic /proc/[pid]/status polling (presumably it must outlive this data
+// source -- TODO confirm ownership with ProbesProducer).
 ProcessStatsDataSource::ProcessStatsDataSource(
+    base::TaskRunner* task_runner,
     TracingSessionID session_id,
     std::unique_ptr<TraceWriter> writer,
     const DataSourceConfig& config)
     : ProbesDataSource(session_id, kTypeId),
+      task_runner_(task_runner),
       writer_(std::move(writer)),
       record_thread_names_(config.process_stats_config().record_thread_names()),
       dump_all_procs_on_start_(
           config.process_stats_config().scan_all_processes_on_start()),
       weak_factory_(this) {
-  const auto& quirks = config.process_stats_config().quirks();
+  const auto& ps_config = config.process_stats_config();
+  const auto& quirks = ps_config.quirks();
+  // On-demand dumps (OnPids()) stay enabled unless the DISABLE_ON_DEMAND quirk
+  // is present in the config.
   enable_on_demand_dumps_ =
       (std::find(quirks.begin(), quirks.end(),
                  ProcessStatsConfig::DISABLE_ON_DEMAND) == quirks.end());
+  // A poll period of 0 means "don't poll": Start() only schedules Tick() when
+  // this is non-zero.
+  poll_period_ms_ = ps_config.proc_stats_poll_ms();
 }
 
+// Any Tick() still queued on the task runner holds only a WeakPtr and becomes
+// a no-op once this object is gone: weak_factory_ is declared last, so it is
+// destroyed (and invalidates its WeakPtrs) first.
 ProcessStatsDataSource::~ProcessStatsDataSource() = default;
@@ -90,6 +102,11 @@
 void ProcessStatsDataSource::Start() {
   if (dump_all_procs_on_start_)
     WriteAllProcesses();
+
+  // Periodic polling of /proc/[pid]/status is opt-in (poll_period_ms_ != 0).
+  // The first Tick() is posted rather than run inline; Tick() then re-posts
+  // itself. Binding a WeakPtr turns ticks that are still queued after this
+  // data source is destroyed into no-ops.
   if (poll_period_ms_) {
     auto weak_this = GetWeakPtr();
     task_runner_->PostTask(std::bind(&ProcessStatsDataSource::Tick, weak_this));
   }
 }
 
 base::WeakPtr<ProcessStatsDataSource> ProcessStatsDataSource::GetWeakPtr()
@@ -99,11 +116,9 @@
 
 void ProcessStatsDataSource::WriteAllProcesses() {
   PERFETTO_DCHECK(!cur_ps_tree_);
-  base::ScopedDir proc_dir(opendir("/proc"));
-  if (!proc_dir) {
-    PERFETTO_PLOG("Failed to opendir(/proc)");
+  base::ScopedDir proc_dir = OpenProcDir();
+  if (!proc_dir)
     return;
-  }
   while (int32_t pid = ReadNextNumericDir(*proc_dir)) {
     WriteProcessOrThread(pid);
     char task_path[255];
@@ -117,10 +132,12 @@
       WriteProcessOrThread(tid);
     }
   }
-  FinalizeCurPsTree();
+  FinalizeCurPacket();
 }
 
 void ProcessStatsDataSource::OnPids(const std::vector<int32_t>& pids) {
+  PERFETTO_METATRACE("OnPids", 0);
+
   if (!enable_on_demand_dumps_)
     return;
   PERFETTO_DCHECK(!cur_ps_tree_);
@@ -129,13 +146,13 @@
       continue;
     WriteProcessOrThread(pid);
   }
-  FinalizeCurPsTree();
+  FinalizeCurPacket();
 }
 
 void ProcessStatsDataSource::Flush() {
   // We shouldn't get this in the middle of WriteAllProcesses() or OnPids().
   PERFETTO_DCHECK(!cur_ps_tree_);
-
+  // Nor in the middle of WriteAllProcessStats(), which finalizes its packet
+  // (clearing |cur_ps_stats_|) before returning to the task runner.
+  PERFETTO_DCHECK(!cur_ps_stats_);
   writer_->Flush();
 }
 
@@ -184,6 +201,13 @@
   seen_pids_.emplace(tid);
 }
 
+base::ScopedDir ProcessStatsDataSource::OpenProcDir() {
+  base::ScopedDir proc_dir(opendir("/proc"));
+  if (!proc_dir)
+    PERFETTO_PLOG("Failed to opendir(/proc)");
+  return proc_dir;
+}
+
 std::string ProcessStatsDataSource::ReadProcPidFile(int32_t pid,
                                                     const std::string& file) {
   std::string contents;
@@ -207,21 +231,163 @@
   return buf.substr(begin, end - begin);
 }
 
+// Opens a new TracePacket stamped with the current boot-time clock, unless a
+// packet is already being built (in which case this is a no-op).
+void ProcessStatsDataSource::StartNewPacketIfNeeded() {
+  if (!cur_packet_) {
+    cur_packet_ = writer_->NewTracePacket();
+    const auto boot_time_ns = base::GetBootTimeNs().count();
+    cur_packet_->set_timestamp(static_cast<uint64_t>(boot_time_ns));
+  }
+}
+
+// Returns the ProcessTree message of the packet being built. Lazily starts
+// the packet (via StartNewPacketIfNeeded()) and the message.
 protos::pbzero::ProcessTree* ProcessStatsDataSource::GetOrCreatePsTree() {
-  if (!cur_ps_tree_) {
-    cur_packet_ = writer_->NewTracePacket();
+  StartNewPacketIfNeeded();
+  if (!cur_ps_tree_)
     cur_ps_tree_ = cur_packet_->set_process_tree();
-  }
+  // Forget any ProcessStats message opened earlier on this packet, so that a
+  // later GetOrCreateStats() starts a fresh one. Presumably this is because
+  // protozero allows only one open nested message at a time -- TODO confirm.
+  cur_ps_stats_ = nullptr;
   return cur_ps_tree_;
 }
 
-void ProcessStatsDataSource::FinalizeCurPsTree() {
-  if (!cur_ps_tree_) {
-    PERFETTO_DCHECK(!cur_packet_);
-    return;
-  }
+// Counterpart of GetOrCreatePsTree() for the ProcessStats message: lazily
+// starts the packet and the message, and forgets any open ProcessTree.
+protos::pbzero::ProcessStats* ProcessStatsDataSource::GetOrCreateStats() {
+  StartNewPacketIfNeeded();
+  if (!cur_ps_stats_)
+    cur_ps_stats_ = cur_packet_->set_process_stats();
   cur_ps_tree_ = nullptr;
+  return cur_ps_stats_;
+}
+
+// Ends the current packet and clears both nested-message pointers, so the
+// next GetOrCreate*() call starts a new packet. Also called when nothing was
+// written (e.g. by WriteAllProcessStats() when no process had counters).
+void ProcessStatsDataSource::FinalizeCurPacket() {
+  PERFETTO_DCHECK(!cur_ps_tree_ || cur_packet_);
+  PERFETTO_DCHECK(!cur_ps_stats_ || cur_packet_);
+  cur_ps_tree_ = nullptr;
+  cur_ps_stats_ = nullptr;
   cur_packet_ = TraceWriter::TracePacketHandle{};
 }
 
+// static
+// Periodic polling entry point, first posted by Start(). Re-posts itself for
+// the next period boundary and then samples the counters of all processes.
+// Bound to a WeakPtr so that ticks still queued after the data source has been
+// destroyed are no-ops.
+void ProcessStatsDataSource::Tick(
+    base::WeakPtr<ProcessStatsDataSource> weak_this) {
+  if (!weak_this)
+    return;
+  ProcessStatsDataSource& thiz = *weak_this;
+  const uint32_t period_ms = thiz.poll_period_ms_;
+  PERFETTO_DCHECK(period_ms > 0);  // Start() only schedules ticks if non-zero.
+
+  // Align the next tick to a wall-clock multiple of |period_ms|. The remainder
+  // is < period_ms, so the delay is in (0, period_ms] and fits in 32 bits;
+  // the narrowing conversions are explicit rather than implicit int64->uint32.
+  const uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
+  const uint32_t delay_ms =
+      period_ms - static_cast<uint32_t>(now_ms % period_ms);
+  thiz.task_runner_->PostDelayedTask(
+      std::bind(&ProcessStatsDataSource::Tick, weak_this), delay_ms);
+  thiz.WriteAllProcessStats();
+}
+
+// Samples the memory counters of every numeric /proc entry and emits them in
+// a single ProcessStats packet. Afterwards it hands the pids that reported
+// counters to OnPids().
+void ProcessStatsDataSource::WriteAllProcessStats() {
+  // TODO(primiano): implement whitelisting of processes by names.
+  // TODO(primiano): Have a pid cache to avoid wasting cycles reading kthreads
+  // proc files over and over. Same for non-whitelist processes (see above).
+
+  PERFETTO_METATRACE("WriteAllProcessStats", 0);
+  base::ScopedDir proc_dir = OpenProcDir();
+  if (!proc_dir)
+    return;
+
+  std::vector<int32_t> pids_with_stats;
+  for (int32_t pid = ReadNextNumericDir(*proc_dir); pid != 0;
+       pid = ReadNextNumericDir(*proc_dir)) {
+    const std::string proc_status = ReadProcPidFile(pid, "status");
+    // Skip entries whose status file came back empty and those without memory
+    // counters (e.g. kernel threads).
+    if (!proc_status.empty() && WriteProcessStats(pid, proc_status))
+      pids_with_stats.push_back(pid);
+  }
+  FinalizeCurPacket();
+
+  // Ensure that we write once long-term process info (e.g., name) for new pids
+  // that we haven't seen before.
+  OnPids(pids_with_stats);
+}
+
+// Parses |proc_status| (the contents of /proc/[pid]/status). It appends the
+// memory counters it finds as a MemCounters entry for |pid| to the current
+// ProcessStats message.
+// Returns true if the stats for the given |pid| have been written, false if
+// it failed (e.g., |pid| was a kernel thread and, as such, didn't report any
+// memory counters).
+bool ProcessStatsDataSource::WriteProcessStats(int32_t pid,
+                                               const std::string& proc_status) {
+  // The MemCounters entry for a process is created lazily on the first call.
+  // This is to prevent creating empty entries that have only a pid for
+  // kernel threads and other /proc/[pid] entries that have no counters
+  // associated.
+  bool proc_status_has_mem_counters = false;
+  protos::pbzero::ProcessStats::MemCounters* mem_counters = nullptr;
+  auto get_counters_lazy = [this, &mem_counters, pid] {
+    if (!mem_counters) {
+      mem_counters = GetOrCreateStats()->add_mem_counters();
+      mem_counters->set_pid(pid);
+    }
+    return mem_counters;
+  };
+
+  // Handles a single "Key: value" line. |value| will contain e.g. "1234 kB";
+  // we rely on ToU32() to stop parsing at the first non-numeric character.
+  auto handle_entry = [&proc_status_has_mem_counters, &get_counters_lazy](
+                          const std::string& key, const std::string& value) {
+    const uint32_t kb = ToU32(value.c_str());
+    if (key == "VmSize") {
+      // Assume that if we see VmSize we'll see also the others.
+      proc_status_has_mem_counters = true;
+      get_counters_lazy()->set_vm_size_kb(kb);
+    } else if (key == "VmLck") {
+      get_counters_lazy()->set_vm_locked_kb(kb);
+    } else if (key == "VmHWM") {
+      get_counters_lazy()->set_vm_hwm_kb(kb);
+    } else if (key == "VmRSS") {
+      get_counters_lazy()->set_vm_rss_kb(kb);
+    } else if (key == "RssAnon") {
+      get_counters_lazy()->set_rss_anon_kb(kb);
+    } else if (key == "RssFile") {
+      get_counters_lazy()->set_rss_file_kb(kb);
+    } else if (key == "RssShmem") {
+      get_counters_lazy()->set_rss_shmem_kb(kb);
+    } else if (key == "VmSwap") {
+      get_counters_lazy()->set_vm_swap_kb(kb);
+    }
+  };
+
+  // Parse /proc/[pid]/status, which looks like this:
+  // Name:   cat
+  // Umask:  0027
+  // State:  R (running)
+  // FDSize: 256
+  // Groups: 4 20 24 46 997
+  // VmPeak:     5992 kB
+  // VmSize:     5992 kB
+  // VmLck:         0 kB
+  // ...
+  std::string key;
+  std::string value;
+  enum { kKey, kSeparator, kValue } state = kKey;
+  for (char c : proc_status) {
+    if (c == '\n') {
+      // Lines without a ':' separator are not "Key: value" entries; skip them.
+      if (state != kKey)
+        handle_entry(key, value);
+      // Reset both buffers so nothing leaks into the next line (e.g. a "Key:"
+      // line with an empty value must not reuse the previous line's value).
+      key.clear();
+      value.clear();
+      state = kKey;
+      continue;
+    }
+    switch (state) {
+      case kKey:
+        if (c == ':')
+          state = kSeparator;
+        else
+          key.push_back(c);
+        break;
+      case kSeparator:
+        // Skip the blanks between ':' and the value. The unsigned char cast is
+        // required: isspace() on a negative char (e.g. a non-ASCII byte in the
+        // process name) is undefined behavior.
+        if (isspace(static_cast<unsigned char>(c)))
+          break;
+        value.push_back(c);
+        state = kValue;
+        break;
+      case kValue:
+        value.push_back(c);
+        break;
+    }
+  }
+  // The last line is normally '\n'-terminated; handle it anyway if it isn't.
+  if (state != kKey)
+    handle_entry(key, value);
+  return proc_status_has_mem_counters;
+}
+
 }  // namespace perfetto
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 57f0324..fb5590b 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -21,6 +21,7 @@
 #include <set>
 #include <vector>
 
+#include "perfetto/base/scoped_file.h"
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/data_source_config.h"
@@ -29,9 +30,14 @@
 
 namespace perfetto {
 
+namespace base {
+class TaskRunner;
+}
+
 namespace protos {
 namespace pbzero {
 class ProcessTree;
+class ProcessStats;
 }  // namespace pbzero
 }  // namespace protos
 
@@ -39,7 +45,8 @@
  public:
   static constexpr int kTypeId = 3;
 
-  ProcessStatsDataSource(TracingSessionID,
+  ProcessStatsDataSource(base::TaskRunner*,
+                         TracingSessionID,
                          std::unique_ptr<TraceWriter> writer,
                          const DataSourceConfig&);
   ~ProcessStatsDataSource() override;
@@ -48,27 +55,43 @@
   void WriteAllProcesses();
   void OnPids(const std::vector<int32_t>& pids);
 
-  // Virtual for testing.
-  virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
-
   // ProbesDataSource implementation.
   void Start() override;
   void Flush() override;
 
+  // False when the DISABLE_ON_DEMAND quirk is set in the config. Used by
+  // ProbesProducer to pick which ProcessStatsDataSource it uses when a session
+  // declares more than one.
+  bool on_demand_dumps_enabled() const { return enable_on_demand_dumps_; }
+
+  // Virtual for testing.
+  virtual base::ScopedDir OpenProcDir();
+  virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
+
  private:
+  // Common functions.
   ProcessStatsDataSource(const ProcessStatsDataSource&) = delete;
   ProcessStatsDataSource& operator=(const ProcessStatsDataSource&) = delete;
 
+  void StartNewPacketIfNeeded();
+  void FinalizeCurPacket();
+  protos::pbzero::ProcessTree* GetOrCreatePsTree();
+  protos::pbzero::ProcessStats* GetOrCreateStats();
+
+  // Functions for snapshotting process/thread long-term info and relationships.
   void WriteProcess(int32_t pid, const std::string& proc_status);
   void WriteThread(int32_t tid, int32_t tgid, const std::string& proc_status);
   void WriteProcessOrThread(int32_t pid);
   std::string ReadProcStatusEntry(const std::string& buf, const char* key);
 
-  protos::pbzero::ProcessTree* GetOrCreatePsTree();
-  void FinalizeCurPsTree();
+  // Functions for periodically sampling process stats/counters.
+  static void Tick(base::WeakPtr<ProcessStatsDataSource>);
+  void WriteAllProcessStats();
+  bool WriteProcessStats(int32_t pid, const std::string& proc_status);
 
+  // Common fields used for both process/tree relationships and stats/counters.
+  base::TaskRunner* const task_runner_;
   std::unique_ptr<TraceWriter> writer_;
   TraceWriter::TracePacketHandle cur_packet_;
+
+  // Fields for keeping track of the state of process/tree relationships.
   protos::pbzero::ProcessTree* cur_ps_tree_ = nullptr;
   bool record_thread_names_ = false;
   bool enable_on_demand_dumps_ = true;
@@ -77,9 +100,12 @@
   // This set contains PIDs as per the Linux kernel notion of a PID (which is
   // really a TID). In practice this set will contain all TIDs for all processes
   // seen, not just the main thread id (aka thread group ID).
-  // TODO(b/76663469): Optimization: use a bitmap.
   std::set<int32_t> seen_pids_;
 
+  // Fields for keeping track of the periodic stats/counters.
+  uint32_t poll_period_ms_ = 0;
+  protos::pbzero::ProcessStats* cur_ps_stats_ = nullptr;
+
   base::WeakPtrFactory<ProcessStatsDataSource> weak_factory_;  // Keep last.
 };
 
diff --git a/src/traced/probes/ps/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
index 98e0191..4587cda 100644
--- a/src/traced/probes/ps/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -15,27 +15,36 @@
  */
 
 #include "src/traced/probes/ps/process_stats_data_source.h"
-#include "gmock/gmock.h"
-#include "gtest/gtest.h"
-#include "perfetto/trace/trace_packet.pb.h"
-#include "perfetto/trace/trace_packet.pbzero.h"
+
+#include <dirent.h>
+
+#include "perfetto/base/temp_file.h"
+#include "src/base/test/test_task_runner.h"
 #include "src/tracing/core/trace_writer_for_testing.h"
 
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "perfetto/trace/trace_packet.pb.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+
 using ::testing::_;
+using ::testing::ElementsAreArray;
 using ::testing::Invoke;
 using ::testing::Return;
-using ::testing::ElementsAreArray;
 
 namespace perfetto {
 namespace {
 
+// ProcessStatsDataSource with its /proc accessors mocked out, so tests can
+// supply fake directory listings and file contents.
 class TestProcessStatsDataSource : public ProcessStatsDataSource {
  public:
-  TestProcessStatsDataSource(TracingSessionID id,
+  TestProcessStatsDataSource(base::TaskRunner* task_runner,
+                             TracingSessionID id,
                              std::unique_ptr<TraceWriter> writer,
                              const DataSourceConfig& config)
-      : ProcessStatsDataSource(id, std::move(writer), config) {}
+      : ProcessStatsDataSource(task_runner, id, std::move(writer), config) {}
 
+  // Lets tests redirect the /proc enumeration to a fake directory.
+  MOCK_METHOD0(OpenProcDir, base::ScopedDir());
   MOCK_METHOD2(ReadProcPidFile, std::string(int32_t pid, const std::string&));
 };
 
@@ -43,16 +52,18 @@
  protected:
   ProcessStatsDataSourceTest() {}
 
-  TraceWriterForTesting* writer_raw_;
-
   std::unique_ptr<TestProcessStatsDataSource> GetProcessStatsDataSource(
       const DataSourceConfig& cfg) {
     auto writer =
         std::unique_ptr<TraceWriterForTesting>(new TraceWriterForTesting());
     writer_raw_ = writer.get();
     return std::unique_ptr<TestProcessStatsDataSource>(
-        new TestProcessStatsDataSource(0, std::move(writer), cfg));
+        new TestProcessStatsDataSource(&task_runner_, 0, std::move(writer),
+                                       cfg));
   }
+
+  base::TestTaskRunner task_runner_;
+  TraceWriterForTesting* writer_raw_;
 };
 
 TEST_F(ProcessStatsDataSourceTest, WriteOnceProcess) {
@@ -115,5 +126,70 @@
   }
 }
 
+// Polls a fake /proc with two pids for kNumIters ticks and checks that every
+// tick emitted one MemCounters entry per pid with the expected values.
+TEST_F(ProcessStatsDataSourceTest, MemCounters) {
+  DataSourceConfig cfg;
+  cfg.mutable_process_stats_config()->set_proc_stats_poll_ms(1);
+  *(cfg.mutable_process_stats_config()->add_quirks()) =
+      perfetto::ProcessStatsConfig::DISABLE_ON_DEMAND;
+  auto data_source = GetProcessStatsDataSource(cfg);
+
+  // Populate a fake /proc/ directory.
+  auto fake_proc = base::TempDir::Create();
+  const int kPids[] = {1, 2};
+  std::vector<std::string> dirs_to_delete;
+  for (int pid : kPids) {
+    char path[256];
+    snprintf(path, sizeof(path), "%s/%d", fake_proc.path().c_str(), pid);
+    dirs_to_delete.push_back(path);
+    mkdir(path, 0755);
+  }
+
+  auto checkpoint = task_runner_.CreateCheckpoint("all_done");
+
+  EXPECT_CALL(*data_source, OpenProcDir()).WillRepeatedly(Invoke([&fake_proc] {
+    return base::ScopedDir(opendir(fake_proc.path().c_str()));
+  }));
+
+  const int kNumIters = 4;
+  int iter = 0;
+  for (int pid : kPids) {
+    EXPECT_CALL(*data_source, ReadProcPidFile(pid, "status"))
+        .WillRepeatedly(Invoke([checkpoint, kPids, &iter](int32_t p,
+                                                          const std::string&) {
+          char ret[1024];
+          snprintf(ret, sizeof(ret),
+                   "Name:	pid_10\nVmSize:	 %d kB\nVmRSS:\t%d  kB\n",
+                   p * 100 + iter * 10 + 1, p * 100 + iter * 10 + 2);
+          if (p == kPids[base::ArraySize(kPids) - 1]) {
+            if (++iter == kNumIters)
+              checkpoint();
+          }
+          return std::string(ret);
+        }));
+  }
+
+  data_source->Start();
+  task_runner_.RunUntilCheckpoint("all_done");
+  data_source->Flush();
+
+  // Cleanup |fake_proc| before any ASSERT below can return early: TempDir
+  // checks that the directory is empty on destruction, and that check would
+  // otherwise mask the real failure. The task runner is not pumped again after
+  // this point, so no further Tick() reads the fake directory.
+  for (const std::string& path : dirs_to_delete)
+    rmdir(path.c_str());
+
+  // |packet| will contain the merge of all kNumIters packets written.
+  std::unique_ptr<protos::TracePacket> packet = writer_raw_->ParseProto();
+  ASSERT_TRUE(packet);
+  ASSERT_TRUE(packet->has_process_stats());
+  const auto& ps_stats = packet->process_stats();
+  ASSERT_EQ(static_cast<size_t>(ps_stats.mem_counters_size()),
+            static_cast<size_t>(kNumIters) * base::ArraySize(kPids));
+  iter = 0;
+  for (const auto& proc_counters : ps_stats.mem_counters()) {
+    int32_t pid = proc_counters.pid();
+    ASSERT_EQ(proc_counters.vm_size_kb(),
+              static_cast<uint32_t>(pid * 100 + iter * 10 + 1));
+    ASSERT_EQ(proc_counters.vm_rss_kb(),
+              static_cast<uint32_t>(pid * 100 + iter * 10 + 2));
+    if (pid == kPids[base::ArraySize(kPids) - 1])
+      iter++;
+  }
+}
+
 }  // namespace
 }  // namespace perfetto