perfetto: refactor test code to consolidate pattern

With several types of testing now present, a clear pattern has been
established in how test code for end to end tests should be written.
This pattern involves spinning up service and producer, setting up a
test-specific trace config and then defining an intricate set of
checkpoints and RunUntilCheckpoint calls.

All of this code is duplicated and this will only get worse as we add
more benchmarks and tests to the codebase. Moreover, the
complex-appearance of the code hides the true simplicity within.

Refactor all the duplicated code into a helper file and call the
functions in this class from the four other cases where they are used
(integration test, fuzzer, benchmark and CTS).

Bug: 74380167
Change-Id: I261636b3da15b16f063e6a5dc6c293080b8ff4ac
diff --git a/Android.bp b/Android.bp
index 4fd984e..44ab92c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -334,9 +334,9 @@
     "src/tracing/core/trace_packet.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "test/end_to_end_integrationtest.cc",
-    "test/fake_consumer.cc",
     "test/fake_producer.cc",
     "test/task_runner_thread.cc",
+    "test/test_helper.cc",
   ],
   shared_libs: [
     "libandroid",
@@ -3549,10 +3549,10 @@
     "src/ftrace_reader/ftrace_procfs.cc",
     "src/ftrace_reader/proto_translation_table.cc",
     "src/traced/probes/probes_producer.cc",
-    "test/fake_producer.cc",
-    "test/fake_consumer.cc",
-    "test/task_runner_thread.cc",
     "test/end_to_end_integrationtest.cc",
+    "test/fake_producer.cc",
+    "test/task_runner_thread.cc",
+    "test/test_helper.cc",
   ],
   export_include_dirs: [
     ".",
diff --git a/Android.bp.extras b/Android.bp.extras
index a3f25d5..42c389c 100644
--- a/Android.bp.extras
+++ b/Android.bp.extras
@@ -11,10 +11,10 @@
     "src/ftrace_reader/ftrace_procfs.cc",
     "src/ftrace_reader/proto_translation_table.cc",
     "src/traced/probes/probes_producer.cc",
-    "test/fake_producer.cc",
-    "test/fake_consumer.cc",
-    "test/task_runner_thread.cc",
     "test/end_to_end_integrationtest.cc",
+    "test/fake_producer.cc",
+    "test/task_runner_thread.cc",
+    "test/test_helper.cc",
   ],
   export_include_dirs: [
     ".",
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 7e27608..de7c72c 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -25,9 +25,9 @@
 source_set("end_to_end_integrationtests") {
   testonly = true
   deps = [
-    ":fake_consumer",
     ":task_runner_thread",
     ":task_runner_thread_delegates",
+    ":test_helper",
     "../gn:default_deps",
     "../gn:gtest_deps",
     "../protos/perfetto/trace:lite",
@@ -52,9 +52,9 @@
     ]
     testonly = true
     deps = [
-      ":fake_consumer",
       ":task_runner_thread",
       ":task_runner_thread_delegates",
+      ":test_helper",
       "../gn:default_deps",
       "../src/base:test_support",
       "../src/protozero",
@@ -65,22 +65,6 @@
   }
 }
 
-source_set("fake_consumer") {
-  testonly = true
-  deps = [
-    "../gn:default_deps",
-    "../gn:gtest_deps",
-    "../protos/perfetto/trace:lite",
-    "../protos/perfetto/trace:zero",
-    "../src/base:base",
-    "../src/base:test_support",
-  ]
-  sources = [
-    "fake_consumer.cc",
-    "fake_consumer.h",
-  ]
-}
-
 source_set("task_runner_thread") {
   testonly = true
   deps = [
@@ -109,13 +93,31 @@
   ]
 }
 
+source_set("test_helper") {
+  testonly = true
+  deps = [
+    ":task_runner_thread",
+    ":task_runner_thread_delegates",
+    "../gn:default_deps",
+    "../protos/perfetto/trace:lite",
+    "../src/base:test_support",
+  ]
+  sources = [
+    "test_helper.cc",
+    "test_helper.h",
+  ]
+  if (start_daemons_for_testing) {
+    cflags = [ "-DPERFETTO_START_DAEMONS_FOR_TESTING" ]
+  }
+}
+
 if (!build_with_chromium) {
   source_set("end_to_end_benchmarks") {
     testonly = true
     deps = [
-      ":fake_consumer",
       ":task_runner_thread",
       ":task_runner_thread_delegates",
+      ":test_helper",
       "../../gn:default_deps",
       "../gn:gtest_deps",
       "../protos/perfetto/trace:lite",
diff --git a/test/cts/end_to_end_integrationtest_cts.cc b/test/cts/end_to_end_integrationtest_cts.cc
index d6531ec..426458b 100644
--- a/test/cts/end_to_end_integrationtest_cts.cc
+++ b/test/cts/end_to_end_integrationtest_cts.cc
@@ -22,7 +22,8 @@
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/traced/traced.h"
 #include "perfetto/tracing/core/trace_packet.h"
-#include "test/fake_consumer.h"
+#include "src/base/test/test_task_runner.h"
+#include "test/test_helper.h"
 
 namespace perfetto {
 
@@ -31,65 +32,45 @@
   void TestMockProducer(const std::string& producer_name) {
     base::TestTaskRunner task_runner;
 
-    // Setup the trace config.
+    TestHelper helper(&task_runner);
+    helper.ConnectConsumer();
+
     TraceConfig trace_config;
-    trace_config.add_buffers()->set_size_kb(4096 * 10);
+    trace_config.add_buffers()->set_size_kb(1024);
     trace_config.set_duration_ms(200);
 
     auto* ds_config = trace_config.add_data_sources()->mutable_config();
     ds_config->set_name(producer_name);
     ds_config->set_target_buffer(0);
 
-    // The parameters for the producer.
     static constexpr uint32_t kRandomSeed = 42;
     static constexpr uint32_t kEventCount = 10;
     static constexpr uint32_t kMessageSizeBytes = 1024;
-
-    // Setup the test to use a random number generator.
     ds_config->mutable_for_testing()->set_seed(kRandomSeed);
     ds_config->mutable_for_testing()->set_message_count(kEventCount);
     ds_config->mutable_for_testing()->set_message_size(kMessageSizeBytes);
 
-    // Create the random generator with the same seed.
-    std::minstd_rand0 rnd_engine(kRandomSeed);
+    auto producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+    task_runner.PostTask(producer_enabled);
+    helper.StartTracing(trace_config);
 
-    // Setip the function.
-    uint64_t total = 0;
+    size_t packets_seen = 0;
+    std::minstd_rand0 rnd_engine(kRandomSeed);
+    auto on_consumer_data = [&packets_seen, &rnd_engine](
+                                const TracePacket::DecodedTracePacket& packet) {
+      ASSERT_TRUE(packet.has_for_testing());
+      ASSERT_EQ(packet.for_testing().seq_value(), rnd_engine());
+      packets_seen++;
+    };
     auto on_readback_complete =
         task_runner.CreateCheckpoint("readback.complete");
-    auto function = [&total, &on_readback_complete, &rnd_engine](
-                        std::vector<TracePacket> packets, bool has_more) {
-      for (auto& packet : packets) {
-        ASSERT_TRUE(packet.Decode());
-        ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot() ||
-                    packet->has_trace_config());
-        if (packet->has_clock_snapshot() || packet->has_trace_config()) {
-          continue;
-        }
-        ASSERT_EQ(protos::TracePacket::kTrustedUid,
-                  packet->optional_trusted_uid_case());
-        ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
-      }
-      total += packets.size();
-
-      if (!has_more) {
-        // Extra packets for the clock snapshot and trace config.
-        ASSERT_EQ(total, kEventCount + 2);
-        on_readback_complete();
-      }
-    };
-
-    // Finally, make the consumer connect to the service.
-    auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-    FakeConsumer consumer(trace_config, std::move(on_connect),
-                          std::move(function), &task_runner);
-    consumer.Connect(PERFETTO_CONSUMER_SOCK_NAME);
-    task_runner.RunUntilCheckpoint("consumer.connected");
-
-    consumer.EnableTracing();
-    task_runner.PostDelayedTask([&consumer]() { consumer.ReadTraceData(); },
-                                1000);
+    task_runner.PostDelayedTask(
+        [&on_consumer_data, &on_readback_complete, &helper]() {
+          helper.ReadData(on_consumer_data, on_readback_complete);
+        },
+        1000);
     task_runner.RunUntilCheckpoint("readback.complete");
+    ASSERT_EQ(packets_seen, kEventCount);
   }
 };
 
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
index 8c3d2fb..429711a 100644
--- a/test/end_to_end_benchmark.cc
+++ b/test/end_to_end_benchmark.cc
@@ -23,25 +23,14 @@
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
 #include "src/base/test/test_task_runner.h"
-#include "test/fake_consumer.h"
 #include "test/task_runner_thread.h"
 #include "test/task_runner_thread_delegates.h"
+#include "test/test_helper.h"
 
 namespace perfetto {
 
 namespace {
 
-// If we're building on Android and starting the daemons ourselves,
-// create the sockets in a world-writable location.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
-    PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-#define TEST_PRODUCER_SOCK_NAME "/data/local/tmp/traced_producer"
-#define TEST_CONSUMER_SOCK_NAME "/data/local/tmp/traced_consumer"
-#else
-#define TEST_PRODUCER_SOCK_NAME PERFETTO_PRODUCER_SOCK_NAME
-#define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
-#endif
-
 bool IsBenchmarkFunctionalOnly() {
   return getenv("BENCHMARK_FUNCTIONAL_TEST_ONLY") != nullptr;
 }
@@ -49,25 +38,11 @@
 void BenchmarkCommon(benchmark::State& state) {
   base::TestTaskRunner task_runner;
 
-#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
-#endif
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
 
-  TaskRunnerThread producer_thread("perfetto.prd");
-  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
-  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
-    task_runner.PostTask(on_producer_enabled);
-  };
-  std::unique_ptr<FakeProducerDelegate> producer_delegate(
-      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
-                               std::move(posted_on_producer_enabled)));
-  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
-  producer_thread.Start(std::move(producer_delegate));
-
-  // Once conneced, we can retrieve the inner producer.
-  FakeProducer* producer = producer_delegate_cached->producer();
+  FakeProducer* producer = helper.ConnectFakeProducer();
+  helper.ConnectConsumer();
 
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
@@ -94,61 +69,35 @@
   ds_config->mutable_for_testing()->set_message_size(message_bytes);
   ds_config->mutable_for_testing()->set_max_messages_per_second(messages_per_s);
 
-  bool is_first_packet = true;
-  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  std::minstd_rand0 rnd_engine(kRandomSeed);
-  auto on_consumer_data = [&is_first_packet, &on_readback_complete,
-                           &rnd_engine](std::vector<TracePacket> packets,
-                                        bool has_more) {
-    for (auto& packet : packets) {
-      ASSERT_TRUE(packet.Decode());
-      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot() ||
-                  packet->has_trace_config());
-      if (packet->has_clock_snapshot() || packet->has_trace_config())
-        continue;
-      ASSERT_EQ(protos::TracePacket::kTrustedUid,
-                packet->optional_trusted_uid_case());
-      if (is_first_packet) {
-        rnd_engine = std::minstd_rand0(packet->for_testing().seq_value());
-        is_first_packet = false;
-      } else {
-        ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
-      }
-    }
+  helper.StartTracing(trace_config);
 
-    if (!has_more) {
-      is_first_packet = true;
-      on_readback_complete();
+  bool is_first_packet = true;
+  std::minstd_rand0 rnd_engine(kRandomSeed);
+  auto on_consumer_data = [&is_first_packet, &rnd_engine](
+                              const TracePacket::DecodedTracePacket& packet) {
+    ASSERT_TRUE(packet.has_for_testing());
+    if (is_first_packet) {
+      rnd_engine = std::minstd_rand0(packet.for_testing().seq_value());
+      is_first_packet = false;
+    } else {
+      ASSERT_EQ(packet.for_testing().seq_value(), rnd_engine());
     }
   };
 
-  // Finally, make the consumer connect to the service.
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(on_consumer_data), &task_runner);
-  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
-  task_runner.RunUntilCheckpoint("consumer.connected");
-
-  consumer.EnableTracing();
-  task_runner.RunUntilCheckpoint("producer.enabled");
-
   uint64_t wall_start_ns = base::GetWallTimeNs().count();
-  uint64_t service_start_ns = service_thread.GetThreadCPUTimeNs();
-  uint64_t producer_start_ns = producer_thread.GetThreadCPUTimeNs();
+  uint64_t service_start_ns = helper.service_thread()->GetThreadCPUTimeNs();
+  uint64_t producer_start_ns = helper.producer_thread()->GetThreadCPUTimeNs();
   uint64_t iterations = 0;
   for (auto _ : state) {
     auto cname = "produced.and.committed." + std::to_string(iterations++);
     auto on_produced_and_committed = task_runner.CreateCheckpoint(cname);
-    auto posted_on_produced_and_committed = [&task_runner,
-                                             &on_produced_and_committed] {
-      task_runner.PostTask(on_produced_and_committed);
-    };
-    producer->ProduceEventBatch(posted_on_produced_and_committed);
+    producer->ProduceEventBatch(helper.WrapTask(on_produced_and_committed));
     task_runner.RunUntilCheckpoint(cname, time_for_messages_ms);
   }
-  uint64_t service_ns = service_thread.GetThreadCPUTimeNs() - service_start_ns;
+  uint64_t service_ns =
+      helper.service_thread()->GetThreadCPUTimeNs() - service_start_ns;
   uint64_t producer_ns =
-      producer_thread.GetThreadCPUTimeNs() - producer_start_ns;
+      helper.producer_thread()->GetThreadCPUTimeNs() - producer_start_ns;
   uint64_t wall_ns = base::GetWallTimeNs().count() - wall_start_ns;
 
   state.counters["Pro CPU"] = benchmark::Counter(100.0 * producer_ns / wall_ns);
@@ -157,11 +106,10 @@
       benchmark::Counter(1.0 * service_ns / message_count);
 
   // Read back the buffer just to check correctness.
-  consumer.ReadTraceData();
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  helper.ReadData(on_consumer_data, on_readback_complete);
   task_runner.RunUntilCheckpoint("readback.complete");
   state.SetBytesProcessed(iterations * message_bytes * message_count);
-
-  consumer.Disconnect();
 }
 
 void SaturateCpuArgs(benchmark::internal::Benchmark* b) {
@@ -185,7 +133,7 @@
     b->Args({message_count, 256, speed});
   }
 }
-}
+}  // namespace
 
 static void BM_EndToEnd_SaturateCpu(benchmark::State& state) {
   BenchmarkCommon(state);
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index dbf91c2..14f8c3b 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -27,36 +27,22 @@
 #include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/traced/traced.h"
-#include "perfetto/tracing/core/consumer.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
-#include "perfetto/tracing/ipc/consumer_ipc_client.h"
 #include "src/base/test/test_task_runner.h"
-#include "test/fake_consumer.h"
 #include "test/task_runner_thread.h"
 #include "test/task_runner_thread_delegates.h"
-
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-#include "perfetto/base/android_task_runner.h"
-#endif
+#include "test/test_helper.h"
 
 namespace perfetto {
 
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-using PlatformTaskRunner = base::AndroidTaskRunner;
-#else
-using PlatformTaskRunner = base::UnixTaskRunner;
-#endif
-
 // If we're building on Android and starting the daemons ourselves,
 // create the sockets in a world-writable location.
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
     PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
 #define TEST_PRODUCER_SOCK_NAME "/data/local/tmp/traced_producer"
-#define TEST_CONSUMER_SOCK_NAME "/data/local/tmp/traced_consumer"
 #else
 #define TEST_PRODUCER_SOCK_NAME PERFETTO_PRODUCER_SOCK_NAME
-#define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
 #endif
 
 // TODO(b/73453011): reenable this on more platforms (including standalone
@@ -69,245 +55,141 @@
 TEST(PerfettoTest, MAYBE_TestFtraceProducer) {
   base::TestTaskRunner task_runner;
 
-#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
 
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
   TaskRunnerThread producer_thread("perfetto.prd");
   producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
       new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
 
-  // Setip the TraceConfig for the consumer.
+  helper.ConnectConsumer();
+
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(1024);
   trace_config.set_duration_ms(3000);
 
-  // Create the buffer for ftrace.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("linux.ftrace");
   ds_config->set_target_buffer(0);
 
-  // Setup the config for ftrace.
   auto* ftrace_config = ds_config->mutable_ftrace_config();
   *ftrace_config->add_ftrace_events() = "sched_switch";
   *ftrace_config->add_ftrace_events() = "bar";
 
-  // Create the function to handle packets as they come in.
-  uint64_t total = 0;
+  auto producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  task_runner.PostDelayedTask(producer_enabled, 100);
+  helper.StartTracing(trace_config);
+
+  size_t packets_seen = 0;
+  auto on_consumer_data =
+      [&packets_seen](const TracePacket::DecodedTracePacket& packet) {
+        for (int ev = 0; ev < packet.ftrace_events().event_size(); ev++) {
+          ASSERT_TRUE(packet.ftrace_events().event(ev).has_sched_switch());
+        }
+        packets_seen++;
+      };
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto on_consumer_data = [&total, &on_readback_complete](
-                              std::vector<TracePacket> packets, bool has_more) {
-    for (auto& packet : packets) {
-      ASSERT_TRUE(packet.Decode());
-      ASSERT_TRUE(packet->has_ftrace_events() || packet->has_clock_snapshot() ||
-                  packet->has_trace_config());
-      if (packet->has_clock_snapshot() || packet->has_trace_config())
-        continue;
-      for (int ev = 0; ev < packet->ftrace_events().event_size(); ev++) {
-        ASSERT_TRUE(packet->ftrace_events().event(ev).has_sched_switch());
-      }
-    }
-    total += packets.size();
-
-    if (!has_more) {
-      ASSERT_GT(total, 0u);
-      on_readback_complete();
-    }
-  };
-
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(on_consumer_data), &task_runner);
-
-  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
-  task_runner.RunUntilCheckpoint("consumer.connected");
-
-  // Traced probes should flush data as it produces it.
-  consumer.EnableTracing();
-  task_runner.PostDelayedTask([&consumer] { consumer.ReadTraceData(); }, 3000);
-
-  task_runner.RunUntilCheckpoint("readback.complete", 10000);
-
-  consumer.Disconnect();
+  task_runner.PostDelayedTask(
+      [&helper, &on_consumer_data, &on_readback_complete] {
+        helper.ReadData(on_consumer_data, on_readback_complete);
+      },
+      3000);
+  task_runner.RunUntilCheckpoint("readback.complete");
+  ASSERT_GT(packets_seen, 0u);
 }
 
 TEST(PerfettoTest, TestFakeProducer) {
   base::TestTaskRunner task_runner;
 
-#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
-#endif
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
 
-  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
-  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
-    task_runner.PostTask(on_producer_enabled);
-  };
-  TaskRunnerThread producer_thread("perfetto.prd");
-  std::unique_ptr<FakeProducerDelegate> producer_delegate(
-      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
-                               posted_on_producer_enabled));
-  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
-  producer_thread.Start(std::move(producer_delegate));
+  FakeProducer* producer = helper.ConnectFakeProducer();
+  helper.ConnectConsumer();
 
-  // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(1024);
   trace_config.set_duration_ms(200);
 
-  // Create the buffer for ftrace.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("android.perfetto.FakeProducer");
   ds_config->set_target_buffer(0);
 
-  // The parameters for the producer.
+  static constexpr size_t kNumPackets = 10;
   static constexpr uint32_t kRandomSeed = 42;
-  static constexpr uint32_t kEventCount = 10;
-  static constexpr uint32_t kMessageSizeBytes = 1024;
-
-  // Setup the test to use a random number generator.
+  static constexpr uint32_t kMsgSize = 1024;
   ds_config->mutable_for_testing()->set_seed(kRandomSeed);
-  ds_config->mutable_for_testing()->set_message_count(kEventCount);
-  ds_config->mutable_for_testing()->set_message_size(kMessageSizeBytes);
+  ds_config->mutable_for_testing()->set_message_count(kNumPackets);
+  ds_config->mutable_for_testing()->set_message_size(kMsgSize);
 
-  // Create the random generator with the same seed.
-  std::minstd_rand0 rnd_engine(kRandomSeed);
+  helper.StartTracing(trace_config);
 
-  // Create the function to handle packets as they come in.
-  uint64_t total = 0;
-  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto on_consumer_data = [&total, &on_readback_complete, &rnd_engine](
-                              std::vector<TracePacket> packets, bool has_more) {
-    for (auto& packet : packets) {
-      ASSERT_TRUE(packet.Decode());
-      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot() ||
-                  packet->has_trace_config());
-      if (packet->has_clock_snapshot() || packet->has_trace_config())
-        continue;
-      ASSERT_EQ(protos::TracePacket::kTrustedUid,
-                packet->optional_trusted_uid_case());
-      ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
-    }
-    total += packets.size();
-
-    if (!has_more) {
-      // One extra packet for the clock snapshot and another for the trace
-      // config.
-      ASSERT_EQ(total, kEventCount + 2);
-      on_readback_complete();
-    }
-  };
-
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(on_consumer_data), &task_runner);
-
-  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
-  task_runner.RunUntilCheckpoint("consumer.connected");
-
-  consumer.EnableTracing();
-  task_runner.RunUntilCheckpoint("producer.enabled");
-
-  auto on_produced_and_committed =
-      task_runner.CreateCheckpoint("produced.and.committed");
-  auto posted_on_produced_and_committed = [&task_runner,
-                                           &on_produced_and_committed] {
-    task_runner.PostTask(on_produced_and_committed);
-  };
-  FakeProducer* producer = producer_delegate_cached->producer();
-  producer->ProduceEventBatch(posted_on_produced_and_committed);
+  producer->ProduceEventBatch(
+      helper.WrapTask(task_runner.CreateCheckpoint("produced.and.committed")));
   task_runner.RunUntilCheckpoint("produced.and.committed");
 
-  consumer.ReadTraceData();
+  size_t packets_seen = 0;
+  std::minstd_rand0 rnd_engine(kRandomSeed);
+  auto on_consumer_data = [&packets_seen, &rnd_engine](
+                              const TracePacket::DecodedTracePacket& packet) {
+    ASSERT_TRUE(packet.has_for_testing());
+    ASSERT_EQ(packet.for_testing().seq_value(), rnd_engine());
+    packets_seen++;
+  };
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  helper.ReadData(on_consumer_data, on_readback_complete);
   task_runner.RunUntilCheckpoint("readback.complete");
-
-  consumer.Disconnect();
+  ASSERT_EQ(packets_seen, kNumPackets);
 }
 
 TEST(PerfettoTest, VeryLargePackets) {
   base::TestTaskRunner task_runner;
 
-#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
-#endif
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
 
-  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
-  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
-    task_runner.PostTask(on_producer_enabled);
-  };
-  TaskRunnerThread producer_thread("perfetto.prd");
-  std::unique_ptr<FakeProducerDelegate> producer_delegate(
-      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
-                               posted_on_producer_enabled));
-  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
-  producer_thread.Start(std::move(producer_delegate));
+  FakeProducer* producer = helper.ConnectFakeProducer();
+  helper.ConnectConsumer();
 
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(4096 * 10);
 
-  static constexpr int kNumPackets = 5;
-  static constexpr uint32_t kRandomSeed = 42;
-  static constexpr uint32_t kMsgSize = 1024 * 1024 - 42;
-  std::minstd_rand0 rnd_engine(kRandomSeed);
-
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("android.perfetto.FakeProducer");
   ds_config->set_target_buffer(0);
+
+  static constexpr size_t kNumPackets = 5;
+  static constexpr uint32_t kRandomSeed = 42;
+  static constexpr uint32_t kMsgSize = 1024 * 1024 - 42;
   ds_config->mutable_for_testing()->set_seed(kRandomSeed);
   ds_config->mutable_for_testing()->set_message_count(kNumPackets);
   ds_config->mutable_for_testing()->set_message_size(kMsgSize);
 
-  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  int packets_seen = 0;
-  auto on_consumer_data = [&on_readback_complete, &rnd_engine, &packets_seen](
-                              std::vector<TracePacket> packets, bool has_more) {
-    for (auto& packet : packets) {
-      ASSERT_TRUE(packet.Decode());
-      if (!packet->has_for_testing())
-        continue;
-      packets_seen++;
-      ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
-      size_t msg_size = packet->for_testing().str().size();
-      ASSERT_EQ(kMsgSize, msg_size);
-      for (size_t i = 0; i < msg_size; i++)
-        ASSERT_EQ(i < msg_size - 1 ? '.' : 0, packet->for_testing().str()[i]);
-    }
+  helper.StartTracing(trace_config);
 
-    if (!has_more)
-      on_readback_complete();
-  };
-
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(on_consumer_data), &task_runner);
-
-  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
-  task_runner.RunUntilCheckpoint("consumer.connected");
-
-  consumer.EnableTracing();
-  task_runner.RunUntilCheckpoint("producer.enabled");
-
-  auto on_produced_and_committed =
-      task_runner.CreateCheckpoint("produced.and.committed");
-  auto posted_on_produced_and_committed = [&task_runner,
-                                           &on_produced_and_committed] {
-    task_runner.PostTask(on_produced_and_committed);
-  };
-  FakeProducer* producer = producer_delegate_cached->producer();
-  producer->ProduceEventBatch(posted_on_produced_and_committed);
+  producer->ProduceEventBatch(
+      helper.WrapTask(task_runner.CreateCheckpoint("produced.and.committed")));
   task_runner.RunUntilCheckpoint("produced.and.committed");
 
-  consumer.ReadTraceData();
+  size_t packets_seen = 0;
+  std::minstd_rand0 rnd_engine(kRandomSeed);
+  auto on_consumer_data = [&packets_seen, &rnd_engine](
+                              const TracePacket::DecodedTracePacket& packet) {
+    ASSERT_TRUE(packet.has_for_testing());
+    ASSERT_EQ(packet.for_testing().seq_value(), rnd_engine());
+    size_t msg_size = packet.for_testing().str().size();
+    ASSERT_EQ(kMsgSize, msg_size);
+    for (size_t i = 0; i < msg_size; i++)
+      ASSERT_EQ(i < msg_size - 1 ? '.' : 0, packet.for_testing().str()[i]);
+    packets_seen++;
+  };
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  helper.ReadData(on_consumer_data, on_readback_complete);
   task_runner.RunUntilCheckpoint("readback.complete");
-  ASSERT_EQ(kNumPackets, packets_seen);
-
-  consumer.Disconnect();
+  ASSERT_EQ(packets_seen, kNumPackets);
 }
 
 }  // namespace perfetto
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index bf862e7..521fe86 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -25,6 +25,7 @@
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
+#include "perfetto/traced/traced.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "perfetto/tracing/core/producer.h"
@@ -32,15 +33,21 @@
 #include "perfetto/tracing/ipc/producer_ipc_client.h"
 #include "perfetto/tracing/ipc/service_ipc_host.h"
 #include "src/base/test/test_task_runner.h"
-#include "test/fake_consumer.h"
 #include "test/task_runner_thread.h"
 #include "test/task_runner_thread_delegates.h"
+#include "test/test_helper.h"
 
 namespace perfetto {
 namespace shm_fuzz {
 
-static const char* kProducerSocket = tempnam("/tmp", "perfetto-producer");
-static const char* kConsumerSocket = tempnam("/tmp", "perfetto-consumer");
+// If we're building on Android and starting the daemons ourselves,
+// create the sockets in a world-writable location.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
+    PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+#define TEST_PRODUCER_SOCK_NAME "/data/local/tmp/traced_producer"
+#else
+#define TEST_PRODUCER_SOCK_NAME PERFETTO_PRODUCER_SOCK_NAME
+#endif
 
 // Fake producer writing a protozero message of data into shared memory
 // buffer, followed by a sentinel message to signal completion to the
@@ -112,7 +119,7 @@
   void Initialize(base::TaskRunner* task_runner) override {
     producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
                                      size_, on_produced_and_committed_));
-    producer_->Connect(kProducerSocket, task_runner);
+    producer_->Connect(TEST_PRODUCER_SOCK_NAME, task_runner);
   }
 
  private:
@@ -127,9 +134,8 @@
 int FuzzSharedMemory(const uint8_t* data, size_t size) {
   base::TestTaskRunner task_runner;
 
-  TaskRunnerThread service_thread("perfetto.svc");
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(
-      new ServiceDelegate(kProducerSocket, kConsumerSocket)));
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
 
   auto on_produced_and_committed =
       task_runner.CreateCheckpoint("produced.and.committed");
@@ -141,40 +147,29 @@
   producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
       new FakeProducerDelegate(data, size, posted_on_produced_and_committed)));
 
-  // Setup the TraceConfig for the consumer.
+  helper.ConnectConsumer();
+
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(8);
 
-  // Create the buffer for the fake producer.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
   ds_config->set_name("android.perfetto.FakeProducer");
   ds_config->set_target_buffer(0);
 
-  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto on_consumer_data = [&on_readback_complete](
-                              std::vector<TracePacket> packets, bool has_more) {
-    for (auto& p : packets) {
-      p.Decode();
-      if (p->for_testing().str() == "end")
-        on_readback_complete();
-    }
-  };
-
-  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
-  FakeConsumer consumer(trace_config, std::move(on_connect),
-                        std::move(on_consumer_data), &task_runner);
-
-  consumer.Connect(kConsumerSocket);
-  task_runner.RunUntilCheckpoint("consumer.connected");
-
-  consumer.EnableTracing();
+  auto producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  task_runner.PostTask(producer_enabled);
+  helper.StartTracing(trace_config);
   task_runner.RunUntilCheckpoint("produced.and.committed");
 
-  consumer.ReadTraceData();
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  auto on_consumer_data =
+      [&on_readback_complete](const TracePacket::DecodedTracePacket& packet) {
+        if (packet.for_testing().str() == "end")
+          on_readback_complete();
+      };
+  helper.ReadData(on_consumer_data, [] {});
   task_runner.RunUntilCheckpoint("readback.complete");
 
-  consumer.Disconnect();
-
   return 0;
 }
 
diff --git a/test/fake_consumer.cc b/test/fake_consumer.cc
deleted file mode 100644
index d1837c7..0000000
--- a/test/fake_consumer.cc
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "test/fake_consumer.h"
-
-#include <utility>
-#include <vector>
-
-#include "gtest/gtest.h"
-#include "perfetto/base/logging.h"
-#include "perfetto/trace/test_event.pbzero.h"
-#include "perfetto/trace/trace_packet.pbzero.h"
-#include "perfetto/traced/traced.h"
-#include "perfetto/tracing/core/trace_packet.h"
-#include "perfetto/tracing/core/trace_writer.h"
-
-namespace perfetto {
-
-FakeConsumer::FakeConsumer(
-    const TraceConfig& trace_config,
-    std::function<void()> on_connect,
-    std::function<void(std::vector<TracePacket>, bool)> packet_callback,
-    base::TaskRunner* task_runner)
-    : task_runner_(task_runner),
-      trace_config_(trace_config),
-      on_connect_(on_connect),
-      packet_callback_(std::move(packet_callback)) {}
-FakeConsumer::~FakeConsumer() = default;
-
-void FakeConsumer::Connect(const char* socket_name) {
-  endpoint_ = ConsumerIPCClient::Connect(socket_name, this, task_runner_);
-}
-
-void FakeConsumer::Disconnect() {
-  endpoint_.reset();
-}
-
-void FakeConsumer::OnConnect() {
-  on_connect_();
-}
-
-void FakeConsumer::EnableTracing() {
-  endpoint_->EnableTracing(trace_config_);
-}
-
-void FakeConsumer::FreeBuffers() {
-  endpoint_->FreeBuffers();
-}
-
-void FakeConsumer::ReadTraceData() {
-  endpoint_->ReadBuffers();
-}
-
-void FakeConsumer::OnDisconnect() {
-  FAIL() << "Consumer unexpectedly disconnected from the service";
-}
-
-void FakeConsumer::OnTracingStop() {}
-
-void FakeConsumer::OnTraceData(std::vector<TracePacket> data, bool has_more) {
-  packet_callback_(std::move(data), has_more);
-}
-
-}  // namespace perfetto
diff --git a/test/fake_consumer.h b/test/fake_consumer.h
deleted file mode 100644
index d36dbba..0000000
--- a/test/fake_consumer.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef TEST_FAKE_CONSUMER_H_
-#define TEST_FAKE_CONSUMER_H_
-
-#include <memory>
-#include <vector>
-
-#include "perfetto/tracing/core/consumer.h"
-#include "perfetto/tracing/core/trace_config.h"
-#include "perfetto/tracing/core/trace_packet.h"
-#include "perfetto/tracing/ipc/consumer_ipc_client.h"
-
-#include "src/base/test/test_task_runner.h"
-
-namespace perfetto {
-
-class FakeConsumer : public Consumer {
- public:
-  FakeConsumer(
-      const TraceConfig& trace_config,
-      std::function<void()> on_connect,
-      std::function<void(std::vector<TracePacket>, bool)> packet_callback,
-      base::TaskRunner* task_runner);
-  ~FakeConsumer() override;
-
-  void EnableTracing();
-  void FreeBuffers();
-  void Connect(const char* socket_name);
-  void Disconnect();
-  void ReadTraceData();
-
-  // Consumer implementation.
-  void OnConnect() override;
-  void OnDisconnect() override;
-  void OnTracingStop() override;
-  void OnTraceData(std::vector<TracePacket> packets, bool has_more) override;
-
- private:
-  base::TaskRunner* const task_runner_;
-  const TraceConfig trace_config_;
-  std::function<void()> on_connect_;
-  std::function<void(std::vector<TracePacket>, bool)> packet_callback_;
-  std::unique_ptr<Service::ConsumerEndpoint> endpoint_;  // Keep last.
-};
-
-}  // namespace perfetto
-
-#endif  // TEST_FAKE_CONSUMER_H_
diff --git a/test/test_helper.cc b/test/test_helper.cc
new file mode 100644
index 0000000..24f814b
--- /dev/null
+++ b/test/test_helper.cc
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "test/test_helper.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/trace/trace_packet.pb.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+#include "perfetto/traced/traced.h"
+#include "perfetto/tracing/core/trace_packet.h"
+#include "test/task_runner_thread_delegates.h"
+
+namespace perfetto {
+
+// If we're building on Android and starting the daemons ourselves,
+// create the sockets in a world-writable location.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
+    PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+#define TEST_PRODUCER_SOCK_NAME "/data/local/tmp/traced_producer"
+#define TEST_CONSUMER_SOCK_NAME "/data/local/tmp/traced_consumer"
+#else
+#define TEST_PRODUCER_SOCK_NAME PERFETTO_PRODUCER_SOCK_NAME
+#define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
+#endif
+
+TestHelper::TestHelper(base::TestTaskRunner* task_runner)
+    : task_runner_(task_runner),
+      service_thread_("perfetto.svc"),
+      producer_thread_("perfetto.prd") {}
+
+void TestHelper::OnConnect() {
+  std::move(continuation_callack_)();
+}
+
+void TestHelper::OnDisconnect() {
+  FAIL() << "Consumer unexpectedly disconnected from the service";
+}
+
+void TestHelper::OnTracingStop() {}
+
+void TestHelper::OnTraceData(std::vector<TracePacket> packets, bool has_more) {
+  for (auto& packet : packets) {
+    ASSERT_TRUE(packet.Decode());
+    if (packet->has_clock_snapshot() || packet->has_trace_config())
+      continue;
+    ASSERT_EQ(protos::TracePacket::kTrustedUid,
+              packet->optional_trusted_uid_case());
+    packet_callback_(*packet);
+  }
+
+  if (!has_more) {
+    packet_callback_ = {};
+    std::move(continuation_callack_)();
+  }
+}
+
+void TestHelper::StartServiceIfRequired() {
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  service_thread_.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+#endif
+}
+
+FakeProducer* TestHelper::ConnectFakeProducer() {
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(
+          TEST_PRODUCER_SOCK_NAME,
+          WrapTask(task_runner_->CreateCheckpoint("producer.enabled"))));
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread_.Start(std::move(producer_delegate));
+  return producer_delegate_cached->producer();
+}
+
+void TestHelper::ConnectConsumer() {
+  continuation_callack_ = task_runner_->CreateCheckpoint("consumer.connected");
+  endpoint_ =
+      ConsumerIPCClient::Connect(TEST_CONSUMER_SOCK_NAME, this, task_runner_);
+  task_runner_->RunUntilCheckpoint("consumer.connected");
+}
+
+void TestHelper::StartTracing(const TraceConfig& config) {
+  endpoint_->EnableTracing(config);
+  task_runner_->RunUntilCheckpoint("producer.enabled");
+}
+
+void TestHelper::ReadData(
+    std::function<void(const TracePacket::DecodedTracePacket&)> packet_callback,
+    std::function<void()> on_finish_callback) {
+  packet_callback_ = packet_callback;
+  continuation_callack_ = on_finish_callback;
+  endpoint_->ReadBuffers();
+}
+
+std::function<void()> TestHelper::WrapTask(
+    const std::function<void()>& function) {
+  return [this, function] { task_runner_->PostTask(function); };
+}
+
+}  // namespace perfetto
diff --git a/test/test_helper.h b/test/test_helper.h
new file mode 100644
index 0000000..87a9be5
--- /dev/null
+++ b/test/test_helper.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TEST_TEST_HELPER_H_
+#define TEST_TEST_HELPER_H_
+
+#include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+#include "perfetto/tracing/ipc/consumer_ipc_client.h"
+#include "src/base/test/test_task_runner.h"
+#include "test/fake_producer.h"
+#include "test/task_runner_thread.h"
+
+namespace perfetto {
+
+class TestHelper : public Consumer {
+ public:
+  explicit TestHelper(base::TestTaskRunner* task_runner);
+
+  // Consumer implementation.
+  void OnConnect() override;
+  void OnDisconnect() override;
+  void OnTracingStop() override;
+  void OnTraceData(std::vector<TracePacket> packets, bool has_more) override;
+
+  void StartServiceIfRequired();
+  FakeProducer* ConnectFakeProducer();
+  void ConnectConsumer();
+  void StartTracing(const TraceConfig& config);
+  void ReadData(std::function<void(const TracePacket::DecodedTracePacket&)>
+                    packet_callback,
+                std::function<void()> on_finish_callback);
+
+  std::function<void()> WrapTask(const std::function<void()>& function);
+
+  TaskRunnerThread* service_thread() { return &service_thread_; }
+  TaskRunnerThread* producer_thread() { return &producer_thread_; }
+
+ private:
+  base::TestTaskRunner* task_runner_ = nullptr;
+
+  std::function<void(const TracePacket::DecodedTracePacket&)> packet_callback_;
+  std::function<void()> continuation_callack_;
+
+  TaskRunnerThread service_thread_;
+  TaskRunnerThread producer_thread_;
+  std::unique_ptr<Service::ConsumerEndpoint> endpoint_;  // Keep last.
+};
+
+}  // namespace perfetto
+
+#endif  // TEST_TEST_HELPER_H_