introduce ThreadTaskRunner and use it in heapprofd's UnwindingWorker

As discussed last week, proposing a wrapper object that owns a UnixTaskRunner & its task thread.

For the review, please focus on the following:
* that the writeup of the thread-checking considerations in unix_task_runner.h is accurate
* that the thread_task_runner implementation itself is sane
* that its use in heapprofd is reasonable

I have no strong opinions on the following, and simply chose one option for the initial review:
* Naming of the new utility - ThreadUnixTaskRunner would more accurately reflect that this isn't
  cross-platform?
* Whether it belongs in base::, or should be in profiling::memory:: for now.
  We can slot this into consumer_api.cc (which does a very similar handshake) in a followup cl,
  so I thought base:: might be a reasonable place.

Note: with this change, heapprofd works in debug builds e2e afaict.

Change-Id: Ia12c89c6963bf0a659bca7994f4fe87aa3bfde58
diff --git a/Android.bp b/Android.bp
index f780212..256e440 100644
--- a/Android.bp
+++ b/Android.bp
@@ -47,6 +47,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -162,6 +163,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -228,6 +230,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -406,6 +409,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -576,6 +580,7 @@
     "src/base/test/utils.cc",
     "src/base/test/vm_test_utils.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -2581,6 +2586,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_socket.cc",
     "src/base/unix_task_runner.cc",
@@ -2827,6 +2833,8 @@
     "src/base/test/vm_test_utils.cc",
     "src/base/thread_checker.cc",
     "src/base/thread_checker_unittest.cc",
+    "src/base/thread_task_runner.cc",
+    "src/base/thread_task_runner_unittest.cc",
     "src/base/time.cc",
     "src/base/time_unittest.cc",
     "src/base/unix_socket.cc",
@@ -3092,6 +3100,7 @@
     "src/base/string_view.cc",
     "src/base/temp_file.cc",
     "src/base/thread_checker.cc",
+    "src/base/thread_task_runner.cc",
     "src/base/time.cc",
     "src/base/unix_task_runner.cc",
     "src/base/virtual_destructors.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index fd5b869..bd45200 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -38,6 +38,7 @@
     "task_runner.h",
     "temp_file.h",
     "thread_checker.h",
+    "thread_task_runner.h",
     "thread_utils.h",
     "time.h",
     "unix_task_runner.h",
diff --git a/include/perfetto/base/thread_task_runner.h b/include/perfetto/base/thread_task_runner.h
new file mode 100644
index 0000000..7992eeb
--- /dev/null
+++ b/include/perfetto/base/thread_task_runner.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
+#define INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
+
+#include <functional>
+#include <thread>
+
+#include "perfetto/base/unix_task_runner.h"
+
+namespace perfetto {
+namespace base {
+
+// A UnixTaskRunner backed by a dedicated task thread. Shuts down the runner and
+// joins the thread upon destruction. Can be moved to transfer ownership.
+//
+// Guarantees that:
+// * the UnixTaskRunner will be constructed and destructed on the task thread.
+// * the task thread will live for the lifetime of the UnixTaskRunner.
+//
+class ThreadTaskRunner {
+ public:
+  static ThreadTaskRunner CreateAndStart() { return ThreadTaskRunner(); }
+
+  ThreadTaskRunner(const ThreadTaskRunner&) = delete;
+  ThreadTaskRunner& operator=(const ThreadTaskRunner&) = delete;
+
+  ThreadTaskRunner(ThreadTaskRunner&&) noexcept;
+  ThreadTaskRunner& operator=(ThreadTaskRunner&&);
+  ~ThreadTaskRunner();
+
+  // Returns a pointer to the UnixTaskRunner, which is valid for the lifetime of
+  // this ThreadTaskRunner object (unless this object is moved-from, in which
+  // case the pointer remains valid for the lifetime of the new owning
+  // ThreadTaskRunner).
+  //
+  // Warning: do not call Quit() on the returned runner pointer, the termination
+  // should be handled exclusively by this class' destructor.
+  UnixTaskRunner* get() { return task_runner_; }
+
+ private:
+  ThreadTaskRunner();
+  void RunTaskThread(std::function<void(UnixTaskRunner*)> initializer);
+
+  std::thread thread_;
+  UnixTaskRunner* task_runner_ = nullptr;
+};
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
diff --git a/include/perfetto/base/unix_task_runner.h b/include/perfetto/base/unix_task_runner.h
index 47fecfc..98c9c7e 100644
--- a/include/perfetto/base/unix_task_runner.h
+++ b/include/perfetto/base/unix_task_runner.h
@@ -36,6 +36,20 @@
 namespace base {
 
 // Runs a task runner on the current thread.
+//
+// Implementation note: we currently assume (and enforce in debug builds) that
+// Run() is called from the thread that constructed the UnixTaskRunner. This is
+// not strictly necessary, and we could instead track the thread that invokes
+// Run(). However, a related property that *might* be important to enforce is
+// that the destructor runs on the task-running thread. Otherwise, if there are
+// still-pending tasks at the time of destruction, we would destroy those
+// outside of the task thread (which might be unexpected to the caller). On the
+// other hand, the std::function task interface discourages use of any
+// resource-owning tasks (as the callable needs to be copyable), so this might
+// not be important in practice.
+//
+// TODO(rsavitski): consider adding a thread-check in the destructor, after
+// auditing existing usages.
 class UnixTaskRunner : public TaskRunner {
  public:
   UnixTaskRunner();
@@ -57,6 +71,11 @@
   void RemoveFileDescriptorWatch(int fd) override;
   bool RunsTasksOnCurrentThread() const override;
 
+  // Returns true if the task runner is quitting, or has quit and hasn't been
+  // restarted since. Exposed primarily for ThreadTaskRunner, not necessary for
+  // normal use of this class.
+  bool QuitCalled();
+
  private:
   void WakeUp();
 
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 25f3691..447a9b1 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -41,6 +41,7 @@
       "event.cc",
       "pipe.cc",
       "temp_file.cc",
+      "thread_task_runner.cc",
       "unix_task_runner.cc",
     ]
   }
@@ -157,6 +158,7 @@
       "task_runner_unittest.cc",
       "temp_file_unittest.cc",
       "thread_checker_unittest.cc",
+      "thread_task_runner_unittest.cc",
       "utils_unittest.cc",
     ]
   }
diff --git a/src/base/thread_task_runner.cc b/src/base/thread_task_runner.cc
new file mode 100644
index 0000000..a545e6b
--- /dev/null
+++ b/src/base/thread_task_runner.cc
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/thread_task_runner.h"
+
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <thread>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/unix_task_runner.h"
+
+namespace perfetto {
+namespace base {
+
+ThreadTaskRunner::ThreadTaskRunner(ThreadTaskRunner&& other) noexcept
+    : thread_(std::move(other.thread_)), task_runner_(other.task_runner_) {
+  other.task_runner_ = nullptr;
+}
+
+ThreadTaskRunner& ThreadTaskRunner::operator=(ThreadTaskRunner&& other) {
+  this->~ThreadTaskRunner();
+  new (this) ThreadTaskRunner(std::move(other));
+  return *this;
+}
+
+ThreadTaskRunner::~ThreadTaskRunner() {
+  if (task_runner_) {
+    PERFETTO_CHECK(!task_runner_->QuitCalled());
+    task_runner_->Quit();
+
+    PERFETTO_DCHECK(thread_.joinable());
+  }
+  if (thread_.joinable())
+    thread_.join();
+}
+
+ThreadTaskRunner::ThreadTaskRunner() {
+  std::mutex init_lock;
+  std::condition_variable init_cv;
+
+  std::function<void(UnixTaskRunner*)> initializer =
+      [this, &init_lock, &init_cv](UnixTaskRunner* task_runner) {
+        std::lock_guard<std::mutex> lock(init_lock);
+        task_runner_ = task_runner;
+        // Notify while still holding the lock, as init_cv ceases to exist as
+        // soon as the main thread observes a non-null task_runner_, and it can
+        // wake up spuriously (i.e. before the notify if we had unlocked before
+        // notifying).
+        init_cv.notify_one();
+      };
+  thread_ = std::thread(&ThreadTaskRunner::RunTaskThread, this,
+                        std::move(initializer));
+
+  std::unique_lock<std::mutex> lock(init_lock);
+  init_cv.wait(lock, [this] { return !!task_runner_; });
+}
+
+void ThreadTaskRunner::RunTaskThread(
+    std::function<void(UnixTaskRunner*)> initializer) {
+  UnixTaskRunner task_runner;
+  task_runner.PostTask(std::bind(std::move(initializer), &task_runner));
+  task_runner.Run();
+}
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/thread_task_runner_unittest.cc b/src/base/thread_task_runner_unittest.cc
new file mode 100644
index 0000000..823967b
--- /dev/null
+++ b/src/base/thread_task_runner_unittest.cc
@@ -0,0 +1,144 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/thread_task_runner.h"
+
+#include <thread>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/thread_checker.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+class ThreadTaskRunnerTest : public ::testing::Test {
+ protected:
+  std::atomic<bool> atomic_flag_{false};
+};
+
+TEST_F(ThreadTaskRunnerTest, ConstructedRunning) {
+  ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+  task_runner.get()->PostTask([this] { atomic_flag_ = true; });
+  // main thread not blocked, wait on the task explicitly
+  while (!atomic_flag_) {
+    std::this_thread::yield();
+  }
+}
+
+TEST_F(ThreadTaskRunnerTest, RunsTasksOnOneDedicatedThread) {
+  ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+  EXPECT_FALSE(task_runner.get()->RunsTasksOnCurrentThread());
+
+  ThreadChecker thread_checker;
+  task_runner.get()->PostTask([&thread_checker] {
+    // make thread_checker track the task thread
+    thread_checker.DetachFromThread();
+    EXPECT_TRUE(thread_checker.CalledOnValidThread());
+  });
+  task_runner.get()->PostTask([this, &thread_checker] {
+    // called on the same thread
+    EXPECT_TRUE(thread_checker.CalledOnValidThread());
+    atomic_flag_ = true;
+  });
+
+  while (!atomic_flag_) {
+    std::this_thread::yield();
+  }
+}
+
+TEST_F(ThreadTaskRunnerTest, MovableOwnership) {
+  ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+  UnixTaskRunner* runner_ptr = task_runner.get();
+  EXPECT_NE(runner_ptr, nullptr);
+
+  ThreadChecker thread_checker;
+  task_runner.get()->PostTask([&thread_checker] {
+    // make thread_checker track the task thread
+    thread_checker.DetachFromThread();
+    EXPECT_TRUE(thread_checker.CalledOnValidThread());
+  });
+
+  // move ownership and destroy old instance
+  ThreadTaskRunner task_runner2 = std::move(task_runner);
+  EXPECT_EQ(task_runner.get(), nullptr);
+  task_runner.~ThreadTaskRunner();
+
+  // runner pointer is stable, and remains usable
+  EXPECT_EQ(task_runner2.get(), runner_ptr);
+  task_runner2.get()->PostTask([this, &thread_checker] {
+    // task thread remains the same
+    EXPECT_TRUE(thread_checker.CalledOnValidThread());
+    atomic_flag_ = true;
+  });
+
+  while (!atomic_flag_) {
+    std::this_thread::yield();
+  }
+}
+
+// Test helper callable that remembers a copy of a given ThreadChecker, and
+// checks that this class' destructor runs on the remembered thread. Note that
+// it is copyable so that it can be passed as a task (i.e. std::function) to a
+// task runner. Also note that all instances of this class will thread-check,
+// including those that have been moved-from.
+class DestructorThreadChecker {
+ public:
+  DestructorThreadChecker(ThreadChecker checker) : checker_(checker) {}
+  DestructorThreadChecker(const DestructorThreadChecker&) = default;
+  DestructorThreadChecker& operator=(const DestructorThreadChecker&) = default;
+  DestructorThreadChecker(DestructorThreadChecker&&) = default;
+  DestructorThreadChecker& operator=(DestructorThreadChecker&&) = default;
+
+  ~DestructorThreadChecker() { EXPECT_TRUE(checker_.CalledOnValidThread()); }
+
+  void operator()() { GTEST_FAIL() << "shouldn't be called"; }
+
+  ThreadChecker checker_;
+};
+
+// Checks that the still-pending tasks (and therefore the UnixTaskRunner itself)
+// are destructed on the task thread, and not the thread that destroys the
+// ThreadTaskRunner.
+TEST_F(ThreadTaskRunnerTest, EnqueuedTasksDestructedOnTaskThread) {
+  ThreadChecker thread_checker;
+  ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+
+  task_runner.get()->PostTask([this, &thread_checker, &task_runner] {
+    // make thread_checker track the task thread
+    thread_checker.DetachFromThread();
+    EXPECT_TRUE(thread_checker.CalledOnValidThread());
+    // Post a follow-up delayed task and unblock the main thread, which will
+    // destroy the ThreadTaskRunner while this task is still pending.
+    //
+    // Note: DestructorThreadChecker will thread-check (at least) twice:
+    // * for the temporary that was moved-from to construct the task
+    //   std::function. Will pass as we're posting from a task thread.
+    // * for the still-pending task once the ThreadTaskRunner destruction causes
+    //   the destruction of UnixTaskRunner.
+    task_runner.get()->PostDelayedTask(DestructorThreadChecker(thread_checker),
+                                       100 * 1000 /*ms*/);
+    atomic_flag_ = true;
+  });
+
+  while (!atomic_flag_) {
+    std::this_thread::yield();
+  }
+}
+
+}  // namespace
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/unix_task_runner.cc b/src/base/unix_task_runner.cc
index 937a4b1..03a1935 100644
--- a/src/base/unix_task_runner.cc
+++ b/src/base/unix_task_runner.cc
@@ -72,6 +72,11 @@
   WakeUp();
 }
 
+bool UnixTaskRunner::QuitCalled() {
+  std::lock_guard<std::mutex> lock(lock_);
+  return quit_;
+}
+
 bool UnixTaskRunner::IsIdleForTesting() {
   std::lock_guard<std::mutex> lock(lock_);
   return immediate_tasks_.empty();
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 8853317..728d395 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -216,7 +216,6 @@
     ":wire_protocol",
     "../../../gn:default_deps",
     "../../base",
-    "../../base:test_support",
     "../../tracing",
   ]
 }
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index 434d3cc..894181d 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -24,6 +24,7 @@
 
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/string_utils.h"
+#include "perfetto/base/thread_task_runner.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "perfetto/tracing/core/trace_writer.h"
@@ -50,6 +51,15 @@
   return client_config;
 }
 
+std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
+                                                  size_t n) {
+  std::vector<UnwindingWorker> ret;
+  for (size_t i = 0; i < n; ++i) {
+    ret.emplace_back(delegate, base::ThreadTaskRunner::CreateAndStart());
+  }
+  return ret;
+}
+
 }  // namespace
 
 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
@@ -59,9 +69,7 @@
                                      base::TaskRunner* task_runner)
     : mode_(mode),
       task_runner_(task_runner),
-      unwinding_task_runners_(kUnwinderThreads),
-      unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
-      unwinding_workers_(MakeUnwindingWorkers(kUnwinderThreads)),
+      unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
       target_pid_(base::kInvalidPid),
       socket_delegate_(this),
       weak_factory_(this) {
@@ -71,10 +79,6 @@
 }
 
 HeapprofdProducer::~HeapprofdProducer() {
-  for (auto& task_runner : unwinding_task_runners_)
-    task_runner.Quit();
-  for (std::thread& th : unwinding_threads_)
-    th.join();
   // We only borrowed this from the environment variable.
   // UnixSocket always owns the socket, so we need to manually release it
   // here.
@@ -313,7 +317,8 @@
                              bool has_flush_id) {
   auto it = data_sources_.find(id);
   if (it == data_sources_.end()) {
-    PERFETTO_LOG("Invalid data source.");
+    PERFETTO_LOG(
+        "Data source not found (harmless if using continuous_dump_config).");
     return false;
   }
   DataSource& data_source = it->second;
@@ -381,22 +386,6 @@
   }
 }
 
-std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
-  std::vector<std::thread> ret;
-  for (size_t i = 0; i < n; ++i) {
-    ret.emplace_back([this, i] { unwinding_task_runners_[i].Run(); });
-  }
-  return ret;
-}
-
-std::vector<UnwindingWorker> HeapprofdProducer::MakeUnwindingWorkers(size_t n) {
-  std::vector<UnwindingWorker> ret;
-  for (size_t i = 0; i < n; ++i) {
-    ret.emplace_back(this, &unwinding_task_runners_[i]);
-  }
-  return ret;
-}
-
 std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeListeningSocket() {
   const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
   if (sock_fd == nullptr) {
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
index 5c8ae66..a0aef55 100644
--- a/src/profiling/memory/heapprofd_producer.h
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -125,9 +125,6 @@
 
   const HeapprofdMode mode_;
 
-  std::vector<std::thread> MakeUnwindingThreads(size_t n);
-  std::vector<UnwindingWorker> MakeUnwindingWorkers(size_t n);
-
   void FinishDataSourceFlush(FlushRequestID flush_id);
   bool Dump(DataSourceInstanceID id,
             FlushRequestID flush_id,
@@ -177,11 +174,6 @@
   // of them were dropped.
   uint64_t next_index_ = 0;
 
-  // These are not fields in UnwinderThread as the task runner is not movable
-  // and that makes UnwinderThread very unwieldy objects (e.g. we cannot
-  // emplace_back into a vector as that requires movability.)
-  std::vector<base::UnixTaskRunner> unwinding_task_runners_;
-  std::vector<std::thread> unwinding_threads_;  // Only for ownership.
   std::vector<UnwindingWorker> unwinding_workers_;
 
   // state specific to mode_ == kCentral
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index dd6c7e1..89d50f9 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -49,6 +49,7 @@
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/string_utils.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/base/thread_task_runner.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
@@ -268,15 +269,17 @@
       break;
     HandleBuffer(&buf, &socket_data.metadata,
                  socket_data.data_source_instance_id,
-                 socket_data.sock->peer_pid());
+                 socket_data.sock->peer_pid(), delegate_);
     shmem.EndRead(std::move(buf));
   }
 }
 
+// static
 void UnwindingWorker::HandleBuffer(SharedRingBuffer::Buffer* buf,
                                    UnwindingMetadata* unwinding_metadata,
                                    DataSourceInstanceID data_source_instance_id,
-                                   pid_t peer_pid) {
+                                   pid_t peer_pid,
+                                   Delegate* delegate) {
   WireMessage msg;
   // TODO(fmayer): standardise on char* or uint8_t*.
   // char* has stronger guarantees regarding aliasing.
@@ -293,14 +296,14 @@
     rec.pid = peer_pid;
     rec.data_source_instance_id = data_source_instance_id;
     DoUnwind(&msg, unwinding_metadata, &rec);
-    delegate_->PostAllocRecord(std::move(rec));
+    delegate->PostAllocRecord(std::move(rec));
   } else if (msg.record_type == RecordType::Free) {
     FreeRecord rec;
     rec.pid = peer_pid;
     rec.data_source_instance_id = data_source_instance_id;
     // We need to copy this, so we can return the memory to the shmem buffer.
     memcpy(&rec.metadata, msg.free_header, sizeof(*msg.free_header));
-    delegate_->PostFreeRecord(std::move(rec));
+    delegate->PostFreeRecord(std::move(rec));
   } else {
     PERFETTO_DFATAL("Invalid record type.");
   }
@@ -310,9 +313,9 @@
   // Even with C++14, this cannot be moved, as std::function has to be
   // copyable, which HandoffData is not.
   HandoffData* raw_data = new HandoffData(std::move(handoff_data));
-  // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
-  // before this object get destructed.
-  task_runner_->PostTask([this, raw_data] {
+  // We do not need to use a WeakPtr here because the task runner will not
+  // outlive its UnwindingWorker.
+  thread_task_runner_.get()->PostTask([this, raw_data] {
     HandoffData data = std::move(*raw_data);
     delete raw_data;
     HandleHandoffSocket(std::move(data));
@@ -320,9 +323,9 @@
 }
 
 void UnwindingWorker::HandleHandoffSocket(HandoffData handoff_data) {
-  auto sock = base::UnixSocket::AdoptConnected(handoff_data.sock.ReleaseFd(),
-                                               this, this->task_runner_,
-                                               base::SockType::kStream);
+  auto sock = base::UnixSocket::AdoptConnected(
+      handoff_data.sock.ReleaseFd(), this, this->thread_task_runner_.get(),
+      base::SockType::kStream);
   pid_t peer_pid = sock->peer_pid();
 
   UnwindingMetadata metadata(peer_pid,
@@ -336,9 +339,10 @@
 }
 
 void UnwindingWorker::PostDisconnectSocket(pid_t pid) {
-  // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
-  // before this object get destructed.
-  task_runner_->PostTask([this, pid] { HandleDisconnectSocket(pid); });
+  // We do not need to use a WeakPtr here because the task runner will not
+  // outlive its UnwindingWorker.
+  thread_task_runner_.get()->PostTask(
+      [this, pid] { HandleDisconnectSocket(pid); });
 }
 
 void UnwindingWorker::HandleDisconnectSocket(pid_t pid) {
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 8bff66e..bcd37cd 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -28,6 +28,7 @@
 #endif
 
 #include "perfetto/base/scoped_file.h"
+#include "perfetto/base/thread_task_runner.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "src/profiling/memory/bookkeeping.h"
 #include "src/profiling/memory/queue_messages.h"
@@ -146,8 +147,9 @@
     SharedRingBuffer shmem;
   };
 
-  UnwindingWorker(Delegate* delegate, base::TaskRunner* task_runner)
-      : delegate_(delegate), task_runner_(task_runner) {}
+  UnwindingWorker(Delegate* delegate, base::ThreadTaskRunner thread_task_runner)
+      : delegate_(delegate),
+        thread_task_runner_(std::move(thread_task_runner)) {}
 
   // Public API safe to call from other threads.
   void PostDisconnectSocket(pid_t pid);
@@ -162,11 +164,13 @@
   }
   void OnDataAvailable(base::UnixSocket* self) override;
 
- public:  // public for fuzzing/testing
-  void HandleBuffer(SharedRingBuffer::Buffer* buf,
-                    UnwindingMetadata* unwinding_metadata,
-                    DataSourceInstanceID data_source_instance_id,
-                    pid_t peer_pid);
+ public:
+  // static and public for testing/fuzzing
+  static void HandleBuffer(SharedRingBuffer::Buffer* buf,
+                           UnwindingMetadata* unwinding_metadata,
+                           DataSourceInstanceID data_source_instance_id,
+                           pid_t peer_pid,
+                           Delegate* delegate);
 
  private:
   void HandleHandoffSocket(HandoffData data);
@@ -181,7 +185,8 @@
 
   std::map<pid_t, ClientData> client_data_;
   Delegate* delegate_;
-  base::TaskRunner* task_runner_;
+  // Task runner with a dedicated thread.
+  base::ThreadTaskRunner thread_task_runner_;
 };
 
 }  // namespace profiling
diff --git a/src/profiling/memory/unwinding_fuzzer.cc b/src/profiling/memory/unwinding_fuzzer.cc
index 06bbfa8..20a9357 100644
--- a/src/profiling/memory/unwinding_fuzzer.cc
+++ b/src/profiling/memory/unwinding_fuzzer.cc
@@ -19,7 +19,6 @@
 
 #include "perfetto/base/utils.h"
 #include "perfetto/tracing/core/basic_types.h"
-#include "src/base/test/test_task_runner.h"
 #include "src/profiling/memory/queue_messages.h"
 #include "src/profiling/memory/shared_ring_buffer.h"
 #include "src/profiling/memory/unwinding.h"
@@ -35,10 +34,6 @@
 };
 
 int FuzzUnwinding(const uint8_t* data, size_t size) {
-  NopDelegate nop_delegate;
-  base::TestTaskRunner task_runner;
-  UnwindingWorker worker(&nop_delegate, &task_runner);
-
   SharedRingBuffer::Buffer buf(const_cast<uint8_t*>(data), size);
 
   pid_t self_pid = getpid();
@@ -47,7 +42,8 @@
                              base::OpenFile("/proc/self/maps", O_RDONLY),
                              base::OpenFile("/proc/self/mem", O_RDONLY));
 
-  worker.HandleBuffer(&buf, &metadata, id, self_pid);
+  NopDelegate nop_delegate;
+  UnwindingWorker::HandleBuffer(&buf, &metadata, id, self_pid, &nop_delegate);
   return 0;
 }