Merge "Add Flush support to Producer and expose API to Consumer"
diff --git a/include/perfetto/protozero/message_handle.h b/include/perfetto/protozero/message_handle.h
index 02bbaf9..5fbe05b 100644
--- a/include/perfetto/protozero/message_handle.h
+++ b/include/perfetto/protozero/message_handle.h
@@ -44,21 +44,22 @@
   // Move-only type.
   MessageHandleBase(MessageHandleBase&&) noexcept;
   MessageHandleBase& operator=(MessageHandleBase&&);
-
- protected:
-  explicit MessageHandleBase(Message* = nullptr);
-  Message& operator*() const {
+  explicit operator bool() const {
 #if PERFETTO_DCHECK_IS_ON()
     PERFETTO_DCHECK(!message_ || generation_ == message_->generation_);
 #endif
-    return *message_;
+    return !!message_;
   }
+
+ protected:
+  explicit MessageHandleBase(Message* = nullptr);
   Message* operator->() const {
 #if PERFETTO_DCHECK_IS_ON()
     PERFETTO_DCHECK(!message_ || generation_ == message_->generation_);
 #endif
     return message_;
   }
+  Message& operator*() const { return *(operator->()); }
 
  private:
   friend class Message;
diff --git a/include/perfetto/tracing/core/basic_types.h b/include/perfetto/tracing/core/basic_types.h
index 112f58a..a1ca0b0 100644
--- a/include/perfetto/tracing/core/basic_types.h
+++ b/include/perfetto/tracing/core/basic_types.h
@@ -34,6 +34,9 @@
 // Unique within the scope of a Producer.
 using WriterID = uint16_t;
 
+// Unique within the scope of the tracing service.
+using FlushRequestID = uint64_t;
+
 // We need one FD per producer and we are not going to be able to keep > 64k FDs
 // open in the service.
 static constexpr ProducerID kMaxProducerID = static_cast<ProducerID>(-1);
diff --git a/include/perfetto/tracing/core/commit_data_request.h b/include/perfetto/tracing/core/commit_data_request.h
index d2e30cd..3f0042f 100644
--- a/include/perfetto/tracing/core/commit_data_request.h
+++ b/include/perfetto/tracing/core/commit_data_request.h
@@ -191,9 +191,13 @@
     return &chunks_to_patch_.back();
   }
 
+  uint64_t flush_request_id() const { return flush_request_id_; }
+  void set_flush_request_id(uint64_t value) { flush_request_id_ = value; }
+
  private:
   std::vector<ChunksToMove> chunks_to_move_;
   std::vector<ChunkToPatch> chunks_to_patch_;
+  uint64_t flush_request_id_ = {};
 
   // Allows to preserve unknown protobuf fields for compatibility
   // with future versions of .proto files.
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 87bcb03..a8e1991 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -79,6 +79,17 @@
   // Called by the Service after OnConnect but before the first DataSource is
   // created. Can be used for any setup required before tracing begins.
   virtual void OnTracingSetup() = 0;
+
+  // Called by the service to request the Producer to commit the data of the
+  // given data sources and return their chunks into the shared memory buffer.
+  // The Producer is expected to invoke NotifyFlushComplete(FlushRequestID) on
+  // the Service after the data has been committed. The producer has to either
+  // reply to the flush requests in order, or can just reply to the latest one
+  // Upon seeing a NotifyFlushComplete(N), the service will assume that all
+  // flushes < N have also been committed.
+  virtual void Flush(FlushRequestID,
+                     const DataSourceInstanceID* data_source_ids,
+                     size_t num_data_sources) = 0;
 };
 
 }  // namespace perfetto
diff --git a/include/perfetto/tracing/core/service.h b/include/perfetto/tracing/core/service.h
index 250308e..8513667 100644
--- a/include/perfetto/tracing/core/service.h
+++ b/include/perfetto/tracing/core/service.h
@@ -95,6 +95,10 @@
     // DataSourceConfig.target_buffer().
     virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
         BufferID target_buffer) = 0;
+
+    // Called in response to a Producer::Flush(request_id) call after all data
+    // for the flush request has been committed.
+    virtual void NotifyFlushComplete(FlushRequestID) = 0;
   };  // class ProducerEndpoint.
 
   // The API for the Consumer port of the Service.
@@ -113,6 +117,13 @@
                                base::ScopedFile = base::ScopedFile()) = 0;
     virtual void DisableTracing() = 0;
 
+    // Requests all data sources to flush their data immediately and invokes the
+    // passed callback once all of them have acked the flush (in which case
+    // the callback argument |success| will be true) or |timeout_ms| are elapsed
+    // (in which case |success| will be false).
+    using FlushCallback = std::function<void(bool /*success*/)>;
+    virtual void Flush(int timeout_ms, FlushCallback) = 0;
+
     // Tracing data will be delivered invoking Consumer::OnTraceData().
     virtual void ReadBuffers() = 0;
 
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index 2cd336c..27f7aec 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -50,6 +50,10 @@
   virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) = 0;
 
+  // Notifies the service that all data for the given FlushRequestID has been
+  // committed in the shared memory buffer.
+  virtual void NotifyFlushComplete(FlushRequestID) = 0;
+
   // Implemented in src/core/shared_memory_arbiter_impl.cc .
   static std::unique_ptr<SharedMemoryArbiter> CreateInstance(
       SharedMemory*,
diff --git a/protos/perfetto/common/commit_data_request.proto b/protos/perfetto/common/commit_data_request.proto
index 4630528..59e29fb 100644
--- a/protos/perfetto/common/commit_data_request.proto
+++ b/protos/perfetto/common/commit_data_request.proto
@@ -70,4 +70,9 @@
     optional bool has_more_patches = 5;
   }
   repeated ChunkToPatch chunks_to_patch = 2;
+
+  // Optional. If this commit is made in response to a Flush(id) request coming
+  // from the service, copy back the id of the request so the service can tell
+  // when the flush happened.
+  optional uint64 flush_request_id = 3;
 }
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index abd3961..8c3e9b8 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -49,6 +49,15 @@
   // implicitly if the Consumer disconnects.
   rpc FreeBuffers(FreeBuffersRequest) returns (FreeBuffersResponse) {}
 
+  // Asks the service to request to all data sources involved in the tracing
+  // session to commit their data into the trace buffer. The FlushResponse is
+  // sent only:
+  // - After the data has been committed (in which case FlushResponse succeeds)
+  // or
+  // - After FlushRequest.timeout_ms milliseconds (in which case the
+  //   FlushResponse is rejected and fails).
+  rpc Flush(FlushRequest) returns (FlushResponse) {}
+
   // TODO rpc ListDataSources(), for the UI.
 }
 
@@ -102,3 +111,10 @@
 }
 
 message FreeBuffersResponse {}
+
+// Arguments for rpc Flush().
+message FlushRequest {
+  optional uint32 timeout_ms = 1;
+}
+
+message FlushResponse {}
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index 26a2670..1146128 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -113,10 +113,22 @@
   // buffer.
   message SetupTracing { optional uint64 shared_buffer_page_size_kb = 1; }
 
+  message Flush {
+    // The instance id (i.e. StartDataSource.new_instance_id) of the data
+    // sources to flush.
+    repeated uint64 data_source_ids = 1;
+
+    // A monotonic counter generated by the service. The producer is simply
+    // expected to copy this value back into the CommitDataRequest, so the
+    // service can tell when the data for this flush has been committed.
+    optional uint64 request_id = 2;
+  }
+
   oneof cmd {
     StartDataSource start_data_source = 1;
     StopDataSource stop_data_source = 2;
     SetupTracing setup_tracing = 3;
     // id == 4 was teardown_tracing, never implemented.
+    Flush flush = 5;
   }
 }
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index fd9dc50..4f7e514 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -260,7 +260,7 @@
   // Failsafe mechanism to avoid waiting indefinitely if the service hangs.
   if (trace_config_->duration_ms()) {
     task_runner_.PostDelayedTask(std::bind(&PerfettoCmd::OnTimeout, this),
-                                 trace_config_->duration_ms() * 2);
+                                 trace_config_->duration_ms() + 10000);
   }
 }
 
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index fdc8785..0a15b88 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -36,7 +36,6 @@
 constexpr uint64_t kScanIntervalMs = 10000;  // 10s
 constexpr uint64_t kScanDelayMs = 10000;     // 10s
 constexpr uint64_t kScanBatchSize = 15000;
-constexpr uint64_t kFlushBeforeEndMs = 500;
 
 uint64_t OrDefault(uint64_t value, uint64_t def) {
   if (value != 0)
@@ -127,24 +126,7 @@
       static_file_map_(static_file_map),
       cache_(cache),
       writer_(std::move(writer)),
-      weak_factory_(this) {
-  if (kFlushBeforeEndMs < source_config_.trace_duration_ms()) {
-    auto weak_this = GetWeakPtr();
-    // Flush TracePacket of current scan shortly before we expect the trace
-    // to end, to retain information from any scan that might be in
-    // progress.
-    task_runner_->PostDelayedTask(
-        [weak_this] {
-          if (!weak_this) {
-            PERFETTO_DLOG("Giving up flush.");
-            return;
-          }
-          PERFETTO_DLOG("Flushing.");
-          weak_this->ResetTracePacket();
-        },
-        source_config_.trace_duration_ms() - kFlushBeforeEndMs);
-  }
-}
+      weak_factory_(this) {}
 
 void InodeFileDataSource::AddInodesFromStaticMap(
     BlockDeviceID block_device_id,
@@ -192,6 +174,11 @@
     PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
 }
 
+void InodeFileDataSource::Flush() {
+  ResetTracePacket();
+  writer_->Flush();
+}
+
 void InodeFileDataSource::OnInodes(
     const std::vector<std::pair<Inode, BlockDeviceID>>& inodes) {
   if (mount_points_.empty()) {
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index a2be1f8..6dfa415 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -75,6 +75,8 @@
   void AddInodesFromLRUCache(BlockDeviceID block_device_id,
                              std::set<Inode>* inode_numbers);
 
+  void Flush();
+
   virtual ~InodeFileDataSource() {}
 
   virtual void FillInodeEntry(InodeFileMap* destination,
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index abdbf07..d37f41d 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -251,6 +251,30 @@
 
 void ProbesProducer::OnTracingSetup() {}
 
+void ProbesProducer::Flush(FlushRequestID flush_request_id,
+                           const DataSourceInstanceID* data_source_ids,
+                           size_t num_data_sources) {
+  for (size_t i = 0; i < num_data_sources; i++) {
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    {
+      auto it = process_stats_sources_.find(ds_id);
+      if (it != process_stats_sources_.end())
+        it->second->Flush();
+    }
+    {
+      auto it = file_map_sources_.find(ds_id);
+      if (it != file_map_sources_.end())
+        it->second->Flush();
+    }
+    {
+      auto it = delegates_.find(ds_id);
+      if (it != delegates_.end())
+        it->second->Flush();
+    }
+  }
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
 void ProbesProducer::ConnectWithRetries(const char* socket_name,
                                         base::TaskRunner* task_runner) {
   PERFETTO_DCHECK(state_ == kNotStarted);
@@ -289,6 +313,15 @@
 
 ProbesProducer::SinkDelegate::~SinkDelegate() = default;
 
+void ProbesProducer::SinkDelegate::Flush() {
+  // TODO(primiano): this still doesn't flush data from the kernel ftrace
+  // buffers (see b/73886018). We should do that and delay the
+  // NotifyFlushComplete() until the ftrace data has been drained from the
+  // kernel ftrace buffer and written in the SMB.
+  if (writer_ && (!trace_packet_ || trace_packet_->is_finalized()))
+    writer_->Flush();
+}
+
 ProbesProducer::FtraceBundleHandle
 ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
   trace_packet_ = writer_->NewTracePacket();
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 72fede0..186fc23 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -48,6 +48,9 @@
                                 const DataSourceConfig&) override;
   void TearDownDataSourceInstance(DataSourceInstanceID) override;
   void OnTracingSetup() override;
+  void Flush(FlushRequestID,
+             const DataSourceInstanceID* data_source_ids,
+             size_t num_data_sources) override;
 
   // Our Impl
   void ConnectWithRetries(const char* socket_name,
@@ -77,6 +80,8 @@
 
     TracingSessionID session_id() const { return session_id_; }
 
+    void Flush();
+
     // FtraceDelegateImpl
     FtraceBundleHandle GetBundleForCpu(size_t cpu) override;
     void OnBundleComplete(size_t cpu,
@@ -111,6 +116,7 @@
     // Keep this after the TraceWriter because TracePackets must not outlive
     // their originating writer.
     TraceWriter::TracePacketHandle trace_packet_;
+
     // Keep this last.
     base::WeakPtrFactory<SinkDelegate> weak_factory_;
   };
diff --git a/src/traced/probes/process_stats_data_source.cc b/src/traced/probes/process_stats_data_source.cc
index 7afe6ab..7c9aa9f 100644
--- a/src/traced/probes/process_stats_data_source.cc
+++ b/src/traced/probes/process_stats_data_source.cc
@@ -56,11 +56,6 @@
     WriteProcess(pid, process_tree);
     seen_pids->insert(pid);
   });
-
-  trace_packet->Finalize();
-
-  // TODO(hjd): Remove this once the service flushes the producers on teardown.
-  writer_->Flush();
 }
 
 void ProcessStatsDataSource::OnPids(const std::vector<int32_t>& pids) {
@@ -78,6 +73,10 @@
   }
 }
 
+void ProcessStatsDataSource::Flush() {
+  writer_->Flush();
+}
+
 // static
 void ProcessStatsDataSource::WriteProcess(int32_t pid,
                                           protos::pbzero::ProcessTree* tree) {
diff --git a/src/traced/probes/process_stats_data_source.h b/src/traced/probes/process_stats_data_source.h
index 5054af1..7a64269 100644
--- a/src/traced/probes/process_stats_data_source.h
+++ b/src/traced/probes/process_stats_data_source.h
@@ -42,6 +42,7 @@
   base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
   void WriteAllProcesses();
   void OnPids(const std::vector<int32_t>& pids);
+  void Flush();
 
  private:
   static void WriteProcess(int32_t pid, protos::pbzero::ProcessTree*);
diff --git a/src/tracing/core/commit_data_request.cc b/src/tracing/core/commit_data_request.cc
index 9468465..1ee0fbe 100644
--- a/src/tracing/core/commit_data_request.cc
+++ b/src/tracing/core/commit_data_request.cc
@@ -52,6 +52,11 @@
     chunks_to_patch_.emplace_back();
     chunks_to_patch_.back().FromProto(field);
   }
+
+  static_assert(sizeof(flush_request_id_) == sizeof(proto.flush_request_id()),
+                "size mismatch");
+  flush_request_id_ =
+      static_cast<decltype(flush_request_id_)>(proto.flush_request_id());
   unknown_fields_ = proto.unknown_fields();
 }
 
@@ -68,6 +73,11 @@
     auto* entry = proto->add_chunks_to_patch();
     it.ToProto(entry);
   }
+
+  static_assert(sizeof(flush_request_id_) == sizeof(proto->flush_request_id()),
+                "size mismatch");
+  proto->set_flush_request_id(
+      static_cast<decltype(proto->flush_request_id())>(flush_request_id_));
   *(proto->mutable_unknown_fields()) = unknown_fields_;
 }
 
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index da63a07..def0f55 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -55,6 +55,7 @@
 constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
 constexpr int kMinWriteIntoFilePeriodMs = 100;
 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
+constexpr int kFlushTimeoutMs = 1000;
 
 constexpr uint64_t kMillisPerHour = 3600000;
 
@@ -329,7 +330,7 @@
     task_runner_->PostDelayedTask(
         [weak_this, tsid] {
           if (weak_this)
-            weak_this->DisableTracing(tsid);
+            weak_this->FlushAndDisableTracing(tsid);
         },
         cfg.duration_ms());
   }
@@ -391,6 +392,101 @@
   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
 }
 
+void ServiceImpl::Flush(TracingSessionID tsid,
+                        int timeout_ms,
+                        ConsumerEndpoint::FlushCallback callback) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session) {
+    PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
+    return;
+  }
+
+  if (tracing_session->pending_flushes.size() > 1000) {
+    PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
+                  tracing_session->pending_flushes.size());
+    callback(false);
+    return;
+  }
+
+  FlushRequestID flush_request_id = ++last_flush_request_id_;
+  PendingFlush& pending_flush =
+      tracing_session->pending_flushes
+          .emplace_hint(tracing_session->pending_flushes.end(),
+                        flush_request_id, PendingFlush(std::move(callback)))
+          ->second;
+
+  // Send a flush request to each producer involved in the tracing session. In
+  // order to issue a flush request we have to build a map of all data source
+  // instance ids enabled for each producer.
+  std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
+  for (const auto& data_source_inst : tracing_session->data_source_instances) {
+    const ProducerID producer_id = data_source_inst.first;
+    const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
+    flush_map[producer_id].push_back(ds_inst_id);
+  }
+
+  for (const auto& kv : flush_map) {
+    ProducerID producer_id = kv.first;
+    ProducerEndpointImpl* producer = GetProducer(producer_id);
+    const std::vector<DataSourceInstanceID>& data_sources = kv.second;
+    producer->Flush(flush_request_id, data_sources);
+    pending_flush.producers.insert(producer_id);
+  }
+
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, tsid, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(tsid, flush_request_id);
+      },
+      timeout_ms);
+}
+
+void ServiceImpl::NotifyFlushDoneForProducer(ProducerID producer_id,
+                                             FlushRequestID flush_request_id) {
+  for (auto& kv : tracing_sessions_) {
+    // Remove all pending flushes <= |flush_request_id| for |producer_id|.
+    auto& pending_flushes = kv.second.pending_flushes;
+    auto end_it = pending_flushes.upper_bound(flush_request_id);
+    for (auto it = pending_flushes.begin(); it != end_it;) {
+      PendingFlush& pending_flush = it->second;
+      pending_flush.producers.erase(producer_id);
+      if (pending_flush.producers.empty()) {
+        task_runner_->PostTask(
+            std::bind(std::move(pending_flush.callback), /*success=*/true));
+        it = pending_flushes.erase(it);
+      } else {
+        it++;
+      }
+    }  // for (pending_flushes)
+  }    // for (tracing_session)
+}
+
+void ServiceImpl::OnFlushTimeout(TracingSessionID tsid,
+                                 FlushRequestID flush_request_id) {
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session)
+    return;
+  auto it = tracing_session->pending_flushes.find(flush_request_id);
+  if (it == tracing_session->pending_flushes.end())
+    return;  // Nominal case: flush was completed and acked on time.
+  auto callback = std::move(it->second.callback);
+  tracing_session->pending_flushes.erase(it);
+  callback(/*success=*/false);
+}
+
+void ServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  Flush(tsid, kFlushTimeoutMs, [weak_this, tsid](bool success) {
+    PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
+                  success, tsid);
+    if (weak_this)
+      weak_this->DisableTracing(tsid);
+  });
+}
+
 // Note: when this is called to write into a file passed when starting tracing
 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
 // to send the trace data back over IPC).
@@ -1036,6 +1132,16 @@
   tracing_session_id_ = 0;
 }
 
+void ServiceImpl::ConsumerEndpointImpl::Flush(int timeout_ms,
+                                              FlushCallback callback) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  if (!tracing_session_id_) {
+    PERFETTO_LOG("Consumer called Flush() but tracing was not active");
+    return;
+  }
+  service_->Flush(tracing_session_id_, timeout_ms, callback);
+}
+
 base::WeakPtr<ServiceImpl::ConsumerEndpointImpl>
 ServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
@@ -1130,6 +1236,10 @@
 
   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
 
+  if (req_untrusted.flush_request_id()) {
+    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
+  }
+
   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
   // callback being invoked within the same callstack and not posted. If this
   // changes, the code there needs to be changed accordingly.
@@ -1160,6 +1270,7 @@
   // TODO(primiano): When we'll support tearing down the SMB, at this point we
   // should send the Producer a TearDownTracing if all its data sources have
   // been disabled (see b/77532839 and aosp/655179 PS1).
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   task_runner_->PostTask([weak_this, ds_inst_id] {
     if (weak_this)
@@ -1192,9 +1303,23 @@
   });
 }
 
+void ServiceImpl::ProducerEndpointImpl::Flush(
+    FlushRequestID flush_request_id,
+    const std::vector<DataSourceInstanceID>& data_sources) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
+    if (weak_this) {
+      weak_this->producer_->Flush(flush_request_id, data_sources.data(),
+                                  data_sources.size());
+    }
+  });
+}
+
 void ServiceImpl::ProducerEndpointImpl::CreateDataSourceInstance(
     DataSourceInstanceID ds_id,
     const DataSourceConfig& config) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   task_runner_->PostTask([weak_this, ds_id, config] {
     if (weak_this)
@@ -1202,6 +1327,11 @@
   });
 }
 
+void ServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(FlushRequestID id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  return GetOrCreateShmemArbiter()->NotifyFlushComplete(id);
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ServiceImpl::TracingSession implementation
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 8aa0981..65a9ca4 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -75,8 +75,10 @@
 
     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
     void OnTracingSetup();
+    void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
     void CreateDataSourceInstance(DataSourceInstanceID,
                                   const DataSourceConfig&);
+    void NotifyFlushComplete(FlushRequestID) override;
     void TearDownDataSource(DataSourceInstanceID);
     SharedMemory* shared_memory() const override;
     size_t shared_buffer_page_size_kb() const override;
@@ -119,6 +121,7 @@
     void DisableTracing() override;
     void ReadBuffers() override;
     void FreeBuffers() override;
+    void Flush(int timeout_ms, FlushCallback) override;
 
    private:
     friend class ServiceImpl;
@@ -152,6 +155,7 @@
                                      size_t size);
   void ApplyChunkPatches(ProducerID,
                          const std::vector<CommitDataRequest::ChunkToPatch>&);
+  void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
 
   // Called by ConsumerEndpointImpl.
   void DisconnectConsumer(ConsumerEndpointImpl*);
@@ -159,6 +163,10 @@
                      const TraceConfig&,
                      base::ScopedFile);
   void DisableTracing(TracingSessionID);
+  void Flush(TracingSessionID tsid,
+             int timeout_ms,
+             ConsumerEndpoint::FlushCallback);
+  void FlushAndDisableTracing(TracingSessionID);
   void ReadBuffers(TracingSessionID, ConsumerEndpointImpl*);
   void FreeBuffers(TracingSessionID);
 
@@ -190,6 +198,12 @@
     std::string data_source_name;
   };
 
+  struct PendingFlush {
+    std::set<ProducerID> producers;
+    ConsumerEndpoint::FlushCallback callback;
+    explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
+  };
+
   // Holds the state of a tracing session. A tracing session is uniquely bound
   // a specific Consumer. Each Consumer can own one or more sessions.
   struct TracingSession {
@@ -214,6 +228,10 @@
     // producers for this tracing session.
     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
 
+    // For each Flush(N) request, keeps track of the set of producers for which
+    // we are still awaiting a NotifyFlushComplete(N) ack.
+    std::map<FlushRequestID, PendingFlush> pending_flushes;
+
     // Maps a per-trace-session buffer index into the corresponding global
     // BufferID (shared namespace amongst all consumers). This vector has as
     // many entries as |config.buffers_size()|.
@@ -258,7 +276,7 @@
 
   void MaybeSnapshotClocks(TracingSession*, std::vector<TracePacket>*);
   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
-
+  void OnFlushTimeout(TracingSessionID, FlushRequestID);
   TraceBuffer* GetBufferByID(BufferID);
 
   base::TaskRunner* const task_runner_;
@@ -266,17 +284,14 @@
   ProducerID last_producer_id_ = 0;
   DataSourceInstanceID last_data_source_instance_id_ = 0;
   TracingSessionID last_tracing_session_id_ = 0;
+  FlushRequestID last_flush_request_id_ = 0;
 
   // Buffer IDs are global across all consumers (because a Producer can produce
   // data for more than one trace session, hence more than one consumer).
   IdAllocator<BufferID> buffer_ids_;
 
   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
-
-  // TODO(primiano): There doesn't seem to be any good reason why |producers_|
-  // is a map indexed by ID and not just a set<ProducerEndpointImpl*>.
   std::map<ProducerID, ProducerEndpointImpl*> producers_;
-
   std::set<ConsumerEndpointImpl*> consumers_;
   std::map<TracingSessionID, TracingSession> tracing_sessions_;
   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index d166192..61c0aa6 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -84,6 +84,13 @@
     return svc->GetProducer(producer_id)->uid_;
   }
 
+  size_t GetNumPendingFlushes() {
+    ServiceImpl::TracingSession* tracing_session =
+        svc->GetTracingSession(svc->last_tracing_session_id_);
+    EXPECT_NE(nullptr, tracing_session);
+    return tracing_session->pending_flushes.size();
+  }
+
   base::TestTaskRunner task_runner;
   std::unique_ptr<ServiceImpl> svc;
 };
@@ -384,4 +391,140 @@
   ASSERT_THAT(actual_shm_sizes_kb, ElementsAreArray(kExpectedSizesKb));
 }
 
+TEST_F(ServiceImplTest, ExplicitFlush) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("data_source");
+  {
+    auto tp = writer->NewTracePacket();
+    tp->set_for_testing()->set_str("payload");
+  }
+
+  auto flush_request = consumer->Flush();
+  producer->WaitForFlush(writer.get());
+  ASSERT_TRUE(flush_request.WaitForReply());
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+  EXPECT_THAT(
+      consumer->ReadBuffers(),
+      Contains(Property(&protos::TracePacket::for_testing,
+                        Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
+TEST_F(ServiceImplTest, ImplicitFlushOnTimedTraces) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  trace_config.set_duration_ms(1);
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("data_source");
+  {
+    auto tp = writer->NewTracePacket();
+    tp->set_for_testing()->set_str("payload");
+  }
+
+  producer->WaitForFlush(writer.get());
+
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+
+  EXPECT_THAT(
+      consumer->ReadBuffers(),
+      Contains(Property(&protos::TracePacket::for_testing,
+                        Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
+// Tests the monotonic semantic of flush request IDs, i.e., once a producer
+// acks flush request N, all flush requests <= N are considered successful and
+// acked to the consumer.
+TEST_F(ServiceImplTest, BatchFlushes) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("data_source");
+  {
+    auto tp = writer->NewTracePacket();
+    tp->set_for_testing()->set_str("payload");
+  }
+
+  auto flush_req_1 = consumer->Flush();
+  auto flush_req_2 = consumer->Flush();
+  auto flush_req_3 = consumer->Flush();
+
+  // We'll deliberately let the 4th flush request timeout. Use a lower timeout
+  // to keep test time short.
+  auto flush_req_4 = consumer->Flush(/*timeout_ms=*/10);
+  ASSERT_EQ(4u, GetNumPendingFlushes());
+
+  // Make the producer reply only to the 3rd flush request.
+  testing::InSequence seq;
+  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 1.
+  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 2.
+  producer->WaitForFlush(writer.get());  // Will reply only to flush id == 3.
+  producer->WaitForFlush(nullptr);       // Will NOT reply to flush id == 4.
+
+  // Even if the producer explicily replied only to flush ID == 3, all the
+  // previous flushed < 3 should be implicitly acked.
+  ASSERT_TRUE(flush_req_1.WaitForReply());
+  ASSERT_TRUE(flush_req_2.WaitForReply());
+  ASSERT_TRUE(flush_req_3.WaitForReply());
+
+  // At this point flush id == 4 should still be pending and should fail because
+  // of reaching its timeout.
+  ASSERT_EQ(1u, GetNumPendingFlushes());
+  ASSERT_FALSE(flush_req_4.WaitForReply());
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+  EXPECT_THAT(
+      consumer->ReadBuffers(),
+      Contains(Property(&protos::TracePacket::for_testing,
+                        Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index b6c6fd9..f0960f8 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -246,6 +246,31 @@
       new TraceWriterImpl(this, id, target_buffer));
 }
 
+void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
+  bool should_post_commit_task = false;
+  {
+    std::lock_guard<std::mutex> scoped_lock(lock_);
+    // If a commit_data_req_ exists it means that somebody else already posted a
+    // FlushPendingCommitDataRequests() task.
+    if (!commit_data_req_) {
+      commit_data_req_.reset(new CommitDataRequest());
+      should_post_commit_task = true;
+    } else {
+      // If there is another request queued and that also contains is a reply
+      // to a flush request, reply with the highest id.
+      req_id = std::max(req_id, commit_data_req_->flush_request_id());
+    }
+    commit_data_req_->set_flush_request_id(req_id);
+  }
+  if (should_post_commit_task) {
+    auto weak_this = weak_ptr_factory_.GetWeakPtr();
+    task_runner_->PostTask([weak_this] {
+      if (weak_this)
+        weak_this->FlushPendingCommitDataRequests();
+    });
+  }
+}
+
 void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
   std::lock_guard<std::mutex> scoped_lock(lock_);
   active_writer_ids_.Free(id);
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 691d6b8..beefef6 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -96,6 +96,8 @@
   std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer = 0) override;
 
+  void NotifyFlushComplete(FlushRequestID) override;
+
  private:
   friend class TraceWriterImpl;
 
diff --git a/src/tracing/core/shared_memory_arbiter_impl_unittest.cc b/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
index ed96226..a7414ec 100644
--- a/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
@@ -37,6 +37,7 @@
  public:
   void RegisterDataSource(const DataSourceDescriptor&) override {}
   void UnregisterDataSource(const std::string&) override {}
+  void NotifyFlushComplete(FlushRequestID) override {}
   SharedMemory* shared_memory() const override { return nullptr; }
   size_t shared_buffer_page_size_kb() const override { return 0; }
   std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index ad9156a..7a1a8eb 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -35,6 +35,7 @@
   void RegisterDataSource(const DataSourceDescriptor&) override {}
   void UnregisterDataSource(const std::string&) override {}
   void CommitData(const CommitDataRequest&, CommitDataCallback) override {}
+  void NotifyFlushComplete(FlushRequestID) override {}
   SharedMemory* shared_memory() const override { return nullptr; }
   size_t shared_buffer_page_size_kb() const override { return 0; }
   std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index c3382b5..6da7b23 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -156,4 +156,20 @@
   consumer_port_.FreeBuffers(req, std::move(async_response));
 }
 
+void ConsumerIPCClientImpl::Flush(int timeout_ms, FlushCallback callback) {
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
+    return callback(/*success=*/false);
+  }
+
+  protos::FlushRequest req;
+  req.set_timeout_ms(timeout_ms);
+  ipc::Deferred<protos::FlushResponse> async_response;
+  async_response.Bind(
+      [callback](ipc::AsyncResult<protos::FlushResponse> response) {
+        callback(!!response);
+      });
+  consumer_port_.Flush(req, std::move(async_response));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index d94b658..8b771a0 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -63,6 +63,7 @@
   void DisableTracing() override;
   void ReadBuffers() override;
   void FreeBuffers() override;
+  void Flush(int timeout_ms, FlushCallback) override;
 
   // ipc::ServiceProxy::EventListener implementation.
   // These methods are invoked by the IPC layer, which knows nothing about
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index f6484b9..e43da50 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -138,8 +138,22 @@
     return;
   }
 
+  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
+    // This cast boilerplate is required only because protobuf uses its own
+    // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
+    // type (long vs long long) even though they have the same size.
+    const auto* data_source_ids = cmd.flush().data_source_ids().data();
+    static_assert(sizeof(data_source_ids[0]) == sizeof(FlushRequestID),
+                  "data_source_ids should be 64-bit");
+    producer_->Flush(cmd.flush().request_id(),
+                     reinterpret_cast<const FlushRequestID*>(data_source_ids),
+                     cmd.flush().data_source_ids().size());
+    return;
+  }
+
   PERFETTO_DLOG("Unknown async request %d received from tracing service",
                 cmd.cmd_case());
+  PERFETTO_DCHECK(false);
 }
 
 void ProducerIPCClientImpl::RegisterDataSource(
@@ -205,6 +219,10 @@
   return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
 }
 
+void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
+  return shared_memory_arbiter_->NotifyFlushComplete(req_id);
+}
+
 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
   return shared_memory_.get();
 }
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 1f4ec68..01f35c1 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -65,6 +65,7 @@
   void CommitData(const CommitDataRequest&, CommitDataCallback) override;
   std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) override;
+  void NotifyFlushComplete(FlushRequestID) override;
   SharedMemory* shared_memory() const override;
   size_t shared_buffer_page_size_kb() const override;
 
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index 3c599f4..6a291b2 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -93,6 +93,33 @@
   resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create());
 }
 
+// Called by the IPC layer.
+void ConsumerIPCService::Flush(const protos::FlushRequest& req,
+                               DeferredFlushResponse resp) {
+  auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
+                                            std::move(resp));
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  auto callback = [weak_this, it](bool success) {
+    if (weak_this)
+      weak_this->OnFlushCallback(success, std::move(it));
+  };
+  GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
+                                                          std::move(callback));
+}
+
+// Called by the service in response to a service_endpoint->Flush() request.
+void ConsumerIPCService::OnFlushCallback(
+    bool success,
+    PendingFlushResponses::iterator pending_response_it) {
+  DeferredFlushResponse response(std::move(*pending_response_it));
+  pending_flush_responses_.erase(pending_response_it);
+  if (success) {
+    response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create());
+  } else {
+    response.Reject();
+  }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // RemoteConsumer methods
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index d3c02f7..d2b22ce 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -17,6 +17,7 @@
 #ifndef SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
 #define SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
 
+#include <list>
 #include <map>
 #include <memory>
 #include <string>
@@ -52,6 +53,7 @@
                    DeferredReadBuffersResponse) override;
   void FreeBuffers(const protos::FreeBuffersRequest&,
                    DeferredFreeBuffersResponse) override;
+  void Flush(const protos::FlushRequest&, DeferredFlushResponse) override;
   void OnClientDisconnected() override;
 
  private:
@@ -84,6 +86,9 @@
     DeferredEnableTracingResponse enable_tracing_response;
   };
 
+  // This has to be a container that doesn't invalidate iterators.
+  using PendingFlushResponses = std::list<DeferredFlushResponse>;
+
   ConsumerIPCService(const ConsumerIPCService&) = delete;
   ConsumerIPCService& operator=(const ConsumerIPCService&) = delete;
 
@@ -91,13 +96,17 @@
   // the current IPC request.
   RemoteConsumer* GetConsumerForCurrentRequest();
 
+  void OnFlushCallback(bool success, PendingFlushResponses::iterator);
+
   Service* const core_service_;
 
   // Maps IPC clients to ConsumerEndpoint instances registered on the
   // |core_service_| business logic.
   std::map<ipc::ClientID, std::unique_ptr<RemoteConsumer>> consumers_;
 
-  base::WeakPtrFactory<ConsumerIPCService> weak_ptr_factory_;
+  PendingFlushResponses pending_flush_responses_;
+
+  base::WeakPtrFactory<ConsumerIPCService> weak_ptr_factory_;  // Keep last.
 };
 
 }  // namespace perfetto
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index abb3506..79161ca 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -241,4 +241,22 @@
   async_producer_commands.Resolve(std::move(cmd));
 }
 
+void ProducerIPCService::RemoteProducer::Flush(
+    FlushRequestID flush_request_id,
+    const DataSourceInstanceID* data_source_ids,
+    size_t num_data_sources) {
+  if (!async_producer_commands.IsBound()) {
+    PERFETTO_DLOG(
+        "The Service tried to request a flush but the remote Producer has not "
+        "yet initialized the connection");
+    return;
+  }
+  auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
+  cmd.set_has_more(true);
+  for (size_t i = 0; i < num_data_sources; i++)
+    cmd->mutable_flush()->add_data_source_ids(data_source_ids[i]);
+  cmd->mutable_flush()->set_request_id(flush_request_id);
+  async_producer_commands.Resolve(std::move(cmd));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index f0dacb9..ab795a4 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -73,6 +73,9 @@
                                   const DataSourceConfig&) override;
     void TearDownDataSourceInstance(DataSourceInstanceID) override;
     void OnTracingSetup() override;
+    void Flush(FlushRequestID,
+               const DataSourceInstanceID* data_source_ids,
+               size_t num_data_sources) override;
 
     // The interface obtained from the core service business logic through
     // Service::ConnectProducer(this). This allows to invoke methods for a
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 6ea267e..86b83f8 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -68,6 +68,26 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
+MockConsumer::FlushRequest MockConsumer::Flush(int timeout_ms) {
+  static int i = 0;
+  auto checkpoint_name = "on_consumer_flush_" + std::to_string(i++);
+  auto on_flush = task_runner_->CreateCheckpoint(checkpoint_name);
+  std::shared_ptr<bool> result(new bool());
+  service_endpoint_->Flush(timeout_ms, [result, on_flush](bool success) {
+    *result = success;
+    on_flush();
+  });
+
+  base::TestTaskRunner* task_runner = task_runner_;
+  auto wait_for_flush_completion = [result, task_runner,
+                                    checkpoint_name]() -> bool {
+    task_runner->RunUntilCheckpoint(checkpoint_name);
+    return *result;
+  };
+
+  return FlushRequest(wait_for_flush_completion);
+}
+
 std::vector<protos::TracePacket> MockConsumer::ReadBuffers() {
   std::vector<protos::TracePacket> decoded_packets;
   static int i = 0;
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index f62dde2..bab7009 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -34,6 +34,15 @@
 
 class MockConsumer : public Consumer {
  public:
+  class FlushRequest {
+   public:
+    FlushRequest(std::function<bool(void)> wait_func) : wait_func_(wait_func) {}
+    bool WaitForReply() { return wait_func_(); }
+
+   private:
+    std::function<bool(void)> wait_func_;
+  };
+
   explicit MockConsumer(base::TestTaskRunner*);
   ~MockConsumer() override;
 
@@ -42,6 +51,7 @@
   void DisableTracing();
   void FreeBuffers();
   void WaitForTracingDisabled();
+  FlushRequest Flush(int timeout_ms = 10000);
   std::vector<protos::TracePacket> ReadBuffers();
 
   Service::ConsumerEndpoint* endpoint() { return service_endpoint_.get(); }
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 0b40990..78bca68 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -110,4 +110,16 @@
   return service_endpoint_->CreateTraceWriter(buf_id);
 }
 
+void MockProducer::WaitForFlush(TraceWriter* writer_to_flush) {
+  auto& expected_call = EXPECT_CALL(*this, Flush(_, _, _));
+  if (!writer_to_flush)
+    return;
+  expected_call.WillOnce(
+      Invoke([this, writer_to_flush](FlushRequestID flush_req_id,
+                                     const DataSourceInstanceID*, size_t) {
+        writer_to_flush->Flush();
+        service_endpoint_->NotifyFlushComplete(flush_req_id);
+      }));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index 8af4c76..d734c7f 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -54,6 +54,10 @@
   std::unique_ptr<TraceWriter> CreateTraceWriter(
       const std::string& data_source_name);
 
+  // If |writer_to_flush| != nullptr does NOT reply to the flush request.
+  // If |writer_to_flush| == nullptr does NOT reply to the flush request.
+  void WaitForFlush(TraceWriter* writer_to_flush);
+
   Service::ProducerEndpoint* endpoint() { return service_endpoint_.get(); }
 
   // Producer implementation.
@@ -63,6 +67,8 @@
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
   MOCK_METHOD0(OnTracingSetup, void());
+  MOCK_METHOD3(Flush,
+               void(FlushRequestID, const DataSourceInstanceID*, size_t));
 
  private:
   base::TestTaskRunner* const task_runner_;
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 0d19fdc..3760683 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -61,6 +61,8 @@
   MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
   MOCK_METHOD0(uid, uid_t());
   MOCK_METHOD0(OnTracingSetup, void());
+  MOCK_METHOD3(Flush,
+               void(FlushRequestID, const DataSourceInstanceID*, size_t));
 };
 
 class MockConsumer : public Consumer {
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 9ca5430..b509647 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -98,6 +98,7 @@
 
   void TearDownDataSourceInstance(DataSourceInstanceID) override {}
   void OnTracingSetup() override {}
+  void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override {}
 
  private:
   const std::string name_;
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 2069b5d..7f14805 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -120,4 +120,13 @@
 
 void FakeProducer::OnTracingSetup() {}
 
+void FakeProducer::Flush(FlushRequestID flush_request_id,
+                         const DataSourceInstanceID*,
+                         size_t num_data_sources) {
+  PERFETTO_DCHECK(num_data_sources > 0);
+  if (trace_writer_)
+    trace_writer_->Flush();
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
 }  // namespace perfetto
diff --git a/test/fake_producer.h b/test/fake_producer.h
index ecf9d89..418c01e 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -50,6 +50,7 @@
                                 const DataSourceConfig& source_config) override;
   void TearDownDataSourceInstance(DataSourceInstanceID) override;
   void OnTracingSetup() override;
+  void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override;
 
  private:
   void Shutdown();