traced_perf: add memory guardrail (stops the DS if daemon above limit)

Normally, when a source is stopped, traced_perf:
* pauses the perf_events stream
* waits for the reader to drain the outstanding samples
* waits for the unwinder to unwind them
* cleans up and acks the StopDataSource IPC

Instead, this patch introduces an option to stop the work abruptly,
immediately erasing the data source's state from both the reader and
unwinder stages.
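
A minimal sketch of the abrupt path, using made-up stand-in types (the
real code is PerfProducer::PurgeDataSource and Unwinder::PurgeDataSource
in the diff below): each stage simply drops its per-source state, with
no draining of in-flight samples.

  #include <cstdint>
  #include <map>

  using DataSourceInstanceID = uint64_t;

  struct UnwinderStage {
    std::map<DataSourceInstanceID, int> sources;  // per-source state elided
    // The real code hops onto the unwinder thread via PostPurgeDataSource.
    void PurgeDataSource(DataSourceInstanceID id) { sources.erase(id); }
  };

  struct ReaderStage {
    std::map<DataSourceInstanceID, int> sources;
    UnwinderStage* unwinder = nullptr;

    void PurgeDataSource(DataSourceInstanceID id) {
      auto it = sources.find(id);
      if (it == sources.end())
        return;
      unwinder->PurgeDataSource(id);  // drop the unwinder's per-source state
      sources.erase(it);              // then our own; nothing is drained
    }
  };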

Note: the tracing service (traced) doesn't respect producer stop
notifications unless the service itself started the stop sequence. So we
can't just call NotifyDataSourceStopped(), and we will keep receiving
flush & stop request IPCs for a DS that we've forgotten about. I believe
it's safe to blindly ack unknown flushes and stops (as the service
should be accurate about routing the IPCs in the first place). Do tell
if I'm overlooking an important detail though.
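
Roughly what the "blindly ack" handling amounts to, with illustrative
names (the actual handlers are PerfProducer::StopDataSource and
PerfProducer::Flush in the diff below):

  #include <cstddef>
  #include <cstdint>
  #include <map>

  using DataSourceInstanceID = uint64_t;
  using FlushRequestID = uint64_t;

  struct ServiceEndpoint {  // assumed subset of the producer->service IPC
    void NotifyDataSourceStopped(DataSourceInstanceID) {}
    void NotifyFlushComplete(FlushRequestID) {}
  };

  struct ProducerSketch {
    ServiceEndpoint* endpoint = nullptr;
    std::map<DataSourceInstanceID, int> data_sources;

    void StopDataSource(DataSourceInstanceID id) {
      if (data_sources.find(id) == data_sources.end()) {
        // Unknown id, most likely purged earlier: ack immediately so the
        // service doesn't wait out the stop timeout.
        endpoint->NotifyDataSourceStopped(id);
        return;
      }
      // ... normal asynchronous stop ...
    }

    void Flush(FlushRequestID flush_id,
               const DataSourceInstanceID* /*ids*/,
               size_t /*num_ids*/) {
      // Flush known sources (elided), then always ack, even if some of
      // the listed ids have been purged.
      endpoint->NotifyFlushComplete(flush_id);
    }
  };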

An alternative would've been to introduce a new DS status (kTombstoned
or similar), and propagate that knowledge until the StopDataSource IPC
arrives. But I don't think that complexity is worth it.

Note: I went with a separate periodic task per guardrail'd DS. Having
concurrent sources should be rare, so it doesn't feel worth sharing the
memory footprint calculation cost. Also, this will allow us to make the
guardrail check period part of the trace config, should we need that
flexibility.
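
A minimal sketch of the per-source check-and-repost loop, with stand-in
types (the real implementation is PerfProducer::CheckMemoryFootprintPeriodic
in the diff below, which additionally guards the posted lambda with a
WeakPtr to the producer):

  #include <cstdint>
  #include <functional>
  #include <map>

  using DataSourceInstanceID = uint64_t;

  struct TaskRunner {  // assumed minimal stand-in for base::TaskRunner
    virtual ~TaskRunner() = default;
    virtual void PostDelayedTask(std::function<void()> task,
                                 uint32_t delay_ms) = 0;
  };

  class GuardrailedProducer {
   public:
    explicit GuardrailedProducer(TaskRunner* task_runner)
        : task_runner_(task_runner) {}

    void CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
                                      uint32_t max_daemon_memory_kb) {
      if (data_sources_.find(ds_id) == data_sources_.end())
        return;  // source already torn down -> let the task chain end
      if (DaemonFootprintKb() > max_daemon_memory_kb) {
        PurgeDataSource(ds_id);  // abrupt stop
        return;                  // stop recurring
      }
      // Still under the limit: repost the same check.
      task_runner_->PostDelayedTask(
          [this, ds_id, max_daemon_memory_kb] {
            CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
          },
          kCheckPeriodMs);
    }

   private:
    static constexpr uint32_t kCheckPeriodMs = 5 * 1000;

    // Stand-in for ProfilerMemoryGuardrails::IsOverMemoryThreshold().
    uint32_t DaemonFootprintKb() { return 0; }

    void PurgeDataSource(DataSourceInstanceID ds_id) {
      data_sources_.erase(ds_id);
    }

    TaskRunner* const task_runner_;
    std::map<DataSourceInstanceID, int> data_sources_;  // real state elided
  };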

Change-Id: I6d585e6728991141e1ab8bd0b8f6b2ad6fe6d5fc
diff --git a/Android.bp b/Android.bp
index 7ed5230..95f5392 100644
--- a/Android.bp
+++ b/Android.bp
@@ -9140,6 +9140,7 @@
     ":perfetto_src_profiling_common_interner",
     ":perfetto_src_profiling_common_interning_output",
     ":perfetto_src_profiling_common_proc_utils",
+    ":perfetto_src_profiling_common_profiler_guardrails",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_perf_common_types",
     ":perfetto_src_profiling_perf_proc_descriptors",
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index ead817e..a1b19ba 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -63,6 +63,7 @@
     "../common:interner",
     "../common:interning_output",
     "../common:proc_utils",
+    "../common:profiler_guardrails",
   ]
   sources = [
     "event_config.cc",
diff --git a/src/profiling/perf/event_config.cc b/src/profiling/perf/event_config.cc
index ccf8b0e..8051af7 100644
--- a/src/profiling/perf/event_config.cc
+++ b/src/profiling/perf/event_config.cc
@@ -108,9 +108,14 @@
 // static
 base::Optional<EventConfig> EventConfig::Create(
     const DataSourceConfig& ds_config) {
-  protos::pbzero::PerfEventConfig::Decoder pb_config(
+  protos::pbzero::PerfEventConfig::Decoder event_config_pb(
       ds_config.perf_event_config_raw());
+  return EventConfig::Create(event_config_pb);
+}
 
+// static
+base::Optional<EventConfig> EventConfig::Create(
+    const protos::pbzero::PerfEventConfig::Decoder& pb_config) {
   base::Optional<TargetFilter> filter = ParseTargetFilter(pb_config);
   if (!filter.has_value())
     return base::nullopt;
diff --git a/src/profiling/perf/event_config.h b/src/profiling/perf/event_config.h
index da77075..c667d39 100644
--- a/src/profiling/perf/event_config.h
+++ b/src/profiling/perf/event_config.h
@@ -48,6 +48,8 @@
 class EventConfig {
  public:
   static base::Optional<EventConfig> Create(const DataSourceConfig& ds_config);
+  static base::Optional<EventConfig> Create(
+      const protos::pbzero::PerfEventConfig::Decoder& ds_config);
 
   uint32_t target_all_cpus() const { return target_all_cpus_; }
   uint32_t ring_buffer_pages() const { return ring_buffer_pages_; }
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index 69b89a2..2b9f101 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -37,6 +37,7 @@
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "src/profiling/common/callstack_trie.h"
 #include "src/profiling/common/proc_utils.h"
+#include "src/profiling/common/profiler_guardrails.h"
 #include "src/profiling/common/unwind_support.h"
 #include "src/profiling/perf/common_types.h"
 #include "src/profiling/perf/event_reader.h"
@@ -62,6 +63,8 @@
 // The proper fix is in the platform, see bug for progress.
 constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;
 
+constexpr uint32_t kMemoryLimitCheckPeriodMs = 5 * 1000;
+
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
 
@@ -198,14 +201,13 @@
 void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                    const DataSourceConfig&) {}
 
-void PerfProducer::StartDataSource(DataSourceInstanceID instance_id,
+void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                    const DataSourceConfig& config) {
-  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(instance_id),
+  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(ds_id),
                config.name().c_str());
 
   if (config.name() == MetatraceWriter::kDataSourceName) {
-    StartMetatraceSource(instance_id,
-                         static_cast<BufferID>(config.target_buffer()));
+    StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
     return;
   }
 
@@ -213,7 +215,10 @@
   if (config.name() != kDataSourceName)
     return;
 
-  base::Optional<EventConfig> event_config = EventConfig::Create(config);
+  protos::pbzero::PerfEventConfig::Decoder event_config_pb(
+      config.perf_event_config_raw());
+  base::Optional<EventConfig> event_config =
+      EventConfig::Create(event_config_pb);
   if (!event_config.has_value()) {
     PERFETTO_ELOG("PerfEventConfig rejected.");
     return;
@@ -245,7 +250,7 @@
   std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
   bool inserted;
   std::tie(ds_it, inserted) = data_sources_.emplace(
-      std::piecewise_construct, std::forward_as_tuple(instance_id),
+      std::piecewise_construct, std::forward_as_tuple(ds_id),
       std::forward_as_tuple(event_config.value(), std::move(writer),
                             std::move(per_cpu_readers)));
   PERFETTO_CHECK(inserted);
@@ -257,39 +262,81 @@
 
   // Inform unwinder of the new data source instance, and optionally start a
   // periodic task to clear its cached state.
-  unwinding_worker_->PostStartDataSource(instance_id,
+  unwinding_worker_->PostStartDataSource(ds_id,
                                          ds.event_config.kernel_frames());
   if (ds.event_config.unwind_state_clear_period_ms()) {
     unwinding_worker_->PostClearCachedStatePeriodic(
-        instance_id, ds.event_config.unwind_state_clear_period_ms());
+        ds_id, ds.event_config.unwind_state_clear_period_ms());
   }
 
   // Kick off periodic read task.
   auto tick_period_ms = ds.event_config.read_tick_period_ms();
   auto weak_this = weak_factory_.GetWeakPtr();
   task_runner_->PostDelayedTask(
-      [weak_this, instance_id] {
+      [weak_this, ds_id] {
         if (weak_this)
-          weak_this->TickDataSourceRead(instance_id);
+          weak_this->TickDataSourceRead(ds_id);
       },
-      TimeToNextReadTickMs(instance_id, tick_period_ms));
+      TimeToNextReadTickMs(ds_id, tick_period_ms));
+
+  // Optionally kick off periodic memory footprint limit check.
+  uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
+  if (max_daemon_memory_kb > 0) {
+    task_runner_->PostDelayedTask(
+        [weak_this, ds_id, max_daemon_memory_kb] {
+          if (weak_this)
+            weak_this->CheckMemoryFootprintPeriodic(ds_id,
+                                                    max_daemon_memory_kb);
+        },
+        kMemoryLimitCheckPeriodMs);
+  }
 }
 
-void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
-  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(instance_id));
+void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
+                                                uint32_t max_daemon_memory_kb) {
+  auto ds_it = data_sources_.find(ds_id);
+  if (ds_it == data_sources_.end())
+    return;  // stop recurring
+
+  GuardrailConfig gconfig = {};
+  gconfig.memory_guardrail_kb = max_daemon_memory_kb;
+
+  ProfilerMemoryGuardrails footprint_snapshot;
+  if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
+    PurgeDataSource(ds_id);
+    return;  // stop recurring
+  }
+
+  // repost
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, ds_id, max_daemon_memory_kb] {
+        if (weak_this)
+          weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
+      },
+      kMemoryLimitCheckPeriodMs);
+}
+
+void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
+  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));
 
   // Metatrace: stop immediately (will miss the events from the
   // asynchronous shutdown of the primary data source).
-  auto meta_it = metatrace_writers_.find(instance_id);
+  auto meta_it = metatrace_writers_.find(ds_id);
   if (meta_it != metatrace_writers_.end()) {
     meta_it->second.WriteAllAndFlushTraceWriter([] {});
     metatrace_writers_.erase(meta_it);
     return;
   }
 
-  auto ds_it = data_sources_.find(instance_id);
-  if (ds_it == data_sources_.end())
+  auto ds_it = data_sources_.find(ds_id);
+  if (ds_it == data_sources_.end()) {
+    // Most likely, the source is missing due to an abrupt stop (via
+    // |PurgeDataSource|). Tell the service that we've stopped the source now,
+    // so that it doesn't wait for the ack until the timeout.
+    endpoint_->NotifyDataSourceStopped(ds_id);
     return;
+  }
 
   // Start shutting down the reading frontend, which will propagate the stop
   // further as the intermediate buffers are cleared.
@@ -305,7 +352,7 @@
 void PerfProducer::Flush(FlushRequestID flush_id,
                          const DataSourceInstanceID* data_source_ids,
                          size_t num_data_sources) {
-  bool should_ack_flush = false;
+  // Flush metatracing if requested.
   for (size_t i = 0; i < num_data_sources; i++) {
     auto ds_id = data_source_ids[i];
     PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));
@@ -313,14 +360,10 @@
     auto meta_it = metatrace_writers_.find(ds_id);
     if (meta_it != metatrace_writers_.end()) {
       meta_it->second.WriteAllAndFlushTraceWriter([] {});
-      should_ack_flush = true;
-    }
-    if (data_sources_.find(ds_id) != data_sources_.end()) {
-      should_ack_flush = true;
     }
   }
-  if (should_ack_flush)
-    endpoint_->NotifyFlushComplete(flush_id);
+
+  endpoint_->NotifyFlushComplete(flush_id);
 }
 
 void PerfProducer::ClearIncrementalState(
@@ -587,7 +630,11 @@
 void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                               CompletedSample sample) {
   auto ds_it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(ds_it != data_sources_.end());
+  if (ds_it == data_sources_.end()) {
+    PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
+                  static_cast<size_t>(ds_id));
+    return;
+  }
   DataSourceState& ds = ds_it->second;
 
   // intern callsite
@@ -620,7 +667,8 @@
                                       size_t cpu,
                                       uint64_t records_lost) {
   auto ds_it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(ds_it != data_sources_.end());
+  if (ds_it == data_sources_.end())
+    return;
   DataSourceState& ds = ds_it->second;
   PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                 static_cast<size_t>(ds_id), cpu, records_lost);
@@ -662,7 +710,8 @@
                                      ParsedSample sample,
                                      SampleSkipReason reason) {
   auto ds_it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(ds_it != data_sources_.end());
+  if (ds_it == data_sources_.end())
+    return;
   DataSourceState& ds = ds_it->second;
 
   auto packet = ds.trace_writer->NewTracePacket();
@@ -711,7 +760,11 @@
 void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
   PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
   auto ds_it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(ds_it != data_sources_.end());
+  if (ds_it == data_sources_.end()) {
+    PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
+                  static_cast<size_t>(ds_id));
+    return;
+  }
   DataSourceState& ds = ds_it->second;
   PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
 
@@ -727,6 +780,46 @@
   }
 }
 
+// TODO(rsavitski): maybe make the tracing service respect premature
+// producer-driven stops, and then issue a NotifyDataSourceStopped here.
+// Alternatively (and at the expense of higher complexity) introduce a new data
+// source status of "tombstoned", and propagate it until the source is stopped
+// by the service (this would technically allow for stricter lifetime checking
+// of data sources, and help with discarding periodic flushes).
+// TODO(rsavitski): Purging while stopping will currently leave the stop
+// unacknowledged. Consider checking whether the DS is stopping here, and if so,
+// notifying immediately after erasing.
+void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
+  auto ds_it = data_sources_.find(ds_id);
+  if (ds_it == data_sources_.end())
+    return;
+  DataSourceState& ds = ds_it->second;
+
+  PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
+               static_cast<size_t>(ds_id));
+
+  unwinding_worker_->PostPurgeDataSource(ds_id);
+
+  // Write a packet indicating the abrupt stop.
+  {
+    auto packet = ds.trace_writer->NewTracePacket();
+    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
+    auto* perf_sample = packet->set_perf_sample();
+    auto* producer_event = perf_sample->set_producer_event();
+    producer_event->set_source_stop_reason(
+        protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
+  }
+
+  ds.trace_writer->Flush();
+  data_sources_.erase(ds_it);
+
+  // Clean up resources if there are no more active sources.
+  if (data_sources_.empty()) {
+    callstack_trie_.ClearTrie();  // purge internings
+    base::MaybeReleaseAllocatorMemToOS();
+  }
+}
+
 void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                         BufferID target_buffer) {
   auto writer = endpoint_->CreateTraceWriter(target_buffer);
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index 8313ce3..5241cb8 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -202,6 +202,15 @@
   // Destroys the state belonging to this instance, and acks the stop to the
   // tracing service.
   void FinishDataSourceStop(DataSourceInstanceID ds_id);
+  // Immediately destroys the data source state, and instructs the unwinder to
+  // do the same. This is used for abrupt stops.
+  void PurgeDataSource(DataSourceInstanceID ds_id);
+
+  // Immediately stops the data source if this daemon's overall memory footprint
+  // is above the given threshold. This periodic task is started only for data
+  // sources that specify a limit.
+  void CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
+                                    uint32_t max_daemon_memory_kb);
 
   void StartMetatraceSource(DataSourceInstanceID ds_id, BufferID target_buffer);
 
diff --git a/src/profiling/perf/unwinding.cc b/src/profiling/perf/unwinding.cc
index b93e070..c6549e4 100644
--- a/src/profiling/perf/unwinding.cc
+++ b/src/profiling/perf/unwinding.cc
@@ -100,7 +100,8 @@
                 maps_fd.get(), mem_fd.get());
 
   auto it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(it != data_sources_.end());
+  if (it == data_sources_.end())
+    return;
   DataSourceState& ds = it->second;
 
   ProcessState& proc_state = ds.process_states[pid];  // insert if new
@@ -127,7 +128,8 @@
                 static_cast<size_t>(ds_id), static_cast<int>(pid));
 
   auto it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(it != data_sources_.end());
+  if (it == data_sources_.end())
+    return;
   DataSourceState& ds = it->second;
 
   ProcessState& proc_state = ds.process_states[pid];  // insert if new
@@ -210,8 +212,12 @@
     if (!entry.valid)
       continue;  // already processed
 
+    // Data source might be gone due to an abrupt stop.
     auto it = data_sources_.find(entry.data_source_id);
-    PERFETTO_CHECK(it != data_sources_.end());
+    if (it == data_sources_.end()) {
+      entry = UnwindEntry::Invalid();
+      continue;
+    }
     DataSourceState& ds = it->second;
 
     pid_t pid = entry.sample.pid;
@@ -447,7 +453,8 @@
                 static_cast<size_t>(ds_id));
 
   auto it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(it != data_sources_.end());
+  if (it == data_sources_.end())
+    return;
   DataSourceState& ds = it->second;
 
   PERFETTO_CHECK(ds.status == DataSourceState::Status::kActive);
@@ -464,7 +471,8 @@
                 static_cast<size_t>(ds_id));
 
   auto it = data_sources_.find(ds_id);
-  PERFETTO_CHECK(it != data_sources_.end());
+  if (it == data_sources_.end())
+    return;
   DataSourceState& ds = it->second;
 
   // Drop unwinder's state tied to the source.
@@ -481,6 +489,31 @@
   delegate_->PostFinishDataSourceStop(ds_id);
 }
 
+void Unwinder::PostPurgeDataSource(DataSourceInstanceID ds_id) {
+  task_runner_->PostTask([this, ds_id] { PurgeDataSource(ds_id); });
+}
+
+void Unwinder::PurgeDataSource(DataSourceInstanceID ds_id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  PERFETTO_DLOG("Unwinder::PurgeDataSource(%zu)", static_cast<size_t>(ds_id));
+
+  auto it = data_sources_.find(ds_id);
+  if (it == data_sources_.end())
+    return;
+
+  data_sources_.erase(it);
+
+  // Clean up state if there are no more active sources.
+  if (data_sources_.empty()) {
+    kernel_symbolizer_.Destroy();
+    ResetAndEnableUnwindstackCache();
+    // Also purge scudo on Android, which would normally be done by the service
+    // thread in |FinishDataSourceStop|. This is important as most of the scudo
+    // overhead comes from libunwindstack.
+    base::MaybeReleaseAllocatorMemToOS();
+  }
+}
+
 void Unwinder::PostClearCachedStatePeriodic(DataSourceInstanceID ds_id,
                                             uint32_t period_ms) {
   task_runner_->PostDelayedTask(
diff --git a/src/profiling/perf/unwinding.h b/src/profiling/perf/unwinding.h
index b2ff717..650e721 100644
--- a/src/profiling/perf/unwinding.h
+++ b/src/profiling/perf/unwinding.h
@@ -95,6 +95,7 @@
   void PostRecordTimedOutProcDescriptors(DataSourceInstanceID ds_id, pid_t pid);
   void PostProcessQueue();
   void PostInitiateDataSourceStop(DataSourceInstanceID ds_id);
+  void PostPurgeDataSource(DataSourceInstanceID ds_id);
 
   void PostClearCachedStatePeriodic(DataSourceInstanceID ds_id,
                                     uint32_t period_ms);
@@ -167,6 +168,9 @@
   // sequence.
   void FinishDataSourceStop(DataSourceInstanceID ds_id);
 
+  // Immediately destroys the data source state, used for abrupt stops.
+  void PurgeDataSource(DataSourceInstanceID ds_id);
+
   // Clears the parsed maps for all previously-sampled processes, and resets the
   // libunwindstack cache. This has the effect of deallocating the cached Elf
   // objects within libunwindstack, which take up non-trivial amounts of memory.