Use the SharedRingBuffer for sending payload from client to service.

This simplifies the code as we no longer have to deal with the
complexity of having to quickly drain the client socket in order to
unblock the client anymore.

We now use standard base::TaskRunner for our threads, including the
unwinding threads, as we no longer need to provide backpressure.

Remove code providing support for multiple DataSources about the same
process. We now only accept one DataSource per process, and we pick the
earliest we receive from traced. This simplifies the matching between
DataSource and process.

Change-Id: I9f91f7d4993a37eb8e92a43108b1cd8883b229c6
Bug: 126724929
Bug: 125891203
diff --git a/src/profiling/memory/client.h b/src/profiling/memory/client.h
index 5bcfff5..adf9e54 100644
--- a/src/profiling/memory/client.h
+++ b/src/profiling/memory/client.h
@@ -26,80 +26,29 @@
 
 #include "perfetto/base/unix_socket.h"
 #include "src/profiling/memory/sampler.h"
+#include "src/profiling/memory/shared_ring_buffer.h"
 #include "src/profiling/memory/wire_protocol.h"
 
 namespace perfetto {
 namespace profiling {
 
-class BorrowedSocket;
-
-class SocketPool {
- public:
-  friend class BorrowedSocket;
-  SocketPool(std::vector<base::UnixSocketRaw> sockets);
-
-  BorrowedSocket Borrow();
-  void Shutdown();
-
- private:
-  bool shutdown_ = false;
-
-  void Return(base::UnixSocketRaw);
-  std::timed_mutex mutex_;
-  std::condition_variable_any cv_;
-  std::vector<base::UnixSocketRaw> sockets_;
-  size_t available_sockets_;
-  size_t dead_sockets_ = 0;
-};
-
-// Socket borrowed from a SocketPool. Gets returned once it goes out of scope.
-class BorrowedSocket {
- public:
-  BorrowedSocket(const BorrowedSocket&) = delete;
-  BorrowedSocket& operator=(const BorrowedSocket&) = delete;
-  BorrowedSocket(BorrowedSocket&& other) noexcept
-      : sock_(std::move(other.sock_)), socket_pool_(other.socket_pool_) {
-    other.socket_pool_ = nullptr;
-  }
-
-  BorrowedSocket(base::UnixSocketRaw sock, SocketPool* socket_pool)
-      : sock_(std::move(sock)), socket_pool_(socket_pool) {}
-
-  ~BorrowedSocket() {
-    if (socket_pool_ != nullptr)
-      socket_pool_->Return(std::move(sock_));
-  }
-
-  base::UnixSocketRaw* operator->() { return &sock_; }
-  base::UnixSocketRaw* get() { return &sock_; }
-  void Shutdown() { sock_.Shutdown(); }
-  explicit operator bool() const { return !!sock_; }
-
- private:
-  base::UnixSocketRaw sock_;
-  SocketPool* socket_pool_ = nullptr;
-};
+class Client;
 
 // Cache for frees that have been observed. It is infeasible to send every
 // free separately, so we batch and send the whole buffer once it is full.
 class FreePage {
  public:
-  FreePage(uint64_t client_generation) {
-    free_page_.client_generation = client_generation;
-  }
+  FreePage() { free_page_.num_entries = 0; }
 
-  // Add address to buffer. Flush if necessary using a socket borrowed from
-  // pool.
+  // Add address to buffer. Flush if necessary.
   // Can be called from any thread. Must not hold mutex_.
-  bool Add(const uint64_t addr, uint64_t sequence_number, SocketPool* pool);
+  bool Add(const uint64_t addr, uint64_t sequence_number, Client* client);
 
  private:
-  // Needs to be called holding mutex_.
-  bool FlushLocked(SocketPool* pool);
-
+  // TODO(fmayer): Sort out naming. It's confusing data FreePage has a member
+  // called free_page_ that is of type FreeMetadata.
   FreeMetadata free_page_;
   std::timed_mutex mutex_;
-  size_t offset_ = 0;
 };
 
 const char* GetThreadStackBase();
@@ -115,13 +64,14 @@
 // the caller needs to synchronize calls behind a mutex or similar.
 class Client {
  public:
-  Client(std::vector<base::UnixSocketRaw> sockets);
-  Client(const std::string& sock_name, size_t conns);
+  Client(base::Optional<base::UnixSocketRaw> sock);
+  Client(const std::string& sock_name);
   bool RecordMalloc(uint64_t alloc_size,
                     uint64_t total_size,
                     uint64_t alloc_address);
   bool RecordFree(uint64_t alloc_address);
   void Shutdown();
+  bool FlushFrees(FreeMetadata* free_metadata);
 
   // Returns the number of bytes to assign to an allocation with the given
   // |alloc_size|, based on the current sampling rate. A return value of zero
@@ -141,9 +91,6 @@
  private:
   const char* GetStackBase();
 
-  static std::atomic<uint64_t> max_generation_;
-  const uint64_t generation_;
-
   // TODO(rsavitski): used to check if the client is completely initialized
   // after construction. The reads in RecordFree & GetSampleSizeLocked are no
   // longer necessary (was an optimization to not do redundant work after
@@ -153,10 +100,11 @@
   ClientConfiguration client_config_;
   // sampler_ operations are not thread-safe.
   Sampler sampler_;
-  SocketPool socket_pool_;
+  base::UnixSocketRaw sock_;
   FreePage free_page_;
   const char* main_thread_stack_base_ = nullptr;
   std::atomic<uint64_t> sequence_number_{0};
+  SharedRingBuffer shmem_;
 };
 
 }  // namespace profiling