profiling: Add missing pieces to heapprofd.
* Allow several heap dumps using the same call graph tree.
* Add main loops for worker threads.
* Add queue between worker threads.
* Add client functions for sending the stack.
Change-Id: I5d66b6199ab72467e43335df257c08825c2d2228
diff --git a/Android.bp b/Android.bp
index 9365d34..8edd08a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3849,8 +3849,10 @@
"src/perfetto_cmd/rate_limiter_unittest.cc",
"src/profiling/memory/bookkeeping.cc",
"src/profiling/memory/bookkeeping_unittest.cc",
+ "src/profiling/memory/bounded_queue_unittest.cc",
"src/profiling/memory/client.cc",
"src/profiling/memory/client_unittest.cc",
+ "src/profiling/memory/heapprofd_integrationtest.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/record_reader_unittest.cc",
"src/profiling/memory/socket_listener.cc",
@@ -3859,6 +3861,7 @@
"src/profiling/memory/string_interner_unittest.cc",
"src/profiling/memory/unwinding.cc",
"src/profiling/memory/unwinding_unittest.cc",
+ "src/profiling/memory/wire_protocol.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
"src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/sock_utils.h b/include/perfetto/base/sock_utils.h
index 0008bf3..8f3a8b3 100644
--- a/include/perfetto/base/sock_utils.h
+++ b/include/perfetto/base/sock_utils.h
@@ -19,6 +19,9 @@
#include "perfetto/base/scoped_file.h"
+#include <sys/socket.h>
+#include <sys/un.h>
+
namespace perfetto {
namespace base {
@@ -33,6 +36,13 @@
size_t len,
base::ScopedFile* fd_vec,
size_t max_files);
+
+bool MakeSockAddr(const std::string& socket_name,
+ sockaddr_un* addr,
+ socklen_t* addr_size);
+
+base::ScopedFile CreateSocket();
+
} // namespace base
} // namespace perfetto
diff --git a/src/base/sock_utils.cc b/src/base/sock_utils.cc
index d643d8d..bbe00fd 100644
--- a/src/base/sock_utils.cc
+++ b/src/base/sock_utils.cc
@@ -131,5 +131,27 @@
#pragma GCC diagnostic pop
+bool MakeSockAddr(const std::string& socket_name,
+ sockaddr_un* addr,
+ socklen_t* addr_size) {
+ memset(addr, 0, sizeof(*addr));
+ const size_t name_len = socket_name.size();
+ if (name_len >= sizeof(addr->sun_path)) {
+ errno = ENAMETOOLONG;
+ return false;
+ }
+ memcpy(addr->sun_path, socket_name.data(), name_len);
+ if (addr->sun_path[0] == '@')
+ addr->sun_path[0] = '\0';
+ addr->sun_family = AF_UNIX;
+ *addr_size = static_cast<socklen_t>(
+ __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+ return true;
+}
+
+base::ScopedFile CreateSocket() {
+ return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
+}
+
} // namespace base
} // namespace perfetto
diff --git a/src/ipc/unix_socket.cc b/src/ipc/unix_socket.cc
index 4acc59f..3f49b8d 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/ipc/unix_socket.cc
@@ -44,41 +44,15 @@
// TODO(primiano): Add ThreadChecker to methods of this class.
-namespace {
-
-bool MakeSockAddr(const std::string& socket_name,
- sockaddr_un* addr,
- socklen_t* addr_size) {
- memset(addr, 0, sizeof(*addr));
- const size_t name_len = socket_name.size();
- if (name_len >= sizeof(addr->sun_path)) {
- errno = ENAMETOOLONG;
- return false;
- }
- memcpy(addr->sun_path, socket_name.data(), name_len);
- if (addr->sun_path[0] == '@')
- addr->sun_path[0] = '\0';
- addr->sun_family = AF_UNIX;
- *addr_size = static_cast<socklen_t>(
- __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
- return true;
-}
-
-base::ScopedFile CreateSocket() {
- return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
-}
-
-} // namespace
-
// static
base::ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
- base::ScopedFile fd = CreateSocket();
+ base::ScopedFile fd = base::CreateSocket();
if (!fd)
return fd;
sockaddr_un addr;
socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+ if (!base::MakeSockAddr(socket_name, &addr, &addr_size)) {
return base::ScopedFile();
}
@@ -134,7 +108,7 @@
if (adopt_state == State::kDisconnected) {
// We get here from the default ctor().
PERFETTO_DCHECK(!adopt_fd);
- fd_ = CreateSocket();
+ fd_ = base::CreateSocket();
if (!fd_) {
last_error_ = errno;
return;
@@ -201,7 +175,7 @@
sockaddr_un addr;
socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+ if (!base::MakeSockAddr(socket_name, &addr, &addr_size)) {
last_error_ = errno;
return NotifyConnectionState(false);
}
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 0ccc573..78e57c9 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -15,83 +15,46 @@
import("../../../gn/perfetto.gni")
import("//build_overrides/build.gni")
-source_set("record_reader") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
- "record_reader.cc",
- "record_reader.h",
- ]
-}
-
-source_set("unwinding") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+source_set("wire_protocol") {
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
"../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
]
sources = [
- "unwinding.cc",
- "unwinding.h",
+ "wire_protocol.cc",
+ "wire_protocol.h",
]
}
-source_set("socket_listener") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+source_set("daemon") {
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
- ":record_reader",
- ":unwinding",
+ ":wire_protocol",
+ "../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
"../../ipc",
]
sources = [
- "socket_listener.cc",
- "socket_listener.h",
- ]
-}
-
-source_set("string_interner") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
- "string_interner.cc",
- "string_interner.h",
- ]
-}
-
-source_set("bookkeeping") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- ":string_interner",
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
"bookkeeping.cc",
"bookkeeping.h",
+ "record_reader.cc",
+ "record_reader.h",
+ "socket_listener.cc",
+ "socket_listener.h",
+ "string_interner.cc",
+ "string_interner.h",
+ "unwinding.cc",
+ "unwinding.h",
]
}
source_set("client") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
+ ":wire_protocol",
"../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
@@ -103,25 +66,22 @@
}
source_set("unittests") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
testonly = true
deps = [
- ":bookkeeping",
":client",
- ":record_reader",
- ":socket_listener",
- ":string_interner",
- ":unwinding",
+ ":daemon",
+ ":wire_protocol",
+ "../../../gn:default_deps",
"../../../gn:gtest_deps",
"../../base",
"../../base:test_support",
]
sources = [
"bookkeeping_unittest.cc",
+ "bounded_queue_unittest.cc",
"client_unittest.cc",
+ "heapprofd_integrationtest.cc",
"record_reader_unittest.cc",
"socket_listener_unittest.cc",
"string_interner_unittest.cc",
@@ -130,10 +90,8 @@
}
executable("heapprofd") {
- public_configs = [ "../../../gn:default_config" ]
deps = [
- ":socket_listener",
- ":unwinding",
+ ":daemon",
"../../../gn:default_deps",
"../../base",
"../../ipc",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 1624cee..7a31502 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -16,37 +16,100 @@
#include "src/profiling/memory/bookkeeping.h"
+#include "perfetto/base/logging.h"
+
namespace perfetto {
-MemoryBookkeeping::Node* MemoryBookkeeping::Node::GetOrCreateChild(
- const MemoryBookkeeping::InternedCodeLocation& loc) {
+GlobalCallstackTrie::Node* GlobalCallstackTrie::Node::GetOrCreateChild(
+ const InternedCodeLocation& loc) {
Node* child = children_.Get(loc);
if (!child)
child = children_.Emplace(loc, this);
return child;
}
-void MemoryBookkeeping::RecordMalloc(const std::vector<CodeLocation>& locs,
- uint64_t address,
- uint64_t size) {
- Node* node = &root_;
- node->cum_size_ += size;
- for (const MemoryBookkeeping::CodeLocation& loc : locs) {
- node = node->GetOrCreateChild(InternCodeLocation(loc));
- node->cum_size_ += size;
+void HeapTracker::RecordMalloc(const std::vector<CodeLocation>& callstack,
+ uint64_t address,
+ uint64_t size,
+ uint64_t sequence_number) {
+ auto it = allocations_.find(address);
+ if (it != allocations_.end()) {
+ if (it->second.sequence_number > sequence_number) {
+ return;
+ } else {
+ // Clean up previous allocation by pretending a free happened just after
+ // it.
+ // CommitFree only uses the sequence number to check whether the
+ // currently active allocation is newer than the free, so we can make
+ // up a sequence_number here.
+ CommitFree(it->second.sequence_number + 1, address);
+ }
}
- allocations_.emplace(address, std::make_pair(size, node));
+ GlobalCallstackTrie::Node* node =
+ callsites_->IncrementCallsite(callstack, size);
+ allocations_.emplace(address, Allocation(size, sequence_number, node));
+
+ // Keep the sequence tracker consistent.
+ RecordFree(kNoopFree, sequence_number);
}
-void MemoryBookkeeping::RecordFree(uint64_t address) {
+void HeapTracker::RecordFree(uint64_t address, uint64_t sequence_number) {
+ if (sequence_number != sequence_number_ + 1) {
+ pending_frees_.emplace(sequence_number, address);
+ return;
+ }
+
+ if (address != kNoopFree)
+ CommitFree(sequence_number, address);
+ sequence_number_++;
+
+ // At this point some other pending frees might be eligible to be committed.
+ auto it = pending_frees_.begin();
+ while (it != pending_frees_.end() && it->first == sequence_number_ + 1) {
+ if (it->second != kNoopFree)
+ CommitFree(it->first, it->second);
+ sequence_number_++;
+ it = pending_frees_.erase(it);
+ }
+}
+
+void HeapTracker::CommitFree(uint64_t sequence_number, uint64_t address) {
auto leaf_it = allocations_.find(address);
if (leaf_it == allocations_.end())
return;
- std::pair<uint64_t, Node*> value = leaf_it->second;
- uint64_t size = value.first;
- Node* node = value.second;
+ const Allocation& value = leaf_it->second;
+ if (value.sequence_number > sequence_number)
+ return;
+ allocations_.erase(leaf_it);
+}
+
+uint64_t GlobalCallstackTrie::GetCumSizeForTesting(
+ const std::vector<CodeLocation>& callstack) {
+ Node* node = &root_;
+ for (const CodeLocation& loc : callstack) {
+ node = node->children_.Get(InternCodeLocation(loc));
+ if (node == nullptr)
+ return 0;
+ }
+ return node->cum_size_;
+}
+
+GlobalCallstackTrie::Node* GlobalCallstackTrie::IncrementCallsite(
+ const std::vector<CodeLocation>& callstack,
+ uint64_t size) {
+ Node* node = &root_;
+ node->cum_size_ += size;
+ for (const CodeLocation& loc : callstack) {
+ node = node->GetOrCreateChild(InternCodeLocation(loc));
+ node->cum_size_ += size;
+ }
+ return node;
+}
+
+void GlobalCallstackTrie::DecrementNode(Node* node, uint64_t size) {
+ PERFETTO_DCHECK(node->cum_size_ >= size);
bool delete_prev = false;
Node* prev = nullptr;
@@ -58,19 +121,6 @@
prev = node;
node = node->parent_;
}
-
- allocations_.erase(leaf_it);
-}
-
-uint64_t MemoryBookkeeping::GetCumSizeForTesting(
- const std::vector<CodeLocation>& locs) {
- Node* node = &root_;
- for (const MemoryBookkeeping::CodeLocation& loc : locs) {
- node = node->children_.Get(InternCodeLocation(loc));
- if (node == nullptr)
- return 0;
- }
- return node->cum_size_;
}
} // namespace perfetto
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index 7e5f705..bd56dd8 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -26,39 +26,37 @@
namespace perfetto {
-class MemoryBookkeeping {
- public:
- struct CodeLocation {
- CodeLocation(std::string map_n, std::string function_n)
- : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
+class HeapTracker;
- std::string map_name;
- std::string function_name;
- };
+struct CodeLocation {
+ CodeLocation(std::string map_n, std::string function_n)
+ : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
- void RecordMalloc(const std::vector<CodeLocation>& stack,
- uint64_t address,
- uint64_t size);
- void RecordFree(uint64_t address);
- uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+ std::string map_name;
+ std::string function_name;
+};
- private:
- struct InternedCodeLocation {
- StringInterner::InternedString map_name;
- StringInterner::InternedString function_name;
+// Internal data-structure for GlobalCallstackTrie to save memory if the same
+// function is named multiple times.
+struct InternedCodeLocation {
+ StringInterner::InternedString map_name;
+ StringInterner::InternedString function_name;
- bool operator<(const InternedCodeLocation& other) const {
- if (map_name.id() == other.map_name.id())
- return function_name.id() < other.function_name.id();
- return map_name.id() < other.map_name.id();
- }
- };
-
- InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
- return {interner_.Intern(loc.map_name),
- interner_.Intern(loc.function_name)};
+ bool operator<(const InternedCodeLocation& other) const {
+ if (map_name.id() == other.map_name.id())
+ return function_name.id() < other.function_name.id();
+ return map_name.id() < other.map_name.id();
}
+};
+// Graph of function callsites. This is shared between heap dumps for
+// different processes. Each call site is represented by a
+// GlobalCallstackTrie::Node that is owned by the parent (i.e. calling)
+// callsite. It has a pointer to its parent, which means the function call-graph
+// can be reconstructed from a GlobalCallstackTrie::Node by walking down the
+// pointers to the parents.
+class GlobalCallstackTrie {
+ public:
// Node in a tree of function traces that resulted in an allocation. For
// instance, if alloc_buf is called from foo and bar, which are called from
// main, the tree looks as following.
@@ -77,25 +75,119 @@
// alloc_buf to the leafs of this tree.
class Node {
public:
+ // This is opaque except to GlobalCallstackTrie.
+ friend class GlobalCallstackTrie;
+
Node(InternedCodeLocation location) : Node(std::move(location), nullptr) {}
Node(InternedCodeLocation location, Node* parent)
: parent_(parent), location_(std::move(location)) {}
+ private:
Node* GetOrCreateChild(const InternedCodeLocation& loc);
uint64_t cum_size_ = 0;
- Node* parent_;
+ Node* const parent_;
const InternedCodeLocation location_;
base::LookupSet<Node, const InternedCodeLocation, &Node::location_>
children_;
};
- // Address -> (size, code location)
- std::map<uint64_t, std::pair<uint64_t, Node*>> allocations_;
+ GlobalCallstackTrie() = default;
+ GlobalCallstackTrie(const GlobalCallstackTrie&) = delete;
+ GlobalCallstackTrie& operator=(const GlobalCallstackTrie&) = delete;
+
+ uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+ Node* IncrementCallsite(const std::vector<CodeLocation>& locs, uint64_t size);
+ static void DecrementNode(Node* node, uint64_t size);
+
+ private:
+ InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
+ return {interner_.Intern(loc.map_name),
+ interner_.Intern(loc.function_name)};
+ }
+
StringInterner interner_;
Node root_{{interner_.Intern(""), interner_.Intern("")}};
};
+// Snapshot for memory allocations of a particular process. Shares callsites
+// with other processes.
+class HeapTracker {
+ public:
+  // Caller needs to ensure that callsites outlives the HeapTracker.
+  explicit HeapTracker(GlobalCallstackTrie* callsites)
+      : callsites_(callsites) {}
+
+  void RecordMalloc(const std::vector<CodeLocation>& stack,
+                    uint64_t address,
+                    uint64_t size,
+                    uint64_t sequence_number);
+  void RecordFree(uint64_t address, uint64_t sequence_number);
+
+ private:
+  static constexpr uint64_t kNoopFree = 0;  // Address marking a no-op entry.
+  struct Allocation {
+    Allocation(uint64_t size, uint64_t seq, GlobalCallstackTrie::Node* n)
+        : alloc_size(size), sequence_number(seq), node(n) {}
+
+    Allocation() = default;
+    Allocation(const Allocation&) = delete;
+    Allocation(Allocation&& other) noexcept {
+      alloc_size = other.alloc_size;
+      sequence_number = other.sequence_number;
+      node = other.node;
+      other.node = nullptr;
+    }
+
+    ~Allocation() {
+      if (node)
+        GlobalCallstackTrie::DecrementNode(node, alloc_size);
+    }
+
+    uint64_t alloc_size = 0;
+    uint64_t sequence_number = 0;
+    GlobalCallstackTrie::Node* node = nullptr;  // nullptr: dtor is a no-op.
+  };
+
+  // Sequencing logic works as follows:
+  // * mallocs are immediately committed to |allocations_|. They are
+  //   rejected if the current malloc for the address has a higher sequence
+  //   number.
+  //   If all operations with sequence numbers lower than the malloc have been
+  //   committed to |allocations_|, sequence_number_ is advanced and all
+  //   unblocked pending operations after the current id are committed to
+  //   |allocations_|. Otherwise, a no-op record is added to the pending
+  //   operations queue to maintain the contiguity of the sequence.
+  //
+  // * for frees:
+  //   if all operations with sequence numbers lower than the free have
+  //   been committed to |allocations_| (i.e. sequence_number_ ==
+  //   sequence_number - 1) the free is committed to |allocations_| and
+  //   sequence_number_ is advanced. All unblocked pending operations are
+  //   committed to |allocations_|.
+  //   otherwise: the free is added to the queue of pending operations.
+
+  // Commits a free operation into |allocations_|.
+  // This must be called after all operations up to sequence_number have been
+  // committed to |allocations_|.
+  void CommitFree(uint64_t sequence_number, uint64_t address);
+
+  // Address -> (size, sequence_number, code location)
+  std::map<uint64_t, Allocation> allocations_;
+
+  // If the allocation address != 0, there is a pending free of the address.
+  // If it is == 0, the pending operation is a no-op.
+  // No-op operations come from allocs that have already been committed to
+  // |allocations_|. It is important to keep track of them in the list of
+  // pending operations to maintain the contiguity of the sequence.
+  std::map<uint64_t /* seq_id */, uint64_t /* allocation address */>
+      pending_frees_;
+
+  // The sequence number all mallocs and frees have been handled up to.
+  uint64_t sequence_number_ = 0;
+  GlobalCallstackTrie* const callsites_;
+};
+
} // namespace perfetto
#endif // SRC_PROFILING_MEMORY_BOOKKEEPING_H_
diff --git a/src/profiling/memory/bookkeeping_unittest.cc b/src/profiling/memory/bookkeeping_unittest.cc
index 23da7cf..1e7b369 100644
--- a/src/profiling/memory/bookkeeping_unittest.cc
+++ b/src/profiling/memory/bookkeeping_unittest.cc
@@ -22,22 +22,65 @@
namespace perfetto {
namespace {
-TEST(BookkeepingTest, Basic) {
- std::vector<MemoryBookkeeping::CodeLocation> stack{
+std::vector<CodeLocation> stack() {
+ return {
{"map1", "fun1"}, {"map2", "fun2"},
};
+}
- std::vector<MemoryBookkeeping::CodeLocation> stack2{
+std::vector<CodeLocation> stack2() {
+ return {
{"map1", "fun1"}, {"map3", "fun3"},
};
- MemoryBookkeeping mb;
- mb.RecordMalloc(stack, 1, 5);
- mb.RecordMalloc(stack2, 2, 2);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
- mb.RecordFree(2);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
- mb.RecordFree(1);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, Basic) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd.RecordMalloc(stack2(), 2, 2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+ hd.RecordFree(2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+ hd.RecordFree(1, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, TwoHeapTrackers) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+ {
+ HeapTracker hd2(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd2.RecordMalloc(stack2(), 2, 2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+ }
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+}
+
+TEST(BookkeepingTest, ReplaceAlloc) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd.RecordMalloc(stack2(), 1, 2, sequence_number++);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack()), 0);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 2);
+}
+
+TEST(BookkeepingTest, OutOfOrder) {
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, 1);
+ hd.RecordMalloc(stack2(), 1, 2, 0);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack()), 5);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 0);
}
} // namespace
diff --git a/src/profiling/memory/bounded_queue.h b/src/profiling/memory/bounded_queue.h
new file mode 100644
index 0000000..521ec5d
--- /dev/null
+++ b/src/profiling/memory/bounded_queue.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+#define SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "perfetto/base/logging.h"
+
+// Transport messages between threads. Multiple-producer / single-consumer.
+//
+// This has to outlive both the consumer and the producer who have to
+// negotiate termination separately, if needed. This is currently only used
+// in a scenario where the producer and consumer both are loops that never
+// terminate.
+template <typename T>
+class BoundedQueue {
+ public:
+  BoundedQueue() : BoundedQueue(1) {}
+  BoundedQueue(size_t capacity) : capacity_(capacity) {
+    PERFETTO_CHECK(capacity > 0);
+  }
+
+  // Appends |item|, blocking while the queue holds >= capacity_ elements.
+  void Add(T item) {
+    std::unique_lock<std::mutex> l(mutex_);
+    full_cv_.wait(l, [this] { return deque_.size() < capacity_; });
+    deque_.emplace_back(std::move(item));
+    if (deque_.size() == 1)
+      empty_cv_.notify_all();
+  }
+
+  // Removes and returns the oldest element, blocking while the queue is empty.
+  T Get() {
+    std::unique_lock<std::mutex> l(mutex_);
+    empty_cv_.wait(l, [this] { return !deque_.empty(); });
+    T item(std::move(deque_.front()));
+    deque_.pop_front();
+    if (deque_.size() == capacity_ - 1) {
+      l.unlock();
+      full_cv_.notify_all();
+    }
+    return item;
+  }
+
+  // Changes the capacity and wakes blocked producers to re-check for room.
+  void SetCapacity(size_t capacity) {
+    PERFETTO_CHECK(capacity > 0);
+    {
+      std::lock_guard<std::mutex> l(mutex_);
+      capacity_ = capacity;
+    }
+    full_cv_.notify_all();
+  }
+
+ private:
+  size_t capacity_;
+  std::deque<T> deque_;
+  std::condition_variable full_cv_;
+  std::condition_variable empty_cv_;
+  std::mutex mutex_;
+};
+
+#endif // SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
diff --git a/src/profiling/memory/bounded_queue_unittest.cc b/src/profiling/memory/bounded_queue_unittest.cc
new file mode 100644
index 0000000..63127d1
--- /dev/null
+++ b/src/profiling/memory/bounded_queue_unittest.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/bounded_queue.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(BoundedQueueTest, IsFIFO) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+}
+
+TEST(BoundedQueueTest, Blocking) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ std::thread th([&q] { q.Add(3); });
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+ EXPECT_EQ(q.Get(), 3);
+ th.join();
+}
+
+TEST(BoundedQueueTest, Resize) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ q.SetCapacity(3);
+ q.Add(3);
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+ EXPECT_EQ(q.Get(), 3);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index f8d57f7..86f2063 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -18,82 +18,90 @@
#include <inttypes.h>
#include <sys/socket.h>
+#include <sys/syscall.h>
+#include <sys/un.h>
+#include <unistd.h>
#include <atomic>
+#include <new>
+
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+#include <unwindstack/Regs.h>
+#include <unwindstack/RegsGetLocal.h>
#include "perfetto/base/logging.h"
+#include "perfetto/base/sock_utils.h"
#include "perfetto/base/utils.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
namespace {
-std::atomic<uint64_t> global_sequence_number(0);
-constexpr size_t kFreePageBytes = base::kPageSize;
-constexpr size_t kFreePageSize = kFreePageBytes / sizeof(uint64_t);
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// glibc does not define a wrapper around gettid, bionic does.
+pid_t gettid() {
+ return static_cast<pid_t>(syscall(__NR_gettid));
+}
+#endif
+
+std::vector<base::ScopedFile> ConnectPool(const std::string& sock_name,
+ size_t n) {
+ sockaddr_un addr;
+ socklen_t addr_size;
+ if (!base::MakeSockAddr(sock_name, &addr, &addr_size))
+ return {};
+
+ std::vector<base::ScopedFile> res;
+ res.reserve(n);
+ for (size_t i = 0; i < n; ++i) {
+ auto sock = base::CreateSocket();
+ if (connect(*sock, reinterpret_cast<sockaddr*>(&addr), addr_size) == -1) {
+ PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
+ continue;
+ }
+ res.emplace_back(std::move(sock));
+ }
+ return res;
+}
+
+inline bool IsMainThread() {
+ return getpid() == gettid();
+}
} // namespace
-FreePage::FreePage() : free_page_(kFreePageSize) {
- free_page_[0] = static_cast<uint64_t>(kFreePageBytes);
- free_page_[1] = static_cast<uint64_t>(RecordType::Free);
- offset_ = 2;
- // Code in Add assumes that offset is aligned to 2.
- PERFETTO_DCHECK(offset_ % 2 == 0);
+void FreePage::Add(const uint64_t addr,
+ const uint64_t sequence_number,
+ SocketPool* pool) {
+ std::lock_guard<std::mutex> l(mutex_);
+ if (offset_ == kFreePageSize) {
+ FlushLocked(pool);
+    // Now that we have flushed, start filling from the first entry again.
+ offset_ = 0;
+ }
+ FreePageEntry& current_entry = free_page_.entries[offset_++];
+ current_entry.sequence_number = sequence_number;
+ current_entry.addr = addr;
}
-void FreePage::Add(const uint64_t addr, SocketPool* pool) {
- std::lock_guard<std::mutex> l(mtx_);
- if (offset_ == kFreePageSize)
- Flush(pool);
- static_assert(kFreePageSize % 2 == 0,
- "free page size needs to be divisible by two");
- free_page_[offset_++] = ++global_sequence_number;
- free_page_[offset_++] = addr;
- PERFETTO_DCHECK(offset_ % 2 == 0);
-}
-
-void FreePage::Flush(SocketPool* pool) {
+void FreePage::FlushLocked(SocketPool* pool) {
+ WireMessage msg = {};
+ msg.record_type = RecordType::Free;
+ msg.free_header = &free_page_;
BorrowedSocket fd(pool->Borrow());
- size_t written = 0;
- do {
- ssize_t wr = PERFETTO_EINTR(send(*fd, &free_page_[0] + written,
- kFreePageBytes - written, MSG_NOSIGNAL));
- if (wr == -1) {
- fd.Close();
- return;
- }
- written += static_cast<size_t>(wr);
- } while (written < kFreePageBytes);
- // Now that we have flushed, reset to after the header.
- offset_ = 2;
-}
-
-BorrowedSocket::BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
- : fd_(std::move(fd)), socket_pool_(socket_pool) {}
-
-int BorrowedSocket::operator*() {
- return get();
-}
-
-int BorrowedSocket::get() {
- return *fd_;
-}
-
-void BorrowedSocket::Close() {
- fd_.reset();
-}
-
-BorrowedSocket::~BorrowedSocket() {
- if (socket_pool_ != nullptr)
- socket_pool_->Return(std::move(fd_));
+ SendWireMessage(*fd, msg);
}
SocketPool::SocketPool(std::vector<base::ScopedFile> sockets)
: sockets_(std::move(sockets)), available_sockets_(sockets_.size()) {}
BorrowedSocket SocketPool::Borrow() {
- std::unique_lock<std::mutex> lck_(mtx_);
+ std::unique_lock<std::mutex> lck_(mutex_);
if (available_sockets_ == 0)
cv_.wait(lck_, [this] { return available_sockets_ > 0; });
PERFETTO_CHECK(available_sockets_ > 0);
@@ -101,13 +109,14 @@
}
void SocketPool::Return(base::ScopedFile sock) {
+ PERFETTO_CHECK(dead_sockets_ + available_sockets_ < sockets_.size());
if (!sock) {
// TODO(fmayer): Handle reconnect or similar.
// This is just to prevent a deadlock.
PERFETTO_CHECK(++dead_sockets_ != sockets_.size());
return;
}
- std::unique_lock<std::mutex> lck_(mtx_);
+ std::unique_lock<std::mutex> lck_(mutex_);
PERFETTO_CHECK(available_sockets_ < sockets_.size());
sockets_[available_sockets_++] = std::move(sock);
if (available_sockets_ == 1) {
@@ -116,4 +125,87 @@
}
}
+const char* GetThreadStackBase() {
+  pthread_attr_t thread_attr;
+  if (pthread_getattr_np(pthread_self(), &thread_attr) != 0)
+    return nullptr;
+  // Releases |thread_attr| via pthread_attr_destroy when leaving scope.
+  base::ScopedResource<pthread_attr_t*, pthread_attr_destroy, nullptr>
+      attr_destroyer(&thread_attr);
+  void* lowest_addr = nullptr;
+  size_t stack_bytes = 0;
+  if (pthread_attr_getstack(&thread_attr, &lowest_addr, &stack_bytes) != 0)
+    return nullptr;
+  // The base is the numerically highest address of the thread's stack.
+  return static_cast<const char*>(lowest_addr) + stack_bytes;
+}
+
+Client::Client(std::vector<base::ScopedFile> socks)
+    : socket_pool_(std::move(socks)) {
+  uint64_t size = 0;
+  base::ScopedFile maps(open("/proc/self/maps", O_RDONLY | O_CLOEXEC));
+  base::ScopedFile mem(open("/proc/self/mem", O_RDONLY | O_CLOEXEC));
+  int fds[2] = {*maps, *mem};  // ScopedFile closes our copies after Send.
+  base::Send(*socket_pool_.Borrow(), &size, sizeof(size), fds, 2);
+}
+
+Client::Client(const std::string& sock_name, size_t conns)
+ : Client(ConnectPool(sock_name, conns)) {}
+
+const char* Client::GetStackBase() {
+  if (!IsMainThread())
+    return GetThreadStackBase();
+  // On the main thread pthread_attr_getstack reads and parses
+  // /proc/self/maps and /proc/self/stat, so the result is looked up once
+  // and later calls are served from the cached value.
+  if (!main_thread_stack_base_)
+    main_thread_stack_base_ = GetThreadStackBase();
+  return main_thread_stack_base_;
+}
+
+// The stack grows towards numerically smaller addresses, so the stack layout
+// of main calling malloc is as follows.
+//
+// +------------+
+// |SendWireMsg |
+// stacktop +--> +------------+ 0x1000
+// |RecordMalloc| +
+// +------------+ |
+// | malloc | |
+// +------------+ |
+// | main | v
+// stackbase +-> +------------+ 0xffff
+void Client::RecordMalloc(uint64_t alloc_size, uint64_t alloc_address) {
+ AllocMetadata metadata;
+ const char* stackbase = GetStackBase();
+ const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+ unwindstack::AsmGetRegs(metadata.register_data);
+
+ if (stackbase < stacktop) {
+ PERFETTO_DCHECK(false);
+ return;
+ }
+
+ uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+ metadata.alloc_size = alloc_size;
+ metadata.alloc_address = alloc_address;
+ metadata.stack_pointer = reinterpret_cast<uint64_t>(stacktop);
+ metadata.stack_pointer_offset = sizeof(AllocMetadata);
+ metadata.arch = unwindstack::Regs::CurrentArch();
+ metadata.sequence_number = ++sequence_number_;
+
+ WireMessage msg{};
+ msg.alloc_header = &metadata;
+ msg.payload = const_cast<char*>(stacktop);
+ msg.payload_size = static_cast<size_t>(stack_size);
+
+ BorrowedSocket sockfd = socket_pool_.Borrow();
+ // TODO(fmayer): Handle failure.
+ PERFETTO_CHECK(SendWireMessage(*sockfd, msg));
+}
+
+void Client::RecordFree(uint64_t alloc_address) {
+ free_page_.Add(alloc_address, ++sequence_number_, &socket_pool_);
+}
+
} // namespace perfetto
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 7a4e459..46a571d 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -23,47 +23,11 @@
#include <vector>
#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
-class SocketPool;
-
-class FreePage {
- public:
- FreePage();
-
- // Can be called from any thread. Must not hold mtx_.`
- void Add(const uint64_t addr, SocketPool* pool);
-
- private:
- // Needs to be called holding mtx_.
- void Flush(SocketPool* pool);
-
- std::vector<uint64_t> free_page_;
- std::mutex mtx_;
- size_t offset_;
-};
-
-class BorrowedSocket {
- public:
- BorrowedSocket(const BorrowedSocket&) = delete;
- BorrowedSocket& operator=(const BorrowedSocket&) = delete;
- BorrowedSocket(BorrowedSocket&& other) {
- fd_ = std::move(other.fd_);
- socket_pool_ = other.socket_pool_;
- other.socket_pool_ = nullptr;
- }
-
- BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool);
- int operator*();
- int get();
- void Close();
- ~BorrowedSocket();
-
- private:
- base::ScopedFile fd_;
- SocketPool* socket_pool_ = nullptr;
-};
+class BorrowedSocket;
class SocketPool {
public:
@@ -74,13 +38,80 @@
private:
void Return(base::ScopedFile fd);
- std::mutex mtx_;
+ std::mutex mutex_;
std::condition_variable cv_;
std::vector<base::ScopedFile> sockets_;
size_t available_sockets_;
size_t dead_sockets_ = 0;
};
+// A socket checked out of a SocketPool. Unless the object has been moved
+// from, its destructor hands the file descriptor back to the pool.
+class BorrowedSocket {
+ public:
+  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
+      : fd_(std::move(fd)), socket_pool_(socket_pool) {}
+
+  // Move-only. The moved-from object forgets its pool, so its destructor
+  // returns nothing.
+  BorrowedSocket(BorrowedSocket&& other) noexcept
+      : fd_(std::move(other.fd_)), socket_pool_(other.socket_pool_) {
+    other.socket_pool_ = nullptr;
+  }
+  BorrowedSocket(const BorrowedSocket&) = delete;
+  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
+
+  ~BorrowedSocket() {
+    if (socket_pool_)
+      socket_pool_->Return(std::move(fd_));
+  }
+
+  // Raw file descriptor of the borrowed socket.
+  int get() { return *fd_; }
+  int operator*() { return get(); }
+
+  // Resets the descriptor. The (then empty) fd_ is still passed to
+  // SocketPool::Return when this object is destroyed.
+  void Close() { fd_.reset(); }
+
+ private:
+  base::ScopedFile fd_;
+  SocketPool* socket_pool_ = nullptr;
+};
+
+// Cache for frees that have been observed. It is infeasible to send every
+// free separately, so we batch and send the whole buffer once it is full.
+class FreePage {
+ public:
+ // Add address to buffer. Flush if necessary using a socket borrowed from
+ // pool.
+ // Can be called from any thread. The caller must not hold mutex_.
+ void Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+
+ private:
+ // Needs to be called holding mutex_.
+ void FlushLocked(SocketPool* pool);
+
+ // Buffer of batched frees, in the wire format's FreeMetadata layout.
+ FreeMetadata free_page_;
+ std::mutex mutex_;
+ // NOTE(review): presumably the index of the next unused entry in
+ // free_page_ -- confirm against FreePage::Add.
+ size_t offset_ = 0;
+};
+
+// Returns the base of the calling thread's stack, i.e. its numerically
+// highest address on architectures where the stack grows downwards.
+// NOTE(review): presumably returns nullptr when the lookup fails -- confirm
+// against the definition in client.cc.
+const char* GetThreadStackBase();
+
+// This is created and owned by the malloc hooks.
+class Client {
+ public:
+ // NOTE(review): presumably adopts already-connected sockets -- confirm
+ // against the definition.
+ Client(std::vector<base::ScopedFile> sockets);
+ // NOTE(review): presumably opens conns connections to the socket named
+ // sock_name -- confirm against the definition.
+ Client(const std::string& sock_name, size_t conns);
+ // Sends the current registers and stack plus the allocation's size and
+ // address over a socket from socket_pool_.
+ void RecordMalloc(uint64_t alloc_size, uint64_t alloc_address);
+ // Adds the freed address to free_page_, which batches frees.
+ void RecordFree(uint64_t alloc_address);
+
+ private:
+ // Stack base of the calling thread; cached for the main thread.
+ const char* GetStackBase();
+
+ SocketPool socket_pool_;
+ FreePage free_page_;
+ // Cache used by GetStackBase for the main thread; nullptr until first use.
+ const char* main_thread_stack_base_ = nullptr;
+ // Incremented for every recorded malloc and free and sent with the record.
+ std::atomic<uint64_t> sequence_number_{0};
+};
+
} // namespace perfetto
#endif // SRC_PROFILING_MEMORY_CLIENT_H_
diff --git a/src/profiling/memory/client_unittest.cc b/src/profiling/memory/client_unittest.cc
index a69bae3..df848da 100644
--- a/src/profiling/memory/client_unittest.cc
+++ b/src/profiling/memory/client_unittest.cc
@@ -67,5 +67,17 @@
t2.join();
}
+// Checks, on a freshly spawned thread, that GetThreadStackBase returns a
+// non-null address above the current frame.
+TEST(ClientTest, GetThreadStackBase) {
+  auto check_stack_base = [] {
+    const char* base = GetThreadStackBase();
+    ASSERT_NE(base, nullptr);
+    // The implementation assumes the stack grows from higher addresses to
+    // lower. We will need to rework once we encounter architectures where the
+    // stack grows the other way.
+    EXPECT_GT(base, __builtin_frame_address(0));
+  };
+  std::thread worker(check_stack_base);
+  worker.join();
+}
+
} // namespace
} // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
new file mode 100644
index 0000000..c9b6219
--- /dev/null
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/socket_listener.h"
+#include "src/profiling/memory/unwinding.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace {
+
+constexpr char kSocketName[] = TEST_SOCK_NAME("heapprofd_integrationtest");
+
+// Innermost test frame: records a fake 10 byte allocation at address 0xf00.
+// NOTE(review): noinline presumably keeps this a distinct frame on the stack
+// that gets sent -- confirm the intent.
+void __attribute__((noinline)) OtherFunction(Client* client) {
+ client->RecordMalloc(10, 0xf00);
+}
+
+// Outer test frame that only calls OtherFunction, so the recorded callstack
+// has more than one test-owned function in it.
+void __attribute__((noinline)) SomeFunction(Client* client) {
+ OtherFunction(client);
+}
+
+// Fixture that invokes DESTROY_TEST_SOCK on kSocketName before and after
+// every test.
+// NOTE(review): presumably this removes a stale socket file left behind by an
+// earlier run -- confirm against src/ipc/test/test_socket.h.
+class HeapprofdIntegrationTest : public ::testing::Test {
+ protected:
+ void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
+ void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
+};
+
+// TODO(fmayer): Fix out of tree integration test.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#define MAYBE_EndToEnd EndToEnd
+#else
+#define MAYBE_EndToEnd DISABLED_EndToEnd
+#endif
+
+// Sends one malloc record from an in-process Client to a SocketListener and
+// waits until the record has gone through unwinding and bookkeeping.
+TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
+  GlobalCallstackTrie callsites;
+  base::TestTaskRunner task_runner;
+  auto done = task_runner.CreateCheckpoint("done");
+
+  // Runs both pipeline stages inline for every record the listener receives.
+  auto on_record = [&done](UnwindingRecord r) {
+    // TODO(fmayer): Test symbolization and result of unwinding.
+    BookkeepingRecord bookkeeping_record;
+    ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
+    HandleBookkeepingRecord(&bookkeeping_record);
+    done();
+  };
+  SocketListener listener(std::move(on_record), &callsites);
+
+  auto sock = ipc::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+  if (!sock->is_listening()) {
+    PERFETTO_ELOG("Socket not listening.");
+    PERFETTO_CHECK(false);
+  }
+
+  Client client(kSocketName, 1);
+  SomeFunction(&client);
+  task_runner.RunUntilCheckpoint("done");
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index ea53977..991006e 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -15,9 +15,12 @@
*/
#include <stdlib.h>
+#include <array>
#include <memory>
+#include <vector>
#include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bounded_queue.h"
#include "src/profiling/memory/socket_listener.h"
#include "perfetto/base/unix_task_runner.h"
@@ -25,15 +28,60 @@
namespace perfetto {
namespace {
+// Size given to each per-thread queue of UnwindingRecords.
+constexpr size_t kUnwinderQueueSize = 1000;
+// Size given to the single queue feeding the bookkeeping thread.
+constexpr size_t kBookkeepingQueueSize = 1000;
+// Number of unwinding threads; each one gets its own input queue.
+constexpr size_t kUnwinderThreads = 5;
+
+// We create kUnwinderThreads unwinding threads and one bookkeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+// +--------------+
+// |SocketListener|
+// +------+-------+
+// |
+// +--UnwindingRecord -+
+// | |
+// +--------v-------+ +-------v--------+
+// |Unwinding Thread| |Unwinding Thread|
+// +--------+-------+ +-------+--------+
+// | |
+// +-BookkeepingRecord +
+// |
+// +--------v---------+
+// |Bookkeeping Thread|
+// +------------------+
int HeapprofdMain(int argc, char** argv) {
+ GlobalCallstackTrie callsites;
std::unique_ptr<ipc::UnixSocket> sock;
- SocketListener listener(
- [](size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>) {
- // TODO(fmayer): Wire this up to a worker thread that does the
- // unwinding.
- PERFETTO_LOG("Record received.");
- });
+ BoundedQueue<BookkeepingRecord> callsites_queue(kBookkeepingQueueSize);
+ std::thread bookkeeping_thread(
+ [&callsites_queue] { BookkeepingMainLoop(&callsites_queue); });
+
+ std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
+ for (size_t i = 0; i < kUnwinderThreads; ++i)
+ unwinder_queues[i].SetSize(kUnwinderQueueSize);
+ std::vector<std::thread> unwinding_threads;
+ unwinding_threads.reserve(kUnwinderThreads);
+ for (size_t i = 0; i < kUnwinderThreads; ++i) {
+ unwinding_threads.emplace_back([&unwinder_queues, &callsites_queue, i] {
+ UnwindingMainLoop(&unwinder_queues[i], &callsites_queue);
+ });
+ }
+
+ auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
+ unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
+ std::move(r));
+ };
+ SocketListener listener(std::move(on_record_received), &callsites);
+
base::UnixTaskRunner read_task_runner;
if (argc == 2) {
// Allow to be able to manually specify the socket to listen on
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 81b7390..f857a42 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -82,7 +82,7 @@
if (it == process_metadata_.end() || it->second.expired()) {
// We have not seen the PID yet or the PID is being recycled.
entry->process_metadata = std::make_shared<ProcessMetadata>(
- peer_pid, std::move(maps_fd), std::move(mem_fd));
+ peer_pid, std::move(maps_fd), std::move(mem_fd), callsites_);
process_metadata_[peer_pid] = entry->process_metadata;
} else {
// If the process already has metadata, this is an additional socket for
@@ -96,19 +96,30 @@
size_t size,
std::unique_ptr<uint8_t[]> buf) {
auto it = sockets_.find(self);
- if (it == sockets_.end())
+ if (it == sockets_.end()) {
// This happens for zero-length records, because the callback gets called
// in the first call to Read, before InitProcess is called. Because zero
// length records are useless anyway, this is not a problem.
return;
+ }
Entry& entry = it->second;
+ if (!entry.process_metadata) {
+ PERFETTO_DLOG("Received record without process metadata.");
+ return;
+ }
+
+ if (size == 0) {
+ PERFETTO_DLOG("Dropping empty record.");
+ return;
+ }
// This needs to be a weak_ptr for two reasons:
// 1) most importantly, the weak_ptr in process_metadata_ should expire as
// soon as the last socket for a process goes away. Otherwise, a recycled
// PID might reuse incorrect metadata.
// 2) it is a waste to unwind for a process that had already gone away.
std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
- callback_function_(size, std::move(buf), std::move(weak_metadata));
+ callback_function_({entry.process_metadata->pid, size, std::move(buf),
+ std::move(weak_metadata)});
}
} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index ba5d92e..8b1609d 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -18,6 +18,7 @@
#define SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
#include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/record_reader.h"
#include "src/profiling/memory/unwinding.h"
@@ -28,10 +29,9 @@
class SocketListener : public ipc::UnixSocket::EventListener {
public:
- SocketListener(std::function<void(size_t,
- std::unique_ptr<uint8_t[]>,
- std::weak_ptr<ProcessMetadata>)> fn)
- : callback_function_(std::move(fn)) {}
+ SocketListener(std::function<void(UnwindingRecord)> fn,
+ GlobalCallstackTrie* callsites)
+ : callback_function_(std::move(fn)), callsites_(callsites) {}
void OnDisconnect(ipc::UnixSocket* self) override;
void OnNewIncomingConnection(
ipc::UnixSocket* self,
@@ -63,9 +63,8 @@
std::map<ipc::UnixSocket*, Entry> sockets_;
std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
- std::function<
- void(size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>)>
- callback_function_;
+ std::function<void(UnwindingRecord)> callback_function_;
+ GlobalCallstackTrie* callsites_;
};
} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index da7efdb..2dd6310 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -46,16 +46,15 @@
base::TestTaskRunner task_runner;
auto callback_called = task_runner.CreateCheckpoint("callback.called");
auto connected = task_runner.CreateCheckpoint("connected");
- auto callback_fn = [&callback_called](
- size_t size, std::unique_ptr<uint8_t[]> buf,
- std::weak_ptr<ProcessMetadata> metadata) {
- ASSERT_EQ(size, 1u);
- ASSERT_EQ(buf[0], '1');
- ASSERT_FALSE(metadata.expired());
+ auto callback_fn = [&callback_called](UnwindingRecord r) {
+ ASSERT_EQ(r.size, 1u);
+ ASSERT_EQ(r.data[0], '1');
+ ASSERT_FALSE(r.metadata.expired());
callback_called();
};
- SocketListener listener(std::move(callback_fn));
+ GlobalCallstackTrie bookkeeping;
+ SocketListener listener(std::move(callback_fn), &bookkeeping);
MockEventListener client_listener;
EXPECT_CALL(client_listener, OnConnect(_, _))
.WillOnce(InvokeWithoutArgs(connected));
diff --git a/src/profiling/memory/transport_data.h b/src/profiling/memory/transport_data.h
deleted file mode 100644
index c0ef3ef..0000000
--- a/src/profiling/memory/transport_data.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// The data types used for communication between heapprofd and the client
-// embedded in processes that are being profiled.
-
-#ifndef SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-#define SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-
-#include <inttypes.h>
-#include <unwindstack/Elf.h>
-
-namespace perfetto {
-
-// Use uint64_t to make sure the following data is aligned as 64bit is the
-// strongest alignment requirement.
-enum class RecordType : uint64_t {
- Free = 0,
- Malloc = 1,
-};
-
-struct AllocMetadata {
- // Size of the allocation that was made.
- uint64_t alloc_size;
- // Pointer returned by malloc(2) for this allocation.
- uint64_t alloc_address;
- // Current value of the stack pointer.
- uint64_t stack_pointer;
- // Offset of the data at stack_pointer from the start of this record.
- uint64_t stack_pointer_offset;
- // CPU architecture of the client. This determines the size of the
- // register data that follows this struct.
- unwindstack::ArchEnum arch;
-};
-
-} // namespace perfetto
-
-#endif // SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index a317600..1dcc8c0 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -42,8 +42,8 @@
#include "perfetto/base/file_utils.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
#include "src/profiling/memory/unwinding.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
@@ -155,34 +155,19 @@
maps_.clear();
}
-bool DoUnwind(void* mem,
- size_t sz,
- ProcessMetadata* metadata,
- std::vector<unwindstack::FrameData>* out) {
- if (sz < sizeof(AllocMetadata)) {
- PERFETTO_ELOG("size");
- return false;
- }
- AllocMetadata* alloc_metadata = reinterpret_cast<AllocMetadata*>(mem);
- if (sizeof(AllocMetadata) + RegSize(alloc_metadata->arch) > sz)
- return false;
- void* reg_data = static_cast<uint8_t*>(mem) + sizeof(AllocMetadata);
+bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+ AllocMetadata* alloc_metadata = msg->alloc_header;
std::unique_ptr<unwindstack::Regs> regs(
- CreateFromRawData(alloc_metadata->arch, reg_data));
+ CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
if (regs == nullptr) {
PERFETTO_ELOG("regs");
return false;
}
- if (alloc_metadata->stack_pointer_offset < sizeof(AllocMetadata) ||
- alloc_metadata->stack_pointer_offset > sz) {
- PERFETTO_ELOG("out-of-bound stack_pointer_offset");
- return false;
- }
- uint8_t* stack =
- reinterpret_cast<uint8_t*>(mem) + alloc_metadata->stack_pointer_offset;
+ out->alloc_metadata = *alloc_metadata;
+ uint8_t* stack = reinterpret_cast<uint8_t*>(msg->payload);
std::shared_ptr<unwindstack::Memory> mems = std::make_shared<StackMemory>(
*metadata->mem_fd, alloc_metadata->stack_pointer, stack,
- sz - alloc_metadata->stack_pointer_offset);
+ msg->payload_size);
unwindstack::Unwinder unwinder(kMaxFrames, &metadata->maps, regs.get(), mems);
// Surpress incorrect "variable may be uninitialized" error for if condition
// after this loop. error_code = LastErrorCode gets run at least once.
@@ -198,10 +183,83 @@
break;
}
if (error_code == 0)
- *out = unwinder.frames();
+ out->frames = unwinder.frames();
else
PERFETTO_DLOG("unwinding failed %" PRIu8, error_code);
return error_code == 0;
}
+// Parses the raw record in rec and fills out with what the bookkeeping stage
+// needs.
+//
+// For a malloc record the stack is unwound and the result is stored in
+// out->alloc_record. For a free record, ownership of the raw buffer moves
+// from rec->data into out->free_record, because the parsed header points into
+// that buffer.
+//
+// Returns false if the record cannot be parsed, has an unknown record type,
+// belongs to a process that has already gone away, or fails to unwind.
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out) {
+  WireMessage msg;
+  if (!ReceiveWireMessage(reinterpret_cast<char*>(rec->data.get()), rec->size,
+                          &msg))
+    return false;
+  switch (msg.record_type) {
+    case RecordType::Malloc: {
+      std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+      if (!metadata)
+        // Process has already gone away.
+        return false;
+
+      out->metadata = std::move(rec->metadata);
+      out->free_record = {};
+      return DoUnwind(&msg, metadata.get(), &out->alloc_record);
+    }
+    case RecordType::Free: {
+      // We need to keep this alive, because msg.free_header is a pointer into
+      // this.
+      out->metadata = std::move(rec->metadata);
+      out->free_record.free_data = std::move(rec->data);
+      out->free_record.metadata = msg.free_header;
+      out->alloc_record = {};
+      return true;
+    }
+  }
+  // record_type is read from the wire, so it can hold a value that matches no
+  // enumerator. Without this return, control would flow off the end of a
+  // non-void function, which is undefined behaviour.
+  PERFETTO_DLOG("Dropping record with unknown record type.");
+  return false;
+}
+
+// Body of an unwinding thread. Never returns: takes records from input_queue
+// and forwards every successfully handled one to output_queue. Records that
+// HandleUnwindingRecord rejects are dropped.
+__attribute__((noreturn)) void UnwindingMainLoop(
+    BoundedQueue<UnwindingRecord>* input_queue,
+    BoundedQueue<BookkeepingRecord>* output_queue) {
+  while (true) {
+    UnwindingRecord record = input_queue->Get();
+    BookkeepingRecord result;
+    if (!HandleUnwindingRecord(&record, &result))
+      continue;
+    output_queue->Add(std::move(result));
+  }
+}
+
+// Applies one record to the heap_dump of the process it belongs to. A record
+// whose free_record owns a buffer is a batch of frees; any other record is
+// treated as an allocation. Does nothing if the process has gone away.
+void HandleBookkeepingRecord(BookkeepingRecord* rec) {
+  std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+  if (!metadata) {
+    // Process has already gone away.
+    return;
+  }
+
+  if (!rec->free_record.free_data) {
+    // Allocation: turn the unwound frames into code locations.
+    AllocRecord& alloc = rec->alloc_record;
+    const AllocMetadata& header = alloc.alloc_metadata;
+    std::vector<CodeLocation> code_locations;
+    for (unwindstack::FrameData& frame : alloc.frames)
+      code_locations.emplace_back(frame.map_name, frame.function_name);
+    metadata->heap_dump.RecordMalloc(code_locations, header.alloc_address,
+                                     header.alloc_size,
+                                     header.sequence_number);
+    return;
+  }
+
+  // Batch of frees: replay every entry in order.
+  // NOTE(review): num_entries is read from the received buffer and is not
+  // checked here against the capacity of entries -- confirm it is validated
+  // elsewhere.
+  FreeMetadata* free_header = rec->free_record.metadata;
+  FreePageEntry* entries = free_header->entries;
+  uint64_t num_entries = free_header->num_entries;
+  for (size_t idx = 0; idx < num_entries; ++idx) {
+    const FreePageEntry& entry = entries[idx];
+    metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
+  }
+}
+
+// Body of the bookkeeping thread. Never returns: takes records from
+// input_queue one at a time and applies each via HandleBookkeepingRecord.
+__attribute__((noreturn)) void BookkeepingMainLoop(
+    BoundedQueue<BookkeepingRecord>* input_queue) {
+  while (true) {
+    BookkeepingRecord record = input_queue->Get();
+    HandleBookkeepingRecord(&record);
+  }
+}
+
} // namespace perfetto
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 455be43..1bdcf0d 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -20,6 +20,9 @@
#include <unwindstack/Maps.h>
#include <unwindstack/Unwinder.h>
#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/bookkeeping.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
@@ -36,13 +39,20 @@
};
struct ProcessMetadata {
- ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
- : pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
+ ProcessMetadata(pid_t p,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem,
+ GlobalCallstackTrie* callsites)
+ : pid(p),
+ maps(std::move(maps_fd)),
+ mem_fd(std::move(mem)),
+ heap_dump(callsites) {
PERFETTO_CHECK(maps.Parse());
}
pid_t pid;
FileDescriptorMaps maps;
base::ScopedFile mem_fd;
+ HeapTracker heap_dump;
};
// Overlays size bytes pointed to by stack for addresses in [sp, sp + size).
@@ -62,10 +72,39 @@
size_t RegSize(unwindstack::ArchEnum arch);
-bool DoUnwind(void* mem,
- size_t sz,
- ProcessMetadata* metadata,
- std::vector<unwindstack::FrameData>* out);
+// One raw record as received by the SocketListener, before it is parsed.
+struct UnwindingRecord {
+ // Pid taken from the sender's ProcessMetadata.
+ pid_t pid;
+ // Number of bytes in data.
+ size_t size;
+ // The received bytes; parsed with ReceiveWireMessage.
+ std::unique_ptr<uint8_t[]> data;
+ // Weak, so that the record does not keep the metadata of a process alive.
+ std::weak_ptr<ProcessMetadata> metadata;
+};
+
+// A batch of frees handed to the bookkeeping stage.
+struct FreeRecord {
+ // Owns the raw buffer that metadata points into.
+ std::unique_ptr<uint8_t[]> free_data;
+ // Parsed header inside free_data; only valid while free_data is alive.
+ FreeMetadata* metadata;
+};
+
+// One unwound allocation handed to the bookkeeping stage.
+struct AllocRecord {
+ // Copy of the header that was sent with the allocation.
+ AllocMetadata alloc_metadata;
+ // Frames produced by DoUnwind when unwinding succeeded.
+ std::vector<unwindstack::FrameData> frames;
+};
+
+// Output of the unwinding stage. Only one of alloc_record and free_record is
+// meaningful; HandleBookkeepingRecord treats the record as a free when
+// free_record.free_data is set and as an allocation otherwise.
+struct BookkeepingRecord {
+ // TODO(fmayer): Use a union.
+ std::weak_ptr<ProcessMetadata> metadata;
+ AllocRecord alloc_record;
+ FreeRecord free_record;
+};
+
+// Unwinds the stack carried by the malloc message and stores the header copy
+// and the frames in out. Returns false if unwinding fails.
+bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+
+// Parses rec and fills out for the bookkeeping stage. Returns false if the
+// record should be dropped.
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
+// Applies rec to the heap_dump of its process, if that process still exists.
+void HandleBookkeepingRecord(BookkeepingRecord* rec);
+
+// Thread body: moves records from input_queue through HandleUnwindingRecord
+// into output_queue. Does not return.
+void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
+ BoundedQueue<BookkeepingRecord>* output_queue);
+
+// Thread body: feeds records from input_queue to HandleBookkeepingRecord.
+// Does not return.
+void BookkeepingMainLoop(BoundedQueue<BookkeepingRecord>* input_queue);
} // namespace perfetto
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 06f3ba9..f80a0ee 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -16,7 +16,8 @@
#include "src/profiling/memory/unwinding.h"
#include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/wire_protocol.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -70,20 +71,6 @@
#define MAYBE_DoUnwind DISABLED_DoUnwind
#endif
-uint8_t* GetStackBase() {
- pthread_t t = pthread_self();
- pthread_attr_t attr;
- if (pthread_getattr_np(t, &attr) != 0) {
- return nullptr;
- }
- uint8_t* x;
- size_t s;
- if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&x), &s) != 0)
- return nullptr;
- pthread_attr_destroy(&attr);
- return x + s;
-}
-
// This is needed because ASAN thinks copying the whole stack is a buffer
// underrun.
void __attribute__((noinline))
@@ -95,50 +82,59 @@
to[i] = from[i];
}
-std::pair<std::unique_ptr<uint8_t[]>, size_t> __attribute__((noinline))
-GetRecord() {
- const uint8_t* stackbase = GetStackBase();
- PERFETTO_CHECK(stackbase != nullptr);
- const uint8_t* stacktop =
- reinterpret_cast<uint8_t*>(__builtin_frame_address(0));
- PERFETTO_CHECK(stacktop != nullptr);
- PERFETTO_CHECK(stacktop < stackbase);
- const size_t stack_size = static_cast<size_t>(stackbase - stacktop);
- std::unique_ptr<unwindstack::Regs> regs(unwindstack::Regs::CreateFromLocal());
- const unwindstack::ArchEnum arch = regs->CurrentArch();
- const size_t reg_size = RegSize(arch);
- const size_t total_size = sizeof(AllocMetadata) + reg_size + stack_size;
- std::unique_ptr<uint8_t[]> buf(new uint8_t[total_size]);
- AllocMetadata* metadata = reinterpret_cast<AllocMetadata*>(buf.get());
- metadata->alloc_size = 0;
- metadata->alloc_address = 0;
+// Owns the buffers that the WireMessage filled in by GetRecord points into.
+// It has to outlive that message.
+struct RecordMemory {
+ std::unique_ptr<uint8_t[]> payload;
+ std::unique_ptr<AllocMetadata> metadata;
+};
+
+RecordMemory __attribute__((noinline)) GetRecord(WireMessage* msg) {
+ std::unique_ptr<AllocMetadata> metadata(new AllocMetadata);
+
+ const char* stackbase = GetThreadStackBase();
+ const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+ unwindstack::AsmGetRegs(metadata->register_data);
+
+ if (stackbase < stacktop) {
+ PERFETTO_DCHECK(false);
+ return {nullptr, nullptr};
+ }
+ uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+
+ metadata->alloc_size = 10;
+ metadata->alloc_address = 0x10;
metadata->stack_pointer = reinterpret_cast<uint64_t>(stacktop);
- metadata->stack_pointer_offset = sizeof(AllocMetadata) + reg_size;
- metadata->arch = arch;
- unwindstack::RegsGetLocal(regs.get());
- // Make sure nothing above has changed the stack pointer, just for extra
- // paranoia.
- PERFETTO_CHECK(stacktop ==
- reinterpret_cast<uint8_t*>(__builtin_frame_address(0)));
- memcpy(buf.get() + sizeof(AllocMetadata), regs->RawData(), reg_size);
- UnsafeMemcpy(buf.get() + sizeof(AllocMetadata) + reg_size, stacktop,
- stack_size);
- return {std::move(buf), total_size};
+ metadata->stack_pointer_offset = sizeof(AllocMetadata);
+ metadata->arch = unwindstack::Regs::CurrentArch();
+ metadata->sequence_number = 1;
+
+ std::unique_ptr<uint8_t[]> payload(new uint8_t[stack_size]);
+ UnsafeMemcpy(&payload[0], stacktop, stack_size);
+
+ *msg = {};
+ msg->alloc_header = metadata.get();
+ msg->payload = reinterpret_cast<char*>(payload.get());
+ msg->payload_size = static_cast<size_t>(stack_size);
+ return {std::move(payload), std::move(metadata)};
}
// TODO(fmayer): Investigate why this fails out of tree.
TEST(UnwindingTest, MAYBE_DoUnwind) {
base::ScopedFile proc_maps(open("/proc/self/maps", O_RDONLY));
base::ScopedFile proc_mem(open("/proc/self/mem", O_RDONLY));
- ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
- auto record = GetRecord();
- std::vector<unwindstack::FrameData> out;
- ASSERT_TRUE(DoUnwind(record.first.get(), record.second, &metadata, &out));
+ GlobalCallstackTrie callsites;
+ ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem),
+ &callsites);
+ WireMessage msg;
+ auto record = GetRecord(&msg);
+ AllocRecord out;
+ ASSERT_TRUE(DoUnwind(&msg, &metadata, &out));
int st;
- std::unique_ptr<char> demangled(
- abi::__cxa_demangle(out[0].function_name.c_str(), nullptr, nullptr, &st));
+ std::unique_ptr<char> demangled(abi::__cxa_demangle(
+ out.frames[0].function_name.c_str(), nullptr, nullptr, &st));
ASSERT_EQ(st, 0);
- ASSERT_STREQ(demangled.get(), "perfetto::(anonymous namespace)::GetRecord()");
+ ASSERT_STREQ(
+ demangled.get(),
+ "perfetto::(anonymous namespace)::GetRecord(perfetto::WireMessage*)");
}
} // namespace
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
new file mode 100644
index 0000000..39373f9
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.cc
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+namespace perfetto {
+
+namespace {
+// Interprets the next sizeof(T) bytes at *ptr as a T without copying.
+//
+// On success stores a pointer to those bytes in *out, advances *ptr past
+// them and returns true. If fewer than sizeof(T) bytes remain before end,
+// returns false and leaves *ptr and *out untouched.
+template <typename T>
+bool ViewAndAdvance(char** ptr, T** out, const char* end) {
+  // Compare the remaining length instead of computing end - sizeof(T), which
+  // could point before the start of the buffer.
+  if (*ptr > end || static_cast<size_t>(end - *ptr) < sizeof(T))
+    return false;
+  // The view must point at the buffer (*ptr), not at the cursor variable, and
+  // it is the caller's cursor that has to move, not the local parameter.
+  *out = reinterpret_cast<T*>(*ptr);
+  *ptr += sizeof(T);
+  return true;
+}
+} // namespace
+
+// Writes msg to sock with a single sendmsg call. The bytes on the wire are
+//   total size (uint64_t) | record type | alloc or free header | [payload]
+// where total size counts everything that follows it. The payload is only
+// sent if msg.payload is set.
+//
+// Returns true iff sendmsg reports that the whole message was written.
+// Returns false without sending if neither header is set.
+// NOTE(review): a short write or an interrupted call is reported as failure;
+// there is no retry.
+bool SendWireMessage(int sock, const WireMessage& msg) {
+  // Pick whichever header is present; alloc_header wins if both are set.
+  void* header = nullptr;
+  size_t header_size = 0;
+  if (msg.alloc_header) {
+    header = msg.alloc_header;
+    header_size = sizeof(*msg.alloc_header);
+  } else if (msg.free_header) {
+    header = msg.free_header;
+    header_size = sizeof(*msg.free_header);
+  } else {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+
+  const bool has_payload = msg.payload != nullptr;
+  uint64_t total_size = sizeof(msg.record_type) + header_size;
+  if (has_payload)
+    total_size += msg.payload_size;
+
+  struct iovec iovecs[4] = {};
+  // TODO(fmayer): Maybe pack these two.
+  iovecs[0].iov_base = &total_size;
+  iovecs[0].iov_len = sizeof(total_size);
+  iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
+  iovecs[1].iov_len = sizeof(msg.record_type);
+  iovecs[2].iov_base = header;
+  iovecs[2].iov_len = header_size;
+  iovecs[3].iov_base = msg.payload;
+  iovecs[3].iov_len = msg.payload_size;
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iovecs;
+  // If we are not sending payload, just ignore the last iovec.
+  hdr.msg_iovlen =
+      has_payload ? base::ArraySize(iovecs) : base::ArraySize(iovecs) - 1;
+
+  const ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+  return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+}
+
+// Parses the record of size bytes at buf into out. Nothing is copied: the
+// header and payload pointers stored in out point into buf, so buf has to
+// outlive out.
+//
+// Returns false if buf is too short for the record type plus the matching
+// header, or if the record type is not a known RecordType value. On failure
+// out may be partially written and must not be used.
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
+  RecordType* record_type;
+  char* end = buf + size;
+  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+    return false;
+  switch (*record_type) {
+    case RecordType::Malloc:
+      if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+        return false;
+      if (buf > end) {
+        PERFETTO_DCHECK(false);
+        return false;
+      }
+      // Whatever follows the header is the raw stack.
+      out->payload = buf;
+      out->payload_size = static_cast<size_t>(end - buf);
+      out->record_type = *record_type;
+      return true;
+    case RecordType::Free:
+      if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+        return false;
+      out->record_type = *record_type;
+      return true;
+  }
+  // The type was read from the wire and matches no enumerator. Reject it
+  // instead of reporting success with neither header set.
+  return false;
+}
+
+} // namespace perfetto
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
new file mode 100644
index 0000000..2810872
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The data types used for communication between heapprofd and the client
+// embedded in processes that are being profiled.
+
+#ifndef SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+#define SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+
+#include <inttypes.h>
+#include <unwindstack/Elf.h>
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+
+namespace perfetto {
+
+// Types needed for the wire format used for communication between the client
+// and heapprofd. The basic format of a record is
+// record size (uint64_t) | record type (RecordType = uint64_t) | record
+// If the record type is malloc, the record format is AllocMetadata | raw stack.
+// If the record type is free, the record is a FreeMetadata, i.e. an entry
+// count followed by a fixed-size array of FreePageEntry.
+
+// Use uint64_t to make sure the following data is aligned as 64bit is the
+// strongest alignment requirement.
+
+// std::max only became constexpr in C++14, so this header carries its own
+// single-expression replacement that can be used in constant expressions
+// under C++11. Returns the larger of |x| and |y|.
+constexpr size_t constexpr_max(size_t x, size_t y) {
+  return (y < x) ? x : y;
+}
+
+// Upper bound, in bytes, on the register data of any architecture whose
+// unwindstack machine header is included by this file: for each architecture,
+// its *_REG_LAST enumerator (presumably the register count) times the
+// register width. This sizes AllocMetadata::register_data, so it has to cover
+// every architecture a client may report, including the MIPS variants.
+constexpr size_t kMaxRegisterDataSize = constexpr_max(
+    constexpr_max(
+        constexpr_max(
+            constexpr_max(
+                constexpr_max(unwindstack::ARM_REG_LAST * sizeof(uint32_t),
+                              unwindstack::ARM64_REG_LAST * sizeof(uint64_t)),
+                unwindstack::X86_REG_LAST * sizeof(uint32_t)),
+            unwindstack::X86_64_REG_LAST * sizeof(uint64_t)),
+        unwindstack::MIPS_REG_LAST * sizeof(uint32_t)),
+    unwindstack::MIPS64_REG_LAST * sizeof(uint64_t));
+
+// Capacity, in entries, of the FreePageEntry array carried by one
+// FreeMetadata record.
+constexpr size_t kFreePageSize{1024};
+
+// Discriminator that precedes every record body on the wire.
+// ReceiveWireMessage parses an AllocMetadata (plus payload) for Malloc and a
+// FreeMetadata for Free. The underlying type is uint64_t so that the data
+// following it stays 64-bit aligned; the numeric values are part of the wire
+// format.
+enum class RecordType : uint64_t {
+ Free = 0,
+ Malloc = 1,
+};
+
+// Header of a Malloc record. SendWireMessage transmits it as raw bytes
+// (sizeof(AllocMetadata)), so field order and sizes are part of the wire
+// format. ReceiveWireMessage treats whatever follows it as the payload.
+struct AllocMetadata {
+ // NOTE(review): presumably orders this record relative to other records
+ // from the same client — confirm against the sender.
+ uint64_t sequence_number;
+ // Size of the allocation that was made.
+ uint64_t alloc_size;
+ // Pointer returned by malloc(3) for this allocation.
+ uint64_t alloc_address;
+ // Current value of the stack pointer.
+ uint64_t stack_pointer;
+ // Offset of the data at stack_pointer from the start of this record.
+ uint64_t stack_pointer_offset;
+ // CPU architecture of the client. This determines the size of the
+ // register data stored in |register_data|.
+ unwindstack::ArchEnum arch;
+ // Fixed-size buffer for the register data, dimensioned by
+ // kMaxRegisterDataSize.
+ char register_data[kMaxRegisterDataSize];
+};
+
+// One element of FreeMetadata::entries. Part of the wire format, so the
+// layout must not change.
+struct FreePageEntry {
+ // NOTE(review): presumably the same numbering as
+ // AllocMetadata::sequence_number — confirm.
+ uint64_t sequence_number;
+ // NOTE(review): presumably the address that was freed — confirm.
+ uint64_t addr;
+};
+
+// Body of a Free record. SendWireMessage transmits the whole struct
+// (sizeof(FreeMetadata)) regardless of |num_entries|, and ReceiveWireMessage
+// expects the same number of bytes.
+struct FreeMetadata {
+ // NOTE(review): presumably the number of leading elements of |entries| that
+ // are valid — confirm against the producer.
+ uint64_t num_entries;
+ // Fixed-capacity storage for kFreePageSize entries.
+ FreePageEntry entries[kFreePageSize];
+};
+
+// In-memory description of one record, used as the input of SendWireMessage
+// and the output of ReceiveWireMessage.
+struct WireMessage {
+ // Selects which of the two headers below is meaningful.
+ RecordType record_type;
+
+ // Header pointers. SendWireMessage sends |alloc_header| if it is non-null,
+ // otherwise |free_header|, and fails if both are null.
+ AllocMetadata* alloc_header;
+ FreeMetadata* free_header;
+
+ // Optional bytes following the header. SendWireMessage skips them when
+ // |payload| is null; ReceiveWireMessage points |payload| into the buffer it
+ // was given, so that buffer must stay alive while this struct is in use.
+ char* payload;
+ size_t payload_size;
+};
+
+// Writes |msg| to the socket |sock| as one record: the record size, the record
+// type, the header (|alloc_header| if set, otherwise |free_header|) and, when
+// |payload| is non-null, the payload.
+// Returns true only if a single sendmsg() call transmitted the whole record;
+// returns false when neither header is set or fewer bytes were sent.
+bool SendWireMessage(int sock, const WireMessage& msg);
+
+// Parse message received over the wire.
+// |buf| must start at the record type (the leading record size is not part of
+// |buf|) and |size| is the number of bytes available in |buf|.
+// |buf| has to outlive |out|.
+// If buf is not a valid message, return false.
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out);
+
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_