Merge "trace_processor_shell: Persist cmdline history"
diff --git a/.travis.yml b/.travis.yml
index fcb7192..0603419 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,11 +66,13 @@
       sudo: false
       compiler: clang
       env: CFG=linux_trusty-clang-x86_64-msan GN_ARGS="is_debug=false is_msan=true"
-    - os: linux
-      dist: trusty
-      sudo: true
-      compiler: clang
-      env: CFG=linux_trusty-clang-x86_64-ubsan_asan_lsan GN_ARGS="is_debug=false is_ubsan=true is_asan=true is_lsan=true"
+# TODO(b/117093687): This always times out in Travis.
+# Re-enable once that is fixed.
+#    - os: linux
+#      dist: trusty
+#      sudo: true
+#      compiler: clang
+#      env: CFG=linux_trusty-clang-x86_64-ubsan_asan_lsan GN_ARGS="is_debug=false is_ubsan=true is_asan=true is_lsan=true"
     - os: linux
       dist: trusty
       sudo: false
diff --git a/Android.bp b/Android.bp
index e7f0d6c..541e66f 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3850,6 +3850,8 @@
     "src/profiling/memory/heapprofd_integrationtest.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
+    "src/profiling/memory/sampler.cc",
+    "src/profiling/memory/sampler_unittest.cc",
     "src/profiling/memory/socket_listener.cc",
     "src/profiling/memory/socket_listener_unittest.cc",
     "src/profiling/memory/string_interner.cc",
@@ -3857,6 +3859,7 @@
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/unwinding_unittest.cc",
     "src/profiling/memory/wire_protocol.cc",
+    "src/profiling/memory/wire_protocol_unittest.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
     "src/protozero/message_handle_unittest.cc",
diff --git a/buildtools/.gitignore b/buildtools/.gitignore
index 1dd47b9..f33608c 100644
--- a/buildtools/.gitignore
+++ b/buildtools/.gitignore
@@ -1,3 +1,4 @@
+android-core/
 android_sdk/
 aosp-*/
 benchmark/
@@ -10,15 +11,15 @@
 libbacktrace/
 libcxx/
 libcxxabi/
+libfuzzer/
 libunwind/
 linenoise/
 linux/
 linux64/
+lzma/
 mac/
 ndk/
 nodejs/
 protobuf/
 sqlite/
-android-core/
-lzma/
 test_data/
diff --git a/include/perfetto/base/unix_socket.h b/include/perfetto/base/unix_socket.h
index 7daf5a2..4a613a0 100644
--- a/include/perfetto/base/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -54,6 +54,16 @@
 
 base::ScopedFile CreateSocket();
 
+// Update msghdr so subsequent sendmsg will send data that remains after n bytes
+// have already been sent.
+// This should not be used, it's exported for test use only.
+void ShiftMsgHdr(size_t n, struct msghdr* msg);
+
+// Re-enter sendmsg until all the data has been sent or an error occurs.
+// Modifies |msg| and the iovecs it points to in place.
+// TODO(fmayer): Figure out how to do timeouts here for heapprofd.
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags);
+
 // A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
 // transfer file descriptors. None of the methods in this class are blocking.
 // The main design goal is API simplicity and strong guarantees on the
@@ -170,6 +180,8 @@
   // EventListener::OnDisconnect() will be called.
   // If the socket is not connected, Send() will just return false.
   // Does not append a null string terminator to msg in any case.
+  //
+  // DO NOT PASS kNonBlocking, it is broken: always pass kBlocking explicitly.
   bool Send(const void* msg,
             size_t len,
             int send_fd = -1,
@@ -179,7 +191,8 @@
             const int* send_fds,
             size_t num_fds,
             BlockingMode blocking = BlockingMode::kNonBlocking);
-  bool Send(const std::string& msg);
+  bool Send(const std::string& msg,
+            BlockingMode blocking = BlockingMode::kNonBlocking);
 
   // Returns the number of bytes (<= |len|) written in |msg| or 0 if there
   // is no data in the buffer to read or an error occurs (in which case a
diff --git a/include/perfetto/traced/sys_stats_counters.h b/include/perfetto/traced/sys_stats_counters.h
index 5bb0974..98d25af 100644
--- a/include/perfetto/traced/sys_stats_counters.h
+++ b/include/perfetto/traced/sys_stats_counters.h
@@ -30,6 +30,7 @@
 };
 
 constexpr KeyAndId kMeminfoKeys[] = {
+    {"MemUnspecified", protos::pbzero::MeminfoCounters::MEMINFO_UNSPECIFIED},
     {"MemTotal", protos::pbzero::MeminfoCounters::MEMINFO_MEM_TOTAL},
     {"MemFree", protos::pbzero::MeminfoCounters::MEMINFO_MEM_FREE},
     {"MemAvailable", protos::pbzero::MeminfoCounters::MEMINFO_MEM_AVAILABLE},
@@ -66,6 +67,7 @@
 };
 
 const KeyAndId kVmstatKeys[] = {
+    {"VmstatUnspecified", protos::pbzero::VmstatCounters::VMSTAT_UNSPECIFIED},
     {"nr_free_pages", protos::pbzero::VmstatCounters::VMSTAT_NR_FREE_PAGES},
     {"nr_alloc_batch", protos::pbzero::VmstatCounters::VMSTAT_NR_ALLOC_BATCH},
     {"nr_inactive_anon",
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 3f8698f..2c8e6f2 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -60,24 +60,40 @@
   // instance.
   virtual void OnDisconnect() = 0;
 
-  // Called by the Service to turn on one of the data source previously
+  // Called by the Service after OnConnect but before the first DataSource is
+  // created. Can be used for any setup required before tracing begins.
+  virtual void OnTracingSetup() = 0;
+
+  // The lifecycle methods below are always called in the following sequence:
+  // SetupDataSource -> StartDataSource -> StopDataSource.
+  // Or, in the edge case where a trace is aborted immediately:
+  // SetupDataSource -> StopDataSource.
+  // The Setup+Start call sequence is always guaranteed, regardless of the
+  // TraceConfig.deferred_start flags.
+  // Called by the Service to configure one of the data sources previously
   // registered through TracingService::ProducerEndpoint::RegisterDataSource().
+  // This method is always called before StartDataSource. There is always a
+  // SetupDataSource() call before each StartDataSource() call.
   // Args:
   // - DataSourceInstanceID is an identifier chosen by the Service that should
   //   be assigned to the newly created data source instance. It is used to
   //   match the StopDataSource() request below.
   // - DataSourceConfig is the configuration for the new data source (e.g.,
   //   tells which trace categories to enable).
+  virtual void SetupDataSource(DataSourceInstanceID,
+                               const DataSourceConfig&) = 0;
+
+  // Called by the Service to turn on one of the data sources previously
+  // registered through TracingService::ProducerEndpoint::RegisterDataSource()
+  // and initialized through SetupDataSource().
+  // Both arguments are guaranteed to be identical to the ones passed to the
+  // prior SetupDataSource() call.
   virtual void StartDataSource(DataSourceInstanceID,
                                const DataSourceConfig&) = 0;
 
   // Called by the Service to shut down an existing data source instance.
   virtual void StopDataSource(DataSourceInstanceID) = 0;
 
-  // Called by the Service after OnConnect but before the first DataSource is
-  // created. Can be used for any setup required before tracing begins.
-  virtual void OnTracingSetup() = 0;
-
   // Called by the service to request the Producer to commit the data of the
   // given data sources and return their chunks into the shared memory buffer.
   // The Producer is expected to invoke NotifyFlushComplete(FlushRequestID) on
diff --git a/include/perfetto/tracing/core/trace_config.h b/include/perfetto/tracing/core/trace_config.h
index df0a89e..f7ac92f 100644
--- a/include/perfetto/tracing/core/trace_config.h
+++ b/include/perfetto/tracing/core/trace_config.h
@@ -301,6 +301,9 @@
     return &guardrail_overrides_;
   }
 
+  bool deferred_start() const { return deferred_start_; }
+  void set_deferred_start(bool value) { deferred_start_ = value; }
+
  private:
   std::vector<BufferConfig> buffers_;
   std::vector<DataSource> data_sources_;
@@ -313,6 +316,7 @@
   uint32_t file_write_period_ms_ = {};
   uint64_t max_file_size_bytes_ = {};
   GuardrailOverrides guardrail_overrides_ = {};
+  bool deferred_start_ = {};
 
   // Allows to preserve unknown protobuf fields for compatibility
   // with future versions of .proto files.
diff --git a/include/perfetto/tracing/core/tracing_service.h b/include/perfetto/tracing/core/tracing_service.h
index 5759368..cdeb575 100644
--- a/include/perfetto/tracing/core/tracing_service.h
+++ b/include/perfetto/tracing/core/tracing_service.h
@@ -120,8 +120,20 @@
 
     // Enables tracing with the given TraceConfig. The ScopedFile argument is
     // used only when TraceConfig.write_into_file == true.
+    // If TraceConfig.deferred_start == true, data sources are configured via
+    // SetupDataSource() but are not started until StartTracing() is called.
+    // This is to support pre-initialization and fast triggering of traces.
+    // Without deferred_start, data sources are started right away and a later
+    // StartTracing() call is a no-op.
     virtual void EnableTracing(const TraceConfig&,
                                base::ScopedFile = base::ScopedFile()) = 0;
+
+    // Starts all data sources configured in the trace config. This is used only
+    // after calling EnableTracing() with TraceConfig.deferred_start=true.
+    // It's a no-op if called after a regular EnableTracing(), without setting
+    // deferred_start.
+    virtual void StartTracing() = 0;
+
     virtual void DisableTracing() = 0;
 
     // Requests all data sources to flush their data immediately and invokes the
diff --git a/protos/perfetto/config/perfetto_config.proto b/protos/perfetto/config/perfetto_config.proto
index 91f4aad..595011b 100644
--- a/protos/perfetto/config/perfetto_config.proto
+++ b/protos/perfetto/config/perfetto_config.proto
@@ -381,7 +381,7 @@
 // It contains the general config for the logging buffer(s) and the configs for
 // all the data source being enabled.
 //
-// Next id: 12.
+// Next id: 13.
 message TraceConfig {
   message BufferConfig {
     optional uint32 size_kb = 1;
@@ -487,6 +487,14 @@
   }
 
   optional GuardrailOverrides guardrail_overrides = 11;
+
+  // When true, data sources are not started until an explicit call to
+  // StartTracing() on the consumer port. This is to support early
+  // initialization and fast trace triggering. This can be used only when the
+  // Consumer explicitly triggers the StartTracing() method.
+  // This should not be used in a remote trace config via statsd, doing so will
+  // result in a hung trace session.
+  optional bool deferred_start = 12;
 }
 
 // End of protos/perfetto/config/trace_config.proto
diff --git a/protos/perfetto/config/trace_config.proto b/protos/perfetto/config/trace_config.proto
index 11ce8d2..08844f2 100644
--- a/protos/perfetto/config/trace_config.proto
+++ b/protos/perfetto/config/trace_config.proto
@@ -29,7 +29,7 @@
 // It contains the general config for the logging buffer(s) and the configs for
 // all the data source being enabled.
 //
-// Next id: 12.
+// Next id: 13.
 message TraceConfig {
   message BufferConfig {
     optional uint32 size_kb = 1;
@@ -135,4 +135,12 @@
   }
 
   optional GuardrailOverrides guardrail_overrides = 11;
+
+  // When true, data sources are not started until an explicit call to
+  // StartTracing() on the consumer port. This is to support early
+  // initialization and fast trace triggering. This can be used only when the
+  // Consumer explicitly triggers the StartTracing() method.
+  // This should not be used in a remote trace config via statsd, doing so will
+  // result in a hung trace session.
+  optional bool deferred_start = 12;
 }
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index 8c3e9b8..a8d5a84 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -33,8 +33,20 @@
   // Enables tracing for one or more data sources. At least one buffer must have
   // been previously created. The EnableTracingResponse is sent when tracing is
   // disabled (either explicitly or because of the |duration_ms| expired).
+  //
+  // If |deferred_start| == true in the passed TraceConfig, the whole tracing
+  // harness is brought up (creating buffers and data sources) without actually
+  // starting the data sources. Data sources will be started upon an explicit
+  // StartTracing() call; even then, the EnableTracingResponse is only sent
+  // once tracing is disabled.
+  // Note that |deferred_start| and StartTracing() have been introduced only
+  // in Android Q and are not supported in Android P.
   rpc EnableTracing(EnableTracingRequest) returns (EnableTracingResponse) {}
 
+  // Starts tracing. Only valid if EnableTracing() was called setting
+  // deferred_start = true in the TraceConfig passed to EnableTracing().
+  rpc StartTracing(StartTracingRequest) returns (StartTracingResponse) {}
+
   // Disables tracing for one or more data sources.
   rpc DisableTracing(DisableTracingRequest) returns (DisableTracingResponse) {}
 
@@ -70,6 +82,11 @@
   oneof state { bool disabled = 1; }
 }
 
+// Arguments for rpc StartTracing().
+message StartTracingRequest {}
+
+message StartTracingResponse {}  // Reply of rpc StartTracing().
+
 // Arguments for rpc DisableTracing().
 message DisableTracingRequest {
   // TODO: not supported yet, selectively disable only some data sources.
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index b743914..0a27f82 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -118,8 +118,18 @@
 message GetAsyncCommandRequest {}
 
 message GetAsyncCommandResponse {
+  // Called after SetupTracing and before StartDataSource.
+  // This message was introduced in Android Q.
+  message SetupDataSource {
+    optional uint64 new_instance_id = 1;
+    optional protos.DataSourceConfig config = 2;  // Resent in StartDataSource.
+  }
+
   message StartDataSource {
     optional uint64 new_instance_id = 1;
+
+    // For backwards compat reasons (with Android P), the config passed here
+    // is identical to the one passed to SetupDataSource.config.
     optional protos.DataSourceConfig config = 2;
   }
 
@@ -140,10 +150,12 @@
     optional uint64 request_id = 2;
   }
 
+  // Next id: 7.
   oneof cmd {
+    SetupTracing setup_tracing = 3;
+    SetupDataSource setup_data_source = 6;
     StartDataSource start_data_source = 1;
     StopDataSource stop_data_source = 2;
-    SetupTracing setup_tracing = 3;
     // id == 4 was teardown_tracing, never implemented.
     Flush flush = 5;
   }
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index e3ed5f2..fb4f5ff 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -64,6 +64,55 @@
 #pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
 #endif
 
+// Advances |msg| past its first |n| bytes, editing it and its iovecs in place.
+void ShiftMsgHdr(size_t n, struct msghdr* msg) {
+  using LenType = decltype(msg->msg_iovlen);  // int on Mac, size_t on glibc.
+  for (LenType i = 0; i < msg->msg_iovlen; ++i) {
+    struct iovec* vec = &msg->msg_iov[i];
+    if (n < vec->iov_len) {
+      // Partially sent iovec: advance it and drop the i fully-sent ones.
+      vec->iov_base = reinterpret_cast<char*>(vec->iov_base) + n;
+      vec->iov_len -= n;
+      msg->msg_iov = vec;
+      msg->msg_iovlen -= i;
+      return;
+    }
+    n -= vec->iov_len;  // We sent the whole iovec.
+  }
+  PERFETTO_CHECK(n == 0);  // All iovecs sent; |n| must not exceed the total.
+  msg->msg_iovlen = 0;
+  msg->msg_iov = nullptr;
+}
+
+// Calls sendmsg() until all of |msg| has been sent and returns the total.
+// On EAGAIN: bytes sent so far. Other errors: sendmsg()'s result (<= 0).
+// Blocking sockets can also send partially: unix_stream_sendmsg returns
+// early with the bytes already sent when sock_alloc_send_pskb returns NULL
+// [1], which it does when it gets interrupted [2].
+// [1]:
+// https://elixir.bootlin.com/linux/v4.18.10/source/net/unix/af_unix.c#L1872
+// [2]: https://elixir.bootlin.com/linux/v4.18.10/source/net/core/sock.c#L2101
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags) {
+  // This does not make sense on non-blocking sockets.
+  PERFETTO_DCHECK((fcntl(sockfd, F_GETFL, 0) & O_NONBLOCK) == 0);
+
+  ssize_t total_sent = 0;
+  while (msg->msg_iov) {  // |msg| is consumed in place by ShiftMsgHdr().
+    ssize_t sent = PERFETTO_EINTR(sendmsg(sockfd, msg, flags));
+    if (sent <= 0) {
+      if (sent == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+        return total_sent;
+      return sent;
+    }
+    total_sent += sent;
+    ShiftMsgHdr(static_cast<size_t>(sent), msg);
+    // Only send the ancillary data with the first sendmsg call.
+    msg->msg_control = nullptr;
+    msg->msg_controllen = 0;
+  }
+  return total_sent;
+}
+
 ssize_t SockSend(int fd,
                  const void* msg,
                  size_t len,
@@ -90,7 +139,7 @@
     msg_hdr.msg_controllen = cmsg->cmsg_len;
   }
 
-  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+  return SendMsgAll(fd, &msg_hdr, kNoSigPipe);
 }
 
 ssize_t SockReceive(int fd,
@@ -396,8 +445,8 @@
   }
 }
 
-bool UnixSocket::Send(const std::string& msg) {
-  return Send(msg.c_str(), msg.size() + 1);
+bool UnixSocket::Send(const std::string& msg, BlockingMode blocking) {
+  return Send(msg.data(), msg.length() + 1 /* incl. NUL */, -1, blocking);
 }
 
 bool UnixSocket::Send(const void* msg,
@@ -414,6 +463,10 @@
                       const int* send_fds,
                       size_t num_fds,
                       BlockingMode blocking_mode) {
+  // TODO(b/117139237): Non-blocking sends are broken because we do not
+  // properly handle partial sends.
+  PERFETTO_DCHECK(blocking_mode == BlockingMode::kBlocking);
+
   if (state_ != State::kConnected) {
     errno = last_error_ = ENOTCONN;
     return false;
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 5cdd6ab..382f9bc 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -41,6 +41,7 @@
 using ::testing::Mock;
 
 constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr auto kBlocking = UnixSocket::BlockingMode::kBlocking;
 
 class MockEventListener : public UnixSocket::EventListener {
  public:
@@ -115,7 +116,7 @@
   auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
   EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
       .WillOnce(InvokeWithoutArgs(cli_disconnected));
-  EXPECT_FALSE(cli->Send("whatever"));
+  EXPECT_FALSE(cli->Send("whatever", kBlocking));
   task_runner_.RunUntilCheckpoint("cli_disconnected");
 }
 
@@ -153,8 +154,8 @@
         ASSERT_EQ("cli>srv", s->ReceiveString());
         srv_did_recv();
       }));
-  ASSERT_TRUE(cli->Send("cli>srv"));
-  ASSERT_TRUE(srv_conn->Send("srv>cli"));
+  ASSERT_TRUE(cli->Send("cli>srv", kBlocking));
+  ASSERT_TRUE(srv_conn->Send("srv>cli", kBlocking));
   task_runner_.RunUntilCheckpoint("cli_did_recv");
   task_runner_.RunUntilCheckpoint("srv_did_recv");
 
@@ -168,8 +169,8 @@
   ASSERT_EQ("", cli->ReceiveString());
   ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
   ASSERT_EQ("", srv_conn->ReceiveString());
-  ASSERT_FALSE(cli->Send("foo"));
-  ASSERT_FALSE(srv_conn->Send("bar"));
+  ASSERT_FALSE(cli->Send("foo", kBlocking));
+  ASSERT_FALSE(srv_conn->Send("bar", kBlocking));
   srv->Shutdown(true);
   task_runner_.RunUntilCheckpoint("cli_disconnected");
   task_runner_.RunUntilCheckpoint("srv_disconnected");
@@ -244,10 +245,10 @@
 
   int buf_fd[2] = {null_fd.get(), zero_fd.get()};
 
-  ASSERT_TRUE(
-      cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
+  ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
+                        base::ArraySize(buf_fd), kBlocking));
   ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
-                             base::ArraySize(buf_fd)));
+                             base::ArraySize(buf_fd), kBlocking));
   task_runner_.RunUntilCheckpoint("srv_did_recv");
   task_runner_.RunUntilCheckpoint("cli_did_recv");
 
@@ -300,7 +301,7 @@
         EXPECT_CALL(event_listener_, OnDataAvailable(s))
             .WillOnce(Invoke([](UnixSocket* t) {
               ASSERT_EQ("PING", t->ReceiveString());
-              ASSERT_TRUE(t->Send("PONG"));
+              ASSERT_TRUE(t->Send("PONG", kBlocking));
             }));
       }));
 
@@ -309,7 +310,7 @@
     EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
         .WillOnce(Invoke([](UnixSocket* s, bool success) {
           ASSERT_TRUE(success);
-          ASSERT_TRUE(s->Send("PING"));
+          ASSERT_TRUE(s->Send("PING", kBlocking));
         }));
 
     auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
@@ -356,7 +357,7 @@
         .WillOnce(Invoke(
             [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
               ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
-              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd, kBlocking));
               // Wait for the client to change this again.
               EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
                   .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
@@ -391,7 +392,7 @@
 
           // Now change the shared memory and ping the other process.
           memcpy(mem, "rock more", 10);
-          ASSERT_TRUE(s->Send("change notify"));
+          ASSERT_TRUE(s->Send("change notify", kBlocking));
           checkpoint();
         }));
     task_runner_.RunUntilCheckpoint("change_seen_by_client");
@@ -409,7 +410,7 @@
                               int num_frame) {
   char buf[kAtomicWrites_FrameSize];
   memset(buf, static_cast<char>(num_frame), sizeof(buf));
-  if (s->Send(buf, sizeof(buf)))
+  if (s->Send(buf, sizeof(buf), -1, kBlocking))
     return true;
   task_runner->PostTask(
       std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
@@ -560,8 +561,7 @@
     char buf[1024 * 32] = {};
     tx_task_runner.PostTask([&cli, &buf, all_sent] {
       for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
-        cli->Send(buf, sizeof(buf), -1 /*fd*/,
-                  UnixSocket::BlockingMode::kBlocking);
+        cli->Send(buf, sizeof(buf), -1 /*fd*/, kBlocking);
       all_sent();
     });
     tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
@@ -608,8 +608,7 @@
     static constexpr size_t kBufSize = 32 * 1024 * 1024;
     std::unique_ptr<char[]> buf(new char[kBufSize]());
     tx_task_runner.PostTask([&cli, &buf, send_done] {
-      bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
-                                UnixSocket::BlockingMode::kBlocking);
+      bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/, kBlocking);
       ASSERT_FALSE(send_res);
       send_done();
     });
@@ -620,6 +619,166 @@
   tx_thread.join();
 }
 
+TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
+  // Send a part of the first iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(1, &hdr);
+  EXPECT_NE(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
+  EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 2);  // Type differs by OS.
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
+  EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
+
+  ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+  EXPECT_EQ(hdr.msg_iov, &iov[1]);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
+
+  ShiftMsgHdr(base::ArraySize(world), &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
+  // Send first iov and part of the second iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+  EXPECT_NE(hdr.msg_iov, nullptr);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);  // Type differs by OS.
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
+
+  ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
+  // Send everything at once.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello), &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);  // Type differs by OS.
+}
+
+void Handler(int /*signo*/) {}  // No-op: exists to interrupt blocking syscalls.
+
+int RollbackSigaction(const struct sigaction* old_action) {
+  return sigaction(SIGWINCH, old_action, /*oldact=*/nullptr);  // Restore.
+}
+
+TEST_F(UnixSocketTest, PartialSendMsgAll) {
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_socket(sv[0]);
+  base::ScopedFile recv_socket(sv[1]);
+
+  // Set bufsize to minimum.
+  int bufsize = 1024;
+  ASSERT_EQ(setsockopt(*send_socket, SOL_SOCKET, SO_SNDBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+  ASSERT_EQ(setsockopt(*recv_socket, SOL_SOCKET, SO_RCVBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+
+  // Send something larger than send + recv kernel buffers combined to make
+  // sendmsg block.
+  char send_buf[8192];
+  for (size_t i = 0; i < sizeof(send_buf); ++i)  // Make MSAN happy.
+    send_buf[i] = static_cast<char>(i % 256);
+  char recv_buf[sizeof(send_buf)];
+
+  // Need to install signal handler to cause the interrupt to happen.
+  // man 3 pthread_kill:
+  //   Signal dispositions are process-wide: if a signal handler is
+  //   installed, the handler will be invoked in the thread |thread|, but if
+  //   the disposition of the signal is "stop", "continue", or "terminate",
+  //   this action will affect the whole process.
+  struct sigaction oldact;
+  struct sigaction newact = {};
+  newact.sa_handler = Handler;
+  ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
+  base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
+      rollback(&oldact);
+
+  auto blocked_thread = pthread_self();
+  std::thread th([blocked_thread, &recv_socket, &recv_buf] {
+    ssize_t rd = PERFETTO_EINTR(read(*recv_socket, recv_buf, 1));
+    ASSERT_EQ(rd, 1);
+    // The sender should now be blocked in sendmsg: interrupt it.
+    ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
+    // Drain the socket to allow SendMsgAll to succeed.
+    size_t offset = 1;
+    while (offset < sizeof(recv_buf)) {
+      rd = PERFETTO_EINTR(
+          read(*recv_socket, recv_buf + offset, sizeof(recv_buf) - offset));
+      ASSERT_GT(rd, 0);  // EOF means SendMsgAll gave up early: don't spin.
+      offset += static_cast<size_t>(rd);
+    }
+  });
+
+  // Test sending the send_buf in several chunks as an iov to exercise the
+  // more complicated code-paths of SendMsgAll.
+  struct msghdr hdr = {};
+  struct iovec iov[4];
+  static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
+                "Cannot split buffer into even pieces.");
+  constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
+  for (size_t i = 0; i < base::ArraySize(iov); ++i) {
+    iov[i].iov_base = send_buf + i * kChunkSize;
+    iov[i].iov_len = kChunkSize;
+  }
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ssize_t sent = SendMsgAll(*send_socket, &hdr, 0);
+  send_socket.reset();  // Unblocks the reader if SendMsgAll failed early.
+  th.join();  // Join before any ASSERT below can return with |th| joinable.
+  ASSERT_EQ(sent, static_cast<ssize_t>(sizeof(send_buf)));
+  // SendMsgAll must have consumed every iovec.
+  ASSERT_EQ(hdr.msg_iov, nullptr);
+  ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
+}
+
 // TODO(primiano): add a test to check that in the case of a peer sending a fd
 // and the other end just doing a recv (without taking it), the fd is closed and
 // not left around.
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 445e3d0..050fcca 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -182,7 +182,8 @@
   void Reply(const Frame& frame) {
     auto buf = BufferedFrameDeserializer::Serialize(frame);
     ASSERT_TRUE(client_sock->is_connected());
-    EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd));
+    EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd,
+                                  base::UnixSocket::BlockingMode::kBlocking));
     next_reply_fd = -1;
   }
 
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 3ff1f99..26b56c8 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -152,7 +152,8 @@
 
   void SendFrame(const Frame& frame, int fd = -1) {
     std::string buf = BufferedFrameDeserializer::Serialize(frame);
-    ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd));
+    ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd,
+                            base::UnixSocket::BlockingMode::kBlocking));
   }
 
   BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 78e57c9..369cb71 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -62,6 +62,8 @@
   sources = [
     "client.cc",
     "client.h",
+    "sampler.cc",
+    "sampler.h",
   ]
 }
 
@@ -83,9 +85,11 @@
     "client_unittest.cc",
     "heapprofd_integrationtest.cc",
     "record_reader_unittest.cc",
+    "sampler_unittest.cc",
     "socket_listener_unittest.cc",
     "string_interner_unittest.cc",
     "unwinding_unittest.cc",
+    "wire_protocol_unittest.cc",
   ]
 }
 
diff --git a/src/profiling/memory/sampler.cc b/src/profiling/memory/sampler.cc
new file mode 100644
index 0000000..eba955e
--- /dev/null
+++ b/src/profiling/memory/sampler.cc
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace {
+// Returns the calling thread's ThreadLocalSamplingData for |key|, creating it
+// on first use. The object is allocated with |unhooked_malloc| (the real
+// malloc) and will later be released with |unhooked_free|. Returns nullptr if
+// that allocation fails.
+ThreadLocalSamplingData* GetSpecific(pthread_key_t key,
+                                     void* (*unhooked_malloc)(size_t),
+                                     void (*unhooked_free)(void*)) {
+  // This should not be used with glibc as it might re-enter into malloc, see
+  // http://crbug.com/776475.
+  void* specific = pthread_getspecific(key);
+  if (specific == nullptr) {
+    specific = unhooked_malloc(sizeof(ThreadLocalSamplingData));
+    // Placement-new into a null pointer is undefined behavior; bail out.
+    if (specific == nullptr)
+      return nullptr;
+    new (specific) ThreadLocalSamplingData(unhooked_free);
+    pthread_setspecific(key, specific);
+  }
+  return reinterpret_cast<ThreadLocalSamplingData*>(specific);
+}
+}  // namespace
+
+// The algorithm below is inspired by the Chromium sampling algorithm at
+// https://cs.chromium.org/search/?q=f:cc+symbol:AllocatorShimLogAlloc+package:%5Echromium$&type=cs
+
+// Draws the number of bytes until the next sample point from an exponential
+// distribution whose mean is |rate| bytes, clamped to at least 1.
+int64_t ThreadLocalSamplingData::NextSampleInterval(double rate) {
+  std::exponential_distribution<double> dist(1 / rate);
+  int64_t next = static_cast<int64_t>(dist(random_engine_));
+  return next < 1 ? 1 : next;
+}
+
+// Consumes |sz| bytes of the current sampling interval and returns how many
+// sample points fell inside this allocation (0 if none did).
+size_t ThreadLocalSamplingData::ShouldSample(size_t sz, double rate) {
+  interval_to_next_sample_ -= static_cast<int64_t>(sz);
+  size_t sz_multiplier = 0;
+  while (PERFETTO_UNLIKELY(interval_to_next_sample_ <= 0)) {
+    interval_to_next_sample_ += NextSampleInterval(rate);
+    ++sz_multiplier;
+  }
+  return sz_multiplier;
+}
+
+size_t ShouldSample(pthread_key_t key,
+                    size_t sz,
+                    double rate,
+                    void* (*unhooked_malloc)(size_t),
+                    void (*unhooked_free)(void*)) {
+  // Allocations at least as large as the mean sampling interval are always
+  // accounted exactly once, without touching the thread-local state.
+  if (PERFETTO_UNLIKELY(sz >= rate))
+    return 1;
+  ThreadLocalSamplingData* thread_data =
+      GetSpecific(key, unhooked_malloc, unhooked_free);
+  // The per-thread state could not be allocated: do not sample.
+  if (PERFETTO_UNLIKELY(thread_data == nullptr))
+    return 0;
+  return thread_data->ShouldSample(sz, rate);
+}
+
+// Invoked by pthreads for each exiting thread that holds a non-null value for
+// the key. Destroys the object and frees its storage with the |unhooked_free|
+// it was constructed with. pthread_key_delete() does NOT call this.
+void ThreadLocalSamplingData::KeyDestructor(void* ptr) {
+  ThreadLocalSamplingData* thread_local_data =
+      reinterpret_cast<ThreadLocalSamplingData*>(ptr);
+  void (*unhooked_free)(void*) = thread_local_data->unhooked_free_;
+  thread_local_data->~ThreadLocalSamplingData();
+  unhooked_free(ptr);
+}
+
+}  // namespace perfetto
diff --git a/src/profiling/memory/sampler.h b/src/profiling/memory/sampler.h
new file mode 100644
index 0000000..879e2a2
--- /dev/null
+++ b/src/profiling/memory/sampler.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SAMPLER_H_
+#define SRC_PROFILING_MEMORY_SAMPLER_H_
+
+#include <pthread.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <random>
+
+namespace perfetto {
+
+// This is the thread-local state needed to apply Poisson sampling to malloc
+// samples.
+//
+// We apply Poisson sampling individually to each byte. The whole
+// allocation gets accounted as often as the number of sampled bytes it
+// contains.
+//
+// Googlers see go/chrome-shp for more details about the sampling (from
+// Chrome's heap profiler).
+class ThreadLocalSamplingData {
+ public:
+  // |unhooked_free| must match the allocator used to obtain the storage for
+  // this object; KeyDestructor uses it to release that storage.
+  explicit ThreadLocalSamplingData(void (*unhooked_free)(void*))
+      : unhooked_free_(unhooked_free) {}
+  // Returns number of times a sample should be accounted. Due to how the
+  // Poisson sampling works, some samples should be accounted multiple times.
+  size_t ShouldSample(size_t sz, double rate);
+
+  // Destroys a ThreadLocalSamplingData object when a thread holding one exits;
+  // meant to be passed as the destructor to pthread_key_create(). This uses the
+  // unhooked_free passed in the constructor. pthread_key_delete() does not run
+  // key destructors, so values still set at that point must be freed manually.
+  static void KeyDestructor(void* ptr);
+
+ private:
+  int64_t NextSampleInterval(double rate);
+  void (*unhooked_free_)(void*);
+  int64_t interval_to_next_sample_ = 0;
+  std::default_random_engine random_engine_;
+};
+
+// Returns number of times a sample should be accounted. Due to how the
+// Poisson sampling works, some samples should be accounted multiple times.
+//
+// Delegate to this thread's ThreadLocalSamplingData.
+//
+// We have to pass through the real malloc in order to allocate the TLS.
+size_t ShouldSample(pthread_key_t key,
+                    size_t sz,
+                    double rate,
+                    void* (*unhooked_malloc)(size_t),
+                    void (*unhooked_free)(void*));
+
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_SAMPLER_H_
diff --git a/src/profiling/memory/sampler_unittest.cc b/src/profiling/memory/sampler_unittest.cc
new file mode 100644
index 0000000..d598e71
--- /dev/null
+++ b/src/profiling/memory/sampler_unittest.cc
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "gtest/gtest.h"
+
+#include <stdlib.h>
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(SamplerTest, TestLarge) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  // Allocations at least as large as the rate are always sampled exactly once.
+  EXPECT_EQ(ShouldSample(key, 1024, 512, malloc, free), 1u);
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmall) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  // As we initialize interval_to_next_sample_ with 0, the first sample
+  // should always get sampled.
+  EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1u);
+  // pthread_key_delete() does not invoke the key destructor, so release this
+  // thread's sampling state explicitly to avoid leaking it.
+  ThreadLocalSamplingData::KeyDestructor(pthread_getspecific(key));
+  pthread_setspecific(key, nullptr);
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmallFromThread) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  std::thread th([key] {
+    // As we initialize interval_to_next_sample_ with 0, the first sample
+    // should always get sampled.
+    EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1u);
+  });
+  std::thread th2([key] {
+    // The threads should have separate state.
+    EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1u);
+  });
+  // Each thread's state is released by KeyDestructor when that thread exits.
+  th.join();
+  th2.join();
+  pthread_key_delete(key);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 58d5e8f..dd3c1a7 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -71,8 +71,10 @@
                              base::ScopedFile(open("/dev/null", O_RDONLY))};
   int raw_fds[2] = {*fds[0], *fds[1]};
   ASSERT_TRUE(client_socket->Send(&size, sizeof(size), raw_fds,
-                                  base::ArraySize(raw_fds)));
-  ASSERT_TRUE(client_socket->Send("1", 1));
+                                  base::ArraySize(raw_fds),
+                                  base::UnixSocket::BlockingMode::kBlocking));
+  ASSERT_TRUE(client_socket->Send("1", 1, -1,
+                                  base::UnixSocket::BlockingMode::kBlocking));
 
   task_runner.RunUntilCheckpoint("callback.called");
 }
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 1dcc8c0..8ed2780 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -238,6 +238,8 @@
     FreeRecord& free_rec = rec->free_record;
     FreePageEntry* entries = free_rec.metadata->entries;
     uint64_t num_entries = free_rec.metadata->num_entries;
+    if (num_entries > kFreePageSize)
+      return;
     for (size_t i = 0; i < num_entries; ++i) {
       const FreePageEntry& entry = entries[i];
       metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 39373f9..255bae2 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -17,6 +17,7 @@
 #include "src/profiling/memory/wire_protocol.h"
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/base/utils.h"
 
 #include <sys/socket.h>
@@ -29,8 +30,10 @@
 bool ViewAndAdvance(char** ptr, T** out, const char* end) {
-  if (end - sizeof(T) < *ptr)
+  // Compare the remaining byte count instead of computing end - sizeof(T),
+  // which is undefined pointer arithmetic when fewer bytes remain.
+  if (*ptr > end || static_cast<size_t>(end - *ptr) < sizeof(T))
     return false;
-  *out = reinterpret_cast<T*>(ptr);
-  ptr += sizeof(T);
+  *out = reinterpret_cast<T*>(*ptr);
+  *ptr += sizeof(T);
   return true;
 }
 }  // namespace
@@ -44,9 +45,11 @@
   iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
   iovecs[1].iov_len = sizeof(msg.record_type);
   if (msg.alloc_header) {
+    PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
     iovecs[2].iov_base = msg.alloc_header;
     iovecs[2].iov_len = sizeof(*msg.alloc_header);
   } else if (msg.free_header) {
+    PERFETTO_DCHECK(msg.record_type == RecordType::Free);
     iovecs[2].iov_base = msg.free_header;
     iovecs[2].iov_len = sizeof(*msg.free_header);
   } else {
@@ -68,7 +71,7 @@
     total_size = iovecs[1].iov_len + iovecs[2].iov_len;
   }
 
-  ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+  ssize_t sent = base::SendMsgAll(sock, &hdr, MSG_NOSIGNAL);
   return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
 }
 
@@ -77,23 +80,27 @@
   char* end = buf + size;
   if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
     return false;
-  switch (*record_type) {
-    case RecordType::Malloc:
-      if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
-        return false;
-      out->payload = buf;
-      if (buf > end) {
-        PERFETTO_DCHECK(false);
-        return false;
-      }
-      out->payload_size = static_cast<size_t>(end - buf);
-      break;
-    case RecordType::Free:
-      if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
-        return false;
-      break;
-  }
+
+  out->payload = nullptr;
+  out->payload_size = 0;
   out->record_type = *record_type;
+
+  if (*record_type == RecordType::Malloc) {
+    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+      return false;
+    out->payload = buf;
+    if (buf > end) {
+      PERFETTO_DCHECK(false);
+      return false;
+    }
+    out->payload_size = static_cast<size_t>(end - buf);
+  } else if (*record_type == RecordType::Free) {
+    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+      return false;
+  } else {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
   return true;
 }
 
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
new file mode 100644
index 0000000..c9e8b22
--- /dev/null
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/record_reader.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+
+bool operator==(const AllocMetadata& one, const AllocMetadata& other);
+bool operator==(const AllocMetadata& one, const AllocMetadata& other) {
+  return std::tie(one.sequence_number, one.alloc_size, one.alloc_address,
+                  one.stack_pointer, one.stack_pointer_offset, one.arch) ==
+             std::tie(other.sequence_number, other.alloc_size,
+                      other.alloc_address, other.stack_pointer,
+                      other.stack_pointer_offset, other.arch) &&
+         memcmp(one.register_data, other.register_data, kMaxRegisterDataSize) ==
+             0;
+}
+
+bool operator==(const FreeMetadata& one, const FreeMetadata& other);
+bool operator==(const FreeMetadata& one, const FreeMetadata& other) {
+  if (one.num_entries != other.num_entries)
+    return false;
+  for (size_t i = 0; i < one.num_entries; ++i) {
+    if (std::tie(one.entries[i].sequence_number, one.entries[i].addr) !=
+        std::tie(other.entries[i].sequence_number, other.entries[i].addr))
+      return false;
+  }
+  return true;
+}
+
+namespace {
+
+RecordReader::Record ReceiveAll(int sock) {
+  RecordReader record_reader;
+  RecordReader::Record record;
+  bool received = false;
+  while (!received) {
+    RecordReader::ReceiveBuffer buf = record_reader.BeginReceive();
+    ssize_t rd = PERFETTO_EINTR(read(sock, buf.data, buf.size));
+    PERFETTO_CHECK(rd > 0);
+    auto status = record_reader.EndReceive(static_cast<size_t>(rd), &record);
+    switch (status) {
+      case (RecordReader::Result::Noop):
+        break;
+      case (RecordReader::Result::RecordReceived):
+        received = true;
+        break;
+      case (RecordReader::Result::KillConnection):
+        PERFETTO_CHECK(false);
+        break;
+    }
+  }
+  return record;
+}
+
+TEST(WireProtocolTest, AllocMessage) {
+  char payload[] = {0x77, 0x77, 0x77, 0x00};
+  WireMessage msg = {};
+  msg.record_type = RecordType::Malloc;
+  AllocMetadata metadata = {};
+  metadata.sequence_number = 0xA1A2A3A4A5A6A7A8;
+  metadata.alloc_size = 0xB1B2B3B4B5B6B7B8;
+  metadata.alloc_address = 0xC1C2C3C4C5C6C7C8;
+  metadata.stack_pointer = 0xD1D2D3D4D5D6D7D8;
+  metadata.stack_pointer_offset = 0xE1E2E3E4E5E6E7E8;
+  metadata.arch = unwindstack::ARCH_X86;
+  for (size_t i = 0; i < kMaxRegisterDataSize; ++i)
+    metadata.register_data[i] = 0x66;
+  msg.alloc_header = &metadata;
+  msg.payload = payload;
+  msg.payload_size = sizeof(payload);
+
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_sock(sv[0]);
+  base::ScopedFile recv_sock(sv[1]);
+  ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+
+  RecordReader::Record record = ReceiveAll(*recv_sock);
+
+  WireMessage recv_msg;
+  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
+                                 record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, msg.record_type);
+  ASSERT_EQ(*recv_msg.alloc_header, *msg.alloc_header);
+  ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+  ASSERT_STREQ(recv_msg.payload, msg.payload);
+}
+
+TEST(WireProtocolTest, FreeMessage) {
+  WireMessage msg = {};
+  msg.record_type = RecordType::Free;
+  FreeMetadata metadata = {};
+  metadata.num_entries = kFreePageSize;
+  for (size_t i = 0; i < kFreePageSize; ++i) {
+    metadata.entries[i].sequence_number = 0x111111111111111;
+    metadata.entries[i].addr = 0x222222222222222;
+  }
+  msg.free_header = &metadata;
+
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_sock(sv[0]);
+  base::ScopedFile recv_sock(sv[1]);
+  ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+
+  RecordReader::Record record = ReceiveAll(*recv_sock);
+
+  WireMessage recv_msg;
+  ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
+                                 record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, msg.record_type);
+  ASSERT_EQ(*recv_msg.free_header, *msg.free_header);
+  ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 978f128..a3da82c 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -90,6 +90,7 @@
   deps = [
     "../../buildtools:sqlite",
     "../../gn:default_deps",
+    "../../include/perfetto/traced:sys_stats_counters",
     "../../protos/perfetto/trace:lite",
     "../../protos/perfetto/trace_processor:lite",
     "../base",
diff --git a/src/trace_processor/counters_table.cc b/src/trace_processor/counters_table.cc
index cd14cc4..b01400f 100644
--- a/src/trace_processor/counters_table.cc
+++ b/src/trace_processor/counters_table.cc
@@ -102,6 +102,18 @@
           sqlite3_result_text(context, "utid", -1, nullptr);
           break;
         }
+        case RefType::kNoRef: {
+          sqlite3_result_null(context);
+          break;
+        }
+        case RefType::kIrq: {
+          sqlite3_result_text(context, "irq", -1, nullptr);
+          break;
+        }
+        case RefType::kSoftIrq: {
+          sqlite3_result_text(context, "softirq", -1, nullptr);
+          break;
+        }
       }
       break;
     }
diff --git a/src/trace_processor/proto_trace_parser.cc b/src/trace_processor/proto_trace_parser.cc
index ac1b65c..c82982a 100644
--- a/src/trace_processor/proto_trace_parser.cc
+++ b/src/trace_processor/proto_trace_parser.cc
@@ -24,6 +24,7 @@
 #include "perfetto/base/string_view.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/protozero/proto_decoder.h"
+#include "perfetto/traced/sys_stats_counters.h"
 #include "src/trace_processor/process_tracker.h"
 #include "src/trace_processor/sched_tracker.h"
 #include "src/trace_processor/slice_tracker.h"
@@ -102,11 +103,37 @@
 
 ProtoTraceParser::ProtoTraceParser(TraceProcessorContext* context)
     : context_(context),
-      cpu_freq_name_id_(context->storage->InternString("cpufreq")) {}
+      cpu_freq_name_id_(context->storage->InternString("cpufreq")),
+      num_forks_name_id_(context->storage->InternString("num_forks")),
+      num_irq_total_name_id_(context->storage->InternString("num_irq_total")),
+      num_softirq_total_name_id_(
+          context->storage->InternString("num_softirq_total")),
+      num_irq_name_id_(context->storage->InternString("num_irq")),
+      num_softirq_name_id_(context->storage->InternString("num_softirq")),
+      cpu_times_user_ns_id_(
+          context->storage->InternString("cpu.times.user_ns")),
+      cpu_times_user_ice_ns_id_(
+          context->storage->InternString("cpu.times.user_ice_ns")),
+      cpu_times_system_mode_ns_id_(
+          context->storage->InternString("cpu.times.system_mode_ns")),
+      cpu_times_idle_ns_id_(
+          context->storage->InternString("cpu.times.idle_ns")),
+      cpu_times_io_wait_ns_id_(
+          context->storage->InternString("cpu.times.io_wait_ns")),
+      cpu_times_irq_ns_id_(context->storage->InternString("cpu.times.irq_ns")),
+      cpu_times_softirq_ns_id_(
+          context->storage->InternString("cpu.times.softirq_ns")) {
+  for (const auto& name : BuildMeminfoCounterNames()) {
+    meminfo_strs_id_.emplace_back(context->storage->InternString(name));
+  }
+  for (const auto& name : BuildVmstatCounterNames()) {
+    vmstat_strs_id_.emplace_back(context->storage->InternString(name));
+  }
+}
 
 ProtoTraceParser::~ProtoTraceParser() = default;
 
-void ProtoTraceParser::ParseTracePacket(TraceBlobView packet) {
+void ProtoTraceParser::ParseTracePacket(uint64_t ts, TraceBlobView packet) {
   ProtoDecoder decoder(packet.data(), packet.length());
 
   for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
@@ -116,6 +143,11 @@
         ParseProcessTree(packet.slice(fld_off, fld.size()));
         break;
       }
+      case protos::TracePacket::kSysStatsFieldNumber: {
+        const size_t fld_off = packet.offset_of(fld.data());
+        ParseSysStats(ts, packet.slice(fld_off, fld.size()));
+        break;
+      }
       default:
         break;
     }
@@ -123,6 +155,209 @@
   PERFETTO_DCHECK(decoder.IsEndOfBuffer());
 }
 
+// Parses a SysStats packet. Nested messages (meminfo, vmstat, cpu_stat,
+// num_irq, num_softirq) are handed to dedicated parsers; the scalar totals are
+// pushed directly as counters with no ref (RefType::kNoRef).
+void ProtoTraceParser::ParseSysStats(uint64_t ts, TraceBlobView stats) {
+  ProtoDecoder decoder(stats.data(), stats.length());
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::kMeminfoFieldNumber: {
+        const size_t fld_off = stats.offset_of(fld.data());
+        ParseMemInfo(ts, stats.slice(fld_off, fld.size()));
+        break;
+      }
+      case protos::SysStats::kVmstatFieldNumber: {
+        const size_t fld_off = stats.offset_of(fld.data());
+        ParseVmStat(ts, stats.slice(fld_off, fld.size()));
+        break;
+      }
+      case protos::SysStats::kCpuStatFieldNumber: {
+        const size_t fld_off = stats.offset_of(fld.data());
+        ParseCpuTimes(ts, stats.slice(fld_off, fld.size()));
+        break;
+      }
+      case protos::SysStats::kNumIrqFieldNumber: {
+        const size_t fld_off = stats.offset_of(fld.data());
+        ParseIrqCount(ts, stats.slice(fld_off, fld.size()),
+                      /*is_softirq=*/false);
+        break;
+      }
+      case protos::SysStats::kNumSoftirqFieldNumber: {
+        const size_t fld_off = stats.offset_of(fld.data());
+        ParseIrqCount(ts, stats.slice(fld_off, fld.size()),
+                      /*is_softirq=*/true);
+        break;
+      }
+      // The totals below are read as 64-bit values so that large counts are
+      // not truncated.
+      case protos::SysStats::kNumForksFieldNumber: {
+        context_->sched_tracker->PushCounter(
+            ts, fld.as_uint64(), num_forks_name_id_, 0, RefType::kNoRef);
+        break;
+      }
+      case protos::SysStats::kNumIrqTotalFieldNumber: {
+        context_->sched_tracker->PushCounter(
+            ts, fld.as_uint64(), num_irq_total_name_id_, 0, RefType::kNoRef);
+        break;
+      }
+      case protos::SysStats::kNumSoftirqTotalFieldNumber: {
+        context_->sched_tracker->PushCounter(ts, fld.as_uint64(),
+                                             num_softirq_total_name_id_, 0,
+                                             RefType::kNoRef);
+        break;
+      }
+      default:
+        break;
+    }
+  }
+}
+
+// Parses one InterruptCount entry and pushes its count as a counter whose ref
+// is the irq number. |is_soft| selects the softirq vs. hard irq counter.
+void ProtoTraceParser::ParseIrqCount(uint64_t ts,
+                                     TraceBlobView irq,
+                                     bool is_soft) {
+  ProtoDecoder decoder(irq.data(), irq.length());
+  uint32_t key = 0;
+  uint64_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::InterruptCount::kIrqFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::InterruptCount::kCountFieldNumber:
+        value = fld.as_uint64();
+        break;
+    }
+  }
+  // Soft irqs get the softirq ref type and name; hard irqs get the irq ones.
+  RefType ref_type = is_soft ? RefType::kSoftIrq : RefType::kIrq;
+  StringId name_id = is_soft ? num_softirq_name_id_ : num_irq_name_id_;
+  context_->sched_tracker->PushCounter(ts, value, name_id, key, ref_type);
+}
+
+// Parses one MeminfoValue entry and pushes it as a counter named after the
+// meminfo key. Keys outside the known name table are logged and dropped.
+void ProtoTraceParser::ParseMemInfo(uint64_t ts, TraceBlobView mem) {
+  ProtoDecoder decoder(mem.data(), mem.length());
+  uint32_t key = 0;
+  uint64_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::MeminfoValue::kKeyFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::MeminfoValue::kValueFieldNumber:
+        value = fld.as_uint64();
+        break;
+    }
+  }
+  if (PERFETTO_UNLIKELY(key >= meminfo_strs_id_.size())) {
+    PERFETTO_ELOG("MemInfo key %u is not recognized.", key);
+    return;
+  }
+  context_->sched_tracker->PushCounter(ts, value, meminfo_strs_id_[key], 0,
+                                       RefType::kNoRef);
+}
+
+// Parses one VmstatValue entry and pushes it as a counter named after the
+// vmstat key. Keys outside the known name table are logged and dropped.
+void ProtoTraceParser::ParseVmStat(uint64_t ts, TraceBlobView stat) {
+  ProtoDecoder decoder(stat.data(), stat.length());
+  uint32_t key = 0;
+  uint64_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::VmstatValue::kKeyFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::VmstatValue::kValueFieldNumber:
+        value = fld.as_uint64();
+        break;
+    }
+  }
+  if (PERFETTO_UNLIKELY(key >= vmstat_strs_id_.size())) {
+    PERFETTO_ELOG("VmStat key %u is not recognized.", key);
+    return;
+  }
+  context_->sched_tracker->PushCounter(ts, value, vmstat_strs_id_[key], 0,
+                                       RefType::kNoRef);
+}
+
+// Parses one CpuTimes entry and pushes each time field as a per-CPU counter
+// (RefType::kCPU_ID). Values are read as 64-bit so that nanosecond totals are
+// not truncated.
+void ProtoTraceParser::ParseCpuTimes(uint64_t ts, TraceBlobView cpu_times) {
+  ProtoDecoder decoder(cpu_times.data(), cpu_times.length());
+  uint64_t cpu = 0;
+  uint64_t value = 0;
+  // Speculate on CPU being first: a one-byte varint right after the tag can be
+  // read directly without scanning the whole message.
+  constexpr auto kCpuFieldTag = protozero::proto_utils::MakeTagVarInt(
+      protos::SysStats::CpuTimes::kCpuIdFieldNumber);
+  if (cpu_times.length() > 2 && cpu_times.data()[0] == kCpuFieldTag &&
+      cpu_times.data()[1] < 0x80) {
+    cpu = cpu_times.data()[1];
+  } else {
+    if (!PERFETTO_LIKELY((
+            decoder.FindIntField<protos::SysStats::CpuTimes::kCpuIdFieldNumber>(
+                &cpu)))) {
+      PERFETTO_ELOG("CPU field not found in CpuTimes");
+      return;
+    }
+  }
+
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::CpuTimes::kUserNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(ts, value, cpu_times_user_ns_id_,
+                                             cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kUserIceNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(
+            ts, value, cpu_times_user_ice_ns_id_, cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kSystemModeNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(
+            ts, value, cpu_times_system_mode_ns_id_, cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kIdleNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(ts, value, cpu_times_idle_ns_id_,
+                                             cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kIoWaitNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(
+            ts, value, cpu_times_io_wait_ns_id_, cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kIrqNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(ts, value, cpu_times_irq_ns_id_,
+                                             cpu, RefType::kCPU_ID);
+        break;
+      }
+      case protos::SysStats::CpuTimes::kSoftirqNsFieldNumber: {
+        value = fld.as_uint64();
+        context_->sched_tracker->PushCounter(
+            ts, value, cpu_times_softirq_ns_id_, cpu, RefType::kCPU_ID);
+        break;
+      }
+      default:
+        break;
+    }
+  }
+}
+
 void ProtoTraceParser::ParseProcessTree(TraceBlobView pstree) {
   ProtoDecoder decoder(pstree.data(), pstree.length());
 
diff --git a/src/trace_processor/proto_trace_parser.h b/src/trace_processor/proto_trace_parser.h
index 9da91e4..73b3682 100644
--- a/src/trace_processor/proto_trace_parser.h
+++ b/src/trace_processor/proto_trace_parser.h
@@ -54,7 +54,7 @@
   virtual ~ProtoTraceParser();
 
   // virtual for testing.
-  virtual void ParseTracePacket(TraceBlobView);
+  virtual void ParseTracePacket(uint64_t timestamp, TraceBlobView);
   virtual void ParseFtracePacket(uint32_t cpu,
                                  uint64_t timestamp,
                                  TraceBlobView);
@@ -64,10 +64,29 @@
   void ParsePrint(uint32_t cpu, uint64_t timestamp, TraceBlobView);
   void ParseThread(TraceBlobView);
   void ParseProcess(TraceBlobView);
+  void ParseSysStats(uint64_t ts, TraceBlobView);
+  void ParseMemInfo(uint64_t ts, TraceBlobView);
+  void ParseVmStat(uint64_t ts, TraceBlobView);
+  void ParseCpuTimes(uint64_t ts, TraceBlobView);
+  void ParseIrqCount(uint64_t ts, TraceBlobView, bool is_soft);
 
  private:
   TraceProcessorContext* context_;
   const StringId cpu_freq_name_id_;
+  const StringId num_forks_name_id_;
+  const StringId num_irq_total_name_id_;
+  const StringId num_softirq_total_name_id_;
+  const StringId num_irq_name_id_;
+  const StringId num_softirq_name_id_;
+  const StringId cpu_times_user_ns_id_;
+  const StringId cpu_times_user_ice_ns_id_;
+  const StringId cpu_times_system_mode_ns_id_;
+  const StringId cpu_times_idle_ns_id_;
+  const StringId cpu_times_io_wait_ns_id_;
+  const StringId cpu_times_irq_ns_id_;
+  const StringId cpu_times_softirq_ns_id_;
+  std::vector<StringId> meminfo_strs_id_;
+  std::vector<StringId> vmstat_strs_id_;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/proto_trace_parser_unittest.cc b/src/trace_processor/proto_trace_parser_unittest.cc
index c0881c8..e502da5 100644
--- a/src/trace_processor/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/proto_trace_parser_unittest.cc
@@ -36,6 +36,7 @@
 using ::testing::ElementsAreArray;
 using ::testing::Eq;
 using ::testing::Pointwise;
+using ::testing::NiceMock;
 
 class MockSchedTracker : public SchedTracker {
  public:
@@ -79,7 +80,7 @@
 class ProtoTraceParserTest : public ::testing::Test {
  public:
   ProtoTraceParserTest() {
-    storage_ = new MockTraceStorage();
+    storage_ = new NiceMock<MockTraceStorage>();
     context_.storage.reset(storage_);
     sched_ = new MockSchedTracker(&context_);
     context_.sched_tracker.reset(sched_);
@@ -102,7 +103,7 @@
   TraceProcessorContext context_;
   MockSchedTracker* sched_;
   MockProcessTracker* process_;
-  MockTraceStorage* storage_;
+  NiceMock<MockTraceStorage>* storage_;
 };
 
 TEST_F(ProtoTraceParserTest, LoadSingleEvent) {
@@ -232,6 +233,38 @@
   Tokenize(trace_2);
 }
 
+TEST_F(ProtoTraceParserTest, LoadMemInfo) {
+  protos::Trace trace_1;
+  auto* packet = trace_1.add_packet();
+  uint64_t ts = 1000;
+  packet->set_timestamp(ts);
+  auto* bundle = packet->mutable_sys_stats();
+  auto* meminfo = bundle->add_meminfo();
+  meminfo->set_key(perfetto::protos::MEMINFO_MEM_TOTAL);
+  uint32_t value = 10;
+  meminfo->set_value(value);
+
+  // Name id 0: assumes the NiceMock storage's InternString() returns 0.
+  EXPECT_CALL(*sched_, PushCounter(ts, value, 0, 0, RefType::kNoRef));
+  Tokenize(trace_1);
+}
+
+TEST_F(ProtoTraceParserTest, LoadVmStats) {
+  protos::Trace trace_1;
+  auto* packet = trace_1.add_packet();
+  uint64_t ts = 1000;
+  packet->set_timestamp(ts);
+  auto* bundle = packet->mutable_sys_stats();
+  auto* vmstat = bundle->add_vmstat();
+  vmstat->set_key(perfetto::protos::VMSTAT_COMPACT_SUCCESS);
+  uint32_t value = 10;
+  vmstat->set_value(value);
+
+  // Name id 0: assumes the NiceMock storage's InternString() returns 0.
+  EXPECT_CALL(*sched_, PushCounter(ts, value, 0, 0, RefType::kNoRef));
+  Tokenize(trace_1);
+}
+
 TEST_F(ProtoTraceParserTest, LoadCpuFreq) {
   protos::Trace trace_1;
   auto* bundle = trace_1.add_packet()->mutable_ftrace_events();
diff --git a/src/trace_processor/proto_trace_tokenizer.cc b/src/trace_processor/proto_trace_tokenizer.cc
index 730f9d4..f3284c9 100644
--- a/src/trace_processor/proto_trace_tokenizer.cc
+++ b/src/trace_processor/proto_trace_tokenizer.cc
@@ -133,9 +133,28 @@
 }
 
 void ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
+  constexpr auto kTimestampFieldNumber =
+      protos::TracePacket::kTimestampFieldNumber;
   ProtoDecoder decoder(packet.data(), packet.length());
+  uint64_t timestamp = 0;
+  bool timestamp_found = false;
 
-  // TODO(taylori): Add a timestamp to TracePacket and read it here.
+  // Speculate on the fact that the timestamp is often the 1st field of the
+  // packet.
+  constexpr auto timestampFieldTag = MakeTagVarInt(kTimestampFieldNumber);
+  if (PERFETTO_LIKELY(packet.length() > 10 &&
+                      packet.data()[0] == timestampFieldTag)) {
+    // Fastpath.
+    const uint8_t* next =
+        ParseVarInt(packet.data() + 1, packet.data() + 11, &timestamp);
+    timestamp_found = next != packet.data() + 1;
+    decoder.Reset(next);
+  } else {
+    // Slowpath.
+    timestamp_found = decoder.FindIntField<kTimestampFieldNumber>(&timestamp);
+  }
+  if (timestamp_found)
+    last_timestamp_ = timestamp;
 
   // TODO(primiano): this can be optimized for the ftrace case.
   for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index ea5c927..1158734 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -70,7 +70,7 @@
       next_stage->ParseFtracePacket(it->cpu, it->timestamp,
                                     std::move(it->blob_view));
     } else {
-      next_stage->ParseTracePacket(std::move(it->blob_view));
+      next_stage->ParseTracePacket(it->timestamp, std::move(it->blob_view));
     }
   }
 
diff --git a/src/trace_processor/trace_sorter_unittest.cc b/src/trace_processor/trace_sorter_unittest.cc
index 8716fbb..49fc021 100644
--- a/src/trace_processor/trace_sorter_unittest.cc
+++ b/src/trace_processor/trace_sorter_unittest.cc
@@ -28,6 +28,7 @@
 
 using ::testing::_;
 using ::testing::InSequence;
+using ::testing::NiceMock;
 
 class MockTraceParser : public ProtoTraceParser {
  public:
@@ -45,10 +46,11 @@
     MOCK_ParseFtracePacket(cpu, timestamp, tbv.data(), tbv.length());
   }
 
-  MOCK_METHOD2(MOCK_ParseTracePacket, void(const uint8_t* data, size_t length));
+  MOCK_METHOD3(MOCK_ParseTracePacket,
+               void(uint64_t ts, const uint8_t* data, size_t length));
 
-  void ParseTracePacket(TraceBlobView tbv) override {
-    MOCK_ParseTracePacket(tbv.data(), tbv.length());
+  void ParseTracePacket(uint64_t ts, TraceBlobView tbv) override {
+    MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
   }
 };
 
@@ -63,7 +65,7 @@
  public:
   TraceSorterTest()
       : test_buffer_(std::unique_ptr<uint8_t[]>(new uint8_t[8]), 0, 8) {
-    storage_ = new MockTraceStorage();
+    storage_ = new NiceMock<MockTraceStorage>();
     context_.storage.reset(storage_);
     context_.sorter.reset(
         new TraceSorter(&context_, GetParam(), 0 /*window_size*/));
@@ -74,7 +76,7 @@
  protected:
   TraceProcessorContext context_;
   MockTraceParser* parser_;
-  MockTraceStorage* storage_;
+  NiceMock<MockTraceStorage>* storage_;
   TraceBlobView test_buffer_;
 };
 
@@ -93,7 +95,7 @@
 
 TEST_P(TraceSorterTest, TestTracePacket) {
   TraceBlobView view = test_buffer_.slice(0, 1);
-  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view.data(), 1));
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
   context_.sorter->PushTracePacket(1000, std::move(view));
   context_.sorter->FlushEventsForced();
 }
@@ -107,8 +109,8 @@
   InSequence s;
 
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1));
-  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view_2.data(), 2));
-  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view_3.data(), 3));
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
 
   context_.sorter->set_window_ns_for_testing(200);
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index fb4e92c..9a86cf4 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -43,7 +43,7 @@
 // StringId is an offset into |string_pool_|.
 using StringId = size_t;
 
-enum RefType { kUTID = 0, kCPU_ID = 1 };
+enum RefType { kNoRef = 0, kUTID = 1, kCPU_ID = 2, kIrq = 3, kSoftIrq = 4 };
 
 // Stores a data inside a trace file in a columnar form. This makes it efficient
 // to read or search across a single field of the trace (e.g. all the thread
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 02fcbf8..9d3a098 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -131,6 +131,11 @@
 
 InodeFileDataSource::~InodeFileDataSource() = default;
 
+void InodeFileDataSource::Start() {
+  // Nothing special to do, this data source is only reacting to on-demand
+  // events such as OnInodes().
+}
+
 void InodeFileDataSource::AddInodesFromStaticMap(
     BlockDeviceID block_device_id,
     std::set<Inode>* inode_numbers) {
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 181dfda..143df66 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -79,12 +79,14 @@
   void AddInodesFromLRUCache(BlockDeviceID block_device_id,
                              std::set<Inode>* inode_numbers);
 
-  void Flush() override;
-
   virtual void FillInodeEntry(InodeFileMap* destination,
                               Inode inode_number,
                               const InodeMapValue& inode_map_value);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush() override;
+
  protected:
   std::multimap<BlockDeviceID, std::string> mount_points_;
 
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.cc b/src/traced/probes/ftrace/ftrace_config_muxer.cc
index 6af02b4..5374ba7 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.cc
@@ -228,23 +228,25 @@
     : ftrace_(ftrace), table_(table), current_state_(), configs_() {}
 FtraceConfigMuxer::~FtraceConfigMuxer() = default;
 
-FtraceConfigId FtraceConfigMuxer::RequestConfig(const FtraceConfig& request) {
+FtraceConfigId FtraceConfigMuxer::SetupConfig(const FtraceConfig& request) {
   FtraceConfig actual;
 
   bool is_ftrace_enabled = ftrace_->IsTracingEnabled();
   if (configs_.empty()) {
+    PERFETTO_DCHECK(active_configs_.empty());
     PERFETTO_DCHECK(!current_state_.tracing_on);
 
     // If someone outside of perfetto is using ftrace give up now.
     if (is_ftrace_enabled)
       return 0;
 
-    // If we're about to turn tracing on use this opportunity do some setup:
+    // Setup ftrace, without starting it. Setting buffers can be quite slow
+    // (up to hundreds of ms).
     SetupClock(request);
     SetupBufferSize(request);
   } else {
     // Did someone turn ftrace off behind our back? If so give up.
-    if (!is_ftrace_enabled)
+    if (!active_configs_.empty() && !is_ftrace_enabled)
       return 0;
   }
 
@@ -272,17 +274,34 @@
     }
   }
 
-  if (configs_.empty()) {
-    PERFETTO_DCHECK(!current_state_.tracing_on);
-    ftrace_->EnableTracing();
-    current_state_.tracing_on = true;
-  }
-
   FtraceConfigId id = ++last_id_;
   configs_.emplace(id, std::move(actual));
   return id;
 }
 
+bool FtraceConfigMuxer::ActivateConfig(FtraceConfigId id) {
+  if (!id || configs_.count(id) == 0) {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+  if (active_configs_.count(id))
+    return true;  // Already active, activation is idempotent.
+  if (!active_configs_.empty()) {
+    PERFETTO_DCHECK(current_state_.tracing_on);
+    active_configs_.insert(id);
+    return true;  // We are not the first, ftrace is already enabled. All done.
+  }
+  // If someone outside of perfetto is using ftrace give up now, leaving |id|
+  // inactive so that RemoveConfig() won't disable tracing behind their back.
+  PERFETTO_DCHECK(!current_state_.tracing_on);
+  if (ftrace_->IsTracingEnabled())
+    return false;
+  ftrace_->EnableTracing();
+  current_state_.tracing_on = true;
+  active_configs_.insert(id);
+  return true;
+}
+
 bool FtraceConfigMuxer::RemoveConfig(FtraceConfigId id) {
   if (!id || !configs_.erase(id))
     return false;
@@ -305,13 +324,25 @@
       current_state_.ftrace_events.erase(name);
   }
 
-  if (configs_.empty()) {
+  // If there aren't any more active configs, disable ftrace.
+  auto active_it = active_configs_.find(id);
+  if (active_it != active_configs_.end()) {
     PERFETTO_DCHECK(current_state_.tracing_on);
-    ftrace_->DisableTracing();
+    active_configs_.erase(active_it);
+    if (active_configs_.empty()) {
+      // This was the last active config, disable ftrace.
+      ftrace_->DisableTracing();
+      current_state_.tracing_on = false;
+    }
+  }
+
+  // Even if we don't have any other active configs, we might still have idle
+  // configs around. Tear down the rest of the ftrace config only if all
+  // configs are removed.
+  if (configs_.empty()) {
     ftrace_->SetCpuBufferSizeInPages(0);
     ftrace_->DisableAllEvents();
     ftrace_->ClearTrace();
-    current_state_.tracing_on = false;
     if (current_state_.atrace_on)
       DisableAtrace();
   }
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.h b/src/traced/probes/ftrace/ftrace_config_muxer.h
index e8164b9..85ad05b 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.h
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.h
@@ -17,6 +17,9 @@
 #ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
 #define SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
 
+#include <map>
+#include <set>
+
 #include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
@@ -29,7 +32,7 @@
 // messing with the ftrace settings at the same time as us.
 
 // Specifically FtraceConfigMuxer takes in a *requested* FtraceConfig
-// (|RequestConfig|), makes a best effort attempt to modify the ftrace
+// (|SetupConfig|), makes a best effort attempt to modify the ftrace
 // debugfs files to honor those settings without interupting other perfetto
 // traces already in progress or other users of ftrace, then returns an
 // FtraceConfigId representing that config or zero on failure.
@@ -53,11 +56,14 @@
   // If someone else is tracing we won't touch atrace (since it resets the
   // buffer).
   // To see the config you ended up with use |GetConfig|.
-  FtraceConfigId RequestConfig(const FtraceConfig& request);
+  FtraceConfigId SetupConfig(const FtraceConfig& request);
+
+  // Activate ftrace for the given config (if not already active).
+  bool ActivateConfig(FtraceConfigId);
 
   // Undo changes for the given config. Returns false iff the id is 0
   // or already removed.
-  bool RemoveConfig(FtraceConfigId id);
+  bool RemoveConfig(FtraceConfigId);
 
   // public for testing
   void SetupClockForTesting(const FtraceConfig& request) {
@@ -91,7 +97,15 @@
   const ProtoTranslationTable* table_;
 
   FtraceState current_state_;
+
+  // Set of all configurations. Note that not all of them might be active.
+  // When a config is present but not active, we do setup buffer sizes and
+  // events, but don't enable ftrace (i.e. tracing_on).
   std::map<FtraceConfigId, FtraceConfig> configs_;
+
+  // Subset of |configs_| that are currently active. At any time ftrace is
+  // enabled iff |active_configs_| is not empty.
+  std::set<FtraceConfigId> active_configs_;
 };
 
 std::set<std::string> GetFtraceEvents(const FtraceConfig& request,
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc b/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
index b67c8a0..3e3fe28 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
@@ -163,14 +163,16 @@
       .Times(AnyNumber());
 
   EXPECT_CALL(ftrace, ReadOneCharFromFile("/root/tracing_on"))
-      .WillOnce(Return('0'));
+      .Times(2)
+      .WillRepeatedly(Return('0'));
   EXPECT_CALL(ftrace, WriteToFile("/root/buffer_size_kb", "512"));
   EXPECT_CALL(ftrace, WriteToFile("/root/trace_clock", "boot"));
   EXPECT_CALL(ftrace, WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(ftrace,
               WriteToFile("/root/events/sched/sched_switch/enable", "1"));
-  FtraceConfigId id = model.RequestConfig(config);
+  FtraceConfigId id = model.SetupConfig(config);
   ASSERT_TRUE(id);
+  ASSERT_TRUE(model.ActivateConfig(id));
 
   const FtraceConfig* actual_config = model.GetConfig(id);
   EXPECT_TRUE(actual_config);
@@ -198,7 +200,7 @@
   // If someone is using ftrace already don't stomp on what they are doing.
   EXPECT_CALL(ftrace, ReadOneCharFromFile("/root/tracing_on"))
       .WillOnce(Return('1'));
-  FtraceConfigId id = model.RequestConfig(config);
+  FtraceConfigId id = model.SetupConfig(config);
   ASSERT_FALSE(id);
 }
 
@@ -219,7 +221,7 @@
                   {"atrace", "--async_start", "--only_userspace", "sched"})))
       .WillOnce(Return(true));
 
-  FtraceConfigId id = model.RequestConfig(config);
+  FtraceConfigId id = model.SetupConfig(config);
   ASSERT_TRUE(id);
 
   const FtraceConfig* actual_config = model.GetConfig(id);
@@ -253,7 +255,7 @@
            "com.google.android.gms.persistent,com.google.android.gms"})))
       .WillOnce(Return(true));
 
-  FtraceConfigId id = model.RequestConfig(config);
+  FtraceConfigId id = model.SetupConfig(config);
   ASSERT_TRUE(id);
 
   const FtraceConfig* actual_config = model.GetConfig(id);
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 86685bc..5cff028 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -145,6 +145,7 @@
   for (const auto* data_source : data_sources_)
     ftrace_config_muxer_->RemoveConfig(data_source->config_id());
   data_sources_.clear();
+  started_data_sources_.clear();
   StopIfNeeded();
 }
 
@@ -180,7 +181,7 @@
       continue;
     // This method reads the pipe and converts the raw ftrace data into
     // protobufs using the |data_source|'s TraceWriter.
-    ctrl->cpu_readers_[cpu]->Drain(ctrl->data_sources_);
+    ctrl->cpu_readers_[cpu]->Drain(ctrl->started_data_sources_);
     ctrl->OnDrainCpuForTesting(cpu);
   }
 
@@ -206,9 +207,9 @@
 }
 
 void FtraceController::StartIfNeeded() {
-  if (data_sources_.size() > 1)
+  if (started_data_sources_.size() > 1)
     return;
-  PERFETTO_CHECK(!data_sources_.empty());
+  PERFETTO_DCHECK(!started_data_sources_.empty());
   {
     std::unique_lock<std::mutex> lock(lock_);
     PERFETTO_CHECK(!listening_for_raw_trace_data_);
@@ -249,14 +250,16 @@
 }
 
 void FtraceController::StopIfNeeded() {
-  if (!data_sources_.empty())
+  if (!started_data_sources_.empty())
     return;
+
   {
     // Unblock any readers that are waiting for us to drain data.
     std::unique_lock<std::mutex> lock(lock_);
     listening_for_raw_trace_data_ = false;
     cpus_to_drain_.reset();
   }
+
   data_drained_.notify_all();
   cpu_readers_.clear();
 }
@@ -297,7 +300,7 @@
   if (!ValidConfig(data_source->config()))
     return false;
 
-  auto config_id = ftrace_config_muxer_->RequestConfig(data_source->config());
+  auto config_id = ftrace_config_muxer_->SetupConfig(data_source->config());
   if (!config_id)
     return false;
 
@@ -305,13 +308,27 @@
       *table_, FtraceEventsAsSet(*ftrace_config_muxer_->GetConfig(config_id))));
   auto it_and_inserted = data_sources_.insert(data_source);
   PERFETTO_DCHECK(it_and_inserted.second);
-  StartIfNeeded();
   data_source->Initialize(config_id, std::move(filter));
   return true;
 }
 
+bool FtraceController::StartDataSource(FtraceDataSource* data_source) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  FtraceConfigId config_id = data_source->config_id();
+  PERFETTO_CHECK(config_id);
+
+  if (!ftrace_config_muxer_->ActivateConfig(config_id))
+    return false;
+
+  started_data_sources_.insert(data_source);
+  StartIfNeeded();
+  return true;
+}
+
 void FtraceController::RemoveDataSource(FtraceDataSource* data_source) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
+  started_data_sources_.erase(data_source);
   size_t removed = data_sources_.erase(data_source);
   if (!removed)
     return;  // Can happen if AddDataSource failed (e.g. too many sessions).
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index b950e31..fae0a68 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -64,6 +64,7 @@
   void ClearTrace();
 
   bool AddDataSource(FtraceDataSource*) PERFETTO_WARN_UNUSED_RESULT;
+  bool StartDataSource(FtraceDataSource*);
   void RemoveDataSource(FtraceDataSource*);
 
   void DumpFtraceStats(FtraceStats*);
@@ -123,6 +124,7 @@
   bool atrace_running_ = false;
   std::map<size_t, std::unique_ptr<CpuReader>> cpu_readers_;
   std::set<FtraceDataSource*> data_sources_;
+  std::set<FtraceDataSource*> started_data_sources_;
   base::WeakPtrFactory<FtraceController> weak_factory_;  // Keep last.
   PERFETTO_THREAD_CHECKER(thread_checker_)
 };
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index a441b2e..9e74a0a 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -302,12 +302,14 @@
 
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
-  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
   auto data_source = controller->AddFakeDataSource(config);
   ASSERT_TRUE(data_source);
 
+  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
+
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
       .WillOnce(Return(true));
@@ -330,12 +332,16 @@
   FtraceConfig configA = CreateFtraceConfig({"foo"});
   FtraceConfig configB = CreateFtraceConfig({"foo", "bar"});
 
-  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
   auto data_sourceA = controller->AddFakeDataSource(configA);
   EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "1"));
   auto data_sourceB = controller->AddFakeDataSource(configB);
+
+  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+  ASSERT_TRUE(controller->StartDataSource(data_sourceA.get()));
+  ASSERT_TRUE(controller->StartDataSource(data_sourceB.get()));
+
   data_sourceA.reset();
 
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
@@ -356,10 +362,12 @@
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
-  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
   auto data_source = controller->AddFakeDataSource(config);
 
+  EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
+
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
       .WillOnce(Return(true));
@@ -383,6 +391,7 @@
 
   FtraceConfig config = CreateFtraceConfig({"foo"});
   auto data_source = controller->AddFakeDataSource(config);
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
 
   // Only one call to drain should be scheduled for the next drain period.
   EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100));
@@ -427,6 +436,7 @@
 
   FtraceConfig config = CreateFtraceConfig({"foo"});
   auto data_source = controller->AddFakeDataSource(config);
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
 
   // Test several cycles of a worker producing data and make sure the drain
   // delay is consistent with the drain period.
@@ -469,6 +479,7 @@
   EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
   FtraceConfig config = CreateFtraceConfig({"foo"});
   auto data_source = controller->AddFakeDataSource(config);
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
 
   auto on_data_available = controller->GetDataAvailableCallback(0u);
   std::thread worker([on_data_available] { on_data_available(); });
@@ -482,6 +493,7 @@
 
   // Register another data source and wait for it to generate data.
   data_source = controller->AddFakeDataSource(config);
+  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   std::thread worker2([on_data_available] { on_data_available(); });
   controller->WaitForData(0u);
 
@@ -506,6 +518,7 @@
                 WriteToFile("/root/buffer_size_kb", "512"));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 
   {
@@ -515,6 +528,7 @@
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(10 * 1024 * 1024);
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 
   {
@@ -525,6 +539,7 @@
     ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
     config.set_buffer_size_kb(65 * 1024);
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 
   {
@@ -534,6 +549,7 @@
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(1);
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 
   {
@@ -543,6 +559,7 @@
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_buffer_size_kb(42);
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 
   {
@@ -553,6 +570,7 @@
     ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
     config.set_buffer_size_kb(42);
     auto data_source = controller->AddFakeDataSource(config);
+    ASSERT_TRUE(controller->StartDataSource(data_source.get()));
   }
 }
 
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index da5391e..d7bc843 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -45,8 +45,18 @@
 
 void FtraceDataSource::Initialize(FtraceConfigId config_id,
                                   std::unique_ptr<EventFilter> event_filter) {
+  PERFETTO_CHECK(config_id);
   config_id_ = config_id;
   event_filter_ = std::move(event_filter);
+}
+
+void FtraceDataSource::Start() {
+  FtraceController* ftrace = controller_weak_.get();
+  if (!ftrace)
+    return;
+  PERFETTO_CHECK(config_id_);  // Must be initialized at this point.
+  if (!ftrace->StartDataSource(this))
+    return;
   DumpFtraceStats(&stats_before_);
 }
 
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index 3def5fd..e576a5a 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -62,6 +62,9 @@
   // source, to inject ftrace dependencies.
   void Initialize(FtraceConfigId, std::unique_ptr<EventFilter>);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
   void Flush() override;
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 3e823e0..13d47e1 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -28,10 +28,12 @@
   ProbesDataSource(TracingSessionID, int type_id);
   virtual ~ProbesDataSource();
 
+  virtual void Start() = 0;
   virtual void Flush() = 0;
 
   const TracingSessionID tracing_session_id;
   const int type_id;
+  bool started = false;  // Set by probes_producer.cc.
 
  private:
   ProbesDataSource(const ProbesDataSource&) = delete;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 5b37165..d91f066 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -129,8 +129,10 @@
   ConnectWithRetries(socket_name, task_runner);
 }
 
-void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+void ProbesProducer::SetupDataSource(DataSourceInstanceID instance_id,
                                      const DataSourceConfig& config) {
+  PERFETTO_DLOG("SetupDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+                config.name().c_str());
   PERFETTO_DCHECK(data_sources_.count(instance_id) == 0);
   TracingSessionID session_id = config.tracing_session_id();
   PERFETTO_CHECK(session_id > 0);
@@ -153,12 +155,28 @@
 
   session_data_sources_.emplace(session_id, data_source.get());
   data_sources_[instance_id] = std::move(data_source);
+}
 
+void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+                                     const DataSourceConfig& config) {
+  PERFETTO_DLOG("StartDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+                config.name().c_str());
+  auto it = data_sources_.find(instance_id);
+  if (it == data_sources_.end()) {
+    // Can happen if SetupDataSource() failed (e.g. ftrace was busy).
+    PERFETTO_ELOG("Data source id=%" PRIu64 " not found", instance_id);
+    return;
+  }
+  ProbesDataSource* data_source = it->second.get();
+  if (data_source->started)
+    return;
   if (config.trace_duration_ms() != 0) {
     uint32_t timeout = 5000 + 2 * config.trace_duration_ms();
     watchdogs_.emplace(
         instance_id, base::Watchdog::GetInstance()->CreateFatalTimer(timeout));
   }
+  data_source->started = true;
+  data_source->Start();
 }
 
 std::unique_ptr<ProbesDataSource> ProbesProducer::CreateFtraceDataSource(
@@ -185,7 +203,7 @@
     ftrace_->ClearTrace();
   }
 
-  PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
+  PERFETTO_LOG("Ftrace setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
                config.target_buffer());
   const BufferID buffer_id = static_cast<BufferID>(config.target_buffer());
   std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
@@ -193,7 +211,7 @@
       endpoint_->CreateTraceWriter(buffer_id)));
   if (!ftrace_->AddDataSource(data_source.get())) {
     PERFETTO_ELOG(
-        "Failed to start tracing (too many concurrent sessions or ftrace is "
+        "Failed to setup tracing (too many concurrent sessions or ftrace is "
         "already in use)");
     return nullptr;
   }
@@ -204,7 +222,7 @@
     TracingSessionID session_id,
     DataSourceInstanceID id,
     DataSourceConfig source_config) {
-  PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
+  PERFETTO_LOG("Inode file map setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
                id, source_config.target_buffer());
   auto buffer_id = static_cast<BufferID>(source_config.target_buffer());
   if (system_inodes_.empty())
@@ -220,13 +238,8 @@
     const DataSourceConfig& config) {
   base::ignore_result(id);
   auto buffer_id = static_cast<BufferID>(config.target_buffer());
-  auto data_source =
-      std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
-          session_id, endpoint_->CreateTraceWriter(buffer_id), config));
-  if (config.process_stats_config().scan_all_processes_on_start()) {
-    data_source->WriteAllProcesses();
-  }
-  return std::move(data_source);
+  return std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
+      session_id, endpoint_->CreateTraceWriter(buffer_id), config));
 }
 
 std::unique_ptr<SysStatsDataSource> ProbesProducer::CreateSysStatsDataSource(
@@ -235,16 +248,16 @@
     const DataSourceConfig& config) {
   base::ignore_result(id);
   auto buffer_id = static_cast<BufferID>(config.target_buffer());
-  auto data_source = std::unique_ptr<SysStatsDataSource>(
+  return std::unique_ptr<SysStatsDataSource>(
       new SysStatsDataSource(task_runner_, session_id,
                              endpoint_->CreateTraceWriter(buffer_id), config));
-  return data_source;
 }
 
 void ProbesProducer::StopDataSource(DataSourceInstanceID id) {
   PERFETTO_LOG("Producer stop (id=%" PRIu64 ")", id);
   auto it = data_sources_.find(id);
   if (it == data_sources_.end()) {
+    // Can happen if SetupDataSource() failed (e.g. ftrace was busy).
     PERFETTO_ELOG("Cannot stop data source id=%" PRIu64 ", not found", id);
     return;
   }
@@ -268,7 +281,7 @@
                            size_t num_data_sources) {
   for (size_t i = 0; i < num_data_sources; i++) {
     auto it = data_sources_.find(data_source_ids[i]);
-    if (it == data_sources_.end())
+    if (it == data_sources_.end() || !it->second->started)
       continue;
     it->second->Flush();
   }
@@ -288,7 +301,7 @@
   // unordered_multimap guarantees that entries with the same key are contiguous
   // in the iteration.
   for (auto it = session_data_sources_.begin(); /* check below*/; it++) {
-    // If this is the last iteration or this is the session id has changed,
+    // If this is the last iteration or the session id has changed,
     // dispatch the metadata update to the linked data sources, if any.
     if (it == session_data_sources_.end() || it->first != last_session_id) {
       bool has_inodes = metadata && !metadata->inode_and_device.empty();
@@ -307,6 +320,8 @@
       last_session_id = it->first;
     }
     ProbesDataSource* ds = it->second;
+    if (!ds->started)
+      continue;
     switch (ds->type_id) {
       case FtraceDataSource::kTypeId:
         metadata = static_cast<FtraceDataSource*>(ds)->mutable_metadata();
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index e7b9403..14737d6 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -49,6 +49,7 @@
   // Producer Impl:
   void OnConnect() override;
   void OnDisconnect() override;
+  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
   void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
   void StopDataSource(DataSourceInstanceID) override;
   void OnTracingSetup() override;
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index 80d125b..160484f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -75,10 +75,11 @@
     const DataSourceConfig& config)
     : ProbesDataSource(session_id, kTypeId),
       writer_(std::move(writer)),
-      config_(config),
       record_thread_names_(config.process_stats_config().record_thread_names()),
+      dump_all_procs_on_start_(
+          config.process_stats_config().scan_all_processes_on_start()),
       weak_factory_(this) {
-  const auto& quirks = config_.process_stats_config().quirks();
+  const auto& quirks = config.process_stats_config().quirks();
   enable_on_demand_dumps_ =
       (std::find(quirks.begin(), quirks.end(),
                  ProcessStatsConfig::DISABLE_ON_DEMAND) == quirks.end());
@@ -86,6 +87,11 @@
 
 ProcessStatsDataSource::~ProcessStatsDataSource() = default;
 
+void ProcessStatsDataSource::Start() {
+  if (dump_all_procs_on_start_)
+    WriteAllProcesses();
+}
+
 base::WeakPtr<ProcessStatsDataSource> ProcessStatsDataSource::GetWeakPtr()
     const {
   return weak_factory_.GetWeakPtr();
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 866d2cf..57f0324 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -44,16 +44,17 @@
                          const DataSourceConfig&);
   ~ProcessStatsDataSource() override;
 
-  const DataSourceConfig& config() const { return config_; }
-
   base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
   void WriteAllProcesses();
   void OnPids(const std::vector<int32_t>& pids);
-  void Flush() override;
 
   // Virtual for testing.
   virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
 
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush() override;
+
  private:
   ProcessStatsDataSource(const ProcessStatsDataSource&) = delete;
   ProcessStatsDataSource& operator=(const ProcessStatsDataSource&) = delete;
@@ -67,11 +68,11 @@
   void FinalizeCurPsTree();
 
   std::unique_ptr<TraceWriter> writer_;
-  const DataSourceConfig config_;
   TraceWriter::TracePacketHandle cur_packet_;
   protos::pbzero::ProcessTree* cur_ps_tree_ = nullptr;
   bool record_thread_names_ = false;
   bool enable_on_demand_dumps_ = true;
+  bool dump_all_procs_on_start_ = false;
 
   // This set contains PIDs as per the Linux kernel notion of a PID (which is
   // really a TID). In practice this set will contain all TIDs for all processes
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index ebc3733..cd6efd1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -125,6 +125,9 @@
   meminfo_ticks_ = ticks[0];
   vmstat_ticks_ = ticks[1];
   stat_ticks_ = ticks[2];
+}
+
+void SysStatsDataSource::Start() {
   auto weak_this = GetWeakPtr();
   task_runner_->PostTask(std::bind(&SysStatsDataSource::Tick, weak_this));
 }
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index a5ea4f7..6081374 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -55,6 +55,7 @@
   ~SysStatsDataSource() override;
 
   // ProbesDataSource implementation.
+  void Start() override;
   void Flush() override;
 
   base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
index e2c18d9..763d8d1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
@@ -209,6 +209,7 @@
     auto instance = std::unique_ptr<SysStatsDataSource>(new SysStatsDataSource(
         &task_runner_, 0, std::move(writer), cfg, MockOpenReadOnly));
     instance->set_ns_per_user_hz_for_testing(1000000000ull / 100);  // 100 Hz.
+    instance->Start();
     return instance;
   }
 
diff --git a/src/tracing/core/patch_list.h b/src/tracing/core/patch_list.h
index 5cef17d..bdfd028 100644
--- a/src/tracing/core/patch_list.h
+++ b/src/tracing/core/patch_list.h
@@ -53,7 +53,7 @@
   }
 
  private:
-  Patch& operator=(const Patch&) = default;
+  Patch& operator=(const Patch&) = delete;
   Patch(Patch&&) noexcept = delete;
   Patch& operator=(Patch&&) = delete;
 };
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 8787672..5a0ca16 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -154,8 +154,13 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
+  // Calling StartTracing() should be a noop (% a DLOG statement) because the
+  // trace config didn't have the |deferred_start| flag set.
+  consumer->StartTracing();
+
   consumer->DisableTracing();
   producer->WaitForDataSourceStop("data_source");
   consumer->WaitForTracingDisabled();
@@ -178,6 +183,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<MockProducer> producer_otheruid = CreateMockProducer();
@@ -195,6 +201,7 @@
   trace_config.set_lockdown_mode(
       TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR);
   consumer->EnableTracing(trace_config);
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<MockProducer> producer_otheruid2 = CreateMockProducer();
@@ -220,6 +227,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Disconnecting the consumer while tracing should trigger data source
@@ -243,6 +251,7 @@
   consumer->EnableTracing(trace_config);
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Disconnecting and reconnecting a producer with a matching data source.
@@ -252,6 +261,7 @@
   producer->Connect(svc.get(), "mock_producer_2");
   producer->RegisterDataSource("data_source");
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 }
 
@@ -303,6 +313,7 @@
   consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
 
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   static const char kPayload[] = "1234567890abcdef-";
@@ -397,12 +408,15 @@
   size_t actual_page_sizes_kb[kNumProducers]{};
   for (size_t i = 0; i < kNumProducers; i++) {
     producer[i]->WaitForTracingSetup();
-    producer[i]->WaitForDataSourceStart("data_source");
+    producer[i]->WaitForDataSourceSetup("data_source");
     actual_shm_sizes_kb[i] =
         producer[i]->endpoint()->shared_memory()->size() / 1024;
     actual_page_sizes_kb[i] =
         producer[i]->endpoint()->shared_buffer_page_size_kb();
   }
+  for (size_t i = 0; i < kNumProducers; i++) {
+    producer[i]->WaitForDataSourceStart("data_source");
+  }
   ASSERT_THAT(actual_page_sizes_kb, ElementsAreArray(kExpectedPageSizesKb));
   ASSERT_THAT(actual_shm_sizes_kb, ElementsAreArray(kExpectedSizesKb));
 }
@@ -422,6 +436,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -460,6 +475,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -498,6 +514,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -564,6 +581,11 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+
+  producer->WaitForDataSourceSetup("ds_will_ack_1");
+  producer->WaitForDataSourceSetup("ds_wont_ack");
+  producer->WaitForDataSourceSetup("ds_will_ack_2");
+
   producer->WaitForDataSourceStart("ds_will_ack_1");
   producer->WaitForDataSourceStart("ds_wont_ack");
   producer->WaitForDataSourceStart("ds_will_ack_2");
@@ -636,6 +658,7 @@
 
   consumer->EnableTracing(trace_config);
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<TraceWriter> writer =
@@ -675,11 +698,15 @@
 
     if (i == 0)
       producer1->WaitForTracingSetup();
-    producer1->WaitForDataSourceStart("ds_1A");
-    producer1->WaitForDataSourceStart("ds_1B");
 
+    producer1->WaitForDataSourceSetup("ds_1A");
+    producer1->WaitForDataSourceSetup("ds_1B");
     if (i == 0)
       producer2->WaitForTracingSetup();
+    producer2->WaitForDataSourceSetup("ds_2A");
+
+    producer1->WaitForDataSourceStart("ds_1A");
+    producer1->WaitForDataSourceStart("ds_1B");
     producer2->WaitForDataSourceStart("ds_2A");
 
     auto* ds1 = producer1->GetDataSourceInstance("ds_1A");
@@ -723,6 +750,7 @@
   base::TempFile tmp_file = base::TempFile::Create();
   consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
   producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
   producer->WaitForDataSourceStart("data_source");
 
   // Write some variable length payload, waiting for sync markers every now
@@ -777,4 +805,42 @@
   EXPECT_EQ(whole_trace.SerializeAsString(), merged_trace.SerializeAsString());
 }
 
+// Creates a tracing session with |deferred_start| and checks that data sources
+// are started only after calling StartTracing().
+TEST_F(TracingServiceImplTest, DeferredStart) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+
+  // Create two data sources but enable only one of them.
+  producer->RegisterDataSource("ds_1");
+  producer->RegisterDataSource("ds_2");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  trace_config.add_data_sources()->mutable_config()->set_name("ds_1");
+  trace_config.set_deferred_start(true);
+  trace_config.set_duration_ms(1);
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+
+  producer->WaitForDataSourceSetup("ds_1");
+
+  // Make sure we don't get unexpected DataSourceStart() notifications yet.
+  task_runner.RunUntilIdle();
+
+  consumer->StartTracing();
+
+  producer->WaitForDataSourceStart("ds_1");
+
+  auto writer1 = producer->CreateTraceWriter("ds_1");
+  producer->WaitForFlush(writer1.get());
+
+  producer->WaitForDataSourceStop("ds_1");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_config.cc b/src/tracing/core/trace_config.cc
index b4d0081..a5cf0bb 100644
--- a/src/tracing/core/trace_config.cc
+++ b/src/tracing/core/trace_config.cc
@@ -92,6 +92,11 @@
       static_cast<decltype(max_file_size_bytes_)>(proto.max_file_size_bytes());
 
   guardrail_overrides_.FromProto(proto.guardrail_overrides());
+
+  static_assert(sizeof(deferred_start_) == sizeof(proto.deferred_start()),
+                "size mismatch");
+  deferred_start_ =
+      static_cast<decltype(deferred_start_)>(proto.deferred_start());
   unknown_fields_ = proto.unknown_fields();
 }
 
@@ -152,6 +157,11 @@
           max_file_size_bytes_));
 
   guardrail_overrides_.ToProto(proto->mutable_guardrail_overrides());
+
+  static_assert(sizeof(deferred_start_) == sizeof(proto->deferred_start()),
+                "size mismatch");
+  proto->set_deferred_start(
+      static_cast<decltype(proto->deferred_start())>(deferred_start_));
   *(proto->mutable_unknown_fields()) = unknown_fields_;
 }
 
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 2483235..9cab744 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -238,6 +238,11 @@
   }
 
   if (cfg.enable_extra_guardrails()) {
+    if (cfg.deferred_start()) {
+      PERFETTO_ELOG(
+          "deferred_start=true is not supported in unsupervised traces");
+      return false;
+    }
     if (cfg.duration_ms() > kMaxTracingDurationMillis) {
       PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu64
                     " ms)",
@@ -340,7 +345,7 @@
 
   consumer->tracing_session_id_ = tsid;
 
-  // Enable the data sources on the producers.
+  // Setup the data sources on the producers without starting them.
   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
     // Scan all the registered data sources with a matching name.
     auto range = data_sources_.equal_range(cfg_data_source.config().name());
@@ -353,13 +358,45 @@
           break;
         }
       }
-      StartDataSource(cfg_data_source, producer_config, it->second,
+      SetupDataSource(cfg_data_source, producer_config, it->second,
                       tracing_session);
     }
   }
 
+  tracing_session->pending_stop_acks.clear();
+  tracing_session->state = TracingSession::CONFIGURED;
+  PERFETTO_LOG(
+      "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
+      "buffer size:%zu KB, total sessions:%zu",
+      cfg.data_sources().size(), cfg.duration_ms(), cfg.buffers_size(),
+      total_buf_size_kb, tracing_sessions_.size());
+
+  // Start the data sources, unless this is a case of early setup + fast
+  // triggering, using TraceConfig.deferred_start.
+  if (!cfg.deferred_start())
+    return StartTracing(tsid);
+
+  return true;
+}
+
+bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session) {
+    PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
+    return false;
+  }
+
+  if (tracing_session->state != TracingSession::CONFIGURED) {
+    PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
+                  tracing_session->state);
+    return false;
+  }
+
+  tracing_session->state = TracingSession::STARTED;
+
   // Trigger delayed task if the trace is time limited.
-  const uint32_t trace_duration_ms = cfg.duration_ms();
+  const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
   if (trace_duration_ms > 0) {
     auto weak_this = weak_ptr_factory_.GetWeakPtr();
     task_runner_->PostDelayedTask(
@@ -371,7 +408,7 @@
   }
 
   // Start the periodic drain tasks if we should to save the trace into a file.
-  if (cfg.write_into_file()) {
+  if (tracing_session->config.write_into_file()) {
     auto weak_this = weak_ptr_factory_.GetWeakPtr();
     task_runner_->PostDelayedTask(
         [weak_this, tsid] {
@@ -381,13 +418,16 @@
         tracing_session->delay_to_next_write_period_ms());
   }
 
-  tracing_session->pending_stop_acks.clear();
-  tracing_session->state = TracingSession::ENABLED;
-  PERFETTO_LOG(
-      "Enabled tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
-      "buffer size:%zu KB, total sessions:%zu",
-      cfg.data_sources().size(), trace_duration_ms, cfg.buffers_size(),
-      total_buf_size_kb, tracing_sessions_.size());
+  for (const auto& kv : tracing_session->data_source_instances) {
+    ProducerID producer_id = kv.first;
+    const DataSourceInstance& data_source = kv.second;
+    ProducerEndpointImpl* producer = GetProducer(producer_id);
+    if (!producer) {
+      PERFETTO_DCHECK(false);
+      continue;
+    }
+    producer->StartDataSource(data_source.instance_id, data_source.config);
+  }
   return true;
 }
 
@@ -427,8 +467,15 @@
         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
       return;
 
+    // Continues below.
+    case TracingSession::CONFIGURED:
+      // If the session didn't even start there is no need to orchestrate a
+      // graceful stop of data sources.
+      disable_immediately = true;
+      break;
+
     // This is the nominal case, continues below.
-    case TracingSession::ENABLED:
+    case TracingSession::STARTED:
       break;
   }
 
@@ -441,7 +488,7 @@
       tracing_session->pending_stop_acks.insert(
           std::make_pair(producer_id, ds_inst_id));
     }
-    producer->TearDownDataSource(ds_inst_id);
+    producer->StopDataSource(ds_inst_id);
   }
   tracing_session->data_source_instances.clear();
 
@@ -497,8 +544,9 @@
   PERFETTO_DCHECK_THREAD(thread_checker_);
   TracingSession* tracing_session = GetTracingSession(tsid);
   if (!tracing_session ||
-      tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS)
+      tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
     return;  // Tracing session was successfully disabled.
+  }
 
   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
                 tsid);
@@ -798,7 +846,7 @@
     if (stop_writing_into_file) {
       tracing_session->write_into_file.reset();
       tracing_session->write_period_ms = 0;
-      if (tracing_session->state == TracingSession::ENABLED)
+      if (tracing_session->state == TracingSession::STARTED)
         DisableTracing(tsid);
       return;
     }
@@ -873,8 +921,10 @@
 
   for (auto& iter : tracing_sessions_) {
     TracingSession& tracing_session = iter.second;
-    if (tracing_session.state != TracingSession::ENABLED)
+    if (tracing_session.state != TracingSession::STARTED &&
+        tracing_session.state != TracingSession::CONFIGURED) {
       continue;
+    }
 
     TraceConfig::ProducerConfig producer_config;
     for (auto& config : tracing_session.config.producers()) {
@@ -885,9 +935,12 @@
     }
     for (const TraceConfig::DataSource& cfg_data_source :
          tracing_session.config.data_sources()) {
-      if (cfg_data_source.config().name() == desc.name())
-        StartDataSource(cfg_data_source, producer_config, reg_ds->second,
-                        &tracing_session);
+      if (cfg_data_source.config().name() != desc.name())
+        continue;
+      DataSourceInstance* ds_inst = SetupDataSource(
+          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
+      if (ds_inst && tracing_session.state == TracingSession::STARTED)
+        producer->StartDataSource(ds_inst->instance_id, ds_inst->config);
     }
   }
 }
@@ -903,7 +956,7 @@
     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
       if (it->first == producer_id && it->second.data_source_name == name) {
         DataSourceInstanceID ds_inst_id = it->second.instance_id;
-        producer->TearDownDataSource(ds_inst_id);
+        producer->StopDataSource(ds_inst_id);
         it = ds_instances.erase(it);
       } else {
         ++it;
@@ -926,7 +979,7 @@
   PERFETTO_DCHECK(false);
 }
 
-void TracingServiceImpl::StartDataSource(
+TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
     const TraceConfig::DataSource& cfg_data_source,
     const TraceConfig::ProducerConfig& producer_config,
     const RegisteredDataSource& data_source,
@@ -938,7 +991,7 @@
   // ftrace, we must not enable it in that case.
   if (lockdown_mode_ && producer->uid_ != uid_) {
     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
-    return;
+    return nullptr;
   }
   // TODO(primiano): Add tests for registration ordering
   // (data sources vs consumers).
@@ -950,10 +1003,19 @@
       PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                     cfg_data_source.config().name().c_str(),
                     producer->name_.c_str());
-      return;
+      return nullptr;
     }
   }
 
+  auto relative_buffer_id = cfg_data_source.config().target_buffer();
+  if (relative_buffer_id >= tracing_session->num_buffers()) {
+    PERFETTO_LOG(
+        "The TraceConfig for DataSource %s specified a target_buffer out of "
+        "bound (%d). Skipping it.",
+        cfg_data_source.config().name().c_str(), relative_buffer_id);
+    return nullptr;
+  }
+
   // Create a copy of the DataSourceConfig specified in the trace config. This
   // will be passed to the producer after translating the |target_buffer| id.
   // The |target_buffer| parameter passed by the consumer in the trace config is
@@ -961,27 +1023,22 @@
   // translated to the global BufferID before passing it to the producers, which
   // don't know anything about tracing sessions and consumers.
 
-  DataSourceConfig ds_config = cfg_data_source.config();  // Deliberate copy.
+  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
+  auto insert_iter = tracing_session->data_source_instances.emplace(
+      producer->id_,
+      DataSourceInstance{inst_id,
+                         cfg_data_source.config(),  //  Deliberate copy.
+                         data_source.descriptor.name(),
+                         data_source.descriptor.will_notify_on_stop()});
+  DataSourceInstance* ds_instance = &insert_iter->second;
+  DataSourceConfig& ds_config = ds_instance->config;
   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
   ds_config.set_tracing_session_id(tracing_session->id);
-  auto relative_buffer_id = ds_config.target_buffer();
-  if (relative_buffer_id >= tracing_session->num_buffers()) {
-    PERFETTO_LOG(
-        "The TraceConfig for DataSource %s specified a target_buffer out of "
-        "bound (%d). Skipping it.",
-        ds_config.name().c_str(), relative_buffer_id);
-    return;
-  }
   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
   PERFETTO_DCHECK(global_id);
   ds_config.set_target_buffer(global_id);
 
-  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
-  tracing_session->data_source_instances.emplace(
-      producer->id_,
-      DataSourceInstance{inst_id, data_source.descriptor.name(),
-                         data_source.descriptor.will_notify_on_stop()});
-  PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16,
+  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                 ds_config.name().c_str(), global_id);
   if (!producer->shared_memory()) {
     // Determine the SMB page size. Must be an integer multiple of 4k.
@@ -1011,7 +1068,8 @@
     producer->OnTracingSetup();
     UpdateMemoryGuardrail();
   }
-  producer->StartDataSource(inst_id, ds_config);
+  producer->SetupDataSource(inst_id, ds_config);
+  return ds_instance;
 }
 
 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
@@ -1311,6 +1369,15 @@
     NotifyOnTracingDisabled();
 }
 
+void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  if (!tracing_session_id_) {
+    PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
+    return;
+  }
+  service_->StartTracing(tracing_session_id_);
+}
+
 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   if (!tracing_session_id_) {
@@ -1473,7 +1540,7 @@
   return shared_buffer_page_size_kb_;
 }
 
-void TracingServiceImpl::ProducerEndpointImpl::TearDownDataSource(
+void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
     DataSourceInstanceID ds_inst_id) {
   // TODO(primiano): When we'll support tearing down the SMB, at this point we
   // should send the Producer a TearDownTracing if all its data sources have
@@ -1524,6 +1591,17 @@
   });
 }
 
+void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
+    DataSourceInstanceID ds_id,
+    const DataSourceConfig& config) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, ds_id, config] {
+    if (weak_this)
+      weak_this->producer_->SetupDataSource(ds_id, std::move(config));
+  });
+}
+
 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
     DataSourceInstanceID ds_id,
     const DataSourceConfig& config) {
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index d50057e..9c325cd 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -83,8 +83,9 @@
     size_t shared_buffer_page_size_kb() const override;
 
     void OnTracingSetup();
+    void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
-    void TearDownDataSource(DataSourceInstanceID);
+    void StopDataSource(DataSourceInstanceID);
     void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
 
    private:
@@ -122,6 +123,7 @@
 
     // TracingService::ConsumerEndpoint implementation.
     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+    void StartTracing() override;
     void DisableTracing() override;
     void ReadBuffers() override;
     void FreeBuffers() override;
@@ -167,6 +169,7 @@
   bool EnableTracing(ConsumerEndpointImpl*,
                      const TraceConfig&,
                      base::ScopedFile);
+  bool StartTracing(TracingSessionID);
   void DisableTracing(TracingSessionID, bool disable_immediately = false);
   void Flush(TracingSessionID tsid,
              uint32_t timeout_ms,
@@ -201,7 +204,12 @@
 
   // Represents an active data source for a tracing session.
   struct DataSourceInstance {
+    DataSourceInstance(const DataSourceInstance&) = delete;
+    DataSourceInstance& operator=(const DataSourceInstance&) = delete;
+    DataSourceInstance(DataSourceInstance&&) noexcept = default;
+
     DataSourceInstanceID instance_id;
+    DataSourceConfig config;
     std::string data_source_name;
     bool will_notify_on_stop;
   };
@@ -215,7 +223,12 @@
   // Holds the state of a tracing session. A tracing session is uniquely bound
   // a specific Consumer. Each Consumer can own one or more sessions.
   struct TracingSession {
-    enum State { DISABLED = 0, ENABLED, DISABLING_WAITING_STOP_ACKS };
+    enum State {
+      DISABLED = 0,
+      CONFIGURED,
+      STARTED,
+      DISABLING_WAITING_STOP_ACKS
+    };
 
     TracingSession(TracingSessionID, ConsumerEndpointImpl*, const TraceConfig&);
 
@@ -276,10 +289,10 @@
   TracingServiceImpl(const TracingServiceImpl&) = delete;
   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
 
-  void StartDataSource(const TraceConfig::DataSource&,
-                       const TraceConfig::ProducerConfig&,
-                       const RegisteredDataSource&,
-                       TracingSession*);
+  DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
+                                      const TraceConfig::ProducerConfig&,
+                                      const RegisteredDataSource&,
+                                      TracingSession*);
 
   // Returns the next available ProducerID that is not in |producers_|.
   ProducerID GetNextProducerID();
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index 05d67fa..cb54e43 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -87,6 +87,22 @@
   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
 }
 
+void ConsumerIPCClientImpl::StartTracing() {
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
+    return;
+  }
+
+  ipc::Deferred<protos::StartTracingResponse> async_response;
+  async_response.Bind(
+      [](ipc::AsyncResult<protos::StartTracingResponse> response) {
+        if (!response)
+          PERFETTO_DLOG("StartTracing() failed");
+      });
+  protos::StartTracingRequest req;
+  consumer_port_.StartTracing(req, std::move(async_response));
+}
+
 void ConsumerIPCClientImpl::DisableTracing() {
   if (!connected_) {
     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 4257a54..5ed9516 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -60,6 +60,7 @@
   // These methods are invoked by the actual Consumer(s) code by clients of the
   // tracing library, which know nothing about the IPC transport.
   void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+  void StartTracing() override;
   void DisableTracing() override;
   void ReadBuffers() override;
   void FreeBuffers() override;
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index 2abdcf7..1d2f907 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -95,6 +95,7 @@
   PERFETTO_DLOG("Tracing service connection failure");
   connected_ = false;
   producer_->OnDisconnect();
+  data_sources_setup_.clear();
 }
 
 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
@@ -109,11 +110,29 @@
 void ProducerIPCClientImpl::OnServiceRequest(
     const protos::GetAsyncCommandResponse& cmd) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  // This message is sent only when connecting to a service running Android Q+.
+  // See comment below in kStartDataSource.
+  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupDataSource) {
+    const auto& req = cmd.setup_data_source();
+    const DataSourceInstanceID dsid = req.new_instance_id();
+    DataSourceConfig cfg;
+    cfg.FromProto(req.config());
+    data_sources_setup_.insert(dsid);
+    producer_->SetupDataSource(dsid, cfg);
+    return;
+  }
+
   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
     const auto& req = cmd.start_data_source();
     const DataSourceInstanceID dsid = req.new_instance_id();
     DataSourceConfig cfg;
     cfg.FromProto(req.config());
+    if (!data_sources_setup_.count(dsid)) {
+      // When connecting with an older (Android P) service, the service will not
+      // send a SetupDataSource message. We synthesize it here in that case.
+      producer_->SetupDataSource(dsid, cfg);
+    }
     producer_->StartDataSource(dsid, cfg);
     return;
   }
@@ -121,6 +140,7 @@
   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
     producer_->StopDataSource(dsid);
+    data_sources_setup_.erase(dsid);
     return;
   }
 
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 743d632..ba749ad 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <set>
 #include <vector>
 
 #include "perfetto/base/thread_checker.h"
@@ -99,6 +100,7 @@
   std::unique_ptr<PosixSharedMemory> shared_memory_;
   std::unique_ptr<SharedMemoryArbiter> shared_memory_arbiter_;
   size_t shared_buffer_page_size_kb_ = 0;
+  std::set<DataSourceInstanceID> data_sources_setup_;
   bool connected_ = false;
   std::string const name_;
   PERFETTO_THREAD_CHECKER(thread_checker_)
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index d378c67..cb18004 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -71,6 +71,14 @@
 }
 
 // Called by the IPC layer.
+void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&,
+                                      DeferredStartTracingResponse resp) {
+  GetConsumerForCurrentRequest()->service_endpoint->StartTracing();
+  // The request is not inspected; reply with an empty StartTracingResponse.
+  resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create());
+}
+
+// Called by the IPC layer.
 void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
                                         DeferredDisableTracingResponse resp) {
   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index 3fdc5d6..acf7d2f 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -46,6 +46,8 @@
   // ConsumerPort implementation (from .proto IPC definition).
   void EnableTracing(const protos::EnableTracingRequest&,
                      DeferredEnableTracingResponse) override;
+  void StartTracing(const protos::StartTracingRequest&,
+                    DeferredStartTracingResponse) override;
   void DisableTracing(const protos::DisableTracingRequest&,
                       DeferredDisableTracingResponse) override;
   void ReadBuffers(const protos::ReadBuffersRequest&,
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index e2c2ab9..95dcf67 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -211,6 +211,24 @@
 // |service_endpoint| (in the RemoteProducer dtor).
 void ProducerIPCService::RemoteProducer::OnDisconnect() {}
 
+// Called by the |core_service_| business logic to have the remote producer
+// set up a new data source instance, ahead of StartDataSource().
+void ProducerIPCService::RemoteProducer::SetupDataSource(
+    DataSourceInstanceID dsid,
+    const DataSourceConfig& cfg) {
+  if (async_producer_commands.IsBound()) {
+    auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
+    cmd.set_has_more(true);
+    cmd->mutable_setup_data_source()->set_new_instance_id(dsid);
+    cfg.ToProto(cmd->mutable_setup_data_source()->mutable_config());
+    async_producer_commands.Resolve(std::move(cmd));
+  } else {
+    PERFETTO_DLOG(
+        "The Service tried to create a new data source but the remote Producer "
+        "has not yet initialized the connection");
+  }
+}
+
 // Invoked by the |core_service_| business logic when it wants to start a new
 // data source.
 void ProducerIPCService::RemoteProducer::StartDataSource(
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index 8446804..caa1978 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -71,6 +71,8 @@
     // no connection here, these methods are posted straight away.
     void OnConnect() override;
     void OnDisconnect() override;
+    void SetupDataSource(DataSourceInstanceID,
+                         const DataSourceConfig&) override;
     void StartDataSource(DataSourceInstanceID,
                          const DataSourceConfig&) override;
     void StopDataSource(DataSourceInstanceID) override;
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 31c7131..ff228c9 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -52,6 +52,10 @@
   service_endpoint_->EnableTracing(trace_config, std::move(write_into_file));
 }
 
+void MockConsumer::StartTracing() {
+  service_endpoint_->StartTracing();  // Forwards to the service endpoint.
+}
+
 void MockConsumer::DisableTracing() {
   service_endpoint_->DisableTracing();
 }
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index 917aff8..bc1c6d4 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -48,6 +48,7 @@
 
   void Connect(TracingService* svc);
   void EnableTracing(const TraceConfig&, base::ScopedFile = base::ScopedFile());
+  void StartTracing();
   void DisableTracing();
   void FreeBuffers();
   void WaitForTracingDisabled(uint32_t timeout_ms = 3000);
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 8c2f9c5..1294c1b 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -76,12 +76,12 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
-void MockProducer::WaitForDataSourceStart(const std::string& name) {
+void MockProducer::WaitForDataSourceSetup(const std::string& name) {
   static int i = 0;
-  auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+  auto checkpoint_name = "on_ds_setup_" + name + "_" + std::to_string(i++);
   auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
   EXPECT_CALL(*this,
-              StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+              SetupDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
       .WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
                                            const DataSourceConfig& cfg) {
         EXPECT_FALSE(data_source_instances_.count(cfg.name()));
@@ -95,6 +95,28 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
+void MockProducer::WaitForDataSourceStart(const std::string& name) {
+  static int seq = 0;
+  const std::string checkpoint_name =
+      "on_ds_start_" + name + "_" + std::to_string(seq++);
+  auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
+  auto record_and_notify = [on_ds_start, this](DataSourceInstanceID ds_id,
+                                               const DataSourceConfig& cfg) {
+    // WaitForDataSourceSetup() may already have recorded this name.
+    if (!data_source_instances_.count(cfg.name())) {
+      auto buf_id = static_cast<BufferID>(cfg.target_buffer());
+      auto tsid = static_cast<TracingSessionID>(cfg.tracing_session_id());
+      data_source_instances_.emplace(cfg.name(),
+                                     EnabledDataSource{ds_id, buf_id, tsid});
+    }
+    on_ds_start();
+  };
+  EXPECT_CALL(*this,
+              StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+      .WillOnce(Invoke(record_and_notify));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
 void MockProducer::WaitForDataSourceStop(const std::string& name) {
   static int i = 0;
   auto checkpoint_name = "on_ds_stop_" + name + "_" + std::to_string(i++);
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index 590a64e..be13084 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -50,6 +50,7 @@
   void RegisterDataSource(const std::string& name, bool ack_stop = false);
   void UnregisterDataSource(const std::string& name);
   void WaitForTracingSetup();
+  void WaitForDataSourceSetup(const std::string& name);
   void WaitForDataSourceStart(const std::string& name);
   void WaitForDataSourceStop(const std::string& name);
   DataSourceInstanceID GetDataSourceInstanceId(const std::string& name);
@@ -68,6 +69,8 @@
   // Producer implementation.
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(SetupDataSource,
+               void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD2(StartDataSource,
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 26aa5cd..8060ab7 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -56,6 +56,8 @@
   // Producer implementation.
   MOCK_METHOD0(OnConnect, void());
   MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(SetupDataSource,
+               void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD2(StartDataSource,
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
@@ -187,10 +189,31 @@
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
   EXPECT_CALL(producer_, OnTracingSetup());
+
+  // Store the arguments passed to SetupDataSource() and later check that they
+  // match the ones passed to StartDataSource().
+  DataSourceInstanceID setup_id;
+  perfetto::protos::DataSourceConfig setup_cfg_proto;
+  EXPECT_CALL(producer_, SetupDataSource(_, _))
+      .WillOnce(
+          Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
+                                               const DataSourceConfig& cfg) {
+
+            setup_id = id;
+            cfg.ToProto(&setup_cfg_proto);
+          }));
   EXPECT_CALL(producer_, StartDataSource(_, _))
       .WillOnce(
-          Invoke([on_create_ds_instance, &ds_iid, &global_buf_id](
-                     DataSourceInstanceID id, const DataSourceConfig& cfg) {
+          Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
+                  &setup_cfg_proto](DataSourceInstanceID id,
+                                    const DataSourceConfig& cfg) {
+            // id and config should match the ones passed to SetupDataSource.
+            ASSERT_EQ(id, setup_id);
+            perfetto::protos::DataSourceConfig cfg_proto;
+            cfg.ToProto(&cfg_proto);
+            ASSERT_EQ(cfg_proto.SerializeAsString(),
+                      setup_cfg_proto.SerializeAsString());
+
             ASSERT_NE(0u, id);
             ds_iid = id;
             ASSERT_EQ("perfetto.test", cfg.name());
@@ -309,6 +332,7 @@
   auto on_create_ds_instance =
       task_runner_->CreateCheckpoint("on_create_ds_instance");
   EXPECT_CALL(producer_, OnTracingSetup());
+  EXPECT_CALL(producer_, SetupDataSource(_, _));
   EXPECT_CALL(producer_, StartDataSource(_, _))
       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
                            DataSourceInstanceID, const DataSourceConfig& cfg) {
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 7bd70b3..3c32409 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -76,6 +76,9 @@
 
   void OnDisconnect() override {}
 
+  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override {
+  }  // No-op; the trace writer is created in StartDataSource().
+
   void StartDataSource(DataSourceInstanceID,
                        const DataSourceConfig& source_config) override {
     auto trace_writer = endpoint_->CreateTraceWriter(
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index ae8ea07..20c4049 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -57,6 +57,9 @@
   FAIL() << "Producer unexpectedly disconnected from the service";
 }
 
+void FakeProducer::SetupDataSource(DataSourceInstanceID,
+                                   const DataSourceConfig&) {}  // No-op.
+
 void FakeProducer::StartDataSource(DataSourceInstanceID,
                                    const DataSourceConfig& source_config) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
diff --git a/test/fake_producer.h b/test/fake_producer.h
index cc55640..baacb9f 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -46,6 +46,8 @@
   // Producer implementation.
   void OnConnect() override;
   void OnDisconnect() override;
+  void SetupDataSource(DataSourceInstanceID,
+                       const DataSourceConfig& source_config) override;
   void StartDataSource(DataSourceInstanceID,
                        const DataSourceConfig& source_config) override;
   void StopDataSource(DataSourceInstanceID) override;