Wait for producers to ACK the stop when disabling the trace.

Adds a new field to the DataSourceDescriptor, will_notify_on_stop,
that is advertised by producers on registration.
That field tells the tracing service to expect a
NotifyDataSourceStopped() call when tracing is disabled.
In turn, on the Consumer endpoint, this slightly changes the behavior
of the OnTracingDisabled() callback. If one or more data sources
registered with the aforementioned flag set, DisableTracing() will wait
until those data sources have acked the stop, up to a max timeout of
5 seconds. This allows the consumer to linearize with the producers
and ensure that they fully drain all their data on stop.

Bug: 111660094
Change-Id: Ida832ba09c2fc66c651b88b3d285647dbfca68a8
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 34628e4..f482ff8 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -527,4 +527,80 @@
                         Property(&protos::TestEvent::str, Eq("payload")))));
 }
 
+// Starts a session with three data sources, two of which set the
+// |will_notify_on_stop| flag (ack_stop=true). After both acks are sent, the
+// consumer must see OnTracingDisabled well before the service stop timeout.
+TEST_F(TracingServiceImplTest, OnTracingDisabledWaitsForDataSourceStopAcks) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("ds_will_ack_1", /*ack_stop=*/true);
+  producer->RegisterDataSource("ds_wont_ack");
+  producer->RegisterDataSource("ds_will_ack_2", /*ack_stop=*/true);
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  trace_config.add_data_sources()->mutable_config()->set_name("ds_will_ack_1");
+  trace_config.add_data_sources()->mutable_config()->set_name("ds_wont_ack");
+  trace_config.add_data_sources()->mutable_config()->set_name("ds_will_ack_2");
+  trace_config.set_duration_ms(1);
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("ds_will_ack_1");
+  producer->WaitForDataSourceStart("ds_wont_ack");
+  producer->WaitForDataSourceStart("ds_will_ack_2");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("ds_wont_ack");
+  producer->WaitForFlush(writer.get());
+
+  DataSourceInstanceID id1 = producer->GetDataSourceInstanceId("ds_will_ack_1");
+  DataSourceInstanceID id2 = producer->GetDataSourceInstanceId("ds_will_ack_2");
+
+  producer->WaitForDataSourceStop("ds_will_ack_1");
+  producer->WaitForDataSourceStop("ds_wont_ack");
+  producer->WaitForDataSourceStop("ds_will_ack_2");
+
+  producer->endpoint()->NotifyDataSourceStopped(id1);
+  producer->endpoint()->NotifyDataSourceStopped(id2);
+
+  // Wait for at most half of the service timeout, so that this test fails if
+  // the service only calls OnTracingDisabled() through its timeout fallback,
+  // i.e. because some of the expected acks were not registered.
+  consumer->WaitForTracingDisabled(
+      TracingServiceImpl::kDataSourceStopTimeoutMs / 2);
+}
+
+// Similar to OnTracingDisabledWaitsForDataSourceStopAcks, but the data source
+// deliberately never acks the stop. With the timeout overridden to 1 (ms, per
+// the field name), the consumer must still receive OnTracingDisabled().
+TEST_F(TracingServiceImplTest, OnTracingDisabledCalledAnywaysInCaseOfTimeout) {
+  svc->override_data_source_test_timeout_ms_for_testing = 1;
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source", /*ack_stop=*/true);
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  trace_config.add_data_sources()->mutable_config()->set_name("data_source");
+  trace_config.set_duration_ms(1);
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("data_source");
+  producer->WaitForFlush(writer.get());
+
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto