Merge "Ftrace: Add Setup/Start support for fast triggering"
diff --git a/Android.bp b/Android.bp
index e7f0d6c..541e66f 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3850,6 +3850,8 @@
     "src/profiling/memory/heapprofd_integrationtest.cc",
     "src/profiling/memory/record_reader.cc",
     "src/profiling/memory/record_reader_unittest.cc",
+    "src/profiling/memory/sampler.cc",
+    "src/profiling/memory/sampler_unittest.cc",
     "src/profiling/memory/socket_listener.cc",
     "src/profiling/memory/socket_listener_unittest.cc",
     "src/profiling/memory/string_interner.cc",
@@ -3857,6 +3859,7 @@
     "src/profiling/memory/unwinding.cc",
     "src/profiling/memory/unwinding_unittest.cc",
     "src/profiling/memory/wire_protocol.cc",
+    "src/profiling/memory/wire_protocol_unittest.cc",
     "src/protozero/message.cc",
     "src/protozero/message_handle.cc",
     "src/protozero/message_handle_unittest.cc",
diff --git a/include/perfetto/base/unix_socket.h b/include/perfetto/base/unix_socket.h
index 7daf5a2..4a613a0 100644
--- a/include/perfetto/base/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -54,6 +54,16 @@
 
 base::ScopedFile CreateSocket();
 
+// Update msghdr so subsequent sendmsg will send data that remains after n bytes
+// have already been sent.
+// This should not be used, it's exported for test use only.
+void ShiftMsgHdr(size_t n, struct msghdr* msg);
+
+// Re-enter sendmsg until all the data has been sent or an error occurs.
+//
+// TODO(fmayer): Figure out how to do timeouts here for heapprofd.
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags);
+
 // A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
 // transfer file descriptors. None of the methods in this class are blocking.
 // The main design goal is API simplicity and strong guarantees on the
@@ -170,6 +180,8 @@
   // EventListener::OnDisconnect() will be called.
   // If the socket is not connected, Send() will just return false.
   // Does not append a null string terminator to msg in any case.
+  //
+  // DO NOT PASS kNonBlocking, it is broken.
   bool Send(const void* msg,
             size_t len,
             int send_fd = -1,
@@ -179,7 +191,8 @@
             const int* send_fds,
             size_t num_fds,
             BlockingMode blocking = BlockingMode::kNonBlocking);
-  bool Send(const std::string& msg);
+  bool Send(const std::string& msg,
+            BlockingMode blocking = BlockingMode::kNonBlocking);
 
   // Returns the number of bytes (<= |len|) written in |msg| or 0 if there
   // is no data in the buffer to read or an error occurs (in which case a
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index e3ed5f2..fb4f5ff 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -64,6 +64,55 @@
 #pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
 #endif
 
+void ShiftMsgHdr(size_t n, struct msghdr* msg) {
+  for (size_t i = 0; i < msg->msg_iovlen; ++i) {
+    struct iovec* vec = &msg->msg_iov[i];
+    if (n < vec->iov_len) {
+      // We sent a part of this iovec.
+      vec->iov_base = reinterpret_cast<char*>(vec->iov_base) + n;
+      vec->iov_len -= n;
+      msg->msg_iov = vec;
+      msg->msg_iovlen -= i;
+      return;
+    }
+    // We sent the whole iovec.
+    n -= vec->iov_len;
+  }
+  // We sent all the iovecs.
+  PERFETTO_CHECK(n == 0);
+  msg->msg_iovlen = 0;
+  msg->msg_iov = nullptr;
+}
+
+// Re-enters sendmsg until all of |msg| is sent; |msg| and its iovecs are
+// modified in place. Returns the total bytes sent (short only on EAGAIN or
+// EWOULDBLOCK), or sendmsg's own result (<= 0) on any other failure. Partial
+// sends do happen on Linux: unix_stream_sendmsg returns early with the bytes
+// already sent if sock_alloc_send_pskb returns NULL [1] when interrupted [2].
+// [1]:
+// https://elixir.bootlin.com/linux/v4.18.10/source/net/unix/af_unix.c#L1872
+// [2]: https://elixir.bootlin.com/linux/v4.18.10/source/net/core/sock.c#L2101
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags) {
+  // This does not make sense on non-blocking sockets.
+  PERFETTO_DCHECK((fcntl(sockfd, F_GETFL, 0) & O_NONBLOCK) == 0);
+
+  ssize_t total_sent = 0;
+  while (msg->msg_iov) {
+    ssize_t sent = PERFETTO_EINTR(sendmsg(sockfd, msg, flags));
+    if (sent <= 0) {
+      if (sent == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+        return total_sent;
+      return sent;
+    }
+    total_sent += sent;
+    ShiftMsgHdr(static_cast<size_t>(sent), msg);
+    // Only send the ancillary data with the first sendmsg call.
+    msg->msg_control = nullptr;
+    msg->msg_controllen = 0;
+  }
+  return total_sent;
+}
+
 ssize_t SockSend(int fd,
                  const void* msg,
                  size_t len,
@@ -90,7 +139,7 @@
     msg_hdr.msg_controllen = cmsg->cmsg_len;
   }
 
-  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+  return SendMsgAll(fd, &msg_hdr, kNoSigPipe);
 }
 
 ssize_t SockReceive(int fd,
@@ -396,8 +445,8 @@
   }
 }
 
-bool UnixSocket::Send(const std::string& msg) {
-  return Send(msg.c_str(), msg.size() + 1);
+bool UnixSocket::Send(const std::string& msg, BlockingMode blocking) {
+  return Send(msg.c_str(), msg.size() + 1, -1, blocking);
 }
 
 bool UnixSocket::Send(const void* msg,
@@ -414,6 +463,10 @@
                       const int* send_fds,
                       size_t num_fds,
                       BlockingMode blocking_mode) {
+  // TODO(b/117139237): Non-blocking sends are broken because we do not
+  // properly handle partial sends.
+  PERFETTO_DCHECK(blocking_mode == BlockingMode::kBlocking);
+
   if (state_ != State::kConnected) {
     errno = last_error_ = ENOTCONN;
     return false;
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 5cdd6ab..382f9bc 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -41,6 +41,7 @@
 using ::testing::Mock;
 
 constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr auto kBlocking = UnixSocket::BlockingMode::kBlocking;
 
 class MockEventListener : public UnixSocket::EventListener {
  public:
@@ -115,7 +116,7 @@
   auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
   EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
       .WillOnce(InvokeWithoutArgs(cli_disconnected));
-  EXPECT_FALSE(cli->Send("whatever"));
+  EXPECT_FALSE(cli->Send("whatever", kBlocking));
   task_runner_.RunUntilCheckpoint("cli_disconnected");
 }
 
@@ -153,8 +154,8 @@
         ASSERT_EQ("cli>srv", s->ReceiveString());
         srv_did_recv();
       }));
-  ASSERT_TRUE(cli->Send("cli>srv"));
-  ASSERT_TRUE(srv_conn->Send("srv>cli"));
+  ASSERT_TRUE(cli->Send("cli>srv", kBlocking));
+  ASSERT_TRUE(srv_conn->Send("srv>cli", kBlocking));
   task_runner_.RunUntilCheckpoint("cli_did_recv");
   task_runner_.RunUntilCheckpoint("srv_did_recv");
 
@@ -168,8 +169,8 @@
   ASSERT_EQ("", cli->ReceiveString());
   ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
   ASSERT_EQ("", srv_conn->ReceiveString());
-  ASSERT_FALSE(cli->Send("foo"));
-  ASSERT_FALSE(srv_conn->Send("bar"));
+  ASSERT_FALSE(cli->Send("foo", kBlocking));
+  ASSERT_FALSE(srv_conn->Send("bar", kBlocking));
   srv->Shutdown(true);
   task_runner_.RunUntilCheckpoint("cli_disconnected");
   task_runner_.RunUntilCheckpoint("srv_disconnected");
@@ -244,10 +245,10 @@
 
   int buf_fd[2] = {null_fd.get(), zero_fd.get()};
 
-  ASSERT_TRUE(
-      cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
+  ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
+                        base::ArraySize(buf_fd), kBlocking));
   ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
-                             base::ArraySize(buf_fd)));
+                             base::ArraySize(buf_fd), kBlocking));
   task_runner_.RunUntilCheckpoint("srv_did_recv");
   task_runner_.RunUntilCheckpoint("cli_did_recv");
 
@@ -300,7 +301,7 @@
         EXPECT_CALL(event_listener_, OnDataAvailable(s))
             .WillOnce(Invoke([](UnixSocket* t) {
               ASSERT_EQ("PING", t->ReceiveString());
-              ASSERT_TRUE(t->Send("PONG"));
+              ASSERT_TRUE(t->Send("PONG", kBlocking));
             }));
       }));
 
@@ -309,7 +310,7 @@
     EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
         .WillOnce(Invoke([](UnixSocket* s, bool success) {
           ASSERT_TRUE(success);
-          ASSERT_TRUE(s->Send("PING"));
+          ASSERT_TRUE(s->Send("PING", kBlocking));
         }));
 
     auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
@@ -356,7 +357,7 @@
         .WillOnce(Invoke(
             [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
               ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
-              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd, kBlocking));
               // Wait for the client to change this again.
               EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
                   .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
@@ -391,7 +392,7 @@
 
           // Now change the shared memory and ping the other process.
           memcpy(mem, "rock more", 10);
-          ASSERT_TRUE(s->Send("change notify"));
+          ASSERT_TRUE(s->Send("change notify", kBlocking));
           checkpoint();
         }));
     task_runner_.RunUntilCheckpoint("change_seen_by_client");
@@ -409,7 +410,7 @@
                               int num_frame) {
   char buf[kAtomicWrites_FrameSize];
   memset(buf, static_cast<char>(num_frame), sizeof(buf));
-  if (s->Send(buf, sizeof(buf)))
+  if (s->Send(buf, sizeof(buf), -1, kBlocking))
     return true;
   task_runner->PostTask(
       std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
@@ -560,8 +561,7 @@
     char buf[1024 * 32] = {};
     tx_task_runner.PostTask([&cli, &buf, all_sent] {
       for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
-        cli->Send(buf, sizeof(buf), -1 /*fd*/,
-                  UnixSocket::BlockingMode::kBlocking);
+        cli->Send(buf, sizeof(buf), -1 /*fd*/, kBlocking);
       all_sent();
     });
     tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
@@ -608,8 +608,7 @@
     static constexpr size_t kBufSize = 32 * 1024 * 1024;
     std::unique_ptr<char[]> buf(new char[kBufSize]());
     tx_task_runner.PostTask([&cli, &buf, send_done] {
-      bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
-                                UnixSocket::BlockingMode::kBlocking);
+      bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/, kBlocking);
       ASSERT_FALSE(send_res);
       send_done();
     });
@@ -620,6 +619,166 @@
   tx_thread.join();
 }
 
+TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
+  // Send a part of the first iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(1, &hdr);
+  EXPECT_NE(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
+  EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
+  EXPECT_EQ(hdr.msg_iovlen, 2);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
+  EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
+
+  ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+  EXPECT_EQ(hdr.msg_iov, &iov[1]);
+  EXPECT_EQ(hdr.msg_iovlen, 1);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
+
+  ShiftMsgHdr(base::ArraySize(world), &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
+  // Send first iov and part of the second iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+  EXPECT_NE(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 1);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
+
+  ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
+  // Send everything at once.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello), &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+void Handler(int) {}
+
+int RollbackSigaction(const struct sigaction* act) {
+  return sigaction(SIGWINCH, act, nullptr);
+}
+
+TEST_F(UnixSocketTest, PartialSendMsgAll) {
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_socket(sv[0]);
+  base::ScopedFile recv_socket(sv[1]);
+
+  // Set bufsize to minimum.
+  int bufsize = 1024;
+  ASSERT_EQ(setsockopt(*send_socket, SOL_SOCKET, SO_SNDBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+  ASSERT_EQ(setsockopt(*recv_socket, SOL_SOCKET, SO_RCVBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+
+  // Send something larger than send + recv kernel buffers combined to make
+  // sendmsg block.
+  char send_buf[8192];
+  // Make MSAN happy.
+  for (size_t i = 0; i < sizeof(send_buf); ++i)
+    send_buf[i] = static_cast<char>(i % 256);
+  char recv_buf[sizeof(send_buf)];
+
+  // Need to install signal handler to cause the interrupt to happen.
+  // man 3 pthread_kill:
+  //   Signal dispositions are process-wide: if a signal handler is
+  //   installed, the handler will be invoked in the thread thread, but if
+  //   the disposition of the signal is "stop", "continue", or "terminate",
+  //   this action will affect the whole process.
+  struct sigaction oldact;
+  struct sigaction newact = {};
+  newact.sa_handler = Handler;
+  ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
+  base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
+      rollback(&oldact);
+
+  auto blocked_thread = pthread_self();
+  std::thread th([blocked_thread, &recv_socket, &recv_buf] {
+    ssize_t rd = PERFETTO_EINTR(read(*recv_socket, recv_buf, 1));
+    ASSERT_EQ(rd, 1);
+    // We are now sure the other thread is in sendmsg, interrupt send.
+    ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
+    // Drain the socket to allow SendMsgAll to succeed.
+    size_t offset = 1;
+    while (offset < sizeof(recv_buf)) {
+      rd = PERFETTO_EINTR(
+          read(*recv_socket, recv_buf + offset, sizeof(recv_buf) - offset));
+      ASSERT_GT(rd, 0);  // 0 is EOF; fail instead of spinning forever.
+      offset += static_cast<size_t>(rd);
+    }
+  });
+
+  // Test sending the send_buf in several chunks as an iov to exercise the
+  // more complicated code-paths of SendMsgAll.
+  struct msghdr hdr = {};
+  struct iovec iov[4];
+  static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
+                "Cannot split buffer into even pieces.");
+  constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
+  for (size_t i = 0; i < base::ArraySize(iov); ++i) {
+    iov[i].iov_base = send_buf + i * kChunkSize;
+    iov[i].iov_len = kChunkSize;
+  }
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  ASSERT_EQ(SendMsgAll(*send_socket, &hdr, 0), sizeof(send_buf));
+  send_socket.reset();
+  th.join();
+  // SendMsgAll must have consumed (and cleared) every iovec.
+  ASSERT_EQ(hdr.msg_iov, nullptr);
+  ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
+}
+
 // TODO(primiano): add a test to check that in the case of a peer sending a fd
 // and the other end just doing a recv (without taking it), the fd is closed and
 // not left around.
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 445e3d0..050fcca 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -182,7 +182,8 @@
   void Reply(const Frame& frame) {
     auto buf = BufferedFrameDeserializer::Serialize(frame);
     ASSERT_TRUE(client_sock->is_connected());
-    EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd));
+    EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd,
+                                  base::UnixSocket::BlockingMode::kBlocking));
     next_reply_fd = -1;
   }
 
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 3ff1f99..26b56c8 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -152,7 +152,8 @@
 
   void SendFrame(const Frame& frame, int fd = -1) {
     std::string buf = BufferedFrameDeserializer::Serialize(frame);
-    ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd));
+    ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd,
+                            base::UnixSocket::BlockingMode::kBlocking));
   }
 
   BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 78e57c9..369cb71 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -62,6 +62,8 @@
   sources = [
     "client.cc",
     "client.h",
+    "sampler.cc",
+    "sampler.h",
   ]
 }
 
@@ -83,9 +85,11 @@
     "client_unittest.cc",
     "heapprofd_integrationtest.cc",
     "record_reader_unittest.cc",
+    "sampler_unittest.cc",
     "socket_listener_unittest.cc",
     "string_interner_unittest.cc",
     "unwinding_unittest.cc",
+    "wire_protocol_unittest.cc",
   ]
 }
 
diff --git a/src/profiling/memory/sampler.cc b/src/profiling/memory/sampler.cc
new file mode 100644
index 0000000..eba955e
--- /dev/null
+++ b/src/profiling/memory/sampler.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace {
+ThreadLocalSamplingData* GetSpecific(pthread_key_t key,
+                                     void* (*unhooked_malloc)(size_t),
+                                     void (*unhooked_free)(void*)) {
+  // This should not be used with glibc as it might re-enter into malloc, see
+  // http://crbug.com/776475.
+  void* specific = pthread_getspecific(key);
+  if (specific == nullptr) {
+    specific = unhooked_malloc(sizeof(ThreadLocalSamplingData));
+    new (specific) ThreadLocalSamplingData(unhooked_free);
+    pthread_setspecific(key, specific);
+  }
+  return reinterpret_cast<ThreadLocalSamplingData*>(specific);
+}
+}  // namespace
+
+// The algorithm below is inspired by the Chromium sampling algorithm at
+// https://cs.chromium.org/search/?q=f:cc+symbol:AllocatorShimLogAlloc+package:%5Echromium$&type=cs
+
+int64_t ThreadLocalSamplingData::NextSampleInterval(double rate) {
+  std::exponential_distribution<double> dist(1 / rate);
+  int64_t next = static_cast<int64_t>(dist(random_engine_));
+  return next < 1 ? 1 : next;
+}
+
+size_t ThreadLocalSamplingData::ShouldSample(size_t sz, double rate) {
+  interval_to_next_sample_ -= sz;
+  size_t sz_multiplier = 0;
+  while (PERFETTO_UNLIKELY(interval_to_next_sample_ <= 0)) {
+    interval_to_next_sample_ += NextSampleInterval(rate);
+    ++sz_multiplier;
+  }
+  return sz_multiplier;
+}
+
+size_t ShouldSample(pthread_key_t key,
+                    size_t sz,
+                    double rate,
+                    void* (*unhooked_malloc)(size_t),
+                    void (*unhooked_free)(void*)) {
+  if (PERFETTO_UNLIKELY(sz >= rate))
+    return 1;
+  return GetSpecific(key, unhooked_malloc, unhooked_free)
+      ->ShouldSample(sz, rate);
+}
+
+void ThreadLocalSamplingData::KeyDestructor(void* ptr) {
+  ThreadLocalSamplingData* thread_local_data =
+      reinterpret_cast<ThreadLocalSamplingData*>(ptr);
+  void (*unhooked_free)(void*) = thread_local_data->unhooked_free_;
+  thread_local_data->~ThreadLocalSamplingData();
+  unhooked_free(ptr);
+}
+
+}  // namespace perfetto
diff --git a/src/profiling/memory/sampler.h b/src/profiling/memory/sampler.h
new file mode 100644
index 0000000..879e2a2
--- /dev/null
+++ b/src/profiling/memory/sampler.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SAMPLER_H_
+#define SRC_PROFILING_MEMORY_SAMPLER_H_
+
+#include <pthread.h>
+#include <stdint.h>
+
+#include <random>
+
+namespace perfetto {
+
+// This is the thread-local state needed to apply Poisson sampling to malloc
+// samples.
+//
+// We apply Poisson sampling individually to each byte. The whole
+// allocation gets accounted as often as the number of sampled bytes it
+// contains.
+//
+// Googlers see go/chrome-shp for more details about the sampling (from
+// Chrome's heap profiler).
+class ThreadLocalSamplingData {
+ public:
+  ThreadLocalSamplingData(void (*unhooked_free)(void*))
+      : unhooked_free_(unhooked_free) {}
+  // Returns number of times a sample should be accounted. Due to how the
+  // Poisson sampling works, some samples should be accounted multiple times.
+  size_t ShouldSample(size_t sz, double rate);
+
+  // Destroy a ThreadLocalSamplingData object after the pthread key has been
+  // deleted or when the thread shuts down. This uses unhooked_free passed in
+  // the constructor.
+  static void KeyDestructor(void* ptr);
+
+ private:
+  int64_t NextSampleInterval(double rate);
+  void (*unhooked_free_)(void*);
+  int64_t interval_to_next_sample_ = 0;
+  std::default_random_engine random_engine_;
+};
+
+// Returns number of times a sample should be accounted. Due to how the
+// Poisson sampling works, some samples should be accounted multiple times.
+//
+// Delegate to this thread's ThreadLocalSamplingData.
+//
+// We have to pass through the real malloc in order to allocate the TLS.
+size_t ShouldSample(pthread_key_t key,
+                    size_t sz,
+                    double rate,
+                    void* (*unhooked_malloc)(size_t),
+                    void (*unhooked_free)(void*));
+
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_SAMPLER_H_
diff --git a/src/profiling/memory/sampler_unittest.cc b/src/profiling/memory/sampler_unittest.cc
new file mode 100644
index 0000000..d598e71
--- /dev/null
+++ b/src/profiling/memory/sampler_unittest.cc
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(SamplerTest, TestLarge) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  EXPECT_EQ(ShouldSample(key, 1024, 512, malloc, free), 1);
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmall) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  // As we initialize interval_to_next_sample_ with 0, the first sample
+  // should always get sampled.
+  EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmallFromThread) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  std::thread th([key] {
+    // As we initialize interval_to_next_sample_ with 0, the first sample
+    // should always get sampled.
+    EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+  });
+  std::thread th2([key] {
+    // The threads should have separate state.
+    EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+  });
+  th.join();
+  th2.join();
+  pthread_key_delete(key);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 58d5e8f..dd3c1a7 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -71,8 +71,10 @@
                              base::ScopedFile(open("/dev/null", O_RDONLY))};
   int raw_fds[2] = {*fds[0], *fds[1]};
   ASSERT_TRUE(client_socket->Send(&size, sizeof(size), raw_fds,
-                                  base::ArraySize(raw_fds)));
-  ASSERT_TRUE(client_socket->Send("1", 1));
+                                  base::ArraySize(raw_fds),
+                                  base::UnixSocket::BlockingMode::kBlocking));
+  ASSERT_TRUE(client_socket->Send("1", 1, -1,
+                                  base::UnixSocket::BlockingMode::kBlocking));
 
   task_runner.RunUntilCheckpoint("callback.called");
 }
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 1dcc8c0..8ed2780 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -238,6 +238,8 @@
     FreeRecord& free_rec = rec->free_record;
     FreePageEntry* entries = free_rec.metadata->entries;
     uint64_t num_entries = free_rec.metadata->num_entries;
+    if (num_entries > kFreePageSize)
+      return;
     for (size_t i = 0; i < num_entries; ++i) {
       const FreePageEntry& entry = entries[i];
       metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 39373f9..255bae2 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -17,6 +17,7 @@
 #include "src/profiling/memory/wire_protocol.h"
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
 #include "perfetto/base/utils.h"
 
 #include <sys/socket.h>
@@ -29,8 +30,8 @@
 bool ViewAndAdvance(char** ptr, T** out, const char* end) {
   if (end - sizeof(T) < *ptr)
     return false;
-  *out = reinterpret_cast<T*>(ptr);
-  ptr += sizeof(T);
+  *out = reinterpret_cast<T*>(*ptr);
+  *ptr += sizeof(T);
   return true;
 }
 }  // namespace
@@ -44,9 +45,11 @@
   iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
   iovecs[1].iov_len = sizeof(msg.record_type);
   if (msg.alloc_header) {
+    PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
     iovecs[2].iov_base = msg.alloc_header;
     iovecs[2].iov_len = sizeof(*msg.alloc_header);
   } else if (msg.free_header) {
+    PERFETTO_DCHECK(msg.record_type == RecordType::Free);
     iovecs[2].iov_base = msg.free_header;
     iovecs[2].iov_len = sizeof(*msg.free_header);
   } else {
@@ -68,7 +71,7 @@
     total_size = iovecs[1].iov_len + iovecs[2].iov_len;
   }
 
-  ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+  ssize_t sent = base::SendMsgAll(sock, &hdr, MSG_NOSIGNAL);
   return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
 }
 
@@ -77,23 +80,27 @@
   char* end = buf + size;
   if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
     return false;
-  switch (*record_type) {
-    case RecordType::Malloc:
-      if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
-        return false;
-      out->payload = buf;
-      if (buf > end) {
-        PERFETTO_DCHECK(false);
-        return false;
-      }
-      out->payload_size = static_cast<size_t>(end - buf);
-      break;
-    case RecordType::Free:
-      if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
-        return false;
-      break;
-  }
+
+  out->payload = nullptr;
+  out->payload_size = 0;
   out->record_type = *record_type;
+
+  if (*record_type == RecordType::Malloc) {
+    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+      return false;
+    out->payload = buf;
+    if (buf > end) {
+      PERFETTO_DCHECK(false);
+      return false;
+    }
+    out->payload_size = static_cast<size_t>(end - buf);
+  } else if (*record_type == RecordType::Free) {
+    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+      return false;
+  } else {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
   return true;
 }
 
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
new file mode 100644
index 0000000..c9e8b22
--- /dev/null
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/record_reader.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+
+// Test-only equality: tied header fields, then raw register_data bytes.
+bool operator==(const AllocMetadata& one, const AllocMetadata& other);
+bool operator==(const AllocMetadata& one, const AllocMetadata& other) {
+  const auto fields = [](const AllocMetadata& m) {
+    return std::tie(m.sequence_number, m.alloc_size, m.alloc_address,
+                    m.stack_pointer, m.stack_pointer_offset, m.arch);
+  };
+  return fields(one) == fields(other) &&
+         !memcmp(one.register_data, other.register_data, kMaxRegisterDataSize);
+}
+
+// Test-only FreeMetadata equality: compares only the first num_entries.
+bool operator==(const FreeMetadata& one, const FreeMetadata& other);
+bool operator==(const FreeMetadata& one, const FreeMetadata& other) {
+  bool equal = one.num_entries == other.num_entries;
+  for (size_t idx = 0; equal && idx < one.num_entries; ++idx) {
+    const auto& lhs = one.entries[idx];
+    const auto& rhs = other.entries[idx];
+    equal = lhs.sequence_number == rhs.sequence_number && lhs.addr == rhs.addr;
+  }
+  return equal;
+}
+
+namespace {
+
+// Reads from |sock| until the RecordReader reports RecordReceived, then
+// returns that record. CHECK-fails if read() returns <= 0 or if the reader
+// reports KillConnection.
+RecordReader::Record ReceiveAll(int sock) {
+  RecordReader reader;
+  RecordReader::Record result;
+  for (;;) {
+    // One read() per iteration, straight into the buffer the reader hands out.
+    RecordReader::ReceiveBuffer chunk = reader.BeginReceive();
+    ssize_t num_read = PERFETTO_EINTR(read(sock, chunk.data, chunk.size));
+    PERFETTO_CHECK(num_read > 0);
+    switch (reader.EndReceive(static_cast<size_t>(num_read), &result)) {
+      case RecordReader::Result::Noop:
+        break;  // Nothing to return yet; read again.
+      case RecordReader::Result::RecordReceived:
+        return result;
+      case RecordReader::Result::KillConnection:
+        PERFETTO_CHECK(false);
+        break;
+    }
+  }
+}
+
+// Round-trips a Malloc record (header plus payload) over a socketpair.
+TEST(WireProtocolTest, AllocMessage) {
+  AllocMetadata alloc_md = {};
+  alloc_md.sequence_number = 0xA1A2A3A4A5A6A7A8;
+  alloc_md.alloc_size = 0xB1B2B3B4B5B6B7B8;
+  alloc_md.alloc_address = 0xC1C2C3C4C5C6C7C8;
+  alloc_md.stack_pointer = 0xD1D2D3D4D5D6D7D8;
+  alloc_md.stack_pointer_offset = 0xE1E2E3E4E5E6E7E8;
+  alloc_md.arch = unwindstack::ARCH_X86;
+  for (size_t reg = 0; reg < kMaxRegisterDataSize; ++reg)
+    alloc_md.register_data[reg] = 0x66;
+  // Trailing NUL lets ASSERT_STREQ compare the payload below.
+  char payload[] = {0x77, 0x77, 0x77, 0x00};
+  WireMessage sent_msg = {};
+  sent_msg.record_type = RecordType::Malloc;
+  sent_msg.alloc_header = &alloc_md;
+  sent_msg.payload = payload;
+  sent_msg.payload_size = sizeof(payload);
+
+  int fds[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
+  base::ScopedFile tx(fds[0]);
+  base::ScopedFile rx(fds[1]);
+  ASSERT_TRUE(SendWireMessage(*tx, sent_msg));
+  RecordReader::Record record = ReceiveAll(*rx);
+  WireMessage recv_msg;
+  char* raw = reinterpret_cast<char*>(record.data.get());
+  ASSERT_TRUE(ReceiveWireMessage(raw, record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, sent_msg.record_type);
+  ASSERT_EQ(*recv_msg.alloc_header, *sent_msg.alloc_header);
+  ASSERT_EQ(recv_msg.payload_size, sent_msg.payload_size);
+  ASSERT_STREQ(recv_msg.payload, sent_msg.payload);
+}
+
+// Round-trips a Free record with kFreePageSize entries over a socketpair.
+TEST(WireProtocolTest, FreeMessage) {
+  FreeMetadata free_md = {};
+  free_md.num_entries = kFreePageSize;
+  for (size_t idx = 0; idx < kFreePageSize; ++idx) {
+    free_md.entries[idx].sequence_number = 0x111111111111111;
+    free_md.entries[idx].addr = 0x222222222222222;
+  }
+  WireMessage sent_msg = {};
+  sent_msg.record_type = RecordType::Free;
+  sent_msg.free_header = &free_md;
+
+  int fds[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
+  base::ScopedFile tx(fds[0]);
+  base::ScopedFile rx(fds[1]);
+  ASSERT_TRUE(SendWireMessage(*tx, sent_msg));
+  RecordReader::Record record = ReceiveAll(*rx);
+
+  WireMessage recv_msg;
+  char* raw = reinterpret_cast<char*>(record.data.get());
+  ASSERT_TRUE(ReceiveWireMessage(raw, record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, sent_msg.record_type);
+  ASSERT_EQ(*recv_msg.free_header, *sent_msg.free_header);
+  ASSERT_EQ(recv_msg.payload_size, sent_msg.payload_size);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/tracing/core/patch_list.h b/src/tracing/core/patch_list.h
index 5cef17d..bdfd028 100644
--- a/src/tracing/core/patch_list.h
+++ b/src/tracing/core/patch_list.h
@@ -53,7 +53,7 @@
   }
 
  private:
-  Patch& operator=(const Patch&) = default;
+  Patch& operator=(const Patch&) = delete;
   Patch(Patch&&) noexcept = delete;
   Patch& operator=(Patch&&) = delete;
 };
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index bdc28e5..3c32409 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -76,8 +76,8 @@
 
   void OnDisconnect() override {}
 
-  void SetupDataSource(DataSourceInstanceID,
-                       const DataSourceConfig& source_config) override {}
+  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override {
+  }  // Empty body: this override does nothing.
 
   void StartDataSource(DataSourceInstanceID,
                        const DataSourceConfig& source_config) override {