Merge "Ensure that we always notify observers about data sources stopping."
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index d7e6c38..994e1da 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -829,22 +829,13 @@
   for (auto& data_source_inst : tracing_session->data_source_instances) {
     const ProducerID producer_id = data_source_inst.first;
     DataSourceInstance& instance = data_source_inst.second;
-    const DataSourceInstanceID ds_inst_id = instance.instance_id;
     ProducerEndpointImpl* producer = GetProducer(producer_id);
     PERFETTO_DCHECK(producer);
     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                     instance.state == DataSourceInstance::STARTING ||
                     instance.state == DataSourceInstance::STARTED);
-    if (instance.will_notify_on_stop && !disable_immediately) {
-      instance.state = DataSourceInstance::STOPPING;
-    } else {
-      instance.state = DataSourceInstance::STOPPED;
-    }
-    if (tracing_session->consumer_maybe_null) {
-      tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
-          *producer, instance);
-    }
-    producer->StopDataSource(ds_inst_id);
+    StopDataSourceInstance(producer, tracing_session, &instance,
+                           disable_immediately);
   }
 
   // Either this request is flagged with |disable_immediately| or there are no
@@ -917,8 +908,6 @@
                     instance->state);
       continue;
     }
-    PERFETTO_DCHECK(tracing_session.state ==
-                    TracingSession::DISABLING_WAITING_STOP_ACKS);
 
     instance->state = DataSourceInstance::STOPPED;
 
@@ -932,6 +921,9 @@
     if (!tracing_session.AllDataSourceInstancesStopped())
       continue;
 
+    if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
+      continue;
+
     // All data sources acked the termination.
     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
   }  // for (tracing_session)
@@ -1764,9 +1756,28 @@
   }
 }
 
+void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
+                                                TracingSession* tracing_session,
+                                                DataSourceInstance* instance,
+                                                bool disable_immediately) {
+  const DataSourceInstanceID ds_inst_id = instance->instance_id;
+  if (instance->will_notify_on_stop && !disable_immediately) {
+    instance->state = DataSourceInstance::STOPPING;  // Notify-on-stop path.
+  } else {
+    instance->state = DataSourceInstance::STOPPED;  // All other cases.
+  }
+  if (tracing_session->consumer_maybe_null) {  // Skip if no consumer.
+    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
+        *producer, *instance);
+  }
+  producer->StopDataSource(ds_inst_id);  // Consumer is notified first.
+}
+
 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                               const std::string& name) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
+  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
+                producer_id, name.c_str());
   PERFETTO_CHECK(producer_id);
   ProducerEndpointImpl* producer = GetProducer(producer_id);
   PERFETTO_DCHECK(producer);
@@ -1777,7 +1788,8 @@
         DataSourceInstanceID ds_inst_id = it->second.instance_id;
         if (it->second.state != DataSourceInstance::STOPPED) {
           if (it->second.state != DataSourceInstance::STOPPING)
-            producer->StopDataSource(ds_inst_id);
+            StopDataSourceInstance(producer, &kv.second, &it->second,
+                                   /* disable_immediately = */ false);
           // Mark the instance as stopped immediately, since we are
           // unregistering it below.
           if (it->second.state == DataSourceInstance::STOPPING)
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 5c196d0..57f504e 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -519,6 +519,10 @@
   void StartDataSourceInstance(ProducerEndpointImpl* producer,
                                TracingSession* tracing_session,
                                DataSourceInstance* instance);
+  void StopDataSourceInstance(ProducerEndpointImpl* producer,
+                              TracingSession* tracing_session,
+                              DataSourceInstance* instance,
+                              bool disable_immediately);  // true: skip STOPPING
   void SnapshotSyncMarker(std::vector<TracePacket>*);
   void SnapshotClocks(std::vector<TracePacket>*, bool set_root_timestamp);
   void SnapshotStats(TracingSession*, std::vector<TracePacket>*);
diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc
index 4648d25..7a26c2a 100644
--- a/src/tracing/core/tracing_service_impl_unittest.cc
+++ b/src/tracing/core/tracing_service_impl_unittest.cc
@@ -2775,6 +2775,63 @@
   consumer->WaitForTracingDisabled();
 }
 
+TEST_F(TracingServiceImplTest, ObserveEventsDataSourceInstancesUnregister) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  // Start tracing before the consumer asks to observe events, so the
+  // consumer's OnObservableEvents() should not be called during startup.
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // ObserveEvents() should report the instance's current state (STARTED).
+  consumer->ObserveEvents(TracingService::ConsumerEndpoint::
+                              ObservableEventType::kDataSourceInstances);
+  {
+    ObservableEvents event;
+    ObservableEvents::DataSourceInstanceStateChange* change =
+        event.add_instance_state_changes();
+    change->set_producer_name("mock_producer");
+    change->set_data_source_name("data_source");
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_CALL(*consumer, OnObservableEvents(Eq(event)))
+        .WillOnce(InvokeWithoutArgs(
+            task_runner.CreateCheckpoint("data_source_started")));
+
+    task_runner.RunUntilCheckpoint("data_source_started");
+  }
+  {  // Expect a STOPPED state change from the unregistration below.
+    ObservableEvents event;
+    ObservableEvents::DataSourceInstanceStateChange* change =
+        event.add_instance_state_changes();
+    change->set_producer_name("mock_producer");
+    change->set_data_source_name("data_source");
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_CALL(*consumer, OnObservableEvents(Eq(event)))
+        .WillOnce(InvokeWithoutArgs(
+            task_runner.CreateCheckpoint("data_source_stopped")));
+  }
+  producer->UnregisterDataSource("data_source");
+  producer->WaitForDataSourceStop("data_source");
+  task_runner.RunUntilCheckpoint("data_source_stopped");
+
+  consumer->DisableTracing();
+  consumer->WaitForTracingDisabled();
+}
+
 TEST_F(TracingServiceImplTest, QueryServiceState) {
   std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
   consumer->Connect(svc.get());