Merge "Ftrace: Add Setup/Start support for fast triggering"
diff --git a/Android.bp b/Android.bp
index e7f0d6c..541e66f 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3850,6 +3850,8 @@
"src/profiling/memory/heapprofd_integrationtest.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/record_reader_unittest.cc",
+ "src/profiling/memory/sampler.cc",
+ "src/profiling/memory/sampler_unittest.cc",
"src/profiling/memory/socket_listener.cc",
"src/profiling/memory/socket_listener_unittest.cc",
"src/profiling/memory/string_interner.cc",
@@ -3857,6 +3859,7 @@
"src/profiling/memory/unwinding.cc",
"src/profiling/memory/unwinding_unittest.cc",
"src/profiling/memory/wire_protocol.cc",
+ "src/profiling/memory/wire_protocol_unittest.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
"src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/unix_socket.h b/include/perfetto/base/unix_socket.h
index 7daf5a2..4a613a0 100644
--- a/include/perfetto/base/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -54,6 +54,16 @@
base::ScopedFile CreateSocket();
+// Update msghdr so subsequent sendmsg will send data that remains after n bytes
+// have already been sent.
+// This should not be used, it's exported for test use only.
+void ShiftMsgHdr(size_t n, struct msghdr* msg);
+
+// Re-enter sendmsg until all the data has been sent or an error occurs.
+//
+// TODO(fmayer): Figure out how to do timeouts here for heapprofd.
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags);
+
// A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
// transfer file descriptors. None of the methods in this class are blocking.
// The main design goal is API simplicity and strong guarantees on the
@@ -170,6 +180,8 @@
// EventListener::OnDisconnect() will be called.
// If the socket is not connected, Send() will just return false.
// Does not append a null string terminator to msg in any case.
+ //
+ // DO NOT PASS kNonBlocking, it is broken.
bool Send(const void* msg,
size_t len,
int send_fd = -1,
@@ -179,7 +191,8 @@
const int* send_fds,
size_t num_fds,
BlockingMode blocking = BlockingMode::kNonBlocking);
- bool Send(const std::string& msg);
+ bool Send(const std::string& msg,
+ BlockingMode blockimg = BlockingMode::kNonBlocking);
// Returns the number of bytes (<= |len|) written in |msg| or 0 if there
// is no data in the buffer to read or an error occurs (in which case a
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index e3ed5f2..fb4f5ff 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -64,6 +64,55 @@
#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
+void ShiftMsgHdr(size_t n, struct msghdr* msg) {
+ for (size_t i = 0; i < msg->msg_iovlen; ++i) {
+ struct iovec* vec = &msg->msg_iov[i];
+ if (n < vec->iov_len) {
+ // We sent a part of this iovec.
+ vec->iov_base = reinterpret_cast<char*>(vec->iov_base) + n;
+ vec->iov_len -= n;
+ msg->msg_iov = vec;
+ msg->msg_iovlen -= i;
+ return;
+ }
+ // We sent the whole iovec.
+ n -= vec->iov_len;
+ }
+ // We sent all the iovecs.
+ PERFETTO_CHECK(n == 0);
+ msg->msg_iovlen = 0;
+ msg->msg_iov = nullptr;
+}
+
+// For the interested reader, Linux kernel dive to verify this is not only a
+// theoretical possibility: sock_stream_sendmsg, if sock_alloc_send_pskb returns
+// NULL [1] (which it does when it gets interrupted [2]), returns early with the
+// amount of bytes already sent.
+//
+// [1]:
+// https://elixir.bootlin.com/linux/v4.18.10/source/net/unix/af_unix.c#L1872
+// [2]: https://elixir.bootlin.com/linux/v4.18.10/source/net/core/sock.c#L2101
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags) {
+ // This does not make sense on non-blocking sockets.
+ PERFETTO_DCHECK((fcntl(sockfd, F_GETFL, 0) & O_NONBLOCK) == 0);
+
+ ssize_t total_sent = 0;
+ while (msg->msg_iov) {
+ ssize_t sent = PERFETTO_EINTR(sendmsg(sockfd, msg, flags));
+ if (sent <= 0) {
+ if (sent == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return total_sent;
+ return sent;
+ }
+ total_sent += sent;
+ ShiftMsgHdr(static_cast<size_t>(sent), msg);
+ // Only send the ancillary data with the first sendmsg call.
+ msg->msg_control = nullptr;
+ msg->msg_controllen = 0;
+ };
+ return total_sent;
+};
+
ssize_t SockSend(int fd,
const void* msg,
size_t len,
@@ -90,7 +139,7 @@
msg_hdr.msg_controllen = cmsg->cmsg_len;
}
- return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+ return SendMsgAll(fd, &msg_hdr, kNoSigPipe);
}
ssize_t SockReceive(int fd,
@@ -396,8 +445,8 @@
}
}
-bool UnixSocket::Send(const std::string& msg) {
- return Send(msg.c_str(), msg.size() + 1);
+bool UnixSocket::Send(const std::string& msg, BlockingMode blocking) {
+ return Send(msg.c_str(), msg.size() + 1, -1, blocking);
}
bool UnixSocket::Send(const void* msg,
@@ -414,6 +463,10 @@
const int* send_fds,
size_t num_fds,
BlockingMode blocking_mode) {
+ // TODO(b/117139237): Non-blocking sends are broken because we do not
+ // properly handle partial sends.
+ PERFETTO_DCHECK(blocking_mode == BlockingMode::kBlocking);
+
if (state_ != State::kConnected) {
errno = last_error_ = ENOTCONN;
return false;
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 5cdd6ab..382f9bc 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -41,6 +41,7 @@
using ::testing::Mock;
constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr auto kBlocking = UnixSocket::BlockingMode::kBlocking;
class MockEventListener : public UnixSocket::EventListener {
public:
@@ -115,7 +116,7 @@
auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
.WillOnce(InvokeWithoutArgs(cli_disconnected));
- EXPECT_FALSE(cli->Send("whatever"));
+ EXPECT_FALSE(cli->Send("whatever", kBlocking));
task_runner_.RunUntilCheckpoint("cli_disconnected");
}
@@ -153,8 +154,8 @@
ASSERT_EQ("cli>srv", s->ReceiveString());
srv_did_recv();
}));
- ASSERT_TRUE(cli->Send("cli>srv"));
- ASSERT_TRUE(srv_conn->Send("srv>cli"));
+ ASSERT_TRUE(cli->Send("cli>srv", kBlocking));
+ ASSERT_TRUE(srv_conn->Send("srv>cli", kBlocking));
task_runner_.RunUntilCheckpoint("cli_did_recv");
task_runner_.RunUntilCheckpoint("srv_did_recv");
@@ -168,8 +169,8 @@
ASSERT_EQ("", cli->ReceiveString());
ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
ASSERT_EQ("", srv_conn->ReceiveString());
- ASSERT_FALSE(cli->Send("foo"));
- ASSERT_FALSE(srv_conn->Send("bar"));
+ ASSERT_FALSE(cli->Send("foo", kBlocking));
+ ASSERT_FALSE(srv_conn->Send("bar", kBlocking));
srv->Shutdown(true);
task_runner_.RunUntilCheckpoint("cli_disconnected");
task_runner_.RunUntilCheckpoint("srv_disconnected");
@@ -244,10 +245,10 @@
int buf_fd[2] = {null_fd.get(), zero_fd.get()};
- ASSERT_TRUE(
- cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
+ ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
+ base::ArraySize(buf_fd), kBlocking));
ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
- base::ArraySize(buf_fd)));
+ base::ArraySize(buf_fd), kBlocking));
task_runner_.RunUntilCheckpoint("srv_did_recv");
task_runner_.RunUntilCheckpoint("cli_did_recv");
@@ -300,7 +301,7 @@
EXPECT_CALL(event_listener_, OnDataAvailable(s))
.WillOnce(Invoke([](UnixSocket* t) {
ASSERT_EQ("PING", t->ReceiveString());
- ASSERT_TRUE(t->Send("PONG"));
+ ASSERT_TRUE(t->Send("PONG", kBlocking));
}));
}));
@@ -309,7 +310,7 @@
EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
.WillOnce(Invoke([](UnixSocket* s, bool success) {
ASSERT_TRUE(success);
- ASSERT_TRUE(s->Send("PING"));
+ ASSERT_TRUE(s->Send("PING", kBlocking));
}));
auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
@@ -356,7 +357,7 @@
.WillOnce(Invoke(
[this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
- ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+ ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd, kBlocking));
// Wait for the client to change this again.
EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
.WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
@@ -391,7 +392,7 @@
// Now change the shared memory and ping the other process.
memcpy(mem, "rock more", 10);
- ASSERT_TRUE(s->Send("change notify"));
+ ASSERT_TRUE(s->Send("change notify", kBlocking));
checkpoint();
}));
task_runner_.RunUntilCheckpoint("change_seen_by_client");
@@ -409,7 +410,7 @@
int num_frame) {
char buf[kAtomicWrites_FrameSize];
memset(buf, static_cast<char>(num_frame), sizeof(buf));
- if (s->Send(buf, sizeof(buf)))
+ if (s->Send(buf, sizeof(buf), -1, kBlocking))
return true;
task_runner->PostTask(
std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
@@ -560,8 +561,7 @@
char buf[1024 * 32] = {};
tx_task_runner.PostTask([&cli, &buf, all_sent] {
for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
- cli->Send(buf, sizeof(buf), -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ cli->Send(buf, sizeof(buf), -1 /*fd*/, kBlocking);
all_sent();
});
tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
@@ -608,8 +608,7 @@
static constexpr size_t kBufSize = 32 * 1024 * 1024;
std::unique_ptr<char[]> buf(new char[kBufSize]());
tx_task_runner.PostTask([&cli, &buf, send_done] {
- bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/, kBlocking);
ASSERT_FALSE(send_res);
send_done();
});
@@ -620,6 +619,166 @@
tx_thread.join();
}
+TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
+ // Send a part of the first iov, then send the rest.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(1, &hdr);
+ EXPECT_NE(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
+ EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
+ EXPECT_EQ(hdr.msg_iovlen, 2);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
+ EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
+
+ ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+ EXPECT_EQ(hdr.msg_iov, &iov[1]);
+ EXPECT_EQ(hdr.msg_iovlen, 1);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
+ EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
+
+ ShiftMsgHdr(base::ArraySize(world), &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
+ // Send first iov and part of the second iov, then send the rest.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+ EXPECT_NE(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 1);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
+ EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
+
+ ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
+ // Send everything at once.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello), &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+void Handler(int) {}
+
+int RollbackSigaction(const struct sigaction* act) {
+ return sigaction(SIGWINCH, act, nullptr);
+}
+
+TEST_F(UnixSocketTest, PartialSendMsgAll) {
+ int sv[2];
+ ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+ base::ScopedFile send_socket(sv[0]);
+ base::ScopedFile recv_socket(sv[1]);
+
+ // Set bufsize to minimum.
+ int bufsize = 1024;
+ ASSERT_EQ(setsockopt(*send_socket, SOL_SOCKET, SO_SNDBUF, &bufsize,
+ sizeof(bufsize)),
+ 0);
+ ASSERT_EQ(setsockopt(*recv_socket, SOL_SOCKET, SO_RCVBUF, &bufsize,
+ sizeof(bufsize)),
+ 0);
+
+ // Send something larger than send + recv kernel buffers combined to make
+ // sendmsg block.
+ char send_buf[8192];
+ // Make MSAN happy.
+ for (size_t i = 0; i < sizeof(send_buf); ++i)
+ send_buf[i] = static_cast<char>(i % 256);
+ char recv_buf[sizeof(send_buf)];
+
+ // Need to install signal handler to cause the interrupt to happen.
+ // man 3 pthread_kill:
+ // Signal dispositions are process-wide: if a signal handler is
+ // installed, the handler will be invoked in the thread thread, but if
+ // the disposition of the signal is "stop", "continue", or "terminate",
+ // this action will affect the whole process.
+ struct sigaction oldact;
+ struct sigaction newact = {};
+ newact.sa_handler = Handler;
+ ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
+ base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
+ rollback(&oldact);
+
+ auto blocked_thread = pthread_self();
+ std::thread th([blocked_thread, &recv_socket, &recv_buf] {
+ ssize_t rd = PERFETTO_EINTR(read(*recv_socket, recv_buf, 1));
+ ASSERT_EQ(rd, 1);
+ // We are now sure the other thread is in sendmsg, interrupt send.
+ ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
+ // Drain the socket to allow SendMsgAll to succeed.
+ size_t offset = 1;
+ while (offset < sizeof(recv_buf)) {
+ rd = PERFETTO_EINTR(
+ read(*recv_socket, recv_buf + offset, sizeof(recv_buf) - offset));
+ ASSERT_GE(rd, 0);
+ offset += static_cast<size_t>(rd);
+ }
+ });
+
+ // Test sending the send_buf in several chunks as an iov to exercise the
+ // more complicated code-paths of SendMsgAll.
+ struct msghdr hdr = {};
+ struct iovec iov[4];
+ static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
+ "Cannot split buffer into even pieces.");
+ constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
+ for (size_t i = 0; i < base::ArraySize(iov); ++i) {
+ iov[i].iov_base = send_buf + i * kChunkSize;
+ iov[i].iov_len = kChunkSize;
+ }
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ASSERT_EQ(SendMsgAll(*send_socket, &hdr, 0), sizeof(send_buf));
+ send_socket.reset();
+ th.join();
+ // Make sure the re-entry logic was actually triggered.
+ ASSERT_EQ(hdr.msg_iov, nullptr);
+ ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 445e3d0..050fcca 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -182,7 +182,8 @@
void Reply(const Frame& frame) {
auto buf = BufferedFrameDeserializer::Serialize(frame);
ASSERT_TRUE(client_sock->is_connected());
- EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd));
+ EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd,
+ base::UnixSocket::BlockingMode::kBlocking));
next_reply_fd = -1;
}
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 3ff1f99..26b56c8 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -152,7 +152,8 @@
void SendFrame(const Frame& frame, int fd = -1) {
std::string buf = BufferedFrameDeserializer::Serialize(frame);
- ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd));
+ ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd,
+ base::UnixSocket::BlockingMode::kBlocking));
}
BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 78e57c9..369cb71 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -62,6 +62,8 @@
sources = [
"client.cc",
"client.h",
+ "sampler.cc",
+ "sampler.h",
]
}
@@ -83,9 +85,11 @@
"client_unittest.cc",
"heapprofd_integrationtest.cc",
"record_reader_unittest.cc",
+ "sampler_unittest.cc",
"socket_listener_unittest.cc",
"string_interner_unittest.cc",
"unwinding_unittest.cc",
+ "wire_protocol_unittest.cc",
]
}
diff --git a/src/profiling/memory/sampler.cc b/src/profiling/memory/sampler.cc
new file mode 100644
index 0000000..eba955e
--- /dev/null
+++ b/src/profiling/memory/sampler.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace {
+ThreadLocalSamplingData* GetSpecific(pthread_key_t key,
+ void* (*unhooked_malloc)(size_t),
+ void (*unhooked_free)(void*)) {
+ // This should not be used with glibc as it might re-enter into malloc, see
+ // http://crbug.com/776475.
+ void* specific = pthread_getspecific(key);
+ if (specific == nullptr) {
+ specific = unhooked_malloc(sizeof(ThreadLocalSamplingData));
+ new (specific) ThreadLocalSamplingData(unhooked_free);
+ pthread_setspecific(key, specific);
+ }
+ return reinterpret_cast<ThreadLocalSamplingData*>(specific);
+}
+} // namespace
+
+// The algorithm below is a inspired by the Chromium sampling algorithm at
+// https://cs.chromium.org/search/?q=f:cc+symbol:AllocatorShimLogAlloc+package:%5Echromium$&type=cs
+
+int64_t ThreadLocalSamplingData::NextSampleInterval(double rate) {
+ std::exponential_distribution<double> dist(1 / rate);
+ int64_t next = static_cast<int64_t>(dist(random_engine_));
+ return next < 1 ? 1 : next;
+}
+
+size_t ThreadLocalSamplingData::ShouldSample(size_t sz, double rate) {
+ interval_to_next_sample_ -= sz;
+ size_t sz_multiplier = 0;
+ while (PERFETTO_UNLIKELY(interval_to_next_sample_ <= 0)) {
+ interval_to_next_sample_ += NextSampleInterval(rate);
+ ++sz_multiplier;
+ }
+ return sz_multiplier;
+}
+
+size_t ShouldSample(pthread_key_t key,
+ size_t sz,
+ double rate,
+ void* (*unhooked_malloc)(size_t),
+ void (*unhooked_free)(void*)) {
+ if (PERFETTO_UNLIKELY(sz >= rate))
+ return 1;
+ return GetSpecific(key, unhooked_malloc, unhooked_free)
+ ->ShouldSample(sz, rate);
+}
+
+void ThreadLocalSamplingData::KeyDestructor(void* ptr) {
+ ThreadLocalSamplingData* thread_local_data =
+ reinterpret_cast<ThreadLocalSamplingData*>(ptr);
+ void (*unhooked_free)(void*) = thread_local_data->unhooked_free_;
+ thread_local_data->~ThreadLocalSamplingData();
+ unhooked_free(ptr);
+}
+
+} // namespace perfetto
diff --git a/src/profiling/memory/sampler.h b/src/profiling/memory/sampler.h
new file mode 100644
index 0000000..879e2a2
--- /dev/null
+++ b/src/profiling/memory/sampler.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SAMPLER_H_
+#define SRC_PROFILING_MEMORY_SAMPLER_H_
+
+#include <pthread.h>
+#include <stdint.h>
+
+#include <random>
+
+namespace perfetto {
+
+// This is the thread-local state needed to apply poission sampling to malloc
+// samples.
+//
+// We apply poisson sampling individually to each byte. The whole
+// allocation gets accounted as often as the number of sampled bytes it
+// contains.
+//
+// Googlers see go/chrome-shp for more details about the sampling (from
+// Chrome's heap profiler).
+class ThreadLocalSamplingData {
+ public:
+ ThreadLocalSamplingData(void (*unhooked_free)(void*))
+ : unhooked_free_(unhooked_free) {}
+ // Returns number of times a sample should be accounted. Due to how the
+ // poission sampling works, some samples should be accounted multiple times.
+ size_t ShouldSample(size_t sz, double rate);
+
+ // Destroy a TheadLocalSamplingData object after the pthread key has been
+ // deleted or when the thread shuts down. This uses unhooked_free passed in
+ // the constructor.
+ static void KeyDestructor(void* ptr);
+
+ private:
+ int64_t NextSampleInterval(double rate);
+ void (*unhooked_free_)(void*);
+ int64_t interval_to_next_sample_ = 0;
+ std::default_random_engine random_engine_;
+};
+
+// Returns number of times a sample should be accounted. Due to how the
+// poission sampling works, some samples should be accounted multiple times.
+//
+// Delegate to this thread's ThreadLocalSamplingData.
+//
+// We have to pass through the real malloc in order to allocate the TLS.
+size_t ShouldSample(pthread_key_t key,
+ size_t sz,
+ double rate,
+ void* (*unhooked_malloc)(size_t),
+ void (*unhooked_free)(void*));
+
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_SAMPLER_H_
diff --git a/src/profiling/memory/sampler_unittest.cc b/src/profiling/memory/sampler_unittest.cc
new file mode 100644
index 0000000..d598e71
--- /dev/null
+++ b/src/profiling/memory/sampler_unittest.cc
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(SamplerTest, TestLarge) {
+ pthread_key_t key;
+ ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+ 0);
+ EXPECT_EQ(ShouldSample(key, 1024, 512, malloc, free), 1);
+ pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmall) {
+ pthread_key_t key;
+ ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+ 0);
+ // As we initialize interval_to_next_sample_ with 0, the first sample
+ // should always get sampled.
+ EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+ pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmallFromThread) {
+ pthread_key_t key;
+ ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+ 0);
+ std::thread th([key] {
+ // As we initialize interval_to_next_sample_ with 0, the first sample
+ // should always get sampled.
+ EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+ });
+ std::thread th2([key] {
+ // The threads should have separate state.
+ EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+ });
+ th.join();
+ th2.join();
+ pthread_key_delete(key);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 58d5e8f..dd3c1a7 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -71,8 +71,10 @@
base::ScopedFile(open("/dev/null", O_RDONLY))};
int raw_fds[2] = {*fds[0], *fds[1]};
ASSERT_TRUE(client_socket->Send(&size, sizeof(size), raw_fds,
- base::ArraySize(raw_fds)));
- ASSERT_TRUE(client_socket->Send("1", 1));
+ base::ArraySize(raw_fds),
+ base::UnixSocket::BlockingMode::kBlocking));
+ ASSERT_TRUE(client_socket->Send("1", 1, -1,
+ base::UnixSocket::BlockingMode::kBlocking));
task_runner.RunUntilCheckpoint("callback.called");
}
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 1dcc8c0..8ed2780 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -238,6 +238,8 @@
FreeRecord& free_rec = rec->free_record;
FreePageEntry* entries = free_rec.metadata->entries;
uint64_t num_entries = free_rec.metadata->num_entries;
+ if (num_entries > kFreePageSize)
+ return;
for (size_t i = 0; i < num_entries; ++i) {
const FreePageEntry& entry = entries[i];
metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 39373f9..255bae2 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -17,6 +17,7 @@
#include "src/profiling/memory/wire_protocol.h"
#include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
#include <sys/socket.h>
@@ -29,8 +30,8 @@
bool ViewAndAdvance(char** ptr, T** out, const char* end) {
if (end - sizeof(T) < *ptr)
return false;
- *out = reinterpret_cast<T*>(ptr);
- ptr += sizeof(T);
+ *out = reinterpret_cast<T*>(*ptr);
+ *ptr += sizeof(T);
return true;
}
} // namespace
@@ -44,9 +45,11 @@
iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
iovecs[1].iov_len = sizeof(msg.record_type);
if (msg.alloc_header) {
+ PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
iovecs[2].iov_base = msg.alloc_header;
iovecs[2].iov_len = sizeof(*msg.alloc_header);
} else if (msg.free_header) {
+ PERFETTO_DCHECK(msg.record_type == RecordType::Free);
iovecs[2].iov_base = msg.free_header;
iovecs[2].iov_len = sizeof(*msg.free_header);
} else {
@@ -68,7 +71,7 @@
total_size = iovecs[1].iov_len + iovecs[2].iov_len;
}
- ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+ ssize_t sent = base::SendMsgAll(sock, &hdr, MSG_NOSIGNAL);
return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
}
@@ -77,23 +80,27 @@
char* end = buf + size;
if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
return false;
- switch (*record_type) {
- case RecordType::Malloc:
- if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
- return false;
- out->payload = buf;
- if (buf > end) {
- PERFETTO_DCHECK(false);
- return false;
- }
- out->payload_size = static_cast<size_t>(end - buf);
- break;
- case RecordType::Free:
- if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
- return false;
- break;
- }
+
+ out->payload = nullptr;
+ out->payload_size = 0;
out->record_type = *record_type;
+
+ if (*record_type == RecordType::Malloc) {
+ if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+ return false;
+ out->payload = buf;
+ if (buf > end) {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
+ out->payload_size = static_cast<size_t>(end - buf);
+ } else if (*record_type == RecordType::Free) {
+ if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+ return false;
+ } else {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
return true;
}
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
new file mode 100644
index 0000000..c9e8b22
--- /dev/null
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/record_reader.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+
+bool operator==(const AllocMetadata& one, const AllocMetadata& other);
+bool operator==(const AllocMetadata& one, const AllocMetadata& other) {
+ return std::tie(one.sequence_number, one.alloc_size, one.alloc_address,
+ one.stack_pointer, one.stack_pointer_offset, one.arch) ==
+ std::tie(other.sequence_number, other.alloc_size,
+ other.alloc_address, other.stack_pointer,
+ other.stack_pointer_offset, other.arch) &&
+ memcmp(one.register_data, other.register_data, kMaxRegisterDataSize) ==
+ 0;
+}
+
+bool operator==(const FreeMetadata& one, const FreeMetadata& other);
+bool operator==(const FreeMetadata& one, const FreeMetadata& other) {
+ if (one.num_entries != other.num_entries)
+ return false;
+ for (size_t i = 0; i < one.num_entries; ++i) {
+ if (std::tie(one.entries[i].sequence_number, one.entries[i].addr) !=
+ std::tie(other.entries[i].sequence_number, other.entries[i].addr))
+ return false;
+ }
+ return true;
+}
+
+namespace {
+
+RecordReader::Record ReceiveAll(int sock) {
+ RecordReader record_reader;
+ RecordReader::Record record;
+ bool received = false;
+ while (!received) {
+ RecordReader::ReceiveBuffer buf = record_reader.BeginReceive();
+ ssize_t rd = PERFETTO_EINTR(read(sock, buf.data, buf.size));
+ PERFETTO_CHECK(rd > 0);
+ auto status = record_reader.EndReceive(static_cast<size_t>(rd), &record);
+ switch (status) {
+ case (RecordReader::Result::Noop):
+ break;
+ case (RecordReader::Result::RecordReceived):
+ received = true;
+ break;
+ case (RecordReader::Result::KillConnection):
+ PERFETTO_CHECK(false);
+ break;
+ }
+ }
+ return record;
+}
+
+TEST(WireProtocolTest, AllocMessage) {
+ char payload[] = {0x77, 0x77, 0x77, 0x00};
+ WireMessage msg = {};
+ msg.record_type = RecordType::Malloc;
+ AllocMetadata metadata = {};
+ metadata.sequence_number = 0xA1A2A3A4A5A6A7A8;
+ metadata.alloc_size = 0xB1B2B3B4B5B6B7B8;
+ metadata.alloc_address = 0xC1C2C3C4C5C6C7C8;
+ metadata.stack_pointer = 0xD1D2D3D4D5D6D7D8;
+ metadata.stack_pointer_offset = 0xE1E2E3E4E5E6E7E8;
+ metadata.arch = unwindstack::ARCH_X86;
+ for (size_t i = 0; i < kMaxRegisterDataSize; ++i)
+ metadata.register_data[i] = 0x66;
+ msg.alloc_header = &metadata;
+ msg.payload = payload;
+ msg.payload_size = sizeof(payload);
+
+ int sv[2];
+ ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+ base::ScopedFile send_sock(sv[0]);
+ base::ScopedFile recv_sock(sv[1]);
+ ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+
+ RecordReader::Record record = ReceiveAll(*recv_sock);
+
+ WireMessage recv_msg;
+ ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
+ record.size, &recv_msg));
+ ASSERT_EQ(recv_msg.record_type, msg.record_type);
+ ASSERT_EQ(*recv_msg.alloc_header, *msg.alloc_header);
+ ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+ ASSERT_STREQ(recv_msg.payload, msg.payload);
+}
+
+TEST(WireProtocolTest, FreeMessage) {
+ WireMessage msg = {};
+ msg.record_type = RecordType::Free;
+ FreeMetadata metadata = {};
+ metadata.num_entries = kFreePageSize;
+ for (size_t i = 0; i < kFreePageSize; ++i) {
+ metadata.entries[i].sequence_number = 0x111111111111111;
+ metadata.entries[i].addr = 0x222222222222222;
+ }
+ msg.free_header = &metadata;
+
+ int sv[2];
+ ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+ base::ScopedFile send_sock(sv[0]);
+ base::ScopedFile recv_sock(sv[1]);
+ ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+
+ RecordReader::Record record = ReceiveAll(*recv_sock);
+
+ WireMessage recv_msg;
+ ASSERT_TRUE(ReceiveWireMessage(reinterpret_cast<char*>(record.data.get()),
+ record.size, &recv_msg));
+ ASSERT_EQ(recv_msg.record_type, msg.record_type);
+ ASSERT_EQ(*recv_msg.free_header, *msg.free_header);
+ ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/tracing/core/patch_list.h b/src/tracing/core/patch_list.h
index 5cef17d..bdfd028 100644
--- a/src/tracing/core/patch_list.h
+++ b/src/tracing/core/patch_list.h
@@ -53,7 +53,7 @@
}
private:
- Patch& operator=(const Patch&) = default;
+ Patch& operator=(const Patch&) = delete;
Patch(Patch&&) noexcept = delete;
Patch& operator=(Patch&&) = delete;
};
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index bdc28e5..3c32409 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -76,8 +76,8 @@
void OnDisconnect() override {}
- void SetupDataSource(DataSourceInstanceID,
- const DataSourceConfig& source_config) override {}
+ void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override {
+ }
void StartDataSource(DataSourceInstanceID,
const DataSourceConfig& source_config) override {