Introduce support for deferred start (fast triggering)

This CL introduces new Setup methods to both the
producer and consumer API. The API changes are designed
to be backwards compatible with the prior versions of
Android.

From the producer viewpoint: each data source now sees a
SetupDataSource() method before the StartDataSource() method.
The data source instance id and config is identical. This
gives the data source a chance to setup itself before actually
starting.

From the consumer viewpoint:
A new "deferred_start" field is added to the TraceConfig.
When deferred_start is true, the EnableTracing method
brings up all the tracing harness and calls the
SetupDataSource() methods on the producers, but doesn't
start them. Data sources are started only when calling
StartDataSource().

Note that in this CL, the ftrace data source doesn't
deal yet with the split Setup/Start phases and starts
directly upon creation. This will be fixed in a separate
CL.

Bug: 116547201
Change-Id: I2ab22c7f8ee2388f8158df733dddeef9dc9c0143
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 3f8698f..2c8e6f2 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -60,24 +60,40 @@
   // instance.
   virtual void OnDisconnect() = 0;
 
-  // Called by the Service to turn on one of the data source previously
+  // Called by the Service after OnConnect but before the first DataSource is
+  // created. Can be used for any setup required before tracing begins.
+  virtual void OnTracingSetup() = 0;
+
+  // The lifecycle methods below are always called in the following sequence:
+  // SetupDataSource  -> StartDataSource -> StopDataSource.
+  // Or, in the edge case where a trace is aborted immediately:
+  // SetupDataSource  -> StopDataSource.
+  // The Setup+Start call sequence is always guaranateed, regardless of the
+  // TraceConfig.deferred_start flags.
+  // Called by the Service to configure one of the data sources previously
   // registered through TracingService::ProducerEndpoint::RegisterDataSource().
+  // This method is always called before StartDataSource. There is always a
+  // SetupDataSource() call before each StartDataSource() call.
   // Args:
   // - DataSourceInstanceID is an identifier chosen by the Service that should
   //   be assigned to the newly created data source instance. It is used to
   //   match the StopDataSource() request below.
   // - DataSourceConfig is the configuration for the new data source (e.g.,
   //   tells which trace categories to enable).
+  virtual void SetupDataSource(DataSourceInstanceID,
+                               const DataSourceConfig&) = 0;
+
+  // Called by the Service to turn on one of the data sources previously
+  // registered through TracingService::ProducerEndpoint::RegisterDataSource()
+  // and initialized through SetupDataSource().
+  // Both arguments are guaranteed to be identical to the ones passed to the
+  // prior SetupDataSource() call.
   virtual void StartDataSource(DataSourceInstanceID,
                                const DataSourceConfig&) = 0;
 
   // Called by the Service to shut down an existing data source instance.
   virtual void StopDataSource(DataSourceInstanceID) = 0;
 
-  // Called by the Service after OnConnect but before the first DataSource is
-  // created. Can be used for any setup required before tracing begins.
-  virtual void OnTracingSetup() = 0;
-
   // Called by the service to request the Producer to commit the data of the
   // given data sources and return their chunks into the shared memory buffer.
   // The Producer is expected to invoke NotifyFlushComplete(FlushRequestID) on
diff --git a/include/perfetto/tracing/core/trace_config.h b/include/perfetto/tracing/core/trace_config.h
index df0a89e..f7ac92f 100644
--- a/include/perfetto/tracing/core/trace_config.h
+++ b/include/perfetto/tracing/core/trace_config.h
@@ -301,6 +301,9 @@
     return &guardrail_overrides_;
   }
 
+  bool deferred_start() const { return deferred_start_; }
+  void set_deferred_start(bool value) { deferred_start_ = value; }
+
  private:
   std::vector<BufferConfig> buffers_;
   std::vector<DataSource> data_sources_;
@@ -313,6 +316,7 @@
   uint32_t file_write_period_ms_ = {};
   uint64_t max_file_size_bytes_ = {};
   GuardrailOverrides guardrail_overrides_ = {};
+  bool deferred_start_ = {};
 
   // Allows to preserve unknown protobuf fields for compatibility
   // with future versions of .proto files.
diff --git a/include/perfetto/tracing/core/tracing_service.h b/include/perfetto/tracing/core/tracing_service.h
index 5759368..cdeb575 100644
--- a/include/perfetto/tracing/core/tracing_service.h
+++ b/include/perfetto/tracing/core/tracing_service.h
@@ -120,8 +120,20 @@
 
     // Enables tracing with the given TraceConfig. The ScopedFile argument is
     // used only when TraceConfig.write_into_file == true.
+    // If TraceConfig.deferred_start == true data sources are configured via
+    // SetupDataSource() but are not started until StartTracing() is called.
+    // This is to support pre-initialization and fast triggering of traces.
+    // The ScopedFile argument is used only when TraceConfig.write_into_file
+    // == true.
     virtual void EnableTracing(const TraceConfig&,
                                base::ScopedFile = base::ScopedFile()) = 0;
+
+    // Starts all data sources configured in the trace config. This is used only
+    // after calling EnableTracing() with TraceConfig.deferred_start=true.
+    // It's a no-op if called after a regular EnableTracing(), without setting
+    // deferred_start.
+    virtual void StartTracing() = 0;
+
     virtual void DisableTracing() = 0;
 
     // Requests all data sources to flush their data immediately and invokes the
diff --git a/protos/perfetto/config/perfetto_config.proto b/protos/perfetto/config/perfetto_config.proto
index 91f4aad..595011b 100644
--- a/protos/perfetto/config/perfetto_config.proto
+++ b/protos/perfetto/config/perfetto_config.proto
@@ -381,7 +381,7 @@
 // It contains the general config for the logging buffer(s) and the configs for
 // all the data source being enabled.
 //
-// Next id: 12.
+// Next id: 13.
 message TraceConfig {
   message BufferConfig {
     optional uint32 size_kb = 1;
@@ -487,6 +487,14 @@
   }
 
   optional GuardrailOverrides guardrail_overrides = 11;
+
+  // When true, data sources are not started until an explicit call to
+  // StartTracing() on the consumer port. This is to support early
+  // initialization and fast trace triggering. This can be used only when the
+  // Consumer explicitly triggers the StartTracing() method.
+  // This should not be used in a remote trace config via statsd, doing so will
+  // result in a hung trace session.
+  optional bool deferred_start = 12;
 }
 
 // End of protos/perfetto/config/trace_config.proto
diff --git a/protos/perfetto/config/trace_config.proto b/protos/perfetto/config/trace_config.proto
index 11ce8d2..08844f2 100644
--- a/protos/perfetto/config/trace_config.proto
+++ b/protos/perfetto/config/trace_config.proto
@@ -29,7 +29,7 @@
 // It contains the general config for the logging buffer(s) and the configs for
 // all the data source being enabled.
 //
-// Next id: 12.
+// Next id: 13.
 message TraceConfig {
   message BufferConfig {
     optional uint32 size_kb = 1;
@@ -135,4 +135,12 @@
   }
 
   optional GuardrailOverrides guardrail_overrides = 11;
+
+  // When true, data sources are not started until an explicit call to
+  // StartTracing() on the consumer port. This is to support early
+  // initialization and fast trace triggering. This can be used only when the
+  // Consumer explicitly triggers the StartTracing() method.
+  // This should not be used in a remote trace config via statsd, doing so will
+  // result in a hung trace session.
+  optional bool deferred_start = 12;
 }
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index 8c3e9b8..a8d5a84 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -33,8 +33,20 @@
   // Enables tracing for one or more data sources. At least one buffer must have
   // been previously created. The EnableTracingResponse is sent when tracing is
   // disabled (either explicitly or because of the |duration_ms| expired).
+  // The EnableTracingResponse is sent when tracing is disabled (either
+  // explicitly or because of the |duration_ms| expired).
+  // If |deferred_start| == true in the passed TraceConfig, all the tracing
+  // harness is brought up (creating buffers and data sources) without actually
+  // starting the data sources. Data sources will be started upon an explicit
+  // StartTracing() call.
+  // Note that |deferred_start| and StartTracing() have been introduced only
+  // in Android Q and are not supported in Android P.
   rpc EnableTracing(EnableTracingRequest) returns (EnableTracingResponse) {}
 
+  // Starts tracing. Only valid if EnableTracing() was called setting
+  // deferred_start = true in the TraceConfig passed to EnableTracing().
+  rpc StartTracing(StartTracingRequest) returns (StartTracingResponse) {}
+
   // Disables tracing for one or more data sources.
   rpc DisableTracing(DisableTracingRequest) returns (DisableTracingResponse) {}
 
@@ -70,6 +82,11 @@
   oneof state { bool disabled = 1; }
 }
 
+// Arguments for rpc StartTracing().
+message StartTracingRequest {}
+
+message StartTracingResponse {}
+
 // Arguments for rpc DisableTracing().
 message DisableTracingRequest {
   // TODO: not supported yet, selectively disable only some data sources.
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index b743914..0a27f82 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -118,8 +118,18 @@
 message GetAsyncCommandRequest {}
 
 message GetAsyncCommandResponse {
+  // Called after SetupTracing and before StartDataSource.
+  // This message was introduced in Android Q.
+  message SetupDataSource {
+    optional uint64 new_instance_id = 1;
+    optional protos.DataSourceConfig config = 2;
+  }
+
   message StartDataSource {
     optional uint64 new_instance_id = 1;
+
+    // For backwards compat reasons (with Android P), the config passed here
+    // is identical to the one passed to SetupDataSource.config.
     optional protos.DataSourceConfig config = 2;
   }
 
@@ -140,10 +150,12 @@
     optional uint64 request_id = 2;
   }
 
+  // Next id: 7.
   oneof cmd {
+    SetupTracing setup_tracing = 3;
+    SetupDataSource setup_data_source = 6;
     StartDataSource start_data_source = 1;
     StopDataSource stop_data_source = 2;
-    SetupTracing setup_tracing = 3;
     // id == 4 was teardown_tracing, never implemented.
     Flush flush = 5;
   }
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 02fcbf8..9d3a098 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -131,6 +131,11 @@
 
 InodeFileDataSource::~InodeFileDataSource() = default;
 
+void InodeFileDataSource::Start() {
+  // Nothing special to do, this data source is only reacting to on-demand
+  // events such as OnInodes().
+}
+
 void InodeFileDataSource::AddInodesFromStaticMap(
     BlockDeviceID block_device_id,
     std::set<Inode>* inode_numbers) {
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 181dfda..143df66 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -79,12 +79,14 @@
   void AddInodesFromLRUCache(BlockDeviceID block_device_id,
                              std::set<Inode>* inode_numbers);
 
-  void Flush() override;
-
   virtual void FillInodeEntry(InodeFileMap* destination,
                               Inode inode_number,
                               const InodeMapValue& inode_map_value);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush() override;
+
  protected:
   std::multimap<BlockDeviceID, std::string> mount_points_;
 
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index da5391e..d209709 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -43,6 +43,11 @@
     controller_weak_->RemoveDataSource(this);
 };
 
+void FtraceDataSource::Start() {
+  // TODO(primiano): implement in next CL. For now the ftrace data source
+  // still starts immediately on creation.
+}
+
 void FtraceDataSource::Initialize(FtraceConfigId config_id,
                                   std::unique_ptr<EventFilter> event_filter) {
   config_id_ = config_id;
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index 3def5fd..e576a5a 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -62,6 +62,9 @@
   // source, to inject ftrace dependencies.
   void Initialize(FtraceConfigId, std::unique_ptr<EventFilter>);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
   void Flush() override;
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 3e823e0..13d47e1 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -28,10 +28,12 @@
   ProbesDataSource(TracingSessionID, int type_id);
   virtual ~ProbesDataSource();
 
+  virtual void Start() = 0;
   virtual void Flush() = 0;
 
   const TracingSessionID tracing_session_id;
   const int type_id;
+  bool started = false;  // Set by probes_producer.cc.
 
  private:
   ProbesDataSource(const ProbesDataSource&) = delete;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 5b37165..6a56c71 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -129,8 +129,10 @@
   ConnectWithRetries(socket_name, task_runner);
 }
 
-void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+void ProbesProducer::SetupDataSource(DataSourceInstanceID instance_id,
                                      const DataSourceConfig& config) {
+  PERFETTO_DLOG("SetupDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+                config.name().c_str());
   PERFETTO_DCHECK(data_sources_.count(instance_id) == 0);
   TracingSessionID session_id = config.tracing_session_id();
   PERFETTO_CHECK(session_id > 0);
@@ -153,12 +155,27 @@
 
   session_data_sources_.emplace(session_id, data_source.get());
   data_sources_[instance_id] = std::move(data_source);
+}
 
+void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+                                     const DataSourceConfig& config) {
+  PERFETTO_DLOG("StartDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+                config.name().c_str());
+  auto it = data_sources_.find(instance_id);
+  if (it == data_sources_.end()) {
+    PERFETTO_ELOG("Data source id=%" PRIu64 " not found", instance_id);
+    return;
+  }
+  ProbesDataSource* data_source = it->second.get();
+  if (data_source->started)
+    return;
   if (config.trace_duration_ms() != 0) {
     uint32_t timeout = 5000 + 2 * config.trace_duration_ms();
     watchdogs_.emplace(
         instance_id, base::Watchdog::GetInstance()->CreateFatalTimer(timeout));
   }
+  data_source->started = true;
+  data_source->Start();
 }
 
 std::unique_ptr<ProbesDataSource> ProbesProducer::CreateFtraceDataSource(
@@ -185,7 +202,7 @@
     ftrace_->ClearTrace();
   }
 
-  PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
+  PERFETTO_LOG("Ftrace setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
                config.target_buffer());
   const BufferID buffer_id = static_cast<BufferID>(config.target_buffer());
   std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
@@ -204,7 +221,7 @@
     TracingSessionID session_id,
     DataSourceInstanceID id,
     DataSourceConfig source_config) {
-  PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
+  PERFETTO_LOG("Inode file map setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
                id, source_config.target_buffer());
   auto buffer_id = static_cast<BufferID>(source_config.target_buffer());
   if (system_inodes_.empty())
@@ -223,9 +240,6 @@
   auto data_source =
       std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
           session_id, endpoint_->CreateTraceWriter(buffer_id), config));
-  if (config.process_stats_config().scan_all_processes_on_start()) {
-    data_source->WriteAllProcesses();
-  }
   return std::move(data_source);
 }
 
@@ -268,7 +282,7 @@
                            size_t num_data_sources) {
   for (size_t i = 0; i < num_data_sources; i++) {
     auto it = data_sources_.find(data_source_ids[i]);
-    if (it == data_sources_.end())
+    if (it == data_sources_.end() || !it->second->started)
       continue;
     it->second->Flush();
   }
@@ -288,7 +302,7 @@
   // unordered_multimap guarantees that entries with the same key are contiguous
   // in the iteration.
   for (auto it = session_data_sources_.begin(); /* check below*/; it++) {
-    // If this is the last iteration or this is the session id has changed,
+    // If this is the last iteration or the session id has changed,
     // dispatch the metadata update to the linked data sources, if any.
     if (it == session_data_sources_.end() || it->first != last_session_id) {
       bool has_inodes = metadata && !metadata->inode_and_device.empty();
@@ -307,6 +321,8 @@
       last_session_id = it->first;
     }
     ProbesDataSource* ds = it->second;
+    if (!ds->started)
+      continue;
     switch (ds->type_id) {
       case FtraceDataSource::kTypeId:
         metadata = static_cast<FtraceDataSource*>(ds)->mutable_metadata();
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index e7b9403..14737d6 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -49,6 +49,7 @@
   // Producer Impl:
   void OnConnect() override;
   void OnDisconnect() override;
+  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
   void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
   void StopDataSource(DataSourceInstanceID) override;
   void OnTracingSetup() override;
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index 80d125b..160484f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -75,10 +75,11 @@
     const DataSourceConfig& config)
     : ProbesDataSource(session_id, kTypeId),
       writer_(std::move(writer)),
-      config_(config),
       record_thread_names_(config.process_stats_config().record_thread_names()),
+      dump_all_procs_on_start_(
+          config.process_stats_config().scan_all_processes_on_start()),
       weak_factory_(this) {
-  const auto& quirks = config_.process_stats_config().quirks();
+  const auto& quirks = config.process_stats_config().quirks();
   enable_on_demand_dumps_ =
       (std::find(quirks.begin(), quirks.end(),
                  ProcessStatsConfig::DISABLE_ON_DEMAND) == quirks.end());
@@ -86,6 +87,11 @@
 
 ProcessStatsDataSource::~ProcessStatsDataSource() = default;
 
+void ProcessStatsDataSource::Start() {
+  if (dump_all_procs_on_start_)
+    WriteAllProcesses();
+}
+
 base::WeakPtr<ProcessStatsDataSource> ProcessStatsDataSource::GetWeakPtr()
     const {
   return weak_factory_.GetWeakPtr();
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 866d2cf..57f0324 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -44,16 +44,17 @@
                          const DataSourceConfig&);
   ~ProcessStatsDataSource() override;
 
-  const DataSourceConfig& config() const { return config_; }
-
   base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
   void WriteAllProcesses();
   void OnPids(const std::vector<int32_t>& pids);
-  void Flush() override;
 
   // Virtual for testing.
   virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush() override;
+
  private:
   ProcessStatsDataSource(const ProcessStatsDataSource&) = delete;
   ProcessStatsDataSource& operator=(const ProcessStatsDataSource&) = delete;
@@ -67,11 +68,11 @@
   void FinalizeCurPsTree();
 
   std::unique_ptr<TraceWriter> writer_;
-  const DataSourceConfig config_;
   TraceWriter::TracePacketHandle cur_packet_;
   protos::pbzero::ProcessTree* cur_ps_tree_ = nullptr;
   bool record_thread_names_ = false;
   bool enable_on_demand_dumps_ = true;
+  bool dump_all_procs_on_start_ = false;
 
   // This set contains PIDs as per the Linux kernel notion of a PID (which is
   // really a TID). In practice this set will contain all TIDs for all processes
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index ebc3733..cd6efd1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -125,6 +125,9 @@
   meminfo_ticks_ = ticks[0];
   vmstat_ticks_ = ticks[1];
   stat_ticks_ = ticks[2];
+}
+
+void SysStatsDataSource::Start() {
   auto weak_this = GetWeakPtr();
   task_runner_->PostTask(std::bind(&SysStatsDataSource::Tick, weak_this));
 }
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index a5ea4f7..6081374 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -55,6 +55,7 @@
   ~SysStatsDataSource() override;
 
   // ProbesDataSource implementation.
+  void Start() override;
   void Flush() override;
 
   base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
index e2c18d9..763d8d1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
@@ -209,6 +209,7 @@
     auto instance = std::unique_ptr<SysStatsDataSource>(new SysStatsDataSource(
         &task_runner_, 0, std::move(writer), cfg, MockOpenReadOnly));
     instance->set_ns_per_user_hz_for_testing(1000000000ull / 100);  // 100 Hz.
+    instance->Start();
     return instance;
   }
 
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 8787672..5a0ca16 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -154,8 +154,13 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
+  // Calling StartTracing() should be a noop (% a DLOG statement) because the
+  // trace config didn't have the |deferred_start| flag set.
+  consumer->StartTracing();
+
   consumer->DisableTracing();
   producer->WaitForDataSourceStop("data_source");
   consumer->WaitForTracingDisabled();
@@ -178,6 +183,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<MockProducer> producer_otheruid = CreateMockProducer();
@@ -195,6 +201,7 @@
   trace_config.set_lockdown_mode(
       TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR);
   consumer->EnableTracing(trace_config);
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<MockProducer> producer_otheruid2 = CreateMockProducer();
@@ -220,6 +227,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Disconnecting the consumer while tracing should trigger data source
@@ -243,6 +251,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Disconnecting and reconnecting a producer with a matching data source.
@@ -252,6 +261,7 @@
   producer->Connect(svc.get(), "mock_producer_2");
   producer->RegisterDataSource("data_source");
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 }
 
@@ -303,6 +313,7 @@
   consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   static const char kPayload[] = "1234567890abcdef-";
@@ -397,12 +408,15 @@
   size_t actual_page_sizes_kb[kNumProducers]{};
   for (size_t i = 0; i < kNumProducers; i++) {
     producer[i]->WaitForTracingSetup();
-    producer[i]->WaitForDataSourceStart("data_source");
+    producer[i]->WaitForDataSourceSetup("data_source");
     actual_shm_sizes_kb[i] =
         producer[i]->endpoint()->shared_memory()->size() / 1024;
     actual_page_sizes_kb[i] =
         producer[i]->endpoint()->shared_buffer_page_size_kb();
   }
+  for (size_t i = 0; i < kNumProducers; i++) {
+    producer[i]->WaitForDataSourceStart("data_source");
+  }
   ASSERT_THAT(actual_page_sizes_kb, ElementsAreArray(kExpectedPageSizesKb));
   ASSERT_THAT(actual_shm_sizes_kb, ElementsAreArray(kExpectedSizesKb));
 }
@@ -422,6 +436,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -460,6 +475,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -498,6 +514,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -564,6 +581,11 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+
+  producer->WaitForDataSourceSetup("ds_will_ack_1");
+  producer->WaitForDataSourceSetup("ds_wont_ack");
+  producer->WaitForDataSourceSetup("ds_will_ack_2");
+
   producer->WaitForDataSourceStart("ds_will_ack_1");
   producer->WaitForDataSourceStart("ds_wont_ack");
   producer->WaitForDataSourceStart("ds_will_ack_2");
@@ -636,6 +658,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -675,11 +698,15 @@
 
     if (i == 0)
       producer1->WaitForTracingSetup();
-    producer1->WaitForDataSourceStart("ds_1A");
-    producer1->WaitForDataSourceStart("ds_1B");
 
+    producer1->WaitForDataSourceSetup("ds_1A");
+    producer1->WaitForDataSourceSetup("ds_1B");
     if (i == 0)
       producer2->WaitForTracingSetup();
+    producer2->WaitForDataSourceSetup("ds_2A");
+
+    producer1->WaitForDataSourceStart("ds_1A");
+    producer1->WaitForDataSourceStart("ds_1B");
     producer2->WaitForDataSourceStart("ds_2A");
 
     auto* ds1 = producer1->GetDataSourceInstance("ds_1A");
@@ -723,6 +750,7 @@
   base::TempFile tmp_file = base::TempFile::Create();
   consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Write some variable length payload, waiting for sync markers every now
@@ -777,4 +805,42 @@
   EXPECT_EQ(whole_trace.SerializeAsString(), merged_trace.SerializeAsString());
 }
 
+// Creates a tracing session with |deferred_start| and checks that data sources
+// are started only after calling StartTracing().
+TEST_F(TracingServiceImplTest, DeferredStart) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+
+  // Create two data sources but enable only one of them.
+  producer->RegisterDataSource("ds_1");
+  producer->RegisterDataSource("ds_2");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  trace_config.add_data_sources()->mutable_config()->set_name("ds_1");
+  trace_config.set_deferred_start(true);
+  trace_config.set_duration_ms(1);
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+
+  producer->WaitForDataSourceSetup("ds_1");
+
+  // Make sure we don't get unexpected DataSourceStart() notifications yet.
+  task_runner.RunUntilIdle();
+
+  consumer->StartTracing();
+
+  producer->WaitForDataSourceStart("ds_1");
+
+  auto writer1 = producer->CreateTraceWriter("ds_1");
+  producer->WaitForFlush(writer1.get());
+
+  producer->WaitForDataSourceStop("ds_1");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_config.cc b/src/tracing/core/trace_config.cc
index b4d0081..a5cf0bb 100644
--- a/src/tracing/core/trace_config.cc
+++ b/src/tracing/core/trace_config.cc
@@ -92,6 +92,11 @@
       static_cast<decltype(max_file_size_bytes_)>(proto.max_file_size_bytes());
 
   guardrail_overrides_.FromProto(proto.guardrail_overrides());
+
+  static_assert(sizeof(deferred_start_) == sizeof(proto.deferred_start()),
+                "size mismatch");
+  deferred_start_ =
+      static_cast<decltype(deferred_start_)>(proto.deferred_start());
   unknown_fields_ = proto.unknown_fields();
 }
 
@@ -152,6 +157,11 @@
           max_file_size_bytes_));
 
   guardrail_overrides_.ToProto(proto->mutable_guardrail_overrides());
+
+  static_assert(sizeof(deferred_start_) == sizeof(proto->deferred_start()),
+                "size mismatch");
+  proto->set_deferred_start(
+      static_cast<decltype(proto->deferred_start())>(deferred_start_));
   *(proto->mutable_unknown_fields()) = unknown_fields_;
 }
 
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 2483235..9cab744 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -238,6 +238,11 @@
   }
 
   if (cfg.enable_extra_guardrails()) {
+    if (cfg.deferred_start()) {
+      PERFETTO_ELOG(
+          "deferred_start=true is not supported in unsupervised traces");
+      return false;
+    }
     if (cfg.duration_ms() > kMaxTracingDurationMillis) {
       PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu64
                     " ms)",
@@ -340,7 +345,7 @@
 
   consumer->tracing_session_id_ = tsid;
 
-  // Enable the data sources on the producers.
+  // Setup the data sources on the producers without starting them.
   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
     // Scan all the registered data sources with a matching name.
     auto range = data_sources_.equal_range(cfg_data_source.config().name());
@@ -353,13 +358,45 @@
           break;
         }
       }
-      StartDataSource(cfg_data_source, producer_config, it->second,
+      SetupDataSource(cfg_data_source, producer_config, it->second,
                       tracing_session);
     }
   }
 
+  tracing_session->pending_stop_acks.clear();
+  tracing_session->state = TracingSession::CONFIGURED;
+  PERFETTO_LOG(
+      "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
+      "buffer size:%zu KB, total sessions:%zu",
+      cfg.data_sources().size(), cfg.duration_ms(), cfg.buffers_size(),
+      total_buf_size_kb, tracing_sessions_.size());
+
+  // Start the data sources, unless this is a case of early setup + fast
+  // triggering, using TraceConfig.deferred_start.
+  if (!cfg.deferred_start())
+    return StartTracing(tsid);
+
+  return true;
+}
+
+bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session) {
+    PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
+    return false;
+  }
+
+  if (tracing_session->state != TracingSession::CONFIGURED) {
+    PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
+                  tracing_session->state);
+    return false;
+  }
+
+  tracing_session->state = TracingSession::STARTED;
+
   // Trigger delayed task if the trace is time limited.
-  const uint32_t trace_duration_ms = cfg.duration_ms();
+  const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
   if (trace_duration_ms > 0) {
     auto weak_this = weak_ptr_factory_.GetWeakPtr();
     task_runner_->PostDelayedTask(
@@ -371,7 +408,7 @@
   }
 
   // Start the periodic drain tasks if we should to save the trace into a file.
-  if (cfg.write_into_file()) {
+  if (tracing_session->config.write_into_file()) {
     auto weak_this = weak_ptr_factory_.GetWeakPtr();
     task_runner_->PostDelayedTask(
         [weak_this, tsid] {
@@ -381,13 +418,16 @@
         tracing_session->delay_to_next_write_period_ms());
   }
 
-  tracing_session->pending_stop_acks.clear();
-  tracing_session->state = TracingSession::ENABLED;
-  PERFETTO_LOG(
-      "Enabled tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
-      "buffer size:%zu KB, total sessions:%zu",
-      cfg.data_sources().size(), trace_duration_ms, cfg.buffers_size(),
-      total_buf_size_kb, tracing_sessions_.size());
+  for (const auto& kv : tracing_session->data_source_instances) {
+    ProducerID producer_id = kv.first;
+    const DataSourceInstance& data_source = kv.second;
+    ProducerEndpointImpl* producer = GetProducer(producer_id);
+    if (!producer) {
+      PERFETTO_DCHECK(false);
+      continue;
+    }
+    producer->StartDataSource(data_source.instance_id, data_source.config);
+  }
   return true;
 }
 
@@ -427,8 +467,15 @@
         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
       return;
 
+    // Continues below.
+    case TracingSession::CONFIGURED:
+      // If the session didn't even start there is no need to orchestrate a
+      // graceful stop of data sources.
+      disable_immediately = true;
+      break;
+
     // This is the nominal case, continues below.
-    case TracingSession::ENABLED:
+    case TracingSession::STARTED:
       break;
   }
 
@@ -441,7 +488,7 @@
       tracing_session->pending_stop_acks.insert(
           std::make_pair(producer_id, ds_inst_id));
     }
-    producer->TearDownDataSource(ds_inst_id);
+    producer->StopDataSource(ds_inst_id);
   }
   tracing_session->data_source_instances.clear();
 
@@ -497,8 +544,9 @@
   PERFETTO_DCHECK_THREAD(thread_checker_);
   TracingSession* tracing_session = GetTracingSession(tsid);
   if (!tracing_session ||
-      tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS)
+      tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
     return;  // Tracing session was successfully disabled.
+  }
 
   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
                 tsid);
@@ -798,7 +846,7 @@
     if (stop_writing_into_file) {
       tracing_session->write_into_file.reset();
       tracing_session->write_period_ms = 0;
-      if (tracing_session->state == TracingSession::ENABLED)
+      if (tracing_session->state == TracingSession::STARTED)
         DisableTracing(tsid);
       return;
     }
@@ -873,8 +921,10 @@
 
   for (auto& iter : tracing_sessions_) {
     TracingSession& tracing_session = iter.second;
-    if (tracing_session.state != TracingSession::ENABLED)
+    if (tracing_session.state != TracingSession::STARTED &&
+        tracing_session.state != TracingSession::CONFIGURED) {
       continue;
+    }
 
     TraceConfig::ProducerConfig producer_config;
     for (auto& config : tracing_session.config.producers()) {
@@ -885,9 +935,12 @@
     }
     for (const TraceConfig::DataSource& cfg_data_source :
          tracing_session.config.data_sources()) {
-      if (cfg_data_source.config().name() == desc.name())
-        StartDataSource(cfg_data_source, producer_config, reg_ds->second,
-                        &tracing_session);
+      if (cfg_data_source.config().name() != desc.name())
+        continue;
+      DataSourceInstance* ds_inst = SetupDataSource(
+          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
+      if (ds_inst && tracing_session.state == TracingSession::STARTED)
+        producer->StartDataSource(ds_inst->instance_id, ds_inst->config);
     }
   }
 }
@@ -903,7 +956,7 @@
     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
       if (it->first == producer_id && it->second.data_source_name == name) {
         DataSourceInstanceID ds_inst_id = it->second.instance_id;
-        producer->TearDownDataSource(ds_inst_id);
+        producer->StopDataSource(ds_inst_id);
         it = ds_instances.erase(it);
       } else {
         ++it;
@@ -926,7 +979,7 @@
   PERFETTO_DCHECK(false);
 }
 
-void TracingServiceImpl::StartDataSource(
+TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
     const TraceConfig::DataSource& cfg_data_source,
     const TraceConfig::ProducerConfig& producer_config,
     const RegisteredDataSource& data_source,
@@ -938,7 +991,7 @@
   // ftrace, we must not enable it in that case.
   if (lockdown_mode_ && producer->uid_ != uid_) {
     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
-    return;
+    return nullptr;
   }
   // TODO(primiano): Add tests for registration ordering
   // (data sources vs consumers).
@@ -950,10 +1003,19 @@
       PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                     cfg_data_source.config().name().c_str(),
                     producer->name_.c_str());
-      return;
+      return nullptr;
     }
   }
 
+  auto relative_buffer_id = cfg_data_source.config().target_buffer();
+  if (relative_buffer_id >= tracing_session->num_buffers()) {
+    PERFETTO_LOG(
+        "The TraceConfig for DataSource %s specified a target_buffer out of "
+        "bound (%d). Skipping it.",
+        cfg_data_source.config().name().c_str(), relative_buffer_id);
+    return nullptr;
+  }
+
   // Create a copy of the DataSourceConfig specified in the trace config. This
   // will be passed to the producer after translating the |target_buffer| id.
   // The |target_buffer| parameter passed by the consumer in the trace config is
@@ -961,27 +1023,22 @@
   // translated to the global BufferID before passing it to the producers, which
   // don't know anything about tracing sessions and consumers.
 
-  DataSourceConfig ds_config = cfg_data_source.config();  // Deliberate copy.
+  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
+  auto insert_iter = tracing_session->data_source_instances.emplace(
+      producer->id_,
+      DataSourceInstance{inst_id,
+                         cfg_data_source.config(),  //  Deliberate copy.
+                         data_source.descriptor.name(),
+                         data_source.descriptor.will_notify_on_stop()});
+  DataSourceInstance* ds_instance = &insert_iter->second;
+  DataSourceConfig& ds_config = ds_instance->config;
   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
   ds_config.set_tracing_session_id(tracing_session->id);
-  auto relative_buffer_id = ds_config.target_buffer();
-  if (relative_buffer_id >= tracing_session->num_buffers()) {
-    PERFETTO_LOG(
-        "The TraceConfig for DataSource %s specified a target_buffer out of "
-        "bound (%d). Skipping it.",
-        ds_config.name().c_str(), relative_buffer_id);
-    return;
-  }
   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
   PERFETTO_DCHECK(global_id);
   ds_config.set_target_buffer(global_id);
 
-  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
-  tracing_session->data_source_instances.emplace(
-      producer->id_,
-      DataSourceInstance{inst_id, data_source.descriptor.name(),
-                         data_source.descriptor.will_notify_on_stop()});
-  PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16,
+  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                 ds_config.name().c_str(), global_id);
   if (!producer->shared_memory()) {
     // Determine the SMB page size. Must be an integer multiple of 4k.
@@ -1011,7 +1068,8 @@
     producer->OnTracingSetup();
     UpdateMemoryGuardrail();
   }
-  producer->StartDataSource(inst_id, ds_config);
+  producer->SetupDataSource(inst_id, ds_config);
+  return ds_instance;
 }
 
 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
@@ -1311,6 +1369,15 @@
     NotifyOnTracingDisabled();
 }
 
+void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  if (!tracing_session_id_) {
+    PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
+    return;
+  }
+  service_->StartTracing(tracing_session_id_);
+}
+
 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   if (!tracing_session_id_) {
@@ -1473,7 +1540,7 @@
   return shared_buffer_page_size_kb_;
 }
 
-void TracingServiceImpl::ProducerEndpointImpl::TearDownDataSource(
+void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
     DataSourceInstanceID ds_inst_id) {
   // TODO(primiano): When we'll support tearing down the SMB, at this point we
   // should send the Producer a TearDownTracing if all its data sources have
@@ -1524,6 +1591,17 @@
   });
 }
 
+void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
+    DataSourceInstanceID ds_id,
+    const DataSourceConfig& config) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, ds_id, config] {
+    if (weak_this)
+      weak_this->producer_->SetupDataSource(ds_id, std::move(config));
+  });
+}
+
 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
     DataSourceInstanceID ds_id,
     const DataSourceConfig& config) {
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index d50057e..9c325cd 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -83,8 +83,9 @@
     size_t shared_buffer_page_size_kb() const override;
 
     void OnTracingSetup();
+    void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
-    void TearDownDataSource(DataSourceInstanceID);
+    void StopDataSource(DataSourceInstanceID);
     void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
 
    private:
@@ -122,6 +123,7 @@
 
     // TracingService::ConsumerEndpoint implementation.
     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+    void StartTracing() override;
     void DisableTracing() override;
     void ReadBuffers() override;
     void FreeBuffers() override;
@@ -167,6 +169,7 @@
   bool EnableTracing(ConsumerEndpointImpl*,
                      const TraceConfig&,
                      base::ScopedFile);
+  bool StartTracing(TracingSessionID);
   void DisableTracing(TracingSessionID, bool disable_immediately = false);
   void Flush(TracingSessionID tsid,
              uint32_t timeout_ms,
@@ -201,7 +204,12 @@
 
   // Represents an active data source for a tracing session.
   struct DataSourceInstance {
+    DataSourceInstance(const DataSourceInstance&) = delete;
+    DataSourceInstance& operator=(const DataSourceInstance&) = delete;
+    DataSourceInstance(DataSourceInstance&&) noexcept = default;
+
     DataSourceInstanceID instance_id;
+    DataSourceConfig config;
     std::string data_source_name;
     bool will_notify_on_stop;
   };
@@ -215,7 +223,12 @@
   // Holds the state of a tracing session. A tracing session is uniquely bound
   // a specific Consumer. Each Consumer can own one or more sessions.
   struct TracingSession {
-    enum State { DISABLED = 0, ENABLED, DISABLING_WAITING_STOP_ACKS };
+    enum State {
+      DISABLED = 0,
+      CONFIGURED,
+      STARTED,
+      DISABLING_WAITING_STOP_ACKS
+    };
 
     TracingSession(TracingSessionID, ConsumerEndpointImpl*, const TraceConfig&);
 
@@ -276,10 +289,10 @@
   TracingServiceImpl(const TracingServiceImpl&) = delete;
   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
 
-  void StartDataSource(const TraceConfig::DataSource&,
-                       const TraceConfig::ProducerConfig&,
-                       const RegisteredDataSource&,
-                       TracingSession*);
+  DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
+                                      const TraceConfig::ProducerConfig&,
+                                      const RegisteredDataSource&,
+                                      TracingSession*);
 
   // Returns the next available ProducerID that is not in |producers_|.
   ProducerID GetNextProducerID();
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index 05d67fa..cb54e43 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -87,6 +87,22 @@
   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
 }
 
+void ConsumerIPCClientImpl::StartTracing() {
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
+    return;
+  }
+
+  ipc::Deferred<protos::StartTracingResponse> async_response;
+  async_response.Bind(
+      [](ipc::AsyncResult<protos::StartTracingResponse> response) {
+        if (!response)
+          PERFETTO_DLOG("StartTracing() failed");
+      });
+  protos::StartTracingRequest req;
+  consumer_port_.StartTracing(req, std::move(async_response));
+}
+
 void ConsumerIPCClientImpl::DisableTracing() {
   if (!connected_) {
     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 4257a54..5ed9516 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -60,6 +60,7 @@
   // These methods are invoked by the actual Consumer(s) code by clients of the
   // tracing library, which know nothing about the IPC transport.
   void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+  void StartTracing() override;
   void DisableTracing() override;
   void ReadBuffers() override;
   void FreeBuffers() override;
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index 2abdcf7..1d2f907 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -95,6 +95,7 @@
   PERFETTO_DLOG("Tracing service connection failure");
   connected_ = false;
   producer_->OnDisconnect();
+  data_sources_setup_.clear();
 }
 
 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
@@ -109,11 +110,29 @@
 void ProducerIPCClientImpl::OnServiceRequest(
     const protos::GetAsyncCommandResponse& cmd) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  // This message is sent only when connecting to a service running Android Q+.
+  // See comment below in kStartDataSource.
+  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupDataSource) {
+    const auto& req = cmd.setup_data_source();
+    const DataSourceInstanceID dsid = req.new_instance_id();
+    DataSourceConfig cfg;
+    cfg.FromProto(req.config());
+    data_sources_setup_.insert(dsid);
+    producer_->SetupDataSource(dsid, cfg);
+    return;
+  }
+
   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
     const auto& req = cmd.start_data_source();
     const DataSourceInstanceID dsid = req.new_instance_id();
     DataSourceConfig cfg;
     cfg.FromProto(req.config());
+    if (!data_sources_setup_.count(dsid)) {
+      // When connecting with an older (Android P) service, the service will not
+      // send a SetupDataSource message. We synthesize it here in that case.
+      producer_->SetupDataSource(dsid, cfg);
+    }
     producer_->StartDataSource(dsid, cfg);
     return;
   }
@@ -121,6 +140,7 @@
   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
     producer_->StopDataSource(dsid);
+    data_sources_setup_.erase(dsid);
     return;
   }
 
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 743d632..ba749ad 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <set>
 #include <vector>
 
 #include "perfetto/base/thread_checker.h"
@@ -99,6 +100,7 @@
   std::unique_ptr<PosixSharedMemory> shared_memory_;
   std::unique_ptr<SharedMemoryArbiter> shared_memory_arbiter_;
   size_t shared_buffer_page_size_kb_ = 0;
+  std::set<DataSourceInstanceID> data_sources_setup_;
   bool connected_ = false;
   std::string const name_;
   PERFETTO_THREAD_CHECKER(thread_checker_)
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index d378c67..cb18004 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -71,6 +71,14 @@
 }
 
 // Called by the IPC layer.
+void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&,
+                                      DeferredStartTracingResponse resp) {
+  RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+  remote_consumer->service_endpoint->StartTracing();
+  resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create());
+}
+
+// Called by the IPC layer.
 void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
                                         DeferredDisableTracingResponse resp) {
   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index 3fdc5d6..acf7d2f 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -46,6 +46,8 @@
   // ConsumerPort implementation (from .proto IPC definition).
   void EnableTracing(const protos::EnableTracingRequest&,
                      DeferredEnableTracingResponse) override;
+  void StartTracing(const protos::StartTracingRequest&,
+                    DeferredStartTracingResponse) override;
   void DisableTracing(const protos::DisableTracingRequest&,
                       DeferredDisableTracingResponse) override;
   void ReadBuffers(const protos::ReadBuffersRequest&,
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index e2c2ab9..95dcf67 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -211,6 +211,24 @@
 // |service_endpoint| (in the RemoteProducer dtor).
 void ProducerIPCService::RemoteProducer::OnDisconnect() {}
 
+// Invoked by the |core_service_| business logic when it wants to create a new
+// data source.
+void ProducerIPCService::RemoteProducer::SetupDataSource(
+    DataSourceInstanceID dsid,
+    const DataSourceConfig& cfg) {
+  if (!async_producer_commands.IsBound()) {
+    PERFETTO_DLOG(
+        "The Service tried to create a new data source but the remote Producer "
+        "has not yet initialized the connection");
+    return;
+  }
+  auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
+  cmd.set_has_more(true);
+  cmd->mutable_setup_data_source()->set_new_instance_id(dsid);
+  cfg.ToProto(cmd->mutable_setup_data_source()->mutable_config());
+  async_producer_commands.Resolve(std::move(cmd));
+}
+
 // Invoked by the |core_service_| business logic when it wants to start a new
 // data source.
 void ProducerIPCService::RemoteProducer::StartDataSource(
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index 8446804..caa1978 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -71,6 +71,8 @@
     // no connection here, these methods are posted straight away.
     void OnConnect() override;
     void OnDisconnect() override;
+    void SetupDataSource(DataSourceInstanceID,
+                         const DataSourceConfig&) override;
     void StartDataSource(DataSourceInstanceID,
                          const DataSourceConfig&) override;
     void StopDataSource(DataSourceInstanceID) override;
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 31c7131..ff228c9 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -52,6 +52,10 @@
   service_endpoint_->EnableTracing(trace_config, std::move(write_into_file));
 }
 
+void MockConsumer::StartTracing() {
+  service_endpoint_->StartTracing();
+}
+
 void MockConsumer::DisableTracing() {
   service_endpoint_->DisableTracing();
 }
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index 917aff8..bc1c6d4 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -48,6 +48,7 @@
 
   void Connect(TracingService* svc);
   void EnableTracing(const TraceConfig&, base::ScopedFile = base::ScopedFile());
+  void StartTracing();
   void DisableTracing();
   void FreeBuffers();
   void WaitForTracingDisabled(uint32_t timeout_ms = 3000);
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 8c2f9c5..1294c1b 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -76,12 +76,12 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
-void MockProducer::WaitForDataSourceStart(const std::string& name) {
+void MockProducer::WaitForDataSourceSetup(const std::string& name) {
   static int i = 0;
-  auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+  auto checkpoint_name = "on_ds_setup_" + name + "_" + std::to_string(i++);
   auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
   EXPECT_CALL(*this,
-              StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+              SetupDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
       .WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
                                            const DataSourceConfig& cfg) {
         EXPECT_FALSE(data_source_instances_.count(cfg.name()));
@@ -95,6 +95,28 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
+void MockProducer::WaitForDataSourceStart(const std::string& name) {
+  static int i = 0;
+  auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+  auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this,
+              StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+      .WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
+                                           const DataSourceConfig& cfg) {
+        // The data source might have been seen already through
+        // WaitForDataSourceSetup().
+        if (data_source_instances_.count(cfg.name()) == 0) {
+          auto target_buffer = static_cast<BufferID>(cfg.target_buffer());
+          auto session_id =
+              static_cast<TracingSessionID>(cfg.tracing_session_id());
+          data_source_instances_.emplace(
+              cfg.name(), EnabledDataSource{ds_id, target_buffer, session_id});
+        }
+        on_ds_start();
+      }));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
 void MockProducer::WaitForDataSourceStop(const std::string& name) {
   static int i = 0;
   auto checkpoint_name = "on_ds_stop_" + name + "_" + std::to_string(i++);
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index 590a64e..be13084 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -50,6 +50,7 @@
   void RegisterDataSource(const std::string& name, bool ack_stop = false);
   void UnregisterDataSource(const std::string& name);
   void WaitForTracingSetup();
+  void WaitForDataSourceSetup(const std::string& name);
   void WaitForDataSourceStart(const std::string& name);
   void WaitForDataSourceStop(const std::string& name);
   DataSourceInstanceID GetDataSourceInstanceId(const std::string& name);
@@ -68,6 +69,8 @@
   // Producer implementation.
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(SetupDataSource,
+               void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD2(StartDataSource,
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 26aa5cd..8060ab7 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -56,6 +56,8 @@
   // Producer implementation.
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(SetupDataSource,
+               void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD2(StartDataSource,
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
@@ -187,10 +189,31 @@
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
   EXPECT_CALL(producer_, OnTracingSetup());
+
+  // Store the arguments passed to SetupDataSource() and later check that they
+  // match the ones passed to StartDataSource().
+  DataSourceInstanceID setup_id;
+  perfetto::protos::DataSourceConfig setup_cfg_proto;
+  EXPECT_CALL(producer_, SetupDataSource(_, _))
+      .WillOnce(
+          Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
+                                               const DataSourceConfig& cfg) {
+
+            setup_id = id;
+            cfg.ToProto(&setup_cfg_proto);
+          }));
   EXPECT_CALL(producer_, StartDataSource(_, _))
       .WillOnce(
-          Invoke([on_create_ds_instance, &ds_iid, &global_buf_id](
-                     DataSourceInstanceID id, const DataSourceConfig& cfg) {
+          Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
+                  &setup_cfg_proto](DataSourceInstanceID id,
+                                    const DataSourceConfig& cfg) {
+            // id and config should match the ones passed to SetupDataSource.
+            ASSERT_EQ(id, setup_id);
+            perfetto::protos::DataSourceConfig cfg_proto;
+            cfg.ToProto(&cfg_proto);
+            ASSERT_EQ(cfg_proto.SerializeAsString(),
+                      setup_cfg_proto.SerializeAsString());
+
             ASSERT_NE(0u, id);
             ds_iid = id;
             ASSERT_EQ("perfetto.test", cfg.name());
@@ -309,6 +332,7 @@
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
   EXPECT_CALL(producer_, OnTracingSetup());
+  EXPECT_CALL(producer_, SetupDataSource(_, _));
   EXPECT_CALL(producer_, StartDataSource(_, _))
       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
                            DataSourceInstanceID, const DataSourceConfig& cfg) {
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 7bd70b3..bdc28e5 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -76,6 +76,9 @@
 
   void OnDisconnect() override {}
 
+  void SetupDataSource(DataSourceInstanceID,
+                       const DataSourceConfig& source_config) override {}
+
   void StartDataSource(DataSourceInstanceID,
                        const DataSourceConfig& source_config) override {
     auto trace_writer = endpoint_->CreateTraceWriter(
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index ae8ea07..20c4049 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -57,6 +57,9 @@
   FAIL() << "Producer unexpectedly disconnected from the service";
 }
 
+void FakeProducer::SetupDataSource(DataSourceInstanceID,
+                                   const DataSourceConfig&) {}
+
 void FakeProducer::StartDataSource(DataSourceInstanceID,
                                    const DataSourceConfig& source_config) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
diff --git a/test/fake_producer.h b/test/fake_producer.h
index cc55640..baacb9f 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -46,6 +46,8 @@
   // Producer implementation.
   void OnConnect() override;
   void OnDisconnect() override;
+  void SetupDataSource(DataSourceInstanceID,
+                       const DataSourceConfig& source_config) override;
   void StartDataSource(DataSourceInstanceID,
                        const DataSourceConfig& source_config) override;
   void StopDataSource(DataSourceInstanceID) override;