Merge "trace_processor: change cursor class to be scoped to a filter operation"
diff --git a/Android.bp b/Android.bp
index 9365d34..e7f0d6c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -35,12 +35,12 @@
"src/base/file_utils.cc",
"src/base/metatrace.cc",
"src/base/page_allocator.cc",
- "src/base/sock_utils.cc",
"src/base/string_splitter.cc",
"src/base/string_utils.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
"src/base/time.cc",
+ "src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
"src/base/virtual_destructors.cc",
"src/base/watchdog_posix.cc",
@@ -49,7 +49,6 @@
"src/ipc/deferred.cc",
"src/ipc/host_impl.cc",
"src/ipc/service_proxy.cc",
- "src/ipc/unix_socket.cc",
"src/ipc/virtual_destructors.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
@@ -161,12 +160,12 @@
"src/base/file_utils.cc",
"src/base/metatrace.cc",
"src/base/page_allocator.cc",
- "src/base/sock_utils.cc",
"src/base/string_splitter.cc",
"src/base/string_utils.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
"src/base/time.cc",
+ "src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
"src/base/virtual_destructors.cc",
"src/base/watchdog_posix.cc",
@@ -175,7 +174,6 @@
"src/ipc/deferred.cc",
"src/ipc/host_impl.cc",
"src/ipc/service_proxy.cc",
- "src/ipc/unix_socket.cc",
"src/ipc/virtual_destructors.cc",
"src/perfetto_cmd/main.cc",
"src/perfetto_cmd/perfetto_cmd.cc",
@@ -302,7 +300,6 @@
"src/base/file_utils.cc",
"src/base/metatrace.cc",
"src/base/page_allocator.cc",
- "src/base/sock_utils.cc",
"src/base/string_splitter.cc",
"src/base/string_utils.cc",
"src/base/temp_file.cc",
@@ -311,6 +308,7 @@
"src/base/test/vm_test_utils.cc",
"src/base/thread_checker.cc",
"src/base/time.cc",
+ "src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
"src/base/virtual_destructors.cc",
"src/base/watchdog_posix.cc",
@@ -319,7 +317,6 @@
"src/ipc/deferred.cc",
"src/ipc/host_impl.cc",
"src/ipc/service_proxy.cc",
- "src/ipc/unix_socket.cc",
"src/ipc/virtual_destructors.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
@@ -3618,12 +3615,12 @@
"src/base/file_utils.cc",
"src/base/metatrace.cc",
"src/base/page_allocator.cc",
- "src/base/sock_utils.cc",
"src/base/string_splitter.cc",
"src/base/string_utils.cc",
"src/base/temp_file.cc",
"src/base/thread_checker.cc",
"src/base/time.cc",
+ "src/base/unix_socket.cc",
"src/base/unix_task_runner.cc",
"src/base/virtual_destructors.cc",
"src/base/watchdog_posix.cc",
@@ -3632,7 +3629,6 @@
"src/ipc/deferred.cc",
"src/ipc/host_impl.cc",
"src/ipc/service_proxy.cc",
- "src/ipc/unix_socket.cc",
"src/ipc/virtual_destructors.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
@@ -3809,7 +3805,6 @@
"src/base/page_allocator.cc",
"src/base/page_allocator_unittest.cc",
"src/base/scoped_file_unittest.cc",
- "src/base/sock_utils.cc",
"src/base/string_splitter.cc",
"src/base/string_splitter_unittest.cc",
"src/base/string_utils.cc",
@@ -3825,6 +3820,8 @@
"src/base/thread_checker_unittest.cc",
"src/base/time.cc",
"src/base/time_unittest.cc",
+ "src/base/unix_socket.cc",
+ "src/base/unix_socket_unittest.cc",
"src/base/unix_task_runner.cc",
"src/base/utils_unittest.cc",
"src/base/virtual_destructors.cc",
@@ -3841,16 +3838,16 @@
"src/ipc/host_impl_unittest.cc",
"src/ipc/service_proxy.cc",
"src/ipc/test/ipc_integrationtest.cc",
- "src/ipc/unix_socket.cc",
- "src/ipc/unix_socket_unittest.cc",
"src/ipc/virtual_destructors.cc",
"src/perfetto_cmd/perfetto_cmd.cc",
"src/perfetto_cmd/rate_limiter.cc",
"src/perfetto_cmd/rate_limiter_unittest.cc",
"src/profiling/memory/bookkeeping.cc",
"src/profiling/memory/bookkeeping_unittest.cc",
+ "src/profiling/memory/bounded_queue_unittest.cc",
"src/profiling/memory/client.cc",
"src/profiling/memory/client_unittest.cc",
+ "src/profiling/memory/heapprofd_integrationtest.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/record_reader_unittest.cc",
"src/profiling/memory/socket_listener.cc",
@@ -3859,6 +3856,7 @@
"src/profiling/memory/string_interner_unittest.cc",
"src/profiling/memory/unwinding.cc",
"src/profiling/memory/unwinding_unittest.cc",
+ "src/profiling/memory/wire_protocol.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
"src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index ea5fa38..10da3d0 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import("../../../gn/perfetto.gni")
+
source_set("base") {
sources = [
"build_config.h",
@@ -39,4 +41,7 @@
if (is_android) {
sources += [ "android_task_runner.h" ]
}
+ if (!build_with_chromium) {
+ sources += [ "unix_socket.h" ]
+ }
}
diff --git a/include/perfetto/base/sock_utils.h b/include/perfetto/base/sock_utils.h
deleted file mode 100644
index 0008bf3..0000000
--- a/include/perfetto/base/sock_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
-#define INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
-
-#include "perfetto/base/scoped_file.h"
-
-namespace perfetto {
-namespace base {
-
-ssize_t Send(int fd,
- const void* msg,
- size_t len,
- const int* send_fds,
- size_t num_fds);
-
-ssize_t Receive(int fd,
- void* msg,
- size_t len,
- base::ScopedFile* fd_vec,
- size_t max_files);
-} // namespace base
-} // namespace perfetto
-
-#endif // INCLUDE_PERFETTO_BASE_SOCK_UTILS_H_
diff --git a/src/ipc/unix_socket.h b/include/perfetto/base/unix_socket.h
similarity index 92%
rename from src/ipc/unix_socket.h
rename to include/perfetto/base/unix_socket.h
index 32839d3..7daf5a2 100644
--- a/src/ipc/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -14,8 +14,8 @@
* limitations under the License.
*/
-#ifndef SRC_IPC_UNIX_SOCKET_H_
-#define SRC_IPC_UNIX_SOCKET_H_
+#ifndef INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
+#define INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
#include <stdint.h>
#include <sys/types.h>
@@ -25,16 +25,34 @@
#include "perfetto/base/logging.h"
#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/utils.h"
#include "perfetto/base/weak_ptr.h"
-#include "perfetto/ipc/basic_types.h"
+
+#include <sys/socket.h>
+#include <sys/un.h>
namespace perfetto {
-
namespace base {
-class TaskRunner;
-} // namespace base.
-namespace ipc {
+class TaskRunner;
+
+ssize_t SockSend(int fd,
+ const void* msg,
+ size_t len,
+ const int* send_fds,
+ size_t num_fds);
+
+ssize_t SockReceive(int fd,
+ void* msg,
+ size_t len,
+ base::ScopedFile* fd_vec,
+ size_t max_files);
+
+bool MakeSockAddr(const std::string& socket_name,
+ sockaddr_un* addr,
+ socklen_t* addr_size);
+
+base::ScopedFile CreateSocket();
// A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
// transfer file descriptors. None of the methods in this class are blocking.
@@ -234,7 +252,7 @@
base::WeakPtrFactory<UnixSocket> weak_ptr_factory_;
};
-} // namespace ipc
+} // namespace base
} // namespace perfetto
-#endif // SRC_IPC_UNIX_SOCKET_H_
+#endif // INCLUDE_PERFETTO_BASE_UNIX_SOCKET_H_
diff --git a/include/perfetto/base/utils.h b/include/perfetto/base/utils.h
index 9b18214..4d2f439 100644
--- a/include/perfetto/base/utils.h
+++ b/include/perfetto/base/utils.h
@@ -22,6 +22,9 @@
#include <errno.h>
#include <stddef.h>
#include <stdlib.h>
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+#include <sys/types.h>
+#endif
#define PERFETTO_EINTR(x) \
({ \
@@ -61,6 +64,11 @@
namespace perfetto {
namespace base {
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+constexpr uid_t kInvalidUid = static_cast<uid_t>(-1);
+constexpr pid_t kInvalidPid = static_cast<pid_t>(-1);
+#endif
+
constexpr size_t kPageSize = 4096;
constexpr size_t kMaxCpus = 128;
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 68939a9..019cd7b 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -88,13 +88,13 @@
if (!build_with_chromium) {
# This cannot be in :base as it does not build on WASM.
- source_set("sock_utils") {
+ source_set("unix_socket") {
deps = [
"../../gn:default_deps",
"../../include/perfetto/base",
]
sources = [
- "sock_utils.cc",
+ "unix_socket.cc",
]
}
}
@@ -153,7 +153,10 @@
"utils_unittest.cc",
]
}
- if (!build_with_chromium && (is_linux || is_android)) {
- sources += [ "watchdog_unittest.cc" ]
+ if (!build_with_chromium) {
+ sources += [ "unix_socket_unittest.cc" ]
+ if (is_linux || is_android) {
+ sources += [ "watchdog_unittest.cc" ]
+ }
}
}
diff --git a/src/base/sock_utils.cc b/src/base/sock_utils.cc
deleted file mode 100644
index d643d8d..0000000
--- a/src/base/sock_utils.cc
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "perfetto/base/sock_utils.h"
-
-#include <sys/socket.h>
-#include <sys/un.h>
-
-namespace perfetto {
-namespace base {
-namespace {
-// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
-// created with SO_NOSIGPIPE (See InitializeSocket()).
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-constexpr int kNoSigPipe = 0;
-#else
-constexpr int kNoSigPipe = MSG_NOSIGNAL;
-#endif
-
-// Android takes an int instead of socklen_t for the control buffer size.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-using CBufLenType = size_t;
-#else
-using CBufLenType = socklen_t;
-#endif
-}
-
-// The CMSG_* macros use NULL instead of nullptr.
-#pragma GCC diagnostic push
-#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
-#endif
-
-ssize_t Send(int fd,
- const void* msg,
- size_t len,
- const int* send_fds,
- size_t num_fds) {
- msghdr msg_hdr = {};
- iovec iov = {const_cast<void*>(msg), len};
- msg_hdr.msg_iov = &iov;
- msg_hdr.msg_iovlen = 1;
- alignas(cmsghdr) char control_buf[256];
-
- if (num_fds > 0) {
- const CBufLenType control_buf_len =
- static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
- PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
- memset(control_buf, 0, sizeof(control_buf));
- msg_hdr.msg_control = control_buf;
- msg_hdr.msg_controllen = control_buf_len;
- struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
- memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
- msg_hdr.msg_controllen = cmsg->cmsg_len;
- }
-
- return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
-}
-
-ssize_t Receive(int fd,
- void* msg,
- size_t len,
- base::ScopedFile* fd_vec,
- size_t max_files) {
- msghdr msg_hdr = {};
- iovec iov = {msg, len};
- msg_hdr.msg_iov = &iov;
- msg_hdr.msg_iovlen = 1;
- alignas(cmsghdr) char control_buf[256];
-
- if (max_files > 0) {
- msg_hdr.msg_control = control_buf;
- msg_hdr.msg_controllen =
- static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
- PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
- }
- const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
- if (sz <= 0) {
- return sz;
- }
- PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
-
- int* fds = nullptr;
- uint32_t fds_len = 0;
-
- if (max_files > 0) {
- for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
- cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
- const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
- if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
- PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
- PERFETTO_CHECK(fds == nullptr);
- fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
- fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
- }
- }
- }
-
- if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
- for (size_t i = 0; fds && i < fds_len; ++i)
- close(fds[i]);
- errno = EMSGSIZE;
- return -1;
- }
-
- for (size_t i = 0; fds && i < fds_len; ++i) {
- if (i < max_files)
- fd_vec[i].reset(fds[i]);
- else
- close(fds[i]);
- }
-
- return sz;
-}
-
-#pragma GCC diagnostic pop
-
-} // namespace base
-} // namespace perfetto
diff --git a/src/ipc/unix_socket.cc b/src/base/unix_socket.cc
similarity index 72%
rename from src/ipc/unix_socket.cc
rename to src/base/unix_socket.cc
index 4acc59f..e3ed5f2 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
#include <errno.h>
#include <fcntl.h>
@@ -31,7 +31,6 @@
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
-#include "perfetto/base/sock_utils.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/utils.h"
@@ -40,11 +39,117 @@
#endif
namespace perfetto {
-namespace ipc {
-
-// TODO(primiano): Add ThreadChecker to methods of this class.
+namespace base {
namespace {
+// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+// created with SO_NOSIGPIPE (See InitializeSocket()).
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+constexpr int kNoSigPipe = 0;
+#else
+constexpr int kNoSigPipe = MSG_NOSIGNAL;
+#endif
+
+// Android takes an int instead of socklen_t for the control buffer size.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+using CBufLenType = size_t;
+#else
+using CBufLenType = socklen_t;
+#endif
+}
+
+// The CMSG_* macros use NULL instead of nullptr.
+#pragma GCC diagnostic push
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
+#endif
+
+ssize_t SockSend(int fd,
+ const void* msg,
+ size_t len,
+ const int* send_fds,
+ size_t num_fds) {
+ msghdr msg_hdr = {};
+ iovec iov = {const_cast<void*>(msg), len};
+ msg_hdr.msg_iov = &iov;
+ msg_hdr.msg_iovlen = 1;
+ alignas(cmsghdr) char control_buf[256];
+
+ if (num_fds > 0) {
+ const CBufLenType control_buf_len =
+ static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
+ PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
+ memset(control_buf, 0, sizeof(control_buf));
+ msg_hdr.msg_control = control_buf;
+ msg_hdr.msg_controllen = control_buf_len;
+ struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
+ memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
+ msg_hdr.msg_controllen = cmsg->cmsg_len;
+ }
+
+ return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+}
+
+ssize_t SockReceive(int fd,
+ void* msg,
+ size_t len,
+ ScopedFile* fd_vec,
+ size_t max_files) {
+ msghdr msg_hdr = {};
+ iovec iov = {msg, len};
+ msg_hdr.msg_iov = &iov;
+ msg_hdr.msg_iovlen = 1;
+ alignas(cmsghdr) char control_buf[256];
+
+ if (max_files > 0) {
+ msg_hdr.msg_control = control_buf;
+ msg_hdr.msg_controllen =
+ static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
+ PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
+ }
+ const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
+ if (sz <= 0) {
+ return sz;
+ }
+ PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+
+ int* fds = nullptr;
+ uint32_t fds_len = 0;
+
+ if (max_files > 0) {
+ for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
+ cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
+ const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
+ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+ PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
+ PERFETTO_CHECK(fds == nullptr);
+ fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+ fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
+ }
+ }
+ }
+
+ if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
+ for (size_t i = 0; fds && i < fds_len; ++i)
+ close(fds[i]);
+ errno = EMSGSIZE;
+ return -1;
+ }
+
+ for (size_t i = 0; fds && i < fds_len; ++i) {
+ if (i < max_files)
+ fd_vec[i].reset(fds[i]);
+ else
+ close(fds[i]);
+ }
+
+ return sz;
+}
+
+#pragma GCC diagnostic pop
bool MakeSockAddr(const std::string& socket_name,
sockaddr_un* addr,
@@ -64,27 +169,27 @@
return true;
}
-base::ScopedFile CreateSocket() {
- return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
+ScopedFile CreateSocket() {
+ return ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
}
-} // namespace
+// TODO(primiano): Add ThreadChecker to methods of this class.
// static
-base::ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
- base::ScopedFile fd = CreateSocket();
+ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
+ ScopedFile fd = CreateSocket();
if (!fd)
return fd;
sockaddr_un addr;
socklen_t addr_size;
if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
- return base::ScopedFile();
+ return ScopedFile();
}
if (bind(*fd, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
PERFETTO_DPLOG("bind()");
- return base::ScopedFile();
+ return ScopedFile();
}
return fd;
@@ -93,15 +198,15 @@
// static
std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
EventListener* event_listener,
- base::TaskRunner* task_runner) {
+ TaskRunner* task_runner) {
// Forward the call to the Listen() overload below.
return Listen(CreateAndBind(socket_name), event_listener, task_runner);
}
// static
-std::unique_ptr<UnixSocket> UnixSocket::Listen(base::ScopedFile socket_fd,
+std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile socket_fd,
EventListener* event_listener,
- base::TaskRunner* task_runner) {
+ TaskRunner* task_runner) {
std::unique_ptr<UnixSocket> sock(new UnixSocket(
event_listener, task_runner, std::move(socket_fd), State::kListening));
return sock;
@@ -110,22 +215,21 @@
// static
std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
EventListener* event_listener,
- base::TaskRunner* task_runner) {
+ TaskRunner* task_runner) {
std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
sock->DoConnect(socket_name);
return sock;
}
-UnixSocket::UnixSocket(EventListener* event_listener,
- base::TaskRunner* task_runner)
+UnixSocket::UnixSocket(EventListener* event_listener, TaskRunner* task_runner)
: UnixSocket(event_listener,
task_runner,
- base::ScopedFile(),
+ ScopedFile(),
State::kDisconnected) {}
UnixSocket::UnixSocket(EventListener* event_listener,
- base::TaskRunner* task_runner,
- base::ScopedFile adopt_fd,
+ TaskRunner* task_runner,
+ ScopedFile adopt_fd,
State adopt_state)
: event_listener_(event_listener),
task_runner_(task_runner),
@@ -179,7 +283,7 @@
SetBlockingIO(false);
- base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+ WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
task_runner_->AddFileDescriptorWatch(*fd_, [weak_ptr]() {
if (weak_ptr)
weak_ptr->OnEvent();
@@ -223,7 +327,7 @@
// just trigger an OnEvent without waiting for the FD watch. That will poll
// the SO_ERROR and evolve the state into either kConnected or kDisconnected.
if (res == 0) {
- base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+ WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_ptr]() {
if (weak_ptr)
weak_ptr->OnEvent();
@@ -281,7 +385,7 @@
for (;;) {
sockaddr_un cli_addr = {};
socklen_t size = sizeof(cli_addr);
- base::ScopedFile new_fd(PERFETTO_EINTR(
+ ScopedFile new_fd(PERFETTO_EINTR(
accept(*fd_, reinterpret_cast<sockaddr*>(&cli_addr), &size)));
if (!new_fd)
return;
@@ -317,7 +421,7 @@
if (blocking_mode == BlockingMode::kBlocking)
SetBlockingIO(true);
- const ssize_t sz = base::Send(*fd_, msg, len, send_fds, num_fds);
+ const ssize_t sz = SockSend(*fd_, msg, len, send_fds, num_fds);
if (blocking_mode == BlockingMode::kBlocking)
SetBlockingIO(false);
@@ -347,7 +451,7 @@
}
void UnixSocket::Shutdown(bool notify) {
- base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+ WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
if (notify) {
if (state_ == State::kConnected) {
task_runner_->PostTask([weak_ptr]() {
@@ -376,14 +480,14 @@
size_t UnixSocket::Receive(void* msg,
size_t len,
- base::ScopedFile* fd_vec,
+ ScopedFile* fd_vec,
size_t max_files) {
if (state_ != State::kConnected) {
last_error_ = ENOTCONN;
return 0;
}
- const ssize_t sz = base::Receive(*fd_, msg, len, fd_vec, max_files);
+ const ssize_t sz = SockReceive(*fd_, msg, len, fd_vec, max_files);
if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
last_error_ = EAGAIN;
return 0;
@@ -409,7 +513,7 @@
if (!success)
Shutdown(false);
- base::WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+ WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_ptr, success]() {
if (weak_ptr)
weak_ptr->event_listener_->OnConnect(weak_ptr.get(), success);
@@ -435,5 +539,5 @@
void UnixSocket::EventListener::OnDisconnect(UnixSocket*) {}
void UnixSocket::EventListener::OnDataAvailable(UnixSocket*) {}
-} // namespace ipc
+} // namespace base
} // namespace perfetto
diff --git a/src/ipc/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
similarity index 97%
rename from src/ipc/unix_socket_unittest.cc
rename to src/base/unix_socket_unittest.cc
index 024072f..5cdd6ab 100644
--- a/src/ipc/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
#include <sys/mman.h>
@@ -31,7 +31,7 @@
#include "src/ipc/test/test_socket.h"
namespace perfetto {
-namespace ipc {
+namespace base {
namespace {
using ::testing::_;
@@ -74,7 +74,7 @@
void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
- base::TestTaskRunner task_runner_;
+ TestTaskRunner task_runner_;
MockEventListener event_listener_;
};
@@ -199,15 +199,15 @@
ASSERT_TRUE(srv_conn);
ASSERT_TRUE(cli->is_connected());
- base::ScopedFile null_fd(open("/dev/null", O_RDONLY));
- base::ScopedFile zero_fd(open("/dev/zero", O_RDONLY));
+ ScopedFile null_fd(open("/dev/null", O_RDONLY));
+ ScopedFile zero_fd(open("/dev/zero", O_RDONLY));
auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
.WillRepeatedly(Invoke([cli_did_recv](UnixSocket* s) {
- base::ScopedFile fd_buf[3];
+ ScopedFile fd_buf[3];
char buf[sizeof(cli_str)];
- if (!s->Receive(buf, sizeof(buf), fd_buf, base::ArraySize(fd_buf)))
+ if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
return;
ASSERT_STREQ(srv_str, buf);
ASSERT_NE(*fd_buf[0], -1);
@@ -225,9 +225,9 @@
auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
.WillRepeatedly(Invoke([srv_did_recv](UnixSocket* s) {
- base::ScopedFile fd_buf[3];
+ ScopedFile fd_buf[3];
char buf[sizeof(srv_str)];
- if (!s->Receive(buf, sizeof(buf), fd_buf, base::ArraySize(fd_buf)))
+ if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
return;
ASSERT_STREQ(cli_str, buf);
ASSERT_NE(*fd_buf[0], -1);
@@ -339,7 +339,7 @@
if (pid == 0) {
// Child process.
- base::TempFile scoped_tmp = base::TempFile::CreateUnlinked();
+ TempFile scoped_tmp = TempFile::CreateUnlinked();
int tmp_fd = scoped_tmp.fd();
ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
char* mem = reinterpret_cast<char*>(
@@ -379,7 +379,7 @@
EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
.WillOnce(Invoke([checkpoint](UnixSocket* s) {
char msg[32];
- base::ScopedFile fd;
+ ScopedFile fd;
ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
ASSERT_STREQ("txfd", msg);
ASSERT_TRUE(fd);
@@ -405,7 +405,7 @@
constexpr size_t kAtomicWrites_FrameSize = 1123;
bool AtomicWrites_SendAttempt(UnixSocket* s,
- base::TaskRunner* task_runner,
+ TaskRunner* task_runner,
int num_frame) {
char buf[kAtomicWrites_FrameSize];
memset(buf, static_cast<char>(num_frame), sizeof(buf));
@@ -547,7 +547,7 @@
// Perform the blocking send form another thread.
std::thread tx_thread([] {
- base::TestTaskRunner tx_task_runner;
+ TestTaskRunner tx_task_runner;
MockEventListener tx_events;
auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
@@ -594,7 +594,7 @@
// Perform the blocking send form another thread.
std::thread tx_thread([] {
- base::TestTaskRunner tx_task_runner;
+ TestTaskRunner tx_task_runner;
MockEventListener tx_events;
auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
@@ -634,5 +634,5 @@
// verify that no spurious EventListener callback is received.
} // namespace
-} // namespace ipc
+} // namespace base
} // namespace perfetto
diff --git a/src/ipc/BUILD.gn b/src/ipc/BUILD.gn
index 4dffcf7..8a7e4e1 100644
--- a/src/ipc/BUILD.gn
+++ b/src/ipc/BUILD.gn
@@ -30,12 +30,12 @@
public_configs = [ "../../gn:default_config" ]
public_deps = [
"../../include/perfetto/ipc",
+ "../base:unix_socket",
]
deps = [
":wire_protocol",
"../../gn:default_deps",
"../base",
- "../base:sock_utils",
]
sources = [
"buffered_frame_deserializer.cc",
@@ -44,8 +44,6 @@
"host_impl.cc",
"host_impl.h",
"service_proxy.cc",
- "unix_socket.cc",
- "unix_socket.h",
"virtual_destructors.cc",
]
}
@@ -78,7 +76,6 @@
"deferred_unittest.cc",
"host_impl_unittest.cc",
"test/ipc_integrationtest.cc",
- "unix_socket_unittest.cc",
]
}
diff --git a/src/ipc/client_impl.cc b/src/ipc/client_impl.cc
index cc58bf5..d648df5 100644
--- a/src/ipc/client_impl.cc
+++ b/src/ipc/client_impl.cc
@@ -44,13 +44,14 @@
ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
: task_runner_(task_runner), weak_ptr_factory_(this) {
GOOGLE_PROTOBUF_VERIFY_VERSION;
- sock_ = UnixSocket::Connect(socket_name, this, task_runner);
+ sock_ = base::UnixSocket::Connect(socket_name, this, task_runner);
}
ClientImpl::~ClientImpl() {
// Ensure we are not destroyed in the middle of invoking a reply.
PERFETTO_DCHECK(!invoking_method_reply_);
- OnDisconnect(nullptr); // The UnixSocket* ptr is not used in OnDisconnect().
+ OnDisconnect(
+ nullptr); // The base::UnixSocket* ptr is not used in OnDisconnect().
}
void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
@@ -122,12 +123,12 @@
// the send and PostTask the reply later? Right now we are making Send()
// blocking as a workaround. Propagate bakpressure to the caller instead.
bool res = sock_->Send(buf.data(), buf.size(), fd,
- UnixSocket::BlockingMode::kBlocking);
+ base::UnixSocket::BlockingMode::kBlocking);
PERFETTO_CHECK(res || !sock_->is_connected());
return res;
}
-void ClientImpl::OnConnect(UnixSocket*, bool connected) {
+void ClientImpl::OnConnect(base::UnixSocket*, bool connected) {
// Drain the BindService() calls that were queued before establishig the
// connection with the host.
for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings_) {
@@ -140,7 +141,7 @@
queued_bindings_.clear();
}
-void ClientImpl::OnDisconnect(UnixSocket*) {
+void ClientImpl::OnDisconnect(base::UnixSocket*) {
for (auto it : service_bindings_) {
base::WeakPtr<ServiceProxy>& service_proxy = it.second;
task_runner_->PostTask([service_proxy] {
@@ -152,7 +153,7 @@
queued_bindings_.clear();
}
-void ClientImpl::OnDataAvailable(UnixSocket*) {
+void ClientImpl::OnDataAvailable(base::UnixSocket*) {
size_t rsize;
do {
auto buf = frame_deserializer_.BeginReceive();
diff --git a/src/ipc/client_impl.h b/src/ipc/client_impl.h
index 2402d17..d53c05b 100644
--- a/src/ipc/client_impl.h
+++ b/src/ipc/client_impl.h
@@ -19,9 +19,9 @@
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/ipc/client.h"
#include "src/ipc/buffered_frame_deserializer.h"
-#include "src/ipc/unix_socket.h"
#include "src/ipc/wire_protocol.pb.h"
@@ -39,7 +39,7 @@
class ServiceDescriptor;
-class ClientImpl : public Client, public UnixSocket::EventListener {
+class ClientImpl : public Client, public base::UnixSocket::EventListener {
public:
ClientImpl(const char* socket_name, base::TaskRunner*);
~ClientImpl() override;
@@ -49,10 +49,10 @@
void UnbindService(ServiceID) override;
base::ScopedFile TakeReceivedFD() override;
- // UnixSocket::EventListener implementation.
- void OnConnect(UnixSocket*, bool connected) override;
- void OnDisconnect(UnixSocket*) override;
- void OnDataAvailable(UnixSocket*) override;
+ // base::UnixSocket::EventListener implementation.
+ void OnConnect(base::UnixSocket*, bool connected) override;
+ void OnDisconnect(base::UnixSocket*) override;
+ void OnDataAvailable(base::UnixSocket*) override;
RequestID BeginInvoke(ServiceID,
const std::string& method_name,
@@ -82,7 +82,7 @@
void OnInvokeMethodReply(QueuedRequest, const Frame::InvokeMethodReply&);
bool invoking_method_reply_ = false;
- std::unique_ptr<UnixSocket> sock_;
+ std::unique_ptr<base::UnixSocket> sock_;
base::TaskRunner* const task_runner_;
RequestID last_request_id_ = 0;
BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 9922cb8..445e3d0 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -24,13 +24,13 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "perfetto/base/temp_file.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
#include "perfetto/ipc/service_descriptor.h"
#include "perfetto/ipc/service_proxy.h"
#include "src/base/test/test_task_runner.h"
#include "src/ipc/buffered_frame_deserializer.h"
#include "src/ipc/test/test_socket.h"
-#include "src/ipc/unix_socket.h"
#include "src/ipc/test/client_unittest_messages.pb.h"
@@ -78,7 +78,7 @@
// A fake host implementation. Listens on |kSockName| and replies to IPC
// metohds like a real one.
-class FakeHost : public UnixSocket::EventListener {
+class FakeHost : public base::UnixSocket::EventListener {
public:
struct FakeMethod {
MethodID id;
@@ -103,7 +103,7 @@
explicit FakeHost(base::TaskRunner* task_runner) {
DESTROY_TEST_SOCK(kSockName);
- listening_sock = UnixSocket::Listen(kSockName, this, task_runner);
+ listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner);
EXPECT_TRUE(listening_sock->is_listening());
}
~FakeHost() override { DESTROY_TEST_SOCK(kSockName); }
@@ -117,15 +117,15 @@
return svc;
}
- // UnixSocket::EventListener implementation.
+ // base::UnixSocket::EventListener implementation.
void OnNewIncomingConnection(
- UnixSocket*,
- std::unique_ptr<UnixSocket> new_connection) override {
+ base::UnixSocket*,
+ std::unique_ptr<base::UnixSocket> new_connection) override {
ASSERT_FALSE(client_sock);
client_sock = std::move(new_connection);
}
- void OnDataAvailable(UnixSocket* sock) override {
+ void OnDataAvailable(base::UnixSocket* sock) override {
if (sock != client_sock.get())
return;
auto buf = frame_deserializer.BeginReceive();
@@ -187,8 +187,8 @@
}
BufferedFrameDeserializer frame_deserializer;
- std::unique_ptr<UnixSocket> listening_sock;
- std::unique_ptr<UnixSocket> client_sock;
+ std::unique_ptr<base::UnixSocket> listening_sock;
+ std::unique_ptr<base::UnixSocket> client_sock;
std::map<std::string, std::unique_ptr<FakeService>> services;
ServiceID last_service_id = 0;
int next_reply_fd = -1;
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 2e9fe79..02d0a68 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -56,14 +56,14 @@
: task_runner_(task_runner), weak_ptr_factory_(this) {
GOOGLE_PROTOBUF_VERIFY_VERSION;
PERFETTO_DCHECK_THREAD(thread_checker_);
- sock_ = UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
+ sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
}
HostImpl::HostImpl(const char* socket_name, base::TaskRunner* task_runner)
: task_runner_(task_runner), weak_ptr_factory_(this) {
GOOGLE_PROTOBUF_VERIFY_VERSION;
PERFETTO_DCHECK_THREAD(thread_checker_);
- sock_ = UnixSocket::Listen(socket_name, this, task_runner_);
+ sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_);
}
HostImpl::~HostImpl() = default;
@@ -81,8 +81,9 @@
return true;
}
-void HostImpl::OnNewIncomingConnection(UnixSocket*,
- std::unique_ptr<UnixSocket> new_conn) {
+void HostImpl::OnNewIncomingConnection(
+ base::UnixSocket*,
+ std::unique_ptr<base::UnixSocket> new_conn) {
PERFETTO_DCHECK_THREAD(thread_checker_);
std::unique_ptr<ClientConnection> client(new ClientConnection());
ClientID client_id = ++last_client_id_;
@@ -92,7 +93,7 @@
clients_[client_id] = std::move(client);
}
-void HostImpl::OnDataAvailable(UnixSocket* sock) {
+void HostImpl::OnDataAvailable(base::UnixSocket* sock) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto it = clients_by_socket_.find(sock);
if (it == clients_by_socket_.end())
@@ -237,11 +238,11 @@
// the send and PostTask the reply later? Right now we are making Send()
// blocking as a workaround. Propagate bakpressure to the caller instead.
bool res = client->sock->Send(buf.data(), buf.size(), fd,
- UnixSocket::BlockingMode::kBlocking);
+ base::UnixSocket::BlockingMode::kBlocking);
PERFETTO_CHECK(res || !client->sock->is_connected());
}
-void HostImpl::OnDisconnect(UnixSocket* sock) {
+void HostImpl::OnDisconnect(base::UnixSocket* sock) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto it = clients_by_socket_.find(sock);
if (it == clients_by_socket_.end())
diff --git a/src/ipc/host_impl.h b/src/ipc/host_impl.h
index bef1d9e..5a062d4 100644
--- a/src/ipc/host_impl.h
+++ b/src/ipc/host_impl.h
@@ -24,17 +24,17 @@
#include "perfetto/base/task_runner.h"
#include "perfetto/base/thread_checker.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/ipc/deferred.h"
#include "perfetto/ipc/host.h"
#include "src/ipc/buffered_frame_deserializer.h"
-#include "src/ipc/unix_socket.h"
namespace perfetto {
namespace ipc {
class Frame;
-class HostImpl : public Host, public UnixSocket::EventListener {
+class HostImpl : public Host, public base::UnixSocket::EventListener {
public:
HostImpl(const char* socket_name, base::TaskRunner*);
HostImpl(base::ScopedFile socket_fd, base::TaskRunner*);
@@ -43,20 +43,20 @@
// Host implementation.
bool ExposeService(std::unique_ptr<Service>) override;
- // UnixSocket::EventListener implementation.
- void OnNewIncomingConnection(UnixSocket*,
- std::unique_ptr<UnixSocket>) override;
- void OnDisconnect(UnixSocket*) override;
- void OnDataAvailable(UnixSocket*) override;
+ // base::UnixSocket::EventListener implementation.
+ void OnNewIncomingConnection(base::UnixSocket*,
+ std::unique_ptr<base::UnixSocket>) override;
+ void OnDisconnect(base::UnixSocket*) override;
+ void OnDataAvailable(base::UnixSocket*) override;
- const UnixSocket* sock() const { return sock_.get(); }
+ const base::UnixSocket* sock() const { return sock_.get(); }
private:
// Owns the per-client receive buffer (BufferedFrameDeserializer).
struct ClientConnection {
~ClientConnection();
ClientID id;
- std::unique_ptr<UnixSocket> sock;
+ std::unique_ptr<base::UnixSocket> sock;
BufferedFrameDeserializer frame_deserializer;
base::ScopedFile received_fd;
};
@@ -85,9 +85,9 @@
base::TaskRunner* const task_runner_;
std::map<ServiceID, ExposedService> services_;
- std::unique_ptr<UnixSocket> sock_; // The listening socket.
+ std::unique_ptr<base::UnixSocket> sock_; // The listening socket.
std::map<ClientID, std::unique_ptr<ClientConnection>> clients_;
- std::map<UnixSocket*, ClientConnection*> clients_by_socket_;
+ std::map<base::UnixSocket*, ClientConnection*> clients_by_socket_;
ServiceID last_service_id_ = 0;
ClientID last_client_id_ = 0;
base::WeakPtrFactory<HostImpl> weak_ptr_factory_;
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index f48775d..3ff1f99 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -22,13 +22,13 @@
#include "gtest/gtest.h"
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/temp_file.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
#include "perfetto/ipc/service.h"
#include "perfetto/ipc/service_descriptor.h"
#include "src/base/test/test_task_runner.h"
#include "src/ipc/buffered_frame_deserializer.h"
#include "src/ipc/test/test_socket.h"
-#include "src/ipc/unix_socket.h"
#include "src/ipc/test/client_unittest_messages.pb.h"
#include "src/ipc/wire_protocol.pb.h"
@@ -78,7 +78,7 @@
ServiceDescriptor descriptor_;
};
-class FakeClient : public UnixSocket::EventListener {
+class FakeClient : public base::UnixSocket::EventListener {
public:
MOCK_METHOD0(OnConnect, void());
MOCK_METHOD0(OnDisconnect, void());
@@ -88,7 +88,7 @@
MOCK_METHOD0(OnRequestError, void());
explicit FakeClient(base::TaskRunner* task_runner) {
- sock_ = UnixSocket::Connect(kSockName, this, task_runner);
+ sock_ = base::UnixSocket::Connect(kSockName, this, task_runner);
}
~FakeClient() override = default;
@@ -118,15 +118,15 @@
SendFrame(frame, fd);
}
- // UnixSocket::EventListener implementation.
- void OnConnect(UnixSocket*, bool success) override {
+ // base::UnixSocket::EventListener implementation.
+ void OnConnect(base::UnixSocket*, bool success) override {
ASSERT_TRUE(success);
OnConnect();
}
- void OnDisconnect(UnixSocket*) override { OnDisconnect(); }
+ void OnDisconnect(base::UnixSocket*) override { OnDisconnect(); }
- void OnDataAvailable(UnixSocket* sock) override {
+ void OnDataAvailable(base::UnixSocket* sock) override {
ASSERT_EQ(sock_.get(), sock);
auto buf = frame_deserializer_.BeginReceive();
base::ScopedFile fd;
@@ -156,7 +156,7 @@
}
BufferedFrameDeserializer frame_deserializer_;
- std::unique_ptr<UnixSocket> sock_;
+ std::unique_ptr<base::UnixSocket> sock_;
std::map<uint64_t /* request_id */, int /* num_replies_received */> requests_;
ServiceID last_bound_service_id_;
};
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 0ccc573..78e57c9 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -15,83 +15,46 @@
import("../../../gn/perfetto.gni")
import("//build_overrides/build.gni")
-source_set("record_reader") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
- "record_reader.cc",
- "record_reader.h",
- ]
-}
-
-source_set("unwinding") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+source_set("wire_protocol") {
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
"../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
]
sources = [
- "unwinding.cc",
- "unwinding.h",
+ "wire_protocol.cc",
+ "wire_protocol.h",
]
}
-source_set("socket_listener") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+source_set("daemon") {
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
- ":record_reader",
- ":unwinding",
+ ":wire_protocol",
+ "../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
"../../ipc",
]
sources = [
- "socket_listener.cc",
- "socket_listener.h",
- ]
-}
-
-source_set("string_interner") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
- "string_interner.cc",
- "string_interner.h",
- ]
-}
-
-source_set("bookkeeping") {
- public_configs = [ "../../../gn:default_config" ]
- deps = [
- ":string_interner",
- "../../../gn:default_deps",
- "../../base",
- ]
- sources = [
"bookkeeping.cc",
"bookkeeping.h",
+ "record_reader.cc",
+ "record_reader.h",
+ "socket_listener.cc",
+ "socket_listener.h",
+ "string_interner.cc",
+ "string_interner.h",
+ "unwinding.cc",
+ "unwinding.h",
]
}
source_set("client") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
+ ":wire_protocol",
"../../../buildtools:libunwindstack",
"../../../gn:default_deps",
"../../base",
@@ -103,25 +66,22 @@
}
source_set("unittests") {
- public_configs = [
- "../../../gn:default_config",
- "../../../buildtools:libunwindstack_config",
- ]
+ public_configs = [ "../../../buildtools:libunwindstack_config" ]
testonly = true
deps = [
- ":bookkeeping",
":client",
- ":record_reader",
- ":socket_listener",
- ":string_interner",
- ":unwinding",
+ ":daemon",
+ ":wire_protocol",
+ "../../../gn:default_deps",
"../../../gn:gtest_deps",
"../../base",
"../../base:test_support",
]
sources = [
"bookkeeping_unittest.cc",
+ "bounded_queue_unittest.cc",
"client_unittest.cc",
+ "heapprofd_integrationtest.cc",
"record_reader_unittest.cc",
"socket_listener_unittest.cc",
"string_interner_unittest.cc",
@@ -130,10 +90,8 @@
}
executable("heapprofd") {
- public_configs = [ "../../../gn:default_config" ]
deps = [
- ":socket_listener",
- ":unwinding",
+ ":daemon",
"../../../gn:default_deps",
"../../base",
"../../ipc",
diff --git a/src/profiling/memory/bookkeeping.cc b/src/profiling/memory/bookkeeping.cc
index 1624cee..7a31502 100644
--- a/src/profiling/memory/bookkeeping.cc
+++ b/src/profiling/memory/bookkeeping.cc
@@ -16,37 +16,100 @@
#include "src/profiling/memory/bookkeeping.h"
+#include "perfetto/base/logging.h"
+
namespace perfetto {
-MemoryBookkeeping::Node* MemoryBookkeeping::Node::GetOrCreateChild(
- const MemoryBookkeeping::InternedCodeLocation& loc) {
+GlobalCallstackTrie::Node* GlobalCallstackTrie::Node::GetOrCreateChild(
+ const InternedCodeLocation& loc) {
Node* child = children_.Get(loc);
if (!child)
child = children_.Emplace(loc, this);
return child;
}
-void MemoryBookkeeping::RecordMalloc(const std::vector<CodeLocation>& locs,
- uint64_t address,
- uint64_t size) {
- Node* node = &root_;
- node->cum_size_ += size;
- for (const MemoryBookkeeping::CodeLocation& loc : locs) {
- node = node->GetOrCreateChild(InternCodeLocation(loc));
- node->cum_size_ += size;
+void HeapTracker::RecordMalloc(const std::vector<CodeLocation>& callstack,
+ uint64_t address,
+ uint64_t size,
+ uint64_t sequence_number) {
+ auto it = allocations_.find(address);
+ if (it != allocations_.end()) {
+ if (it->second.sequence_number > sequence_number) {
+ return;
+ } else {
+ // Clean up previous allocation by pretending a free happened just after
+ // it.
+ // CommitFree only uses the sequence number to check whether the
+ // currently active allocation is newer than the free, so we can make
+ // up a sequence_number here.
+ CommitFree(it->second.sequence_number + 1, address);
+ }
}
- allocations_.emplace(address, std::make_pair(size, node));
+ GlobalCallstackTrie::Node* node =
+ callsites_->IncrementCallsite(callstack, size);
+ allocations_.emplace(address, Allocation(size, sequence_number, node));
+
+ // Keep the sequence tracker consistent.
+ RecordFree(kNoopFree, sequence_number);
}
-void MemoryBookkeeping::RecordFree(uint64_t address) {
+void HeapTracker::RecordFree(uint64_t address, uint64_t sequence_number) {
+ if (sequence_number != sequence_number_ + 1) {
+ pending_frees_.emplace(sequence_number, address);
+ return;
+ }
+
+ if (address != kNoopFree)
+ CommitFree(sequence_number, address);
+ sequence_number_++;
+
+ // At this point some other pending frees might be eligible to be committed.
+ auto it = pending_frees_.begin();
+ while (it != pending_frees_.end() && it->first == sequence_number_ + 1) {
+ if (it->second != kNoopFree)
+ CommitFree(it->first, it->second);
+ sequence_number_++;
+ it = pending_frees_.erase(it);
+ }
+}
+
+void HeapTracker::CommitFree(uint64_t sequence_number, uint64_t address) {
auto leaf_it = allocations_.find(address);
if (leaf_it == allocations_.end())
return;
- std::pair<uint64_t, Node*> value = leaf_it->second;
- uint64_t size = value.first;
- Node* node = value.second;
+ const Allocation& value = leaf_it->second;
+ if (value.sequence_number > sequence_number)
+ return;
+ allocations_.erase(leaf_it);
+}
+
+uint64_t GlobalCallstackTrie::GetCumSizeForTesting(
+ const std::vector<CodeLocation>& callstack) {
+ Node* node = &root_;
+ for (const CodeLocation& loc : callstack) {
+ node = node->children_.Get(InternCodeLocation(loc));
+ if (node == nullptr)
+ return 0;
+ }
+ return node->cum_size_;
+}
+
+GlobalCallstackTrie::Node* GlobalCallstackTrie::IncrementCallsite(
+ const std::vector<CodeLocation>& callstack,
+ uint64_t size) {
+ Node* node = &root_;
+ node->cum_size_ += size;
+ for (const CodeLocation& loc : callstack) {
+ node = node->GetOrCreateChild(InternCodeLocation(loc));
+ node->cum_size_ += size;
+ }
+ return node;
+}
+
+void GlobalCallstackTrie::DecrementNode(Node* node, uint64_t size) {
+ PERFETTO_DCHECK(node->cum_size_ >= size);
bool delete_prev = false;
Node* prev = nullptr;
@@ -58,19 +121,6 @@
prev = node;
node = node->parent_;
}
-
- allocations_.erase(leaf_it);
-}
-
-uint64_t MemoryBookkeeping::GetCumSizeForTesting(
- const std::vector<CodeLocation>& locs) {
- Node* node = &root_;
- for (const MemoryBookkeeping::CodeLocation& loc : locs) {
- node = node->children_.Get(InternCodeLocation(loc));
- if (node == nullptr)
- return 0;
- }
- return node->cum_size_;
}
} // namespace perfetto
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index 7e5f705..bd56dd8 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -26,39 +26,37 @@
namespace perfetto {
-class MemoryBookkeeping {
- public:
- struct CodeLocation {
- CodeLocation(std::string map_n, std::string function_n)
- : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
+class HeapTracker;
- std::string map_name;
- std::string function_name;
- };
+struct CodeLocation {
+ CodeLocation(std::string map_n, std::string function_n)
+ : map_name(std::move(map_n)), function_name(std::move(function_n)) {}
- void RecordMalloc(const std::vector<CodeLocation>& stack,
- uint64_t address,
- uint64_t size);
- void RecordFree(uint64_t address);
- uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+ std::string map_name;
+ std::string function_name;
+};
- private:
- struct InternedCodeLocation {
- StringInterner::InternedString map_name;
- StringInterner::InternedString function_name;
+// Internal data-structure for GlobalCallstackTrie to save memory if the same
+// function is named multiple times.
+struct InternedCodeLocation {
+ StringInterner::InternedString map_name;
+ StringInterner::InternedString function_name;
- bool operator<(const InternedCodeLocation& other) const {
- if (map_name.id() == other.map_name.id())
- return function_name.id() < other.function_name.id();
- return map_name.id() < other.map_name.id();
- }
- };
-
- InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
- return {interner_.Intern(loc.map_name),
- interner_.Intern(loc.function_name)};
+ bool operator<(const InternedCodeLocation& other) const {
+ if (map_name.id() == other.map_name.id())
+ return function_name.id() < other.function_name.id();
+ return map_name.id() < other.map_name.id();
}
+};
+// Graph of function callsites. This is shared between heap dumps for
+// different processes. Each call site is represented by a
+// GlobalCallstackTrie::Node that is owned by the parent (i.e. calling)
+// callsite. It has a pointer to its parent, which means the function call-graph
+// can be reconstructed from a GlobalCallstackTrie::Node by walking down the
+// pointers to the parents.
+class GlobalCallstackTrie {
+ public:
// Node in a tree of function traces that resulted in an allocation. For
// instance, if alloc_buf is called from foo and bar, which are called from
// main, the tree looks as following.
@@ -77,25 +75,119 @@
// alloc_buf to the leafs of this tree.
class Node {
public:
+ // This is opaque except to GlobalCallstackTrie.
+ friend class GlobalCallstackTrie;
+
Node(InternedCodeLocation location) : Node(std::move(location), nullptr) {}
Node(InternedCodeLocation location, Node* parent)
: parent_(parent), location_(std::move(location)) {}
+ private:
Node* GetOrCreateChild(const InternedCodeLocation& loc);
uint64_t cum_size_ = 0;
- Node* parent_;
+ Node* const parent_;
const InternedCodeLocation location_;
base::LookupSet<Node, const InternedCodeLocation, &Node::location_>
children_;
};
- // Address -> (size, code location)
- std::map<uint64_t, std::pair<uint64_t, Node*>> allocations_;
+ GlobalCallstackTrie() = default;
+ GlobalCallstackTrie(const GlobalCallstackTrie&) = delete;
+ GlobalCallstackTrie& operator=(const GlobalCallstackTrie&) = delete;
+
+ uint64_t GetCumSizeForTesting(const std::vector<CodeLocation>& stack);
+ Node* IncrementCallsite(const std::vector<CodeLocation>& locs, uint64_t size);
+ static void DecrementNode(Node* node, uint64_t size);
+
+ private:
+ InternedCodeLocation InternCodeLocation(const CodeLocation& loc) {
+ return {interner_.Intern(loc.map_name),
+ interner_.Intern(loc.function_name)};
+ }
+
StringInterner interner_;
Node root_{{interner_.Intern(""), interner_.Intern("")}};
};
+// Snapshot for memory allocations of a particular process. Shares callsites
+// with other processes.
+class HeapTracker {
+ public:
+ // Caller needs to ensure that callsites outlives the HeapTracker.
+ explicit HeapTracker(GlobalCallstackTrie* callsites)
+ : callsites_(callsites) {}
+
+ void RecordMalloc(const std::vector<CodeLocation>& stack,
+ uint64_t address,
+ uint64_t size,
+ uint64_t sequence_number);
+ void RecordFree(uint64_t address, uint64_t sequence_number);
+
+ private:
+ static constexpr uint64_t kNoopFree = 0;
+ struct Allocation {
+ Allocation(uint64_t size, uint64_t seq, GlobalCallstackTrie::Node* n)
+ : alloc_size(size), sequence_number(seq), node(n) {}
+
+ Allocation() = default;
+ Allocation(const Allocation&) = delete;
+ Allocation(Allocation&& other) noexcept {
+ alloc_size = other.alloc_size;
+ sequence_number = other.sequence_number;
+ node = other.node;
+ other.node = nullptr;
+ }
+
+ ~Allocation() {
+ if (node)
+ GlobalCallstackTrie::DecrementNode(node, alloc_size);
+ }
+
+ uint64_t alloc_size;
+ uint64_t sequence_number;
+ GlobalCallstackTrie::Node* node;
+ };
+
+ // Sequencing logic works as following:
+ // * mallocs are immediately commited to |allocations_|. They are rejected if
+ // the current malloc for the address has a higher sequence number.
+ //
+ // If all operations with sequence numbers lower than the malloc have been
+ // commited to |allocations_|, sequence_number_ is advanced and all
+ // unblocked pending operations after the current id are commited to
+ // |allocations_|. Otherwise, a no-op record is added to the pending
+ // operations queue to maintain the contiguity of the sequence.
+
+ // * for frees:
+ // if all operations with sequence numbers lower than the free have
+ // been commited to |allocations_| (i.e sequence_number_ ==
+ // sequence_number - 1) the free is commited to |allocations_| and
+ // sequence_number_ is advanced. All unblocked pending operations are
+ // commited to |allocations_|.
+ // otherwise: the free is added to the queue of pending operations.
+
+ // Commits a free operation into |allocations_|.
+ // This must be called after all operations up to sequence_number have been
+ // commited to |allocations_|.
+ void CommitFree(uint64_t sequence_number, uint64_t address);
+
+ // Address -> (size, sequence_number, code location)
+ std::map<uint64_t, Allocation> allocations_;
+
+ // if allocation address != 0, there is pending free of the address.
+ // if == 0, the pending operation is a no-op.
+ // No-op operations come from allocs that have already been commited to
+ // |allocations_|. It is important to keep track of them in the list of
+ // pending to maintain the contiguity of the sequence.
+ std::map<uint64_t /* seq_id */, uint64_t /* allocation address */>
+ pending_frees_;
+
+ // The sequence number all mallocs and frees have been handled up to.
+ uint64_t sequence_number_ = 0;
+ GlobalCallstackTrie* const callsites_;
+};
+
} // namespace perfetto
#endif // SRC_PROFILING_MEMORY_BOOKKEEPING_H_
diff --git a/src/profiling/memory/bookkeeping_unittest.cc b/src/profiling/memory/bookkeeping_unittest.cc
index 23da7cf..1e7b369 100644
--- a/src/profiling/memory/bookkeeping_unittest.cc
+++ b/src/profiling/memory/bookkeeping_unittest.cc
@@ -22,22 +22,65 @@
namespace perfetto {
namespace {
-TEST(BookkeepingTest, Basic) {
- std::vector<MemoryBookkeeping::CodeLocation> stack{
+std::vector<CodeLocation> stack() {
+ return {
{"map1", "fun1"}, {"map2", "fun2"},
};
+}
- std::vector<MemoryBookkeeping::CodeLocation> stack2{
+std::vector<CodeLocation> stack2() {
+ return {
{"map1", "fun1"}, {"map3", "fun3"},
};
- MemoryBookkeeping mb;
- mb.RecordMalloc(stack, 1, 5);
- mb.RecordMalloc(stack2, 2, 2);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
- mb.RecordFree(2);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
- mb.RecordFree(1);
- ASSERT_EQ(mb.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, Basic) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd.RecordMalloc(stack2(), 2, 2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+ hd.RecordFree(2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+ hd.RecordFree(1, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 0);
+}
+
+TEST(BookkeepingTest, TwoHeapTrackers) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+ {
+ HeapTracker hd2(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd2.RecordMalloc(stack2(), 2, 2, sequence_number++);
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 7);
+ }
+ ASSERT_EQ(c.GetCumSizeForTesting({{"map1", "fun1"}}), 5);
+}
+
+TEST(BookkeepingTest, ReplaceAlloc) {
+ uint64_t sequence_number = 1;
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, sequence_number++);
+ hd.RecordMalloc(stack2(), 1, 2, sequence_number++);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack()), 0);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 2);
+}
+
+TEST(BookkeepingTest, OutOfOrder) {
+ GlobalCallstackTrie c;
+ HeapTracker hd(&c);
+
+ hd.RecordMalloc(stack(), 1, 5, 1);
+ hd.RecordMalloc(stack2(), 1, 2, 0);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack()), 5);
+ EXPECT_EQ(c.GetCumSizeForTesting(stack2()), 0);
}
} // namespace
diff --git a/src/profiling/memory/bounded_queue.h b/src/profiling/memory/bounded_queue.h
new file mode 100644
index 0000000..521ec5d
--- /dev/null
+++ b/src/profiling/memory/bounded_queue.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+#define SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
+
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+#include "perfetto/base/logging.h"
+
+// Transport messages between threads. Multiple-producer / single-consumer.
+//
+// This has to outlive both the consumer and the producer who have to
+// negotiate termination separately, if needed. This is currently only used
+// in a scenario where the producer and consumer both are loops that never
+// terminate.
+template <typename T>
+class BoundedQueue {
+ public:
+ BoundedQueue() : BoundedQueue(1) {}
+ BoundedQueue(size_t capacity) : capacity_(capacity) {
+ PERFETTO_CHECK(capacity > 0);
+ }
+
+ void Add(T item) {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (deque_.size() == capacity_)
+ full_cv_.wait(l, [this] { return deque_.size() < capacity_; });
+ deque_.emplace_back(std::move(item));
+ if (deque_.size() == 1)
+ empty_cv_.notify_all();
+ }
+
+ T Get() {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (elements_ == 0)
+ empty_cv_.wait(l, [this] { return !deque_.empty(); });
+ T item(std::move(deque_.front()));
+ deque_.pop_front();
+ if (deque_.size() == capacity_ - 1) {
+ l.unlock();
+ full_cv_.notify_all();
+ }
+ return item;
+ }
+
+ void SetCapacity(size_t capacity) {
+ PERFETTO_CHECK(capacity > 0);
+ {
+ std::lock_guard<std::mutex> l(mutex_);
+ capacity_ = capacity;
+ }
+ full_cv_.notify_all();
+ }
+
+ private:
+ size_t capacity_;
+ size_t elements_ = 0;
+ std::deque<T> deque_;
+ std::condition_variable full_cv_;
+ std::condition_variable empty_cv_;
+ std::mutex mutex_;
+};
+
+#endif // SRC_PROFILING_MEMORY_BOUNDED_QUEUE_H_
diff --git a/src/profiling/memory/bounded_queue_unittest.cc b/src/profiling/memory/bounded_queue_unittest.cc
new file mode 100644
index 0000000..63127d1
--- /dev/null
+++ b/src/profiling/memory/bounded_queue_unittest.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/bounded_queue.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(BoundedQueueTest, IsFIFO) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+}
+
+TEST(BoundedQueueTest, Blocking) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ std::thread th([&q] { q.Add(3); });
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+ EXPECT_EQ(q.Get(), 3);
+ th.join();
+}
+
+TEST(BoundedQueueTest, Resize) {
+ BoundedQueue<int> q(2);
+ q.Add(1);
+ q.Add(2);
+ q.SetCapacity(3);
+ q.Add(3);
+ EXPECT_EQ(q.Get(), 1);
+ EXPECT_EQ(q.Get(), 2);
+ EXPECT_EQ(q.Get(), 3);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index f8d57f7..d00f451 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -18,82 +18,90 @@
#include <inttypes.h>
#include <sys/socket.h>
+#include <sys/syscall.h>
+#include <sys/un.h>
+#include <unistd.h>
#include <atomic>
+#include <new>
+
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+#include <unwindstack/Regs.h>
+#include <unwindstack/RegsGetLocal.h>
#include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
namespace {
-std::atomic<uint64_t> global_sequence_number(0);
-constexpr size_t kFreePageBytes = base::kPageSize;
-constexpr size_t kFreePageSize = kFreePageBytes / sizeof(uint64_t);
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// glibc does not define a wrapper around gettid, bionic does.
+pid_t gettid() {
+ return static_cast<pid_t>(syscall(__NR_gettid));
+}
+#endif
+
+std::vector<base::ScopedFile> ConnectPool(const std::string& sock_name,
+ size_t n) {
+ sockaddr_un addr;
+ socklen_t addr_size;
+ if (!base::MakeSockAddr(sock_name, &addr, &addr_size))
+ return {};
+
+ std::vector<base::ScopedFile> res;
+ res.reserve(n);
+ for (size_t i = 0; i < n; ++i) {
+ auto sock = base::CreateSocket();
+ if (connect(*sock, reinterpret_cast<sockaddr*>(&addr), addr_size) == -1) {
+ PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
+ continue;
+ }
+ res.emplace_back(std::move(sock));
+ }
+ return res;
+}
+
+inline bool IsMainThread() {
+ return getpid() == gettid();
+}
} // namespace
-FreePage::FreePage() : free_page_(kFreePageSize) {
- free_page_[0] = static_cast<uint64_t>(kFreePageBytes);
- free_page_[1] = static_cast<uint64_t>(RecordType::Free);
- offset_ = 2;
- // Code in Add assumes that offset is aligned to 2.
- PERFETTO_DCHECK(offset_ % 2 == 0);
+void FreePage::Add(const uint64_t addr,
+ const uint64_t sequence_number,
+ SocketPool* pool) {
+ std::lock_guard<std::mutex> l(mutex_);
+ if (offset_ == kFreePageSize) {
+ FlushLocked(pool);
+ // Now that we have flushed, reset to after the header.
+ offset_ = 0;
+ }
+ FreePageEntry& current_entry = free_page_.entries[offset_++];
+ current_entry.sequence_number = sequence_number;
+ current_entry.addr = addr;
}
-void FreePage::Add(const uint64_t addr, SocketPool* pool) {
- std::lock_guard<std::mutex> l(mtx_);
- if (offset_ == kFreePageSize)
- Flush(pool);
- static_assert(kFreePageSize % 2 == 0,
- "free page size needs to be divisible by two");
- free_page_[offset_++] = ++global_sequence_number;
- free_page_[offset_++] = addr;
- PERFETTO_DCHECK(offset_ % 2 == 0);
-}
-
-void FreePage::Flush(SocketPool* pool) {
+void FreePage::FlushLocked(SocketPool* pool) {
+ WireMessage msg = {};
+ msg.record_type = RecordType::Free;
+ msg.free_header = &free_page_;
BorrowedSocket fd(pool->Borrow());
- size_t written = 0;
- do {
- ssize_t wr = PERFETTO_EINTR(send(*fd, &free_page_[0] + written,
- kFreePageBytes - written, MSG_NOSIGNAL));
- if (wr == -1) {
- fd.Close();
- return;
- }
- written += static_cast<size_t>(wr);
- } while (written < kFreePageBytes);
- // Now that we have flushed, reset to after the header.
- offset_ = 2;
-}
-
-BorrowedSocket::BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
- : fd_(std::move(fd)), socket_pool_(socket_pool) {}
-
-int BorrowedSocket::operator*() {
- return get();
-}
-
-int BorrowedSocket::get() {
- return *fd_;
-}
-
-void BorrowedSocket::Close() {
- fd_.reset();
-}
-
-BorrowedSocket::~BorrowedSocket() {
- if (socket_pool_ != nullptr)
- socket_pool_->Return(std::move(fd_));
+ SendWireMessage(*fd, msg);
}
SocketPool::SocketPool(std::vector<base::ScopedFile> sockets)
: sockets_(std::move(sockets)), available_sockets_(sockets_.size()) {}
BorrowedSocket SocketPool::Borrow() {
- std::unique_lock<std::mutex> lck_(mtx_);
+ std::unique_lock<std::mutex> lck_(mutex_);
if (available_sockets_ == 0)
cv_.wait(lck_, [this] { return available_sockets_ > 0; });
PERFETTO_CHECK(available_sockets_ > 0);
@@ -101,13 +109,14 @@
}
void SocketPool::Return(base::ScopedFile sock) {
+ PERFETTO_CHECK(dead_sockets_ + available_sockets_ < sockets_.size());
if (!sock) {
// TODO(fmayer): Handle reconnect or similar.
// This is just to prevent a deadlock.
PERFETTO_CHECK(++dead_sockets_ != sockets_.size());
return;
}
- std::unique_lock<std::mutex> lck_(mtx_);
+ std::unique_lock<std::mutex> lck_(mutex_);
PERFETTO_CHECK(available_sockets_ < sockets_.size());
sockets_[available_sockets_++] = std::move(sock);
if (available_sockets_ == 1) {
@@ -116,4 +125,87 @@
}
}
+const char* GetThreadStackBase() {
+ pthread_attr_t attr;
+ if (pthread_getattr_np(pthread_self(), &attr) != 0)
+ return nullptr;
+ base::ScopedResource<pthread_attr_t*, pthread_attr_destroy, nullptr> cleanup(
+ &attr);
+
+ char* stackaddr;
+ size_t stacksize;
+ if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&stackaddr),
+ &stacksize) != 0)
+ return nullptr;
+ return stackaddr + stacksize;
+}
+
+Client::Client(std::vector<base::ScopedFile> socks)
+ : socket_pool_(std::move(socks)) {
+ uint64_t size = 0;
+ int fds[2];
+ fds[0] = open("/proc/self/maps", O_RDONLY | O_CLOEXEC);
+ fds[1] = open("/proc/self/mem", O_RDONLY | O_CLOEXEC);
+ base::SockSend(*socket_pool_.Borrow(), &size, sizeof(size), fds, 2);
+}
+
+Client::Client(const std::string& sock_name, size_t conns)
+ : Client(ConnectPool(sock_name, conns)) {}
+
+const char* Client::GetStackBase() {
+ if (IsMainThread()) {
+ if (!main_thread_stack_base_)
+ // Because pthread_attr_getstack reads and parses /proc/self/maps and
+ // /proc/self/stat, we have to cache the result here.
+ main_thread_stack_base_ = GetThreadStackBase();
+ return main_thread_stack_base_;
+ }
+ return GetThreadStackBase();
+}
+
+// The stack grows towards numerically smaller addresses, so the stack layout
+// of main calling malloc is as follows.
+//
+// +------------+
+// |SendWireMsg |
+// stacktop +--> +------------+ 0x1000
+// |RecordMalloc| +
+// +------------+ |
+// | malloc | |
+// +------------+ |
+// | main | v
+// stackbase +-> +------------+ 0xffff
+void Client::RecordMalloc(uint64_t alloc_size, uint64_t alloc_address) {
+ AllocMetadata metadata;
+ const char* stackbase = GetStackBase();
+ const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+ unwindstack::AsmGetRegs(metadata.register_data);
+
+ if (stackbase < stacktop) {
+ PERFETTO_DCHECK(false);
+ return;
+ }
+
+ uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+ metadata.alloc_size = alloc_size;
+ metadata.alloc_address = alloc_address;
+ metadata.stack_pointer = reinterpret_cast<uint64_t>(stacktop);
+ metadata.stack_pointer_offset = sizeof(AllocMetadata);
+ metadata.arch = unwindstack::Regs::CurrentArch();
+ metadata.sequence_number = ++sequence_number_;
+
+ WireMessage msg{};
+ msg.alloc_header = &metadata;
+ msg.payload = const_cast<char*>(stacktop);
+ msg.payload_size = static_cast<size_t>(stack_size);
+
+ BorrowedSocket sockfd = socket_pool_.Borrow();
+ // TODO(fmayer): Handle failure.
+ PERFETTO_CHECK(SendWireMessage(*sockfd, msg));
+}
+
+void Client::RecordFree(uint64_t alloc_address) {
+ free_page_.Add(alloc_address, ++sequence_number_, &socket_pool_);
+}
+
} // namespace perfetto
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 7a4e459..46a571d 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -23,47 +23,11 @@
#include <vector>
#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
-class SocketPool;
-
-class FreePage {
- public:
- FreePage();
-
- // Can be called from any thread. Must not hold mtx_.`
- void Add(const uint64_t addr, SocketPool* pool);
-
- private:
- // Needs to be called holding mtx_.
- void Flush(SocketPool* pool);
-
- std::vector<uint64_t> free_page_;
- std::mutex mtx_;
- size_t offset_;
-};
-
-class BorrowedSocket {
- public:
- BorrowedSocket(const BorrowedSocket&) = delete;
- BorrowedSocket& operator=(const BorrowedSocket&) = delete;
- BorrowedSocket(BorrowedSocket&& other) {
- fd_ = std::move(other.fd_);
- socket_pool_ = other.socket_pool_;
- other.socket_pool_ = nullptr;
- }
-
- BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool);
- int operator*();
- int get();
- void Close();
- ~BorrowedSocket();
-
- private:
- base::ScopedFile fd_;
- SocketPool* socket_pool_ = nullptr;
-};
+class BorrowedSocket;
class SocketPool {
public:
@@ -74,13 +38,80 @@
private:
void Return(base::ScopedFile fd);
- std::mutex mtx_;
+ std::mutex mutex_;
std::condition_variable cv_;
std::vector<base::ScopedFile> sockets_;
size_t available_sockets_;
size_t dead_sockets_ = 0;
};
+// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
+class BorrowedSocket {
+ public:
+ BorrowedSocket(const BorrowedSocket&) = delete;
+ BorrowedSocket& operator=(const BorrowedSocket&) = delete;
+ BorrowedSocket(BorrowedSocket&& other) noexcept {
+ fd_ = std::move(other.fd_);
+ socket_pool_ = other.socket_pool_;
+ other.socket_pool_ = nullptr;
+ }
+
+ BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
+ : fd_(std::move(fd)), socket_pool_(socket_pool) {}
+
+ ~BorrowedSocket() {
+ if (socket_pool_ != nullptr)
+ socket_pool_->Return(std::move(fd_));
+ }
+
+ int operator*() { return get(); }
+
+ int get() { return *fd_; }
+
+ void Close() { fd_.reset(); }
+
+ private:
+ base::ScopedFile fd_;
+ SocketPool* socket_pool_ = nullptr;
+};
+
+// Cache for frees that have been observed. It is infeasible to send every
+// free separately, so we batch and send the whole buffer once it is full.
+class FreePage {
+ public:
+ // Add address to buffer. Flush if necessary using a socket borrowed from
+ // pool.
+ // Can be called from any thread. Must not hold mutex_.`
+ void Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+
+ private:
+ // Needs to be called holding mutex_.
+ void FlushLocked(SocketPool* pool);
+
+ FreeMetadata free_page_;
+ std::mutex mutex_;
+ size_t offset_ = 0;
+};
+
+const char* GetThreadStackBase();
+
+// This is created and owned by the malloc hooks.
+class Client {
+ public:
+ Client(std::vector<base::ScopedFile> sockets);
+ Client(const std::string& sock_name, size_t conns);
+ void RecordMalloc(uint64_t alloc_size, uint64_t alloc_address);
+ void RecordFree(uint64_t alloc_address);
+
+ private:
+ const char* GetStackBase();
+
+ SocketPool socket_pool_;
+ FreePage free_page_;
+ const char* main_thread_stack_base_ = nullptr;
+ std::atomic<uint64_t> sequence_number_{0};
+};
+
} // namespace perfetto
#endif // SRC_PROFILING_MEMORY_CLIENT_H_
diff --git a/src/profiling/memory/client_unittest.cc b/src/profiling/memory/client_unittest.cc
index a69bae3..df848da 100644
--- a/src/profiling/memory/client_unittest.cc
+++ b/src/profiling/memory/client_unittest.cc
@@ -67,5 +67,17 @@
t2.join();
}
+TEST(ClientTest, GetThreadStackBase) {
+ std::thread th([] {
+ const char* stackbase = GetThreadStackBase();
+ ASSERT_NE(stackbase, nullptr);
+ // The implementation assumes the stack grows from higher addresses to
+ // lower. We will need to rework once we encounter architectures where the
+ // stack grows the other way.
+ EXPECT_GT(stackbase, __builtin_frame_address(0));
+ });
+ th.join();
+}
+
} // namespace
} // namespace perfetto
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
new file mode 100644
index 0000000..1dc10aa
--- /dev/null
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/socket_listener.h"
+#include "src/profiling/memory/unwinding.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace {
+
+constexpr char kSocketName[] = TEST_SOCK_NAME("heapprofd_integrationtest");
+
+void __attribute__((noinline)) OtherFunction(Client* client) {
+ client->RecordMalloc(10, 0xf00);
+}
+
+void __attribute__((noinline)) SomeFunction(Client* client) {
+ OtherFunction(client);
+}
+
+class HeapprofdIntegrationTest : public ::testing::Test {
+ protected:
+ void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
+ void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
+};
+
+// TODO(fmayer): Fix out of tree integration test.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#define MAYBE_EndToEnd EndToEnd
+#else
+#define MAYBE_EndToEnd DISABLED_EndToEnd
+#endif
+
+TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
+ GlobalCallstackTrie callsites;
+
+ base::TestTaskRunner task_runner;
+ auto done = task_runner.CreateCheckpoint("done");
+ SocketListener listener(
+ [&done](UnwindingRecord r) {
+ // TODO(fmayer): Test symbolization and result of unwinding.
+ BookkeepingRecord bookkeeping_record;
+ ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
+ HandleBookkeepingRecord(&bookkeeping_record);
+ done();
+ },
+ &callsites);
+
+ auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+ if (!sock->is_listening()) {
+ PERFETTO_ELOG("Socket not listening.");
+ PERFETTO_CHECK(false);
+ }
+ Client client(kSocketName, 1);
+ SomeFunction(&client);
+ task_runner.RunUntilCheckpoint("done");
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index ea53977..991006e 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -15,9 +15,12 @@
*/
#include <stdlib.h>
+#include <array>
#include <memory>
+#include <vector>
#include "src/ipc/unix_socket.h"
+#include "src/profiling/memory/bounded_queue.h"
#include "src/profiling/memory/socket_listener.h"
#include "perfetto/base/unix_task_runner.h"
@@ -25,15 +28,60 @@
namespace perfetto {
namespace {
+constexpr size_t kUnwinderQueueSize = 1000;
+constexpr size_t kBookkeepingQueueSize = 1000;
+constexpr size_t kUnwinderThreads = 5;
+
+// We create kUnwinderThreads unwinding threads and one bookeeping thread.
+// The bookkeeping thread is singleton in order to avoid expensive and
+// complicated synchronisation in the bookkeeping.
+//
+// We wire up the system by creating BoundedQueues between the threads. The main
+// thread runs the TaskRunner driving the SocketListener. The unwinding thread
+// takes the data received by the SocketListener and if it is a malloc does
+// stack unwinding, and if it is a free just forwards the content of the record
+// to the bookkeeping thread.
+//
+// +--------------+
+// |SocketListener|
+// +------+-------+
+// |
+// +--UnwindingRecord -+
+// | |
+// +--------v-------+ +-------v--------+
+// |Unwinding Thread| |Unwinding Thread|
+// +--------+-------+ +-------+--------+
+// | |
+// +-BookkeepingRecord +
+// |
+// +--------v---------+
+// |Bookkeeping Thread|
+// +------------------+
int HeapprofdMain(int argc, char** argv) {
+ GlobalCallstackTrie callsites;
std::unique_ptr<ipc::UnixSocket> sock;
- SocketListener listener(
- [](size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>) {
- // TODO(fmayer): Wire this up to a worker thread that does the
- // unwinding.
- PERFETTO_LOG("Record received.");
- });
+ BoundedQueue<BookkeepingRecord> callsites_queue(kBookkeepingQueueSize);
+ std::thread bookkeeping_thread(
+ [&callsites_queue] { BookkeepingMainLoop(&callsites_queue); });
+
+ std::array<BoundedQueue<UnwindingRecord>, kUnwinderThreads> unwinder_queues;
+ for (size_t i = 0; i < kUnwinderThreads; ++i)
+ unwinder_queues[i].SetSize(kUnwinderQueueSize);
+ std::vector<std::thread> unwinding_threads;
+ unwinding_threads.reserve(kUnwinderThreads);
+ for (size_t i = 0; i < kUnwinderThreads; ++i) {
+ unwinding_threads.emplace_back([&unwinder_queues, &callsites_queue, i] {
+ UnwindingMainLoop(&unwinder_queues[i], &callsites_queue);
+ });
+ }
+
+ auto on_record_received = [&unwinder_queues](UnwindingRecord r) {
+ unwinder_queues[static_cast<size_t>(r.pid) % kUnwinderThreads].Add(
+ std::move(r));
+ };
+ SocketListener listener(std::move(on_record_received), &callsites);
+
base::UnixTaskRunner read_task_runner;
if (argc == 2) {
// Allow to be able to manually specify the socket to listen on
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 81b7390..1a26e01 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -19,18 +19,18 @@
namespace perfetto {
-void SocketListener::OnDisconnect(ipc::UnixSocket* self) {
+void SocketListener::OnDisconnect(base::UnixSocket* self) {
sockets_.erase(self);
}
void SocketListener::OnNewIncomingConnection(
- ipc::UnixSocket*,
- std::unique_ptr<ipc::UnixSocket> new_connection) {
- ipc::UnixSocket* new_connection_raw = new_connection.get();
+ base::UnixSocket*,
+ std::unique_ptr<base::UnixSocket> new_connection) {
+ base::UnixSocket* new_connection_raw = new_connection.get();
sockets_.emplace(new_connection_raw, std::move(new_connection));
}
-void SocketListener::OnDataAvailable(ipc::UnixSocket* self) {
+void SocketListener::OnDataAvailable(base::UnixSocket* self) {
auto it = sockets_.find(self);
if (it == sockets_.end())
return;
@@ -82,7 +82,7 @@
if (it == process_metadata_.end() || it->second.expired()) {
// We have not seen the PID yet or the PID is being recycled.
entry->process_metadata = std::make_shared<ProcessMetadata>(
- peer_pid, std::move(maps_fd), std::move(mem_fd));
+ peer_pid, std::move(maps_fd), std::move(mem_fd), callsites_);
process_metadata_[peer_pid] = entry->process_metadata;
} else {
// If the process already has metadata, this is an additional socket for
@@ -92,23 +92,34 @@
}
}
-void SocketListener::RecordReceived(ipc::UnixSocket* self,
+void SocketListener::RecordReceived(base::UnixSocket* self,
size_t size,
std::unique_ptr<uint8_t[]> buf) {
auto it = sockets_.find(self);
- if (it == sockets_.end())
+ if (it == sockets_.end()) {
// This happens for zero-length records, because the callback gets called
// in the first call to Read, before InitProcess is called. Because zero
// length records are useless anyway, this is not a problem.
return;
+ }
Entry& entry = it->second;
+ if (!entry.process_metadata) {
+ PERFETTO_DLOG("Received record without process metadata.");
+ return;
+ }
+
+ if (size == 0) {
+ PERFETTO_DLOG("Dropping empty record.");
+ return;
+ }
// This needs to be a weak_ptr for two reasons:
// 1) most importantly, the weak_ptr in process_metadata_ should expire as
// soon as the last socket for a process goes away. Otherwise, a recycled
// PID might reuse incorrect metadata.
// 2) it is a waste to unwind for a process that had already gone away.
std::weak_ptr<ProcessMetadata> weak_metadata(entry.process_metadata);
- callback_function_(size, std::move(buf), std::move(weak_metadata));
+ callback_function_({entry.process_metadata->pid, size, std::move(buf),
+ std::move(weak_metadata)});
}
} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index ba5d92e..6d83d9e 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -17,7 +17,8 @@
#ifndef SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
#define SRC_PROFILING_MEMORY_SOCKET_LISTENER_H_
-#include "src/ipc/unix_socket.h"
+#include "perfetto/base/unix_socket.h"
+#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/record_reader.h"
#include "src/profiling/memory/unwinding.h"
@@ -26,23 +27,22 @@
namespace perfetto {
-class SocketListener : public ipc::UnixSocket::EventListener {
+class SocketListener : public base::UnixSocket::EventListener {
public:
- SocketListener(std::function<void(size_t,
- std::unique_ptr<uint8_t[]>,
- std::weak_ptr<ProcessMetadata>)> fn)
- : callback_function_(std::move(fn)) {}
- void OnDisconnect(ipc::UnixSocket* self) override;
+ SocketListener(std::function<void(UnwindingRecord)> fn,
+ GlobalCallstackTrie* callsites)
+ : callback_function_(std::move(fn)), callsites_(callsites) {}
+ void OnDisconnect(base::UnixSocket* self) override;
void OnNewIncomingConnection(
- ipc::UnixSocket* self,
- std::unique_ptr<ipc::UnixSocket> new_connection) override;
- void OnDataAvailable(ipc::UnixSocket* self) override;
+ base::UnixSocket* self,
+ std::unique_ptr<base::UnixSocket> new_connection) override;
+ void OnDataAvailable(base::UnixSocket* self) override;
private:
struct Entry {
- Entry(std::unique_ptr<ipc::UnixSocket> s) : sock(std::move(s)) {}
+ Entry(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
// Only here for ownership of the object.
- const std::unique_ptr<ipc::UnixSocket> sock;
+ const std::unique_ptr<base::UnixSocket> sock;
RecordReader record_reader;
bool recv_fds = false;
// The sockets own the metadata for a particular PID. When the last socket
@@ -55,17 +55,16 @@
std::shared_ptr<ProcessMetadata> process_metadata;
};
- void RecordReceived(ipc::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
+ void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
void InitProcess(Entry* entry,
pid_t peer_pid,
base::ScopedFile maps_fd,
base::ScopedFile mem_fd);
- std::map<ipc::UnixSocket*, Entry> sockets_;
+ std::map<base::UnixSocket*, Entry> sockets_;
std::map<pid_t, std::weak_ptr<ProcessMetadata>> process_metadata_;
- std::function<
- void(size_t, std::unique_ptr<uint8_t[]>, std::weak_ptr<ProcessMetadata>)>
- callback_function_;
+ std::function<void(UnwindingRecord)> callback_function_;
+ GlobalCallstackTrie* callsites_;
};
} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index da7efdb..58d5e8f 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -37,34 +37,33 @@
void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
};
-class MockEventListener : public ipc::UnixSocket::EventListener {
+class MockEventListener : public base::UnixSocket::EventListener {
public:
- MOCK_METHOD2(OnConnect, void(ipc::UnixSocket*, bool));
+ MOCK_METHOD2(OnConnect, void(base::UnixSocket*, bool));
};
TEST_F(SocketListenerTest, ReceiveRecord) {
base::TestTaskRunner task_runner;
auto callback_called = task_runner.CreateCheckpoint("callback.called");
auto connected = task_runner.CreateCheckpoint("connected");
- auto callback_fn = [&callback_called](
- size_t size, std::unique_ptr<uint8_t[]> buf,
- std::weak_ptr<ProcessMetadata> metadata) {
- ASSERT_EQ(size, 1u);
- ASSERT_EQ(buf[0], '1');
- ASSERT_FALSE(metadata.expired());
+ auto callback_fn = [&callback_called](UnwindingRecord r) {
+ ASSERT_EQ(r.size, 1u);
+ ASSERT_EQ(r.data[0], '1');
+ ASSERT_FALSE(r.metadata.expired());
callback_called();
};
- SocketListener listener(std::move(callback_fn));
+ GlobalCallstackTrie bookkeeping;
+ SocketListener listener(std::move(callback_fn), &bookkeeping);
MockEventListener client_listener;
EXPECT_CALL(client_listener, OnConnect(_, _))
.WillOnce(InvokeWithoutArgs(connected));
- std::unique_ptr<ipc::UnixSocket> recv_socket =
- ipc::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+ std::unique_ptr<base::UnixSocket> recv_socket =
+ base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
- std::unique_ptr<ipc::UnixSocket> client_socket =
- ipc::UnixSocket::Connect(kSocketName, &client_listener, &task_runner);
+ std::unique_ptr<base::UnixSocket> client_socket =
+ base::UnixSocket::Connect(kSocketName, &client_listener, &task_runner);
task_runner.RunUntilCheckpoint("connected");
uint64_t size = 1;
diff --git a/src/profiling/memory/transport_data.h b/src/profiling/memory/transport_data.h
deleted file mode 100644
index c0ef3ef..0000000
--- a/src/profiling/memory/transport_data.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// The data types used for communication between heapprofd and the client
-// embedded in processes that are being profiled.
-
-#ifndef SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-#define SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
-
-#include <inttypes.h>
-#include <unwindstack/Elf.h>
-
-namespace perfetto {
-
-// Use uint64_t to make sure the following data is aligned as 64bit is the
-// strongest alignment requirement.
-enum class RecordType : uint64_t {
- Free = 0,
- Malloc = 1,
-};
-
-struct AllocMetadata {
- // Size of the allocation that was made.
- uint64_t alloc_size;
- // Pointer returned by malloc(2) for this allocation.
- uint64_t alloc_address;
- // Current value of the stack pointer.
- uint64_t stack_pointer;
- // Offset of the data at stack_pointer from the start of this record.
- uint64_t stack_pointer_offset;
- // CPU architecture of the client. This determines the size of the
- // register data that follows this struct.
- unwindstack::ArchEnum arch;
-};
-
-} // namespace perfetto
-
-#endif // SRC_PROFILING_MEMORY_TRANSPORT_DATA_H_
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index a317600..1dcc8c0 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -42,8 +42,8 @@
#include "perfetto/base/file_utils.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
#include "src/profiling/memory/unwinding.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
@@ -155,34 +155,19 @@
maps_.clear();
}
-bool DoUnwind(void* mem,
- size_t sz,
- ProcessMetadata* metadata,
- std::vector<unwindstack::FrameData>* out) {
- if (sz < sizeof(AllocMetadata)) {
- PERFETTO_ELOG("size");
- return false;
- }
- AllocMetadata* alloc_metadata = reinterpret_cast<AllocMetadata*>(mem);
- if (sizeof(AllocMetadata) + RegSize(alloc_metadata->arch) > sz)
- return false;
- void* reg_data = static_cast<uint8_t*>(mem) + sizeof(AllocMetadata);
+bool DoUnwind(WireMessage* msg, ProcessMetadata* metadata, AllocRecord* out) {
+ AllocMetadata* alloc_metadata = msg->alloc_header;
std::unique_ptr<unwindstack::Regs> regs(
- CreateFromRawData(alloc_metadata->arch, reg_data));
+ CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
if (regs == nullptr) {
PERFETTO_ELOG("regs");
return false;
}
- if (alloc_metadata->stack_pointer_offset < sizeof(AllocMetadata) ||
- alloc_metadata->stack_pointer_offset > sz) {
- PERFETTO_ELOG("out-of-bound stack_pointer_offset");
- return false;
- }
- uint8_t* stack =
- reinterpret_cast<uint8_t*>(mem) + alloc_metadata->stack_pointer_offset;
+ out->alloc_metadata = *alloc_metadata;
+ uint8_t* stack = reinterpret_cast<uint8_t*>(msg->payload);
std::shared_ptr<unwindstack::Memory> mems = std::make_shared<StackMemory>(
*metadata->mem_fd, alloc_metadata->stack_pointer, stack,
- sz - alloc_metadata->stack_pointer_offset);
+ msg->payload_size);
unwindstack::Unwinder unwinder(kMaxFrames, &metadata->maps, regs.get(), mems);
// Surpress incorrect "variable may be uninitialized" error for if condition
// after this loop. error_code = LastErrorCode gets run at least once.
@@ -198,10 +183,83 @@
break;
}
if (error_code == 0)
- *out = unwinder.frames();
+ out->frames = unwinder.frames();
else
PERFETTO_DLOG("unwinding failed %" PRIu8, error_code);
return error_code == 0;
}
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out) {
+ WireMessage msg;
+ if (!ReceiveWireMessage(reinterpret_cast<char*>(rec->data.get()), rec->size,
+ &msg))
+ return false;
+ switch (msg.record_type) {
+ case RecordType::Malloc: {
+ std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+ if (!metadata)
+ // Process has already gone away.
+ return false;
+
+ out->metadata = std::move(rec->metadata);
+ out->free_record = {};
+ return DoUnwind(&msg, metadata.get(), &out->alloc_record);
+ }
+ case RecordType::Free: {
+ // We need to keep this alive, because msg.free_header is a pointer into
+ // this.
+ out->metadata = std::move(rec->metadata);
+ out->free_record.free_data = std::move(rec->data);
+ out->free_record.metadata = msg.free_header;
+ out->alloc_record = {};
+ return true;
+ }
+ }
+}
+
+__attribute__((noreturn)) void UnwindingMainLoop(
+ BoundedQueue<UnwindingRecord>* input_queue,
+ BoundedQueue<BookkeepingRecord>* output_queue) {
+ for (;;) {
+ UnwindingRecord rec = input_queue->Get();
+ BookkeepingRecord out;
+ if (HandleUnwindingRecord(&rec, &out))
+ output_queue->Add(std::move(out));
+ }
+}
+
+void HandleBookkeepingRecord(BookkeepingRecord* rec) {
+ std::shared_ptr<ProcessMetadata> metadata = rec->metadata.lock();
+ if (!metadata)
+ // Process has already gone away.
+ return;
+
+ if (rec->free_record.free_data) {
+ FreeRecord& free_rec = rec->free_record;
+ FreePageEntry* entries = free_rec.metadata->entries;
+ uint64_t num_entries = free_rec.metadata->num_entries;
+ for (size_t i = 0; i < num_entries; ++i) {
+ const FreePageEntry& entry = entries[i];
+ metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
+ }
+ } else {
+ AllocRecord& alloc_rec = rec->alloc_record;
+ std::vector<CodeLocation> code_locations;
+ for (unwindstack::FrameData& frame : alloc_rec.frames)
+ code_locations.emplace_back(frame.map_name, frame.function_name);
+ metadata->heap_dump.RecordMalloc(code_locations,
+ alloc_rec.alloc_metadata.alloc_address,
+ alloc_rec.alloc_metadata.alloc_size,
+ alloc_rec.alloc_metadata.sequence_number);
+ }
+}
+
+__attribute__((noreturn)) void BookkeepingMainLoop(
+ BoundedQueue<BookkeepingRecord>* input_queue) {
+ for (;;) {
+ BookkeepingRecord rec = input_queue->Get();
+ HandleBookkeepingRecord(&rec);
+ }
+}
+
} // namespace perfetto
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index 455be43..1bdcf0d 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -20,6 +20,9 @@
#include <unwindstack/Maps.h>
#include <unwindstack/Unwinder.h>
#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/bookkeeping.h"
+#include "src/profiling/memory/bounded_queue.h"
+#include "src/profiling/memory/wire_protocol.h"
namespace perfetto {
@@ -36,13 +39,20 @@
};
struct ProcessMetadata {
- ProcessMetadata(pid_t p, base::ScopedFile maps_fd, base::ScopedFile mem)
- : pid(p), maps(std::move(maps_fd)), mem_fd(std::move(mem)) {
+ ProcessMetadata(pid_t p,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem,
+ GlobalCallstackTrie* callsites)
+ : pid(p),
+ maps(std::move(maps_fd)),
+ mem_fd(std::move(mem)),
+ heap_dump(callsites) {
PERFETTO_CHECK(maps.Parse());
}
pid_t pid;
FileDescriptorMaps maps;
base::ScopedFile mem_fd;
+ HeapTracker heap_dump;
};
// Overlays size bytes pointed to by stack for addresses in [sp, sp + size).
@@ -62,10 +72,39 @@
size_t RegSize(unwindstack::ArchEnum arch);
-bool DoUnwind(void* mem,
- size_t sz,
- ProcessMetadata* metadata,
- std::vector<unwindstack::FrameData>* out);
+struct UnwindingRecord {
+ pid_t pid;
+ size_t size;
+ std::unique_ptr<uint8_t[]> data;
+ std::weak_ptr<ProcessMetadata> metadata;
+};
+
+struct FreeRecord {
+ std::unique_ptr<uint8_t[]> free_data;
+ FreeMetadata* metadata;
+};
+
+struct AllocRecord {
+ AllocMetadata alloc_metadata;
+ std::vector<unwindstack::FrameData> frames;
+};
+
+struct BookkeepingRecord {
+ // TODO(fmayer): Use a union.
+ std::weak_ptr<ProcessMetadata> metadata;
+ AllocRecord alloc_record;
+ FreeRecord free_record;
+};
+
+bool DoUnwind(WireMessage*, ProcessMetadata* metadata, AllocRecord* out);
+
+bool HandleUnwindingRecord(UnwindingRecord* rec, BookkeepingRecord* out);
+void HandleBookkeepingRecord(BookkeepingRecord* rec);
+
+void UnwindingMainLoop(BoundedQueue<UnwindingRecord>* input_queue,
+ BoundedQueue<BookkeepingRecord>* output_queue);
+
+void BookkeepingMainLoop(BoundedQueue<BookkeepingRecord>* input_queue);
} // namespace perfetto
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 06f3ba9..f80a0ee 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -16,7 +16,8 @@
#include "src/profiling/memory/unwinding.h"
#include "perfetto/base/scoped_file.h"
-#include "src/profiling/memory/transport_data.h"
+#include "src/profiling/memory/client.h"
+#include "src/profiling/memory/wire_protocol.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -70,20 +71,6 @@
#define MAYBE_DoUnwind DISABLED_DoUnwind
#endif
-uint8_t* GetStackBase() {
- pthread_t t = pthread_self();
- pthread_attr_t attr;
- if (pthread_getattr_np(t, &attr) != 0) {
- return nullptr;
- }
- uint8_t* x;
- size_t s;
- if (pthread_attr_getstack(&attr, reinterpret_cast<void**>(&x), &s) != 0)
- return nullptr;
- pthread_attr_destroy(&attr);
- return x + s;
-}
-
// This is needed because ASAN thinks copying the whole stack is a buffer
// underrun.
void __attribute__((noinline))
@@ -95,50 +82,59 @@
to[i] = from[i];
}
-std::pair<std::unique_ptr<uint8_t[]>, size_t> __attribute__((noinline))
-GetRecord() {
- const uint8_t* stackbase = GetStackBase();
- PERFETTO_CHECK(stackbase != nullptr);
- const uint8_t* stacktop =
- reinterpret_cast<uint8_t*>(__builtin_frame_address(0));
- PERFETTO_CHECK(stacktop != nullptr);
- PERFETTO_CHECK(stacktop < stackbase);
- const size_t stack_size = static_cast<size_t>(stackbase - stacktop);
- std::unique_ptr<unwindstack::Regs> regs(unwindstack::Regs::CreateFromLocal());
- const unwindstack::ArchEnum arch = regs->CurrentArch();
- const size_t reg_size = RegSize(arch);
- const size_t total_size = sizeof(AllocMetadata) + reg_size + stack_size;
- std::unique_ptr<uint8_t[]> buf(new uint8_t[total_size]);
- AllocMetadata* metadata = reinterpret_cast<AllocMetadata*>(buf.get());
- metadata->alloc_size = 0;
- metadata->alloc_address = 0;
+struct RecordMemory {
+ std::unique_ptr<uint8_t[]> payload;
+ std::unique_ptr<AllocMetadata> metadata;
+};
+
+RecordMemory __attribute__((noinline)) GetRecord(WireMessage* msg) {
+ std::unique_ptr<AllocMetadata> metadata(new AllocMetadata);
+
+ const char* stackbase = GetThreadStackBase();
+ const char* stacktop = reinterpret_cast<char*>(__builtin_frame_address(0));
+ unwindstack::AsmGetRegs(metadata->register_data);
+
+ if (stackbase < stacktop) {
+ PERFETTO_DCHECK(false);
+ return {nullptr, nullptr};
+ }
+ uint64_t stack_size = static_cast<uint64_t>(stackbase - stacktop);
+
+ metadata->alloc_size = 10;
+ metadata->alloc_address = 0x10;
metadata->stack_pointer = reinterpret_cast<uint64_t>(stacktop);
- metadata->stack_pointer_offset = sizeof(AllocMetadata) + reg_size;
- metadata->arch = arch;
- unwindstack::RegsGetLocal(regs.get());
- // Make sure nothing above has changed the stack pointer, just for extra
- // paranoia.
- PERFETTO_CHECK(stacktop ==
- reinterpret_cast<uint8_t*>(__builtin_frame_address(0)));
- memcpy(buf.get() + sizeof(AllocMetadata), regs->RawData(), reg_size);
- UnsafeMemcpy(buf.get() + sizeof(AllocMetadata) + reg_size, stacktop,
- stack_size);
- return {std::move(buf), total_size};
+ metadata->stack_pointer_offset = sizeof(AllocMetadata);
+ metadata->arch = unwindstack::Regs::CurrentArch();
+ metadata->sequence_number = 1;
+
+ std::unique_ptr<uint8_t[]> payload(new uint8_t[stack_size]);
+ UnsafeMemcpy(&payload[0], stacktop, stack_size);
+
+ *msg = {};
+ msg->alloc_header = metadata.get();
+ msg->payload = reinterpret_cast<char*>(payload.get());
+ msg->payload_size = static_cast<size_t>(stack_size);
+ return {std::move(payload), std::move(metadata)};
}
// TODO(fmayer): Investigate why this fails out of tree.
TEST(UnwindingTest, MAYBE_DoUnwind) {
base::ScopedFile proc_maps(open("/proc/self/maps", O_RDONLY));
base::ScopedFile proc_mem(open("/proc/self/mem", O_RDONLY));
- ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem));
- auto record = GetRecord();
- std::vector<unwindstack::FrameData> out;
- ASSERT_TRUE(DoUnwind(record.first.get(), record.second, &metadata, &out));
+ GlobalCallstackTrie callsites;
+ ProcessMetadata metadata(getpid(), std::move(proc_maps), std::move(proc_mem),
+ &callsites);
+ WireMessage msg;
+ auto record = GetRecord(&msg);
+ AllocRecord out;
+ ASSERT_TRUE(DoUnwind(&msg, &metadata, &out));
int st;
- std::unique_ptr<char> demangled(
- abi::__cxa_demangle(out[0].function_name.c_str(), nullptr, nullptr, &st));
+ std::unique_ptr<char> demangled(abi::__cxa_demangle(
+ out.frames[0].function_name.c_str(), nullptr, nullptr, &st));
ASSERT_EQ(st, 0);
- ASSERT_STREQ(demangled.get(), "perfetto::(anonymous namespace)::GetRecord()");
+ ASSERT_STREQ(
+ demangled.get(),
+ "perfetto::(anonymous namespace)::GetRecord(perfetto::WireMessage*)");
}
} // namespace
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
new file mode 100644
index 0000000..39373f9
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.cc
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+namespace perfetto {
+
+namespace {
+template <typename T>
+bool ViewAndAdvance(char** ptr, T** out, const char* end) {
+ if (end - sizeof(T) < *ptr)
+ return false;
+ *out = reinterpret_cast<T*>(ptr);
+ ptr += sizeof(T);
+ return true;
+}
+} // namespace
+
+bool SendWireMessage(int sock, const WireMessage& msg) {
+ uint64_t total_size;
+ struct iovec iovecs[4] = {};
+ // TODO(fmayer): Maye pack these two.
+ iovecs[0].iov_base = &total_size;
+ iovecs[0].iov_len = sizeof(total_size);
+ iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
+ iovecs[1].iov_len = sizeof(msg.record_type);
+ if (msg.alloc_header) {
+ iovecs[2].iov_base = msg.alloc_header;
+ iovecs[2].iov_len = sizeof(*msg.alloc_header);
+ } else if (msg.free_header) {
+ iovecs[2].iov_base = msg.free_header;
+ iovecs[2].iov_len = sizeof(*msg.free_header);
+ } else {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
+
+ iovecs[3].iov_base = msg.payload;
+ iovecs[3].iov_len = msg.payload_size;
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iovecs;
+ if (msg.payload) {
+ hdr.msg_iovlen = base::ArraySize(iovecs);
+ total_size = iovecs[1].iov_len + iovecs[2].iov_len + iovecs[3].iov_len;
+ } else {
+ // If we are not sending payload, just ignore that iovec.
+ hdr.msg_iovlen = base::ArraySize(iovecs) - 1;
+ total_size = iovecs[1].iov_len + iovecs[2].iov_len;
+ }
+
+ ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+ return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+}
+
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
+ RecordType* record_type;
+ char* end = buf + size;
+ if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+ return false;
+ switch (*record_type) {
+ case RecordType::Malloc:
+ if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+ return false;
+ out->payload = buf;
+ if (buf > end) {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
+ out->payload_size = static_cast<size_t>(end - buf);
+ break;
+ case RecordType::Free:
+ if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+ return false;
+ break;
+ }
+ out->record_type = *record_type;
+ return true;
+}
+
+} // namespace perfetto
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
new file mode 100644
index 0000000..2810872
--- /dev/null
+++ b/src/profiling/memory/wire_protocol.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The data types used for communication between heapprofd and the client
+// embedded in processes that are being profiled.
+
+#ifndef SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+#define SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_
+
+#include <inttypes.h>
+#include <unwindstack/Elf.h>
+#include <unwindstack/MachineArm.h>
+#include <unwindstack/MachineArm64.h>
+#include <unwindstack/MachineMips.h>
+#include <unwindstack/MachineMips64.h>
+#include <unwindstack/MachineX86.h>
+#include <unwindstack/MachineX86_64.h>
+
+namespace perfetto {
+
+// Types needed for the wire format used for communication between the client
+// and heapprofd. The basic format of a record is
+// record size (uint64_t) | record type (RecordType = uint64_t) | record
+// If record type is malloc, the record format is AllocMetdata | raw stack.
+// If the record type is free, the record is a sequence of FreePageEntry.
+
+// Use uint64_t to make sure the following data is aligned as 64bit is the
+// strongest alignment requirement.
+
+// C++11 std::max is not constexpr.
+constexpr size_t constexpr_max(size_t x, size_t y) {
+ return x > y ? x : y;
+}
+
+constexpr size_t kMaxRegisterDataSize = constexpr_max(
+ constexpr_max(constexpr_max(unwindstack::ARM_REG_LAST * sizeof(uint32_t),
+ unwindstack::ARM64_REG_LAST * sizeof(uint64_t)),
+ unwindstack::X86_REG_LAST * sizeof(uint32_t)),
+ unwindstack::X86_64_REG_LAST * sizeof(uint64_t));
+
+constexpr size_t kFreePageSize = 1024;
+
+enum class RecordType : uint64_t {
+ Free = 0,
+ Malloc = 1,
+};
+
+struct AllocMetadata {
+ uint64_t sequence_number;
+ // Size of the allocation that was made.
+ uint64_t alloc_size;
+ // Pointer returned by malloc(2) for this allocation.
+ uint64_t alloc_address;
+ // Current value of the stack pointer.
+ uint64_t stack_pointer;
+ // Offset of the data at stack_pointer from the start of this record.
+ uint64_t stack_pointer_offset;
+ // CPU architecture of the client. This determines the size of the
+ // register data that follows this struct.
+ unwindstack::ArchEnum arch;
+ char register_data[kMaxRegisterDataSize];
+};
+
+struct FreePageEntry {
+ uint64_t sequence_number;
+ uint64_t addr;
+};
+
+struct FreeMetadata {
+ uint64_t num_entries;
+ FreePageEntry entries[kFreePageSize];
+};
+
+struct WireMessage {
+ RecordType record_type;
+
+ AllocMetadata* alloc_header;
+ FreeMetadata* free_header;
+
+ char* payload;
+ size_t payload_size;
+};
+
+bool SendWireMessage(int sock, const WireMessage& msg);
+
+// Parse message received over the wire.
+// |buf| has to outlive |out|.
+// If buf is not a valid message, return false.
+bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out);
+
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_WIRE_PROTOCOL_H_