Make flush async in probes producer

This is in preparation for the ftrace flush.
This CL doesn't do anything other than making the
existing flush request asynchronous, allowing
producers to complete it out of line.
No change is introduced to ftrace yet. A
separate CL will deal with that.

Bug: 73886018
Change-Id: Ib140b33319af8174b181a1f74f93b12e6465d05f
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 9d3a098..02eed04 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -182,9 +182,10 @@
     PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
 }
 
-void InodeFileDataSource::Flush() {
+void InodeFileDataSource::Flush(FlushRequestID,
+                                std::function<void()> callback) {
   ResetTracePacket();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void InodeFileDataSource::OnInodes(
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 143df66..2afdeed 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -85,7 +85,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
  protected:
   std::multimap<BlockDeviceID, std::string> mount_points_;
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index d7bc843..0272cd4 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -65,7 +65,7 @@
     controller_weak_->DumpFtraceStats(stats);
 }
 
-void FtraceDataSource::Flush() {
+void FtraceDataSource::Flush(FlushRequestID, std::function<void()> callback) {
   // TODO(primiano): this still doesn't flush data from the kernel ftrace
   // buffers (see b/73886018). We should do that and delay the
   // NotifyFlushComplete() until the ftrace data has been drained from the
@@ -73,7 +73,7 @@
   if (!writer_)
     return;
   WriteStats();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void FtraceDataSource::WriteStats() {
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index e576a5a..f15c1d1 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -67,7 +67,7 @@
 
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   FtraceConfigId config_id() const { return config_id_; }
   const FtraceConfig& config() const { return config_; }
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 13d47e1..1e0e8cb 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -17,6 +17,8 @@
 #ifndef SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 #define SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 
+#include <functional>
+
 #include "perfetto/tracing/core/basic_types.h"
 
 namespace perfetto {
@@ -29,7 +31,7 @@
   virtual ~ProbesDataSource();
 
   virtual void Start() = 0;
-  virtual void Flush() = 0;
+  virtual void Flush(FlushRequestID, std::function<void()> callback) = 0;
 
   const TracingSessionID tracing_session_id;
   const int type_id;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index c309180..b7d666c 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -47,6 +47,10 @@
 
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+// Should be larger than FtraceController::kFlushTimeoutMs.
+constexpr uint32_t kFlushTimeoutMs = 1000;
+
 constexpr char kFtraceSourceName[] = "linux.ftrace";
 constexpr char kProcessStatsSourceName[] = "linux.process_stats";
 constexpr char kInodeMapSourceName[] = "linux.inode_file_map";
@@ -280,12 +284,64 @@
 void ProbesProducer::Flush(FlushRequestID flush_request_id,
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
+  PERFETTO_DCHECK(flush_request_id);
+  auto weak_this = weak_factory_.GetWeakPtr();
+
+  // Issue a Flush() to all started data sources.
+  bool flush_queued = false;
   for (size_t i = 0; i < num_data_sources; i++) {
-    auto it = data_sources_.find(data_source_ids[i]);
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    auto it = data_sources_.find(ds_id);
     if (it == data_sources_.end() || !it->second->started)
       continue;
-    it->second->Flush();
+    pending_flushes_.emplace(flush_request_id, ds_id);
+    flush_queued = true;
+    auto flush_callback = [weak_this, flush_request_id, ds_id] {
+      if (weak_this)
+        weak_this->OnDataSourceFlushComplete(flush_request_id, ds_id);
+    };
+    it->second->Flush(flush_request_id, flush_callback);
   }
+
+  // If there is nothing to flush, ack immediately.
+  if (!flush_queued) {
+    endpoint_->NotifyFlushComplete(flush_request_id);
+    return;
+  }
+
+  // Otherwise, post the timeout task.
+  task_runner_->PostDelayedTask(
+      [weak_this, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(flush_request_id);
+      },
+      kFlushTimeoutMs);
+}
+
+void ProbesProducer::OnDataSourceFlushComplete(FlushRequestID flush_request_id,
+                                               DataSourceInstanceID ds_id) {
+  PERFETTO_DLOG("Flush %" PRIu64 " acked by data source %" PRIu64,
+                flush_request_id, ds_id);
+  auto range = pending_flushes_.equal_range(flush_request_id);
+  for (auto it = range.first; it != range.second; it++) {
+    if (it->second == ds_id) {
+      pending_flushes_.erase(it);
+      break;
+    }
+  }
+
+  if (pending_flushes_.count(flush_request_id))
+    return;  // Still waiting for other data sources to ack.
+
+  PERFETTO_DLOG("All data sources acked to flush %" PRIu64, flush_request_id);
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
+void ProbesProducer::OnFlushTimeout(FlushRequestID flush_request_id) {
+  if (pending_flushes_.count(flush_request_id) == 0)
+    return;  // All acked.
+  PERFETTO_ELOG("Flush(%" PRIu64 ") timed out", flush_request_id);
+  pending_flushes_.erase(flush_request_id);
   endpoint_->NotifyFlushComplete(flush_request_id);
 }
 
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 14737d6..44ac19e 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -95,6 +95,8 @@
   void Restart();
   void ResetConnectionBackoff();
   void IncreaseConnectionBackoff();
+  void OnDataSourceFlushComplete(FlushRequestID, DataSourceInstanceID);
+  void OnFlushTimeout(FlushRequestID);
 
   State state_ = kNotStarted;
   base::TaskRunner* task_runner_ = nullptr;
@@ -112,6 +114,9 @@
   std::unordered_multimap<TracingSessionID, ProbesDataSource*>
       session_data_sources_;
 
+  std::unordered_multimap<FlushRequestID, DataSourceInstanceID>
+      pending_flushes_;
+
   std::unordered_map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
   LRUInodeCache cache_{kLRUInodeCacheSize};
   std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index ddff880..858f25f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -161,11 +161,12 @@
   FinalizeCurPacket();
 }
 
-void ProcessStatsDataSource::Flush() {
+void ProcessStatsDataSource::Flush(FlushRequestID,
+                                   std::function<void()> callback) {
   // We shouldn't get this in the middle of WriteAllProcesses() or OnPids().
   PERFETTO_DCHECK(!cur_ps_tree_);
   PERFETTO_DCHECK(!cur_ps_stats_);
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void ProcessStatsDataSource::WriteProcessOrThread(int32_t pid) {
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 65b6e2e..f5329cf 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -57,7 +57,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   bool on_demand_dumps_enabled() const { return enable_on_demand_dumps_; }
 
diff --git a/src/traced/probes/ps/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
index 4587cda..b985bcc 100644
--- a/src/traced/probes/ps/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -169,7 +169,7 @@
 
   data_source->Start();
   task_runner_.RunUntilCheckpoint("all_done");
-  data_source->Flush();
+  data_source->Flush(1 /* FlushRequestId */, []() {});
 
   // |packet| will contain the merge of all kNumIter packets written.
   std::unique_ptr<protos::TracePacket> packet = writer_raw_->ParseProto();
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index 61d869d..befe396 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -297,8 +297,8 @@
   return weak_factory_.GetWeakPtr();
 }
 
-void SysStatsDataSource::Flush() {
-  writer_->Flush();
+void SysStatsDataSource::Flush(FlushRequestID, std::function<void()> callback) {
+  writer_->Flush(callback);
 }
 
 size_t SysStatsDataSource::ReadFile(base::ScopedFile* fd, const char* path) {
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index bdb5e10..0fa085c 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -56,7 +56,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
 
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index bfdf1f4..0d0e3e2 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -68,10 +68,12 @@
   if (cur_chunk_.is_valid()) {
     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                          &patch_list_);
-    shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   } else {
     PERFETTO_DCHECK(patch_list_.empty());
   }
+  // Always issue the Flush request, even if there is nothing to flush, just
+  // for the sake of getting the callback posted back.
+  shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   protobuf_stream_writer_.Reset({nullptr, nullptr});
 }