profiling: Add heapprofd producer.
Test: m
Test: flashed sailfish
Test: profile adbd
Change-Id: I2780570469cfa7dd2533dffa8cc810cb8914d118
diff --git a/Android.bp b/Android.bp
index 51dfcc0..811700e 100644
--- a/Android.bp
+++ b/Android.bp
@@ -52,6 +52,7 @@
"src/ipc/service_proxy.cc",
"src/ipc/virtual_destructors.cc",
"src/profiling/memory/bookkeeping.cc",
+ "src/profiling/memory/heapprofd_producer.cc",
"src/profiling/memory/main.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/socket_listener.cc",
@@ -4393,6 +4394,7 @@
"src/profiling/memory/client.cc",
"src/profiling/memory/client_unittest.cc",
"src/profiling/memory/heapprofd_integrationtest.cc",
+ "src/profiling/memory/heapprofd_producer.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/record_reader_unittest.cc",
"src/profiling/memory/sampler.cc",
diff --git a/heapprofd.rc b/heapprofd.rc
index 4791103..10bc845 100644
--- a/heapprofd.rc
+++ b/heapprofd.rc
@@ -18,3 +18,4 @@
user nobody
group nobody
writepid /dev/cpuset/system-background/tasks
+ capabilities KILL
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 44fbf2f..8fc7552 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -45,10 +45,14 @@
"../../../gn:default_deps",
"../../base",
"../../base:unix_socket",
+ "../../tracing",
+ "../../tracing:ipc",
]
sources = [
"bookkeeping.cc",
"bookkeeping.h",
+ "heapprofd_producer.cc",
+ "heapprofd_producer.h",
"queue_messages.h",
"record_reader.cc",
"record_reader.h",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 5b8c280..51e0d30 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -173,9 +173,16 @@
}
if (rec->record_type == BookkeepingRecord::Type::Dump) {
+ DumpRecord& dump_rec = rec->dump_record;
+ std::shared_ptr<TraceWriter> trace_writer = dump_rec.trace_writer.lock();
+ if (!trace_writer)
+ return;
PERFETTO_LOG("Dumping heaps");
- auto it = bookkeeping_data_.begin();
- while (it != bookkeeping_data_.end()) {
+ for (const pid_t pid : dump_rec.pids) {
+ auto it = bookkeeping_data_.find(pid);
+ if (it == bookkeeping_data_.end())
+ continue;
+
std::string dump_file_name = file_name_ + "." + std::to_string(it->first);
PERFETTO_LOG("Dumping %d to %s", it->first, dump_file_name.c_str());
base::ScopedFile fd =
@@ -188,10 +195,9 @@
if (it->second.ref_count == 0) {
std::lock_guard<std::mutex> l(bookkeeping_mutex_);
it = bookkeeping_data_.erase(it);
- } else {
- ++it;
}
}
+ dump_rec.callback();
} else if (rec->record_type == BookkeepingRecord::Type::Free) {
FreeRecord& free_rec = rec->free_record;
FreePageEntry* entries = free_rec.metadata->entries;
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
index 5850d1f..44e4292 100644
--- a/src/profiling/memory/heapprofd_integrationtest.cc
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -59,7 +59,6 @@
auto done = task_runner.CreateCheckpoint("done");
constexpr uint64_t kSamplingInterval = 123;
SocketListener listener(
- {kSamplingInterval},
[&done, &bookkeeping_thread](UnwindingRecord r) {
// TODO(fmayer): Test symbolization and result of unwinding.
// This check will only work on in-tree builds as out-of-tree
@@ -71,6 +70,7 @@
},
&bookkeeping_thread);
+ listener.ExpectPID(getpid(), {kSamplingInterval});
auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
if (!sock->is_listening()) {
PERFETTO_ELOG("Socket not listening.");
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
new file mode 100644
index 0000000..912c8c1
--- /dev/null
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -0,0 +1,415 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/heapprofd_producer.h"
+
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/types.h>
+
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "perfetto/tracing/ipc/producer_ipc_client.h"
+
+namespace perfetto {
+namespace profiling {
+namespace {
+constexpr char kHeapprofdDataSource[] = "android.heapprofd";
+constexpr size_t kUnwinderQueueSize = 1000;
+constexpr size_t kBookkeepingQueueSize = 1000;
+constexpr size_t kUnwinderThreads = 5;
+constexpr const char* kDumpOutput = "/data/misc/perfetto-traces/heap_dump";
+constexpr int kHeapprofdSignal = 36;
+
+constexpr uint32_t kInitialConnectionBackoffMs = 100;
+constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
+ ClientConfiguration client_config;
+ client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
+ return client_config;
+}
+
+void FindPidsForBinaries(std::vector<std::string> binaries,
+ std::vector<pid_t>* pids) {
+ base::ScopedDir proc_dir(opendir("/proc"));
+ if (!proc_dir) {
+ PERFETTO_DFATAL("Failed to open /proc");
+ return;
+ }
+ struct dirent* entry;
+ while ((entry = readdir(*proc_dir))) {
+ char* end;
+ long int pid = strtol(entry->d_name, &end, 10);
+ if (*end != '\0') {
+ continue;
+ }
+
+ char link_buf[128];
+ char binary_buf[128];
+
+ if (snprintf(link_buf, sizeof(link_buf), "/proc/%lu/exe", pid) < 0) {
+ PERFETTO_DFATAL("Failed to create exe filename for %lu", pid);
+ continue;
+ }
+ ssize_t link_size = readlink(link_buf, binary_buf, sizeof(binary_buf));
+ if (link_size < 0) {
+ continue;
+ }
+ if (link_size == sizeof(binary_buf)) {
+ PERFETTO_DFATAL("Potential overflow in binary name.");
+ continue;
+ }
+ binary_buf[link_size] = '\0';
+ for (const std::string& binary : binaries) {
+ if (binary == binary_buf)
+ pids->emplace_back(static_cast<pid_t>(pid));
+ }
+ }
+}
+
+} // namespace
+
+// We create kUnwinderThreads unwinding threads and one bookeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+// +--------------+
+// |SocketListener|
+// +------+-------+
+// |
+// +--UnwindingRecord -+
+// | |
+// +--------v-------+ +-------v--------+
+// |Unwinding Thread| |Unwinding Thread|
+// +--------+-------+ +-------+--------+
+// | |
+// +-BookkeepingRecord +
+// |
+// +--------v---------+
+// |Bookkeeping Thread|
+// +------------------+
+
+HeapprofdProducer::HeapprofdProducer(base::TaskRunner* task_runner)
+ : task_runner_(task_runner),
+ bookkeeping_queue_(kBookkeepingQueueSize),
+ bookkeeping_thread_(kDumpOutput),
+ bookkeeping_th_([this] { bookkeeping_thread_.Run(&bookkeeping_queue_); }),
+ unwinder_queues_(MakeUnwinderQueues(kUnwinderThreads)),
+ unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
+ socket_listener_(MakeSocketListenerCallback(), &bookkeeping_thread_),
+ socket_(MakeSocket()),
+ weak_factory_(this) {}
+
+HeapprofdProducer::~HeapprofdProducer() {
+ bookkeeping_queue_.Shutdown();
+ for (auto& queue : unwinder_queues_) {
+ queue.Shutdown();
+ }
+ bookkeeping_th_.join();
+ for (std::thread& th : unwinding_threads_) {
+ th.join();
+ }
+}
+
+void HeapprofdProducer::OnConnect() {
+ // TODO(fmayer): Delete once we have generic reconnect logic.
+ PERFETTO_DCHECK(state_ == kConnecting);
+ state_ = kConnected;
+ ResetConnectionBackoff();
+ PERFETTO_LOG("Connected to the service");
+
+ DataSourceDescriptor desc;
+ desc.set_name(kHeapprofdDataSource);
+ endpoint_->RegisterDataSource(desc);
+}
+
+// TODO(fmayer): Delete once we have generic reconnect logic.
+void HeapprofdProducer::OnDisconnect() {
+ PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
+ PERFETTO_LOG("Disconnected from tracing service");
+ if (state_ == kConnected)
+ return task_runner_->PostTask([this] { this->Restart(); });
+
+ state_ = kNotConnected;
+ IncreaseConnectionBackoff();
+ task_runner_->PostDelayedTask([this] { this->Connect(); },
+ connection_backoff_ms_);
+}
+
+void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
+ const DataSourceConfig& cfg) {
+ PERFETTO_DLOG("Setting up data source.");
+ if (cfg.name() != kHeapprofdDataSource) {
+ PERFETTO_DLOG("Invalid data source name.");
+ return;
+ }
+
+ auto it = data_sources_.find(id);
+ if (it != data_sources_.end()) {
+ PERFETTO_DFATAL("Received duplicated data source instance id: %" PRIu64,
+ id);
+ return;
+ }
+
+ DataSource data_source;
+
+ ClientConfiguration client_config = MakeClientConfiguration(cfg);
+
+ for (uint64_t pid : cfg.heapprofd_config().pid())
+ data_source.pids.emplace_back(static_cast<pid_t>(pid));
+
+ FindPidsForBinaries(cfg.heapprofd_config().native_binary_name(),
+ &data_source.pids);
+
+ auto pid_it = data_source.pids.begin();
+ while (pid_it != data_source.pids.end()) {
+ auto profiling_session = socket_listener_.ExpectPID(*pid_it, client_config);
+ if (!profiling_session) {
+ PERFETTO_DFATAL("No enabling duplicate profiling session for %d",
+ *pid_it);
+ pid_it = data_source.pids.erase(pid_it);
+ continue;
+ }
+ data_source.sessions.emplace_back(std::move(profiling_session));
+ ++pid_it;
+ }
+
+ if (data_source.pids.empty()) {
+ // TODO(fmayer): Whole system profiling.
+ PERFETTO_DLOG("No valid pids given. Not setting up data source.");
+ return;
+ }
+
+ auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
+ data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
+
+ data_sources_.emplace(id, std::move(data_source));
+ PERFETTO_DLOG("Set up data source.");
+}
+
+void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
+ uint32_t dump_interval) {
+ if (!Dump(id, 0 /* flush_id */, false /* is_flush */))
+ return;
+ auto weak_producer = weak_factory_.GetWeakPtr();
+ task_runner_->PostDelayedTask(
+ [weak_producer, id, dump_interval] {
+ if (!weak_producer)
+ return;
+ weak_producer->DoContinuousDump(id, dump_interval);
+ },
+ dump_interval);
+}
+
+void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
+ const DataSourceConfig& cfg) {
+ PERFETTO_DLOG("Start DataSource");
+ auto it = data_sources_.find(id);
+ if (it == data_sources_.end()) {
+ PERFETTO_DFATAL("Received invalid data source instance to start: %" PRIu64,
+ id);
+ return;
+ }
+
+ const DataSource& data_source = it->second;
+
+ for (pid_t pid : data_source.pids) {
+ PERFETTO_DLOG("Sending %d to %d", kHeapprofdSignal, pid);
+ if (kill(pid, kHeapprofdSignal) != 0) {
+ PERFETTO_DPLOG("kill");
+ }
+ }
+
+ const auto continuous_dump_config =
+ cfg.heapprofd_config().continuous_dump_config();
+ uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
+ if (dump_interval) {
+ auto weak_producer = weak_factory_.GetWeakPtr();
+ task_runner_->PostDelayedTask(
+ [weak_producer, id, dump_interval] {
+ if (!weak_producer)
+ return;
+ weak_producer->DoContinuousDump(id, dump_interval);
+ },
+ continuous_dump_config.dump_phase_ms());
+ }
+ PERFETTO_DLOG("Started DataSource");
+}
+
+void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
+ // DataSource holds ProfilingSession handles which on being destructed tear
+ // down the profiling on the client.
+ if (data_sources_.erase(id) != 1)
+ PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
+}
+
+void HeapprofdProducer::OnTracingSetup() {}
+
+bool HeapprofdProducer::Dump(DataSourceInstanceID id,
+ FlushRequestID flush_id,
+ bool has_flush_id) {
+ PERFETTO_DLOG("Dumping %" PRIu64 ", flush: %d", id, has_flush_id);
+ auto it = data_sources_.find(id);
+ if (it == data_sources_.end()) {
+ return false;
+ }
+
+ const DataSource& data_source = it->second;
+ BookkeepingRecord record{};
+ record.record_type = BookkeepingRecord::Type::Dump;
+ DumpRecord& dump_record = record.dump_record;
+ dump_record.pids = data_source.pids;
+ dump_record.trace_writer = data_source.trace_writer;
+
+ auto weak_producer = weak_factory_.GetWeakPtr();
+ base::TaskRunner* task_runner = task_runner_;
+ if (has_flush_id) {
+ dump_record.callback = [task_runner, weak_producer, flush_id] {
+ task_runner->PostTask([weak_producer, flush_id] {
+ if (!weak_producer)
+ return weak_producer->FinishDataSourceFlush(flush_id);
+ });
+ };
+ } else {
+ dump_record.callback = [] {};
+ }
+
+ bookkeeping_queue_.Add(std::move(record));
+ return true;
+}
+
+void HeapprofdProducer::Flush(FlushRequestID flush_id,
+ const DataSourceInstanceID* ids,
+ size_t num_ids) {
+ if (num_ids == 0)
+ return;
+
+ size_t& flush_in_progress = flushes_in_progress_[flush_id];
+ PERFETTO_DCHECK(flush_in_progress == 0);
+ flush_in_progress = num_ids;
+ for (size_t i = 0; i < num_ids; ++i)
+ Dump(ids[i], flush_id, true);
+}
+
+void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
+ auto it = flushes_in_progress_.find(flush_id);
+ if (it == flushes_in_progress_.end()) {
+ PERFETTO_DFATAL("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
+ return;
+ }
+ size_t& flush_in_progress = it->second;
+ if (--flush_in_progress == 0) {
+ endpoint_->NotifyFlushComplete(flush_id);
+ flushes_in_progress_.erase(flush_id);
+ }
+}
+
+std::function<void(UnwindingRecord)>
+HeapprofdProducer::MakeSocketListenerCallback() {
+ return [this](UnwindingRecord record) {
+ unwinder_queues_[static_cast<size_t>(record.pid) % kUnwinderThreads].Add(
+ std::move(record));
+ };
+}
+
+std::vector<BoundedQueue<UnwindingRecord>>
+HeapprofdProducer::MakeUnwinderQueues(size_t n) {
+ std::vector<BoundedQueue<UnwindingRecord>> ret(n);
+ for (size_t i = 0; i < n; ++i)
+ ret[i].SetCapacity(kUnwinderQueueSize);
+ return ret;
+}
+
+std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
+ std::vector<std::thread> ret;
+ for (size_t i = 0; i < n; ++i) {
+ ret.emplace_back([this, i] {
+ UnwindingMainLoop(&unwinder_queues_[i], &bookkeeping_queue_);
+ });
+ }
+ return ret;
+}
+
+std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeSocket() {
+ const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
+ if (sock_fd == nullptr) {
+ unlink(kHeapprofdSocketFile);
+ return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_listener_,
+ task_runner_);
+ }
+ char* end;
+ int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
+ if (*end != '\0')
+ PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
+ kHeapprofdSocketEnvVar);
+ return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_listener_,
+ task_runner_);
+}
+
+// TODO(fmayer): Delete these and used ReconnectingProducer once submitted
+void HeapprofdProducer::Restart() {
+ // We lost the connection with the tracing service. At this point we need
+ // to reset all the data sources. Trying to handle that manually is going to
+ // be error prone. What we do here is simply desroying the instance and
+ // recreating it again.
+ // TODO(hjd): Add e2e test for this.
+
+ base::TaskRunner* task_runner = task_runner_;
+ const char* socket_name = socket_name_;
+
+ // Invoke destructor and then the constructor again.
+ this->~HeapprofdProducer();
+ new (this) HeapprofdProducer(task_runner);
+
+ ConnectWithRetries(socket_name);
+}
+
+void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
+ PERFETTO_DCHECK(state_ == kNotStarted);
+ state_ = kNotConnected;
+
+ ResetConnectionBackoff();
+ socket_name_ = socket_name;
+ Connect();
+}
+
+void HeapprofdProducer::Connect() {
+ PERFETTO_DCHECK(state_ == kNotConnected);
+ state_ = kConnecting;
+ endpoint_ = ProducerIPCClient::Connect(socket_name_, this,
+ "android.heapprofd", task_runner_);
+}
+
+void HeapprofdProducer::IncreaseConnectionBackoff() {
+ connection_backoff_ms_ *= 2;
+ if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
+ connection_backoff_ms_ = kMaxConnectionBackoffMs;
+}
+
+void HeapprofdProducer::ResetConnectionBackoff() {
+ connection_backoff_ms_ = kInitialConnectionBackoffMs;
+}
+
+} // namespace profiling
+} // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
new file mode 100644
index 0000000..f1dd639
--- /dev/null
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
+#define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
+
+#include <map>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/tracing_service.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/socket_listener.h"
+
+namespace perfetto {
+namespace profiling {
+
+class HeapprofdProducer : public Producer {
+ public:
+ HeapprofdProducer(base::TaskRunner* task_runner);
+ ~HeapprofdProducer() override;
+
+ // Producer Impl:
+ void OnConnect() override;
+ void OnDisconnect() override;
+ void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
+ void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
+ void StopDataSource(DataSourceInstanceID) override;
+ void OnTracingSetup() override;
+ void Flush(FlushRequestID,
+ const DataSourceInstanceID* data_source_ids,
+ size_t num_data_sources) override;
+
+ // TODO(fmayer): Delete once we have generic reconnect logic.
+ void ConnectWithRetries(const char* socket_name);
+
+ private:
+ // TODO(fmayer): Delete once we have generic reconnect logic.
+ enum State {
+ kNotStarted = 0,
+ kNotConnected,
+ kConnecting,
+ kConnected,
+ };
+ void Connect();
+ void Restart();
+ void ResetConnectionBackoff();
+ void IncreaseConnectionBackoff();
+
+ // TODO(fmayer): Delete once we have generic reconnect logic.
+ State state_ = kNotStarted;
+ uint32_t connection_backoff_ms_ = 0;
+ const char* socket_name_ = nullptr;
+
+ std::function<void(UnwindingRecord)> MakeSocketListenerCallback();
+ std::vector<BoundedQueue<UnwindingRecord>> MakeUnwinderQueues(size_t n);
+ std::vector<std::thread> MakeUnwindingThreads(size_t n);
+ std::unique_ptr<base::UnixSocket> MakeSocket();
+
+ void FinishDataSourceFlush(FlushRequestID flush_id);
+ bool Dump(DataSourceInstanceID id,
+ FlushRequestID flush_id,
+ bool has_flush_id);
+ void DoContinuousDump(DataSourceInstanceID id, uint32_t dump_interval);
+
+ struct DataSource {
+ std::vector<pid_t> pids;
+ // This is a shared ptr so we can lend a weak_ptr to the bookkeeping
+ // thread for unwinding.
+ std::shared_ptr<TraceWriter> trace_writer;
+ // These are opaque handles that shut down the sockets in SocketListener
+ // once they go away.
+ std::vector<SocketListener::ProfilingSession> sessions;
+ };
+
+ std::map<DataSourceInstanceID, DataSource> data_sources_;
+ std::map<FlushRequestID, size_t> flushes_in_progress_;
+
+ // These two are borrowed from the caller.
+ base::TaskRunner* const task_runner_;
+ std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
+
+ BoundedQueue<BookkeepingRecord> bookkeeping_queue_;
+ BookkeepingThread bookkeeping_thread_;
+ std::thread bookkeeping_th_;
+ std::vector<BoundedQueue<UnwindingRecord>> unwinder_queues_;
+ std::vector<std::thread> unwinding_threads_;
+ SocketListener socket_listener_;
+ std::unique_ptr<base::UnixSocket> socket_;
+
+ base::WeakPtrFactory<HeapprofdProducer> weak_factory_;
+};
+
+} // namespace profiling
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index 2f5c487..5648095 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -25,8 +25,10 @@
#include "perfetto/base/event.h"
#include "perfetto/base/unix_socket.h"
#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/heapprofd_producer.h"
#include "src/profiling/memory/socket_listener.h"
#include "src/profiling/memory/wire_protocol.h"
+#include "src/tracing/ipc/default_socket.h"
#include "perfetto/base/unix_task_runner.h"
@@ -34,139 +36,10 @@
namespace profiling {
namespace {
-constexpr size_t kUnwinderQueueSize = 1000;
-constexpr size_t kBookkeepingQueueSize = 1000;
-constexpr size_t kUnwinderThreads = 5;
-constexpr uint64_t kDefaultSamplingInterval = 1;
-
-base::Event* g_dump_evt = nullptr;
-
-void DumpSignalHandler(int) {
- g_dump_evt->Notify();
-}
-
-// We create kUnwinderThreads unwinding threads and one bookeeping thread.
-// The bookkeeping thread is singleton in order to avoid expensive and
-// complicated synchronisation in the bookkeeping.
-//
-// We wire up the system by creating BoundedQueues between the threads. The main
-// thread runs the TaskRunner driving the SocketListener. The unwinding thread
-// takes the data received by the SocketListener and if it is a malloc does
-// stack unwinding, and if it is a free just forwards the content of the record
-// to the bookkeeping thread.
-//
-// +--------------+
-// |SocketListener|
-// +------+-------+
-// |
-// +--UnwindingRecord -+
-// | |
-// +--------v-------+ +-------v--------+
-// |Unwinding Thread| |Unwinding Thread|
-// +--------+-------+ +-------+--------+
-// | |
-// +-BookkeepingRecord +
-// |
-// +--------v---------+
-// |Bookkeeping Thread|
-// +------------------+
-int HeapprofdMain(int argc, char** argv) {
- // TODO(fmayer): This is temporary until heapprofd is integrated with Perfetto
- // and receives its configuration via that.
- uint64_t sampling_interval = kDefaultSamplingInterval;
- bool standalone = false;
- int opt;
- while ((opt = getopt(argc, argv, "i:s")) != -1) {
- switch (opt) {
- case 'i': {
- char* end;
- long long sampling_interval_arg = strtoll(optarg, &end, 10);
- if (*end != '\0' || *optarg == '\0')
- PERFETTO_FATAL("Invalid sampling interval: %s", optarg);
- PERFETTO_CHECK(sampling_interval > 0);
- sampling_interval = static_cast<uint64_t>(sampling_interval_arg);
- break;
- }
- case 's':
- standalone = true;
- break;
- default:
- PERFETTO_FATAL("%s [-i interval] [-s]", argv[0]);
- }
- }
-
+int HeapprofdMain(int, char**) {
base::UnixTaskRunner task_runner;
- BoundedQueue<BookkeepingRecord> bookkeeping_queue(kBookkeepingQueueSize);
- // We set this up before launching any threads, so we do not have to use a
- // std::atomic for g_dump_evt.
- g_dump_evt = new base::Event();
-
- struct sigaction action = {};
- action.sa_handler = DumpSignalHandler;
- PERFETTO_CHECK(sigaction(SIGUSR1, &action, nullptr) == 0);
- task_runner.AddFileDescriptorWatch(g_dump_evt->fd(), [&bookkeeping_queue] {
- g_dump_evt->Clear();
-
- BookkeepingRecord rec = {};
- rec.record_type = BookkeepingRecord::Type::Dump;
- bookkeeping_queue.Add(std::move(rec));
- });
-
- std::unique_ptr<base::UnixSocket> sock;
-
- BookkeepingThread bookkeeping_thread("/data/local/tmp/heap_dump");
- std::thread bookkeeping_th([&bookkeeping_thread, &bookkeeping_queue] {
- bookkeeping_thread.Run(&bookkeeping_queue);
- });
-
- std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
- for (size_t i = 0; i < kUnwinderThreads; ++i)
- unwinder_queues[i].SetCapacity(kUnwinderQueueSize);
- std::vector<std::thread> unwinding_threads;
- unwinding_threads.reserve(kUnwinderThreads);
- for (size_t i = 0; i < kUnwinderThreads; ++i) {
- unwinding_threads.emplace_back([&unwinder_queues, &bookkeeping_queue, i] {
- UnwindingMainLoop(&unwinder_queues[i], &bookkeeping_queue);
- });
- }
-
- auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
- unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
- std::move(r));
- };
- SocketListener listener({sampling_interval}, std::move(on_record_received),
- &bookkeeping_thread);
-
- if (optind != argc)
- PERFETTO_FATAL("%s [-i interval] [-s]", argv[0]);
-
- if (standalone) {
- // Allow to be able to manually specify the socket to listen on
- // for testing and sideloading purposes.
- unlink(kHeapprofdSocketFile);
- sock =
- base::UnixSocket::Listen(kHeapprofdSocketFile, &listener, &task_runner);
- } else {
- // When running as a service launched by init on Android, the socket
- // is created by init and passed to the application using an environment
- // variable.
- const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
- if (sock_fd == nullptr)
- PERFETTO_FATAL("No argument given and environment variable %s is unset.",
- kHeapprofdSocketEnvVar);
- char* end;
- int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
- if (*end != '\0')
- PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
- kHeapprofdSocketEnvVar);
- sock = base::UnixSocket::Listen(base::ScopedFile(raw_fd), &listener,
- &task_runner);
- }
-
- if (sock->last_error() != 0)
- PERFETTO_FATAL("Failed to initialize socket: %s",
- strerror(sock->last_error()));
-
+ HeapprofdProducer producer(&task_runner);
+ producer.ConnectWithRetries(GetProducerSocket());
task_runner.Run();
return 0;
}
diff --git a/src/profiling/memory/queue_messages.h b/src/profiling/memory/queue_messages.h
index b37c227..b09fbdc 100644
--- a/src/profiling/memory/queue_messages.h
+++ b/src/profiling/memory/queue_messages.h
@@ -21,18 +21,20 @@
#include <unwindstack/Maps.h>
#include <unwindstack/Unwinder.h>
+
+#include "perfetto/tracing/core/trace_writer.h"
#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
namespace profiling {
-struct ProcessMetadata;
+struct UnwindingMetadata;
struct UnwindingRecord {
pid_t pid;
size_t size;
std::unique_ptr<uint8_t[]> data;
- std::weak_ptr<ProcessMetadata> metadata;
+ std::weak_ptr<UnwindingMetadata> metadata;
};
struct FreeRecord {
@@ -46,6 +48,12 @@
std::vector<unwindstack::FrameData> frames;
};
+struct DumpRecord {
+ std::vector<pid_t> pids;
+ std::weak_ptr<TraceWriter> trace_writer;
+ std::function<void()> callback;
+};
+
struct BookkeepingRecord {
enum class Type {
Dump = 0,
@@ -57,6 +65,7 @@
Type record_type;
AllocRecord alloc_record;
FreeRecord free_record;
+ DumpRecord dump_record;
};
} // namespace profiling
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 9009bb0..733c6ea 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -22,6 +22,13 @@
void SocketListener::OnDisconnect(base::UnixSocket* self) {
bookkeeping_thread_->NotifyClientDisconnected(self->peer_pid());
+ auto it = process_info_.find(self->peer_pid());
+ if (it != process_info_.end()) {
+ ProcessInfo& process_info = it->second;
+ process_info.sockets.erase(self);
+ } else {
+ PERFETTO_DFATAL("Disconnect from socket without ProcessInfo.");
+ }
sockets_.erase(self);
}
@@ -29,8 +36,20 @@
base::UnixSocket*,
std::unique_ptr<base::UnixSocket> new_connection) {
base::UnixSocket* new_connection_raw = new_connection.get();
+ pid_t pid = new_connection_raw->peer_pid();
+
+ auto it = process_info_.find(pid);
+ if (it == process_info_.end()) {
+ PERFETTO_DFATAL("Unexpected connection.");
+ return;
+ }
+ ProcessInfo& process_info = it->second;
+
sockets_.emplace(new_connection_raw, std::move(new_connection));
- bookkeeping_thread_->NotifyClientConnected(new_connection_raw->peer_pid());
+ process_info.sockets.emplace(new_connection_raw);
+ // TODO(fmayer): Move destruction of bookkeeping data to
+ // HeapprofdProducer.
+ bookkeeping_thread_->NotifyClientConnected(pid);
}
void SocketListener::OnDataAvailable(base::UnixSocket* self) {
@@ -42,32 +61,42 @@
Entry& entry = socket_it->second;
RecordReader::ReceiveBuffer buf = entry.record_reader.BeginReceive();
+
+ auto process_info_it = process_info_.find(peer_pid);
+ if (process_info_it == process_info_.end()) {
+ PERFETTO_DFATAL("This should not happen.");
+ return;
+ }
+ ProcessInfo& process_info = process_info_it->second;
+
size_t rd;
if (PERFETTO_LIKELY(entry.recv_fds)) {
rd = self->Receive(buf.data, buf.size);
} else {
- auto it = process_metadata_.find(peer_pid);
- if (it != process_metadata_.end() && !it->second.expired()) {
+ auto it = unwinding_metadata_.find(peer_pid);
+ if (it != unwinding_metadata_.end() && !it->second.expired()) {
entry.recv_fds = true;
// If the process already has metadata, this is an additional socket for
// an existing process. Reuse existing metadata and close the received
// file descriptors.
- entry.process_metadata = std::shared_ptr<ProcessMetadata>(it->second);
+ entry.unwinding_metadata = std::shared_ptr<UnwindingMetadata>(it->second);
rd = self->Receive(buf.data, buf.size);
} else {
base::ScopedFile fds[2];
rd = self->Receive(buf.data, buf.size, fds, base::ArraySize(fds));
if (fds[0] && fds[1]) {
+ PERFETTO_DLOG("%d: Received FDs.", peer_pid);
entry.recv_fds = true;
- entry.process_metadata = std::make_shared<ProcessMetadata>(
+ entry.unwinding_metadata = std::make_shared<UnwindingMetadata>(
peer_pid, std::move(fds[0]), std::move(fds[1]));
- process_metadata_[peer_pid] = entry.process_metadata;
- self->Send(&client_config_, sizeof(client_config_), -1,
+ unwinding_metadata_[peer_pid] = entry.unwinding_metadata;
+ self->Send(&process_info.client_config,
+ sizeof(process_info.client_config), -1,
base::UnixSocket::BlockingMode::kBlocking);
} else if (fds[0] || fds[1]) {
- PERFETTO_DLOG("Received partial FDs.");
+ PERFETTO_DLOG("%d: Received partial FDs.", peer_pid);
} else {
- PERFETTO_DLOG("Received no FDs.");
+ PERFETTO_DLOG("%d: Received no FDs.", peer_pid);
}
}
}
@@ -86,6 +115,30 @@
}
}
+SocketListener::ProfilingSession SocketListener::ExpectPID(
+ pid_t pid,
+ ClientConfiguration cfg) {
+ PERFETTO_DLOG("Expecting connection from %d", pid);
+ bool inserted;
+ std::tie(std::ignore, inserted) = process_info_.emplace(pid, std::move(cfg));
+ if (!inserted)
+ return ProfilingSession(0, nullptr);
+ return ProfilingSession(pid, this);
+}
+
+void SocketListener::ShutdownPID(pid_t pid) {
+ PERFETTO_DLOG("Shutting down connecting from %d", pid);
+ auto it = process_info_.find(pid);
+ if (it == process_info_.end()) {
+ PERFETTO_DFATAL("Shutting down nonexistant pid.");
+ return;
+ }
+ ProcessInfo& process_info = it->second;
+ // Disconnect all sockets for process.
+ for (base::UnixSocket* socket : process_info.sockets)
+ socket->Shutdown(true);
+}
+
void SocketListener::RecordReceived(base::UnixSocket* self,
size_t size,
std::unique_ptr<uint8_t[]> buf) {
@@ -97,7 +150,7 @@
return;
}
Entry& entry = it->second;
- if (!entry.process_metadata) {
+ if (!entry.unwinding_metadata) {
PERFETTO_DLOG("Received record without process metadata.");
return;
}
@@ -107,12 +160,12 @@
return;
}
// This needs to be a weak_ptr for two reasons:
- // 1) most importantly, the weak_ptr in process_metadata_ should expire as
+ // 1) most importantly, the weak_ptr in unwinding_metadata_ should expire as
// soon as the last socket for a process goes away. Otherwise, a recycled
// PID might reuse incorrect metadata.
// 2) it is a waste to unwind for a process that had already gone away.
- std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
- callback_function_({entry.process_metadata->pid, size, std::move(buf),
+ std::weak_ptr<UnwindingMetadata> weak_metadata(entry.unwinding_metadata);
+ callback_function_({entry.unwinding_metadata->pid, size, std::move(buf),
std::move(weak_metadata)});
}
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index 69738b0..8c2c3e8 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -32,11 +32,43 @@
class SocketListener : public base::UnixSocket::EventListener {
public:
- SocketListener(ClientConfiguration client_config,
- std::function<void(UnwindingRecord)> fn,
+ friend class ProfilingSession;
+ class ProfilingSession {
+ public:
+ friend class SocketListener;
+
+ ProfilingSession(ProfilingSession&& other)
+ : pid_(other.pid_), listener_(other.listener_) {
+ other.listener_ = nullptr;
+ }
+
+ ~ProfilingSession() {
+ if (listener_)
+ listener_->ShutdownPID(pid_);
+ }
+ ProfilingSession& operator=(ProfilingSession&& other) {
+ pid_ = other.pid_;
+ listener_ = other.listener_;
+ other.listener_ = nullptr;
+ return *this;
+ }
+
+ operator bool() const { return listener_ != nullptr; }
+
+ ProfilingSession(const ProfilingSession&) = delete;
+ ProfilingSession& operator=(const ProfilingSession&) = delete;
+
+ private:
+ ProfilingSession(pid_t pid, SocketListener* listener)
+ : pid_(pid), listener_(listener) {}
+
+ pid_t pid_;
+ SocketListener* listener_ = nullptr;
+ };
+
+ SocketListener(std::function<void(UnwindingRecord)> fn,
BookkeepingThread* bookkeeping_thread)
- : client_config_(client_config),
- callback_function_(std::move(fn)),
+ : callback_function_(std::move(fn)),
bookkeeping_thread_(bookkeeping_thread) {}
void OnDisconnect(base::UnixSocket* self) override;
void OnNewIncomingConnection(
@@ -44,7 +76,15 @@
std::unique_ptr<base::UnixSocket> new_connection) override;
void OnDataAvailable(base::UnixSocket* self) override;
+ ProfilingSession ExpectPID(pid_t pid, ClientConfiguration cfg);
+
private:
+ struct ProcessInfo {
+ ProcessInfo(ClientConfiguration cfg) : client_config(std::move(cfg)) {}
+ ClientConfiguration client_config;
+ std::set<base::UnixSocket*> sockets;
+ };
+
struct Entry {
Entry(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
// Only here for ownership of the object.
@@ -58,14 +98,15 @@
//
// This does not get initialized in the ctor because the file descriptors
// only get received after the first Receive call of the socket.
- std::shared_ptr<ProcessMetadata> process_metadata;
+ std::shared_ptr<UnwindingMetadata> unwinding_metadata;
};
void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
+ void ShutdownPID(pid_t pid);
- ClientConfiguration client_config_;
std::map<base::UnixSocket*, Entry> sockets_;
- std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
+ std::map<pid_t, std::weak_ptr<UnwindingMetadata>> unwinding_metadata_;
+ std::map<pid_t, ProcessInfo> process_info_;
std::function<void(UnwindingRecord)> callback_function_;
BookkeepingThread* const bookkeeping_thread_;
};
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 225572e..21d9caf 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -55,8 +55,8 @@
};
BookkeepingThread actor("");
- SocketListener listener({}, // We do not care about the sampling rate.
- std::move(callback_fn), &actor);
+ SocketListener listener(std::move(callback_fn), &actor);
+ auto handle = listener.ExpectPID(getpid(), {});
MockEventListener client_listener;
EXPECT_CALL(client_listener, OnConnect(_, _))
.WillOnce(InvokeWithoutArgs(connected));
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 4f085fd..4f2bbae 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -135,7 +135,7 @@
maps_.clear();
}
-bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+bool DoUnwind(WireMessage* msg, UnwindingMetadata* metadata, AllocRecord* out) {
AllocMetadata* alloc_metadata = msg->alloc_header;
std::unique_ptr<unwindstack::Regs> regs(
CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
@@ -175,7 +175,7 @@
&msg))
return false;
if (msg.record_type == RecordType::Malloc) {
- std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+ std::shared_ptr<UnwindingMetadata> metadata = rec->metadata.lock();
if (!metadata) {
// Process has already gone away.
return false;
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index aef2918..f9d0b33 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -40,8 +40,8 @@
base::ScopedFile fd_;
};
-struct ProcessMetadata {
- ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
+struct UnwindingMetadata {
+ UnwindingMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
: pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
PERFETTO_CHECK(maps.Parse());
}
@@ -65,7 +65,7 @@
uint8_t* stack_;
};
-bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+bool DoUnwind(WireMessage*, UnwindingMetadata* metadata, AllocRecord* out);
bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
diff --git a/src/profiling/memory/unwinding_fuzzer.cc b/src/profiling/memory/unwinding_fuzzer.cc
index e1fe6ee..a52d673 100644
--- a/src/profiling/memory/unwinding_fuzzer.cc
+++ b/src/profiling/memory/unwinding_fuzzer.cc
@@ -27,7 +27,7 @@
int FuzzUnwinding(const uint8_t* data, size_t size) {
UnwindingRecord record;
- auto process_metadata = std::make_shared<ProcessMetadata>(
+ auto unwinding_metadata = std::make_shared<UnwindingMetadata>(
getpid(), base::OpenFile("/proc/self/maps", O_RDONLY),
base::OpenFile("/proc/self/mem", O_RDONLY));
@@ -35,7 +35,7 @@
record.size = size;
record.data.reset(new uint8_t[size]);
memcpy(record.data.get(), data, size);
- record.metadata = process_metadata;
+ record.metadata = unwinding_metadata;
BookkeepingRecord out;
HandleUnwindingRecord(&record, &out);
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 25c6f48..c38ba00 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -123,7 +123,8 @@
base::ScopedFile proc_maps(base::OpenFile("/proc/self/maps", O_RDONLY));
base::ScopedFile proc_mem(base::OpenFile("/proc/self/mem", O_RDONLY));
GlobalCallstackTrie callsites;
- ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
+ UnwindingMetadata metadata(getpid(), std::move(proc_maps),
+ std::move(proc_mem));
WireMessage msg;
auto record = GetRecord(&msg);
AllocRecord out;