perfetto: add end to end benchmarking

Bug: 74380167
Change-Id: I448d72492e9a2aaa1123188f38b79492d4be6851
diff --git a/.travis.yml b/.travis.yml
index 4775766..3e89e9e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -135,12 +135,12 @@
     TEST_TARGETS="
     perfetto_integrationtests
     perfetto_unittests
-    perfetto_benchmarks
     "
     if [[ "$CFG" != android-* ]]; then
       for TEST_TARGET in $TEST_TARGETS; do
         "out/dist/$TEST_TARGET"
       done
+      bash -c "out/dist/perfetto_benchmarks --benchmark_filter=\"(\$(out/dist/perfetto_benchmarks  --benchmark_list_tests | sed \"/BM_EndToEnd\/.....*\//d\" | xargs | tr \" \" \"|\"))\""
       if [[ "$CFG" == *-libfuzzer ]]; then
         # Run a single iteration each to make sure they are not crashing.
         out/dist/end_to_end_shared_memory_fuzzer -runs=1
@@ -152,6 +152,7 @@
       for TEST_TARGET in $TEST_TARGETS; do
         tools/run_android_test out/dist "$TEST_TARGET"
       done
+      tools/run_android_test out/dist "perfetto_benchmarks" "--benchmark_filter=\"(\$(perfetto_benchmarks  --benchmark_list_tests | sed \"/BM_EndToEnd\/.....*\//d\" | xargs | tr \" \" \"|\"))\""
     fi
 
 after_script:
diff --git a/BUILD.gn b/BUILD.gn
index 959ec28..c22a7c0 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -85,6 +85,7 @@
       "src/ftrace_reader:ftrace_reader_benchmarks",
       "src/tracing:tracing_benchmarks",
       "test:benchmark_main",
+      "test:end_to_end_benchmarks",
     ]
   }
 
diff --git a/protos/perfetto/trace/test_event.proto b/protos/perfetto/trace/test_event.proto
index bd0fcf0..769b6b0 100644
--- a/protos/perfetto/trace/test_event.proto
+++ b/protos/perfetto/trace/test_event.proto
@@ -21,11 +21,9 @@
 
 // Event used by testing code.
 message TestEvent {
-  oneof data {
-    // Arbitary string used in tests.
-    string str = 1;
+  // Arbitrary string used in tests.
+  optional string str = 1;
 
-    // The current value of the random number sequence used in tests.
-    uint32 seq_value = 2;
-  }
+  // The current value of the random number sequence used in tests.
+  optional uint32 seq_value = 2;
 }
diff --git a/src/base/test/test_task_runner.cc b/src/base/test/test_task_runner.cc
index 065f0a1..e36bb5c 100644
--- a/src/base/test/test_task_runner.cc
+++ b/src/base/test/test_task_runner.cc
@@ -82,6 +82,7 @@
   PERFETTO_DCHECK(checkpoints_.count(checkpoint) == 0);
   auto checkpoint_iter = checkpoints_.emplace(checkpoint, false);
   return [this, checkpoint_iter] {
+    PERFETTO_DCHECK_THREAD(thread_checker_);
     checkpoint_iter.first->second = true;
     if (pending_checkpoint_ == checkpoint_iter.first->first) {
       pending_checkpoint_.clear();
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 587cc12..7e27608 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -54,6 +54,7 @@
     deps = [
       ":fake_consumer",
       ":task_runner_thread",
+      ":task_runner_thread_delegates",
       "../gn:default_deps",
       "../src/base:test_support",
       "../src/protozero",
@@ -109,6 +110,30 @@
 }
 
 if (!build_with_chromium) {
+  source_set("end_to_end_benchmarks") {
+    testonly = true
+    deps = [
+      ":fake_consumer",
+      ":task_runner_thread",
+      ":task_runner_thread_delegates",
+      "../gn:default_deps",
+      "../gn:gtest_deps",
+      "../protos/perfetto/trace:lite",
+      "../protos/perfetto/trace:zero",
+      "../src/base:test_support",
+      "//buildtools:benchmark",
+    ]
+    sources = [
+      "end_to_end_benchmark.cc",
+    ]
+    if (is_android && !build_with_chromium) {
+      deps += [ "../src/base:android_task_runner" ]
+    }
+    if (start_daemons_for_testing) {
+      cflags = [ "-DPERFETTO_START_DAEMONS_FOR_TESTING" ]
+    }
+  }
+
   source_set("benchmark_main") {
     testonly = true
     deps = [
diff --git a/test/cts/end_to_end_integrationtest_cts.cc b/test/cts/end_to_end_integrationtest_cts.cc
index 2bf1d26..212b877 100644
--- a/test/cts/end_to_end_integrationtest_cts.cc
+++ b/test/cts/end_to_end_integrationtest_cts.cc
@@ -69,22 +69,27 @@
       total += packets.size();
 
       if (!has_more) {
-        ASSERT_EQ(total, kEventCount);
+        ASSERT_EQ(total, kEventCount + 1);
         finish();
       }
     };
 
     // Finally, make the consumer connect to the service.
-    FakeConsumer consumer(trace_config, std::move(function), &task_runner);
+    auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+    FakeConsumer consumer(trace_config, std::move(on_connect),
+                          std::move(function), &task_runner);
     consumer.Connect(PERFETTO_CONSUMER_SOCK_NAME);
 
+    task_runner.RunUntilCheckpoint("consumer.connected");
+    consumer.EnableTracing();
+
     // TODO(skyostil): There's a race here before the service processes our data
     // and the consumer tries to retrieve it. For now wait a bit until the
     // service is done, but we should add explicit flushing to avoid this.
     task_runner.PostDelayedTask([&consumer]() { consumer.ReadTraceData(); },
-                                5000);
+                                1000);
 
-    task_runner.RunUntilCheckpoint("no.more.packets", 10000);
+    task_runner.RunUntilCheckpoint("no.more.packets");
   }
 };
 
diff --git a/test/cts/producer/jni/fake_producer_jni.cc b/test/cts/producer/jni/fake_producer_jni.cc
index 3632c61..2341a63 100644
--- a/test/cts/producer/jni/fake_producer_jni.cc
+++ b/test/cts/producer/jni/fake_producer_jni.cc
@@ -27,7 +27,8 @@
 void ListenAndRespond(const std::string& name) {
   base::TestTaskRunner task_runner;
   FakeProducer producer(name);
-  producer.Connect(PERFETTO_PRODUCER_SOCK_NAME, &task_runner, []() {});
+  producer.Connect(PERFETTO_PRODUCER_SOCK_NAME, &task_runner,
+                   [&producer]() { producer.ProduceEventBatch([] {}); });
   task_runner.Run();
 }
 }  // namespace
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
new file mode 100644
index 0000000..166eadc
--- /dev/null
+++ b/test/end_to_end_benchmark.cc
@@ -0,0 +1,176 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "benchmark/benchmark.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/time.h"
+#include "perfetto/trace/trace_packet.pb.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+#include "perfetto/traced/traced.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+#include "src/base/test/test_task_runner.h"
+#include "test/fake_consumer.h"
+#include "test/task_runner_thread.h"
+#include "test/task_runner_thread_delegates.h"
+
+namespace perfetto {
+
+// If we're building on Android and starting the daemons ourselves,
+// create the sockets in a world-writable location.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
+    PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+#define TEST_PRODUCER_SOCK_NAME "/data/local/tmp/traced_producer"
+#define TEST_CONSUMER_SOCK_NAME "/data/local/tmp/traced_consumer"
+#else
+#define TEST_PRODUCER_SOCK_NAME PERFETTO_PRODUCER_SOCK_NAME
+#define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
+#endif
+
+// Benchmarks one producer -> service -> consumer round trip per iteration.
+//
+// Every iteration asks the FakeProducer to write a batch of |state.range(0)|
+// test packets and then blocks until the producer's flush callback has been
+// posted back to this thread. Once the timed loop is over, the trace buffer is
+// read back a single time and the sequence values found in it are checked
+// against a local random number engine.
+static void BM_EndToEnd(benchmark::State& state) {
+  base::TestTaskRunner task_runner;
+
+  // Setup the TraceConfig for the consumer.
+  TraceConfig trace_config;
+
+  // TODO(lalitm): the buffer size should be a function of the benchmark.
+  trace_config.add_buffers()->set_size_kb(512);
+
+  // Point the fake producer's data source at the buffer created above.
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("android.perfetto.FakeProducer");
+  ds_config->set_target_buffer(0);
+
+  // The parameters for the producer.
+  static constexpr uint32_t kRandomSeed = 42;
+  const uint32_t message_count = static_cast<uint32_t>(state.range(0));
+
+  // Setup the test to use a random number generator.
+  ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+  ds_config->mutable_for_testing()->set_message_count(message_count);
+
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  TaskRunnerThread service_thread("perfetto.svc");
+  service_thread.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+#endif
+
+  TaskRunnerThread producer_thread("perfetto.prd");
+  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  // The producer runs its callbacks on its own thread, so bounce them back to
+  // |task_runner|: checkpoints have to be resolved on the thread that owns it.
+  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
+    task_runner.PostTask(on_producer_enabled);
+  };
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+                               posted_on_producer_enabled));
+  // Keep a raw pointer around: the delegate itself is handed to the thread.
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread.Start(std::move(producer_delegate));
+
+  bool is_first_packet = true;
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  std::minstd_rand0 rnd_engine(kRandomSeed);
+  auto on_consumer_data = [&is_first_packet, &on_readback_complete,
+                           &rnd_engine](std::vector<TracePacket> packets,
+                                        bool has_more) {
+    for (auto& packet : packets) {
+      ASSERT_TRUE(packet.Decode());
+      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot());
+      if (packet->has_clock_snapshot()) {
+        continue;
+      }
+      ASSERT_EQ(protos::TracePacket::kTrustedUid,
+                packet->optional_trusted_uid_case());
+      if (is_first_packet) {
+        // Re-seed from the first value that is read back and verify the rest
+        // of the stream against it (presumably because the oldest packets may
+        // no longer be in the buffer by the time it is read).
+        rnd_engine = std::minstd_rand0(packet->for_testing().seq_value());
+        is_first_packet = false;
+      } else {
+        ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
+      }
+    }
+
+    if (!has_more) {
+      is_first_packet = true;
+      on_readback_complete();
+    }
+  };
+
+  // Finally, make the consumer connect to the service.
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(on_consumer_data), &task_runner);
+  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
+  task_runner.RunUntilCheckpoint("consumer.connected");
+
+  consumer.EnableTracing();
+  task_runner.RunUntilCheckpoint("producer.enabled");
+
+  // |service_thread| only exists when this binary starts the service itself,
+  // so its CPU time can only be sampled (and reported) in that configuration.
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  uint64_t wall_start_ns = base::GetWallTimeNs().count();
+  uint64_t thread_start_ns = service_thread.GetThreadCPUTimeNs();
+#endif
+  while (state.KeepRunning()) {
+    auto cname = "produced.and.committed." + std::to_string(state.iterations());
+    auto on_produced_and_committed = task_runner.CreateCheckpoint(cname);
+    auto posted_on_produced_and_committed = [&task_runner,
+                                             &on_produced_and_committed] {
+      task_runner.PostTask(on_produced_and_committed);
+    };
+    FakeProducer* producer = producer_delegate_cached->producer();
+    producer->ProduceEventBatch(posted_on_produced_and_committed);
+    task_runner.RunUntilCheckpoint(cname);
+  }
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  uint64_t thread_ns = service_thread.GetThreadCPUTimeNs() - thread_start_ns;
+  uint64_t wall_ns = base::GetWallTimeNs().count() - wall_start_ns;
+  PERFETTO_ILOG("Service CPU usage: %.2f,  CPU/iterations: %lf",
+                100.0 * thread_ns / wall_ns, 1.0 * thread_ns / message_count);
+#endif
+
+  // Read back the buffer just to check correctness.
+  consumer.ReadTraceData();
+  task_runner.RunUntilCheckpoint("readback.complete");
+  state.SetBytesProcessed(int64_t(state.iterations()) *
+                          (sizeof(uint32_t) + 1024) * message_count);
+
+  consumer.Disconnect();
+}
+
+BENCHMARK(BM_EndToEnd)
+    ->Unit(benchmark::kMicrosecond)
+    ->UseRealTime()
+    ->RangeMultiplier(2)
+    ->Range(16, 1024 * 1024);
+
+}  // namespace perfetto
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index 5183dd1..98a3b69 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 
-#include <gtest/gtest.h>
 #include <unistd.h>
 #include <chrono>
 #include <condition_variable>
@@ -22,6 +21,7 @@
 #include <random>
 #include <thread>
 
+#include "gtest/gtest.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
@@ -73,12 +73,11 @@
 #endif
 TEST_F(PerfettoTest, MAYBE_TestFtraceProducer) {
   base::TestTaskRunner task_runner;
-  auto finish = task_runner.CreateCheckpoint("no.more.packets");
 
   // Setip the TraceConfig for the consumer.
   TraceConfig trace_config;
-  trace_config.add_buffers()->set_size_kb(4096 * 10);
-  trace_config.set_duration_ms(10000);
+  trace_config.add_buffers()->set_size_kb(1024);
+  trace_config.set_duration_ms(3000);
 
   // Create the buffer for ftrace.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
@@ -92,8 +91,9 @@
 
   // Create the function to handle packets as they come in.
   uint64_t total = 0;
-  auto function = [&total, &finish](std::vector<TracePacket> packets,
-                                    bool has_more) {
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  auto function = [&total, &on_readback_complete](
+                      std::vector<TracePacket> packets, bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
       ASSERT_TRUE(packet->has_ftrace_events() || packet->has_clock_snapshot());
@@ -107,47 +107,41 @@
 
     if (!has_more) {
       ASSERT_GE(total, static_cast<uint64_t>(sysconf(_SC_NPROCESSORS_CONF)));
-      finish();
+      on_readback_complete();
     }
   };
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread;
+  TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
       new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
 
-  TaskRunnerThread producer_thread;
+  TaskRunnerThread producer_thread("perfetto.prd");
   producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
       new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
 
   // Finally, make the consumer connect to the service.
-  FakeConsumer consumer(trace_config, std::move(function), &task_runner);
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(function), &task_runner);
   consumer.Connect(TEST_CONSUMER_SOCK_NAME);
 
-  // TODO(skyostil): There's a race here before the service processes our data
-  // and the consumer tries to retrieve it. For now wait a bit until the service
-  // is done, but we should add explicit flushing to avoid this.
-  task_runner.PostDelayedTask([&consumer]() { consumer.ReadTraceData(); },
-                              13000);
+  task_runner.RunUntilCheckpoint("consumer.connected");
+  consumer.EnableTracing();
 
-  task_runner.RunUntilCheckpoint("no.more.packets", 20000);
+  // Traced probes should flush data as it produces it.
+  task_runner.PostDelayedTask([&consumer] { consumer.ReadTraceData(); }, 3000);
+
+  task_runner.RunUntilCheckpoint("readback.complete", 10000);
 }
 
-// TODO(b/73453011): reenable this on more platforms (including standalone
-// Android).
-#if defined(PERFETTO_BUILD_WITH_ANDROID)
-#define MAYBE_TestFakeProducer TestFakeProducer
-#else
-#define MAYBE_TestFakeProducer DISABLED_TestFakeProducer
-#endif
-TEST_F(PerfettoTest, MAYBE_TestFakeProducer) {
+TEST_F(PerfettoTest, TestFakeProducer) {
   base::TestTaskRunner task_runner;
-  auto finish = task_runner.CreateCheckpoint("no.more.packets");
 
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
-  trace_config.add_buffers()->set_size_kb(4096 * 10);
+  trace_config.add_buffers()->set_size_kb(1024);
   trace_config.set_duration_ms(200);
 
   // Create the buffer for ftrace.
@@ -168,8 +162,9 @@
 
   // Create the function to handle packets as they come in.
   uint64_t total = 0;
-  auto function = [&total, &finish, &random](std::vector<TracePacket> packets,
-                                             bool has_more) {
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  auto function = [&total, &on_readback_complete, &random](
+                      std::vector<TracePacket> packets, bool has_more) {
 
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
@@ -185,32 +180,51 @@
     if (!has_more) {
       // One extra packet for the clock snapshot.
       ASSERT_EQ(total, kEventCount + 1);
-      finish();
+      on_readback_complete();
     }
   };
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  TaskRunnerThread service_thread;
+  TaskRunnerThread service_thread("perfetto.svc");
   service_thread.Start(std::unique_ptr<ServiceDelegate>(
       new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
 #endif
 
-  auto data_produced = task_runner.CreateCheckpoint("data.produced");
-  TaskRunnerThread producer_thread;
-  producer_thread.Start(
-      std::unique_ptr<FakeProducerDelegate>(new FakeProducerDelegate(
-          TEST_PRODUCER_SOCK_NAME, [&task_runner, &data_produced] {
-            task_runner.PostTask(data_produced);
-          })));
+  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
+    task_runner.PostTask(on_producer_enabled);
+  };
+  TaskRunnerThread producer_thread("perfetto.prd");
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+                               posted_on_producer_enabled));
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread.Start(std::move(producer_delegate));
 
   // Finally, make the consumer connect to the service.
-  FakeConsumer consumer(trace_config, std::move(function), &task_runner);
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(function), &task_runner);
   consumer.Connect(TEST_CONSUMER_SOCK_NAME);
+  task_runner.RunUntilCheckpoint("consumer.connected");
 
-  task_runner.RunUntilCheckpoint("data.produced");
+  consumer.EnableTracing();
+  task_runner.RunUntilCheckpoint("producer.enabled");
+
+  auto on_produced_and_committed =
+      task_runner.CreateCheckpoint("produced.and.committed");
+  auto posted_on_produced_and_committed = [&task_runner,
+                                           &on_produced_and_committed] {
+    task_runner.PostTask(on_produced_and_committed);
+  };
+  FakeProducer* producer = producer_delegate_cached->producer();
+  producer->ProduceEventBatch(posted_on_produced_and_committed);
+  task_runner.RunUntilCheckpoint("produced.and.committed");
+
   consumer.ReadTraceData();
+  task_runner.RunUntilCheckpoint("readback.complete");
 
-  task_runner.RunUntilCheckpoint("no.more.packets");
+  consumer.Disconnect();
 }
 
 }  // namespace perfetto
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 1b1fd2d..8f32b5f 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -34,6 +34,7 @@
 #include "src/base/test/test_task_runner.h"
 #include "test/fake_consumer.h"
 #include "test/task_runner_thread.h"
+#include "test/task_runner_thread_delegates.h"
 
 namespace perfetto {
 namespace shm_fuzz {
@@ -119,28 +120,12 @@
   FakeConsumer* consumer_;
 };
 
-class ServiceDelegate : public ThreadDelegate {
- public:
-  ServiceDelegate() = default;
-  ~ServiceDelegate() override = default;
-  void Initialize(base::TaskRunner* task_runner) override {
-    svc_ = ServiceIPCHost::CreateInstance(task_runner);
-    unlink(kProducerSocket);
-    unlink(kConsumerSocket);
-    svc_->Start(kProducerSocket, kConsumerSocket);
-  }
-
- private:
-  std::unique_ptr<ServiceIPCHost> svc_;
-  base::ScopedFile producer_fd_;
-  base::ScopedFile consumer_fd_;
-};
-
 int FuzzSharedMemory(const uint8_t* data, size_t size);
 
 int FuzzSharedMemory(const uint8_t* data, size_t size) {
-  TaskRunnerThread service_thread;
-  service_thread.Start(std::unique_ptr<ServiceDelegate>(new ServiceDelegate()));
+  TaskRunnerThread service_thread("perfetto.svc");
+  service_thread.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(kProducerSocket, kConsumerSocket)));
 
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
@@ -162,10 +147,15 @@
         finish();
     }
   };
-  FakeConsumer consumer(trace_config, std::move(function), &task_runner);
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(function), &task_runner);
   consumer.Connect(kConsumerSocket);
+  task_runner.RunUntilCheckpoint("consumer.connected");
 
-  TaskRunnerThread producer_thread;
+  consumer.EnableTracing();
+
+  TaskRunnerThread producer_thread("perfetto.prd");
   producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
       new FakeProducerDelegate(data, size, &consumer)));
   task_runner.RunUntilCheckpoint("no.more.packets");
diff --git a/test/fake_consumer.cc b/test/fake_consumer.cc
index f8f63ab..e409cd0 100644
--- a/test/fake_consumer.cc
+++ b/test/fake_consumer.cc
@@ -16,10 +16,10 @@
 
 #include "test/fake_consumer.h"
 
-#include <gtest/gtest.h>
 #include <utility>
 #include <vector>
 
+#include "gtest/gtest.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
@@ -31,28 +31,41 @@
 
 FakeConsumer::FakeConsumer(
     const TraceConfig& trace_config,
+    std::function<void()> on_connect,
     std::function<void(std::vector<TracePacket>, bool)> packet_callback,
     base::TaskRunner* task_runner)
-    : packet_callback_(std::move(packet_callback)),
+    : task_runner_(task_runner),
       trace_config_(trace_config),
-      task_runner_(task_runner) {}
+      on_connect_(std::move(on_connect)),
+      packet_callback_(std::move(packet_callback)) {}
 FakeConsumer::~FakeConsumer() = default;
 
 void FakeConsumer::Connect(const char* socket_name) {
   endpoint_ = ConsumerIPCClient::Connect(socket_name, this, task_runner_);
 }
 
+void FakeConsumer::Disconnect() {
+  endpoint_.reset();
+}
+
 void FakeConsumer::OnConnect() {
+  on_connect_();
+}
+
+void FakeConsumer::EnableTracing() {
   endpoint_->EnableTracing(trace_config_);
 }
 
+void FakeConsumer::FreeBuffers() {
+  endpoint_->FreeBuffers();
+}
+
 void FakeConsumer::ReadTraceData() {
-  endpoint_->DisableTracing();
   endpoint_->ReadBuffers();
 }
 
 void FakeConsumer::OnDisconnect() {
-  FAIL() << "Disconnected from service unexpectedly";
+  FAIL() << "Consumer unexpectedly disconnected from the service";
 }
 
 void FakeConsumer::OnTraceData(std::vector<TracePacket> data, bool has_more) {
diff --git a/test/fake_consumer.h b/test/fake_consumer.h
index eec29c6..38ae875 100644
--- a/test/fake_consumer.h
+++ b/test/fake_consumer.h
@@ -33,11 +33,15 @@
  public:
   FakeConsumer(
       const TraceConfig& trace_config,
+      std::function<void()> on_connect,
       std::function<void(std::vector<TracePacket>, bool)> packet_callback,
       base::TaskRunner* task_runner);
   ~FakeConsumer() override;
 
+  void EnableTracing();
+  void FreeBuffers();
   void Connect(const char* socket_name);
+  void Disconnect();
   void ReadTraceData();
   void BusyWaitReadBuffers();
 
@@ -47,10 +51,11 @@
   void OnTraceData(std::vector<TracePacket> packets, bool has_more) override;
 
  private:
-  std::function<void(std::vector<TracePacket>, bool)> packet_callback_;
-  std::unique_ptr<Service::ConsumerEndpoint> endpoint_;
-  const TraceConfig trace_config_;
   base::TaskRunner* const task_runner_;
+  const TraceConfig trace_config_;
+  std::function<void()> on_connect_;
+  std::function<void(std::vector<TracePacket>, bool)> packet_callback_;
+  std::unique_ptr<Service::ConsumerEndpoint> endpoint_;  // Keep last.
 };
 
 }  // namespace perfetto
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 73fc0ea..90d0990 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -16,13 +16,14 @@
 
 #include "test/fake_producer.h"
 
-#include <random>
+#include <condition_variable>
+#include <mutex>
 
+#include "gtest/gtest.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/traced/traced.h"
-#include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/core/trace_writer.h"
 
@@ -31,48 +32,60 @@
 FakeProducer::FakeProducer(const std::string& name) : name_(name) {}
 FakeProducer::~FakeProducer() = default;
 
-void FakeProducer::Connect(const char* socket_name,
-                           base::TaskRunner* task_runner,
-                           std::function<void()> data_produced_callback) {
+void FakeProducer::Connect(
+    const char* socket_name,
+    base::TaskRunner* task_runner,
+    std::function<void()> on_create_data_source_instance) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   task_runner_ = task_runner;
-  data_produced_callback_ = std::move(data_produced_callback);
   endpoint_ = ProducerIPCClient::Connect(socket_name, this, task_runner);
+  on_create_data_source_instance_ = std::move(on_create_data_source_instance);
 }
 
 void FakeProducer::OnConnect() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   DataSourceDescriptor descriptor;
   descriptor.set_name(name_);
   endpoint_->RegisterDataSource(descriptor,
                                 [this](DataSourceID id) { id_ = id; });
 }
 
-void FakeProducer::OnDisconnect() {}
+void FakeProducer::OnDisconnect() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  FAIL() << "Producer unexpectedly disconnected from the service";
+}
 
 void FakeProducer::CreateDataSourceInstance(
     DataSourceInstanceID,
     const DataSourceConfig& source_config) {
-  auto trace_writer = endpoint_->CreateTraceWriter(
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  trace_writer_ = endpoint_->CreateTraceWriter(
       static_cast<BufferID>(source_config.target_buffer()));
-
-  const TestConfig& config = source_config.for_testing();
-  std::minstd_rand0 random(config.seed());
-  for (size_t i = 0; i < config.message_count(); i++) {
-    auto handle = trace_writer->NewTracePacket();
-    handle->set_for_testing()->set_seq_value(random());
-    handle->Finalize();
-  }
-
-  // TODO(primiano): reenable this once UnregisterDataSource is specified in
-  // ServiceImpl.
-  // endpoint_->UnregisterDataSource(id_);
-
-  // TODO(skyostil): There's a race here before the service processes our data
-  // and the consumer tries to retrieve it. For now wait a bit until the service
-  // is done, but we should add explicit flushing to avoid this.
-  task_runner_->PostDelayedTask(data_produced_callback_, 1000);
+  rnd_engine_ = std::minstd_rand0(source_config.for_testing().seed());
+  message_count_ = source_config.for_testing().message_count();
+  task_runner_->PostTask(on_create_data_source_instance_);
 }
 
-void FakeProducer::TearDownDataSourceInstance(DataSourceInstanceID) {}
+void FakeProducer::TearDownDataSourceInstance(DataSourceInstanceID) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  trace_writer_.reset();
+}
+
+// Note: this will be called on a different thread.
+void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
+  task_runner_->PostTask([this, callback] {
+    PERFETTO_CHECK(trace_writer_);
+    char payload[1024];
+    memset(payload, '.', sizeof(payload));
+    payload[sizeof(payload) - 1] = 0;
+    for (size_t i = 0; i < message_count_; i++) {
+      auto handle = trace_writer_->NewTracePacket();
+      handle->set_for_testing()->set_seq_value(rnd_engine_());
+      handle->set_for_testing()->set_str(payload, sizeof(payload));
+    }
+    trace_writer_->Flush(callback);
+  });
+}
 
 void FakeProducer::OnTracingStart() {}
 
diff --git a/test/fake_producer.h b/test/fake_producer.h
index edc0643..8dd1d51 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -18,12 +18,14 @@
 #define TEST_FAKE_PRODUCER_H_
 
 #include <memory>
+#include <random>
 #include <string>
 
+#include "perfetto/base/thread_checker.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/ipc/producer_ipc_client.h"
-
 #include "src/base/test/test_task_runner.h"
 
 namespace perfetto {
@@ -35,7 +37,11 @@
 
   void Connect(const char* socket_name,
                base::TaskRunner* task_runner,
-               std::function<void()> data_produced_callback);
+               std::function<void()> on_create_data_source_instance);
+
+  // Produces a batch of events (as configured in the DataSourceConfig) and
+  // posts a callback when the service acknowledges the commit.
+  void ProduceEventBatch(std::function<void()> callback);
 
   // Producer implementation.
   void OnConnect() override;
@@ -49,12 +55,15 @@
  private:
   void Shutdown();
 
+  base::ThreadChecker thread_checker_;
+  base::TaskRunner* task_runner_ = nullptr;
   std::string name_;
   DataSourceID id_ = 0;
-
+  std::minstd_rand0 rnd_engine_;
+  size_t message_count_ = 0;
+  std::function<void()> on_create_data_source_instance_;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
-  base::TaskRunner* task_runner_ = nullptr;
-  std::function<void()> data_produced_callback_;
+  std::unique_ptr<TraceWriter> trace_writer_;
 };
 
 }  // namespace perfetto
diff --git a/test/task_runner_thread.cc b/test/task_runner_thread.cc
index 4c1ee65..c7dc985 100644
--- a/test/task_runner_thread.cc
+++ b/test/task_runner_thread.cc
@@ -14,22 +14,25 @@
  * limitations under the License.
  */
 
+#include <pthread.h>
+#include <stdlib.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <condition_variable>
 #include <thread>
 
+#include "perfetto/base/file_utils.h"
+#include "perfetto/base/string_splitter.h"
+#include "perfetto/base/time.h"
 #include "test/task_runner_thread.h"
 
 namespace perfetto {
 
-TaskRunnerThread::TaskRunnerThread() = default;
+TaskRunnerThread::TaskRunnerThread(const char* name) : name_(name) {}
 TaskRunnerThread::~TaskRunnerThread() {
-  {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (runner_)
-      runner_->Quit();
-  }
-
-  if (thread_.joinable())
-    thread_.join();
+  Stop();
 }
 
 void TaskRunnerThread::Start(std::unique_ptr<ThreadDelegate> delegate) {
@@ -45,7 +48,38 @@
                   [this]() { return runner_ != nullptr; });
 }
 
+void TaskRunnerThread::Stop() {
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (runner_)
+      runner_->Quit();
+  }
+
+  if (thread_.joinable())
+    thread_.join();
+}
+
+uint64_t TaskRunnerThread::GetThreadCPUTimeNs() {
+  std::condition_variable cv;
+  std::unique_lock<std::mutex> lock(mutex_);
+  uint64_t thread_time_ns = 0;
+
+  if (!runner_)
+    return 0;
+
+  runner_->PostTask([this, &thread_time_ns, &cv] {
+    std::unique_lock<std::mutex> inner_lock(mutex_);
+    thread_time_ns = base::GetThreadCPUTimeNs().count();
+    cv.notify_one();
+  });
+
+  cv.wait(lock, [&thread_time_ns] { return thread_time_ns != 0; });
+  return thread_time_ns;
+}
+
 void TaskRunnerThread::Run(std::unique_ptr<ThreadDelegate> delegate) {
+  pthread_setname_np(pthread_self(), name_);
+
   // Create the task runner and execute the specicalised code.
   base::PlatformTaskRunner task_runner;
   delegate->Initialize(&task_runner);
diff --git a/test/task_runner_thread.h b/test/task_runner_thread.h
index d27aeb1..25be34a 100644
--- a/test/task_runner_thread.h
+++ b/test/task_runner_thread.h
@@ -40,16 +40,22 @@
 // task runner is quit and the thread is joined.
 class TaskRunnerThread {
  public:
-  TaskRunnerThread();
+  explicit TaskRunnerThread(const char* name);
   ~TaskRunnerThread();
 
   // Blocks until the thread has been created and Initialize() has been
   // called.
   void Start(std::unique_ptr<ThreadDelegate> delegate);
 
+  // Blocks until the thread has been stopped and joined.
+  void Stop();
+
+  uint64_t GetThreadCPUTimeNs();
+
  private:
   void Run(std::unique_ptr<ThreadDelegate> delegate);
 
+  const char* const name_;
   std::thread thread_;
   std::condition_variable ready_;
 
diff --git a/test/task_runner_thread_delegates.h b/test/task_runner_thread_delegates.h
index 282bec4..241ecc4 100644
--- a/test/task_runner_thread_delegates.h
+++ b/test/task_runner_thread_delegates.h
@@ -75,6 +75,8 @@
                        std::move(connect_callback_));
   }
 
+  FakeProducer* producer() { return producer_.get(); }
+
  private:
   std::string producer_socket_;
   std::unique_ptr<FakeProducer> producer_;
diff --git a/tools/run_android_test b/tools/run_android_test
index 37c1a36..bb92156 100755
--- a/tools/run_android_test
+++ b/tools/run_android_test
@@ -143,9 +143,11 @@
     AdbCall('push', sanitizer_libs, target_dir)
     env = 'LD_LIBRARY_PATH="%s/sanitizer_libs" ' % (target_dir)
   cmd = 'cd %s;' % target_dir;
-  cmd += env + './%s' % args.test_name
+  binary = env + './%s' % args.test_name
+  cmd += binary
   if args.cmd_args:
-    cmd += ' ' + ' '.join(args.cmd_args)
+    actual_args = [arg.replace(args.test_name, binary) for arg in args.cmd_args]
+    cmd += ' ' + ' '.join(actual_args)
   cmd += ';echo -e "\\nTEST_RET_CODE=$?"'
   print cmd
   test_output = subprocess.check_output([ADB_PATH, 'shell', cmd])