service: Scrape SMBs on flush, disable and producer disconnect

Scrape chunks from the shared memory buffers (SMBs) that producers have
started but not yet committed (i.e. chunks that are either complete or
still being written to). We do this in three locations:
- When flushing, e.g. to handle unresponsive producers or producers
  that are unable to flush their active chunks.
- When a producer disconnects, e.g. to handle crashed producers.
- After the session was disabled, e.g. to gather data from producers
  that didn't stop their data sources in time.

Bug: 73828976
Bug: 117310221
Change-Id: I4ed5394b6c47ea35a4999e4ff95b469337a5d273
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index cbf2da6..c679c52 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -31,6 +31,7 @@
 #include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
 #include "src/tracing/core/trace_writer_impl.h"
 #include "src/tracing/test/mock_consumer.h"
 #include "src/tracing/test/mock_producer.h"
@@ -101,6 +102,11 @@
     return svc->GetProducer(producer_id)->writers_;
   }
 
+  std::unique_ptr<SharedMemoryArbiterImpl> TakeShmemArbiterForProducer(
+      ProducerID producer_id) {  // Moves the arbiter out; caller owns it.
+    return std::move(svc->GetProducer(producer_id)->inproc_shmem_arbiter_);
+  }
+
   size_t GetNumPendingFlushes() {
     return tracing_session()->pending_flushes.size();
   }
@@ -567,10 +573,10 @@
 
   // Make the producer reply only to the 3rd flush request.
   testing::InSequence seq;
-  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 1.
-  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 2.
-  producer->WaitForFlush(writer.get());  // Will reply only to flush id == 3.
-  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 4.
+  producer->WaitForFlush(nullptr, /*reply=*/false);  // Do NOT reply to flush 1.
+  producer->WaitForFlush(nullptr, /*reply=*/false);  // Do NOT reply to flush 2.
+  producer->WaitForFlush(writer.get());              // Reply only to flush 3.
+  producer->WaitForFlush(nullptr, /*reply=*/false);  // Do NOT reply to flush 4.
 
   // Even if the producer explicily replied only to flush ID == 3, all the
   // previous flushed < 3 should be implicitly acked.
@@ -1142,4 +1148,209 @@
                            Property(&protos::TestEvent::str, Eq("payload")))));
 }
 
+// Checks that, with SMB scraping enabled, a flush makes the service scrape the
+// chunk a TraceWriter is still writing into, both when the producer acks the
+// flush and when the flush times out. The last packet of a scraped chunk isn't
+// read back, as the service can't know whether that packet is complete.
+TEST_F(TracingServiceImplTest, ScrapeBuffersOnFlush) {
+  svc->SetSMBScrapingEnabled(true);
+
+  // Matches a TracePacket whose for_testing().str() equals |payload|.
+  auto packet_with = [](const char* payload) {
+    return Property(&protos::TracePacket::for_testing,
+                    Property(&protos::TestEvent::str, Eq(payload)));
+  };
+
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  ProducerID producer_id = *last_producer_id();
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  ds_config->set_target_buffer(0);
+  consumer->EnableTracing(trace_config);
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // StartTracing() is a no-op (apart from a DLOG): |deferred_start| isn't set.
+  consumer->StartTracing();
+
+  std::unique_ptr<TraceWriter> writer = producer->endpoint()->CreateTraceWriter(
+      tracing_session()->buffers_index[0]);
+  WaitForTraceWritersChanged(producer_id);
+
+  // Write a few trace packets.
+  writer->NewTracePacket()->set_for_testing()->set_str("payload1");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload2");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload3");
+
+  // Flush, but have the producer ack without flushing |writer| (hence the
+  // nullptr argument). The chunk thus stays uncommitted, so the packets can
+  // only reach the trace buffer if the service scrapes it.
+  auto flush_request = consumer->Flush();
+  producer->WaitForFlush(nullptr, /*reply=*/true);
+  ASSERT_TRUE(flush_request.WaitForReply());
+
+  // Chunk with the packets should have been scraped. The service can't know
+  // whether the last packet was completed, so shouldn't read it.
+  auto packets = consumer->ReadBuffers();
+  EXPECT_THAT(packets, Contains(packet_with("payload1")));
+  EXPECT_THAT(packets, Contains(packet_with("payload2")));
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload3"))));
+
+  // Write some more packets. "payload3" is no longer the last packet in the
+  // chunk, so the next scrape should pick it up.
+  writer->NewTracePacket()->set_for_testing()->set_str("payload4");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload5");
+
+  // Don't reply to this flush, so that it times out and WaitForReply()
+  // returns false. The service should scrape the SMB again when the flush
+  // times out.
+  flush_request = consumer->Flush(/*timeout=*/100);
+  producer->WaitForFlush(nullptr, /*reply=*/false);
+  ASSERT_FALSE(flush_request.WaitForReply());
+
+  // Chunk with the packets should have been scraped again, overriding the
+  // original one. Again, the last packet ("payload5") should be ignored, and
+  // "payload1" and "payload2", which were already read above, should not be
+  // read a second time.
+  packets = consumer->ReadBuffers();
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload1"))));
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload2"))));
+  EXPECT_THAT(packets, Contains(packet_with("payload3")));
+  EXPECT_THAT(packets, Contains(packet_with("payload4")));
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload5"))));
+
+  // Tear down: stop the data source and wait for tracing to be disabled.
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+}
+
+// Checks that, with SMB scraping enabled, the service scrapes the chunk a
+// producer is still writing into when that producer disconnects.
+TEST_F(TracingServiceImplTest, ScrapeBuffersOnProducerDisconnect) {
+  svc->SetSMBScrapingEnabled(true);
+
+  // Matches a TracePacket whose for_testing().str() equals |payload|.
+  auto packet_with = [](const char* payload) {
+    return Property(&protos::TracePacket::for_testing,
+                    Property(&protos::TestEvent::str, Eq(payload)));
+  };
+
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  ProducerID producer_id = *last_producer_id();
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  ds_config->set_target_buffer(0);
+  consumer->EnableTracing(trace_config);
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // StartTracing() is a no-op (apart from a DLOG): |deferred_start| isn't set.
+  consumer->StartTracing();
+
+  std::unique_ptr<TraceWriter> writer = producer->endpoint()->CreateTraceWriter(
+      tracing_session()->buffers_index[0]);
+  WaitForTraceWritersChanged(producer_id);
+
+  // Write a few trace packets.
+  writer->NewTracePacket()->set_for_testing()->set_str("payload1");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload2");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload3");
+
+  // Disconnect the producer without committing the chunk. This should cause a
+  // scrape of the SMB. Take the arbiter out of the producer first, so that it
+  // isn't destroyed before |writer| is.
+  auto shmem_arbiter = TakeShmemArbiterForProducer(producer_id);
+  producer.reset();
+
+  // Chunk with the packets should have been scraped. The service can't know
+  // whether the last packet was completed, so shouldn't read it.
+  auto packets = consumer->ReadBuffers();
+  EXPECT_THAT(packets, Contains(packet_with("payload1")));
+  EXPECT_THAT(packets, Contains(packet_with("payload2")));
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload3"))));
+
+  // Clean up |writer| without crashing: the producer already went away, so
+  // reset its chunk before destroying it, and only then drop the arbiter.
+  static_cast<TraceWriterImpl*>(writer.get())->ResetChunkForTesting();
+  writer.reset();
+  shmem_arbiter.reset();
+
+  consumer->DisableTracing();
+  consumer->WaitForTracingDisabled();
+}
+
+// Checks that uncommitted SMB chunks get scraped after tracing is disabled.
+TEST_F(TracingServiceImplTest, ScrapeBuffersOnDisable) {
+  svc->SetSMBScrapingEnabled(true);
+
+  // Matches a TracePacket whose for_testing().str() equals |payload|.
+  auto packet_with = [](const char* payload) {
+    return Property(&protos::TracePacket::for_testing,
+                    Property(&protos::TestEvent::str, Eq(payload)));
+  };
+
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  ProducerID producer_id = *last_producer_id();
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  ds_config->set_target_buffer(0);
+  consumer->EnableTracing(trace_config);
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // StartTracing() is a no-op (apart from a DLOG): |deferred_start| isn't set.
+  consumer->StartTracing();
+
+  std::unique_ptr<TraceWriter> writer = producer->endpoint()->CreateTraceWriter(
+      tracing_session()->buffers_index[0]);
+  WaitForTraceWritersChanged(producer_id);
+
+  // Write a few trace packets.
+  writer->NewTracePacket()->set_for_testing()->set_str("payload1");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload2");
+  writer->NewTracePacket()->set_for_testing()->set_str("payload3");
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+
+  // Chunk with the packets should have been scraped. The service can't know
+  // whether the last packet was completed, so shouldn't read it.
+  auto packets = consumer->ReadBuffers();
+  EXPECT_THAT(packets, Contains(packet_with("payload1")));
+  EXPECT_THAT(packets, Contains(packet_with("payload2")));
+  EXPECT_THAT(packets, Not(Contains(packet_with("payload3"))));
+}
+
 }  // namespace perfetto