Make flush async in probes producer

This is in preparation for the ftrace flush.
This CL doesn't do anything other than making the
existing flush request asynchronous, allowing
producers to complete it out of line.
No change is introduced to ftrace yet. A
separate CL will deal with that.

Bug: 73886018
Change-Id: Ib140b33319af8174b181a1f74f93b12e6465d05f
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index c309180..b7d666c 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -47,6 +47,10 @@
 
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+// Should be larger than FtraceController::kFlushTimeoutMs.
+constexpr uint32_t kFlushTimeoutMs = 1000;
+
 constexpr char kFtraceSourceName[] = "linux.ftrace";
 constexpr char kProcessStatsSourceName[] = "linux.process_stats";
 constexpr char kInodeMapSourceName[] = "linux.inode_file_map";
@@ -280,12 +284,64 @@
 void ProbesProducer::Flush(FlushRequestID flush_request_id,
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
+  PERFETTO_DCHECK(flush_request_id);
+  auto weak_this = weak_factory_.GetWeakPtr();
+
+  // Issue a Flush() to all started data sources.
+  bool flush_queued = false;
   for (size_t i = 0; i < num_data_sources; i++) {
-    auto it = data_sources_.find(data_source_ids[i]);
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    auto it = data_sources_.find(ds_id);
     if (it == data_sources_.end() || !it->second->started)
       continue;
-    it->second->Flush();
+    pending_flushes_.emplace(flush_request_id, ds_id);
+    flush_queued = true;
+    auto flush_callback = [weak_this, flush_request_id, ds_id] {
+      if (weak_this)
+        weak_this->OnDataSourceFlushComplete(flush_request_id, ds_id);
+    };
+    it->second->Flush(flush_request_id, flush_callback);
   }
+
+  // If there is nothing to flush, ack immediately.
+  if (!flush_queued) {
+    endpoint_->NotifyFlushComplete(flush_request_id);
+    return;
+  }
+
+  // Otherwise, post the timeout task.
+  task_runner_->PostDelayedTask(
+      [weak_this, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(flush_request_id);
+      },
+      kFlushTimeoutMs);
+}
+
+void ProbesProducer::OnDataSourceFlushComplete(FlushRequestID flush_request_id,
+                                               DataSourceInstanceID ds_id) {
+  PERFETTO_DLOG("Flush %" PRIu64 " acked by data source %" PRIu64,
+                flush_request_id, ds_id);
+  auto range = pending_flushes_.equal_range(flush_request_id);
+  for (auto it = range.first; it != range.second; it++) {
+    if (it->second == ds_id) {
+      pending_flushes_.erase(it);
+      break;
+    }
+  }
+
+  if (pending_flushes_.count(flush_request_id))
+    return;  // Still waiting for other data sources to ack.
+
+  PERFETTO_DLOG("All data sources acked to flush %" PRIu64, flush_request_id);
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
+void ProbesProducer::OnFlushTimeout(FlushRequestID flush_request_id) {
+  if (pending_flushes_.count(flush_request_id) == 0)
+    return;  // All acked.
+  PERFETTO_ELOG("Flush(%" PRIu64 ") timed out", flush_request_id);
+  pending_flushes_.erase(flush_request_id);
   endpoint_->NotifyFlushComplete(flush_request_id);
 }