Implement ftrace flushing

This CL finally implements flushing of ftrace data and
wires it up to the Flush() API of the tracing service.
The major change introduced is in CpuReader, the multi
threaded class that reads ftrace data.
The core scheduling algorithm is left unchanged (see
//docs/ftrace.md). The old logic is still preserved:
[wait for command] -> [splice blocking] -> [splice nonblock]
However, this CL introduces a way for the FtraceController
to interrupt the splice and switch to read() mode for
flushes. read(), in fact, is the only way to read partial
pages out of the ftrace buffer.
Even when in read() mode, the scheduling algorithm is still
the same, just s/splice/read/.
There are a bunch of caveats and they are thoroughly
described in b/120188810. Essentially once we switch to
read() for the flush, we need to be careful and wait for
the ftrace read pointer to become page-aligned again before
switching back to splice().
Furthermore this CL gets rid of the internal pipe to move
data between the worker thread and the main thread and switches
to the recently introduced page pool. We still have an internal
pipe (for splice) but use it entirely in the worker thread.
Exposing the pipe to the main thread had two drawbacks:
1) Essentially limits the transfer bandwidth between the worker
   thread and the main thread, making the ftrace buffer size
   useless. Before, even if one specified a buffer size of 1GB,
   we would at most read 1 pipe buffer per read cycle.
2) That hits another kernel bug around non-blocking splice
   (b/119805587).

Test: perfetto_integrationtests --gtest_filter=PerfettoTest.VeryLargePackets
Bug: 73886018
Change-Id: I4a6e82b284971a3765b42959904f5402095e4c4e
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index 1cffd75..83e3484 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -29,7 +29,9 @@
 #include "perfetto/base/metatrace.h"
 #include "perfetto/base/optional.h"
 #include "perfetto/base/utils.h"
+#include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_data_source.h"
+#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 #include "perfetto/trace/ftrace/ftrace_event.pbzero.h"
@@ -149,16 +151,14 @@
 using protos::pbzero::GenericFtraceEvent;
 
 CpuReader::CpuReader(const ProtoTranslationTable* table,
+                     FtraceThreadSync* thread_sync,
                      size_t cpu,
-                     base::ScopedFile fd,
-                     std::function<void()> on_data_available)
-    : table_(table), cpu_(cpu), trace_fd_(std::move(fd)) {
-  // Both reads and writes from/to the staging pipe are always non-blocking.
-  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
-  // blocking vs non-blocking behavior is controlled solely by the
-  // SPLICE_F_NONBLOCK flag passed to splice().
-  staging_pipe_ = base::Pipe::Create(base::Pipe::kBothNonBlock);
-
+                     int generation,
+                     base::ScopedFile fd)
+    : table_(table),
+      thread_sync_(thread_sync),
+      cpu_(cpu),
+      trace_fd_(std::move(fd)) {
   // Make reads from the raw pipe blocking so that splice() can sleep.
   PERFETTO_CHECK(trace_fd_);
   PERFETTO_CHECK(SetBlocking(*trace_fd_, true));
@@ -180,19 +180,27 @@
   }
 #pragma GCC diagnostic pop
 
-  worker_thread_ =
-      std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
-                            *staging_pipe_.wr, on_data_available, &cmd_));
+  worker_thread_ = std::thread(std::bind(&RunWorkerThread, cpu_, generation,
+                                         *trace_fd_, &pool_, thread_sync_,
+                                         table->page_header_size_len()));
 }
 
 CpuReader::~CpuReader() {
+// FtraceController (who owns this) is supposed to issue a kQuit notification
+// to the thread sync object before destroying the CpuReader.
+#if PERFETTO_DCHECK_IS_ON()
+  {
+    std::lock_guard<std::mutex> lock(thread_sync_->mutex);
+    PERFETTO_DCHECK(thread_sync_->cmd == FtraceThreadSync::kQuit);
+  }
+#endif
+
   // The kernel's splice implementation for the trace pipe doesn't generate a
   // SIGPIPE if the output pipe is closed (b/73807072). Instead, the call to
   // close() on the pipe hangs forever. To work around this, we first close the
   // trace fd (which prevents another splice from starting), raise SIGPIPE and
   // wait for the worker to exit (i.e., to guarantee no splice is in progress)
   // and only then close the staging pipe.
-  cmd_ = ThreadCtl::kExit;
   trace_fd_.reset();
   InterruptWorkerThreadWithSignal();
   worker_thread_.join();
@@ -202,80 +210,189 @@
   pthread_kill(worker_thread_.native_handle(), SIGPIPE);
 }
 
+// The worker thread reads data from the ftrace trace_pipe_raw and moves it to
+// the page |pool| allowing the main thread to read and decode that.
+// See //docs/ftrace.md for the design of the ftrace worker scheduler.
 // static
 void CpuReader::RunWorkerThread(size_t cpu,
+                                int generation,
                                 int trace_fd,
-                                int staging_write_fd,
-                                const std::function<void()>& on_data_available,
-                                std::atomic<ThreadCtl>* cmd_atomic) {
+                                PagePool* pool,
+                                FtraceThreadSync* thread_sync,
+                                uint16_t header_size_len) {
+// Before attempting any changes to this function, think twice. The kernel
+// ftrace pipe code is full of caveats and bugs. This code carefully works
+// around those bugs. See b/120188810 and b/119805587 for the full narrative.
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-  // This thread is responsible for moving data from the trace pipe into the
-  // staging pipe at least one page at a time. This is done using the splice(2)
-  // system call, which unlike poll/select makes it possible to block until at
-  // least a full page of data is ready to be read. The downside is that as the
-  // call is blocking we need a dedicated thread for each trace pipe (i.e.,
-  // CPU).
   char thread_name[16];
   snprintf(thread_name, sizeof(thread_name), "traced_probes%zu", cpu);
   pthread_setname_np(pthread_self(), thread_name);
 
-  for (;;) {
-    // First do a blocking splice which sleeps until there is at least one
-    // page of data available and enough space to write it into the staging
-    // pipe.
-    ssize_t splice_res;
-    {
-      PERFETTO_METATRACE("splice_blocking", cpu);
-      splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
-                          base::kPageSize, SPLICE_F_MOVE);
-    }
-    if (splice_res < 0) {
-      // The kernel ftrace code has its own splice() implementation that can
-      // occasionally fail with transient errors not reported in man 2 splice.
-      // Just try again if we see these.
-      ThreadCtl cmd = *cmd_atomic;
-      if (errno == ENOMEM || errno == EBUSY ||
-          (errno == EINTR && cmd == ThreadCtl::kRun)) {
-        PERFETTO_DPLOG("Transient splice failure -- retrying");
-        usleep(100 * 1000);
-        continue;
+  // When using splice() the target fd needs to be an actual pipe. This pipe is
+  // used only within this thread and is mainly for synchronization purposes.
+  // A blocking splice() is the only way to block and wait for a new page of
+  // ftrace data.
+  base::Pipe sync_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
+
+  enum ReadMode { kRead, kSplice };
+  enum Block { kBlock, kNonBlock };
+  constexpr auto kPageSize = base::kPageSize;
+
+  // This lambda function reads the ftrace raw pipe using either read() or
+  // splice(), either in blocking or non-blocking mode.
+  // Returns the number of ftrace bytes read, or -1 in case of failure.
+  auto read_ftrace_pipe = [&sync_pipe, trace_fd, pool, cpu, header_size_len](
+                              ReadMode mode, Block block) -> int {
+    static const char* const kModesStr[] = {"read-nonblock", "read-block",
+                                            "splice-nonblock", "splice-block"};
+    const char* mode_str = kModesStr[(mode == kSplice) * 2 + (block == kBlock)];
+    PERFETTO_METATRACE(mode_str, cpu);
+    uint8_t* pool_page = pool->BeginWrite();
+    PERFETTO_DCHECK(pool_page);
+
+    ssize_t res;
+    int err = 0;
+    if (mode == kSplice) {
+      uint32_t flg = SPLICE_F_MOVE | ((block == kNonBlock) * SPLICE_F_NONBLOCK);
+      res = splice(trace_fd, nullptr, *sync_pipe.wr, nullptr, kPageSize, flg);
+      err = errno;
+      if (res > 0) {
+        // If the splice() succeeded, read back from the other end of our own
+        // pipe and copy the data into the pool.
+        ssize_t rdres = read(*sync_pipe.rd, pool_page, kPageSize);
+        PERFETTO_DCHECK(rdres == res);
       }
-      if (cmd == ThreadCtl::kExit) {
-        PERFETTO_DPLOG("Stopping CPUReader loop for CPU %zd.", cpu);
-        PERFETTO_DCHECK(errno == EPIPE || errno == EINTR || errno == EBADF);
-        break;  // ~CpuReader is waiting to join this thread.
+    } else {
+      if (block == kNonBlock)
+        SetBlocking(trace_fd, false);
+      res = read(trace_fd, pool_page, kPageSize);
+      err = errno;
+      if (res > 0) {
+        // Need to copy the ptr, ParsePageHeader() advances the passed ptr arg.
+        const uint8_t* ptr = pool_page;
+
+        // The caller of this function wants to have a sufficient approximation
+        // of how many bytes of ftrace data have been read. Unfortunately the
+        // return value of read() is a lie. The problem is that the ftrace
+        // read() implementation, for good reasons, always reconstructs a whole
+        // ftrace page, copying the events over and zero-filling at the end.
+        // This is nice, because we always get a valid ftrace header, but also
+        // causes read to always return 4096. The only way to have a good
+        // indication of how many bytes of ftrace data have been read is to
+        // parse the ftrace header.
+        // Note: |header_size_len| is *not* an indication on how many bytes are
+        // available from |ptr|. It's just an independent piece of information
+        // that needs to be passed to ParsePageHeader() (a static function) in
+        // order to work.
+        base::Optional<PageHeader> hdr = ParsePageHeader(&ptr, header_size_len);
+        PERFETTO_DCHECK(hdr && hdr->size > 0 && hdr->size <= base::kPageSize);
+        res = hdr.has_value() ? static_cast<int>(hdr->size) : -1;
       }
-      PERFETTO_FATAL("Unexpected ThreadCtl value: %d", int(cmd));
+      if (block == kNonBlock)
+        SetBlocking(trace_fd, true);
     }
 
-    // Then do as many non-blocking splices as we can. This moves any full
-    // pages from the trace pipe into the staging pipe as long as there is
-    // data in the former and space in the latter.
-    for (;;) {
-      {
-        PERFETTO_METATRACE("splice_nonblocking", cpu);
-        splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
-                            base::kPageSize, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
-      }
-      if (splice_res < 0) {
-        if (errno != EAGAIN && errno != ENOMEM && errno != EBUSY)
-          PERFETTO_PLOG("splice");
+    if (res > 0) {
+      // splice() should return full pages, read can return < a page.
+      PERFETTO_DCHECK(res == base::kPageSize || mode == kRead);
+      pool->EndWrite();
+      return static_cast<int>(res);
+    }
+
+    // It is fine to leave the BeginWrite() unpaired in the error case.
+
+    if (res && err != EAGAIN && err != ENOMEM && err != EBUSY && err != EINTR &&
+        err != EBADF) {
+      // EAGAIN: no data when in non-blocking mode.
+      // ENOMEM, EBUSY: temporary ftrace failures (they happen).
+      // EINTR: signal interruption, likely from main thread to issue a new cmd.
+      // EBADF: the main thread has closed the fd (happens during dtor).
+      PERFETTO_PLOG("Unexpected %s() err", mode == kRead ? "read" : "splice");
+    }
+    return -1;
+  };
+
+  uint64_t last_cmd_id = 0;
+  ReadMode cur_mode = kSplice;
+  for (bool run_loop = true; run_loop;) {
+    FtraceThreadSync::Cmd cmd;
+    // Wait for a new command from the main thread issued by FtraceController.
+    // The FtraceController issues also a signal() after every new command. This
+    // is not necessary for the condition variable itself, but it's necessary to
+    // unblock us if we are in a blocking read() or splice().
+    // Commands are tagged with an ID, every new command has a new |cmd_id|, so
+    // we can distinguish spurious wakeups from actual cmd requests.
+    {
+      PERFETTO_METATRACE("wait cmd", cpu);
+      std::unique_lock<std::mutex> lock(thread_sync->mutex);
+      while (thread_sync->cmd_id == last_cmd_id)
+        thread_sync->cond.wait(lock);
+      cmd = thread_sync->cmd;
+      last_cmd_id = thread_sync->cmd_id;
+    }
+
+    // An empirical threshold (bytes read/spliced from the raw pipe) to make an
+    // educated guess on whether we should read/splice more. If we read fewer
+    // bytes it means that we caught up with the write pointer and we started
+    // consuming ftrace events in real-time. This cannot be just 4096 because
+    // it needs to account for fragmentation, i.e. for the fact that the last
+    // trace event didn't fit in the current page and hence the current page
+    // was terminated prematurely.
+    constexpr int kRoughlyAPage = 4096 - 512;
+
+    switch (cmd) {
+      case FtraceThreadSync::kQuit:
+        run_loop = false;
+        break;
+
+      case FtraceThreadSync::kRun: {
+        PERFETTO_METATRACE(cur_mode == kRead ? "read" : "splice", cpu);
+
+        // Do a blocking read/splice. This can fail for a variety of reasons:
+        // - FtraceController interrupts us with a signal for a new cmd
+        //   (e.g. it wants us to quit or do a flush).
+        // - A temporary read/splice() failure occurred (it has been observed
+        //   to happen if the system is under high load).
+        // In all these cases the most useful thing we can do is skip the
+        // current cycle and try again later.
+        if (read_ftrace_pipe(cur_mode, kBlock) <= 0)
+          break;  // Wait for next command.
+
+        // If we are in read mode (because of a previous flush) check if the
+        // in-kernel read cursor is page-aligned again. If a non-blocking splice
+        // succeeds, it means that we can safely switch back to splice mode
+        // (See b/120188810).
+        if (cur_mode == kRead && read_ftrace_pipe(kSplice, kNonBlock) > 0)
+          cur_mode = kSplice;
+
+        // Do as many non-blocking read/splice as we can.
+        while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
+        }
+        pool->CommitWrittenPages();
+        FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
         break;
       }
-    }
-    {
-      PERFETTO_METATRACE("splice_waitcallback", cpu);
-      // This callback will block until we are allowed to read more data.
-      on_data_available();
-    }
-  }
+
+      case FtraceThreadSync::kFlush: {
+        PERFETTO_METATRACE("flush", cpu);
+        cur_mode = kRead;
+        while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
+        }
+        pool->CommitWrittenPages();
+        FtraceController::OnCpuReaderFlush(cpu, generation, thread_sync);
+        break;
+      }
+    }  // switch(cmd)
+  }    // for(run_loop)
+  PERFETTO_DPLOG("Terminating CPUReader thread for CPU %zd.", cpu);
 #else
   base::ignore_result(cpu);
+  base::ignore_result(generation);
   base::ignore_result(trace_fd);
-  base::ignore_result(staging_write_fd);
-  base::ignore_result(on_data_available);
-  base::ignore_result(cmd_atomic);
+  base::ignore_result(pool);
+  base::ignore_result(thread_sync);
+  base::ignore_result(header_size_len);
   PERFETTO_ELOG("Supported only on Linux/Android");
 #endif
 }
@@ -285,38 +402,30 @@
 void CpuReader::Drain(const std::set<FtraceDataSource*>& data_sources) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   PERFETTO_METATRACE("Drain(" + std::to_string(cpu_) + ")", kMainThread);
-  for (;;) {
-    uint8_t* buffer = GetBuffer();
-    long bytes =
-        PERFETTO_EINTR(read(*staging_pipe_.rd, buffer, base::kPageSize));
-    if (bytes == -1 && errno == EAGAIN)
-      break;
-    PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);
 
-    for (FtraceDataSource* data_source : data_sources) {
-      auto packet = data_source->trace_writer()->NewTracePacket();
-      auto* bundle = packet->set_ftrace_events();
-      auto* metadata = data_source->mutable_metadata();
-      auto* filter = data_source->event_filter();
+  auto page_blocks = pool_.BeginRead();
+  for (const auto& page_block : page_blocks) {
+    for (size_t i = 0; i < page_block.size(); i++) {
+      const uint8_t* page = page_block.At(i);
 
-      // Note: The fastpath in proto_trace_parser.cc speculates on the fact that
-      // the cpu field is the first field of the proto message.
-      // If this changes, change proto_trace_parser.cc accordingly.
-      bundle->set_cpu(static_cast<uint32_t>(cpu_));
+      for (FtraceDataSource* data_source : data_sources) {
+        auto packet = data_source->trace_writer()->NewTracePacket();
+        auto* bundle = packet->set_ftrace_events();
+        auto* metadata = data_source->mutable_metadata();
+        auto* filter = data_source->event_filter();
 
-      size_t evt_size = ParsePage(buffer, filter, bundle, table_, metadata);
-      PERFETTO_DCHECK(evt_size);
+        // Note: The fastpath in proto_trace_parser.cc speculates on the fact
+        // that the cpu field is the first field of the proto message. If this
+        // changes, change proto_trace_parser.cc accordingly.
+        bundle->set_cpu(static_cast<uint32_t>(cpu_));
 
-      bundle->set_overwrite_count(metadata->overwrite_count);
+        size_t evt_size = ParsePage(page, filter, bundle, table_, metadata);
+        PERFETTO_DCHECK(evt_size);
+        bundle->set_overwrite_count(metadata->overwrite_count);
+      }
     }
   }
-}
-
-uint8_t* CpuReader::GetBuffer() {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  if (!buffer_.IsValid())
-    buffer_ = base::PagedMemory::Allocate(base::kPageSize);
-  return reinterpret_cast<uint8_t*>(buffer_.Get());
+  pool_.EndRead(std::move(page_blocks));
 }
 
 // The structure of a raw trace buffer page is as follows:
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index 3d3112f..9e73020 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -36,11 +36,13 @@
 #include "perfetto/traced/data_source_types.h"
 #include "src/traced/probes/ftrace/ftrace_config.h"
 #include "src/traced/probes/ftrace/ftrace_metadata.h"
+#include "src/traced/probes/ftrace/page_pool.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 namespace perfetto {
 
 class FtraceDataSource;
+struct FtraceThreadSync;
 class ProtoTranslationTable;
 
 namespace protos {
@@ -56,18 +58,16 @@
  public:
   using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
 
-  // |on_data_available| will be called on an arbitrary thread when at least one
-  // page of ftrace data is available for draining on this CPU.
   CpuReader(const ProtoTranslationTable*,
+            FtraceThreadSync*,
             size_t cpu,
-            base::ScopedFile fd,
-            std::function<void()> on_data_available);
+            int generation,
+            base::ScopedFile fd);
   ~CpuReader();
 
-  // Drains all available data from the staging pipe into the buffer of the
-  // passed data sources.
-  // Should be called in response to the |on_data_available| callback.
+  // Drains all available data into the buffer of the passed data sources.
   void Drain(const std::set<FtraceDataSource*>&);
+
   void InterruptWorkerThreadWithSignal();
 
   template <typename T>
@@ -180,24 +180,22 @@
                          FtraceMetadata* metadata);
 
  private:
-  enum ThreadCtl : uint32_t { kRun = 0, kExit };
   static void RunWorkerThread(size_t cpu,
+                              int generation,
                               int trace_fd,
-                              int staging_write_fd,
-                              const std::function<void()>& on_data_available,
-                              std::atomic<ThreadCtl>* cmd_atomic);
+                              PagePool*,
+                              FtraceThreadSync*,
+                              uint16_t header_size_len);
 
-  uint8_t* GetBuffer();
   CpuReader(const CpuReader&) = delete;
   CpuReader& operator=(const CpuReader&) = delete;
 
   const ProtoTranslationTable* const table_;
+  FtraceThreadSync* const thread_sync_;
   const size_t cpu_;
+  PagePool pool_;
   base::ScopedFile trace_fd_;
-  base::Pipe staging_pipe_;
-  base::PagedMemory buffer_;
   std::thread worker_thread_;
-  std::atomic<ThreadCtl> cmd_{kRun};
   PERFETTO_THREAD_CHECKER(thread_checker_)
 };
 
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 189066f..96f7e35 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -31,6 +31,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/metatrace.h"
 #include "perfetto/base/time.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "src/traced/probes/ftrace/cpu_reader.h"
@@ -47,8 +48,10 @@
 namespace {
 
 constexpr int kDefaultDrainPeriodMs = 100;
+constexpr int kFlushTimeoutMs = 500;
 constexpr int kMinDrainPeriodMs = 1;
 constexpr int kMaxDrainPeriodMs = 1000 * 60;
+constexpr uint32_t kMainThread = 255;  // for METATRACE
 
 uint32_t ClampDrainPeriodMs(uint32_t drain_period_ms) {
   if (drain_period_ms == 0) {
@@ -130,10 +133,13 @@
                                    Observer* observer)
     : task_runner_(task_runner),
       observer_(observer),
+      thread_sync_(task_runner),
       ftrace_procfs_(std::move(ftrace_procfs)),
       table_(std::move(table)),
       ftrace_config_muxer_(std::move(model)),
-      weak_factory_(this) {}
+      weak_factory_(this) {
+  thread_sync_.trace_controller_weak = GetWeakPtr();
+}
 
 FtraceController::~FtraceController() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
@@ -148,76 +154,160 @@
   return static_cast<uint64_t>(base::GetWallTimeMs().count());
 }
 
+// The OnCpuReader* methods below are called on the CpuReader worker threads.
+// Lifetime is guaranteed to be valid, because the FtraceController dtor
+// (that happens on the main thread) joins the worker threads.
+
 // static
-void FtraceController::DrainCPUs(base::WeakPtr<FtraceController> weak_this,
-                                 size_t generation) {
-  // The controller might be gone.
-  FtraceController* ctrl = weak_this.get();
-  if (!ctrl)
+void FtraceController::OnCpuReaderRead(size_t cpu,
+                                       int generation,
+                                       FtraceThreadSync* thread_sync) {
+  PERFETTO_METATRACE("OnCpuReaderRead()", cpu);
+
+  {
+    std::lock_guard<std::mutex> lock(thread_sync->mutex);
+    // If this was the first CPU to wake up, schedule a drain for the next
+    // drain interval.
+    bool post_drain_task = thread_sync->cpus_to_drain.none();
+    thread_sync->cpus_to_drain[cpu] = true;
+    if (!post_drain_task)
+      return;
+  }  // lock(thread_sync_.mutex)
+
+  base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
+  base::TaskRunner* task_runner = thread_sync->task_runner;
+
+  // The nested PostTask is used because the FtraceController (and hence
+  // GetDrainPeriodMs()) can be called only on the main thread.
+  task_runner->PostTask([weak_ctl, task_runner, generation] {
+
+    if (!weak_ctl)
+      return;
+    uint32_t drain_period_ms = weak_ctl->GetDrainPeriodMs();
+
+    task_runner->PostDelayedTask(
+        [weak_ctl, generation] {
+          if (weak_ctl)
+            weak_ctl->DrainCPUs(generation);
+        },
+        drain_period_ms - (weak_ctl->NowMs() % drain_period_ms));
+
+  });
+}
+
+// static
+void FtraceController::OnCpuReaderFlush(size_t cpu,
+                                        int generation,
+                                        FtraceThreadSync* thread_sync) {
+  // In the case of a flush, we want to drain the data as quickly as possible to
+  // minimize the flush latency, at the cost of more tasks / wakeups (eventually
+  // one task per cpu). Flushes are not supposed to happen too frequently.
+  {
+    std::lock_guard<std::mutex> lock(thread_sync->mutex);
+    thread_sync->cpus_to_drain[cpu] = true;
+    thread_sync->flush_acks[cpu] = true;
+  }  // lock(thread_sync_.mutex)
+
+  base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
+  thread_sync->task_runner->PostTask([weak_ctl, generation] {
+    if (weak_ctl)
+      weak_ctl->DrainCPUs(generation);
+  });
+}
+
+void FtraceController::DrainCPUs(int generation) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  PERFETTO_METATRACE("DrainCPUs()", kMainThread);
+
+  if (generation != generation_)
     return;
 
-  // We might have stopped tracing then quickly re-enabled it, in this case
-  // we don't want to end up with two periodic tasks for each CPU:
-  if (ctrl->generation_ != generation)
-    return;
-
-  PERFETTO_DCHECK_THREAD(ctrl->thread_checker_);
+  const size_t num_cpus = ftrace_procfs_->NumberOfCpus();
+  PERFETTO_DCHECK(cpu_readers_.size() == num_cpus);
+  FlushRequestID ack_flush_request_id = 0;
   std::bitset<base::kMaxCpus> cpus_to_drain;
   {
-    std::unique_lock<std::mutex> lock(ctrl->lock_);
-    // We might have stopped caring about events.
-    if (!ctrl->listening_for_raw_trace_data_)
-      return;
-    std::swap(cpus_to_drain, ctrl->cpus_to_drain_);
+    std::lock_guard<std::mutex> lock(thread_sync_.mutex);
+    std::swap(cpus_to_drain, thread_sync_.cpus_to_drain);
+
+    // Check also if a flush is pending and if all cpus have acked. If that's
+    // the case, ack the overall Flush() request at the end of this function.
+    if (cur_flush_request_id_ && thread_sync_.flush_acks.count() >= num_cpus) {
+      thread_sync_.flush_acks.reset();
+      ack_flush_request_id = cur_flush_request_id_;
+      cur_flush_request_id_ = 0;
+    }
   }
 
-  for (size_t cpu = 0; cpu < ctrl->ftrace_procfs_->NumberOfCpus(); cpu++) {
+  for (size_t cpu = 0; cpu < num_cpus; cpu++) {
     if (!cpus_to_drain[cpu])
       continue;
     // This method reads the pipe and converts the raw ftrace data into
     // protobufs using the |data_source|'s TraceWriter.
-    ctrl->cpu_readers_[cpu]->Drain(ctrl->started_data_sources_);
-    ctrl->OnDrainCpuForTesting(cpu);
+    cpu_readers_[cpu]->Drain(started_data_sources_);
+    OnDrainCpuForTesting(cpu);
   }
 
   // If we filled up any SHM pages while draining the data, we will have posted
   // a task to notify traced about this. Only unblock the readers after this
   // notification is sent to make it less likely that they steal CPU time away
-  // from traced.
-  ctrl->task_runner_->PostTask(
-      std::bind(&FtraceController::UnblockReaders, weak_this));
+  // from traced. Also, don't unblock the readers until all of them have replied
+  // to the flush.
+  if (!cur_flush_request_id_) {
+    base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
+    task_runner_->PostTask([weak_this] {
+      if (weak_this)
+        weak_this->UnblockReaders();
+    });
+  }
 
-  ctrl->observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
+  observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
+
+  if (ack_flush_request_id) {
+    // Flush completed, all CpuReader(s) acked.
+
+    IssueThreadSyncCmd(FtraceThreadSync::kRun);  // Switch back to reading mode.
+
+    // This will call FtraceDataSource::OnFtraceFlushComplete(), which in turn
+    // will flush the userspace buffers and ack the flush to the ProbesProducer
+    // which in turn will ack the flush to the tracing service.
+    NotifyFlushCompleteToStartedDataSources(ack_flush_request_id);
+  }
 }
 
-// static
-void FtraceController::UnblockReaders(
-    const base::WeakPtr<FtraceController>& weak_this) {
-  FtraceController* ctrl = weak_this.get();
-  if (!ctrl)
+void FtraceController::UnblockReaders() {
+  PERFETTO_METATRACE("UnblockReaders()", kMainThread);
+
+  // If a flush or a quit is pending, do nothing.
+  std::unique_lock<std::mutex> lock(thread_sync_.mutex);
+  if (thread_sync_.cmd != FtraceThreadSync::kRun)
     return;
+
   // Unblock all waiting readers to start moving more data into their
   // respective staging pipes.
-  ctrl->data_drained_.notify_all();
+  IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
 }
 
 void FtraceController::StartIfNeeded() {
   if (started_data_sources_.size() > 1)
     return;
   PERFETTO_DCHECK(!started_data_sources_.empty());
-  {
-    std::unique_lock<std::mutex> lock(lock_);
-    PERFETTO_CHECK(!listening_for_raw_trace_data_);
-    listening_for_raw_trace_data_ = true;
-  }
-  generation_++;
+  PERFETTO_DCHECK(cpu_readers_.empty());
   base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
+
+  {
+    std::lock_guard<std::mutex> lock(thread_sync_.mutex);
+    thread_sync_.cmd = FtraceThreadSync::kRun;
+    thread_sync_.cmd_id++;
+  }
+
+  generation_++;
+  cpu_readers_.clear();
+  cpu_readers_.reserve(ftrace_procfs_->NumberOfCpus());
   for (size_t cpu = 0; cpu < ftrace_procfs_->NumberOfCpus(); cpu++) {
-    cpu_readers_.emplace(
-        cpu, std::unique_ptr<CpuReader>(new CpuReader(
-                 table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu),
-                 std::bind(&FtraceController::OnDataAvailable, this, weak_this,
-                           generation_, cpu, GetDrainPeriodMs()))));
+    cpu_readers_.emplace_back(
+        new CpuReader(table_.get(), &thread_sync_, cpu, generation_,
+                      ftrace_procfs_->OpenPipeForCpu(cpu)));
   }
 }
 
@@ -244,50 +334,61 @@
   ftrace_procfs_->WriteTraceMarker(s);
 }
 
+void FtraceController::Flush(FlushRequestID flush_id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  if (flush_id == cur_flush_request_id_)
+    return;  // Already dealing with this flush request.
+
+  cur_flush_request_id_ = flush_id;
+  {
+    std::unique_lock<std::mutex> lock(thread_sync_.mutex);
+    thread_sync_.flush_acks.reset();
+    IssueThreadSyncCmd(FtraceThreadSync::kFlush, std::move(lock));
+  }
+
+  base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, flush_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(flush_id);
+      },
+      kFlushTimeoutMs);
+}
+
+void FtraceController::OnFlushTimeout(FlushRequestID flush_request_id) {
+  if (flush_request_id != cur_flush_request_id_)
+    return;
+
+  uint64_t acks = 0;  // For debugging purposes only.
+  {
+    // Unlock the cpu readers and move on.
+    std::unique_lock<std::mutex> lock(thread_sync_.mutex);
+    acks = thread_sync_.flush_acks.to_ulong();
+    thread_sync_.flush_acks.reset();
+    if (thread_sync_.cmd == FtraceThreadSync::kFlush)
+      IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
+  }
+
+  PERFETTO_ELOG("Ftrace flush(%" PRIu64 ") timed out. Acked cpus: 0x%" PRIx64,
+                flush_request_id, acks);
+  cur_flush_request_id_ = 0;
+  NotifyFlushCompleteToStartedDataSources(flush_request_id);
+}
+
 void FtraceController::StopIfNeeded() {
   if (!started_data_sources_.empty())
     return;
 
-  {
-    // Unblock any readers that are waiting for us to drain data.
-    std::unique_lock<std::mutex> lock(lock_);
-    listening_for_raw_trace_data_ = false;
-    cpus_to_drain_.reset();
-  }
+  // We are not implicitly flushing on Stop. The tracing service is supposed to
+  // ask for an explicit flush before stopping, unless it needs to perform a
+  // non-graceful stop.
 
-  data_drained_.notify_all();
+  IssueThreadSyncCmd(FtraceThreadSync::kQuit);
+
+  // Destroying the CpuReader(s) will join on their worker threads.
   cpu_readers_.clear();
-}
-
-// This method is called on the worker thread. Lifetime is guaranteed to be
-// valid, because the FtraceController dtor (that happens on the main thread)
-// joins the worker threads. |weak_this| is passed and not derived, because the
-// WeakPtrFactory is accessible only on the main thread.
-void FtraceController::OnDataAvailable(
-    base::WeakPtr<FtraceController> weak_this,
-    size_t generation,
-    size_t cpu,
-    uint32_t drain_period_ms) {
-  PERFETTO_DCHECK(cpu < ftrace_procfs_->NumberOfCpus());
-  std::unique_lock<std::mutex> lock(lock_);
-  if (!listening_for_raw_trace_data_)
-    return;
-  if (cpus_to_drain_.none()) {
-    // If this was the first CPU to wake up, schedule a drain for the next drain
-    // interval.
-    uint32_t delay_ms = drain_period_ms - (NowMs() % drain_period_ms);
-    task_runner_->PostDelayedTask(
-        std::bind(&FtraceController::DrainCPUs, weak_this, generation),
-        delay_ms);
-  }
-  cpus_to_drain_[cpu] = true;
-
-  // Wait until the main thread has finished draining.
-  // TODO(skyostil): The threads waiting here will all try to grab lock_
-  // when woken up. Find a way to avoid this.
-  data_drained_.wait(lock, [this, cpu] {
-    return !cpus_to_drain_[cpu] || !listening_for_raw_trace_data_;
-  });
+  generation_++;
 }
 
 bool FtraceController::AddDataSource(FtraceDataSource* data_source) {
@@ -334,6 +435,43 @@
   DumpAllCpuStats(ftrace_procfs_.get(), stats);
 }
 
+void FtraceController::IssueThreadSyncCmd(
+    FtraceThreadSync::Cmd cmd,
+    std::unique_lock<std::mutex> pass_lock_from_caller) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  {
+    std::unique_lock<std::mutex> lock(std::move(pass_lock_from_caller));
+    if (!lock.owns_lock())
+      lock = std::unique_lock<std::mutex>(thread_sync_.mutex);
+
+    if (thread_sync_.cmd == FtraceThreadSync::kQuit &&
+        cmd != FtraceThreadSync::kQuit) {
+      // If in kQuit state, we should never issue any other commands.
+      return;
+    }
+
+    thread_sync_.cmd = cmd;
+    thread_sync_.cmd_id++;
+  }
+
+  // Send a SIGPIPE to all worker threads to wake them up if they are sitting in
+  // a blocking splice(). If they are not and instead they are sitting in the
+  // cond-variable.wait(), this, together with the one below, will have at best
+  // the same effect of a spurious wakeup, depending on the implementation of
+  // the condition variable.
+  for (const auto& cpu_reader : cpu_readers_)
+    cpu_reader->InterruptWorkerThreadWithSignal();
+
+  thread_sync_.cond.notify_all();
+}
+
+void FtraceController::NotifyFlushCompleteToStartedDataSources(
+    FlushRequestID flush_request_id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  for (FtraceDataSource* data_source : started_data_sources_)
+    data_source->OnFtraceFlushComplete(flush_request_id);
+}
+
 FtraceController::Observer::~Observer() = default;
 
 }  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index b1845ce..5201e34 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -17,14 +17,13 @@
 #ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_CONTROLLER_H_
 #define SRC_TRACED_PROBES_FTRACE_FTRACE_CONTROLLER_H_
 
+#include <stdint.h>
 #include <unistd.h>
 
 #include <bitset>
-#include <condition_variable>
 #include <functional>
 #include <map>
 #include <memory>
-#include <mutex>
 #include <set>
 #include <string>
 
@@ -32,7 +31,9 @@
 #include "perfetto/base/task_runner.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
+#include "perfetto/tracing/core/basic_types.h"
 #include "src/traced/probes/ftrace/ftrace_config.h"
+#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
 
 namespace perfetto {
 
@@ -61,6 +62,10 @@
   static std::unique_ptr<FtraceController> Create(base::TaskRunner*, Observer*);
   virtual ~FtraceController();
 
+  // These two methods are called by CpuReader(s) from their worker threads.
+  static void OnCpuReaderRead(size_t cpu, int generation, FtraceThreadSync*);
+  static void OnCpuReaderFlush(size_t cpu, int generation, FtraceThreadSync*);
+
   void DisableAllEvents();
   void WriteTraceMarker(const std::string& s);
   void ClearTrace();
@@ -69,6 +74,11 @@
   bool StartDataSource(FtraceDataSource*);
   void RemoveDataSource(FtraceDataSource*);
 
+  // Force a read of the ftrace buffers, including kernel buffer pages that
+  // are not full. Will call OnFtraceFlushComplete() on all
+  // |started_data_sources_| once all workers have flushed (or timed out).
+  void Flush(FlushRequestID);
+
   void DumpFtraceStats(FtraceStats*);
 
   base::WeakPtr<FtraceController> GetWeakPtr() {
@@ -95,36 +105,28 @@
   FtraceController(const FtraceController&) = delete;
   FtraceController& operator=(const FtraceController&) = delete;
 
-  // Called on a worker thread when |cpu| has at least one page of data
-  // available for reading.
-  void OnDataAvailable(base::WeakPtr<FtraceController>,
-                       size_t generation,
-                       size_t cpu,
-                       uint32_t drain_period_ms);
-
-  static void DrainCPUs(base::WeakPtr<FtraceController>, size_t generation);
-  static void UnblockReaders(const base::WeakPtr<FtraceController>&);
+  void OnFlushTimeout(FlushRequestID);
+  void DrainCPUs(int generation);
+  void UnblockReaders();
+  void NotifyFlushCompleteToStartedDataSources(FlushRequestID);
+  void IssueThreadSyncCmd(FtraceThreadSync::Cmd,
+                          std::unique_lock<std::mutex> = {});
 
   uint32_t GetDrainPeriodMs();
 
   void StartIfNeeded();
   void StopIfNeeded();
 
-  // Begin lock-protected members.
-  std::mutex lock_;
-  std::condition_variable data_drained_;
-  std::bitset<base::kMaxCpus> cpus_to_drain_;
-  bool listening_for_raw_trace_data_ = false;
-  // End lock-protected members.
-
   base::TaskRunner* const task_runner_;
   Observer* const observer_;
+  FtraceThreadSync thread_sync_;
   std::unique_ptr<FtraceProcfs> ftrace_procfs_;
   std::unique_ptr<ProtoTranslationTable> table_;
   std::unique_ptr<FtraceConfigMuxer> ftrace_config_muxer_;
-  size_t generation_ = 0;
+  int generation_ = 0;
+  FlushRequestID cur_flush_request_id_ = 0;
   bool atrace_running_ = false;
-  std::map<size_t, std::unique_ptr<CpuReader>> cpu_readers_;
+  std::vector<std::unique_ptr<CpuReader>> cpu_readers_;
   std::set<FtraceDataSource*> data_sources_;
   std::set<FtraceDataSource*> started_data_sources_;
   base::WeakPtrFactory<FtraceController> weak_factory_;  // Keep last.
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index 9d8a53f..09d1744 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -76,7 +76,11 @@
     task_ = std::move(task);
   }
 
-  void RunLastTask() { TakeTask()(); }
+  void RunLastTask() {
+    auto task = TakeTask();
+    if (task)
+      task();
+  }
 
   std::function<void()> TakeTask() {
     std::unique_lock<std::mutex> lock(lock_);
@@ -216,18 +220,18 @@
   uint32_t drain_period_ms() { return GetDrainPeriodMs(); }
 
   std::function<void()> GetDataAvailableCallback(size_t cpu) {
-    base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
-    size_t generation = generation_;
-    return [this, weak_this, generation, cpu] {
-      OnDataAvailable(weak_this, generation, cpu, GetDrainPeriodMs());
+    int generation = generation_;
+    auto* thread_sync = &thread_sync_;
+    return [cpu, generation, thread_sync] {
+      FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
     };
   }
 
   void WaitForData(size_t cpu) {
     for (;;) {
       {
-        std::unique_lock<std::mutex> lock(lock_);
-        if (cpus_to_drain_[cpu])
+        std::unique_lock<std::mutex> lock(thread_sync_.mutex);
+        if (thread_sync_.cpus_to_drain[cpu])
           return;
       }
       usleep(5000);
@@ -393,50 +397,6 @@
   data_source.reset();
 }
 
-TEST(FtraceControllerTest, TaskScheduling) {
-  auto controller = CreateTestController(
-      false /* nice runner */, false /* nice procfs */, 2 /* num cpus */);
-
-  // For this test we don't care about calls to WriteToFile/ClearFile.
-  EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
-  EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
-
-  FtraceConfig config = CreateFtraceConfig({"group/foo"});
-  auto data_source = controller->AddFakeDataSource(config);
-  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
-
-  // Only one call to drain should be scheduled for the next drain period.
-  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100));
-
-  // However both CPUs should be drained.
-  EXPECT_CALL(*controller, OnDrainCpuForTesting(_)).Times(2);
-
-  // Finally, another task should be posted to unblock the workers.
-  EXPECT_CALL(*controller->runner(), PostTask(_));
-
-  // Simulate two worker threads reporting available data.
-  auto on_data_available0 = controller->GetDataAvailableCallback(0u);
-  std::thread worker0([on_data_available0] { on_data_available0(); });
-
-  auto on_data_available1 = controller->GetDataAvailableCallback(1u);
-  std::thread worker1([on_data_available1] { on_data_available1(); });
-
-  // Poll until both worker threads have reported available data.
-  controller->WaitForData(0u);
-  controller->WaitForData(1u);
-
-  // Run the task to drain all CPUs.
-  controller->runner()->RunLastTask();
-
-  // Run the task to unblock all workers.
-  controller->runner()->RunLastTask();
-
-  worker0.join();
-  worker1.join();
-
-  data_source.reset();
-}
-
 TEST(FtraceControllerTest, BackToBackEnableDisable) {
   auto controller =
       CreateTestController(false /* nice runner */, false /* nice procfs */);
@@ -447,7 +407,8 @@
   EXPECT_CALL(*controller->procfs(), ReadOneCharFromFile("/root/tracing_on"))
       .Times(AnyNumber());
 
-  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
+  EXPECT_CALL(*controller->runner(), PostTask(_)).Times(1);
+  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(1);
   FtraceConfig config = CreateFtraceConfig({"group/foo"});
   auto data_source = controller->AddFakeDataSource(config);
   ASSERT_TRUE(controller->StartDataSource(data_source.get()));
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index 4dfaed3..4f9ef3e 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -65,15 +65,37 @@
     controller_weak_->DumpFtraceStats(stats);
 }
 
-void FtraceDataSource::Flush(FlushRequestID, std::function<void()> callback) {
-  // TODO(primiano): this still doesn't flush data from the kernel ftrace
-  // buffers (see b/73886018). We should do that and delay the
-  // NotifyFlushComplete() until the ftrace data has been drained from the
-  // kernel ftrace buffer and written in the SMB.
-  if (!writer_)
+void FtraceDataSource::Flush(FlushRequestID flush_request_id,
+                             std::function<void()> callback) {
+  if (!controller_weak_)
     return;
-  WriteStats();
-  writer_->Flush(callback);
+
+  pending_flushes_[flush_request_id] = std::move(callback);
+
+  // FtraceController will call OnFtraceFlushComplete() once the data has been
+  // drained from the ftrace buffer and written into the various writers of
+  // all its active data sources.
+  controller_weak_->Flush(flush_request_id);
+}
+
+// Called by FtraceController after all CPUs have acked the flush or timed out.
+void FtraceDataSource::OnFtraceFlushComplete(FlushRequestID flush_request_id) {
+  auto it = pending_flushes_.find(flush_request_id);
+  if (it == pending_flushes_.end()) {
+    // This can genuinely happen in case of concurrent ftrace sessions. When a
+    // FtraceDataSource issues a flush, the controller has to drain ftrace data
+    // for everybody (there is only one kernel ftrace buffer for all sessions).
+    // FtraceController doesn't bother to remember which FtraceDataSource did or
+    // did not request a flush. Instead it just broadcasts the
+    // OnFtraceFlushComplete() to all of them.
+    return;
+  }
+  auto callback = std::move(it->second);
+  pending_flushes_.erase(it);
+  if (writer_) {
+    WriteStats();
+    writer_->Flush(std::move(callback));
+  }
 }
 
 void FtraceDataSource::WriteStats() {
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index 8fdbdc3..6cd4f04 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -17,9 +17,12 @@
 #ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_DATA_SOURCE_H_
 #define SRC_TRACED_PROBES_FTRACE_FTRACE_DATA_SOURCE_H_
 
+#include <functional>
+#include <map>
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/weak_ptr.h"
@@ -68,6 +71,7 @@
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
   void Flush(FlushRequestID, std::function<void()> callback) override;
+  void OnFtraceFlushComplete(FlushRequestID);
 
   FtraceConfigId config_id() const { return config_id_; }
   const FtraceConfig& config() const { return config_; }
@@ -85,6 +89,7 @@
   const FtraceConfig config_;
   FtraceMetadata metadata_;
   FtraceStats stats_before_ = {};
+  std::map<FlushRequestID, std::function<void()>> pending_flushes_;
 
   // Initialized by the Initialize() call.
   FtraceConfigId config_id_ = 0;
diff --git a/src/traced/probes/ftrace/ftrace_thread_sync.h b/src/traced/probes/ftrace/ftrace_thread_sync.h
new file mode 100644
index 0000000..b2c4a9a
--- /dev/null
+++ b/src/traced/probes/ftrace/ftrace_thread_sync.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
+#define SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
+
+#include <stdint.h>
+
+#include <bitset>
+#include <condition_variable>
+#include <mutex>
+
+#include "perfetto/base/utils.h"
+#include "perfetto/base/weak_ptr.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+class FtraceController;
+
+// This struct is accessed both by the FtraceController on the main thread and
+// by the CpuReader(s) on their worker threads. It is used to synchronize
+// handshakes between FtraceController and CpuReader(s). There is only *ONE*
+// instance of this state, owned by the FtraceController and shared with all
+// CpuReader(s).
+struct FtraceThreadSync {
+  explicit FtraceThreadSync(base::TaskRunner* tr) : task_runner(tr) {}
+
+  // These variables are set upon initialization time and never changed. Can
+  // be accessed outside of the |mutex|.
+  base::TaskRunner* const task_runner;  // Where the FtraceController lives.
+  base::WeakPtr<FtraceController> trace_controller_weak;
+
+  // Mutex & condition variable shared by main thread and all per-cpu workers.
+  // All fields below are read and modified holding |mutex|.
+  std::mutex mutex;
+
+  // Used to suspend CpuReader(s) between cycles and to wake them up at the
+  // same time.
+  std::condition_variable cond;
+
+  // |cmd| and |cmd_id| are written only by FtraceController. On each cycle,
+  // FtraceController increases the |cmd_id| monotonic counter and issues the
+  // new command. |cmd_id| is used by the CpuReader(s) to distinguish a new
+  // command from a spurious wakeup.
+  enum Cmd { kRun = 0, kFlush, kQuit };
+  Cmd cmd = kRun;
+  uint64_t cmd_id = 0;
+
+  // This bitmap is cleared by the FtraceController before every kRun command
+  // and is optionally set by OnCpuReaderRead() if a CpuReader did fetch any
+  // ftrace data during the read cycle.
+  std::bitset<base::kMaxCpus> cpus_to_drain;
+
+  // This bitmap is cleared by the FtraceController before issuing a kFlush
+  // command and set by each CpuReader after they have completed the flush.
+  std::bitset<base::kMaxCpus> flush_acks;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 226ff37..08b04bb 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -29,6 +29,7 @@
     "../protos/perfetto/trace:zero",
     "../src/base:base",
     "../src/base:test_support",
+    "../src/traced/probes/ftrace",
   ]
   sources = [
     "end_to_end_integrationtest.cc",
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index a9dac0e..a197df0 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -28,6 +28,8 @@
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
 #include "src/base/test/test_task_runner.h"
+#include "src/traced/probes/ftrace/ftrace_controller.h"
+#include "src/traced/probes/ftrace/ftrace_procfs.h"
 #include "src/tracing/ipc/default_socket.h"
 #include "test/task_runner_thread.h"
 #include "test/task_runner_thread_delegates.h"
@@ -38,6 +40,32 @@
 
 namespace perfetto {
 
+namespace {
+
+class PerfettoTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    // TODO(primiano): refactor this, it's copy/pasted in three places now.
+    size_t index = 0;
+    constexpr auto kTracingPaths = FtraceController::kTracingPaths;
+    while (!ftrace_procfs_ && kTracingPaths[index]) {
+      ftrace_procfs_ = FtraceProcfs::Create(kTracingPaths[index++]);
+    }
+    if (!ftrace_procfs_)
+      return;
+    ftrace_procfs_->SetTracingOn(false);
+  }
+
+  void TearDown() override {
+    if (ftrace_procfs_)
+      ftrace_procfs_->SetTracingOn(false);
+  }
+
+  std::unique_ptr<FtraceProcfs> ftrace_procfs_;
+};
+
+}  // namespace
+
 // If we're building on Android and starting the daemons ourselves,
 // create the sockets in a world-writable location.
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
@@ -47,14 +75,14 @@
 #define TEST_PRODUCER_SOCK_NAME ::perfetto::GetProducerSocket()
 #endif
 
-// TODO(b/73453011): reenable this on more platforms (including standalone
-// Android).
+// TODO(b/73453011): reenable on more platforms (including standalone Android).
 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
-#define MAYBE_TestFtraceProducer TestFtraceProducer
+#define TreeHuggerOnly(x) x
 #else
-#define MAYBE_TestFtraceProducer DISABLED_TestFtraceProducer
+#define TreeHuggerOnly(x) DISABLED_##x
 #endif
-TEST(PerfettoTest, MAYBE_TestFtraceProducer) {
+
+TEST_F(PerfettoTest, TreeHuggerOnly(TestFtraceProducer)) {
   base::TestTaskRunner task_runner;
 
   TestHelper helper(&task_runner);
@@ -97,14 +125,65 @@
   }
 }
 
-// TODO(b/73453011): reenable this on more platforms (including standalone
-// Android).
-#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
-#define MAYBE_TestBatteryTracing TestBatteryTracing
-#else
-#define MAYBE_TestBatteryTracing DISABLED_TestBatteryTracing
+TEST_F(PerfettoTest, TreeHuggerOnly(TestFtraceFlush)) {
+  base::TestTaskRunner task_runner;
+
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
+
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
+      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
-TEST(PerfettoTest, MAYBE_TestBatteryTracing) {
+
+  helper.ConnectConsumer();
+  helper.WaitForConsumerConnect();
+
+  const uint32_t kTestTimeoutMs = 30000;
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(16);
+  trace_config.set_duration_ms(kTestTimeoutMs);
+
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("linux.ftrace");
+
+  auto* ftrace_config = ds_config->mutable_ftrace_config();
+  *ftrace_config->add_ftrace_events() = "print";
+
+  helper.StartTracing(trace_config);
+
+  // Do a first flush just to synchronize with the producer. The problem here
+  // is that, on a Linux workstation, the producer can take several seconds just
+  // to get to the point where ftrace is ready. We use the flush ack as a
+  // synchronization point.
+  helper.FlushAndWait(kTestTimeoutMs);
+
+  EXPECT_TRUE(ftrace_procfs_->IsTracingEnabled());
+  const char kMarker[] = "just_one_event";
+  EXPECT_TRUE(ftrace_procfs_->WriteTraceMarker(kMarker));
+
+  // This is the real flush we are testing.
+  helper.FlushAndWait(kTestTimeoutMs);
+
+  helper.DisableTracing();
+  helper.WaitForTracingDisabled(kTestTimeoutMs);
+
+  helper.ReadData();
+  helper.WaitForReadData();
+
+  int marker_found = 0;
+  for (const auto& packet : helper.trace()) {
+    for (int i = 0; i < packet.ftrace_events().event_size(); i++) {
+      const auto& ev = packet.ftrace_events().event(i);
+      if (ev.has_print() && ev.print().buf().find(kMarker) != std::string::npos)
+        marker_found++;
+    }
+  }
+  ASSERT_EQ(marker_found, 1);
+}
+
+TEST_F(PerfettoTest, TreeHuggerOnly(TestBatteryTracing)) {
   base::TestTaskRunner task_runner;
 
   TestHelper helper(&task_runner);
@@ -155,7 +234,7 @@
   ASSERT_TRUE(has_battery_packet);
 }
 
-TEST(PerfettoTest, TestFakeProducer) {
+TEST_F(PerfettoTest, TestFakeProducer) {
   base::TestTaskRunner task_runner;
 
   TestHelper helper(&task_runner);
@@ -196,7 +275,7 @@
   }
 }
 
-TEST(PerfettoTest, VeryLargePackets) {
+TEST_F(PerfettoTest, VeryLargePackets) {
   base::TestTaskRunner task_runner;
 
   TestHelper helper(&task_runner);
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 51ae9aa..6d2a6c8 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -102,6 +102,18 @@
   endpoint_->EnableTracing(config);
 }
 
+void TestHelper::DisableTracing() {
+  endpoint_->DisableTracing();
+}
+
+void TestHelper::FlushAndWait(uint32_t timeout_ms) {
+  static int flush_num = 0;
+  std::string checkpoint_name = "flush." + std::to_string(flush_num++);
+  auto checkpoint = task_runner_->CreateCheckpoint(checkpoint_name);
+  endpoint_->Flush(timeout_ms, [checkpoint](bool) { checkpoint(); });
+  task_runner_->RunUntilCheckpoint(checkpoint_name, timeout_ms + 1000);
+}
+
 void TestHelper::ReadData(uint32_t read_count) {
   on_packets_finished_callback_ = task_runner_->CreateCheckpoint(
       "readback.complete." + std::to_string(read_count));
@@ -116,8 +128,8 @@
   task_runner_->RunUntilCheckpoint("producer.enabled");
 }
 
-void TestHelper::WaitForTracingDisabled() {
-  task_runner_->RunUntilCheckpoint("stop.tracing");
+void TestHelper::WaitForTracingDisabled(uint32_t timeout_ms) {
+  task_runner_->RunUntilCheckpoint("stop.tracing", timeout_ms);
 }
 
 void TestHelper::WaitForReadData(uint32_t read_count) {
diff --git a/test/test_helper.h b/test/test_helper.h
index 2bd4cf0..599ce5d 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -43,11 +43,13 @@
   FakeProducer* ConnectFakeProducer();
   void ConnectConsumer();
   void StartTracing(const TraceConfig& config);
+  void DisableTracing();
+  void FlushAndWait(uint32_t timeout_ms);
   void ReadData(uint32_t read_count = 0);
 
   void WaitForConsumerConnect();
   void WaitForProducerEnabled();
-  void WaitForTracingDisabled();
+  void WaitForTracingDisabled(uint32_t timeout_ms = 5000);
   void WaitForReadData(uint32_t read_count = 0);
 
   std::function<void()> WrapTask(const std::function<void()>& function);