introduce ThreadTaskRunner and use it in heapprofd's UnwindingWorker
As discussed last week, proposing a wrapper object that owns a UnixTaskRunner & its task thread.
For the review, please focus on the following:
* that the writeup of the thread-checking considerations in unix_task_runner.h is accurate
* that the thread_task_runner implementation itself is sane
* that its use in heapprofd is reasonable
I have no strong opinions on the following, and simply chose one option for the initial review:
* Naming of the new utility - ThreadUnixTaskRunner would more accurately reflect that this isn't
cross-platform?
* Whether it belongs in base::, or should be in profiling::memory:: for now.
We can slot this into consumer_api.cc (which does a very similar handshake) in a followup cl,
so I thought base:: might be a reasonable place.
Note: with this change, heapprofd works in debug builds e2e afaict.
Change-Id: Ia12c89c6963bf0a659bca7994f4fe87aa3bfde58
diff --git a/Android.bp b/Android.bp
index f780212..256e440 100644
--- a/Android.bp
+++ b/Android.bp
@@ -47,6 +47,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -162,6 +163,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -228,6 +230,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -406,6 +409,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -576,6 +580,7 @@
"src/base/test/utils.cc",
"src/base/test/vm_test_utils.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -2581,6 +2586,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
@@ -2827,6 +2833,8 @@
"src/base/test/vm_test_utils.cc",
"src/base/thread_checker.cc",
"src/base/thread_checker_unittest.cc",
+ "src/base/thread_task_runner.cc",
+ "src/base/thread_task_runner_unittest.cc",
"src/base/time.cc",
"src/base/time_unittest.cc",
"src/base/unix_socket.cc",
@@ -3092,6 +3100,7 @@
"src/base/string_view.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
+ "src/base/thread_task_runner.cc",
"src/base/time.cc",
"src/base/unix_task_runner.cc",
"src/base/virtual_destructors.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index fd5b869..bd45200 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -38,6 +38,7 @@
"task_runner.h",
"temp_file.h",
"thread_checker.h",
+ "thread_task_runner.h",
"thread_utils.h",
"time.h",
"unix_task_runner.h",
diff --git a/include/perfetto/base/thread_task_runner.h b/include/perfetto/base/thread_task_runner.h
new file mode 100644
index 0000000..7992eeb
--- /dev/null
+++ b/include/perfetto/base/thread_task_runner.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
+#define INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
+
+#include <functional>
+#include <thread>
+
+#include "perfetto/base/unix_task_runner.h"
+
+namespace perfetto {
+namespace base {
+
+// A UnixTaskRunner backed by a dedicated task thread. Shuts down the runner and
+// joins the thread upon destruction. Can be moved to transfer ownership.
+//
+// Guarantees that:
+// * the UnixTaskRunner will be constructed and destructed on the task thread.
+// * the task thread will live for the lifetime of the UnixTaskRunner.
+//
+class ThreadTaskRunner {
+ public:
+ static ThreadTaskRunner CreateAndStart() { return ThreadTaskRunner(); }
+
+ ThreadTaskRunner(const ThreadTaskRunner&) = delete;
+ ThreadTaskRunner& operator=(const ThreadTaskRunner&) = delete;
+
+ ThreadTaskRunner(ThreadTaskRunner&&) noexcept;
+ ThreadTaskRunner& operator=(ThreadTaskRunner&&);
+ ~ThreadTaskRunner();
+
+ // Returns a pointer to the UnixTaskRunner, which is valid for the lifetime of
+ // this ThreadTaskRunner object (unless this object is moved-from, in which
+ // case the pointer remains valid for the lifetime of the new owning
+ // ThreadTaskRunner).
+ //
+ // Warning: do not call Quit() on the returned runner pointer, the termination
+ // should be handled exclusively by this class' destructor.
+ UnixTaskRunner* get() { return task_runner_; }
+
+ private:
+ ThreadTaskRunner();
+ void RunTaskThread(std::function<void(UnixTaskRunner*)> initializer);
+
+ std::thread thread_;
+ UnixTaskRunner* task_runner_ = nullptr;
+};
+
+} // namespace base
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_BASE_THREAD_TASK_RUNNER_H_
diff --git a/include/perfetto/base/unix_task_runner.h b/include/perfetto/base/unix_task_runner.h
index 47fecfc..98c9c7e 100644
--- a/include/perfetto/base/unix_task_runner.h
+++ b/include/perfetto/base/unix_task_runner.h
@@ -36,6 +36,20 @@
namespace base {
// Runs a task runner on the current thread.
+//
+// Implementation note: we currently assume (and enforce in debug builds) that
+// Run() is called from the thread that constructed the UnixTaskRunner. This is
+// not strictly necessary, and we could instead track the thread that invokes
+// Run(). However, a related property that *might* be important to enforce is
+// that the destructor runs on the task-running thread. Otherwise, if there are
+// still-pending tasks at the time of destruction, we would destroy those
+// outside of the task thread (which might be unexpected to the caller). On the
+// other hand, the std::function task interface discourages use of any
+// resource-owning tasks (as the callable needs to be copyable), so this might
+// not be important in practice.
+//
+// TODO(rsavitski): consider adding a thread-check in the destructor, after
+// auditing existing usages.
class UnixTaskRunner : public TaskRunner {
public:
UnixTaskRunner();
@@ -57,6 +71,11 @@
void RemoveFileDescriptorWatch(int fd) override;
bool RunsTasksOnCurrentThread() const override;
+ // Returns true if the task runner is quitting, or has quit and hasn't been
+ // restarted since. Exposed primarily for ThreadTaskRunner, not necessary for
+ // normal use of this class.
+ bool QuitCalled();
+
private:
void WakeUp();
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 25f3691..447a9b1 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -41,6 +41,7 @@
"event.cc",
"pipe.cc",
"temp_file.cc",
+ "thread_task_runner.cc",
"unix_task_runner.cc",
]
}
@@ -157,6 +158,7 @@
"task_runner_unittest.cc",
"temp_file_unittest.cc",
"thread_checker_unittest.cc",
+ "thread_task_runner_unittest.cc",
"utils_unittest.cc",
]
}
diff --git a/src/base/thread_task_runner.cc b/src/base/thread_task_runner.cc
new file mode 100644
index 0000000..a545e6b
--- /dev/null
+++ b/src/base/thread_task_runner.cc
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/thread_task_runner.h"
+
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <thread>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/unix_task_runner.h"
+
+namespace perfetto {
+namespace base {
+
+ThreadTaskRunner::ThreadTaskRunner(ThreadTaskRunner&& other) noexcept
+ : thread_(std::move(other.thread_)), task_runner_(other.task_runner_) {
+ other.task_runner_ = nullptr;
+}
+
+ThreadTaskRunner& ThreadTaskRunner::operator=(ThreadTaskRunner&& other) {
+ this->~ThreadTaskRunner();
+ new (this) ThreadTaskRunner(std::move(other));
+ return *this;
+}
+
+ThreadTaskRunner::~ThreadTaskRunner() {
+ if (task_runner_) {
+ PERFETTO_CHECK(!task_runner_->QuitCalled());
+ task_runner_->Quit();
+
+ PERFETTO_DCHECK(thread_.joinable());
+ }
+ if (thread_.joinable())
+ thread_.join();
+}
+
+ThreadTaskRunner::ThreadTaskRunner() {
+ std::mutex init_lock;
+ std::condition_variable init_cv;
+
+ std::function<void(UnixTaskRunner*)> initializer =
+ [this, &init_lock, &init_cv](UnixTaskRunner* task_runner) {
+ std::lock_guard<std::mutex> lock(init_lock);
+ task_runner_ = task_runner;
+ // Notify while still holding the lock, as init_cv ceases to exist as
+ // soon as the main thread observes a non-null task_runner_, and it can
+ // wake up spuriously (i.e. before the notify if we had unlocked before
+ // notifying).
+ init_cv.notify_one();
+ };
+ thread_ = std::thread(&ThreadTaskRunner::RunTaskThread, this,
+ std::move(initializer));
+
+ std::unique_lock<std::mutex> lock(init_lock);
+ init_cv.wait(lock, [this] { return !!task_runner_; });
+}
+
+void ThreadTaskRunner::RunTaskThread(
+ std::function<void(UnixTaskRunner*)> initializer) {
+ UnixTaskRunner task_runner;
+ task_runner.PostTask(std::bind(std::move(initializer), &task_runner));
+ task_runner.Run();
+}
+
+} // namespace base
+} // namespace perfetto
diff --git a/src/base/thread_task_runner_unittest.cc b/src/base/thread_task_runner_unittest.cc
new file mode 100644
index 0000000..823967b
--- /dev/null
+++ b/src/base/thread_task_runner_unittest.cc
@@ -0,0 +1,144 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/thread_task_runner.h"
+
+#include <thread>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/thread_checker.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+class ThreadTaskRunnerTest : public ::testing::Test {
+ protected:
+ std::atomic<bool> atomic_flag_{false};
+};
+
+TEST_F(ThreadTaskRunnerTest, ConstructedRunning) {
+ ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+ task_runner.get()->PostTask([this] { atomic_flag_ = true; });
+ // main thread not blocked, wait on the task explicitly
+ while (!atomic_flag_) {
+ std::this_thread::yield();
+ }
+}
+
+TEST_F(ThreadTaskRunnerTest, RunsTasksOnOneDedicatedThread) {
+ ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+ EXPECT_FALSE(task_runner.get()->RunsTasksOnCurrentThread());
+
+ ThreadChecker thread_checker;
+ task_runner.get()->PostTask([&thread_checker] {
+ // make thread_checker track the task thread
+ thread_checker.DetachFromThread();
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+ });
+ task_runner.get()->PostTask([this, &thread_checker] {
+ // called on the same thread
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+ atomic_flag_ = true;
+ });
+
+ while (!atomic_flag_) {
+ std::this_thread::yield();
+ }
+}
+
+TEST_F(ThreadTaskRunnerTest, MovableOwnership) {
+ ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+ UnixTaskRunner* runner_ptr = task_runner.get();
+ EXPECT_NE(runner_ptr, nullptr);
+
+ ThreadChecker thread_checker;
+ task_runner.get()->PostTask([&thread_checker] {
+ // make thread_checker track the task thread
+ thread_checker.DetachFromThread();
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+ });
+
+ // move ownership and destroy old instance
+ ThreadTaskRunner task_runner2 = std::move(task_runner);
+ EXPECT_EQ(task_runner.get(), nullptr);
+ task_runner.~ThreadTaskRunner();
+
+ // runner pointer is stable, and remains usable
+ EXPECT_EQ(task_runner2.get(), runner_ptr);
+ task_runner2.get()->PostTask([this, &thread_checker] {
+ // task thread remains the same
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+ atomic_flag_ = true;
+ });
+
+ while (!atomic_flag_) {
+ std::this_thread::yield();
+ }
+}
+
+// Test helper callable that remembers a copy of a given ThreadChecker, and
+// checks that this class' destructor runs on the remembered thread. Note that
+// it is copyable so that it can be passed as a task (i.e. std::function) to a
+// task runner. Also note that all instances of this class will thread-check,
+// including those that have been moved-from.
+class DestructorThreadChecker {
+ public:
+ DestructorThreadChecker(ThreadChecker checker) : checker_(checker) {}
+ DestructorThreadChecker(const DestructorThreadChecker&) = default;
+ DestructorThreadChecker& operator=(const DestructorThreadChecker&) = default;
+ DestructorThreadChecker(DestructorThreadChecker&&) = default;
+ DestructorThreadChecker& operator=(DestructorThreadChecker&&) = default;
+
+ ~DestructorThreadChecker() { EXPECT_TRUE(checker_.CalledOnValidThread()); }
+
+ void operator()() { GTEST_FAIL() << "shouldn't be called"; }
+
+ ThreadChecker checker_;
+};
+
+// Checks that the still-pending tasks (and therefore the UnixTaskRunner itself)
+// are destructed on the task thread, and not the thread that destroys the
+// ThreadTaskRunner.
+TEST_F(ThreadTaskRunnerTest, EnqueuedTasksDestructedOnTaskThread) {
+ ThreadChecker thread_checker;
+ ThreadTaskRunner task_runner = ThreadTaskRunner::CreateAndStart();
+
+ task_runner.get()->PostTask([this, &thread_checker, &task_runner] {
+ // make thread_checker track the task thread
+ thread_checker.DetachFromThread();
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+ // Post a follow-up delayed task and unblock the main thread, which will
+ // destroy the ThreadTaskRunner while this task is still pending.
+ //
+ // Note: DestructorThreadChecker will thread-check (at least) twice:
+ // * for the temporary that was moved-from to construct the task
+ // std::function. Will pass as we're posting from a task thread.
+ // * for the still-pending task once the ThreadTaskRunner destruction causes
+ // the destruction of UnixTaskRunner.
+ task_runner.get()->PostDelayedTask(DestructorThreadChecker(thread_checker),
+ 100 * 1000 /*ms*/);
+ atomic_flag_ = true;
+ });
+
+ while (!atomic_flag_) {
+ std::this_thread::yield();
+ }
+}
+
+} // namespace
+} // namespace base
+} // namespace perfetto
diff --git a/src/base/unix_task_runner.cc b/src/base/unix_task_runner.cc
index 937a4b1..03a1935 100644
--- a/src/base/unix_task_runner.cc
+++ b/src/base/unix_task_runner.cc
@@ -72,6 +72,11 @@
WakeUp();
}
+bool UnixTaskRunner::QuitCalled() {
+ std::lock_guard<std::mutex> lock(lock_);
+ return quit_;
+}
+
bool UnixTaskRunner::IsIdleForTesting() {
std::lock_guard<std::mutex> lock(lock_);
return immediate_tasks_.empty();
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 8853317..728d395 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -216,7 +216,6 @@
":wire_protocol",
"../../../gn:default_deps",
"../../base",
- "../../base:test_support",
"../../tracing",
]
}
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index 434d3cc..894181d 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -24,6 +24,7 @@
#include "perfetto/base/file_utils.h"
#include "perfetto/base/string_utils.h"
+#include "perfetto/base/thread_task_runner.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/trace_writer.h"
@@ -50,6 +51,15 @@
return client_config;
}
+std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
+ size_t n) {
+ std::vector<UnwindingWorker> ret;
+ for (size_t i = 0; i < n; ++i) {
+ ret.emplace_back(delegate, base::ThreadTaskRunner::CreateAndStart());
+ }
+ return ret;
+}
+
} // namespace
// We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
@@ -59,9 +69,7 @@
base::TaskRunner* task_runner)
: mode_(mode),
task_runner_(task_runner),
- unwinding_task_runners_(kUnwinderThreads),
- unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
- unwinding_workers_(MakeUnwindingWorkers(kUnwinderThreads)),
+ unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
target_pid_(base::kInvalidPid),
socket_delegate_(this),
weak_factory_(this) {
@@ -71,10 +79,6 @@
}
HeapprofdProducer::~HeapprofdProducer() {
- for (auto& task_runner : unwinding_task_runners_)
- task_runner.Quit();
- for (std::thread& th : unwinding_threads_)
- th.join();
// We only borrowed this from the environment variable.
// UnixSocket always owns the socket, so we need to manually release it
// here.
@@ -313,7 +317,8 @@
bool has_flush_id) {
auto it = data_sources_.find(id);
if (it == data_sources_.end()) {
- PERFETTO_LOG("Invalid data source.");
+ PERFETTO_LOG(
+ "Data source not found (harmless if using continuous_dump_config).");
return false;
}
DataSource& data_source = it->second;
@@ -381,22 +386,6 @@
}
}
-std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
- std::vector<std::thread> ret;
- for (size_t i = 0; i < n; ++i) {
- ret.emplace_back([this, i] { unwinding_task_runners_[i].Run(); });
- }
- return ret;
-}
-
-std::vector<UnwindingWorker> HeapprofdProducer::MakeUnwindingWorkers(size_t n) {
- std::vector<UnwindingWorker> ret;
- for (size_t i = 0; i < n; ++i) {
- ret.emplace_back(this, &unwinding_task_runners_[i]);
- }
- return ret;
-}
-
std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeListeningSocket() {
const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
if (sock_fd == nullptr) {
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
index 5c8ae66..a0aef55 100644
--- a/src/profiling/memory/heapprofd_producer.h
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -125,9 +125,6 @@
const HeapprofdMode mode_;
- std::vector<std::thread> MakeUnwindingThreads(size_t n);
- std::vector<UnwindingWorker> MakeUnwindingWorkers(size_t n);
-
void FinishDataSourceFlush(FlushRequestID flush_id);
bool Dump(DataSourceInstanceID id,
FlushRequestID flush_id,
@@ -177,11 +174,6 @@
// of them were dropped.
uint64_t next_index_ = 0;
- // These are not fields in UnwinderThread as the task runner is not movable
- // and that makes UnwinderThread very unwieldy objects (e.g. we cannot
- // emplace_back into a vector as that requires movability.)
- std::vector<base::UnixTaskRunner> unwinding_task_runners_;
- std::vector<std::thread> unwinding_threads_; // Only for ownership.
std::vector<UnwindingWorker> unwinding_workers_;
// state specific to mode_ == kCentral
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index dd6c7e1..89d50f9 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -49,6 +49,7 @@
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/string_utils.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/base/thread_task_runner.h"
#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
@@ -268,15 +269,17 @@
break;
HandleBuffer(&buf, &socket_data.metadata,
socket_data.data_source_instance_id,
- socket_data.sock->peer_pid());
+ socket_data.sock->peer_pid(), delegate_);
shmem.EndRead(std::move(buf));
}
}
+// static
void UnwindingWorker::HandleBuffer(SharedRingBuffer::Buffer* buf,
UnwindingMetadata* unwinding_metadata,
DataSourceInstanceID data_source_instance_id,
- pid_t peer_pid) {
+ pid_t peer_pid,
+ Delegate* delegate) {
WireMessage msg;
// TODO(fmayer): standardise on char* or uint8_t*.
// char* has stronger guarantees regarding aliasing.
@@ -293,14 +296,14 @@
rec.pid = peer_pid;
rec.data_source_instance_id = data_source_instance_id;
DoUnwind(&msg, unwinding_metadata, &rec);
- delegate_->PostAllocRecord(std::move(rec));
+ delegate->PostAllocRecord(std::move(rec));
} else if (msg.record_type == RecordType::Free) {
FreeRecord rec;
rec.pid = peer_pid;
rec.data_source_instance_id = data_source_instance_id;
// We need to copy this, so we can return the memory to the shmem buffer.
memcpy(&rec.metadata, msg.free_header, sizeof(*msg.free_header));
- delegate_->PostFreeRecord(std::move(rec));
+ delegate->PostFreeRecord(std::move(rec));
} else {
PERFETTO_DFATAL("Invalid record type.");
}
@@ -310,9 +313,9 @@
// Even with C++14, this cannot be moved, as std::function has to be
// copyable, which HandoffData is not.
HandoffData* raw_data = new HandoffData(std::move(handoff_data));
- // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
- // before this object get destructed.
- task_runner_->PostTask([this, raw_data] {
+ // We do not need to use a WeakPtr here because the task runner will not
+ // outlive its UnwindingWorker.
+ thread_task_runner_.get()->PostTask([this, raw_data] {
HandoffData data = std::move(*raw_data);
delete raw_data;
HandleHandoffSocket(std::move(data));
@@ -320,9 +323,9 @@
}
void UnwindingWorker::HandleHandoffSocket(HandoffData handoff_data) {
- auto sock = base::UnixSocket::AdoptConnected(handoff_data.sock.ReleaseFd(),
- this, this->task_runner_,
- base::SockType::kStream);
+ auto sock = base::UnixSocket::AdoptConnected(
+ handoff_data.sock.ReleaseFd(), this, this->thread_task_runner_.get(),
+ base::SockType::kStream);
pid_t peer_pid = sock->peer_pid();
UnwindingMetadata metadata(peer_pid,
@@ -336,9 +339,10 @@
}
void UnwindingWorker::PostDisconnectSocket(pid_t pid) {
- // We do not need to use a WeakPtr here because the TaskRunner gets Quit-ed
- // before this object get destructed.
- task_runner_->PostTask([this, pid] { HandleDisconnectSocket(pid); });
+ // We do not need to use a WeakPtr here because the task runner will not
+ // outlive its UnwindingWorker.
+ thread_task_runner_.get()->PostTask(
+ [this, pid] { HandleDisconnectSocket(pid); });
}
void UnwindingWorker::HandleDisconnectSocket(pid_t pid) {
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 8bff66e..bcd37cd 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -28,6 +28,7 @@
#endif
#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/thread_task_runner.h"
#include "perfetto/tracing/core/basic_types.h"
#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/queue_messages.h"
@@ -146,8 +147,9 @@
SharedRingBuffer shmem;
};
- UnwindingWorker(Delegate* delegate, base::TaskRunner* task_runner)
- : delegate_(delegate), task_runner_(task_runner) {}
+ UnwindingWorker(Delegate* delegate, base::ThreadTaskRunner thread_task_runner)
+ : delegate_(delegate),
+ thread_task_runner_(std::move(thread_task_runner)) {}
// Public API safe to call from other threads.
void PostDisconnectSocket(pid_t pid);
@@ -162,11 +164,13 @@
}
void OnDataAvailable(base::UnixSocket* self) override;
- public: // public for fuzzing/testing
- void HandleBuffer(SharedRingBuffer::Buffer* buf,
- UnwindingMetadata* unwinding_metadata,
- DataSourceInstanceID data_source_instance_id,
- pid_t peer_pid);
+ public:
+ // static and public for testing/fuzzing
+ static void HandleBuffer(SharedRingBuffer::Buffer* buf,
+ UnwindingMetadata* unwinding_metadata,
+ DataSourceInstanceID data_source_instance_id,
+ pid_t peer_pid,
+ Delegate* delegate);
private:
void HandleHandoffSocket(HandoffData data);
@@ -181,7 +185,8 @@
std::map<pid_t, ClientData> client_data_;
Delegate* delegate_;
- base::TaskRunner* task_runner_;
+ // Task runner with a dedicated thread.
+ base::ThreadTaskRunner thread_task_runner_;
};
} // namespace profiling
diff --git a/src/profiling/memory/unwinding_fuzzer.cc b/src/profiling/memory/unwinding_fuzzer.cc
index 06bbfa8..20a9357 100644
--- a/src/profiling/memory/unwinding_fuzzer.cc
+++ b/src/profiling/memory/unwinding_fuzzer.cc
@@ -19,7 +19,6 @@
#include "perfetto/base/utils.h"
#include "perfetto/tracing/core/basic_types.h"
-#include "src/base/test/test_task_runner.h"
#include "src/profiling/memory/queue_messages.h"
#include "src/profiling/memory/shared_ring_buffer.h"
#include "src/profiling/memory/unwinding.h"
@@ -35,10 +34,6 @@
};
int FuzzUnwinding(const uint8_t* data, size_t size) {
- NopDelegate nop_delegate;
- base::TestTaskRunner task_runner;
- UnwindingWorker worker(&nop_delegate, &task_runner);
-
SharedRingBuffer::Buffer buf(const_cast<uint8_t*>(data), size);
pid_t self_pid = getpid();
@@ -47,7 +42,8 @@
base::OpenFile("/proc/self/maps", O_RDONLY),
base::OpenFile("/proc/self/mem", O_RDONLY));
- worker.HandleBuffer(&buf, &metadata, id, self_pid);
+ NopDelegate nop_delegate;
+ UnwindingWorker::HandleBuffer(&buf, &metadata, id, self_pid, &nop_delegate);
return 0;
}