service: Add support for ObservableEvents.

Adds a Consumer-facing interface for observable events.
Implements a first such event for data source instance state changes.

Bug: 127948038
Change-Id: Ia05cfef7289eb0237b14f0fb7823f63aec7ee95d
diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc
index 7ea63d8..857efa6 100644
--- a/src/tracing/core/tracing_service_impl_unittest.cc
+++ b/src/tracing/core/tracing_service_impl_unittest.cc
@@ -1623,4 +1623,104 @@
   consumer->WaitForTracingDisabled();
 }
 
+TEST_F(TracingServiceImplTest, ObserveEventsDataSourceInstances) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  // Start tracing before the consumer is interested in events. The consumer's
+  // OnObservableEvents() should not be called yet.
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // Calling ObserveEvents should cause an event for the initial instance state.
+  consumer->ObserveEvents(TracingService::ConsumerEndpoint::
+                              ObservableEventType::kDataSourceInstances);
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  // Disabling should cause an instance state change to STOPPED.
+  consumer->DisableTracing();
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+  consumer->FreeBuffers();
+
+  // Enable again, this should cause a state change for a new instance to
+  // its initial state STOPPED.
+  trace_config.set_deferred_start(true);
+  consumer->EnableTracing(trace_config);
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceSetup("data_source");
+
+  // Should move the instance into STARTED state and thus cause an event.
+  consumer->StartTracing();
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceStart("data_source");
+
+  // Stop observing events.
+  consumer->ObserveEvents(
+      TracingService::ConsumerEndpoint::ObservableEventType::kNone);
+
+  // Disabling should now no longer cause events to be sent to the consumer.
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto