profiling: Add missing pieces to heapprofd.

* Allow several heap dumps using the same call graph tree.
* Add main loops for worker threads.
* Add queue between worker threads.
* Add client functions for sending the stack.

Change-Id: I5d66b6199ab72467e43335df257c08825c2d2228
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 7a4e459..46a571d 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -23,47 +23,11 @@
 #include <vector>
 
 #include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 
-class SocketPool;
-
-class FreePage {
- public:
-  FreePage();
-
-  // Can be called from any thread. Must not hold mtx_.`
-  void Add(const uint64_t addr, SocketPool* pool);
-
- private:
-  // Needs to be called holding mtx_.
-  void Flush(SocketPool* pool);
-
-  std::vector<uint64_t> free_page_;
-  std::mutex mtx_;
-  size_t offset_;
-};
-
-class BorrowedSocket {
- public:
-  BorrowedSocket(const BorrowedSocket&) = delete;
-  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
-  BorrowedSocket(BorrowedSocket&& other) {
-    fd_ = std::move(other.fd_);
-    socket_pool_ = other.socket_pool_;
-    other.socket_pool_ = nullptr;
-  }
-
-  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool);
-  int operator*();
-  int get();
-  void Close();
-  ~BorrowedSocket();
-
- private:
-  base::ScopedFile fd_;
-  SocketPool* socket_pool_ = nullptr;
-};
+class BorrowedSocket;
 
 class SocketPool {
  public:
@@ -74,13 +38,80 @@
 
  private:
   void Return(base::ScopedFile fd);
-  std::mutex mtx_;
+  std::mutex mutex_;
   std::condition_variable cv_;
   std::vector<base::ScopedFile> sockets_;
   size_t available_sockets_;
   size_t dead_sockets_ = 0;
 };
 
+// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
+class BorrowedSocket {
+ public:
+  BorrowedSocket(const BorrowedSocket&) = delete;
+  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
+  BorrowedSocket(BorrowedSocket&& other) noexcept {
+    fd_ = std::move(other.fd_);
+    socket_pool_ = other.socket_pool_;
+    other.socket_pool_ = nullptr;
+  }
+
+  BorrowedSocket(base::ScopedFile fd, SocketPool* socket_pool)
+      : fd_(std::move(fd)), socket_pool_(socket_pool) {}
+
+  ~BorrowedSocket() {
+    if (socket_pool_ != nullptr)
+      socket_pool_->Return(std::move(fd_));
+  }
+
+  int operator*() { return get(); }
+
+  int get() { return *fd_; }
+
+  void Close() { fd_.reset(); }
+
+ private:
+  base::ScopedFile fd_;
+  SocketPool* socket_pool_ = nullptr;
+};
+
+// Cache for frees that have been observed. It is infeasible to send every
+// free separately, so we batch and send the whole buffer once it is full.
+class FreePage {
+ public:
+  // Add address to buffer. Flush if necessary using a socket borrowed from
+  // pool.
+  // Can be called from any thread. Must not hold mutex_.
+  void Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+
+ private:
+  // Needs to be called holding mutex_.
+  void FlushLocked(SocketPool* pool);
+
+  FreeMetadata free_page_;
+  std::mutex mutex_;
+  size_t offset_ = 0;
+};
+
+const char* GetThreadStackBase();
+
+// This is created and owned by the malloc hooks.
+class Client {
+ public:
+  Client(std::vector<base::ScopedFile> sockets);
+  Client(const std::string& sock_name, size_t conns);
+  void RecordMalloc(uint64_t alloc_size, uint64_t alloc_address);
+  void RecordFree(uint64_t alloc_address);
+
+ private:
+  const char* GetStackBase();
+
+  SocketPool socket_pool_;
+  FreePage free_page_;
+  const char* main_thread_stack_base_ = nullptr;
+  std::atomic<uint64_t> sequence_number_{0};
+};
+
 }  // namespace perfetto
 
 #endif  // SRC_PROFILING_MEMORY_CLIENT_H_