Dump process data in trace.

This is now integrated. We are not doing anything with the data at the moment.

Bug:73058765
Change-Id: Ib141465f345f3629e15c5269bc024ef6fc503fe7
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
new file mode 100644
index 0000000..d3a96b9
--- /dev/null
+++ b/src/traced/probes/probes_producer.cc
@@ -0,0 +1,222 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/probes_producer.h"
+
+#include <stdio.h>
+#include <string>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/traced/traced.h"
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+
+#include "src/process_stats/file_utils.h"
+#include "src/process_stats/procfs_utils.h"
+
+#include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
+#include "perfetto/trace/ps/process_tree.pbzero.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+uint64_t kInitialConnectionBackoffMs = 100;
+uint64_t kMaxConnectionBackoffMs = 30 * 1000;
+const char* kFtraceSourceName = "com.google.perfetto.ftrace";
+const char* kProcessStatsSourceName = "com.google.perfetto.process_stats";
+
+}  // namespace.
+
+// State transition diagram:
+//                    +----------------------------+
+//                    v                            +
+// NotStarted -> NotConnected -> Connecting -> Connected
+//                    ^              +
+//                    +--------------+
+//
+
+ProbesProducer::~ProbesProducer() = default;
+
+void ProbesProducer::OnConnect() {
+  PERFETTO_DCHECK(state_ == kConnecting);
+  state_ = kConnected;
+  ResetConnectionBackoff();
+  PERFETTO_LOG("Connected to the service");
+
+  DataSourceDescriptor ftrace_descriptor;
+  ftrace_descriptor.set_name(kFtraceSourceName);
+  endpoint_->RegisterDataSource(ftrace_descriptor,
+                                [](DataSourceInstanceID id) {});
+
+  DataSourceDescriptor process_stats_descriptor;
+  process_stats_descriptor.set_name(kProcessStatsSourceName);
+  endpoint_->RegisterDataSource(process_stats_descriptor,
+                                [](DataSourceInstanceID id) {});
+}
+
+void ProbesProducer::OnDisconnect() {
+  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
+  state_ = kNotConnected;
+  PERFETTO_LOG("Disconnected from tracing service");
+  IncreaseConnectionBackoff();
+
+  // TODO(hjd): Erase all sinks and add e2e test for this.
+  task_runner_->PostDelayedTask([this] { this->Connect(); },
+                                connection_backoff_ms_);
+}
+
+void ProbesProducer::CreateDataSourceInstance(
+    DataSourceInstanceID id,
+    const DataSourceConfig& source_config) {
+  instances_[id] = source_config.name();
+  // PERFETTO_LOG("DataSourceInstanceID: %llu", id);
+  if (source_config.name() == kFtraceSourceName) {
+    CreateFtraceDataSourceInstance(id, source_config);
+  } else if (source_config.name() == kProcessStatsSourceName) {
+    CreateProcessStatsDataSourceInstance(source_config);
+  } else {
+    PERFETTO_ELOG("Data source name: %s not recognised.",
+                  source_config.name().c_str());
+  }
+}
+
+void ProbesProducer::CreateFtraceDataSourceInstance(
+    DataSourceInstanceID id,
+    const DataSourceConfig& source_config) {
+  // Don't retry if FtraceController::Create() failed once.
+  // This can legitimately happen on user builds where we cannot access the
+  // debug paths, e.g., because of SELinux rules.
+  if (ftrace_creation_failed_)
+    return;
+
+  // Lazily create on the first instance.
+  if (!ftrace_) {
+    ftrace_ = FtraceController::Create(task_runner_);
+
+    if (!ftrace_) {
+      PERFETTO_ELOG("Failed to create FtraceController");
+      ftrace_creation_failed_ = true;
+      return;
+    }
+
+    ftrace_->DisableAllEvents();
+    ftrace_->ClearTrace();
+  }
+
+  PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
+               source_config.target_buffer());
+
+  // TODO(hjd): Would be nice if ftrace_reader could use the generated config.
+  DataSourceConfig::FtraceConfig proto_config = source_config.ftrace_config();
+
+  // TODO(hjd): Static cast is bad, target_buffer() should return a BufferID.
+  auto trace_writer = endpoint_->CreateTraceWriter(
+      static_cast<BufferID>(source_config.target_buffer()));
+  auto delegate =
+      std::unique_ptr<SinkDelegate>(new SinkDelegate(std::move(trace_writer)));
+  auto sink = ftrace_->CreateSink(std::move(proto_config), delegate.get());
+  PERFETTO_CHECK(sink);
+  delegate->sink(std::move(sink));
+  delegates_.emplace(id, std::move(delegate));
+}
+
+void ProbesProducer::CreateProcessStatsDataSourceInstance(
+    const DataSourceConfig& source_config) {
+  auto trace_writer = endpoint_->CreateTraceWriter(
+      static_cast<BufferID>(source_config.target_buffer()));
+  procfs_utils::ProcessMap processes;
+  auto trace_packet = trace_writer->NewTracePacket();
+  protos::pbzero::ProcessTree* process_tree = trace_packet->set_process_tree();
+
+  file_utils::ForEachPidInProcPath(
+      "/proc", [&processes, &process_tree](int pid) {
+        if (!processes.count(pid)) {
+          if (procfs_utils::ReadTgid(pid) != pid)
+            return;
+          processes[pid] = procfs_utils::ReadProcessInfo(pid);
+        }
+        ProcessInfo* process = processes[pid].get();
+        procfs_utils::ReadProcessThreads(process);
+        auto* process_writer = process_tree->add_processes();
+        process_writer->set_pid(process->pid);
+        process_writer->set_ppid(process->ppid);
+        process_writer->add_cmdline(process->cmdline);
+
+        for (auto& thread : process->threads) {
+          auto* thread_writer = process_writer->add_threads();
+          thread_writer->set_tid(thread.second.tid);
+          thread_writer->set_name(thread.second.name);
+        }
+      });
+  trace_packet->Finalize();
+}
+
+void ProbesProducer::TearDownDataSourceInstance(DataSourceInstanceID id) {
+  PERFETTO_LOG("Producer stop (id=%" PRIu64 ")", id);
+  PERFETTO_DCHECK(instances_.count(id));
+  if (instances_[id] == kFtraceSourceName) {
+    size_t removed = delegates_.erase(id);
+    PERFETTO_DCHECK(removed == 1);
+  }
+}
+
+void ProbesProducer::ConnectWithRetries(const char* socket_name,
+                                        base::TaskRunner* task_runner) {
+  PERFETTO_DCHECK(state_ == kNotStarted);
+  state_ = kNotConnected;
+
+  ResetConnectionBackoff();
+  socket_name_ = socket_name;
+  task_runner_ = task_runner;
+  Connect();
+}
+
+void ProbesProducer::Connect() {
+  PERFETTO_DCHECK(state_ == kNotConnected);
+  state_ = kConnecting;
+  endpoint_ = ProducerIPCClient::Connect(socket_name_, this, task_runner_);
+}
+
+void ProbesProducer::IncreaseConnectionBackoff() {
+  connection_backoff_ms_ *= 2;
+  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
+    connection_backoff_ms_ = kMaxConnectionBackoffMs;
+}
+
+void ProbesProducer::ResetConnectionBackoff() {
+  connection_backoff_ms_ = kInitialConnectionBackoffMs;
+}
+
+ProbesProducer::SinkDelegate::SinkDelegate(std::unique_ptr<TraceWriter> writer)
+    : writer_(std::move(writer)) {}
+
+ProbesProducer::SinkDelegate::~SinkDelegate() = default;
+
+ProbesProducer::FtraceBundleHandle
+ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
+  trace_packet_ = writer_->NewTracePacket();
+  return FtraceBundleHandle(trace_packet_->set_ftrace_events());
+}
+
+void ProbesProducer::SinkDelegate::OnBundleComplete(size_t,
+                                                    FtraceBundleHandle) {
+  trace_packet_->Finalize();
+}
+
+}  // namespace perfetto