Add Flush support to Producer and expose API to Consumer

Background
----------
Before this CL, when tracing is disabled we might lose the last
trace packets produced near the end of the trace. This happens
because the service sees data in the SMB only when it's notified
by the producer via a CommitData() request. In turn, in the current
implementation, a Producer commits data only when the chunk is full.
Hence the last chunk for each data source will never been seen by
the service when disabling tracing. This causes partial data losses.

Changes
-------
This CL introduces a Flush() signaling mechanism as follows:
3) Introduces a flush request from the service to producers. Producers
   are supposed to flush all their trace writers in response to that.
2) Introduces a flush request from the consumer to the service. This allows
   the consumer to request all data to be committed at any time (not just
   while disabling tracing). The flush request has also a callback
   that is invoked by the service only after all producers have acked the
   flush (so the consumer is guaranteed that the next ReadBuffers will contain
   the result of the flush), or after a given timeout (so that the callback
   doesn't hang forever).
3) Causes time-based traces (which are handled by the service) to
   automatically flush before disabling tracing.

This CL does NOT fix the underlying problem in the ftrace reader
(see bug 73886018) where, because of the splice, we don't see the
latest trace event even on the producer side. Fixing this problem
requires the Flush() to abort the splice in ftrace's CpuReader and
commit the data (see TODO in SinkDelegate::Flush()).

Change-Id: Ifdec9f241b2ff98a7b4925b02fdd0beb16942e0e
Bug: 77684460
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 8aa0981..65a9ca4 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -75,8 +75,10 @@
 
     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
     void OnTracingSetup();
+    void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
     void CreateDataSourceInstance(DataSourceInstanceID,
                                   const DataSourceConfig&);
+    void NotifyFlushComplete(FlushRequestID) override;
     void TearDownDataSource(DataSourceInstanceID);
     SharedMemory* shared_memory() const override;
     size_t shared_buffer_page_size_kb() const override;
@@ -119,6 +121,7 @@
     void DisableTracing() override;
     void ReadBuffers() override;
     void FreeBuffers() override;
+    void Flush(int timeout_ms, FlushCallback) override;
 
    private:
     friend class ServiceImpl;
@@ -152,6 +155,7 @@
                                      size_t size);
   void ApplyChunkPatches(ProducerID,
                          const std::vector<CommitDataRequest::ChunkToPatch>&);
+  void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
 
   // Called by ConsumerEndpointImpl.
   void DisconnectConsumer(ConsumerEndpointImpl*);
@@ -159,6 +163,10 @@
                      const TraceConfig&,
                      base::ScopedFile);
   void DisableTracing(TracingSessionID);
+  void Flush(TracingSessionID tsid,
+             int timeout_ms,
+             ConsumerEndpoint::FlushCallback);
+  void FlushAndDisableTracing(TracingSessionID);
   void ReadBuffers(TracingSessionID, ConsumerEndpointImpl*);
   void FreeBuffers(TracingSessionID);
 
@@ -190,6 +198,12 @@
     std::string data_source_name;
   };
 
+  struct PendingFlush {
+    std::set<ProducerID> producers;
+    ConsumerEndpoint::FlushCallback callback;
+    explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
+  };
+
   // Holds the state of a tracing session. A tracing session is uniquely bound
   // a specific Consumer. Each Consumer can own one or more sessions.
   struct TracingSession {
@@ -214,6 +228,10 @@
     // producers for this tracing session.
     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
 
+    // For each Flush(N) request, keeps track of the set of producers for which
+    // we are still awaiting a NotifyFlushComplete(N) ack.
+    std::map<FlushRequestID, PendingFlush> pending_flushes;
+
     // Maps a per-trace-session buffer index into the corresponding global
     // BufferID (shared namespace amongst all consumers). This vector has as
     // many entries as |config.buffers_size()|.
@@ -258,7 +276,7 @@
 
   void MaybeSnapshotClocks(TracingSession*, std::vector<TracePacket>*);
   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
-
+  void OnFlushTimeout(TracingSessionID, FlushRequestID);
   TraceBuffer* GetBufferByID(BufferID);
 
   base::TaskRunner* const task_runner_;
@@ -266,17 +284,14 @@
   ProducerID last_producer_id_ = 0;
   DataSourceInstanceID last_data_source_instance_id_ = 0;
   TracingSessionID last_tracing_session_id_ = 0;
+  FlushRequestID last_flush_request_id_ = 0;
 
   // Buffer IDs are global across all consumers (because a Producer can produce
   // data for more than one trace session, hence more than one consumer).
   IdAllocator<BufferID> buffer_ids_;
 
   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
-
-  // TODO(primiano): There doesn't seem to be any good reason why |producers_|
-  // is a map indexed by ID and not just a set<ProducerEndpointImpl*>.
   std::map<ProducerID, ProducerEndpointImpl*> producers_;
-
   std::set<ConsumerEndpointImpl*> consumers_;
   std::map<TracingSessionID, TracingSession> tracing_sessions_;
   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;