Ensure that we always notify observers about data sources stopping.

Currently, unregistering a data source does not call NotifyStateChange,
even when a consumer is observing events, so that consumer is never told
that the data source instance has stopped.

Change-Id: I19b447fd60b61993ac61b4669989f57cfe8a64d8
diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc
index 4648d25..7a26c2a 100644
--- a/src/tracing/core/tracing_service_impl_unittest.cc
+++ b/src/tracing/core/tracing_service_impl_unittest.cc
@@ -2775,6 +2775,63 @@
   consumer->WaitForTracingDisabled();
 }
 
+TEST_F(TracingServiceImplTest, ObserveEventsDataSourceInstancesUnregister) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  // Start tracing before the consumer subscribes via ObserveEvents(); no
+  // OnObservableEvents() callback is expected while the data source starts.
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // Subscribing now should report the already-started instance as STARTED.
+  consumer->ObserveEvents(TracingService::ConsumerEndpoint::
+                              ObservableEventType::kDataSourceInstances);
+  {
+    ObservableEvents event;
+    ObservableEvents::DataSourceInstanceStateChange* change =
+        event.add_instance_state_changes();
+    change->set_producer_name("mock_producer");
+    change->set_data_source_name("data_source");
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_CALL(*consumer, OnObservableEvents(Eq(event)))
+        .WillOnce(InvokeWithoutArgs(
+            task_runner.CreateCheckpoint("data_source_started")));
+
+    task_runner.RunUntilCheckpoint("data_source_started");
+  }
+  {  // Unregistering below must produce a STOPPED event for the observer.
+    ObservableEvents event;
+    ObservableEvents::DataSourceInstanceStateChange* change =
+        event.add_instance_state_changes();
+    change->set_producer_name("mock_producer");
+    change->set_data_source_name("data_source");
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_CALL(*consumer, OnObservableEvents(Eq(event)))
+        .WillOnce(InvokeWithoutArgs(
+            task_runner.CreateCheckpoint("data_source_stopped")));
+  }
+  producer->UnregisterDataSource("data_source");
+  producer->WaitForDataSourceStop("data_source");
+  task_runner.RunUntilCheckpoint("data_source_stopped");
+
+  consumer->DisableTracing();
+  consumer->WaitForTracingDisabled();
+}
+
 TEST_F(TracingServiceImplTest, QueryServiceState) {
   std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
   consumer->Connect(svc.get());