Use the SharedRingBuffer for sending payload from client to service.

This simplifies the code, as we no longer have to deal with the
complexity of quickly draining the client socket in order to unblock
the client.

We now use the standard base::TaskRunner for our threads, including the
unwinding threads, as we no longer need to provide backpressure.

Remove the code supporting multiple DataSources for the same process.
We now accept only one DataSource per process, and we pick the earliest
one we receive from traced. This simplifies the matching between
DataSource and process.

Change-Id: I9f91f7d4993a37eb8e92a43108b1cd8883b229c6
Bug: 126724929
Bug: 125891203
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 43ae7de..22065fc 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -35,60 +35,92 @@
   *ptr += sizeof(T);
   return true;
 }
+
+// We need this to prevent crashes due to FORTIFY_SOURCE.
+void UnsafeMemcpy(char* dest, const char* src, size_t n)
+    __attribute__((no_sanitize("address"))) {
+  for (size_t i = 0; i < n; ++i) {
+    dest[i] = src[i];
+  }
+}
 }  // namespace
 
-bool SendWireMessage(base::UnixSocketRaw* sock, const WireMessage& msg) {
+bool SendWireMessage(SharedRingBuffer* shmem, const WireMessage& msg) {
   uint64_t total_size;
-  struct iovec iovecs[4] = {};
+  struct iovec iovecs[3] = {};
   // TODO(fmayer): Maye pack these two.
-  iovecs[0].iov_base = &total_size;
-  iovecs[0].iov_len = sizeof(total_size);
-  iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
-  iovecs[1].iov_len = sizeof(msg.record_type);
+  iovecs[0].iov_base = const_cast<RecordType*>(&msg.record_type);
+  iovecs[0].iov_len = sizeof(msg.record_type);
   if (msg.alloc_header) {
     PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
-    iovecs[2].iov_base = msg.alloc_header;
-    iovecs[2].iov_len = sizeof(*msg.alloc_header);
+    iovecs[1].iov_base = msg.alloc_header;
+    iovecs[1].iov_len = sizeof(*msg.alloc_header);
   } else if (msg.free_header) {
     PERFETTO_DCHECK(msg.record_type == RecordType::Free);
-    iovecs[2].iov_base = msg.free_header;
-    iovecs[2].iov_len = sizeof(*msg.free_header);
+    iovecs[1].iov_base = msg.free_header;
+    iovecs[1].iov_len = sizeof(*msg.free_header);
   } else {
     PERFETTO_DFATAL("Neither alloc_header nor free_header set.");
     return false;
   }
 
-  iovecs[3].iov_base = msg.payload;
-  iovecs[3].iov_len = msg.payload_size;
+  iovecs[2].iov_base = msg.payload;
+  iovecs[2].iov_len = msg.payload_size;
 
   struct msghdr hdr = {};
   hdr.msg_iov = iovecs;
   if (msg.payload) {
     hdr.msg_iovlen = base::ArraySize(iovecs);
-    total_size = iovecs[1].iov_len + iovecs[2].iov_len + iovecs[3].iov_len;
+    total_size = iovecs[0].iov_len + iovecs[1].iov_len + iovecs[2].iov_len;
   } else {
     // If we are not sending payload, just ignore that iovec.
     hdr.msg_iovlen = base::ArraySize(iovecs) - 1;
-    total_size = iovecs[1].iov_len + iovecs[2].iov_len;
+    total_size = iovecs[0].iov_len + iovecs[1].iov_len;
   }
 
-  ssize_t sent = sock->SendMsgAll(&hdr);
-  return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
+  SharedRingBuffer::Buffer buf;
+  {
+    ScopedSpinlock lock = shmem->AcquireLock(ScopedSpinlock::Mode::Try);
+    if (!lock.locked()) {
+      PERFETTO_DLOG("Failed to acquire spinlock.");
+      return false;
+    }
+    buf = shmem->BeginWrite(lock, total_size);
+  }
+  if (!buf) {
+    PERFETTO_DFATAL("Buffer overflow.");
+    shmem->EndWrite(std::move(buf));
+    return false;
+  }
+
+  size_t offset = 0;
+  for (size_t i = 0; i < hdr.msg_iovlen; ++i) {
+    UnsafeMemcpy(reinterpret_cast<char*>(buf.data + offset),
+                 reinterpret_cast<const char*>(hdr.msg_iov[i].iov_base),
+                 hdr.msg_iov[i].iov_len);
+    offset += hdr.msg_iov[i].iov_len;
+  }
+  shmem->EndWrite(std::move(buf));
+  return true;
 }
 
 bool ReceiveWireMessage(char* buf, size_t size, WireMessage* out) {
   RecordType* record_type;
   char* end = buf + size;
-  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
+  if (!ViewAndAdvance<RecordType>(&buf, &record_type, end)) {
+    PERFETTO_DFATAL("Cannot read record type.");
     return false;
+  }
 
   out->payload = nullptr;
   out->payload_size = 0;
   out->record_type = *record_type;
 
   if (*record_type == RecordType::Malloc) {
-    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+    if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end)) {
+      PERFETTO_DFATAL("Cannot read alloc header.");
       return false;
+    }
     out->payload = buf;
     if (buf > end) {
       PERFETTO_DFATAL("Buffer overflowed");
@@ -96,8 +128,10 @@
     }
     out->payload_size = static_cast<size_t>(end - buf);
   } else if (*record_type == RecordType::Free) {
-    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+    if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end)) {
+      PERFETTO_DFATAL("Cannot read free header.");
       return false;
+    }
   } else {
     PERFETTO_DFATAL("Invalid record type.");
     return false;