Refactor unit-testing of core/service_impl.cc

This just improves unit-testing of ServiceImpl by
introducing MockProducer/ MockConsumer classes.
Also this renames (without adding any new behavior)
the existing methods Producer::OnTracing{Start,StopTracing}
as follows:
- OnTracingStop -> removed as it's currently unsupported
  (see b/77532839)
- OnTracingStart -> renamed to OnTracingSetup, because
  this is what it does. Also this name conflicts with
  Consumer::OnTracingStart, which has a different semantic
  and happens at different times.

Also this CL renames Consumer::OnTracingStop to
OnTracingDisabled to match the {Enable,Disable}Tracing methods.

Change-Id: Ided455d3b37cfefdfbc3eda94e5feccaeeb15a5d
diff --git a/Android.bp b/Android.bp
index 46ce586..92426f0 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3422,6 +3422,8 @@
     "src/tracing/ipc/posix_shared_memory_unittest.cc",
     "src/tracing/test/aligned_buffer_test.cc",
     "src/tracing/test/fake_packet.cc",
+    "src/tracing/test/mock_consumer.cc",
+    "src/tracing/test/mock_producer.cc",
     "src/tracing/test/test_shared_memory.cc",
     "src/tracing/test/tracing_integration_test.cc",
     "tools/ftrace_proto_gen/ftrace_proto_gen.cc",
diff --git a/include/perfetto/tracing/core/consumer.h b/include/perfetto/tracing/core/consumer.h
index 57668ca..842ac1a 100644
--- a/include/perfetto/tracing/core/consumer.h
+++ b/include/perfetto/tracing/core/consumer.h
@@ -46,7 +46,7 @@
   // - The TraceConfig's |duration_ms| has been reached.
   // - The TraceConfig's |max_file_size_bytes| has been reached.
   // - An error occurred while trying to enable tracing.
-  virtual void OnTracingStop() = 0;
+  virtual void OnTracingDisabled() = 0;
 
   // Called back by the Service (or transport layer) after invoking
   // Service::ConsumerEndpoint::ReadBuffers(). This function can be called more
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 3edeb55..87bcb03 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -78,10 +78,7 @@
 
   // Called by the Service after OnConnect but before the first DataSource is
   // created. Can be used for any setup required before tracing begins.
-  virtual void OnTracingStart() = 0;
-
-  // Called by the Service after the final DataSource is torn down.
-  virtual void OnTracingStop() = 0;
+  virtual void OnTracingSetup() = 0;
 };
 
 }  // namespace perfetto
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index d9b1db5..abd3961 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -31,7 +31,8 @@
   // rpc CreateBuffers(CreateBuffersRequest) returns (CreateBuffersResponse) {}
 
   // Enables tracing for one or more data sources. At least one buffer must have
-  // been previously created.
+  // been previously created. The EnableTracingResponse is sent when tracing is
+  // disabled (either explicitly or because of the |duration_ms| expired).
   rpc EnableTracing(EnableTracingRequest) returns (EnableTracingResponse) {}
 
   // Disables tracing for one or more data sources.
@@ -57,7 +58,7 @@
 }
 
 message EnableTracingResponse {
-  oneof state { bool stopped = 1; }
+  oneof state { bool disabled = 1; }
 }
 
 // Arguments for rpc DisableTracing().
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index f151fd5..26a2670 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -111,14 +111,12 @@
 
   // This message also transports the file descriptor for the shared memory
   // buffer.
-  message OnTracingStart { optional uint64 shared_buffer_page_size_kb = 1; }
-
-  message OnTracingStop {}
+  message SetupTracing { optional uint64 shared_buffer_page_size_kb = 1; }
 
   oneof cmd {
     StartDataSource start_data_source = 1;
     StopDataSource stop_data_source = 2;
-    OnTracingStart on_tracing_start = 3;
-    OnTracingStop on_tracing_stop = 4;
+    SetupTracing setup_tracing = 3;
+    // id == 4 was teardown_tracing, never implemented.
   }
 }
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index ddaf9e1..0b61965 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -293,7 +293,7 @@
     FinalizeTraceAndExit();  // Reached end of trace.
 }
 
-void PerfettoCmd::OnTracingStop() {
+void PerfettoCmd::OnTracingDisabled() {
   if (trace_config_->write_into_file()) {
     // If write_into_file == true, at this point the passed file contains
     // already all the packets.
diff --git a/src/perfetto_cmd/perfetto_cmd.h b/src/perfetto_cmd/perfetto_cmd.h
index 52bb0d0..9f00fab 100644
--- a/src/perfetto_cmd/perfetto_cmd.h
+++ b/src/perfetto_cmd/perfetto_cmd.h
@@ -61,7 +61,7 @@
   // perfetto::Consumer implementation.
   void OnConnect() override;
   void OnDisconnect() override;
-  void OnTracingStop() override;
+  void OnTracingDisabled() override;
   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
 
   int ctrl_c_pipe_wr() const { return *ctrl_c_pipe_wr_; }
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 5dc47dc..1a3e209 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -249,8 +249,7 @@
   watchdogs_.erase(id);
 }
 
-void ProbesProducer::OnTracingStart() {}
-void ProbesProducer::OnTracingStop() {}
+void ProbesProducer::OnTracingSetup() {}
 
 void ProbesProducer::ConnectWithRetries(const char* socket_name,
                                         base::TaskRunner* task_runner) {
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index c788ed5..72fede0 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -47,8 +47,7 @@
   void CreateDataSourceInstance(DataSourceInstanceID,
                                 const DataSourceConfig&) override;
   void TearDownDataSourceInstance(DataSourceInstanceID) override;
-  void OnTracingStart() override;
-  void OnTracingStop() override;
+  void OnTracingSetup() override;
 
   // Our Impl
   void ConnectWithRetries(const char* socket_name,
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 69a0701..cc59dd2 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -141,6 +141,10 @@
     "test/aligned_buffer_test.h",
     "test/fake_packet.cc",
     "test/fake_packet.h",
+    "test/mock_consumer.cc",
+    "test/mock_consumer.h",
+    "test/mock_producer.cc",
+    "test/mock_producer.h",
     "test/test_shared_memory.cc",
     "test/test_shared_memory.h",
     "test/tracing_integration_test.cc",
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 4829339..38a3d56 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -361,7 +361,7 @@
   if (!tracing_session) {
     // Can happen if the consumer calls this before EnableTracing() or after
     // FreeBuffers().
-    PERFETTO_DLOG("Couldn't find tracing session %" PRIu64, tsid);
+    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
     return;
   }
 
@@ -369,8 +369,7 @@
     const ProducerID producer_id = data_source_inst.first;
     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
     ProducerEndpointImpl* producer = GetProducer(producer_id);
-    PERFETTO_DCHECK(producer);
-    producer->producer_->TearDownDataSourceInstance(ds_inst_id);
+    producer->TearDownDataSource(ds_inst_id);
   }
   tracing_session->data_source_instances.clear();
 
@@ -383,7 +382,7 @@
 
   if (tracing_session->tracing_enabled) {
     tracing_session->tracing_enabled = false;
-    tracing_session->consumer->NotifyOnTracingStop();
+    tracing_session->consumer->NotifyOnTracingDisabled();
   }
 
   // Deliberately NOT removing the session from |tracing_session_|, it's still
@@ -598,7 +597,7 @@
   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
   TracingSession* tracing_session = GetTracingSession(tsid);
   if (!tracing_session) {
-    PERFETTO_DLOG("Cannot FreeBuffers(): no tracing session is active");
+    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
     return;  // TODO(primiano): signal failure?
   }
   DisableTracing(tsid);
@@ -660,17 +659,18 @@
   PERFETTO_CHECK(producer_id);
   ProducerEndpointImpl* producer = GetProducer(producer_id);
   PERFETTO_DCHECK(producer);
-  for (auto& session : tracing_sessions_) {
-    auto it = session.second.data_source_instances.begin();
-    while (it != session.second.data_source_instances.end()) {
+  for (auto& kv : tracing_sessions_) {
+    auto& ds_instances = kv.second.data_source_instances;
+    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
       if (it->first == producer_id && it->second.data_source_name == name) {
-        producer->producer_->TearDownDataSourceInstance(it->second.instance_id);
-        it = session.second.data_source_instances.erase(it);
+        DataSourceInstanceID ds_inst_id = it->second.instance_id;
+        producer->TearDownDataSource(ds_inst_id);
+        it = ds_instances.erase(it);
       } else {
         ++it;
       }
-    }
-  }
+    }  // for (data_source_instances)
+  }    // for (tracing_session)
 
   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
     if (it->second.producer_id == producer_id &&
@@ -762,10 +762,10 @@
     // client to go away.
     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
     producer->SetSharedMemory(std::move(shared_memory));
-    producer->producer_->OnTracingStart();
+    producer->OnTracingSetup();
     UpdateMemoryGuardrail();
   }
-  producer->producer_->CreateDataSourceInstance(inst_id, ds_config);
+  producer->CreateDataSourceInstance(inst_id, ds_config);
 }
 
 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
@@ -985,12 +985,12 @@
   consumer_->OnDisconnect();
 }
 
-void ServiceImpl::ConsumerEndpointImpl::NotifyOnTracingStop() {
+void ServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   auto weak_this = GetWeakPtr();
   task_runner_->PostTask([weak_this] {
     if (weak_this)
-      weak_this->consumer_->OnTracingStop();
+      weak_this->consumer_->OnTracingDisabled();
   });
 }
 
@@ -998,35 +998,35 @@
                                                       base::ScopedFile fd) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   if (!service_->EnableTracing(this, cfg, std::move(fd)))
-    NotifyOnTracingStop();
+    NotifyOnTracingDisabled();
 }
 
 void ServiceImpl::ConsumerEndpointImpl::DisableTracing() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  if (tracing_session_id_) {
-    service_->DisableTracing(tracing_session_id_);
-  } else {
+  if (!tracing_session_id_) {
     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
+    return;
   }
+  service_->DisableTracing(tracing_session_id_);
 }
 
 void ServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  if (tracing_session_id_) {
-    service_->ReadBuffers(tracing_session_id_, this);
-  } else {
+  if (!tracing_session_id_) {
     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
+    return;
   }
+  service_->ReadBuffers(tracing_session_id_, this);
 }
 
 void ServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  if (tracing_session_id_) {
-    service_->FreeBuffers(tracing_session_id_);
-    tracing_session_id_ = 0;
-  } else {
+  if (!tracing_session_id_) {
     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
+    return;
   }
+  service_->FreeBuffers(tracing_session_id_);
+  tracing_session_id_ = 0;
 }
 
 base::WeakPtr<ServiceImpl::ConsumerEndpointImpl>
@@ -1051,11 +1051,8 @@
       service_(service),
       task_runner_(task_runner),
       producer_(producer),
-      name_(producer_name) {
-  // TODO(primiano): make the page-size for the SHM dynamic and find a way to
-  // communicate that to the Producer (add a field to the
-  // InitializeConnectionResponse IPC).
-}
+      name_(producer_name),
+      weak_ptr_factory_(this) {}
 
 ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
   service_->DisconnectProducer(id_);
@@ -1151,15 +1148,51 @@
   return shared_buffer_page_size_kb_;
 }
 
-std::unique_ptr<TraceWriter>
-ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
+void ServiceImpl::ProducerEndpointImpl::TearDownDataSource(
+    DataSourceInstanceID ds_inst_id) {
+  // TODO(primiano): When we'll support tearing down the SMB, at this point we
+  // should send the Producer a TearDownTracing if all its data sources have
+  // been disabled (see b/77532839 and aosp/655179 PS1).
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, ds_inst_id] {
+    if (weak_this)
+      weak_this->producer_->TearDownDataSourceInstance(ds_inst_id);
+  });
+}
+
+SharedMemoryArbiterImpl*
+ServiceImpl::ProducerEndpointImpl::GetOrCreateShmemArbiter() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   if (!inproc_shmem_arbiter_) {
     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
         shared_memory_->start(), shared_memory_->size(),
         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
   }
-  return inproc_shmem_arbiter_->CreateTraceWriter(buf_id);
+  return inproc_shmem_arbiter_.get();
+}
+
+std::unique_ptr<TraceWriter>
+ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  return GetOrCreateShmemArbiter()->CreateTraceWriter(buf_id);
+}
+
+void ServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this] {
+    if (weak_this)
+      weak_this->producer_->OnTracingSetup();
+  });
+}
+
+void ServiceImpl::ProducerEndpointImpl::CreateDataSourceInstance(
+    DataSourceInstanceID ds_id,
+    const DataSourceConfig& config) {
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, ds_id, config] {
+    if (weak_this)
+      weak_this->producer_->CreateDataSourceInstance(ds_id, std::move(config));
+  });
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index baf9f73..58b4075 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -71,6 +71,10 @@
     void SetSharedMemory(std::unique_ptr<SharedMemory>);
 
     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
+    void OnTracingSetup();
+    void CreateDataSourceInstance(DataSourceInstanceID,
+                                  const DataSourceConfig&);
+    void TearDownDataSource(DataSourceInstanceID);
     SharedMemory* shared_memory() const override;
     size_t shared_buffer_page_size_kb() const override;
 
@@ -79,6 +83,7 @@
     FRIEND_TEST(ServiceImplTest, RegisterAndUnregister);
     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
+    SharedMemoryArbiterImpl* GetOrCreateShmemArbiter();
 
     ProducerID const id_;
     const uid_t uid_;
@@ -94,6 +99,7 @@
     // This is used only in in-process configurations (mostly tests).
     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
     PERFETTO_THREAD_CHECKER(thread_checker_)
+    base::WeakPtrFactory<ProducerEndpointImpl> weak_ptr_factory_;  // Keep last.
   };
 
   // The implementation behind the service endpoint exposed to each consumer.
@@ -102,7 +108,7 @@
     ConsumerEndpointImpl(ServiceImpl*, base::TaskRunner*, Consumer*);
     ~ConsumerEndpointImpl() override;
 
-    void NotifyOnTracingStop();
+    void NotifyOnTracingDisabled();
     base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr();
 
     // Service::ConsumerEndpoint implementation.
@@ -120,10 +126,8 @@
     ServiceImpl* const service_;
     Consumer* const consumer_;
     TracingSessionID tracing_session_id_ = 0;
-
     PERFETTO_THREAD_CHECKER(thread_checker_)
-
-    base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;
+    base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
   };
 
   explicit ServiceImpl(std::unique_ptr<SharedMemory::Factory>,
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 77256ac..1ea2534 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -30,47 +30,26 @@
 #include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "src/base/test/test_task_runner.h"
+#include "src/tracing/test/mock_consumer.h"
+#include "src/tracing/test/mock_producer.h"
 #include "src/tracing/test/test_shared_memory.h"
 
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace.pb.h"
+#include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
-namespace perfetto {
 using ::testing::_;
+using ::testing::Contains;
+using ::testing::Eq;
 using ::testing::InSequence;
 using ::testing::Invoke;
+using ::testing::InvokeWithoutArgs;
 using ::testing::Mock;
+using ::testing::Property;
+using ::testing::StrictMock;
 
-namespace {
-
-class MockProducer : public Producer {
- public:
-  ~MockProducer() override {}
-
-  // Producer implementation.
-  MOCK_METHOD0(OnConnect, void());
-  MOCK_METHOD0(OnDisconnect, void());
-  MOCK_METHOD2(CreateDataSourceInstance,
-               void(DataSourceInstanceID, const DataSourceConfig&));
-  MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
-  MOCK_METHOD0(OnTracingStart, void());
-  MOCK_METHOD0(OnTracingStop, void());
-};
-
-class MockConsumer : public Consumer {
- public:
-  ~MockConsumer() override {}
-
-  // Consumer implementation.
-  MOCK_METHOD0(OnConnect, void());
-  MOCK_METHOD0(OnDisconnect, void());
-  MOCK_METHOD0(OnTracingStop, void());
-
-  void OnTraceData(std::vector<TracePacket> packets, bool has_more) override {}
-};
-
-}  // namespace
+namespace perfetto {
 
 class ServiceImplTest : public testing::Test {
  public:
@@ -82,350 +61,220 @@
             .release()));
   }
 
+  std::unique_ptr<MockProducer> CreateMockProducer() {
+    return std::unique_ptr<MockProducer>(
+        new StrictMock<MockProducer>(&task_runner));
+  }
+
+  std::unique_ptr<MockConsumer> CreateMockConsumer() {
+    return std::unique_ptr<MockConsumer>(
+        new StrictMock<MockConsumer>(&task_runner));
+  }
+
   base::TestTaskRunner task_runner;
   std::unique_ptr<ServiceImpl> svc;
 };
 
 TEST_F(ServiceImplTest, RegisterAndUnregister) {
-  MockProducer mock_producer_1;
-  MockProducer mock_producer_2;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint_1 =
-      svc->ConnectProducer(&mock_producer_1, 123u /* uid */, "mock_producer_1");
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint_2 =
-      svc->ConnectProducer(&mock_producer_2, 456u /* uid */, "mock_producer_2");
+  std::unique_ptr<MockProducer> mock_producer_1 = CreateMockProducer();
+  std::unique_ptr<MockProducer> mock_producer_2 = CreateMockProducer();
 
-  ASSERT_TRUE(producer_endpoint_1);
-  ASSERT_TRUE(producer_endpoint_2);
-
-  InSequence seq;
-  EXPECT_CALL(mock_producer_1, OnConnect());
-  EXPECT_CALL(mock_producer_2, OnConnect());
-  task_runner.RunUntilIdle();
+  mock_producer_1->Connect(svc.get(), "mock_producer_1", 123u /* uid */);
+  mock_producer_2->Connect(svc.get(), "mock_producer_2", 456u /* uid */);
 
   ASSERT_EQ(2u, svc->num_producers());
-  ASSERT_EQ(producer_endpoint_1.get(), svc->GetProducer(1));
-  ASSERT_EQ(producer_endpoint_2.get(), svc->GetProducer(2));
+  ASSERT_EQ(mock_producer_1->endpoint(), svc->GetProducer(1));
+  ASSERT_EQ(mock_producer_2->endpoint(), svc->GetProducer(2));
   ASSERT_EQ(123u, svc->GetProducer(1)->uid_);
   ASSERT_EQ(456u, svc->GetProducer(2)->uid_);
 
-  DataSourceDescriptor ds_desc1;
-  ds_desc1.set_name("foo");
-  producer_endpoint_1->RegisterDataSource(ds_desc1);
+  mock_producer_1->RegisterDataSource("foo");
+  mock_producer_2->RegisterDataSource("bar");
 
-  DataSourceDescriptor ds_desc2;
-  ds_desc2.set_name("bar");
-  producer_endpoint_2->RegisterDataSource(ds_desc2);
+  mock_producer_1->UnregisterDataSource("foo");
+  mock_producer_2->UnregisterDataSource("bar");
 
-  task_runner.RunUntilIdle();
-
-  producer_endpoint_1->UnregisterDataSource("foo");
-  producer_endpoint_2->UnregisterDataSource("bar");
-
-  task_runner.RunUntilIdle();
-
-  EXPECT_CALL(mock_producer_1, OnDisconnect());
-  producer_endpoint_1.reset();
-  task_runner.RunUntilIdle();
-  Mock::VerifyAndClearExpectations(&mock_producer_1);
-
+  mock_producer_1.reset();
   ASSERT_EQ(1u, svc->num_producers());
   ASSERT_EQ(nullptr, svc->GetProducer(1));
 
-  EXPECT_CALL(mock_producer_2, OnDisconnect());
-  producer_endpoint_2.reset();
-  task_runner.RunUntilIdle();
-  Mock::VerifyAndClearExpectations(&mock_producer_2);
+  mock_producer_2.reset();
+  ASSERT_EQ(nullptr, svc->GetProducer(2));
 
   ASSERT_EQ(0u, svc->num_producers());
 }
 
 TEST_F(ServiceImplTest, EnableAndDisableTracing) {
-  MockProducer mock_producer;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-      svc->ConnectProducer(&mock_producer, 123u /* uid */, "mock_producer");
-  MockConsumer mock_consumer;
-  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
-      svc->ConnectConsumer(&mock_consumer);
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
 
-  InSequence seq;
-  EXPECT_CALL(mock_producer, OnConnect());
-  EXPECT_CALL(mock_consumer, OnConnect());
-  task_runner.RunUntilIdle();
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
 
-  DataSourceDescriptor ds_desc;
-  ds_desc.set_name("foo");
-  producer_endpoint->RegisterDataSource(ds_desc);
-
-  task_runner.RunUntilIdle();
-
-  EXPECT_CALL(mock_producer, CreateDataSourceInstance(_, _));
-  EXPECT_CALL(mock_producer, TearDownDataSourceInstance(_));
   TraceConfig trace_config;
-  trace_config.add_buffers()->set_size_kb(4096 * 10);
+  trace_config.add_buffers()->set_size_kb(128);
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
-  ds_config->set_name("foo");
-  ds_config->set_target_buffer(0);
-  consumer_endpoint->EnableTracing(trace_config);
-  task_runner.RunUntilIdle();
+  ds_config->set_name("data_source");
+  consumer->EnableTracing(trace_config);
 
-  EXPECT_CALL(mock_producer, OnDisconnect());
-  EXPECT_CALL(mock_consumer, OnDisconnect());
-  consumer_endpoint->DisableTracing();
-  producer_endpoint.reset();
-  consumer_endpoint.reset();
-  task_runner.RunUntilIdle();
-  Mock::VerifyAndClearExpectations(&mock_producer);
-  Mock::VerifyAndClearExpectations(&mock_consumer);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
 }
 
 TEST_F(ServiceImplTest, LockdownMode) {
-  MockConsumer mock_consumer;
-  EXPECT_CALL(mock_consumer, OnConnect());
-  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
-      svc->ConnectConsumer(&mock_consumer);
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer_sameuid", geteuid());
+  producer->RegisterDataSource("data_source");
 
   TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
   trace_config.set_lockdown_mode(
       TraceConfig::LockdownModeOperation::LOCKDOWN_SET);
-  consumer_endpoint->EnableTracing(trace_config);
+  consumer->EnableTracing(trace_config);
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<MockProducer> producer_otheruid = CreateMockProducer();
+  auto x = svc->ConnectProducer(producer_otheruid.get(), geteuid() + 1,
+                                "mock_producer_ouid");
+  EXPECT_CALL(*producer_otheruid, OnConnect()).Times(0);
   task_runner.RunUntilIdle();
+  Mock::VerifyAndClearExpectations(producer_otheruid.get());
 
-  InSequence seq;
-
-  MockProducer mock_producer;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-      svc->ConnectProducer(&mock_producer, geteuid() + 1 /* uid */,
-                           "mock_producer");
-
-  MockProducer mock_producer_sameuid;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint_sameuid =
-      svc->ConnectProducer(&mock_producer_sameuid, geteuid() /* uid */,
-                           "mock_producer_sameuid");
-
-  EXPECT_CALL(mock_producer, OnConnect()).Times(0);
-  EXPECT_CALL(mock_producer_sameuid, OnConnect());
-  task_runner.RunUntilIdle();
-
-  Mock::VerifyAndClearExpectations(&mock_producer);
-
-  consumer_endpoint->DisableTracing();
-  task_runner.RunUntilIdle();
+  consumer->DisableTracing();
+  consumer->FreeBuffers();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
 
   trace_config.set_lockdown_mode(
       TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR);
-  consumer_endpoint->EnableTracing(trace_config);
-  task_runner.RunUntilIdle();
+  consumer->EnableTracing(trace_config);
+  producer->WaitForDataSourceStart("data_source");
 
-  EXPECT_CALL(mock_producer_sameuid, OnDisconnect());
-  EXPECT_CALL(mock_producer, OnConnect());
-  producer_endpoint_sameuid =
-      svc->ConnectProducer(&mock_producer, geteuid() + 1, "mock_producer");
+  std::unique_ptr<MockProducer> producer_otheruid2 = CreateMockProducer();
+  producer_otheruid->Connect(svc.get(), "mock_producer_ouid2", geteuid() + 1);
 
-  EXPECT_CALL(mock_producer, OnDisconnect());
-  task_runner.RunUntilIdle();
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
 }
 
 TEST_F(ServiceImplTest, DisconnectConsumerWhileTracing) {
-  MockProducer mock_producer;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-      svc->ConnectProducer(&mock_producer, 123u /* uid */, "mock_producer");
-  MockConsumer mock_consumer;
-  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
-      svc->ConnectConsumer(&mock_consumer);
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
 
-  InSequence seq;
-  EXPECT_CALL(mock_producer, OnConnect());
-  EXPECT_CALL(mock_consumer, OnConnect());
-  task_runner.RunUntilIdle();
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
 
-  DataSourceDescriptor ds_desc;
-  ds_desc.set_name("foo");
-  producer_endpoint->RegisterDataSource(ds_desc);
-  task_runner.RunUntilIdle();
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  consumer->EnableTracing(trace_config);
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
 
   // Disconnecting the consumer while tracing should trigger data source
   // teardown.
-  EXPECT_CALL(mock_producer, CreateDataSourceInstance(_, _));
-  EXPECT_CALL(mock_producer, TearDownDataSourceInstance(_));
-  TraceConfig trace_config;
-  trace_config.add_buffers()->set_size_kb(4096 * 10);
-  auto* ds_config = trace_config.add_data_sources()->mutable_config();
-  ds_config->set_name("foo");
-  ds_config->set_target_buffer(0);
-  consumer_endpoint->EnableTracing(trace_config);
-  task_runner.RunUntilIdle();
-
-  EXPECT_CALL(mock_consumer, OnDisconnect());
-  consumer_endpoint.reset();
-  task_runner.RunUntilIdle();
-
-  EXPECT_CALL(mock_producer, OnDisconnect());
-  producer_endpoint.reset();
-  Mock::VerifyAndClearExpectations(&mock_producer);
-  Mock::VerifyAndClearExpectations(&mock_consumer);
+  consumer.reset();
+  producer->WaitForDataSourceStop("data_source");
 }
 
 TEST_F(ServiceImplTest, ReconnectProducerWhileTracing) {
-  MockProducer mock_producer;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-      svc->ConnectProducer(&mock_producer, 123u /* uid */, "mock_producer");
-  MockConsumer mock_consumer;
-  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
-      svc->ConnectConsumer(&mock_consumer);
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
 
-  InSequence seq;
-  EXPECT_CALL(mock_producer, OnConnect());
-  EXPECT_CALL(mock_consumer, OnConnect());
-  task_runner.RunUntilIdle();
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
 
-  DataSourceDescriptor ds_desc;
-  ds_desc.set_name("foo");
-  producer_endpoint->RegisterDataSource(ds_desc);
-  task_runner.RunUntilIdle();
-
-  // Disconnecting the producer while tracing should trigger data source
-  // teardown.
-  EXPECT_CALL(mock_producer, CreateDataSourceInstance(_, _));
-  EXPECT_CALL(mock_producer, TearDownDataSourceInstance(_));
-  EXPECT_CALL(mock_producer, OnDisconnect());
   TraceConfig trace_config;
-  trace_config.add_buffers()->set_size_kb(4096 * 10);
+  trace_config.add_buffers()->set_size_kb(128);
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
-  ds_config->set_name("foo");
-  ds_config->set_target_buffer(0);
-  consumer_endpoint->EnableTracing(trace_config);
-  producer_endpoint.reset();
-  task_runner.RunUntilIdle();
+  ds_config->set_name("data_source");
+  consumer->EnableTracing(trace_config);
 
-  // Reconnecting a producer with a matching data source should see that data
-  // source getting enabled.
-  EXPECT_CALL(mock_producer, OnConnect());
-  producer_endpoint =
-      svc->ConnectProducer(&mock_producer, 123u /* uid */, "mock_producer");
-  task_runner.RunUntilIdle();
-  EXPECT_CALL(mock_producer, CreateDataSourceInstance(_, _));
-  EXPECT_CALL(mock_producer, TearDownDataSourceInstance(_));
-  producer_endpoint->RegisterDataSource(ds_desc);
-  task_runner.RunUntilIdle();
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
 
-  EXPECT_CALL(mock_consumer, OnDisconnect());
-  consumer_endpoint->DisableTracing();
-  consumer_endpoint.reset();
-  task_runner.RunUntilIdle();
-
-  EXPECT_CALL(mock_producer, OnDisconnect());
-  producer_endpoint.reset();
-  Mock::VerifyAndClearExpectations(&mock_producer);
-  Mock::VerifyAndClearExpectations(&mock_consumer);
+  // Disconnecting and reconnecting a producer with a matching data source.
+  // The Producer should see that data source getting enabled again.
+  producer.reset();
+  producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer_2");
+  producer->RegisterDataSource("data_source");
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
 }
 
 TEST_F(ServiceImplTest, ProducerIDWrapping) {
-  base::TestTaskRunner task_runner;
-  auto shm_factory =
-      std::unique_ptr<SharedMemory::Factory>(new TestSharedMemory::Factory());
-  std::unique_ptr<ServiceImpl> svc(static_cast<ServiceImpl*>(
-      Service::CreateInstance(std::move(shm_factory), &task_runner).release()));
+  std::vector<std::unique_ptr<MockProducer>> producers;
+  producers.push_back(nullptr);
 
-  std::map<ProducerID, std::pair<std::unique_ptr<MockProducer>,
-                                 std::unique_ptr<Service::ProducerEndpoint>>>
-      producers;
-
-  auto ConnectProducerAndWait = [&task_runner, &svc, &producers]() {
-    char checkpoint_name[32];
-    static int checkpoint_num = 0;
-    sprintf(checkpoint_name, "on_connect_%d", checkpoint_num++);
-    auto on_connect = task_runner.CreateCheckpoint(checkpoint_name);
-    std::unique_ptr<MockProducer> producer(new MockProducer());
-    std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-        svc->ConnectProducer(producer.get(), 123u /* uid */, "mock_producer");
-    EXPECT_CALL(*producer, OnConnect()).WillOnce(Invoke(on_connect));
-    task_runner.RunUntilCheckpoint(checkpoint_name);
-    EXPECT_EQ(&*producer_endpoint, svc->GetProducer(svc->last_producer_id_));
-    const ProducerID pr_id = svc->last_producer_id_;
-    producers.emplace(pr_id, std::make_pair(std::move(producer),
-                                            std::move(producer_endpoint)));
-    return pr_id;
-  };
-
-  auto DisconnectProducerAndWait = [&task_runner,
-                                    &producers](ProducerID pr_id) {
-    char checkpoint_name[32];
-    static int checkpoint_num = 0;
-    sprintf(checkpoint_name, "on_disconnect_%d", checkpoint_num++);
-    auto on_disconnect = task_runner.CreateCheckpoint(checkpoint_name);
-    auto it = producers.find(pr_id);
-    PERFETTO_CHECK(it != producers.end());
-    EXPECT_CALL(*it->second.first, OnDisconnect())
-        .WillOnce(Invoke(on_disconnect));
-    producers.erase(pr_id);
-    task_runner.RunUntilCheckpoint(checkpoint_name);
+  auto connect_producer_and_get_id = [&producers,
+                                      this](const std::string& name) {
+    producers.emplace_back(CreateMockProducer());
+    producers.back()->Connect(svc.get(), "mock_producer_" + name);
+    return svc->last_producer_id_;
   };
 
   // Connect producers 1-4.
   for (ProducerID i = 1; i <= 4; i++)
-    ASSERT_EQ(i, ConnectProducerAndWait());
+    ASSERT_EQ(i, connect_producer_and_get_id(std::to_string(i)));
 
   // Disconnect producers 1,3.
-  DisconnectProducerAndWait(1);
-  DisconnectProducerAndWait(3);
+  producers[1].reset();
+  producers[3].reset();
 
   svc->last_producer_id_ = kMaxProducerID - 1;
-  ASSERT_EQ(kMaxProducerID, ConnectProducerAndWait());
-  ASSERT_EQ(1u, ConnectProducerAndWait());
-  ASSERT_EQ(3u, ConnectProducerAndWait());
-  ASSERT_EQ(5u, ConnectProducerAndWait());
-  ASSERT_EQ(6u, ConnectProducerAndWait());
-
-  // Disconnect all producers to mute spurious callbacks.
-  DisconnectProducerAndWait(kMaxProducerID);
-  for (ProducerID i = 1; i <= 6; i++)
-    DisconnectProducerAndWait(i);
+  ASSERT_EQ(kMaxProducerID, connect_producer_and_get_id("maxid"));
+  ASSERT_EQ(1u, connect_producer_and_get_id("1_again"));
+  ASSERT_EQ(3u, connect_producer_and_get_id("3_again"));
+  ASSERT_EQ(5u, connect_producer_and_get_id("5"));
+  ASSERT_EQ(6u, connect_producer_and_get_id("6"));
 }
 
 TEST_F(ServiceImplTest, WriteIntoFileAndStopOnMaxSize) {
-  MockProducer mock_producer;
-  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
-      svc->ConnectProducer(&mock_producer, 123u /* uid */, "mock_producer");
-  MockConsumer mock_consumer;
-  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
-      svc->ConnectConsumer(&mock_consumer);
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
 
-  EXPECT_CALL(mock_producer, OnConnect());
-  EXPECT_CALL(mock_consumer, OnConnect());
-  task_runner.RunUntilIdle();
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
 
-  DataSourceDescriptor ds_desc;
-  ds_desc.set_name("datasource");
-  producer_endpoint->RegisterDataSource(ds_desc);
-  task_runner.RunUntilIdle();
-
-  static const char kPayload[] = "1234567890abcdef-";
-  static const int kNumPackets = 10;
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(4096);
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
-  ds_config->set_name("datasource");
+  ds_config->set_name("data_source");
   ds_config->set_target_buffer(0);
   trace_config.set_write_into_file(true);
   trace_config.set_file_write_period_ms(1);
   const uint64_t kMaxFileSize = 512;
   trace_config.set_max_file_size_bytes(kMaxFileSize);
   base::TempFile tmp_file = base::TempFile::Create();
-  auto on_tracing_start = task_runner.CreateCheckpoint("on_tracing_start");
-  BufferID buf_id = 0;
-  EXPECT_CALL(mock_producer, OnTracingStart());
-  EXPECT_CALL(mock_producer, CreateDataSourceInstance(_, _))
-      .WillOnce(Invoke([on_tracing_start, &buf_id](
-                           DataSourceInstanceID, const DataSourceConfig& cfg) {
-        buf_id = static_cast<BufferID>(cfg.target_buffer());
-        on_tracing_start();
-      }));
-  consumer_endpoint->EnableTracing(trace_config,
-                                   base::ScopedFile(dup(tmp_file.fd())));
-  task_runner.RunUntilCheckpoint("on_tracing_start");
+  consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
+
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  static const char kPayload[] = "1234567890abcdef-";
+  static const int kNumPackets = 10;
 
   std::unique_ptr<TraceWriter> writer =
-      producer_endpoint->CreateTraceWriter(buf_id);
+      producer->CreateTraceWriter("data_source");
   // All these packets should fit within kMaxFileSize.
   for (int i = 0; i < kNumPackets; i++) {
     auto tp = writer->NewTracePacket();
@@ -444,17 +293,9 @@
   writer->Flush();
   writer.reset();
 
-  auto on_tracing_stop = task_runner.CreateCheckpoint("on_tracing_stop");
-  EXPECT_CALL(mock_producer, TearDownDataSourceInstance(_));
-  EXPECT_CALL(mock_consumer, OnTracingStop()).WillOnce(Invoke(on_tracing_stop));
-  task_runner.RunUntilCheckpoint("on_tracing_stop");
-
-  EXPECT_CALL(mock_consumer, OnDisconnect());
-  EXPECT_CALL(mock_producer, OnDisconnect());
-  consumer_endpoint->DisableTracing();
-  consumer_endpoint.reset();
-  producer_endpoint.reset();
-  task_runner.RunUntilIdle();
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
 
   // Verify the contents of the file.
   std::string trace_raw;
@@ -470,6 +311,6 @@
     ASSERT_EQ(kPayload + std::to_string(num_testing_packet++),
               tp.for_testing().str());
   }
-}  // namespace perfetto
+}
 
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index c671779..c3382b5 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -78,8 +78,8 @@
       [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> response) {
         if (!weak_this)
           return;
-        if (!response || response->stopped())
-          weak_this->consumer_->OnTracingStop();
+        if (!response || response->disabled())
+          weak_this->consumer_->OnTracingDisabled();
       });
 
   // |fd| will be closed when this function returns, but it's fine because the
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index 10b36d4..f6484b9 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -123,23 +123,18 @@
     return;
   }
 
-  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kOnTracingStart) {
+  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupTracing) {
     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
     PERFETTO_CHECK(shmem_fd);
 
     // TODO(primiano): handle mmap failure in case of OOM.
     shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
     shared_buffer_page_size_kb_ =
-        cmd.on_tracing_start().shared_buffer_page_size_kb();
+        cmd.setup_tracing().shared_buffer_page_size_kb();
     shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
         shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
         task_runner_);
-    producer_->OnTracingStart();
-    return;
-  }
-
-  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kOnTracingStop) {
-    // TODO (taylori) Tear down the shm.
+    producer_->OnTracingSetup();
     return;
   }
 
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index 24643d0..3c599f4 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -109,9 +109,9 @@
 // |service_endpoint| (in the RemoteConsumer dtor).
 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
 
-void ConsumerIPCService::RemoteConsumer::OnTracingStop() {
+void ConsumerIPCService::RemoteConsumer::OnTracingDisabled() {
   auto result = ipc::AsyncResult<protos::EnableTracingResponse>::Create();
-  result->set_stopped(true);
+  result->set_disabled(true);
   enable_tracing_response.Resolve(std::move(result));
 }
 
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index 253f34e..d3c02f7 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -67,7 +67,7 @@
     // no connection here, these methods are posted straight away.
     void OnConnect() override;
     void OnDisconnect() override;
-    void OnTracingStop() override;
+    void OnTracingDisabled() override;
     void OnTraceData(std::vector<TracePacket>, bool has_more) override;
 
     // The interface obtained from the core service business logic through
@@ -80,7 +80,7 @@
     DeferredReadBuffersResponse read_buffers_response;
 
     // After EnableTracing() is invoked, this binds the async callback that
-    // allows to send the OnTracingStop notification.
+    // allows to send the OnTracingDisabled notification.
     DeferredEnableTracingResponse enable_tracing_response;
   };
 
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index 565076d..abb3506 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -223,7 +223,7 @@
   async_producer_commands.Resolve(std::move(cmd));
 }
 
-void ProducerIPCService::RemoteProducer::OnTracingStart() {
+void ProducerIPCService::RemoteProducer::OnTracingSetup() {
   if (!async_producer_commands.IsBound()) {
     PERFETTO_DLOG(
         "The Service tried to allocate the shared memory but the remote "
@@ -236,13 +236,9 @@
   auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
   cmd.set_has_more(true);
   cmd.set_fd(shm_fd);
-  cmd->mutable_on_tracing_start()->set_shared_buffer_page_size_kb(
+  cmd->mutable_setup_tracing()->set_shared_buffer_page_size_kb(
       service_endpoint->shared_buffer_page_size_kb());
   async_producer_commands.Resolve(std::move(cmd));
 }
 
-void ProducerIPCService::RemoteProducer::OnTracingStop() {
-  // TODO(taylori): Implement.
-}
-
 }  // namespace perfetto
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index 289c036..f0dacb9 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -72,8 +72,7 @@
     void CreateDataSourceInstance(DataSourceInstanceID,
                                   const DataSourceConfig&) override;
     void TearDownDataSourceInstance(DataSourceInstanceID) override;
-    void OnTracingStart() override;
-    void OnTracingStop() override;
+    void OnTracingSetup() override;
 
     // The interface obtained from the core service business logic through
     // Service::ConnectProducer(this). This allows to invoke methods for a
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
new file mode 100644
index 0000000..6ea267e
--- /dev/null
+++ b/src/tracing/test/mock_consumer.cc
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/test/mock_consumer.h"
+
+#include "perfetto/tracing/core/trace_config.h"
+#include "src/base/test/test_task_runner.h"
+
+using ::testing::_;
+using ::testing::Invoke;
+
+namespace perfetto {
+
+MockConsumer::MockConsumer(base::TestTaskRunner* task_runner)
+    : task_runner_(task_runner) {}
+
+MockConsumer::~MockConsumer() {
+  if (!service_endpoint_)
+    return;
+  static int i = 0;
+  auto checkpoint_name = "on_consumer_disconnect_" + std::to_string(i++);
+  auto on_disconnect = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnDisconnect()).WillOnce(Invoke(on_disconnect));
+  service_endpoint_.reset();
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockConsumer::Connect(Service* svc) {
+  service_endpoint_ = svc->ConnectConsumer(this);
+  static int i = 0;
+  auto checkpoint_name = "on_consumer_connect_" + std::to_string(i++);
+  auto on_connect = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnConnect()).WillOnce(Invoke(on_connect));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockConsumer::EnableTracing(const TraceConfig& trace_config,
+                                 base::ScopedFile write_into_file) {
+  service_endpoint_->EnableTracing(trace_config, std::move(write_into_file));
+}
+
+void MockConsumer::DisableTracing() {
+  service_endpoint_->DisableTracing();
+}
+
+void MockConsumer::FreeBuffers() {
+  service_endpoint_->FreeBuffers();
+}
+
+void MockConsumer::WaitForTracingDisabled() {
+  static int i = 0;
+  auto checkpoint_name = "on_tracing_disabled_consumer_" + std::to_string(i++);
+  auto on_tracing_disabled = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnTracingDisabled()).WillOnce(Invoke(on_tracing_disabled));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+std::vector<protos::TracePacket> MockConsumer::ReadBuffers() {
+  std::vector<protos::TracePacket> decoded_packets;
+  static int i = 0;
+  std::string checkpoint_name = "on_read_buffers_" + std::to_string(i++);
+  auto on_read_buffers = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnTraceData(_, _))
+      .WillRepeatedly(
+          Invoke([&decoded_packets, on_read_buffers](
+                     std::vector<TracePacket>* packets, bool has_more) {
+            for (TracePacket& packet : *packets) {
+              decoded_packets.emplace_back();
+              protos::TracePacket* decoded_packet = &decoded_packets.back();
+              packet.Decode(decoded_packet);
+            }
+            if (!has_more)
+              on_read_buffers();
+          }));
+  service_endpoint_->ReadBuffers();
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+  return decoded_packets;
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
new file mode 100644
index 0000000..f62dde2
--- /dev/null
+++ b/src/tracing/test/mock_consumer.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_MOCK_CONSUMER_H_
+#define SRC_TRACING_TEST_MOCK_CONSUMER_H_
+
+#include <memory>
+
+#include "gmock/gmock.h"
+#include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/trace_packet.h"
+
+#include "perfetto/trace/trace_packet.pb.h"
+
+namespace perfetto {
+
+namespace base {
+class TestTaskRunner;
+}
+
+class MockConsumer : public Consumer {
+ public:
+  explicit MockConsumer(base::TestTaskRunner*);
+  ~MockConsumer() override;
+
+  void Connect(Service* svc);
+  void EnableTracing(const TraceConfig&, base::ScopedFile = base::ScopedFile());
+  void DisableTracing();
+  void FreeBuffers();
+  void WaitForTracingDisabled();
+  std::vector<protos::TracePacket> ReadBuffers();
+
+  Service::ConsumerEndpoint* endpoint() { return service_endpoint_.get(); }
+
+  // Consumer implementation.
+  MOCK_METHOD0(OnConnect, void());
+  MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD0(OnTracingDisabled, void());
+  MOCK_METHOD2(OnTraceData,
+               void(std::vector<TracePacket>* /*packets*/, bool /*has_more*/));
+
+  // gtest doesn't support move-only types. This wrapper is here jut to pass
+  // a pointer to the vector (rather than the vector itself) to the mock method.
+  void OnTraceData(std::vector<TracePacket> packets, bool has_more) override {
+    OnTraceData(&packets, has_more);
+  }
+
+ private:
+  base::TestTaskRunner* const task_runner_;
+  std::unique_ptr<Service::ConsumerEndpoint> service_endpoint_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_TEST_MOCK_CONSUMER_H_
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
new file mode 100644
index 0000000..02f7d77
--- /dev/null
+++ b/src/tracing/test/mock_producer.cc
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/test/mock_producer.h"
+
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "src/base/test/test_task_runner.h"
+
+using ::testing::_;
+using ::testing::Eq;
+using ::testing::Invoke;
+using ::testing::InvokeWithoutArgs;
+using ::testing::Property;
+
+namespace perfetto {
+
+MockProducer::MockProducer(base::TestTaskRunner* task_runner)
+    : task_runner_(task_runner) {}
+
+MockProducer::~MockProducer() {
+  if (!service_endpoint_)
+    return;
+  static int i = 0;
+  auto checkpoint_name = "on_producer_disconnect_" + std::to_string(i++);
+  auto on_disconnect = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnDisconnect()).WillOnce(Invoke(on_disconnect));
+  service_endpoint_.reset();
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockProducer::Connect(Service* svc,
+                           const std::string& producer_name,
+                           uid_t uid) {
+  producer_name_ = producer_name;
+  service_endpoint_ = svc->ConnectProducer(this, uid, producer_name);
+  auto checkpoint_name = "on_producer_connect_" + producer_name;
+  auto on_connect = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnConnect()).WillOnce(Invoke(on_connect));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockProducer::RegisterDataSource(const std::string& name) {
+  DataSourceDescriptor ds_desc;
+  ds_desc.set_name(name);
+  service_endpoint_->RegisterDataSource(ds_desc);
+}
+
+void MockProducer::UnregisterDataSource(const std::string& name) {
+  service_endpoint_->UnregisterDataSource(name);
+}
+
+void MockProducer::WaitForTracingSetup() {
+  static int i = 0;
+  auto checkpoint_name =
+      "on_shmem_initialized_" + producer_name_ + "_" + std::to_string(i++);
+  auto on_tracing_enabled = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnTracingSetup()).WillOnce(Invoke(on_tracing_enabled));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockProducer::WaitForDataSourceStart(const std::string& name) {
+  static int i = 0;
+  auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+  auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, CreateDataSourceInstance(
+                         _, Property(&DataSourceConfig::name, Eq(name))))
+      .WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
+                                           const DataSourceConfig& cfg) {
+        EXPECT_FALSE(data_source_instances_.count(cfg.name()));
+        auto target_buffer = static_cast<BufferID>(cfg.target_buffer());
+        data_source_instances_.emplace(cfg.name(),
+                                       EnabledDataSource{ds_id, target_buffer});
+        on_ds_start();
+      }));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
+void MockProducer::WaitForDataSourceStop(const std::string& name) {
+  static int i = 0;
+  auto checkpoint_name = "on_ds_stop_" + name + "_" + std::to_string(i++);
+  auto on_ds_stop = task_runner_->CreateCheckpoint(checkpoint_name);
+  ASSERT_EQ(1u, data_source_instances_.count(name));
+  DataSourceInstanceID ds_id = data_source_instances_[name].id;
+  EXPECT_CALL(*this, TearDownDataSourceInstance(ds_id))
+      .WillOnce(InvokeWithoutArgs(on_ds_stop));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+  data_source_instances_.erase(name);
+}
+
+std::unique_ptr<TraceWriter> MockProducer::CreateTraceWriter(
+    const std::string& data_source_name) {
+  PERFETTO_DCHECK(data_source_instances_.count(data_source_name));
+  BufferID buf_id = data_source_instances_[data_source_name].target_buffer;
+  return service_endpoint_->CreateTraceWriter(buf_id);
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
new file mode 100644
index 0000000..443c9fe
--- /dev/null
+++ b/src/tracing/test/mock_producer.h
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_MOCK_PRODUCER_H_
+#define SRC_TRACING_TEST_MOCK_PRODUCER_H_
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "gmock/gmock.h"
+#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/trace_writer.h"
+
+namespace perfetto {
+
+namespace base {
+class TestTaskRunner;
+}
+
+class MockProducer : public Producer {
+ public:
+  struct EnabledDataSource {
+    DataSourceInstanceID id;
+    BufferID target_buffer;
+  };
+
+  explicit MockProducer(base::TestTaskRunner*);
+  ~MockProducer() override;
+
+  void Connect(Service* svc, const std::string& producer_name, uid_t uid = 42);
+  void RegisterDataSource(const std::string& name);
+  void UnregisterDataSource(const std::string& name);
+  void WaitForTracingSetup();
+  void WaitForDataSourceStart(const std::string& name);
+  void WaitForDataSourceStop(const std::string& name);
+  std::unique_ptr<TraceWriter> CreateTraceWriter(
+      const std::string& data_source_name);
+
+  Service::ProducerEndpoint* endpoint() { return service_endpoint_.get(); }
+
+  // Producer implementation.
+  MOCK_METHOD0(OnConnect, void());
+  MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(CreateDataSourceInstance,
+               void(DataSourceInstanceID, const DataSourceConfig&));
+  MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
+  MOCK_METHOD0(OnTracingSetup, void());
+
+ private:
+  base::TestTaskRunner* const task_runner_;
+  std::string producer_name_;
+  std::unique_ptr<Service::ProducerEndpoint> service_endpoint_;
+  std::map<std::string, EnabledDataSource> data_source_instances_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_TEST_MOCK_PRODUCER_H_
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index d03a7e4..0d19fdc 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -60,8 +60,7 @@
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
   MOCK_METHOD0(uid, uid_t());
-  MOCK_METHOD0(OnTracingStart, void());
-  MOCK_METHOD0(OnTracingStop, void());
+  MOCK_METHOD0(OnTracingSetup, void());
 };
 
 class MockConsumer : public Consumer {
@@ -71,7 +70,7 @@
   // Producer implementation.
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
-  MOCK_METHOD0(OnTracingStop, void());
+  MOCK_METHOD0(OnTracingDisabled, void());
   MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
 
   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
@@ -166,7 +165,7 @@
   BufferID global_buf_id = 0;
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
-  EXPECT_CALL(producer_, OnTracingStart());
+  EXPECT_CALL(producer_, OnTracingSetup());
   EXPECT_CALL(producer_, CreateDataSourceInstance(_, _))
       .WillOnce(
           Invoke([on_create_ds_instance, &ds_iid, &global_buf_id](
@@ -258,10 +257,12 @@
   // Disable tracing.
   consumer_endpoint_->DisableTracing();
 
-  auto on_tracing_stop = task_runner_->CreateCheckpoint("on_tracing_stop");
+  auto on_tracing_disabled =
+      task_runner_->CreateCheckpoint("on_tracing_disabled");
   EXPECT_CALL(producer_, TearDownDataSourceInstance(_));
-  EXPECT_CALL(consumer_, OnTracingStop()).WillOnce(Invoke(on_tracing_stop));
-  task_runner_->RunUntilCheckpoint("on_tracing_stop");
+  EXPECT_CALL(consumer_, OnTracingDisabled())
+      .WillOnce(Invoke(on_tracing_disabled));
+  task_runner_->RunUntilCheckpoint("on_tracing_disabled");
 }
 
 TEST_F(TracingIntegrationTest, WriteIntoFile) {
@@ -280,7 +281,7 @@
   BufferID global_buf_id = 0;
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
-  EXPECT_CALL(producer_, OnTracingStart());
+  EXPECT_CALL(producer_, OnTracingSetup());
   EXPECT_CALL(producer_, CreateDataSourceInstance(_, _))
       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
                            DataSourceInstanceID, const DataSourceConfig& cfg) {
@@ -307,10 +308,12 @@
   // file before destroying them.
   consumer_endpoint_->FreeBuffers();
 
-  auto on_tracing_stop = task_runner_->CreateCheckpoint("on_tracing_stop");
+  auto on_tracing_disabled =
+      task_runner_->CreateCheckpoint("on_tracing_disabled");
   EXPECT_CALL(producer_, TearDownDataSourceInstance(_));
-  EXPECT_CALL(consumer_, OnTracingStop()).WillOnce(Invoke(on_tracing_stop));
-  task_runner_->RunUntilCheckpoint("on_tracing_stop");
+  EXPECT_CALL(consumer_, OnTracingDisabled())
+      .WillOnce(Invoke(on_tracing_disabled));
+  task_runner_->RunUntilCheckpoint("on_tracing_disabled");
 
   // Check that |tmp_file| contains a valid trace.proto message.
   ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 450e801..a05800d 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -96,8 +96,7 @@
   }
 
   void TearDownDataSourceInstance(DataSourceInstanceID) override {}
-  void OnTracingStart() override {}
-  void OnTracingStop() override {}
+  void OnTracingSetup() override {}
 
  private:
   const std::string name_;
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index a81f656..2069b5d 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -118,8 +118,6 @@
   });
 }
 
-void FakeProducer::OnTracingStart() {}
-
-void FakeProducer::OnTracingStop() {}
+void FakeProducer::OnTracingSetup() {}
 
 }  // namespace perfetto
diff --git a/test/fake_producer.h b/test/fake_producer.h
index 2495518..ecf9d89 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -49,8 +49,7 @@
   void CreateDataSourceInstance(DataSourceInstanceID,
                                 const DataSourceConfig& source_config) override;
   void TearDownDataSourceInstance(DataSourceInstanceID) override;
-  void OnTracingStart() override;
-  void OnTracingStop() override;
+  void OnTracingSetup() override;
 
  private:
   void Shutdown();
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 7e19555..9b2b39e 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -50,7 +50,7 @@
   FAIL() << "Consumer unexpectedly disconnected from the service";
 }
 
-void TestHelper::OnTracingStop() {}
+void TestHelper::OnTracingDisabled() {}
 
 void TestHelper::OnTraceData(std::vector<TracePacket> packets, bool has_more) {
   for (auto& encoded_packet : packets) {
diff --git a/test/test_helper.h b/test/test_helper.h
index b63707c..563345f 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -36,7 +36,7 @@
   // Consumer implementation.
   void OnConnect() override;
   void OnDisconnect() override;
-  void OnTracingStop() override;
+  void OnTracingDisabled() override;
   void OnTraceData(std::vector<TracePacket> packets, bool has_more) override;
 
   void StartServiceIfRequired();