Add Flush support to Producer and expose API to Consumer

Background
----------
Before this CL, the last trace packets produced near the end of a
trace could be lost when tracing was disabled. This happens because
the service sees data in the shared memory buffer (SMB) only when the
producer notifies it via a CommitData() request. In turn, in the
current implementation, a Producer commits data only when a chunk is
full. Hence the last, partially filled chunk of each data source is
never seen by the service when tracing is disabled, which causes
partial data loss.

Changes
-------
This CL introduces a Flush() signaling mechanism as follows:
1) Introduces a flush request from the service to producers. Producers
   are expected to flush all their trace writers in response.
2) Introduces a flush request from the consumer to the service. This allows
   the consumer to request all data to be committed at any time (not just
   while disabling tracing). The flush request has also a callback
   that is invoked by the service only after all producers have acked the
   flush (so the consumer is guaranteed that the next ReadBuffers will contain
   the result of the flush), or after a given timeout (so that the callback
   doesn't hang forever).
3) Causes time-based traces (which are handled by the service) to
   automatically flush before disabling tracing.
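
The ack-or-timeout behavior described in 2) can be sketched roughly as
below. This is an illustration only, not the code added by this CL:
PendingFlushes, ProducerID, Begin(), OnAck() and OnTimeout() are
hypothetical names, and the timeout is modeled as an explicit call
rather than a delayed task posted on a task runner.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <utility>

// Hypothetical types, chosen only for this sketch.
using FlushRequestID = uint64_t;
using ProducerID = uint16_t;

// Tracks in-flight flush requests and which producers still owe an ack.
class PendingFlushes {
 public:
  // Invoked with true if all producers acked, false if the flush timed out.
  using Callback = std::function<void(bool /*success*/)>;

  // Starts a flush that waits for an ack from every producer in |producers|.
  FlushRequestID Begin(std::set<ProducerID> producers, Callback callback) {
    FlushRequestID id = ++last_id_;
    if (producers.empty()) {
      callback(true);  // Nothing to wait for.
      return id;
    }
    pending_.emplace(id, Pending{std::move(producers), std::move(callback)});
    return id;
  }

  // Called when |producer| reports that it has completed flush |id|.
  void OnAck(ProducerID producer, FlushRequestID id) {
    auto it = pending_.find(id);
    if (it == pending_.end())
      return;  // Unknown id, or a late ack after the timeout fired.
    it->second.waiting.erase(producer);
    if (it->second.waiting.empty())
      Complete(it, true);
  }

  // Called when the timeout for flush |id| expires.
  void OnTimeout(FlushRequestID id) {
    auto it = pending_.find(id);
    if (it != pending_.end())
      Complete(it, false);
  }

 private:
  struct Pending {
    std::set<ProducerID> waiting;
    Callback callback;
  };

  // Removes the entry before running the callback, so the callback runs
  // at most once per flush request.
  void Complete(std::map<FlushRequestID, Pending>::iterator it, bool success) {
    Callback callback = std::move(it->second.callback);
    pending_.erase(it);
    callback(success);
  }

  FlushRequestID last_id_ = 0;
  std::map<FlushRequestID, Pending> pending_;
};
```

In this sketch a late ack that arrives after the timeout is ignored,
so the consumer callback never runs twice for the same request.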

This CL does NOT fix the underlying problem in the ftrace reader
(see bug 73886018) where, because of the splice, we don't see the
latest trace event even on the producer side. Fixing this problem
requires Flush() to abort the splice in ftrace's CpuReader and
commit the data (see TODO in SinkDelegate::Flush()).

Change-Id: Ifdec9f241b2ff98a7b4925b02fdd0beb16942e0e
Bug: 77684460
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index abdbf07..d37f41d 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -251,6 +251,30 @@
 
 void ProbesProducer::OnTracingSetup() {}
 
+void ProbesProducer::Flush(FlushRequestID flush_request_id,
+                           const DataSourceInstanceID* data_source_ids,
+                           size_t num_data_sources) {
+  for (size_t i = 0; i < num_data_sources; i++) {
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    {
+      auto it = process_stats_sources_.find(ds_id);
+      if (it != process_stats_sources_.end())
+        it->second->Flush();
+    }
+    {
+      auto it = file_map_sources_.find(ds_id);
+      if (it != file_map_sources_.end())
+        it->second->Flush();
+    }
+    {
+      auto it = delegates_.find(ds_id);
+      if (it != delegates_.end())
+        it->second->Flush();
+    }
+  }
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
 void ProbesProducer::ConnectWithRetries(const char* socket_name,
                                         base::TaskRunner* task_runner) {
   PERFETTO_DCHECK(state_ == kNotStarted);
@@ -289,6 +313,15 @@
 
 ProbesProducer::SinkDelegate::~SinkDelegate() = default;
 
+void ProbesProducer::SinkDelegate::Flush() {
+  // TODO(primiano): this still doesn't flush data from the kernel ftrace
+  // buffers (see b/73886018). We should do that and delay the
+  // NotifyFlushComplete() until the ftrace data has been drained from the
+  // kernel ftrace buffer and written in the SMB.
+  if (writer_ && (!trace_packet_ || trace_packet_->is_finalized()))
+    writer_->Flush();
+}
+
 ProbesProducer::FtraceBundleHandle
 ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
   trace_packet_ = writer_->NewTracePacket();