perfetto: use new flushing mechanisms and make variables more consistent

With the addition of the benchmark and its related mechanisms (such as
explicit flushing), the test code can be made much easier to read and less
hacky. Remove some of these hacks and start using flushing in the fuzzer.

Bug: 74380167
Change-Id: I9942428700aa657b02fcc95e14c9cfb4eeea262a
diff --git a/test/cts/end_to_end_integrationtest_cts.cc b/test/cts/end_to_end_integrationtest_cts.cc
index 212b877..b7577ca 100644
--- a/test/cts/end_to_end_integrationtest_cts.cc
+++ b/test/cts/end_to_end_integrationtest_cts.cc
@@ -53,8 +53,9 @@
 
     // Setip the function.
     uint64_t total = 0;
-    auto finish = task_runner.CreateCheckpoint("no.more.packets");
-    auto function = [&total, &finish, &rnd_engine](
+    auto on_readback_complete =
+        task_runner.CreateCheckpoint("readback.complete");
+    auto function = [&total, &on_readback_complete, &rnd_engine](
                         std::vector<TracePacket> packets, bool has_more) {
       for (auto& packet : packets) {
         ASSERT_TRUE(packet.Decode());
@@ -69,8 +70,9 @@
       total += packets.size();
 
       if (!has_more) {
+        // One extra packet for the clock snapshot.
         ASSERT_EQ(total, kEventCount + 1);
-        finish();
+        on_readback_complete();
       }
     };
 
@@ -79,17 +81,12 @@
     FakeConsumer consumer(trace_config, std::move(on_connect),
                           std::move(function), &task_runner);
     consumer.Connect(PERFETTO_CONSUMER_SOCK_NAME);
-
     task_runner.RunUntilCheckpoint("consumer.connected");
-    consumer.EnableTracing();
 
-    // TODO(skyostil): There's a race here before the service processes our data
-    // and the consumer tries to retrieve it. For now wait a bit until the
-    // service is done, but we should add explicit flushing to avoid this.
+    consumer.EnableTracing();
     task_runner.PostDelayedTask([&consumer]() { consumer.ReadTraceData(); },
                                 1000);
-
-    task_runner.RunUntilCheckpoint("no.more.packets");
+    task_runner.RunUntilCheckpoint("readback.complete");
   }
 };
 
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
index 166eadc..615e1bf 100644
--- a/test/end_to_end_benchmark.cc
+++ b/test/end_to_end_benchmark.cc
@@ -43,25 +43,6 @@
 static void BM_EndToEnd(benchmark::State& state) {
   base::TestTaskRunner task_runner;
 
-  // Setup the TraceConfig for the consumer.
-  TraceConfig trace_config;
-
-  // TODO(lalitm): the buffer size should be a function of the benchmark.
-  trace_config.add_buffers()->set_size_kb(512);
-
-  // Create the buffer for ftrace.
-  auto* ds_config = trace_config.add_data_sources()->mutable_config();
-  ds_config->set_name("android.perfetto.FakeProducer");
-  ds_config->set_target_buffer(0);
-
-  // The parameters for the producer.
-  static constexpr uint32_t kRandomSeed = 42;
-  uint32_t message_count = state.range(0);
-
-  // Setup the test to use a random number generator.
-  ds_config->mutable_for_testing()->set_seed(kRandomSeed);
-  ds_config->mutable_for_testing()->set_message_count(message_count);
-
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
   TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
@@ -79,6 +60,24 @@
   FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
   producer_thread.Start(std::move(producer_delegate));
 
+  // Setup the TraceConfig for the consumer.
+  // TODO(lalitm): the buffer size should be a function of the benchmark.
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(512);
+
+  // Create the buffer for the fake producer.
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("android.perfetto.FakeProducer");
+  ds_config->set_target_buffer(0);
+
+  // The parameters for the producer.
+  static constexpr uint32_t kRandomSeed = 42;
+  uint32_t message_count = state.range(0);
+
+  // Setup the test to use a random number generator.
+  ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+  ds_config->mutable_for_testing()->set_message_count(message_count);
+
   bool is_first_packet = true;
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
   std::minstd_rand0 rnd_engine(kRandomSeed);
@@ -119,8 +118,9 @@
 
   uint64_t wall_start_ns = base::GetWallTimeNs().count();
   uint64_t thread_start_ns = service_thread.GetThreadCPUTimeNs();
-  while (state.KeepRunning()) {
-    auto cname = "produced.and.committed." + std::to_string(state.iterations());
+  uint64_t iterations = 0;
+  for (auto _ : state) {
+    auto cname = "produced.and.committed." + std::to_string(iterations++);
     auto on_produced_and_committed = task_runner.CreateCheckpoint(cname);
     auto posted_on_produced_and_committed = [&task_runner,
                                              &on_produced_and_committed] {
@@ -132,8 +132,10 @@
   }
   uint64_t thread_ns = service_thread.GetThreadCPUTimeNs() - thread_start_ns;
   uint64_t wall_ns = base::GetWallTimeNs().count() - wall_start_ns;
-  PERFETTO_ILOG("Service CPU usage: %.2f,  CPU/iterations: %lf",
-                100.0 * thread_ns / wall_ns, 1.0 * thread_ns / message_count);
+
+  state.counters["Ser CPU"] = benchmark::Counter(100.0 * thread_ns / wall_ns);
+  state.counters["Ser ns/m"] =
+      benchmark::Counter(1.0 * thread_ns / message_count);
 
   // Read back the buffer just to check correctness.
   consumer.ReadTraceData();
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index 78ae680..7a969bc 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -59,12 +59,6 @@
 #define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
 #endif
 
-class PerfettoTest : public ::testing::Test {
- public:
-  PerfettoTest() {}
-  ~PerfettoTest() override = default;
-};
-
 // TODO(b/73453011): reenable this on more platforms (including standalone
 // Android).
 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
@@ -72,9 +66,19 @@
 #else
 #define MAYBE_TestFtraceProducer DISABLED_TestFtraceProducer
 #endif
-TEST_F(PerfettoTest, MAYBE_TestFtraceProducer) {
+TEST(PerfettoTest, MAYBE_TestFtraceProducer) {
   base::TestTaskRunner task_runner;
 
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  TaskRunnerThread service_thread("perfetto.svc");
+  service_thread.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
+      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
+#endif
+
   // Setip the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(1024);
@@ -93,8 +97,8 @@
   // Create the function to handle packets as they come in.
   uint64_t total = 0;
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto function = [&total, &on_readback_complete](
-                      std::vector<TracePacket> packets, bool has_more) {
+  auto on_consumer_data = [&total, &on_readback_complete](
+                              std::vector<TracePacket> packets, bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
       ASSERT_TRUE(packet->has_ftrace_events() || packet->has_clock_snapshot());
@@ -112,33 +116,41 @@
     }
   };
 
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(on_consumer_data), &task_runner);
+
+  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
+  task_runner.RunUntilCheckpoint("consumer.connected");
+
+  // Traced probes should flush data as it produces it.
+  consumer.EnableTracing();
+  task_runner.PostDelayedTask([&consumer] { consumer.ReadTraceData(); }, 3000);
+
+  task_runner.RunUntilCheckpoint("readback.complete", 10000);
+
+  consumer.Disconnect();
+}
+
+TEST(PerfettoTest, TestFakeProducer) {
+  base::TestTaskRunner task_runner;
+
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
   TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
       new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
-
-  TaskRunnerThread producer_thread("perfetto.prd");
-  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
-      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
 
-  // Finally, make the consumer connect to the service.
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(function), &task_runner);
-  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
-
-  task_runner.RunUntilCheckpoint("consumer.connected");
-  consumer.EnableTracing();
-
-  // Traced probes should flush data as it produces it.
-  task_runner.PostDelayedTask([&consumer] { consumer.ReadTraceData(); }, 3000);
-
-  task_runner.RunUntilCheckpoint("readback.complete", 10000);
-}
-
-TEST_F(PerfettoTest, TestFakeProducer) {
-  base::TestTaskRunner task_runner;
+  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
+    task_runner.PostTask(on_producer_enabled);
+  };
+  TaskRunnerThread producer_thread("perfetto.prd");
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+                               posted_on_producer_enabled));
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread.Start(std::move(producer_delegate));
 
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
@@ -164,9 +176,8 @@
   // Create the function to handle packets as they come in.
   uint64_t total = 0;
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto function = [&total, &on_readback_complete, &random](
-                      std::vector<TracePacket> packets, bool has_more) {
-
+  auto on_consumer_data = [&total, &on_readback_complete, &random](
+                              std::vector<TracePacket> packets, bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
       if (packet->has_clock_snapshot())
@@ -185,27 +196,10 @@
     }
   };
 
-#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
-#endif
-
-  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
-  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
-    task_runner.PostTask(on_producer_enabled);
-  };
-  TaskRunnerThread producer_thread("perfetto.prd");
-  std::unique_ptr<FakeProducerDelegate> producer_delegate(
-      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
-                               posted_on_producer_enabled));
-  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
-  producer_thread.Start(std::move(producer_delegate));
-
-  // Finally, make the consumer connect to the service.
   auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
   FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(function), &task_runner);
+                        std::move(on_consumer_data), &task_runner);
+
   consumer.Connect(TEST_CONSUMER_SOCK_NAME);
   task_runner.RunUntilCheckpoint("consumer.connected");
 
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 8f32b5f..37689f7 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -50,8 +50,11 @@
   FakeProducer(std::string name,
                const uint8_t* data,
                size_t size,
-               FakeConsumer* consumer)
-      : name_(std::move(name)), data_(data), size_(size), consumer_(consumer) {}
+               std::function<void()> on_produced_and_committed)
+      : name_(std::move(name)),
+        data_(data),
+        size_(size),
+        on_produced_and_committed_(on_produced_and_committed) {}
 
   void Connect(const char* socket_name, base::TaskRunner* task_runner) {
     endpoint_ = ProducerIPCClient::Connect(socket_name, this, task_runner);
@@ -69,23 +72,19 @@
   void CreateDataSourceInstance(
       DataSourceInstanceID,
       const DataSourceConfig& source_config) override {
-    // The block is to destroy |packet| and |trace_writer| in order. Destroying
-    // the |trace_writer| will cause a flush of the completed packets.
+    auto trace_writer = endpoint_->CreateTraceWriter(
+        static_cast<BufferID>(source_config.target_buffer()));
     {
-      auto trace_writer = endpoint_->CreateTraceWriter(
-          static_cast<BufferID>(source_config.target_buffer()));
       auto packet = trace_writer->NewTracePacket();
       packet->stream_writer_->WriteBytes(data_, size_);
-      packet->Finalize();
     }
+    trace_writer->Flush();
+
     {
-      auto trace_writer = endpoint_->CreateTraceWriter(
-          static_cast<BufferID>(source_config.target_buffer()));
       auto end_packet = trace_writer->NewTracePacket();
       end_packet->set_for_testing()->set_str("end");
-      end_packet->Finalize();
     }
-    consumer_->BusyWaitReadBuffers();
+    trace_writer->Flush(on_produced_and_committed_);
   }
 
   void TearDownDataSourceInstance(DataSourceInstanceID) override {}
@@ -98,18 +97,22 @@
   const size_t size_;
   DataSourceID id_ = 0;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
-  FakeConsumer* consumer_;
+  std::function<void()> on_produced_and_committed_;
 };
 
 class FakeProducerDelegate : public ThreadDelegate {
  public:
-  FakeProducerDelegate(const uint8_t* data, size_t size, FakeConsumer* consumer)
-      : data_(data), size_(size), consumer_(consumer) {}
+  FakeProducerDelegate(const uint8_t* data,
+                       size_t size,
+                       std::function<void()> on_produced_and_committed)
+      : data_(data),
+        size_(size),
+        on_produced_and_committed_(on_produced_and_committed) {}
   ~FakeProducerDelegate() override = default;
 
   void Initialize(base::TaskRunner* task_runner) override {
     producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
-                                     size_, consumer_));
+                                     size_, on_produced_and_committed_));
     producer_->Connect(kProducerSocket, task_runner);
   }
 
@@ -117,48 +120,62 @@
   std::unique_ptr<FakeProducer> producer_;
   const uint8_t* data_;
   const size_t size_;
-  FakeConsumer* consumer_;
+  std::function<void()> on_produced_and_committed_;
 };
 
 int FuzzSharedMemory(const uint8_t* data, size_t size);
 
 int FuzzSharedMemory(const uint8_t* data, size_t size) {
+  base::TestTaskRunner task_runner;
+
   TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
       new ServiceDelegate(kProducerSocket, kConsumerSocket)));
 
+  auto on_produced_and_committed =
+      task_runner.CreateCheckpoint("produced.and.committed");
+  auto posted_on_produced_and_committed = [&task_runner,
+                                           &on_produced_and_committed] {
+    task_runner.PostTask(on_produced_and_committed);
+  };
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
+      new FakeProducerDelegate(data, size, posted_on_produced_and_committed)));
+
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(8);
-  trace_config.set_duration_ms(1000);
 
-  // Create the buffer for ftrace.
+  // Create the buffer for the fake producer.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("android.perfetto.FakeProducer");
   ds_config->set_target_buffer(0);
 
-  base::TestTaskRunner task_runner;
-  auto finish = task_runner.CreateCheckpoint("no.more.packets");
-  // Wait for sentinel message from Producer, then signal no.more.packets.
-  auto function = [&finish](std::vector<TracePacket> packets, bool has_more) {
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  auto on_consumer_data = [&on_readback_complete](
+                              std::vector<TracePacket> packets, bool has_more) {
     for (auto& p : packets) {
       p.Decode();
       if (p->for_testing().str() == "end")
-        finish();
+        on_readback_complete();
     }
   };
+
   auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
   FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(function), &task_runner);
+                        std::move(on_consumer_data), &task_runner);
+
   consumer.Connect(kConsumerSocket);
   task_runner.RunUntilCheckpoint("consumer.connected");
 
   consumer.EnableTracing();
+  task_runner.RunUntilCheckpoint("produced.and.committed");
 
-  TaskRunnerThread producer_thread("perfetto.prd");
-  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
-      new FakeProducerDelegate(data, size, &consumer)));
-  task_runner.RunUntilCheckpoint("no.more.packets");
+  consumer.ReadTraceData();
+  task_runner.RunUntilCheckpoint("readback.complete");
+
+  consumer.Disconnect();
+
   return 0;
 }
 
diff --git a/test/fake_consumer.cc b/test/fake_consumer.cc
index e409cd0..e986450 100644
--- a/test/fake_consumer.cc
+++ b/test/fake_consumer.cc
@@ -72,14 +72,4 @@
   packet_callback_(std::move(data), has_more);
 }
 
-void FakeConsumer::BusyWaitReadBuffers() {
-  task_runner_->PostDelayedTask(
-      std::bind([this]() {
-        endpoint_->ReadBuffers();
-        task_runner_->PostDelayedTask(
-            std::bind([this]() { BusyWaitReadBuffers(); }), 1);
-      }),
-      1);
-}
-
 }  // namespace perfetto
diff --git a/test/fake_consumer.h b/test/fake_consumer.h
index 38ae875..e71c05c 100644
--- a/test/fake_consumer.h
+++ b/test/fake_consumer.h
@@ -43,7 +43,6 @@
   void Connect(const char* socket_name);
   void Disconnect();
   void ReadTraceData();
-  void BusyWaitReadBuffers();
 
   // Consumer implementation.
   void OnConnect() override;