Add SharedRingBuffer

Change-Id: Ief02a8f8a10d5d0a81cca03ce2864fb4cc744ba8
diff --git a/Android.bp b/Android.bp
index d22e64c..4b1d646 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2609,6 +2609,8 @@
     "src/profiling/memory/record_reader_unittest.cc",
     "src/profiling/memory/sampler.cc",
     "src/profiling/memory/sampler_unittest.cc",
+    "src/profiling/memory/shared_ring_buffer.cc",
+    "src/profiling/memory/shared_ring_buffer_unittest.cc",
     "src/profiling/memory/socket_listener.cc",
     "src/profiling/memory/socket_listener_unittest.cc",
     "src/profiling/memory/system_property.cc",
diff --git a/BUILD.gn b/BUILD.gn
index 2d3a59b..f978826 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -63,6 +63,7 @@
       deps += [
         ":perfetto_benchmarks",
         ":trace_processor",
+        "src/profiling/memory:ring_buffer",
         "src/tracing:consumer_api_test",
         "test/configs",
         "tools/ftrace_proto_gen:ftrace_proto_gen",
@@ -119,6 +120,7 @@
     deps += [
       "src/ipc:unittests",
       "src/perfetto_cmd:unittests",
+      "src/profiling/memory:ring_buffer_unittests",
       "src/traced/probes:unittests",
       "src/traced/probes/filesystem:unittests",
       "src/traced/probes/ftrace:unittests",
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 3268b4b..fc5ab08 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -48,6 +48,30 @@
   ]
 }
 
+source_set("ring_buffer") {
+  deps = [
+    "../../../gn:default_deps",
+    "../../base",
+  ]
+  sources = [
+    "shared_ring_buffer.cc",
+    "shared_ring_buffer.ch",
+  ]
+}
+
+source_set("ring_buffer_unittests") {
+  testonly = true
+  deps = [
+    ":ring_buffer",
+    "../../../gn:default_deps",
+    "../../../gn:gtest_deps",
+    "../../base",
+  ]
+  sources = [
+    "shared_ring_buffer_unittest.cc",
+  ]
+}
+
 source_set("daemon") {
   public_configs = [ "../../../buildtools:libunwindstack_config" ]
   deps = [
diff --git a/src/profiling/memory/shared_ring_buffer.cc b/src/profiling/memory/shared_ring_buffer.cc
new file mode 100644
index 0000000..ebce527
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer.cc
@@ -0,0 +1,290 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/shared_ring_buffer.h"
+
+#include <atomic>
+#include <type_traits>
+
+#include <inttypes.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/temp_file.h"
+
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#include <linux/memfd.h>
+#include <sys/syscall.h>
+#endif
+
+namespace perfetto {
+namespace profiling {
+
+namespace {
+
+constexpr auto kMetaPageSize = base::kPageSize;
+constexpr auto kAlignment = 8;  // 64 bits to use aligned memcpy().
+constexpr auto kHeaderSize = kAlignment;
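+// The guard region stays PROT_NONE (Initialize() only re-maps the buffer over
+// the start of the reservation), so accesses past the double-mapped buffer
+// fault instead of corrupting unrelated memory.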
+constexpr auto kGuardSize = base::kPageSize * 1024 * 16;  // 64 MB.
+
+}  // namespace
+
+void ScopedSpinlock::LockSlow(Mode mode) {
+  // Slowpath.
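+  // In Mode::Try, give up after 10240 spin attempts (with ~1 ms sleeps every
+  // 1024 attempts); in Mode::Blocking, keep spinning until the lock is taken.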
+  for (size_t attempt = 0; mode == Mode::Blocking || attempt < 1024 * 10;
+       attempt++) {
+    if (!lock_->load(std::memory_order_relaxed) &&
+        PERFETTO_LIKELY(!lock_->exchange(true, std::memory_order_acquire))) {
+      locked_ = true;
+      return;
+    }
+    if (attempt && attempt % 1024 == 0)
+      usleep(1000);
+  }
+}
+
+ScopedSpinlock::ScopedSpinlock(ScopedSpinlock&& other) noexcept
+    : lock_(other.lock_), locked_(other.locked_) {
+  other.locked_ = false;
+}
+
+ScopedSpinlock& ScopedSpinlock::operator=(ScopedSpinlock&& other) {
+  if (this != &other) {
+    this->~ScopedSpinlock();
+    new (this) ScopedSpinlock(std::move(other));
+  }
+  return *this;
+}
+
+ScopedSpinlock::~ScopedSpinlock() {
+  Unlock();
+}
+
+SharedRingBuffer::SharedRingBuffer(CreateFlag, size_t size) {
+  size_t size_with_meta = size + kMetaPageSize;
+  // TODO(primiano): this is copy/pasted from posix_shared_memory.cc. Refactor.
+  base::ScopedFile fd;
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  bool is_memfd = false;
+  fd.reset(static_cast<int>(syscall(__NR_memfd_create, "heaprofd_ringbuf",
+                                    MFD_CLOEXEC | MFD_ALLOW_SEALING)));
+  is_memfd = !!fd;
+
+  if (!fd) {
+    // TODO: if this fails on Android we should fall back on ashmem.
+    PERFETTO_DPLOG("memfd_create() failed");
+  }
+#endif
+
+  if (!fd)
+    fd = base::TempFile::CreateUnlinked().ReleaseFD();
+
+  PERFETTO_CHECK(fd);
+  int res = ftruncate(fd.get(), static_cast<off_t>(size_with_meta));
+  PERFETTO_CHECK(res == 0);
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  if (is_memfd) {
+    res = fcntl(*fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_SEAL);
+    PERFETTO_DCHECK(res == 0);
+  }
+#endif
+  Initialize(std::move(fd));
+}
+
+SharedRingBuffer::~SharedRingBuffer() {
+  static_assert(std::is_trivially_constructible<MetadataPage>::value,
+                "MetadataPage must be trivially constructible");
+  static_assert(std::is_trivially_destructible<MetadataPage>::value,
+                "MetadataPage must be trivially destructible");
+
+  if (is_valid()) {
+    size_t outer_size = kMetaPageSize + size_ * 2 + kGuardSize;
+    munmap(meta_, outer_size);
+  }
+}
+
+void SharedRingBuffer::Initialize(base::ScopedFile mem_fd) {
+  struct stat stat_buf = {};
+  int res = fstat(*mem_fd, &stat_buf);
+  PERFETTO_CHECK(res == 0 && stat_buf.st_size > 0);
+  auto size_with_meta = static_cast<size_t>(stat_buf.st_size);
+  auto size = size_with_meta - kMetaPageSize;
+
+  // |size_with_meta| must be a power of two number of pages + 1 page (for
+  // metadata).
+  if (size_with_meta < 2 * base::kPageSize || size % base::kPageSize ||
+      (size & (size - 1))) {
+    PERFETTO_ELOG("SharedRingBuffer size is invalid (%zu)", size_with_meta);
+    return;
+  }
+
+  // First of all reserve the whole virtual region to fit the buffer twice
+  // + metadata page + red zone at the end.
+  size_t outer_size = kMetaPageSize + size * 2 + kGuardSize;
+  uint8_t* region = reinterpret_cast<uint8_t*>(
+      mmap(nullptr, outer_size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
+  if (region == MAP_FAILED) {
+    PERFETTO_PLOG("mmap(PROT_NONE) failed");
+    return;
+  }
+
+  // Map first the whole buffer (including the initial metadata page) @ off=0.
+  void* reg1 = mmap(region, size_with_meta, PROT_READ | PROT_WRITE,
+                    MAP_SHARED | MAP_FIXED, *mem_fd, 0);
+
+  // Then map again the buffer, skipping the metadata page. The final result is:
+  // [ METADATA ] [ RING BUFFER SHMEM ] [ RING BUFFER SHMEM ]
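+  // Thanks to the back-to-back mapping, a record that wraps around the end of
+  // the ring is still contiguous in virtual memory, so reads and writes never
+  // have to be split in two (see at() in the header).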
+  void* reg2 = mmap(region + size_with_meta, size, PROT_READ | PROT_WRITE,
+                    MAP_SHARED | MAP_FIXED, *mem_fd,
+                    /*offset=*/kMetaPageSize);
+
+  if (reg1 != region || reg2 != region + size_with_meta) {
+    PERFETTO_PLOG("mmap(MAP_SHARED) failed");
+    munmap(region, outer_size);
+    return;
+  }
+  size_ = size;
+  meta_ = new (region) MetadataPage();
+  mem_ = region + kMetaPageSize;
+  mem_fd_ = std::move(mem_fd);
+}
+
+SharedRingBuffer::Buffer SharedRingBuffer::BeginWrite(
+    const ScopedSpinlock& spinlock,
+    size_t size) {
+  PERFETTO_DCHECK(spinlock.locked());
+  Buffer result;
+
+  if (IsCorrupt())
+    return result;
+
+  const uint64_t size_with_header =
+      base::AlignUp<kAlignment>(size + kHeaderSize);
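+  // Each record is [ 8-byte size header | payload ], padded up to the next
+  // 8-byte boundary so the following header stays aligned.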
+  if (size_with_header > write_avail(spinlock)) {
+    meta_->num_writes_failed++;
+    return result;
+  }
+
+  uint8_t* wr_ptr = at(meta_->write_pos);
+
+  result.size = size;
+  result.data = wr_ptr + kHeaderSize;
+  meta_->write_pos += size_with_header;
+  meta_->bytes_written += size;
+  meta_->num_writes_succeeded++;
+  // By making this a release store, we can save grabbing the spinlock in
+  // EndWrite.
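+  // The 0 size marks the record as "not yet complete": BeginRead() stops when
+  // it sees a zero-sized header, until EndWrite() publishes the real size.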
+  reinterpret_cast<std::atomic<uint32_t>*>(wr_ptr)->store(
+      0, std::memory_order_release);
+  return result;
+}
+
+void SharedRingBuffer::EndWrite(Buffer buf) {
+  uint8_t* wr_ptr = buf.data - kHeaderSize;
+  PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(wr_ptr) % kAlignment == 0);
+  reinterpret_cast<std::atomic<uint32_t>*>(wr_ptr)->store(
+      static_cast<uint32_t>(buf.size), std::memory_order_release);
+}
+
+SharedRingBuffer::Buffer SharedRingBuffer::BeginRead() {
+  ScopedSpinlock spinlock(&meta_->spinlock, ScopedSpinlock::Mode::Blocking);
+
+  if (IsCorrupt()) {
+    meta_->num_reads_failed++;
+    return Buffer();
+  }
+
+  uint64_t avail_read = read_avail(spinlock);
+
+  if (avail_read < kHeaderSize)
+    return Buffer();  // No data
+
+  uint8_t* rd_ptr = at(meta_->read_pos);
+  PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(rd_ptr) % kAlignment == 0);
+  const size_t size = reinterpret_cast<std::atomic<uint32_t>*>(rd_ptr)->load(
+      std::memory_order_acquire);
+  if (size == 0)
+    return Buffer();
+  const size_t size_with_header = base::AlignUp<kAlignment>(size + kHeaderSize);
+
+  if (size_with_header > avail_read) {
+    PERFETTO_ELOG(
+        "Corrupted header detected, size=%zu"
+        ", read_avail=%zu, rd=%" PRIu64 ", wr=%" PRIu64,
+        size, read_avail(spinlock), meta_->read_pos, meta_->write_pos);
+    meta_->num_reads_failed++;
+    return Buffer();
+  }
+
+  rd_ptr += kHeaderSize;
+  PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(rd_ptr) % kAlignment == 0);
+  return Buffer(rd_ptr, size);
+}
+
+void SharedRingBuffer::EndRead(Buffer buf) {
+  if (!buf)
+    return;
+  ScopedSpinlock spinlock(&meta_->spinlock, ScopedSpinlock::Mode::Blocking);
+  size_t size_with_header = base::AlignUp<kAlignment>(buf.size + kHeaderSize);
+  meta_->read_pos += size_with_header;
+}
+
+bool SharedRingBuffer::IsCorrupt() {
+  if (meta_->write_pos < meta_->read_pos ||
+      meta_->write_pos - meta_->read_pos > size_ ||
+      meta_->write_pos % kAlignment || meta_->read_pos % kAlignment) {
+    PERFETTO_ELOG("Ring buffer corrupted, rd=%" PRIu64 ", wr=%" PRIu64
+                  ", size=%zu",
+                  meta_->read_pos, meta_->write_pos, size_);
+    return true;
+  }
+  return false;
+}
+
+SharedRingBuffer::SharedRingBuffer(SharedRingBuffer&& other) noexcept {
+  *this = std::move(other);
+}
+
+SharedRingBuffer& SharedRingBuffer::operator=(SharedRingBuffer&& other) {
+  mem_fd_ = std::move(other.mem_fd_);
+  std::tie(meta_, mem_, size_) = std::tie(other.meta_, other.mem_, other.size_);
+  std::tie(other.meta_, other.mem_, other.size_) =
+      std::make_tuple(nullptr, nullptr, 0);
+  return *this;
+}
+
+// static
+base::Optional<SharedRingBuffer> SharedRingBuffer::Create(size_t size) {
+  auto buf = SharedRingBuffer(CreateFlag(), size);
+  if (!buf.is_valid())
+    return base::nullopt;
+  return base::make_optional(std::move(buf));
+}
+
+// static
+base::Optional<SharedRingBuffer> SharedRingBuffer::Attach(
+    base::ScopedFile mem_fd) {
+  auto buf = SharedRingBuffer(AttachFlag(), std::move(mem_fd));
+  if (!buf.is_valid())
+    return base::nullopt;
+  return base::make_optional(std::move(buf));
+}
+
+}  // namespace profiling
+}  // namespace perfetto
diff --git a/src/profiling/memory/shared_ring_buffer.h b/src/profiling/memory/shared_ring_buffer.h
new file mode 100644
index 0000000..a865fe3
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer.h
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
+#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
+
+#include "perfetto/base/optional.h"
+#include "perfetto/base/unix_socket.h"
+#include "perfetto/base/utils.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+
+#include <stdint.h>
+
+namespace perfetto {
+namespace profiling {
+
+class ScopedSpinlock {
+ public:
+  enum class Mode { Try, Blocking };
+
+  ScopedSpinlock(std::atomic<bool>* lock, Mode mode) : lock_(lock) {
+    if (PERFETTO_LIKELY(!lock_->exchange(true, std::memory_order_acquire))) {
+      locked_ = true;
+      return;
+    }
+    LockSlow(mode);
+  }
+
+  ScopedSpinlock(const ScopedSpinlock&) = delete;
+  ScopedSpinlock& operator=(const ScopedSpinlock&) = delete;
+
+  ScopedSpinlock(ScopedSpinlock&&) noexcept;
+  ScopedSpinlock& operator=(ScopedSpinlock&&);
+
+  ~ScopedSpinlock();
+
+  void Unlock() {
+    if (locked_) {
+      PERFETTO_DCHECK(lock_->load());
+      lock_->store(false, std::memory_order_release);
+    }
+    locked_ = false;
+  }
+
+  bool locked() const { return locked_; }
+
+ private:
+  void LockSlow(Mode mode);
+  std::atomic<bool>* lock_;
+  bool locked_ = false;
+};
+
+// A concurrent, multi-writer single-reader ring buffer FIFO, based on a
+// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
+// + O_NONBLOCK socket, specifically:
+//
+// - Writes are atomic: data is either fully written to the buffer or not.
+// - New writes are discarded if the buffer is full.
+// - If a write succeeds, the reader is guaranteed to see the whole record.
+// - Reads are atomic, no fragmentation.
+// - The reader sees writes in write order (modulo discarding).
+//
+// This class assumes that reader and writer trust each other. Don't use it in
+// untrusted contexts.
+//
+// TODO:
+// - Write a benchmark.
+// - Make the stats ifdef-able.
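+//
+// Illustrative usage sketch (|payload| and |payload_size| are placeholders);
+// the writer side mirrors TryWrite() in shared_ring_buffer_unittest.cc:
+//
+//   auto buf = SharedRingBuffer::Create(base::kPageSize * 4);
+//   SharedRingBuffer::Buffer wr_buf;
+//   {
+//     auto lock = buf->AcquireLock(ScopedSpinlock::Mode::Try);
+//     if (lock.locked())
+//       wr_buf = buf->BeginWrite(lock, payload_size);
+//   }
+//   if (wr_buf) {
+//     memcpy(wr_buf.data, payload, payload_size);
+//     buf->EndWrite(std::move(wr_buf));
+//   }
+//   // Reader side (can be another process that used Attach()):
+//   if (SharedRingBuffer::Buffer rd_buf = buf->BeginRead()) {
+//     // ... consume rd_buf.data / rd_buf.size ...
+//     buf->EndRead(std::move(rd_buf));
+//   }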
+class SharedRingBuffer {
+ public:
+  class Buffer {
+   public:
+    Buffer() {}
+    Buffer(uint8_t* d, size_t s) : data(d), size(s) {}
+
+    Buffer(const Buffer&) = delete;
+    Buffer& operator=(const Buffer&) = delete;
+
+    Buffer(Buffer&&) = default;
+    Buffer& operator=(Buffer&&) = default;
+
+    operator bool() const { return data != nullptr; }
+
+    uint8_t* data = nullptr;
+    size_t size = 0;
+  };
+
+  static base::Optional<SharedRingBuffer> Create(size_t);
+  static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);
+
+  ~SharedRingBuffer();
+  SharedRingBuffer(SharedRingBuffer&&) noexcept;
+  SharedRingBuffer& operator=(SharedRingBuffer&&);
+
+  bool is_valid() const { return !!mem_; }
+  size_t size() const { return size_; }
+  int fd() const { return *mem_fd_; }
+
+  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
+  void EndWrite(Buffer buf);
+
+  Buffer BeginRead();
+  void EndRead(Buffer);
+
+  // Exposes the spinlock so that the caller can keep holding it after
+  // BeginWrite() has returned and do additional bookkeeping under the lock.
+  // This will be used to increment the sequence_number.
+  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
+    return ScopedSpinlock(&meta_->spinlock, mode);
+  }
+
+ private:
+  struct alignas(base::kPageSize) MetadataPage {
+    std::atomic<bool> spinlock;
+    uint64_t read_pos;
+    uint64_t write_pos;
+
+    // stats, for debugging only.
+    std::atomic<uint64_t> failed_spinlocks;
+    std::atomic<uint64_t> bytes_written;
+    std::atomic<uint64_t> num_writes_succeeded;
+    std::atomic<uint64_t> num_writes_failed;
+    std::atomic<uint64_t> num_reads_failed;
+  };
+
+  struct CreateFlag {};
+  struct AttachFlag {};
+  SharedRingBuffer(const SharedRingBuffer&) = delete;
+  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
+  SharedRingBuffer(CreateFlag, size_t size);
+  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
+    Initialize(std::move(mem_fd));
+  }
+
+  void Initialize(base::ScopedFile mem_fd);
+  bool IsCorrupt();
+
+  inline size_t read_avail(const ScopedSpinlock& lock) {
+    PERFETTO_DCHECK(lock.locked());
+    PERFETTO_DCHECK(meta_->write_pos >= meta_->read_pos);
+    auto res = static_cast<size_t>(meta_->write_pos - meta_->read_pos);
+    PERFETTO_DCHECK(res <= size_);
+    return res;
+  }
+
+  inline size_t write_avail(const ScopedSpinlock& spinlock) {
+    return size_ - read_avail(spinlock);
+  }
+
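+  // Positions grow without bound; the power-of-two size check in Initialize()
+  // makes the mask below equivalent to "pos % size_".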
+  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }
+
+  base::ScopedFile mem_fd_;
+  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
+  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).
+
+  // Size of the ring buffer contents, without including metadata or the 2nd
+  // mmap.
+  size_t size_ = 0;
+
+  // Remember to update the move ctor when adding new fields.
+};
+
+}  // namespace profiling
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
diff --git a/src/profiling/memory/shared_ring_buffer_unittest.cc b/src/profiling/memory/shared_ring_buffer_unittest.cc
new file mode 100644
index 0000000..d028c5f
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer_unittest.cc
@@ -0,0 +1,233 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/shared_ring_buffer.h"
+
+#include <array>
+#include <mutex>
+#include <random>
+#include <thread>
+#include <unordered_map>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/optional.h"
+
+namespace perfetto {
+namespace profiling {
+namespace {
+
+std::string ToString(const SharedRingBuffer::Buffer& buf_and_size) {
+  return std::string(reinterpret_cast<const char*>(&buf_and_size.data[0]),
+                     buf_and_size.size);
+}
+
+bool TryWrite(SharedRingBuffer* wr, const char* src, size_t size) {
+  SharedRingBuffer::Buffer buf;
+  {
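+    // The spinlock only needs to be held across BeginWrite(); the memcpy into
+    // the reserved region and EndWrite() below happen outside the lock.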
+    auto lock = wr->AcquireLock(ScopedSpinlock::Mode::Try);
+    if (!lock.locked())
+      return false;
+    buf = wr->BeginWrite(lock, size);
+  }
+  if (!buf)
+    return false;
+  memcpy(buf.data, src, size);
+  wr->EndWrite(std::move(buf));
+  return true;
+}
+
+void StructuredTest(SharedRingBuffer* wr, SharedRingBuffer* rd) {
+  ASSERT_TRUE(wr);
+  ASSERT_TRUE(wr->is_valid());
+  ASSERT_TRUE(wr->size() == rd->size());
+  const size_t buf_size = wr->size();
+
+  // Test small writes.
+  ASSERT_TRUE(TryWrite(wr, "foo", 4));
+  ASSERT_TRUE(TryWrite(wr, "bar", 4));
+
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(buf_and_size.size, 4);
+    ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "foo");
+    rd->EndRead(std::move(buf_and_size));
+  }
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(buf_and_size.size, 4);
+    ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "bar");
+    rd->EndRead(std::move(buf_and_size));
+  }
+
+  for (int i = 0; i < 3; i++) {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(buf_and_size.data, nullptr);
+    ASSERT_EQ(buf_and_size.size, 0);
+  }
+
+  // Test extremely large writes (fill the buffer)
+  for (int i = 0; i < 3; i++) {
+    // Fill the buffer exactly: write |buf_size| minus the 8-byte size header.
+    std::string data(buf_size - sizeof(uint64_t), '.' + static_cast<char>(i));
+    ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+    ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
+    ASSERT_FALSE(TryWrite(wr, "?", 1));
+
+    // And read it back
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), data);
+    rd->EndRead(std::move(buf_and_size));
+  }
+
+  // Test large writes that wrap.
+  std::string data(buf_size / 4 * 3 - sizeof(uint64_t), '!');
+  ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+  ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), data);
+    rd->EndRead(std::move(buf_and_size));
+  }
+  data = std::string(base::kPageSize - sizeof(uint64_t), '#');
+  for (int i = 0; i < 4; i++)
+    ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+
+  for (int i = 0; i < 4; i++) {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(buf_and_size.size, data.size());
+    ASSERT_EQ(ToString(buf_and_size), data);
+    rd->EndRead(std::move(buf_and_size));
+  }
+
+  // Test misaligned writes.
+  ASSERT_TRUE(TryWrite(wr, "1", 1));
+  ASSERT_TRUE(TryWrite(wr, "22", 2));
+  ASSERT_TRUE(TryWrite(wr, "333", 3));
+  ASSERT_TRUE(TryWrite(wr, "55555", 5));
+  ASSERT_TRUE(TryWrite(wr, "7777777", 7));
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), "1");
+    rd->EndRead(std::move(buf_and_size));
+  }
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), "22");
+    rd->EndRead(std::move(buf_and_size));
+  }
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), "333");
+    rd->EndRead(std::move(buf_and_size));
+  }
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), "55555");
+    rd->EndRead(std::move(buf_and_size));
+  }
+  {
+    auto buf_and_size = rd->BeginRead();
+    ASSERT_EQ(ToString(buf_and_size), "7777777");
+    rd->EndRead(std::move(buf_and_size));
+  }
+}
+
+TEST(SharedRingBufferTest, SingleThreadSameInstance) {
+  constexpr auto kBufSize = base::kPageSize * 4;
+  base::Optional<SharedRingBuffer> buf = SharedRingBuffer::Create(kBufSize);
+  StructuredTest(&*buf, &*buf);
+}
+
+TEST(SharedRingBufferTest, SingleThreadAttach) {
+  constexpr auto kBufSize = base::kPageSize * 4;
+  base::Optional<SharedRingBuffer> buf1 = SharedRingBuffer::Create(kBufSize);
+  base::Optional<SharedRingBuffer> buf2 =
+      SharedRingBuffer::Attach(base::ScopedFile(dup(buf1->fd())));
+  StructuredTest(&*buf1, &*buf2);
+}
+
+TEST(SharedRingBufferTest, MultiThreadingTest) {
+  constexpr auto kBufSize = base::kPageSize * 1024;  // 4 MB
+  SharedRingBuffer rd = *SharedRingBuffer::Create(kBufSize);
+  SharedRingBuffer wr =
+      *SharedRingBuffer::Attach(base::ScopedFile(dup(rd.fd())));
+
+  std::mutex mutex;
+  std::unordered_map<std::string, int64_t> expected_contents;
+  std::atomic<bool> writers_enabled{false};
+
+  auto writer_thread_fn = [&wr, &expected_contents, &mutex,
+                           &writers_enabled](size_t thread_id) {
+    while (!writers_enabled.load()) {
+    }
+    std::minstd_rand0 rnd_engine(static_cast<uint32_t>(thread_id));
+    std::uniform_int_distribution<size_t> dist(1, base::kPageSize * 8);
+    for (int i = 0; i < 10000; i++) {
+      size_t size = dist(rnd_engine);
+      ASSERT_GT(size, 0);
+      std::string data;
+      data.resize(size);
+      std::generate(data.begin(), data.end(), rnd_engine);
+      if (TryWrite(&wr, data.data(), data.size())) {
+        std::lock_guard<std::mutex> lock(mutex);
+        expected_contents[std::move(data)]++;
+      } else {
+        std::this_thread::yield();
+      }
+    }
+  };
+
+  auto reader_thread_fn = [&rd, &expected_contents, &mutex, &writers_enabled] {
+    for (;;) {
+      auto buf_and_size = rd.BeginRead();
+      if (!buf_and_size) {
+        if (!writers_enabled.load()) {
+          // Failing to read after the writers are done means that there is no
+          // data left in the ring buffer.
+          return;
+        } else {
+          std::this_thread::yield();
+          continue;
+        }
+      }
+      ASSERT_GT(buf_and_size.size, 0);
+      std::string data = ToString(buf_and_size);
+      std::lock_guard<std::mutex> lock(mutex);
+      expected_contents[std::move(data)]--;
+      rd.EndRead(std::move(buf_and_size));
+    }
+  };
+
+  constexpr size_t kNumWriterThreads = 4;
+  std::array<std::thread, kNumWriterThreads> writer_threads;
+  for (size_t i = 0; i < kNumWriterThreads; i++)
+    writer_threads[i] = std::thread(writer_thread_fn, i);
+
+  writers_enabled.store(true);
+
+  std::thread reader_thread(reader_thread_fn);
+
+  for (size_t i = 0; i < kNumWriterThreads; i++)
+    writer_threads[i].join();
+
+  writers_enabled.store(false);
+
+  reader_thread.join();
+}
+
+}  // namespace
+}  // namespace profiling
+}  // namespace perfetto