service/producer: Track data source instance state, notify on start

Adds an OnDataSourceStarted notification to ProducerEndpoint and
modifies TracingServiceImpl to track state transitions of data source
instances explicitly, so we can later notify the consumer about them.

This will be used by aosp/923475.

Bug: 127948038
Change-Id: Ic8dc051550fc2b9d09af7a8327b17dc89cd4194a
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 3731336..7346ff1 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -54,6 +54,9 @@
 
 // The tracing service business logic.
 class TracingServiceImpl : public TracingService {
+ private:
+  struct DataSourceInstance;
+
  public:
   static constexpr size_t kDefaultShmSize = 256 * 1024ul;
   static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
@@ -83,6 +86,7 @@
     void SetSharedMemory(std::unique_ptr<SharedMemory>);
     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
     void NotifyFlushComplete(FlushRequestID) override;
+    void NotifyDataSourceStarted(DataSourceInstanceID) override;
     void NotifyDataSourceStopped(DataSourceInstanceID) override;
     SharedMemory* shared_memory() const override;
     size_t shared_buffer_page_size_kb() const override;
@@ -171,6 +175,10 @@
     void Attach(const std::string& key) override;
     void GetTraceStats() override;
 
+    // TODO(eseckler): Notify consumer about the state change if necessary.
+    void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
+                                         const DataSourceInstance&) {}
+
    private:
     friend class TracingServiceImpl;
     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
@@ -206,6 +214,7 @@
   void ApplyChunkPatches(ProducerID,
                          const std::vector<CommitDataRequest::ChunkToPatch>&);
   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
+  void NotifyDataSourceStarted(ProducerID, const DataSourceInstanceID);
   void NotifyDataSourceStopped(ProducerID, const DataSourceInstanceID);
 
   // Called by ConsumerEndpointImpl.
@@ -260,18 +269,30 @@
     DataSourceInstance(DataSourceInstanceID id,
                        const DataSourceConfig& cfg,
                        const std::string& ds_name,
-                       bool notify)
+                       bool notify_on_start,
+                       bool notify_on_stop)
         : instance_id(id),
           config(cfg),
           data_source_name(ds_name),
-          will_notify_on_stop(notify) {}
+          will_notify_on_start(notify_on_start),
+          will_notify_on_stop(notify_on_stop) {}
     DataSourceInstance(const DataSourceInstance&) = delete;
     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
 
     DataSourceInstanceID instance_id;
     DataSourceConfig config;
     std::string data_source_name;
+    bool will_notify_on_start;
     bool will_notify_on_stop;
+
+    enum DataSourceInstanceState {
+      CONFIGURED,
+      STARTING,
+      STARTED,
+      STOPPING,
+      STOPPED
+    };
+    DataSourceInstanceState state = CONFIGURED;
   };
 
   struct PendingFlush {
@@ -322,6 +343,27 @@
       return sequence_id;
     }
 
+    DataSourceInstance* GetDataSourceInstance(
+        ProducerID producer_id,
+        DataSourceInstanceID instance_id) {
+      for (auto& inst_kv : data_source_instances) {
+        if (inst_kv.first != producer_id ||
+            inst_kv.second.instance_id != instance_id) {
+          continue;
+        }
+        return &inst_kv.second;
+      }
+      return nullptr;
+    }
+
+    bool AllDataSourceInstancesStopped() {
+      for (const auto& inst_kv : data_source_instances) {
+        if (inst_kv.second.state != DataSourceInstance::STOPPED)
+          return false;
+      }
+      return true;
+    }
+
     const TracingSessionID id;
 
     // The consumer that started the session.
@@ -345,11 +387,6 @@
     // we are still awaiting a NotifyFlushComplete(N) ack.
     std::map<FlushRequestID, PendingFlush> pending_flushes;
 
-    // After DisableTracing() is called, this contains the subset of data
-    // sources that did set the |will_notify_on_stop| flag upon registration and
-    // that have the haven't replied yet to the stop request.
-    std::set<std::pair<ProducerID, DataSourceInstanceID>> pending_stop_acks;
-
     // Maps a per-trace-session buffer index into the corresponding global
     // BufferID (shared namespace amongst all consumers). This vector has as
     // many entries as |config.buffers_size()|.
@@ -408,6 +445,9 @@
   // shared memory and trace buffers.
   void UpdateMemoryGuardrail();
 
+  void StartDataSourceInstance(ProducerEndpointImpl* producer,
+                               TracingSession* tracing_session,
+                               DataSourceInstance* instance);
   void SnapshotSyncMarker(std::vector<TracePacket>*);
   void SnapshotClocks(std::vector<TracePacket>*);
   void SnapshotStats(TracingSession*, std::vector<TracePacket>*);