perfetto: use new flushing mechanisms and make variables more consistent

With the addition of the benchmark and its related mechanisms, we can make
the test code a lot nicer to read and less hacky. Remove some of these hacks
and also start using flushing in the fuzzer.

Bug: 74380167
Change-Id: I9942428700aa657b02fcc95e14c9cfb4eeea262a
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 8f32b5f..37689f7 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -50,8 +50,11 @@
   FakeProducer(std::string name,
                const uint8_t* data,
                size_t size,
-               FakeConsumer* consumer)
-      : name_(std::move(name)), data_(data), size_(size), consumer_(consumer) {}
+               std::function<void()> on_produced_and_committed)
+      : name_(std::move(name)),
+        data_(data),
+        size_(size),
+        on_produced_and_committed_(on_produced_and_committed) {}
 
   void Connect(const char* socket_name, base::TaskRunner* task_runner) {
     endpoint_ = ProducerIPCClient::Connect(socket_name, this, task_runner);
@@ -69,23 +72,19 @@
   void CreateDataSourceInstance(
       DataSourceInstanceID,
       const DataSourceConfig& source_config) override {
-    // The block is to destroy |packet| and |trace_writer| in order. Destroying
-    // the |trace_writer| will cause a flush of the completed packets.
+    auto trace_writer = endpoint_->CreateTraceWriter(
+        static_cast<BufferID>(source_config.target_buffer()));
     {
-      auto trace_writer = endpoint_->CreateTraceWriter(
-          static_cast<BufferID>(source_config.target_buffer()));
       auto packet = trace_writer->NewTracePacket();
       packet->stream_writer_->WriteBytes(data_, size_);
-      packet->Finalize();
     }
+    trace_writer->Flush();
+
     {
-      auto trace_writer = endpoint_->CreateTraceWriter(
-          static_cast<BufferID>(source_config.target_buffer()));
       auto end_packet = trace_writer->NewTracePacket();
       end_packet->set_for_testing()->set_str("end");
-      end_packet->Finalize();
     }
-    consumer_->BusyWaitReadBuffers();
+    trace_writer->Flush(on_produced_and_committed_);
   }
 
   void TearDownDataSourceInstance(DataSourceInstanceID) override {}
@@ -98,18 +97,22 @@
   const size_t size_;
   DataSourceID id_ = 0;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
-  FakeConsumer* consumer_;
+  std::function<void()> on_produced_and_committed_;
 };
 
 class FakeProducerDelegate : public ThreadDelegate {
  public:
-  FakeProducerDelegate(const uint8_t* data, size_t size, FakeConsumer* consumer)
-      : data_(data), size_(size), consumer_(consumer) {}
+  FakeProducerDelegate(const uint8_t* data,
+                       size_t size,
+                       std::function<void()> on_produced_and_committed)
+      : data_(data),
+        size_(size),
+        on_produced_and_committed_(on_produced_and_committed) {}
   ~FakeProducerDelegate() override = default;
 
   void Initialize(base::TaskRunner* task_runner) override {
     producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
-                                     size_, consumer_));
+                                     size_, on_produced_and_committed_));
     producer_->Connect(kProducerSocket, task_runner);
   }
 
@@ -117,48 +120,62 @@
   std::unique_ptr<FakeProducer> producer_;
   const uint8_t* data_;
   const size_t size_;
-  FakeConsumer* consumer_;
+  std::function<void()> on_produced_and_committed_;
 };
 
 int FuzzSharedMemory(const uint8_t* data, size_t size);
 
 int FuzzSharedMemory(const uint8_t* data, size_t size) {
+  base::TestTaskRunner task_runner;
+
   TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
       new ServiceDelegate(kProducerSocket, kConsumerSocket)));
 
+  auto on_produced_and_committed =
+      task_runner.CreateCheckpoint("produced.and.committed");
+  auto posted_on_produced_and_committed = [&task_runner,
+                                           &on_produced_and_committed] {
+    task_runner.PostTask(on_produced_and_committed);
+  };
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
+      new FakeProducerDelegate(data, size, posted_on_produced_and_committed)));
+
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(8);
-  trace_config.set_duration_ms(1000);
 
-  // Create the buffer for ftrace.
+  // Create the buffer for the fake producer.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("android.perfetto.FakeProducer");
   ds_config->set_target_buffer(0);
 
-  base::TestTaskRunner task_runner;
-  auto finish = task_runner.CreateCheckpoint("no.more.packets");
-  // Wait for sentinel message from Producer, then signal no.more.packets.
-  auto function = [&finish](std::vector<TracePacket> packets, bool has_more) {
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  auto on_consumer_data = [&on_readback_complete](
+                              std::vector<TracePacket> packets, bool has_more) {
     for (auto& p : packets) {
       p.Decode();
       if (p->for_testing().str() == "end")
-        finish();
+        on_readback_complete();
     }
   };
+
   auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
   FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(function), &task_runner);
+                        std::move(on_consumer_data), &task_runner);
+
   consumer.Connect(kConsumerSocket);
   task_runner.RunUntilCheckpoint("consumer.connected");
 
   consumer.EnableTracing();
+  task_runner.RunUntilCheckpoint("produced.and.committed");
 
-  TaskRunnerThread producer_thread("perfetto.prd");
-  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
-      new FakeProducerDelegate(data, size, &consumer)));
-  task_runner.RunUntilCheckpoint("no.more.packets");
+  consumer.ReadTraceData();
+  task_runner.RunUntilCheckpoint("readback.complete");
+
+  consumer.Disconnect();
+
   return 0;
 }