Wait for producers to ACK the stop when disabling the trace.

Adds a new field to the DataSourceDescriptor, will_notify_on_stop,
that is advertised by producers on registration.
That fields tells that a NotifyDataSourceStopped() should be expected
by the tracing service when tracing is disabled.
In turn, on the Consumer endpoint, this changes slightly the behavior
of the OnTracingDisabled() callback. If one or more data source did
register setting the aforementioned flag, DisableTracing() will wait
that those data sources have acked the stop, until a max timeout of
5 seconds. This allows the consumer to linearize with the producers
and ensure that they fully drain all their data on stop.

Bug: 111660094
Change-Id: Ida832ba09c2fc66c651b88b3d285647dbfca68a8
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 9cc8ca5..a4dd848 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -55,6 +55,7 @@
  public:
   static constexpr size_t kDefaultShmSize = 256 * 1024ul;
   static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
+  static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
 
   // The implementation behind the service endpoint exposed to each producer.
   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
@@ -72,17 +73,18 @@
     void UnregisterDataSource(const std::string& name) override;
     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
     void SetSharedMemory(std::unique_ptr<SharedMemory>);
-
     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
-    void OnTracingSetup();
-    void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
-    void CreateDataSourceInstance(DataSourceInstanceID,
-                                  const DataSourceConfig&);
     void NotifyFlushComplete(FlushRequestID) override;
-    void TearDownDataSource(DataSourceInstanceID);
+    void NotifyDataSourceStopped(DataSourceInstanceID) override;
     SharedMemory* shared_memory() const override;
     size_t shared_buffer_page_size_kb() const override;
 
+    void OnTracingSetup();
+    void CreateDataSourceInstance(DataSourceInstanceID,
+                                  const DataSourceConfig&);
+    void TearDownDataSource(DataSourceInstanceID);
+    void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
+
    private:
     friend class TracingServiceImpl;
     friend class TracingServiceImplTest;
@@ -156,13 +158,14 @@
   void ApplyChunkPatches(ProducerID,
                          const std::vector<CommitDataRequest::ChunkToPatch>&);
   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
+  void NotifyDataSourceStopped(ProducerID, const DataSourceInstanceID);
 
   // Called by ConsumerEndpointImpl.
   void DisconnectConsumer(ConsumerEndpointImpl*);
   bool EnableTracing(ConsumerEndpointImpl*,
                      const TraceConfig&,
                      base::ScopedFile);
-  void DisableTracing(TracingSessionID);
+  void DisableTracing(TracingSessionID, bool disable_immediately = false);
   void Flush(TracingSessionID tsid,
              uint32_t timeout_ms,
              ConsumerEndpoint::FlushCallback);
@@ -184,6 +187,8 @@
   size_t num_producers() const { return producers_.size(); }
   ProducerEndpointImpl* GetProducer(ProducerID) const;
 
+  uint32_t override_data_source_test_timeout_ms_for_testing = 0;
+
  private:
   friend class TracingServiceImplTest;
 
@@ -196,6 +201,7 @@
   struct DataSourceInstance {
     DataSourceInstanceID instance_id;
     std::string data_source_name;
+    bool will_notify_on_stop;
   };
 
   struct PendingFlush {
@@ -207,7 +213,9 @@
   // Holds the state of a tracing session. A tracing session is uniquely bound
   // a specific Consumer. Each Consumer can own one or more sessions.
   struct TracingSession {
-    TracingSession(ConsumerEndpointImpl*, const TraceConfig&);
+    enum State { DISABLED = 0, ENABLED, DISABLING_WAITING_STOP_ACKS };
+
+    TracingSession(TracingSessionID, ConsumerEndpointImpl*, const TraceConfig&);
 
     size_t num_buffers() const { return buffers_index.size(); }
 
@@ -217,6 +225,8 @@
              (base::GetWallTimeMs().count() % write_period_ms);
     }
 
+    const TracingSessionID id;
+
     // The consumer that started the session.
     ConsumerEndpointImpl* const consumer;
 
@@ -232,6 +242,11 @@
     // we are still awaiting a NotifyFlushComplete(N) ack.
     std::map<FlushRequestID, PendingFlush> pending_flushes;
 
+    // After DisableTracing() is called, this contains the subset of data
+    // sources that did set the |will_notify_on_stop| flag upon registration and
+    // that have the haven't replied yet to the stop request.
+    std::set<std::pair<ProducerID, DataSourceInstanceID>> pending_stop_acks;
+
     // Maps a per-trace-session buffer index into the corresponding global
     // BufferID (shared namespace amongst all consumers). This vector has as
     // many entries as |config.buffers_size()|.
@@ -246,7 +261,7 @@
     // Whether we mirrored the trace config back to the trace output yet.
     bool did_emit_config = false;
 
-    bool tracing_enabled = false;
+    State state = DISABLED;
 
     // This is set when the Consumer calls sets |write_into_file| == true in the
     // TraceConfig. In this case this represents the file we should stream the
@@ -281,6 +296,8 @@
   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
   void MaybeSnapshotStats(TracingSession*, std::vector<TracePacket>*);
   void OnFlushTimeout(TracingSessionID, FlushRequestID);
+  void OnDisableTracingTimeout(TracingSessionID);
+  void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
   TraceBuffer* GetBufferByID(BufferID);
 
   base::TaskRunner* const task_runner_;