Add SharedRingBuffer
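
Introduce a multi-writer, single-reader ring buffer FIFO over shared
memory for heaprofd, with semantics similar to a SEQ_PACKET +
O_NONBLOCK socket: writes are atomic and are discarded when the buffer
is full, and the reader sees whole records in write order.

Writers and the reader synchronize through a spinlock stored in a
shared metadata page; a committed record is published with a release
store of its size header, so no lock is needed to complete a write.
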
Change-Id: Ief02a8f8a10d5d0a81cca03ce2864fb4cc744ba8
diff --git a/Android.bp b/Android.bp
index d22e64c..4b1d646 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2609,6 +2609,8 @@
"src/profiling/memory/record_reader_unittest.cc",
"src/profiling/memory/sampler.cc",
"src/profiling/memory/sampler_unittest.cc",
+ "src/profiling/memory/shared_ring_buffer.cc",
+ "src/profiling/memory/shared_ring_buffer_unittest.cc",
"src/profiling/memory/socket_listener.cc",
"src/profiling/memory/socket_listener_unittest.cc",
"src/profiling/memory/system_property.cc",
diff --git a/BUILD.gn b/BUILD.gn
index 2d3a59b..f978826 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -63,6 +63,7 @@
deps += [
":perfetto_benchmarks",
":trace_processor",
+ "src/profiling/memory:ring_buffer",
"src/tracing:consumer_api_test",
"test/configs",
"tools/ftrace_proto_gen:ftrace_proto_gen",
@@ -119,6 +120,7 @@
deps += [
"src/ipc:unittests",
"src/perfetto_cmd:unittests",
+ "src/profiling/memory:ring_buffer_unittests",
"src/traced/probes:unittests",
"src/traced/probes/filesystem:unittests",
"src/traced/probes/ftrace:unittests",
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 3268b4b..fc5ab08 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -48,6 +48,30 @@
]
}
+source_set("ring_buffer") {
+ deps = [
+ "../../../gn:default_deps",
+ "../../base",
+ ]
+ sources = [
+ "shared_ring_buffer.cc",
+ "shared_ring_buffer.ch",
+ ]
+}
+
+source_set("ring_buffer_unittests") {
+ testonly = true
+ deps = [
+ ":ring_buffer",
+ "../../../gn:default_deps",
+ "../../../gn:gtest_deps",
+ "../../base",
+ ]
+ sources = [
+ "shared_ring_buffer_unittest.cc",
+ ]
+}
+
source_set("daemon") {
public_configs = [ "../../../buildtools:libunwindstack_config" ]
deps = [
diff --git a/src/profiling/memory/shared_ring_buffer.cc b/src/profiling/memory/shared_ring_buffer.cc
new file mode 100644
index 0000000..ebce527
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer.cc
@@ -0,0 +1,300 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/shared_ring_buffer.h"
+
+#include <atomic>
+#include <new>
+#include <tuple>
+#include <type_traits>
+
+#include <fcntl.h>
+#include <inttypes.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/scoped_file.h"
+#include "perfetto/base/temp_file.h"
+
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+#include <linux/memfd.h>
+#include <sys/syscall.h>
+#endif
+
+namespace perfetto {
+namespace profiling {
+
+namespace {
+
+constexpr auto kMetaPageSize = base::kPageSize;
+constexpr auto kAlignment = 8; // 64 bits to use aligned memcpy().
+constexpr auto kHeaderSize = kAlignment;
+constexpr auto kGuardSize = base::kPageSize * 1024 * 16; // 64 MB.
+
+} // namespace
+
+void ScopedSpinlock::LockSlow(Mode mode) {
+ // Slowpath.
+ for (size_t attempt = 0; mode == Mode::Blocking || attempt < 1024 * 10;
+ attempt++) {
+ if (!lock_->load(std::memory_order_relaxed) &&
+ PERFETTO_LIKELY(!lock_->exchange(true, std::memory_order_acquire))) {
+ locked_ = true;
+ return;
+ }
+ if (attempt && attempt % 1024 == 0)
+ usleep(1000);
+ }
+}
+
+ScopedSpinlock::ScopedSpinlock(ScopedSpinlock&& other) noexcept
+ : lock_(other.lock_), locked_(other.locked_) {
+ other.locked_ = false;
+}
+
+ScopedSpinlock& ScopedSpinlock::operator=(ScopedSpinlock&& other) {
+ if (this != &other) {
+ this->~ScopedSpinlock();
+ new (this) ScopedSpinlock(std::move(other));
+ }
+ return *this;
+}
+
+ScopedSpinlock::~ScopedSpinlock() {
+ Unlock();
+}
+
+SharedRingBuffer::SharedRingBuffer(CreateFlag, size_t size) {
+ size_t size_with_meta = size + kMetaPageSize;
+ // TODO(primiano): this is copy/pasted from posix_shared_memory.cc. Refactor.
+ base::ScopedFile fd;
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ bool is_memfd = false;
+ fd.reset(static_cast<int>(syscall(__NR_memfd_create, "heaprofd_ringbuf",
+ MFD_CLOEXEC | MFD_ALLOW_SEALING)));
+ is_memfd = !!fd;
+
+ if (!fd) {
+ // TODO: if this fails on Android we should fall back on ashmem.
+ PERFETTO_DPLOG("memfd_create() failed");
+ }
+#endif
+
+ if (!fd)
+ fd = base::TempFile::CreateUnlinked().ReleaseFD();
+
+ PERFETTO_CHECK(fd);
+ int res = ftruncate(fd.get(), static_cast<off_t>(size_with_meta));
+ PERFETTO_CHECK(res == 0);
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ if (is_memfd) {
+ res = fcntl(*fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_SEAL);
+ PERFETTO_DCHECK(res == 0);
+ }
+#endif
+ Initialize(std::move(fd));
+}
+
+SharedRingBuffer::~SharedRingBuffer() {
+ static_assert(std::is_trivially_constructible<MetadataPage>::value,
+ "MetadataPage must be trivially constructible");
+ static_assert(std::is_trivially_destructible<MetadataPage>::value,
+ "MetadataPage must be trivially destructible");
+
+ if (is_valid()) {
+ size_t outer_size = kMetaPageSize + size_ * 2 + kGuardSize;
+ munmap(meta_, outer_size);
+ }
+}
+
+void SharedRingBuffer::Initialize(base::ScopedFile mem_fd) {
+ struct stat stat_buf = {};
+ int res = fstat(*mem_fd, &stat_buf);
+ PERFETTO_CHECK(res == 0 && stat_buf.st_size > 0);
+ auto size_with_meta = static_cast<size_t>(stat_buf.st_size);
+ auto size = size_with_meta - kMetaPageSize;
+
+ // |size_with_meta| must be a power of two number of pages + 1 page (for
+ // metadata).
+ if (size_with_meta < 2 * base::kPageSize || size % base::kPageSize ||
+ (size & (size - 1))) {
+ PERFETTO_ELOG("SharedRingBuffer size is invalid (%zu)", size_with_meta);
+ return;
+ }
+
+ // First of all reserve the whole virtual region to fit the buffer twice
+ // + metadata page + red zone at the end.
+ size_t outer_size = kMetaPageSize + size * 2 + kGuardSize;
+ uint8_t* region = reinterpret_cast<uint8_t*>(
+ mmap(nullptr, outer_size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
+ if (region == MAP_FAILED) {
+ PERFETTO_PLOG("mmap(PROT_NONE) failed");
+ return;
+ }
+
+ // Map first the whole buffer (including the initial metadata page) @ off=0.
+ void* reg1 = mmap(region, size_with_meta, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_FIXED, *mem_fd, 0);
+
+ // Then map again the buffer, skipping the metadata page. The final result is:
+ // [ METADATA ] [ RING BUFFER SHMEM ] [ RING BUFFER SHMEM ]
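+ // Because the buffer is mapped twice back to back, a record that wraps past
+ // the end of the ring can be written and read with a single linear memcpy.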
+ void* reg2 = mmap(region + size_with_meta, size, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_FIXED, *mem_fd,
+ /*offset=*/kMetaPageSize);
+
+ if (reg1 != region || reg2 != region + size_with_meta) {
+ PERFETTO_PLOG("mmap(MAP_SHARED) failed");
+ munmap(region, outer_size);
+ return;
+ }
+ size_ = size;
+ meta_ = new (region) MetadataPage();
+ mem_ = region + kMetaPageSize;
+ mem_fd_ = std::move(mem_fd);
+}
+
+SharedRingBuffer::Buffer SharedRingBuffer::BeginWrite(
+ const ScopedSpinlock& spinlock,
+ size_t size) {
+ PERFETTO_DCHECK(spinlock.locked());
+ Buffer result;
+
+ if (IsCorrupt())
+ return result;
+
+ const uint64_t size_with_header =
+ base::AlignUp<kAlignment>(size + kHeaderSize);
+ if (size_with_header > write_avail(spinlock)) {
+ meta_->num_writes_failed++;
+ return result;
+ }
+
+ uint8_t* wr_ptr = at(meta_->write_pos);
+
+ result.size = size;
+ result.data = wr_ptr + kHeaderSize;
+ meta_->write_pos += size_with_header;
+ meta_->bytes_written += size;
+ meta_->num_writes_succeeded++;
+ // By making this a release store, we can save grabbing the spinlock in
+ // EndWrite.
+ reinterpret_cast<std::atomic<uint32_t>*>(wr_ptr)->store(
+ 0, std::memory_order_release);
+ return result;
+}
+
+void SharedRingBuffer::EndWrite(Buffer buf) {
+ uint8_t* wr_ptr = buf.data - kHeaderSize;
+ PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(wr_ptr) % kAlignment == 0);
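+ // This release store pairs with the acquire load in BeginRead: once the
+ // size header becomes non-zero, the payload the caller copied into
+ // |buf.data| is guaranteed to be visible to the reader.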
+ reinterpret_cast<std::atomic<uint32_t>*>(wr_ptr)->store(
+ static_cast<uint32_t>(buf.size), std::memory_order_release);
+}
+
+SharedRingBuffer::Buffer SharedRingBuffer::BeginRead() {
+ ScopedSpinlock spinlock(&meta_->spinlock, ScopedSpinlock::Mode::Blocking);
+
+ if (IsCorrupt()) {
+ meta_->num_reads_failed++;
+ return Buffer();
+ }
+
+ uint64_t avail_read = read_avail(spinlock);
+
+ if (avail_read < kHeaderSize)
+ return Buffer(); // No data
+
+ uint8_t* rd_ptr = at(meta_->read_pos);
+ PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(rd_ptr) % kAlignment == 0);
+ const size_t size = reinterpret_cast<std::atomic<uint32_t>*>(rd_ptr)->load(
+ std::memory_order_acquire);
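+ // A zero size header means BeginWrite has reserved this record but the
+ // writer has not committed it via EndWrite yet.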
+ if (size == 0)
+ return Buffer();
+ const size_t size_with_header = base::AlignUp<kAlignment>(size + kHeaderSize);
+
+ if (size_with_header > avail_read) {
+ PERFETTO_ELOG(
+ "Corrupted header detected, size=%zu"
+ ", read_avail=%zu, rd=%" PRIu64 ", wr=%" PRIu64,
+ size, read_avail(spinlock), meta_->read_pos, meta_->write_pos);
+ meta_->num_reads_failed++;
+ return Buffer();
+ }
+
+ rd_ptr += kHeaderSize;
+ PERFETTO_DCHECK(reinterpret_cast<uintptr_t>(rd_ptr) % kAlignment == 0);
+ return Buffer(rd_ptr, size);
+}
+
+void SharedRingBuffer::EndRead(Buffer buf) {
+ if (!buf)
+ return;
+ ScopedSpinlock spinlock(&meta_->spinlock, ScopedSpinlock::Mode::Blocking);
+ size_t size_with_header = base::AlignUp<kAlignment>(buf.size + kHeaderSize);
+ meta_->read_pos += size_with_header;
+}
+
+bool SharedRingBuffer::IsCorrupt() {
+ if (meta_->write_pos < meta_->read_pos ||
+ meta_->write_pos - meta_->read_pos > size_ ||
+ meta_->write_pos % kAlignment || meta_->read_pos % kAlignment) {
+ PERFETTO_ELOG("Ring buffer corrupted, rd=%" PRIu64 ", wr=%" PRIu64
+ ", size=%zu",
+ meta_->read_pos, meta_->write_pos, size_);
+ return true;
+ }
+ return false;
+}
+
+SharedRingBuffer::SharedRingBuffer(SharedRingBuffer&& other) noexcept {
+ *this = std::move(other);
+}
+
+SharedRingBuffer& SharedRingBuffer::operator=(SharedRingBuffer&& other) {
+ mem_fd_ = std::move(other.mem_fd_);
+ std::tie(meta_, mem_, size_) = std::tie(other.meta_, other.mem_, other.size_);
+ std::tie(other.meta_, other.mem_, other.size_) =
+ std::make_tuple(nullptr, nullptr, 0);
+ return *this;
+}
+
+// static
+base::Optional<SharedRingBuffer> SharedRingBuffer::Create(size_t size) {
+ auto buf = SharedRingBuffer(CreateFlag(), size);
+ if (!buf.is_valid())
+ return base::nullopt;
+ return base::make_optional(std::move(buf));
+}
+
+// static
+base::Optional<SharedRingBuffer> SharedRingBuffer::Attach(
+ base::ScopedFile mem_fd) {
+ auto buf = SharedRingBuffer(AttachFlag(), std::move(mem_fd));
+ if (!buf.is_valid())
+ return base::nullopt;
+ return base::make_optional(std::move(buf));
+}
+
+} // namespace profiling
+} // namespace perfetto
diff --git a/src/profiling/memory/shared_ring_buffer.h b/src/profiling/memory/shared_ring_buffer.h
new file mode 100644
index 0000000..a865fe3
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer.h
@@ -0,0 +1,206 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
+#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
+
+#include "perfetto/base/optional.h"
+#include "perfetto/base/unix_socket.h"
+#include "perfetto/base/utils.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+
+#include <stdint.h>
+
+namespace perfetto {
+namespace profiling {
+
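+// RAII wrapper around a std::atomic<bool> spinlock. The lock word lives in
+// the shared metadata page, so the lock is shared across processes.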
+class ScopedSpinlock {
+ public:
+ enum class Mode { Try, Blocking };
+
+ ScopedSpinlock(std::atomic<bool>* lock, Mode mode) : lock_(lock) {
+ if (PERFETTO_LIKELY(!lock_->exchange(true, std::memory_order_acquire))) {
+ locked_ = true;
+ return;
+ }
+ LockSlow(mode);
+ }
+
+ ScopedSpinlock(const ScopedSpinlock&) = delete;
+ ScopedSpinlock& operator=(const ScopedSpinlock&) = delete;
+
+ ScopedSpinlock(ScopedSpinlock&&) noexcept;
+ ScopedSpinlock& operator=(ScopedSpinlock&&);
+
+ ~ScopedSpinlock();
+
+ void Unlock() {
+ if (locked_) {
+ PERFETTO_DCHECK(lock_->load());
+ lock_->store(false, std::memory_order_release);
+ }
+ locked_ = false;
+ }
+
+ bool locked() const { return locked_; }
+
+ private:
+ void LockSlow(Mode mode);
+ std::atomic<bool>* lock_;
+ bool locked_ = false;
+};
+
+// A concurrent, multi-writer single-reader ring buffer FIFO, based on a
+// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
+// + O_NONBLOCK socket, specifically:
+//
+// - Writes are atomic: data is either fully written into the buffer or not.
+// - New writes are discarded if the buffer is full.
+// - If a write succeeds, the reader is guaranteed to see the whole write.
+// - Reads are atomic, no fragmentation.
+// - The reader sees writes in write order (modulo discarded writes).
+//
+// This class assumes that reader and writer trust each other. Don't use it in
+// untrusted contexts.
+//
+// TODO:
+// - Write a benchmark.
+// - Make the stats ifdef-able.
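+//
+// A minimal usage sketch (mirroring TryWrite() in the unittest; |rb|,
+// |payload| and |payload_size| are illustrative):
+//
+//   SharedRingBuffer::Buffer buf;
+//   {
+//     ScopedSpinlock lock = rb.AcquireLock(ScopedSpinlock::Mode::Try);
+//     if (lock.locked())
+//       buf = rb.BeginWrite(lock, payload_size);
+//   }  // The spinlock is only needed to reserve space, not to fill it.
+//   if (buf) {
+//     memcpy(buf.data, payload, payload_size);
+//     rb.EndWrite(std::move(buf));  // Commits with a release store.
+//   }
+//
+//   // Reader side:
+//   SharedRingBuffer::Buffer rd_buf = rb.BeginRead();
+//   if (rd_buf)
+//     Consume(rd_buf.data, rd_buf.size);  // Consume() is illustrative.
+//   rb.EndRead(std::move(rd_buf));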
+class SharedRingBuffer {
+ public:
+ class Buffer {
+ public:
+ Buffer() {}
+ Buffer(uint8_t* d, size_t s) : data(d), size(s) {}
+
+ Buffer(const Buffer&) = delete;
+ Buffer& operator=(const Buffer&) = delete;
+
+ Buffer(Buffer&&) = default;
+ Buffer& operator=(Buffer&&) = default;
+
+ operator bool() const { return data != nullptr; }
+
+ uint8_t* data = nullptr;
+ size_t size = 0;
+ };
+
+ static base::Optional<SharedRingBuffer> Create(size_t);
+ static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);
+
+ ~SharedRingBuffer();
+ SharedRingBuffer(SharedRingBuffer&&) noexcept;
+ SharedRingBuffer& operator=(SharedRingBuffer&&);
+
+ bool is_valid() const { return !!mem_; }
+ size_t size() const { return size_; }
+ int fd() const { return *mem_fd_; }
+
+ Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
+ void EndWrite(Buffer buf);
+
+ Buffer BeginRead();
+ void EndRead(Buffer);
+
+ // Exposes the spinlock so the caller can keep holding it after BeginWrite
+ // returns and do additional bookkeeping under the lock (this will be used
+ // to increment the sequence number).
+ ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
+ return ScopedSpinlock(&meta_->spinlock, mode);
+ }
+
+ private:
+ struct alignas(base::kPageSize) MetadataPage {
+ std::atomic<bool> spinlock;
+ uint64_t read_pos;
+ uint64_t write_pos;
+
+ // stats, for debugging only.
+ std::atomic<uint64_t> failed_spinlocks;
+ std::atomic<uint64_t> bytes_written;
+ std::atomic<uint64_t> num_writes_succeeded;
+ std::atomic<uint64_t> num_writes_failed;
+ std::atomic<uint64_t> num_reads_failed;
+ };
+
+ struct CreateFlag {};
+ struct AttachFlag {};
+ SharedRingBuffer(const SharedRingBuffer&) = delete;
+ SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
+ SharedRingBuffer(CreateFlag, size_t size);
+ SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
+ Initialize(std::move(mem_fd));
+ }
+
+ void Initialize(base::ScopedFile mem_fd);
+ bool IsCorrupt();
+
+ inline size_t read_avail(const ScopedSpinlock& lock) {
+ PERFETTO_DCHECK(lock.locked());
+ PERFETTO_DCHECK(meta_->write_pos >= meta_->read_pos);
+ auto res = static_cast<size_t>(meta_->write_pos - meta_->read_pos);
+ PERFETTO_DCHECK(res <= size_);
+ return res;
+ }
+
+ inline size_t write_avail(const ScopedSpinlock& spinlock) {
+ return size_ - read_avail(spinlock);
+ }
+
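+ // |size_| is a power of two (enforced in Initialize()), so the mask below
+ // implements the ring wrap-around.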
+ inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }
+
+ base::ScopedFile mem_fd_;
+ MetadataPage* meta_ = nullptr; // Start of the mmaped region.
+ uint8_t* mem_ = nullptr; // Start of the contents (i.e. meta_ + kPageSize).
+
+ // Size of the ring buffer contents, without including metadata or the 2nd
+ // mmap.
+ size_t size_ = 0;
+
+ // Remember to update the move ctor when adding new fields.
+};
+
+} // namespace profiling
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
diff --git a/src/profiling/memory/shared_ring_buffer_unittest.cc b/src/profiling/memory/shared_ring_buffer_unittest.cc
new file mode 100644
index 0000000..d028c5f
--- /dev/null
+++ b/src/profiling/memory/shared_ring_buffer_unittest.cc
@@ -0,0 +1,246 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/shared_ring_buffer.h"
+
+#include <algorithm>
+#include <array>
+#include <atomic>
+#include <mutex>
+#include <random>
+#include <thread>
+#include <unordered_map>
+
+#include <string.h>
+#include <unistd.h>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/optional.h"
+
+namespace perfetto {
+namespace profiling {
+namespace {
+
+std::string ToString(const SharedRingBuffer::Buffer& buf_and_size) {
+ return std::string(reinterpret_cast<const char*>(&buf_and_size.data[0]),
+ buf_and_size.size);
+}
+
+bool TryWrite(SharedRingBuffer* wr, const char* src, size_t size) {
+ SharedRingBuffer::Buffer buf;
+ {
+ auto lock = wr->AcquireLock(ScopedSpinlock::Mode::Try);
+ if (!lock.locked())
+ return false;
+ buf = wr->BeginWrite(lock, size);
+ }
+ if (!buf)
+ return false;
+ memcpy(buf.data, src, size);
+ wr->EndWrite(std::move(buf));
+ return true;
+}
+
+void StructuredTest(SharedRingBuffer* wr, SharedRingBuffer* rd) {
+ ASSERT_TRUE(wr);
+ ASSERT_TRUE(wr->is_valid());
+ ASSERT_EQ(wr->size(), rd->size());
+ const size_t buf_size = wr->size();
+
+ // Test small writes.
+ ASSERT_TRUE(TryWrite(wr, "foo", 4));
+ ASSERT_TRUE(TryWrite(wr, "bar", 4));
+
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(buf_and_size.size, 4u);
+ ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "foo");
+ rd->EndRead(std::move(buf_and_size));
+ }
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(buf_and_size.size, 4u);
+ ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "bar");
+ rd->EndRead(std::move(buf_and_size));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(buf_and_size.data, nullptr);
+ ASSERT_EQ(buf_and_size.size, 0u);
+ }
+
+ // Test extremely large writes (fill the buffer)
+ for (int i = 0; i < 3; i++) {
+ // TryWrite precisely |buf_size| bytes (minus the size header itself).
+ std::string data(buf_size - sizeof(uint64_t), '.' + static_cast<char>(i));
+ ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+ ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
+ ASSERT_FALSE(TryWrite(wr, "?", 1));
+
+ // And read it back
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), data);
+ rd->EndRead(std::move(buf_and_size));
+ }
+
+ // Test large writes that wrap.
+ std::string data(buf_size / 4 * 3 - sizeof(uint64_t), '!');
+ ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+ ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), data);
+ rd->EndRead(std::move(buf_and_size));
+ }
+ data = std::string(base::kPageSize - sizeof(uint64_t), '#');
+ for (int i = 0; i < 4; i++)
+ ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
+
+ for (int i = 0; i < 4; i++) {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(buf_and_size.size, data.size());
+ ASSERT_EQ(ToString(buf_and_size), data);
+ rd->EndRead(std::move(buf_and_size));
+ }
+
+ // Test misaligned writes.
+ ASSERT_TRUE(TryWrite(wr, "1", 1));
+ ASSERT_TRUE(TryWrite(wr, "22", 2));
+ ASSERT_TRUE(TryWrite(wr, "333", 3));
+ ASSERT_TRUE(TryWrite(wr, "55555", 5));
+ ASSERT_TRUE(TryWrite(wr, "7777777", 7));
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), "1");
+ rd->EndRead(std::move(buf_and_size));
+ }
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), "22");
+ rd->EndRead(std::move(buf_and_size));
+ }
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), "333");
+ rd->EndRead(std::move(buf_and_size));
+ }
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), "55555");
+ rd->EndRead(std::move(buf_and_size));
+ }
+ {
+ auto buf_and_size = rd->BeginRead();
+ ASSERT_EQ(ToString(buf_and_size), "7777777");
+ rd->EndRead(std::move(buf_and_size));
+ }
+}
+
+TEST(SharedRingBufferTest, SingleThreadSameInstance) {
+ constexpr auto kBufSize = base::kPageSize * 4;
+ base::Optional<SharedRingBuffer> buf = SharedRingBuffer::Create(kBufSize);
+ StructuredTest(&*buf, &*buf);
+}
+
+TEST(SharedRingBufferTest, SingleThreadAttach) {
+ constexpr auto kBufSize = base::kPageSize * 4;
+ base::Optional<SharedRingBuffer> buf1 = SharedRingBuffer::Create(kBufSize);
+ base::Optional<SharedRingBuffer> buf2 =
+ SharedRingBuffer::Attach(base::ScopedFile(dup(buf1->fd())));
+ StructuredTest(&*buf1, &*buf2);
+}
+
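+// Spawns several writer threads that race on the buffer while a single reader
+// drains it, then verifies (via |expected_contents|) that every record was
+// read back exactly once and unmodified.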
+TEST(SharedRingBufferTest, MultiThreadingTest) {
+ constexpr auto kBufSize = base::kPageSize * 1024; // 4 MB
+ SharedRingBuffer rd = *SharedRingBuffer::Create(kBufSize);
+ SharedRingBuffer wr =
+ *SharedRingBuffer::Attach(base::ScopedFile(dup(rd.fd())));
+
+ std::mutex mutex;
+ std::unordered_map<std::string, int64_t> expected_contents;
+ std::atomic<bool> writers_enabled{false};
+
+ auto writer_thread_fn = [&wr, &expected_contents, &mutex,
+ &writers_enabled](size_t thread_id) {
+ while (!writers_enabled.load()) {
+ }
+ std::minstd_rand0 rnd_engine(static_cast<uint32_t>(thread_id));
+ std::uniform_int_distribution<size_t> dist(1, base::kPageSize * 8);
+ for (int i = 0; i < 10000; i++) {
+ size_t size = dist(rnd_engine);
+ ASSERT_GT(size, 0u);
+ std::string data;
+ data.resize(size);
+ std::generate(data.begin(), data.end(), rnd_engine);
+ if (TryWrite(&wr, data.data(), data.size())) {
+ std::lock_guard<std::mutex> lock(mutex);
+ expected_contents[std::move(data)]++;
+ } else {
+ std::this_thread::yield();
+ }
+ }
+ };
+
+ auto reader_thread_fn = [&rd, &expected_contents, &mutex, &writers_enabled] {
+ for (;;) {
+ auto buf_and_size = rd.BeginRead();
+ if (!buf_and_size) {
+ if (!writers_enabled.load()) {
+ // Failing to read after the writers are done means that there is no
+ // data left in the ring buffer.
+ return;
+ } else {
+ std::this_thread::yield();
+ continue;
+ }
+ }
+ ASSERT_GT(buf_and_size.size, 0u);
+ std::string data = ToString(buf_and_size);
+ std::lock_guard<std::mutex> lock(mutex);
+ expected_contents[std::move(data)]--;
+ rd.EndRead(std::move(buf_and_size));
+ }
+ };
+
+ constexpr size_t kNumWriterThreads = 4;
+ std::array<std::thread, kNumWriterThreads> writer_threads;
+ for (size_t i = 0; i < kNumWriterThreads; i++)
+ writer_threads[i] = std::thread(writer_thread_fn, i);
+
+ writers_enabled.store(true);
+
+ std::thread reader_thread(reader_thread_fn);
+
+ for (size_t i = 0; i < kNumWriterThreads; i++)
+ writer_threads[i].join();
+
+ writers_enabled.store(false);
+
+ reader_thread.join();
+
+ // Every record that was successfully written must have been read back
+ // exactly once.
+ for (const auto& kv : expected_contents)
+ EXPECT_EQ(0, kv.second);
+}
+
+} // namespace
+} // namespace profiling
+} // namespace perfetto