heapprofd: remove Client.FreePage nesting, and try to make names more consistent

Mostly a follow-up to this review comment thread:
https://android-review.git.corp.google.com/c/platform/external/perfetto/+/915280/10/src/profiling/memory/client.h#49

Primary renames:
FreeMetadata -> FreeBatch
kFreePageSize -> kFreeBatchSize
FreePageEntry -> FreeBatchEntry

Note that while FreeMetadata is now FreeBatch, AllocMetadata keeps its name
(see wire_protocol.h). I think the result is still clearer than before, but
I'm open to discussion.

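Bonus drive-by fixes (bundled here because presubmit turnaround times
make small patches painful):
- rename queue_messages.h -> unwound_messages.h
- pass the buffer to UnwindingWorker::HandleBuffer by const reference
- unwinding.cc: rename socket_data -> client_data, tidy debug logs
- remove a few TODO comments
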
Change-Id: Id34a785973b36279fce8cea56d2934bbe71f6741
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 2005e6a..30d1ee4 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -109,11 +109,11 @@
     "heapprofd_producer.cc",
     "heapprofd_producer.h",
     "interner.h",
-    "queue_messages.h",
     "system_property.cc",
     "system_property.h",
     "unwinding.cc",
     "unwinding.h",
+    "unwound_messages.h",
   ]
 }
 
diff --git a/src/profiling/memory/bookkeeping.h b/src/profiling/memory/bookkeeping.h
index 985adb5..b6e0404 100644
--- a/src/profiling/memory/bookkeeping.h
+++ b/src/profiling/memory/bookkeeping.h
@@ -27,7 +27,7 @@
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "src/profiling/memory/interner.h"
-#include "src/profiling/memory/queue_messages.h"
+#include "src/profiling/memory/unwound_messages.h"
 
 // Below is an illustration of the bookkeeping system state where
 // PID 1 does the following allocations:
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index 5050f5b..ee7887a 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -98,40 +98,6 @@
 
 }  // namespace
 
-bool FreePage::Add(const uint64_t addr,
-                   const uint64_t sequence_number,
-                   Client* client) {
-  std::unique_lock<std::timed_mutex> l(mutex_, kLockTimeout);
-  if (!l.owns_lock())
-    return false;
-  if (free_page_.num_entries == kFreePageSize) {
-    if (!client->FlushFrees(&free_page_))
-      return false;
-    // Now that we have flushed, reset to after the header.
-    free_page_.num_entries = 0;
-  }
-  FreePageEntry& current_entry = free_page_.entries[free_page_.num_entries++];
-  current_entry.sequence_number = sequence_number;
-  current_entry.addr = addr;
-  return true;
-}
-
-bool Client::FlushFrees(FreeMetadata* free_metadata) {
-  WireMessage msg = {};
-  msg.record_type = RecordType::Free;
-  msg.free_header = free_metadata;
-  if (!SendWireMessage(&shmem_, msg)) {
-    PERFETTO_PLOG("Failed to send wire message");
-    Shutdown();
-    return false;
-  }
-  if (sock_.Send(kSingleByte, sizeof(kSingleByte)) == -1) {
-    Shutdown();
-    return false;
-  }
-  return true;
-}
-
 const char* GetThreadStackBase() {
   pthread_attr_t attr;
   if (pthread_getattr_np(pthread_self(), &attr) != 0)
@@ -287,17 +253,52 @@
   return true;
 }
 
-bool Client::RecordFree(uint64_t alloc_address) {
+bool Client::RecordFree(const uint64_t alloc_address) {
   if (!inited_.load(std::memory_order_acquire))
     return false;
-  bool success = free_page_.Add(
+  bool success = AddFreeToBatch(
       alloc_address,
-      1 + sequence_number_.fetch_add(1, std::memory_order_acq_rel), this);
+      1 + sequence_number_.fetch_add(1, std::memory_order_acq_rel));
   if (!success)
     Shutdown();
   return success;
 }
 
+// TODO(rsavitski): consider inlining into RecordFree.
+bool Client::AddFreeToBatch(const uint64_t addr,
+                            const uint64_t sequence_number) {
+  std::unique_lock<std::timed_mutex> l(free_batch_lock_, kLockTimeout);
+  if (!l.owns_lock())  // Lock attempt (bounded by kLockTimeout) failed.
+    return false;
+  if (free_batch_.num_entries == kFreeBatchSize) {  // Batch is full.
+    if (!FlushFreesLocked())
+      return false;
+    // The batch was sent successfully; rewind so its slots get overwritten.
+    free_batch_.num_entries = 0;
+  }
+  FreeBatchEntry& current_entry =
+      free_batch_.entries[free_batch_.num_entries++];  // Claims next slot.
+  current_entry.sequence_number = sequence_number;
+  current_entry.addr = addr;
+  return true;  // Entry stored in the batch (not necessarily sent yet).
+}
+
+bool Client::FlushFreesLocked() {  // Caller must hold free_batch_lock_.
+  WireMessage msg = {};
+  msg.record_type = RecordType::Free;
+  msg.free_header = &free_batch_;  // Points at the member; no local copy.
+  if (!SendWireMessage(&shmem_, msg)) {
+    PERFETTO_PLOG("Failed to send wire message");
+    Shutdown();  // Clears inited_; RecordFree then bails out early.
+    return false;
+  }
+  if (sock_.Send(kSingleByte, sizeof(kSingleByte)) == -1) {  // Send failed.
+    Shutdown();
+    return false;
+  }
+  return true;  // Does not reset num_entries; the caller does that.
+}
+
 void Client::Shutdown() {
   inited_.store(false, std::memory_order_release);
 }
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index adf9e54..baef4ca 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -32,25 +32,6 @@
 namespace perfetto {
 namespace profiling {
 
-class Client;
-
-// Cache for frees that have been observed. It is infeasible to send every
-// free separately, so we batch and send the whole buffer once it is full.
-class FreePage {
- public:
-  FreePage() { free_page_.num_entries = 0; }
-
-  // Add address to buffer. Flush if necessary.
-  // Can be called from any thread. Must not hold mutex_.
-  bool Add(const uint64_t addr, uint64_t sequence_number, Client* client);
-
- private:
-  // TODO(fmayer): Sort out naming. It's confusing data FreePage has a member
-  // called free_page_ that is of type FreeMetadata.
-  FreeMetadata free_page_;
-  std::timed_mutex mutex_;
-};
-
 const char* GetThreadStackBase();
 
 constexpr uint32_t kClientSockTimeoutMs = 1000;
@@ -71,7 +52,6 @@
                     uint64_t alloc_address);
   bool RecordFree(uint64_t alloc_address);
   void Shutdown();
-  bool FlushFrees(FreeMetadata* free_metadata);
 
   // Returns the number of bytes to assign to an allocation with the given
   // |alloc_size|, based on the current sampling rate. A return value of zero
@@ -91,6 +71,12 @@
  private:
   const char* GetStackBase();
 
+  // Add address to buffer of deallocations. Flush if necessary.
+  // Can be called from any thread. Must not hold free_batch_lock_.
+  bool AddFreeToBatch(uint64_t addr, uint64_t sequence_number);
+  // Flush the contents of free_batch_. Must hold free_batch_lock_.
+  bool FlushFreesLocked();
+
   // TODO(rsavitski): used to check if the client is completely initialized
   // after construction. The reads in RecordFree & GetSampleSizeLocked are no
   // longer necessary (was an optimization to not do redundant work after
@@ -101,7 +87,11 @@
   // sampler_ operations are not thread-safe.
   Sampler sampler_;
   base::UnixSocketRaw sock_;
-  FreePage free_page_;
+
+  // Protected by free_batch_lock_.
+  FreeBatch free_batch_;
+  std::timed_mutex free_batch_lock_;
+
   const char* main_thread_stack_base_ = nullptr;
   std::atomic<uint64_t> sequence_number_{0};
   SharedRingBuffer shmem_;
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index 894181d..34f9e49 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -659,7 +659,7 @@
 }
 
 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
-  const FreeMetadata& free_metadata = free_rec.metadata;
+  const FreeBatch& free_batch = free_rec.free_batch;
   auto it = data_sources_.find(free_rec.data_source_instance_id);
   if (it == data_sources_.end()) {
     PERFETTO_LOG("Invalid data source in free record.");
@@ -675,14 +675,14 @@
 
   HeapTracker& heap_tracker = heap_tracker_it->second;
 
-  const FreePageEntry* entries = free_metadata.entries;
-  uint64_t num_entries = free_metadata.num_entries;
-  if (num_entries > kFreePageSize) {
+  const FreeBatchEntry* entries = free_batch.entries;
+  uint64_t num_entries = free_batch.num_entries;
+  if (num_entries > kFreeBatchSize) {
     PERFETTO_DFATAL("Malformed free page.");
     return;
   }
   for (size_t i = 0; i < num_entries; ++i) {
-    const FreePageEntry& entry = entries[i];
+    const FreeBatchEntry& entry = entries[i];
     heap_tracker.RecordFree(entry.addr, entry.sequence_number);
   }
 }
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 44f2e54..80d05eb 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -174,12 +174,12 @@
   std::unique_ptr<unwindstack::Regs> regs(
       CreateFromRawData(alloc_metadata->arch, alloc_metadata->register_data));
   if (regs == nullptr) {
+    PERFETTO_DLOG("Unable to construct unwindstack::Regs");
     unwindstack::FrameData frame_data{};
     frame_data.function_name = "ERROR READING REGISTERS";
     frame_data.map_name = "ERROR";
 
     out->frames.emplace_back(frame_data, "");
-    PERFETTO_DLOG("regs");
     return false;
   }
   uint8_t* stack = reinterpret_cast<uint8_t*>(msg->payload);
@@ -223,13 +223,12 @@
   }
 
   if (error_code != 0) {
-    PERFETTO_DLOG("Unwinding error %d", error_code);
+    PERFETTO_DLOG("Unwinding error %" PRIu8, error_code);
     unwindstack::FrameData frame_data{};
     frame_data.function_name = "ERROR " + std::to_string(error_code);
     frame_data.map_name = "ERROR";
 
     out->frames.emplace_back(frame_data, "");
-    PERFETTO_DLOG("unwinding failed %" PRIu8, error_code);
   }
 
   return true;
@@ -242,8 +241,8 @@
     PERFETTO_DFATAL("Disconnected unexpecter socket.");
     return;
   }
-  ClientData& socket_data = it->second;
-  DataSourceInstanceID ds_id = socket_data.data_source_instance_id;
+  ClientData& client_data = it->second;
+  DataSourceInstanceID ds_id = client_data.data_source_instance_id;
   client_data_.erase(it);
   delegate_->PostSocketDisconnected(ds_id, self->peer_pid());
 }
@@ -259,8 +258,8 @@
     return;
   }
 
-  ClientData& socket_data = it->second;
-  SharedRingBuffer& shmem = socket_data.shmem;
+  ClientData& client_data = it->second;
+  SharedRingBuffer& shmem = client_data.shmem;
   SharedRingBuffer::Buffer buf;
 
   for (;;) {
@@ -269,15 +268,15 @@
     buf = shmem.BeginRead();
     if (!buf)
       break;
-    HandleBuffer(&buf, &socket_data.metadata,
-                 socket_data.data_source_instance_id,
-                 socket_data.sock->peer_pid(), delegate_);
+    HandleBuffer(buf, &client_data.metadata,
+                 client_data.data_source_instance_id,
+                 client_data.sock->peer_pid(), delegate_);
     shmem.EndRead(std::move(buf));
   }
 }
 
 // static
-void UnwindingWorker::HandleBuffer(SharedRingBuffer::Buffer* buf,
+void UnwindingWorker::HandleBuffer(const SharedRingBuffer::Buffer& buf,
                                    UnwindingMetadata* unwinding_metadata,
                                    DataSourceInstanceID data_source_instance_id,
                                    pid_t peer_pid,
@@ -286,8 +285,7 @@
   // TODO(fmayer): standardise on char* or uint8_t*.
   // char* has stronger guarantees regarding aliasing.
   // see https://timsong-cpp.github.io/cppwp/n3337/basic.lval#10.8
-  if (!ReceiveWireMessage(reinterpret_cast<char*>(buf->data), buf->size,
-                          &msg)) {
+  if (!ReceiveWireMessage(reinterpret_cast<char*>(buf.data), buf.size, &msg)) {
     PERFETTO_DFATAL("Failed to receive wire message.");
     return;
   }
@@ -304,7 +302,7 @@
     rec.pid = peer_pid;
     rec.data_source_instance_id = data_source_instance_id;
     // We need to copy this, so we can return the memory to the shmem buffer.
-    memcpy(&rec.metadata, msg.free_header, sizeof(*msg.free_header));
+    memcpy(&rec.free_batch, msg.free_header, sizeof(*msg.free_header));
     delegate->PostFreeRecord(std::move(rec));
   } else {
     PERFETTO_DFATAL("Invalid record type.");
diff --git a/src/profiling/memory/unwinding.h b/src/profiling/memory/unwinding.h
index bcd37cd..af238db 100644
--- a/src/profiling/memory/unwinding.h
+++ b/src/profiling/memory/unwinding.h
@@ -31,7 +31,7 @@
 #include "perfetto/base/thread_task_runner.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "src/profiling/memory/bookkeeping.h"
-#include "src/profiling/memory/queue_messages.h"
+#include "src/profiling/memory/unwound_messages.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
@@ -166,7 +166,7 @@
 
  public:
   // static and public for testing/fuzzing
-  static void HandleBuffer(SharedRingBuffer::Buffer* buf,
+  static void HandleBuffer(const SharedRingBuffer::Buffer& buf,
                            UnwindingMetadata* unwinding_metadata,
                            DataSourceInstanceID data_source_instance_id,
                            pid_t peer_pid,
diff --git a/src/profiling/memory/unwinding_fuzzer.cc b/src/profiling/memory/unwinding_fuzzer.cc
index 20a9357..998d922 100644
--- a/src/profiling/memory/unwinding_fuzzer.cc
+++ b/src/profiling/memory/unwinding_fuzzer.cc
@@ -19,9 +19,9 @@
 
 #include "perfetto/base/utils.h"
 #include "perfetto/tracing/core/basic_types.h"
-#include "src/profiling/memory/queue_messages.h"
 #include "src/profiling/memory/shared_ring_buffer.h"
 #include "src/profiling/memory/unwinding.h"
+#include "src/profiling/memory/unwound_messages.h"
 
 namespace perfetto {
 namespace profiling {
@@ -43,7 +43,7 @@
                              base::OpenFile("/proc/self/mem", O_RDONLY));
 
   NopDelegate nop_delegate;
-  UnwindingWorker::HandleBuffer(&buf, &metadata, id, self_pid, &nop_delegate);
+  UnwindingWorker::HandleBuffer(buf, &metadata, id, self_pid, &nop_delegate);
   return 0;
 }
 
diff --git a/src/profiling/memory/unwinding_unittest.cc b/src/profiling/memory/unwinding_unittest.cc
index 8a8f357..9f03ecf 100644
--- a/src/profiling/memory/unwinding_unittest.cc
+++ b/src/profiling/memory/unwinding_unittest.cc
@@ -116,7 +116,6 @@
   return {std::move(payload), std::move(metadata)};
 }
 
-// TODO(fmayer): Investigate why this fails out of tree.
 TEST(UnwindingTest, DoUnwind) {
   base::ScopedFile proc_maps(base::OpenFile("/proc/self/maps", O_RDONLY));
   base::ScopedFile proc_mem(base::OpenFile("/proc/self/mem", O_RDONLY));
diff --git a/src/profiling/memory/queue_messages.h b/src/profiling/memory/unwound_messages.h
similarity index 83%
rename from src/profiling/memory/queue_messages.h
rename to src/profiling/memory/unwound_messages.h
index 1c15fe3..b3060e7 100644
--- a/src/profiling/memory/queue_messages.h
+++ b/src/profiling/memory/unwound_messages.h
@@ -14,16 +14,14 @@
  * limitations under the License.
  */
 
-#ifndef SRC_PROFILING_MEMORY_QUEUE_MESSAGES_H_
-#define SRC_PROFILING_MEMORY_QUEUE_MESSAGES_H_
+#ifndef SRC_PROFILING_MEMORY_UNWOUND_MESSAGES_H_
+#define SRC_PROFILING_MEMORY_UNWOUND_MESSAGES_H_
 
 #include <unwindstack/Maps.h>
 #include <unwindstack/Unwinder.h>
 
 #include "src/profiling/memory/wire_protocol.h"
 
-// TODO(fmayer): Find better places to put these structs.
-
 namespace perfetto {
 namespace profiling {
 
@@ -36,6 +34,7 @@
   std::string build_id;
 };
 
+// Single allocation with an unwound callstack.
 struct AllocRecord {
   pid_t pid;
   uint64_t data_source_instance_id;
@@ -43,13 +42,14 @@
   std::vector<FrameData> frames;
 };
 
+// Batch of deallocations.
 struct FreeRecord {
   pid_t pid;
   uint64_t data_source_instance_id;
-  FreeMetadata metadata;
+  FreeBatch free_batch;
 };
 
 }  // namespace profiling
 }  // namespace perfetto
 
-#endif  // SRC_PROFILING_MEMORY_QUEUE_MESSAGES_H_
+#endif  // SRC_PROFILING_MEMORY_UNWOUND_MESSAGES_H_
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 22065fc..8c71fcb 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -128,7 +128,7 @@
     }
     out->payload_size = static_cast<size_t>(end - buf);
   } else if (*record_type == RecordType::Free) {
-    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end)) {
+    if (!ViewAndAdvance<FreeBatch>(&buf, &out->free_header, end)) {
       PERFETTO_DFATAL("Cannot read free header.");
       return false;
     }
diff --git a/src/profiling/memory/wire_protocol.h b/src/profiling/memory/wire_protocol.h
index 422a9d4..512a2c5 100644
--- a/src/profiling/memory/wire_protocol.h
+++ b/src/profiling/memory/wire_protocol.h
@@ -39,11 +39,18 @@
 
 namespace profiling {
 
+struct ClientConfiguration {
+  // On average, sample one allocation every interval bytes,
+  // If interval == 1, sample every allocation.
+  // Must be >= 1.
+  uint64_t interval;
+};
+
 // Types needed for the wire format used for communication between the client
 // and heapprofd. The basic format of a record is
 // record size (uint64_t) | record type (RecordType = uint64_t) | record
 // If record type is malloc, the record format is AllocMetdata | raw stack.
-// If the record type is free, the record is a sequence of FreePageEntry.
+// If the record type is free, the record is a sequence of FreeBatchEntry.
 
 // Use uint64_t to make sure the following data is aligned as 64bit is the
 // strongest alignment requirement.
@@ -70,7 +77,7 @@
   );
 // clang-format on
 
-constexpr size_t kFreePageSize = 1024;
+constexpr size_t kFreeBatchSize = 1024;
 
 enum class RecordType : uint64_t {
   Free = 0,
@@ -95,21 +102,16 @@
   unwindstack::ArchEnum arch;
 };
 
-struct FreePageEntry {
+struct FreeBatchEntry {
   uint64_t sequence_number;
   uint64_t addr;
 };
 
-struct ClientConfiguration {
-  // On average, sample one allocation every interval bytes,
-  // If interval == 1, sample every allocation.
-  // Must be >= 1.
-  uint64_t interval;
-};
-
-struct FreeMetadata {
+struct FreeBatch {
   uint64_t num_entries;
-  FreePageEntry entries[kFreePageSize];
+  FreeBatchEntry entries[kFreeBatchSize];
+
+  FreeBatch() { num_entries = 0; }
 };
 
 enum HandshakeFDs : size_t {
@@ -122,7 +124,7 @@
   RecordType record_type;
 
   AllocMetadata* alloc_header;
-  FreeMetadata* free_header;
+  FreeBatch* free_header;
 
   char* payload;
   size_t payload_size;
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
index 3be469d..189cba0 100644
--- a/src/profiling/memory/wire_protocol_unittest.cc
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -39,8 +39,8 @@
              0;
 }
 
-bool operator==(const FreeMetadata& one, const FreeMetadata& other);
-bool operator==(const FreeMetadata& one, const FreeMetadata& other) {
+bool operator==(const FreeBatch& one, const FreeBatch& other);
+bool operator==(const FreeBatch& one, const FreeBatch& other) {
   if (one.num_entries != other.num_entries)
     return false;
   for (size_t i = 0; i < one.num_entries; ++i) {
@@ -109,13 +109,13 @@
 TEST(WireProtocolTest, FreeMessage) {
   WireMessage msg = {};
   msg.record_type = RecordType::Free;
-  FreeMetadata metadata = {};
-  metadata.num_entries = kFreePageSize;
-  for (size_t i = 0; i < kFreePageSize; ++i) {
-    metadata.entries[i].sequence_number = 0x111111111111111;
-    metadata.entries[i].addr = 0x222222222222222;
+  FreeBatch batch = {};
+  batch.num_entries = kFreeBatchSize;
+  for (size_t i = 0; i < kFreeBatchSize; ++i) {
+    batch.entries[i].sequence_number = 0x111111111111111;
+    batch.entries[i].addr = 0x222222222222222;
   }
-  msg.free_header = &metadata;
+  msg.free_header = &batch;
 
   auto shmem_client = SharedRingBuffer::Create(kShmemSize);
   ASSERT_TRUE(shmem_client);