pw_ring_buffer: Add ring buffer iterator
To support walking through the ring buffers in crash contexts, it's
useful to have a mechanism that allows processing entries in the ring
buffer without requiring buffer copies (in favor of raw access). This
adds a new unsafe iteration class that can be used while the caller has
a guarantee that the underlying buffer is not being mutated.
The multisink is also updated to support this unsafe iteration by adding
an oldest entry reader that is used as a reference point for the
iterator class. This will additionally support late-drain attach flows
where a drain may want to match the oldest entry available in the
buffer, as opposed to the current logic that always starts it at the
write index.
Bug: b/190547664
Change-Id: I430bab37b47630dbdfba184ad94f818a5225f79a
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/50262
Commit-Queue: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_multisink/docs.rst b/pw_multisink/docs.rst
index 7052e14..a33214c 100644
--- a/pw_multisink/docs.rst
+++ b/pw_multisink/docs.rst
@@ -16,10 +16,66 @@
.. c:macro:: PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
- Whether an interrupt-safe lock is used to guard multisink read/write operations.
+ Whether an interrupt-safe lock is used to guard multisink read/write
+ operations.
- By default, this option is enabled and the multisink uses an interrupt spin-lock
- to guard its transactions. If disabled, a mutex is used instead.
+ By default, this option is enabled and the multisink uses an interrupt
+ spin-lock to guard its transactions. If disabled, a mutex is used instead.
- Disabling this will alter the entry precondition of the multisink, requiring that
- it not be called from an interrupt context.
+ Disabling this will alter the entry precondition of the multisink,
+ requiring that it not be called from an interrupt context.
+
+Iterator
+========
+It may be useful to access the entries in the underlying buffer when no drains
+are attached or in crash contexts where dumping out all entries is desirable,
+even if those entries were previously consumed by a drain. This module provides
+an iteration class that is thread-unsafe and like standard iterators, assumes
+that the buffer is not being mutated while iterating. A
+`MultiSink::UnsafeIterationWrapper` class that supports range-based for-loop
+usage canbe acquired via `MultiSink::UnsafeIteration()`.
+
+The iterator starts from the oldest available entry in the buffer, regardless of
+whether all attached drains have already consumed that entry. This allows the
+iterator to be used even if no drains have been previously attached.
+
+.. code-block:: cpp
+
+ // Create a multisink and a test string to push into it.
+ constexpr char kExampleEntry[] = "Example!";
+ std::byte buffer[1024];
+ MultiSink multisink(buffer);
+ MultiSink::Drain drain;
+
+ // Push an entry before a drain is attached.
+ multisink.HandleEntry(kExampleEntry);
+ multisink.HandleEntry(kExampleEntry);
+
+ // Iterate through the entries, this will print out:
+ // "Example!"
+ // "Example!"
+ // Note: PrintByteArray is not a provided utility function.
+ for (ConstByteSpan entry : multisink.UnsafeIteration()) {
+ PrintByteArray(entry);
+ }
+
+ // Attach a drain and consume only one of the entries.
+ std::byte read_buffer[512];
+ uint32_t drop_count = 0;
+
+ multisink.AttachDrain(drain);
+ drain.GetEntry(read_buffer, drop_count);
+
+ // !! A function causes a crash before we've read out all entries.
+ FunctionThatCrashes();
+
+ // ... Crash Context ...
+
+ // You can use a range-based for-loop to walk through all entries,
+ // even though the attached drain has consumed one of them.
+ // This will also print out:
+ // "Example!"
+ // "Example!"
+ for (ConstByteSpan entry : multisink.UnsafeIteration()) {
+ PrintByteArray(entry);
+ }
diff --git a/pw_multisink/multisink_test.cc b/pw_multisink/multisink_test.cc
index c3d2583..705fc66 100644
--- a/pw_multisink/multisink_test.cc
+++ b/pw_multisink/multisink_test.cc
@@ -194,4 +194,50 @@
ExpectMessageAndDropCount(drains_[0], {}, 0u);
}
+TEST_F(MultiSinkTest, Iterator) {
+ multisink_.AttachDrain(drains_[0]);
+
+ // Insert entries and consume them all.
+ multisink_.HandleEntry(kMessage);
+ multisink_.HandleEntry(kMessage);
+ multisink_.HandleEntry(kMessage);
+
+ ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
+ ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
+ ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
+
+ // Confirm that the iterator still observes the messages in the ring buffer.
+ size_t iterated_entries = 0;
+ for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
+ EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
+ iterated_entries++;
+ }
+ EXPECT_EQ(iterated_entries, 3u);
+}
+
+TEST_F(MultiSinkTest, IteratorNoDrains) {
+ // Insert entries with no drains attached. Even though there are no consumers,
+ // iterators should still walk from the oldest entry.
+ multisink_.HandleEntry(kMessage);
+ multisink_.HandleEntry(kMessage);
+ multisink_.HandleEntry(kMessage);
+
+ // Confirm that the iterator still observes the messages in the ring buffer.
+ size_t iterated_entries = 0;
+ for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
+ EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
+ iterated_entries++;
+ }
+ EXPECT_EQ(iterated_entries, 3u);
+}
+
+TEST_F(MultiSinkTest, IteratorNoEntries) {
+ // Attach a drain, but don't add any entries.
+ multisink_.AttachDrain(drains_[0]);
+ // Confirm that the iterator has no entries.
+ MultiSink::UnsafeIterationWrapper unsafe_iterator =
+ multisink_.UnsafeIteration();
+ EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
+}
+
} // namespace pw::multisink
diff --git a/pw_multisink/public/pw_multisink/multisink.h b/pw_multisink/public/pw_multisink/multisink.h
index be8f585..0d6a89c 100644
--- a/pw_multisink/public/pw_multisink/multisink.h
+++ b/pw_multisink/public/pw_multisink/multisink.h
@@ -145,9 +145,78 @@
virtual void OnNewEntryAvailable() = 0;
};
+ class Iterator;
+
+ class iterator {
+ public:
+ iterator& operator++() {
+ it_++;
+ return *this;
+ }
+ iterator operator++(int) {
+ iterator original = *this;
+ ++*this;
+ return original;
+ }
+
+ ConstByteSpan& operator*() {
+ entry_ = (*it_).buffer;
+ return entry_;
+ }
+ ConstByteSpan* operator->() { return &operator*(); }
+
+ constexpr bool operator==(const iterator& rhs) const {
+ return it_ == rhs.it_;
+ }
+
+ constexpr bool operator!=(const iterator& rhs) const {
+ return it_ != rhs.it_;
+ }
+
+ Status status() const { return it_.status(); }
+
+ private:
+ friend class MultiSink;
+
+ iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
+ : it_(reader) {}
+ iterator() {}
+
+ ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
+ ConstByteSpan entry_;
+ Status iteration_status_;
+ };
+
+ class UnsafeIterationWrapper {
+ public:
+ using element_type = ConstByteSpan;
+ using value_type = std::remove_cv_t<ConstByteSpan>;
+ using pointer = ConstByteSpan*;
+ using reference = ConstByteSpan&;
+ using const_iterator = iterator; // Standard alias for iterable types.
+
+ iterator begin() const { return iterator(*reader_); }
+ iterator end() const { return iterator(); }
+ const_iterator cbegin() const { return begin(); }
+ const_iterator cend() const { return end(); }
+
+ private:
+ friend class MultiSink;
+ UnsafeIterationWrapper(
+ ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
+ : reader_(&reader) {}
+ ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
+ };
+
+ UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
+ return UnsafeIterationWrapper(oldest_entry_reader_);
+ }
+
// Constructs a multisink using a ring buffer backed by the provided buffer.
MultiSink(ByteSpan buffer) : ring_buffer_(true), sequence_id_(0) {
ring_buffer_.SetBuffer(buffer);
+ Status attach_status = ring_buffer_.AttachReader(oldest_entry_reader_);
+ PW_DASSERT(attach_status.ok());
}
// Write an entry to the multisink. If available space is less than the
@@ -223,6 +292,8 @@
IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
+ ring_buffer::PrefixedEntryRingBufferMulti::Reader oldest_entry_reader_
+ PW_GUARDED_BY(lock_);
uint32_t sequence_id_ PW_GUARDED_BY(lock_);
LockType lock_;
};
diff --git a/pw_ring_buffer/BUILD b/pw_ring_buffer/BUILD
index 1e079d9..483962b 100644
--- a/pw_ring_buffer/BUILD
+++ b/pw_ring_buffer/BUILD
@@ -35,6 +35,7 @@
"//pw_containers",
"//pw_span",
"//pw_status",
+ "//pw_result",
"//pw_varint",
],
)
diff --git a/pw_ring_buffer/BUILD.gn b/pw_ring_buffer/BUILD.gn
index 8871caa..88db5bd 100644
--- a/pw_ring_buffer/BUILD.gn
+++ b/pw_ring_buffer/BUILD.gn
@@ -27,6 +27,7 @@
public_configs = [ ":default_config" ]
public_deps = [
"$dir_pw_containers",
+ "$dir_pw_result",
"$dir_pw_status",
]
sources = [ "prefixed_entry_ring_buffer.cc" ]
diff --git a/pw_ring_buffer/docs.rst b/pw_ring_buffer/docs.rst
index 2209468..a98a44c 100644
--- a/pw_ring_buffer/docs.rst
+++ b/pw_ring_buffer/docs.rst
@@ -12,6 +12,63 @@
=============
* C++11
+Iterator
+========
+In crash contexts, it may be useful to scan through a ring buffer that may
+have a mix of valid (yet to be read), stale (read), and invalid entries. The
+`PrefixedEntryRingBufferMulti::iterator` class can be used to walk through
+entries in the provided buffer.
+
+.. code-block:: cpp
+
+ // A test string to push into the buffer.
+ constexpr char kExampleEntry[] = "Example!";
+
+ // Setting up buffers and attaching a reader.
+ std::byte buffer[1024];
+ std::byte read_buffer[256];
+ PrefixedEntryRingBuffer ring_buffer(buffer);
+ PrefixedEntryRingBuffer::Reader reader;
+ ring_buffer.AttachReader(reader);
+
+ // Insert some entries and process some entries.
+ ring_buffer.PushBack(kExampleEntry);
+ ring_buffer.PushBack(kExampleEntry);
+ reader.PopFront();
+
+ // !! A function causes a crash before we've read out all entries.
+ FunctionThatCrashes();
+
+ // ... Crash Context ...
+
+ // You can use a range-based for-loop to walk through all entries.
+ for (auto entry : ring_buffer) {
+ PW_LOG_WARN("Read entry of size: %lu", entry.size());
+ }
+
+In cases where a crash has caused the ring buffer to have corrupted data, the
+iterator will progress until it sees the corrupted section and instead move to
+`iterator::end()`. The `iterator::status()` function returns a `pw::Status`
+indicating the reason the iterator reached it's end.
+
+.. code-block:: cpp
+
+ // ... Crash Context ...
+
+ using iterator = PrefixedEntryRingBufferMulti::iterator;
+
+ // Hold the iterator outside any loops to inspect it later.
+ iterator it = ring_buffer.begin();
+ for (; it != it.end(); ++it) {
+ PW_LOG_WARN("Read entry of size: %lu", it->size());
+ }
+
+ // Warn if there was a failure during iteration.
+ if (!it.status().ok()) {
+ PW_LOG_WARN("Iterator failed to read some entries!");
+ }
+
+
Dependencies
============
* ``pw_span``
diff --git a/pw_ring_buffer/prefixed_entry_ring_buffer.cc b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
index a2d4b2b..8f11e9b 100644
--- a/pw_ring_buffer/prefixed_entry_ring_buffer.cc
+++ b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
@@ -18,19 +18,23 @@
#include <cstring>
#include "pw_assert/assert.h"
+#include "pw_assert/check.h"
+#include "pw_status/try.h"
#include "pw_varint/varint.h"
namespace pw {
namespace ring_buffer {
using std::byte;
+using Entry = PrefixedEntryRingBufferMulti::Entry;
using Reader = PrefixedEntryRingBufferMulti::Reader;
+using iterator = PrefixedEntryRingBufferMulti::iterator;
void PrefixedEntryRingBufferMulti::Clear() {
write_idx_ = 0;
for (Reader& reader : readers_) {
- reader.read_idx = 0;
- reader.entry_count = 0;
+ reader.read_idx_ = 0;
+ reader.entry_count_ = 0;
}
}
@@ -49,26 +53,26 @@
}
Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
- if (reader.buffer != nullptr) {
+ if (reader.buffer_ != nullptr) {
return Status::InvalidArgument();
}
- reader.buffer = this;
+ reader.buffer_ = this;
// Note that a newly attached reader sees the buffer as empty,
// and is not privy to entries pushed before being attached.
- reader.read_idx = write_idx_;
- reader.entry_count = 0;
+ reader.read_idx_ = write_idx_;
+ reader.entry_count_ = 0;
readers_.push_back(reader);
return OkStatus();
}
Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
- if (reader.buffer != this) {
+ if (reader.buffer_ != this) {
return Status::InvalidArgument();
}
- reader.buffer = nullptr;
- reader.read_idx = 0;
- reader.entry_count = 0;
+ reader.buffer_ = nullptr;
+ reader.read_idx_ = 0;
+ reader.entry_count_ = 0;
readers_.remove(reader);
return OkStatus();
}
@@ -117,7 +121,7 @@
// Update all readers of the new count.
for (Reader& reader : readers_) {
- reader.entry_count++;
+ reader.entry_count_++;
}
return OkStatus();
}
@@ -134,26 +138,25 @@
};
}
-Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
- std::span<byte> data,
- size_t* bytes_read_out) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(
+ const Reader& reader, std::span<byte> data, size_t* bytes_read_out) const {
*bytes_read_out = 0;
return InternalRead(reader, GetOutput(data, bytes_read_out), false);
}
-Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
- ReadOutput output) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(
+ const Reader& reader, ReadOutput output) const {
return InternalRead(reader, output, false);
}
Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
- Reader& reader, std::span<byte> data, size_t* bytes_read_out) {
+ const Reader& reader, std::span<byte> data, size_t* bytes_read_out) const {
*bytes_read_out = 0;
return InternalRead(reader, GetOutput(data, bytes_read_out), true);
}
Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
- Reader& reader, ReadOutput output) {
+ const Reader& reader, ReadOutput output) const {
return InternalRead(reader, output, true);
}
@@ -162,21 +165,21 @@
// T should be similar to Status (*read_output)(std::span<const byte>)
template <typename T>
Status PrefixedEntryRingBufferMulti::InternalRead(
- Reader& reader,
+ const Reader& reader,
T read_output,
bool include_preamble_in_output,
- uint32_t* user_preamble_out) {
+ uint32_t* user_preamble_out) const {
if (buffer_ == nullptr) {
return Status::FailedPrecondition();
}
- if (reader.entry_count == 0) {
+ if (reader.entry_count_ == 0) {
return Status::OutOfRange();
}
// Figure out where to start reading (wrapped); accounting for preamble.
EntryInfo info = FrontEntryInfo(reader);
size_t read_bytes = info.data_bytes;
- size_t data_read_idx = reader.read_idx;
+ size_t data_read_idx = reader.read_idx_;
if (user_preamble_out) {
*user_preamble_out = info.user_preamble;
}
@@ -206,49 +209,25 @@
// It is expected that InternalPopFrontAll is called only when there is
// something to pop from at least one reader. If no readers exist, or all
// readers are caught up, this function will assert.
- size_t entry_count = GetSlowestReader().entry_count;
- PW_DASSERT(entry_count != 0);
+ size_t entry_count = GetSlowestReader().entry_count_;
+ PW_DCHECK_INT_NE(entry_count, 0);
// Otherwise, pop the readers that have the largest value.
for (Reader& reader : readers_) {
- if (reader.entry_count == entry_count) {
+ if (reader.entry_count_ == entry_count) {
reader.PopFront();
}
}
}
-Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() {
- // Readers are guaranteed to be before the writer pointer (the class enforces
- // this on every read/write operation that forces the write pointer ahead of
- // an existing reader). To determine the slowest reader, we consider three
- // scenarios:
- //
- // In all below cases, WH is the write-head, and R# are readers, with R1
- // representing the slowest reader.
- // [[R1 R2 R3 WH]] => Right-hand writer, slowest reader is left-most reader.
- // [[WH R1 R2 R3]] => Left-hand writer, slowest reader is left-most reader.
- // [[R3 WH R1 R2]] => Middle-writer, slowest reader is left-most reader after
- // writer.
- //
- // Formally, choose the left-most reader after the writer (ex.2,3), but if
- // that doesn't exist, choose the left-most reader before the writer (ex.1).
- PW_DASSERT(readers_.size() > 0);
- Reader* slowest_reader_after_writer = nullptr;
- Reader* slowest_reader_before_writer = nullptr;
- for (Reader& reader : readers_) {
- if (reader.read_idx < write_idx_) {
- if (!slowest_reader_before_writer ||
- reader.read_idx < slowest_reader_before_writer->read_idx) {
- slowest_reader_before_writer = &reader;
- }
- } else {
- if (!slowest_reader_after_writer ||
- reader.read_idx < slowest_reader_after_writer->read_idx) {
- slowest_reader_after_writer = &reader;
- }
+const Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() const {
+ PW_DCHECK_INT_GT(readers_.size(), 0);
+ const Reader* slowest_reader = &(*readers_.begin());
+ for (const Reader& reader : readers_) {
+ if (reader.entry_count_ > slowest_reader->entry_count_) {
+ slowest_reader = &reader;
}
}
- return *(slowest_reader_after_writer ? slowest_reader_after_writer
- : slowest_reader_before_writer);
+ return *slowest_reader;
}
Status PrefixedEntryRingBufferMulti::Dering() {
@@ -257,34 +236,42 @@
}
// Check if by luck we're already deringed.
- Reader* slowest_reader = &GetSlowestReader();
- if (slowest_reader->read_idx == 0) {
+ Reader& slowest_reader = GetSlowestReaderWritable();
+ if (slowest_reader.read_idx_ == 0) {
return OkStatus();
}
+ return InternalDering(slowest_reader);
+}
+
+Status PrefixedEntryRingBufferMulti::InternalDering(Reader& dering_reader) {
+ if (buffer_ == nullptr || readers_.size() == 0) {
+ return Status::FailedPrecondition();
+ }
+
auto buffer_span = std::span(buffer_, buffer_bytes_);
std::rotate(buffer_span.begin(),
- buffer_span.begin() + slowest_reader->read_idx,
+ buffer_span.begin() + dering_reader.read_idx_,
buffer_span.end());
// If the new index is past the end of the buffer,
// alias it back (wrap) to the start of the buffer.
- if (write_idx_ < slowest_reader->read_idx) {
+ if (write_idx_ < dering_reader.read_idx_) {
write_idx_ += buffer_bytes_;
}
- write_idx_ -= slowest_reader->read_idx;
+ write_idx_ -= dering_reader.read_idx_;
for (Reader& reader : readers_) {
- if (&reader == slowest_reader) {
+ if (&reader == &dering_reader) {
continue;
}
- if (reader.read_idx < slowest_reader->read_idx) {
- reader.read_idx += buffer_bytes_;
+ if (reader.read_idx_ < dering_reader.read_idx_) {
+ reader.read_idx_ += buffer_bytes_;
}
- reader.read_idx -= slowest_reader->read_idx;
+ reader.read_idx_ -= dering_reader.read_idx_;
}
- slowest_reader->read_idx = 0;
+ dering_reader.read_idx_ = 0;
return OkStatus();
}
@@ -292,30 +279,30 @@
if (buffer_ == nullptr) {
return Status::FailedPrecondition();
}
- if (reader.entry_count == 0) {
+ if (reader.entry_count_ == 0) {
return Status::OutOfRange();
}
// Advance the read pointer past the front entry to the next one.
EntryInfo info = FrontEntryInfo(reader);
size_t entry_bytes = info.preamble_bytes + info.data_bytes;
- size_t prev_read_idx = reader.read_idx;
- reader.read_idx = IncrementIndex(prev_read_idx, entry_bytes);
- reader.entry_count--;
+ size_t prev_read_idx = reader.read_idx_;
+ reader.read_idx_ = IncrementIndex(prev_read_idx, entry_bytes);
+ reader.entry_count_--;
return OkStatus();
}
size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
- Reader& reader) {
- if (reader.entry_count == 0) {
+ const Reader& reader) const {
+ if (reader.entry_count_ == 0) {
return 0;
}
return FrontEntryInfo(reader).data_bytes;
}
size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
- Reader& reader) {
- if (reader.entry_count == 0) {
+ const Reader& reader) const {
+ if (reader.entry_count_ == 0) {
return 0;
}
EntryInfo info = FrontEntryInfo(reader);
@@ -323,7 +310,15 @@
}
PrefixedEntryRingBufferMulti::EntryInfo
-PrefixedEntryRingBufferMulti::FrontEntryInfo(Reader& reader) {
+PrefixedEntryRingBufferMulti::FrontEntryInfo(const Reader& reader) const {
+ Result<PrefixedEntryRingBufferMulti::EntryInfo> entry_info =
+ RawFrontEntryInfo(reader.read_idx_);
+ PW_DCHECK_OK(entry_info.status());
+ return entry_info.value();
+}
+
+Result<PrefixedEntryRingBufferMulti::EntryInfo>
+PrefixedEntryRingBufferMulti::RawFrontEntryInfo(size_t source_idx) const {
// Entry headers consists of: (optional prefix byte, varint size, data...)
// If a preamble exists, extract the varint and it's bytes in bytes.
@@ -331,18 +326,22 @@
uint64_t user_preamble_data = 0;
byte varint_buf[varint::kMaxVarint32SizeBytes];
if (user_preamble_) {
- RawRead(varint_buf, reader.read_idx, varint::kMaxVarint32SizeBytes);
+ RawRead(varint_buf, source_idx, varint::kMaxVarint32SizeBytes);
user_preamble_bytes = varint::Decode(varint_buf, &user_preamble_data);
- PW_DASSERT(user_preamble_bytes != 0u);
+ if (user_preamble_bytes == 0u) {
+ return Status::DataLoss();
+ }
}
// Read the entry header; extract the varint and it's bytes in bytes.
RawRead(varint_buf,
- IncrementIndex(reader.read_idx, user_preamble_bytes),
+ IncrementIndex(source_idx, user_preamble_bytes),
varint::kMaxVarint32SizeBytes);
uint64_t entry_bytes;
size_t length_bytes = varint::Decode(varint_buf, &entry_bytes);
- PW_DASSERT(length_bytes != 0u);
+ if (length_bytes == 0u) {
+ return Status::DataLoss();
+ }
EntryInfo info = {};
info.preamble_bytes = user_preamble_bytes + length_bytes;
@@ -353,15 +352,14 @@
// Comparisons ordered for more probable early exits, assuming the reader is
// not far behind the writer compared to the size of the ring.
-size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() {
- // Compute slowest reader.
- // TODO: Alternatively, the slowest reader could be actively mantained on
- // every read operation, but reads are more likely than writes.
+size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() const {
+ // Compute slowest reader. If no readers exist, the entire buffer can be
+ // written.
if (readers_.size() == 0) {
return buffer_bytes_;
}
- size_t read_idx = GetSlowestReader().read_idx;
+ size_t read_idx = GetSlowestReader().read_idx_;
// Case: Not wrapped.
if (read_idx < write_idx_) {
return buffer_bytes_ - (write_idx_ - read_idx);
@@ -371,8 +369,8 @@
return read_idx - write_idx_;
}
// Case: Matched read and write heads; empty or full.
- for (Reader& reader : readers_) {
- if (reader.read_idx == read_idx && reader.entry_count != 0) {
+ for (const Reader& reader : readers_) {
+ if (reader.read_idx_ == read_idx && reader.entry_count_ != 0) {
return 0;
}
}
@@ -395,7 +393,7 @@
void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
size_t source_idx,
- size_t length_bytes) {
+ size_t length_bytes) const {
// Read the pre-wrap bytes.
size_t bytes_until_wrap = buffer_bytes_ - source_idx;
size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
@@ -408,7 +406,7 @@
}
size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
- size_t count) {
+ size_t count) const {
// Note: This doesn't use modulus (%) since the branch is cheaper, and we
// guarantee that count will never be greater than buffer_bytes_.
index += count;
@@ -421,11 +419,59 @@
Status PrefixedEntryRingBufferMulti::Reader::PeekFrontWithPreamble(
std::span<byte> data,
uint32_t& user_preamble_out,
- size_t& entry_bytes_read_out) {
+ size_t& entry_bytes_read_out) const {
entry_bytes_read_out = 0;
- return buffer->InternalRead(
+ return buffer_->InternalRead(
*this, GetOutput(data, &entry_bytes_read_out), false, &user_preamble_out);
}
+iterator& iterator::operator++() {
+ PW_DCHECK_OK(iteration_status_);
+ PW_DCHECK_INT_NE(entry_count_, 0);
+
+ Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
+ if (!info.status().ok()) {
+ SkipToEnd(info.status());
+ return *this;
+ }
+
+ // It is guaranteed that the buffer is deringed at this point.
+ read_idx_ += info.value().preamble_bytes + info.value().data_bytes;
+ entry_count_--;
+
+ if (entry_count_ == 0) {
+ SkipToEnd(OkStatus());
+ return *this;
+ }
+
+ if (read_idx_ >= ring_buffer_->TotalUsedBytes()) {
+ SkipToEnd(Status::DataLoss());
+ return *this;
+ }
+
+ info = ring_buffer_->RawFrontEntryInfo(read_idx_);
+ if (!info.status().ok()) {
+ SkipToEnd(info.status());
+ return *this;
+ }
+ return *this;
+}
+
+const Entry& iterator::operator*() const {
+ PW_DCHECK_OK(iteration_status_);
+ PW_DCHECK_INT_NE(entry_count_, 0);
+
+ Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
+ PW_DCHECK_OK(info.status());
+
+ entry_ = {
+ .buffer = std::span<const byte>(
+ ring_buffer_->buffer_ + read_idx_ + info.value().preamble_bytes,
+ info.value().data_bytes),
+ .preamble = info.value().user_preamble,
+ };
+ return entry_;
+}
+
} // namespace ring_buffer
} // namespace pw
diff --git a/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc b/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
index 80190b8..75e73f0 100644
--- a/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
+++ b/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
@@ -26,6 +26,8 @@
namespace pw {
namespace ring_buffer {
namespace {
+using Entry = PrefixedEntryRingBufferMulti::Entry;
+using iterator = PrefixedEntryRingBufferMulti::iterator;
TEST(PrefixedEntryRingBuffer, NoBuffer) {
PrefixedEntryRingBuffer ring(false);
@@ -395,34 +397,54 @@
TEST(PrefixedEntryRingBuffer, DeringNoPreload) { DeringTest(false); }
template <typename T>
-Status PushBack(PrefixedEntryRingBufferMulti& ring, T element) {
+Status PushBack(PrefixedEntryRingBufferMulti& ring,
+ T element,
+ uint32_t user_preamble = 0) {
union {
std::array<byte, sizeof(element)> buffer;
T item;
} aliased;
aliased.item = element;
- return ring.PushBack(aliased.buffer);
+ return ring.PushBack(aliased.buffer, user_preamble);
}
template <typename T>
-Status TryPushBack(PrefixedEntryRingBufferMulti& ring, T element) {
+Status TryPushBack(PrefixedEntryRingBufferMulti& ring,
+ T element,
+ uint32_t user_preamble = 0) {
union {
std::array<byte, sizeof(element)> buffer;
T item;
} aliased;
aliased.item = element;
- return ring.TryPushBack(aliased.buffer);
+ return ring.TryPushBack(aliased.buffer, user_preamble);
}
template <typename T>
-T PeekFront(PrefixedEntryRingBufferMulti::Reader& reader) {
+T PeekFront(PrefixedEntryRingBufferMulti::Reader& reader,
+ uint32_t* user_preamble_out = nullptr) {
union {
std::array<byte, sizeof(T)> buffer;
T item;
} aliased;
size_t bytes_read = 0;
- PW_CHECK_OK(reader.PeekFront(aliased.buffer, &bytes_read));
+ uint32_t user_preamble = 0;
+ PW_CHECK_OK(
+ reader.PeekFrontWithPreamble(aliased.buffer, user_preamble, bytes_read));
PW_CHECK_INT_EQ(bytes_read, sizeof(T));
+ if (user_preamble_out) {
+ *user_preamble_out = user_preamble;
+ }
+ return aliased.item;
+}
+
+template <typename T>
+T GetEntry(std::span<const std::byte> lhs) {
+ union {
+ std::array<byte, sizeof(T)> buffer;
+ T item;
+ } aliased;
+ std::memcpy(aliased.buffer.data(), lhs.data(), lhs.size_bytes());
return aliased.item;
}
@@ -457,6 +479,27 @@
EXPECT_EQ(PeekFront<int>(ring), 100);
}
+TEST(PrefixedEntryRingBuffer, Iterator) {
+ PrefixedEntryRingBuffer ring;
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ // Fill up the ring buffer with a constant value.
+ size_t entry_count = 0;
+ while (TryPushBack<size_t>(ring, entry_count).ok()) {
+ entry_count++;
+ }
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ for (Result<const Entry> entry_info : ring) {
+ EXPECT_TRUE(entry_info.status().ok());
+ EXPECT_EQ(GetEntry<size_t>(entry_info.value().buffer), validated_entries);
+ validated_entries++;
+ }
+ EXPECT_EQ(validated_entries, entry_count);
+}
+
TEST(PrefixedEntryRingBufferMulti, TryPushBack) {
PrefixedEntryRingBufferMulti ring;
byte test_buffer[kTestBufferSize];
@@ -481,15 +524,20 @@
}
// Run fast reader twice as fast as the slow reader.
+ size_t total_used_bytes = ring.TotalUsedBytes();
for (int i = 0; i < total_items; ++i) {
+ EXPECT_EQ(PeekFront<int>(fast_reader), i);
+ EXPECT_EQ(fast_reader.PopFront(), OkStatus());
+ EXPECT_EQ(ring.TotalUsedBytes(), total_used_bytes);
if (i % 2 == 0) {
EXPECT_EQ(PeekFront<int>(slow_reader), i / 2);
EXPECT_EQ(slow_reader.PopFront(), OkStatus());
+ EXPECT_TRUE(ring.TotalUsedBytes() < total_used_bytes);
}
- EXPECT_EQ(PeekFront<int>(fast_reader), i);
- EXPECT_EQ(fast_reader.PopFront(), OkStatus());
+ total_used_bytes = ring.TotalUsedBytes();
}
EXPECT_EQ(fast_reader.PopFront(), Status::OutOfRange());
+ EXPECT_TRUE(ring.TotalUsedBytes() > 0u);
// Fill the buffer again, expect that the fast reader
// only sees half the entries as the slow reader.
@@ -516,6 +564,7 @@
}
EXPECT_EQ(slow_reader.PopFront(), Status::OutOfRange());
EXPECT_EQ(fast_reader.PopFront(), Status::OutOfRange());
+ EXPECT_EQ(ring.TotalUsedBytes(), 0u);
}
TEST(PrefixedEntryRingBufferMulti, PushBack) {
@@ -611,6 +660,185 @@
EXPECT_EQ(ring_one.AttachReader(reader), Status::InvalidArgument());
}
+TEST(PrefixedEntryRingBufferMulti, IteratorEmptyBuffer) {
+ PrefixedEntryRingBufferMulti ring;
+ // Pick a buffer that can't contain any valid sections.
+ byte test_buffer[1] = {std::byte(0xFF)};
+
+ PrefixedEntryRingBufferMulti::Reader reader;
+ EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ EXPECT_EQ(ring.begin(), ring.end());
+}
+
+TEST(PrefixedEntryRingBufferMulti, IteratorValidEntries) {
+ PrefixedEntryRingBufferMulti ring;
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ PrefixedEntryRingBufferMulti::Reader reader;
+ EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+
+ // Buffer only contains valid entries. This happens after populating
+ // the buffer and no entries have been read.
+ // E.g. [VALID|VALID|VALID|INVALID]
+
+ // Fill up the ring buffer with a constant value.
+ size_t entry_count = 0;
+ while (TryPushBack<size_t>(ring, entry_count).ok()) {
+ entry_count++;
+ }
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ for (const Entry& entry_info : ring) {
+ EXPECT_EQ(GetEntry<size_t>(entry_info.buffer), validated_entries);
+ validated_entries++;
+ }
+ EXPECT_EQ(validated_entries, entry_count);
+}
+
+TEST(PrefixedEntryRingBufferMulti, IteratorValidEntriesWithPreamble) {
+ PrefixedEntryRingBufferMulti ring(true);
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ PrefixedEntryRingBufferMulti::Reader reader;
+ EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+
+ // Buffer only contains valid entries. This happens after populating
+ // the buffer and no entries have been read.
+ // E.g. [VALID|VALID|VALID|INVALID]
+
+ // Fill up the ring buffer with a constant value.
+ size_t entry_count = 0;
+ while (TryPushBack<size_t>(ring, entry_count, entry_count).ok()) {
+ entry_count++;
+ }
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ for (const Entry& entry_info : ring) {
+ EXPECT_EQ(GetEntry<size_t>(entry_info.buffer), validated_entries);
+ EXPECT_EQ(entry_info.preamble, validated_entries);
+ validated_entries++;
+ }
+ EXPECT_EQ(validated_entries, entry_count);
+}
+
+TEST(PrefixedEntryRingBufferMulti, IteratorStaleEntries) {
+ PrefixedEntryRingBufferMulti ring;
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ // Buffer only contains stale, valid entries. This happens when after
+ // populating the buffer, all entries are read. The buffer retains the
+ // data but has an entry count of zero.
+ // E.g. [STALE|STALE|STALE]
+ PrefixedEntryRingBufferMulti::Reader trailing_reader;
+ EXPECT_EQ(ring.AttachReader(trailing_reader), OkStatus());
+
+ PrefixedEntryRingBufferMulti::Reader reader;
+ EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+
+ // Push and pop all the entries.
+ size_t entry_count = 0;
+ while (TryPushBack<size_t>(ring, entry_count).ok()) {
+ entry_count++;
+ }
+
+ while (reader.PopFront().ok()) {
+ }
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ for (const Entry& entry_info : ring) {
+ EXPECT_EQ(GetEntry<size_t>(entry_info.buffer), validated_entries);
+ validated_entries++;
+ }
+ EXPECT_EQ(validated_entries, entry_count);
+}
+
+TEST(PrefixedEntryRingBufferMulti, IteratorValidStaleEntries) {
+ PrefixedEntryRingBufferMulti ring;
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ // Buffer contains both valid and stale entries. This happens when after
+ // populating the buffer, only some of the entries are read.
+ // E.g. [VALID|INVALID|STALE|STALE]
+ PrefixedEntryRingBufferMulti::Reader trailing_reader;
+ EXPECT_EQ(ring.AttachReader(trailing_reader), OkStatus());
+
+ PrefixedEntryRingBufferMulti::Reader reader;
+ EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+
+ // Fill the buffer with entries.
+ size_t entry_count = 0;
+ while (TryPushBack<size_t>(ring, entry_count).ok()) {
+ entry_count++;
+ }
+
+ // Pop roughly half the entries.
+ while (reader.EntryCount() > (entry_count / 2)) {
+ EXPECT_TRUE(reader.PopFront().ok());
+ }
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ for (const Entry& entry_info : ring) {
+ EXPECT_EQ(GetEntry<size_t>(entry_info.buffer), validated_entries);
+ validated_entries++;
+ }
+ EXPECT_EQ(validated_entries, entry_count);
+}
+
+TEST(PrefixedEntryRingBufferMulti, IteratorBufferCorruption) {
+ PrefixedEntryRingBufferMulti ring;
+ byte test_buffer[kTestBufferSize];
+ EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+ // Buffer contains partially written entries. This may happen if writing
+ // is pre-empted (e.g. a crash occurs). In this state, we expect a series
+ // of valid entries followed by an invalid entry.
+ PrefixedEntryRingBufferMulti::Reader trailing_reader;
+ EXPECT_EQ(ring.AttachReader(trailing_reader), OkStatus());
+
+ // Add one entry to capture the second entry index.
+ size_t entry_count = 0;
+ EXPECT_TRUE(TryPushBack<size_t>(ring, entry_count++).ok());
+ size_t entry_size = ring.TotalUsedBytes();
+
+ // Fill the buffer with entries.
+ while (TryPushBack<size_t>(ring, entry_count++).ok()) {
+ }
+
+ // Push another entry to move the write index forward and force the oldest
+ // reader forward. This will require the iterator to dering.
+ EXPECT_TRUE(PushBack<size_t>(ring, 0).ok());
+ EXPECT_TRUE(ring.CheckForCorruption().ok());
+
+ // The first entry is overwritten. Corrupt all data past the fifth entry.
+ // Note that because the first entry has shifted, the entry_count recorded
+ // in each entry is shifted by 1.
+ constexpr size_t valid_entries = 5;
+ size_t offset = valid_entries * entry_size;
+ memset(test_buffer + offset, 0xFF, kTestBufferSize - offset);
+ EXPECT_FALSE(ring.CheckForCorruption().ok());
+
+ // Iterate over all entries and confirm entry count.
+ size_t validated_entries = 0;
+ iterator it = ring.begin();
+ for (; it != ring.end(); it++) {
+ EXPECT_EQ(GetEntry<size_t>(it->buffer), validated_entries + 1);
+ validated_entries++;
+ }
+ // The final entry will fail to be read.
+ EXPECT_EQ(it.status(), Status::DataLoss());
+ EXPECT_EQ(validated_entries, valid_entries);
+}
+
} // namespace
} // namespace ring_buffer
} // namespace pw
diff --git a/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h b/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
index b7b4706..08743c9 100644
--- a/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
+++ b/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
@@ -17,6 +17,7 @@
#include <span>
#include "pw_containers/intrusive_list.h"
+#include "pw_result/result.h"
#include "pw_status/status.h"
namespace pw {
@@ -57,7 +58,7 @@
// loss if they read slower than the writer.
class Reader : public IntrusiveList<Reader>::Item {
public:
- constexpr Reader() : buffer(nullptr), read_idx(0), entry_count(0) {}
+ constexpr Reader() : buffer_(nullptr), read_idx_(0), entry_count_(0) {}
// TODO(pwbug/344): Add locking to the internal functions. Who owns the
// lock? This class? Does this class need a lock if it's not a multi-reader?
@@ -76,12 +77,12 @@
// bytes than the data size of the data chunk being read. Available
// destination bytes were filled, remaining bytes of the data chunk were
// ignored.
- Status PeekFront(std::span<std::byte> data, size_t* bytes_read_out) {
- return buffer->InternalPeekFront(*this, data, bytes_read_out);
+ Status PeekFront(std::span<std::byte> data, size_t* bytes_read_out) const {
+ return buffer_->InternalPeekFront(*this, data, bytes_read_out);
}
- Status PeekFront(ReadOutput output) {
- return buffer->InternalPeekFront(*this, output);
+ Status PeekFront(ReadOutput output) const {
+ return buffer_->InternalPeekFront(*this, output);
}
// Same as PeekFront but includes the entry's preamble of optional user
@@ -90,15 +91,16 @@
// as it is required to determine the length populated in the span.
Status PeekFrontWithPreamble(std::span<std::byte> data,
uint32_t& user_preamble_out,
- size_t& entry_bytes_read_out);
+ size_t& entry_bytes_read_out) const;
Status PeekFrontWithPreamble(std::span<std::byte> data,
- size_t* bytes_read_out) {
- return buffer->InternalPeekFrontWithPreamble(*this, data, bytes_read_out);
+ size_t* bytes_read_out) const {
+ return buffer_->InternalPeekFrontWithPreamble(
+ *this, data, bytes_read_out);
}
- Status PeekFrontWithPreamble(ReadOutput output) {
- return buffer->InternalPeekFrontWithPreamble(*this, output);
+ Status PeekFrontWithPreamble(ReadOutput output) const {
+ return buffer_->InternalPeekFrontWithPreamble(*this, output);
}
// Pop and discard the oldest stored data chunk of data from the ring
@@ -108,34 +110,121 @@
// OK - Data successfully read from the ring buffer.
// FAILED_PRECONDITION - Buffer not initialized.
// OUT_OF_RANGE - No entries in ring buffer to pop.
- Status PopFront() { return buffer->InternalPopFront(*this); }
+ Status PopFront() { return buffer_->InternalPopFront(*this); }
// Get the size in bytes of the next chunk, not including preamble, to be
// read.
- size_t FrontEntryDataSizeBytes() {
- return buffer->InternalFrontEntryDataSizeBytes(*this);
+ size_t FrontEntryDataSizeBytes() const {
+ return buffer_->InternalFrontEntryDataSizeBytes(*this);
}
// Get the size in bytes of the next chunk, including preamble and data
// chunk, to be read.
- size_t FrontEntryTotalSizeBytes() {
- return buffer->InternalFrontEntryTotalSizeBytes(*this);
+ size_t FrontEntryTotalSizeBytes() const {
+ return buffer_->InternalFrontEntryTotalSizeBytes(*this);
}
// Get the number of variable-length entries currently in the ring buffer.
//
// Return value:
// Entry count.
- size_t EntryCount() { return entry_count; }
+ size_t EntryCount() const { return entry_count_; }
- protected:
+ private:
friend PrefixedEntryRingBufferMulti;
- PrefixedEntryRingBufferMulti* buffer;
- size_t read_idx;
- size_t entry_count;
+ // Internal constructors for the iterator class to create Reader instances
+ // at specific positions. Readers constructed through this interface cannot
+ // be attached/detached from the multisink.
+ constexpr Reader(Reader& reader)
+ : Reader(reader.buffer_, reader.read_idx_, reader.entry_count_) {}
+ constexpr Reader(PrefixedEntryRingBufferMulti* buffer,
+ size_t read_idx,
+ size_t entry_count)
+ : buffer_(buffer), read_idx_(read_idx), entry_count_(entry_count) {}
+
+ PrefixedEntryRingBufferMulti* buffer_;
+ size_t read_idx_;
+ size_t entry_count_;
};
+ // An entry returned by the iterator containing the byte span of the entry
+ // and preamble data (if the ring buffer was configured with a preamble).
+ struct Entry {
+ std::span<const std::byte> buffer;
+ uint32_t preamble;
+ };
+
+ // An iterator that can be used to walk through all entries from a given
+ // Reader position, without mutating the underlying buffer. This is useful in
+ // crash contexts where all available entries in the buffer must be acquired,
+ // even those that have already been consumed by all attached readers.
+ class iterator {
+ public:
+ iterator() : ring_buffer_(nullptr), read_idx_(0), entry_count_(0) {}
+ iterator(Reader& reader)
+ : ring_buffer_(reader.buffer_),
+ read_idx_(0),
+ entry_count_(reader.entry_count_) {
+ Status dering_result = ring_buffer_->InternalDering(reader);
+ PW_DASSERT(dering_result.ok());
+ }
+
+ iterator& operator++();
+ iterator operator++(int) {
+ iterator original = *this;
+ ++*this;
+ return original;
+ }
+
+ // Returns entry at current position.
+ const Entry& operator*() const;
+ const Entry* operator->() const { return &operator*(); }
+
+ constexpr bool operator==(const iterator& rhs) const {
+ return entry_count_ == rhs.entry_count_;
+ }
+
+ constexpr bool operator!=(const iterator& rhs) const {
+ return entry_count_ != rhs.entry_count_;
+ }
+
+ // Returns the status of the last iteration operation. If the iterator
+ // fails to read an entry, it will move to iterator::end() and indicate
+ // the failure reason here.
+ Status status() const { return iteration_status_; }
+
+ private:
+ static constexpr Entry kEndEntry = {
+ .buffer = std::span<const std::byte>(),
+ .preamble = 0,
+ };
+
+ void SkipToEnd(Status status) {
+ iteration_status_ = status;
+ entry_ = kEndEntry;
+ entry_count_ = 0;
+ }
+
+ PrefixedEntryRingBufferMulti* ring_buffer_;
+ size_t read_idx_;
+ size_t entry_count_;
+
+ mutable Entry entry_;
+ Status iteration_status_;
+ };
+
+ using element_type = const Entry;
+ using value_type = std::remove_cv_t<const Entry>;
+ using pointer = const Entry;
+ using reference = const Entry&;
+ using const_iterator = iterator; // Standard alias for iterable types.
+
+ iterator begin() { return iterator(GetSlowestReaderWritable()); }
+ iterator end() { return iterator(); }
+ const_iterator cbegin() { return begin(); }
+ const_iterator cend() { return end(); }
+
// TODO(pwbug/340): Consider changing bool to an enum, to explicitly enumerate
// what this variable means in clients.
PrefixedEntryRingBufferMulti(bool user_preamble = false)
@@ -151,6 +240,19 @@
// INVALID_ARGUMENT - Argument was nullptr, size zero, or too large.
Status SetBuffer(std::span<std::byte> buffer);
+ // Determines if the ring buffer has corrupted entries.
+ //
+ // Precondition: At least one reader must be attached to the ring buffer.
+ // Return values:
+ // OK - No corruption was detected.
+ // DATA_LOSS - Corruption was detected.
+ Status CheckForCorruption() {
+ iterator it = begin();
+ for (; it != end(); ++it) {
+ }
+ return it.status();
+ }
+
// Attach reader to the ring buffer. Readers can only be attached to one
// ring buffer at a time.
//
@@ -223,18 +325,19 @@
// Get the size in bytes of all the current entries in the ring buffer,
// including preamble and data chunk.
- size_t TotalUsedBytes() { return buffer_bytes_ - RawAvailableBytes(); }
+ size_t TotalUsedBytes() const { return buffer_bytes_ - RawAvailableBytes(); }
// Dering the buffer by reordering entries internally in the buffer by
// rotating to have the oldest entry is at the lowest address/index with
- // newest entry at the highest address.
+ // newest entry at the highest address. If no readers are attached, the buffer
+ // is deringed at the current write index.
//
// Return values:
// OK - Buffer data successfully deringed.
- // FAILED_PRECONDITION - Buffer not initialized, or no readers attached.
+ // FAILED_PRECONDITION - Buffer not initialized.
Status Dering();
- protected:
+ private:
// Read the oldest stored data chunk of data from the ring buffer to
// the provided destination std::span. The number of bytes read is written to
// `bytes_read_out`.
@@ -246,17 +349,18 @@
// RESOURCE_EXHAUSTED - Destination data std::span was smaller number of bytes
// than the data size of the data chunk being read. Available destination
// bytes were filled, remaining bytes of the data chunk were ignored.
- Status InternalPeekFront(Reader& reader,
+ Status InternalPeekFront(const Reader& reader,
std::span<std::byte> data,
- size_t* bytes_read_out);
- Status InternalPeekFront(Reader& reader, ReadOutput output);
+ size_t* bytes_read_out) const;
+ Status InternalPeekFront(const Reader& reader, ReadOutput output) const;
// Same as Read but includes the entry's preamble of optional user value and
// the varint of the data size
- Status InternalPeekFrontWithPreamble(Reader& reader,
+ Status InternalPeekFrontWithPreamble(const Reader& reader,
std::span<std::byte> data,
- size_t* bytes_read_out);
- Status InternalPeekFrontWithPreamble(Reader& reader, ReadOutput output);
+ size_t* bytes_read_out) const;
+ Status InternalPeekFrontWithPreamble(const Reader& reader,
+ ReadOutput output) const;
// Pop and discard the oldest stored data chunk of data from the ring buffer.
//
@@ -268,21 +372,30 @@
// Get the size in bytes of the next chunk, not including preamble, to be
// read.
- size_t InternalFrontEntryDataSizeBytes(Reader& reader);
+ size_t InternalFrontEntryDataSizeBytes(const Reader& reader) const;
// Get the size in bytes of the next chunk, including preamble and data
// chunk, to be read.
- size_t InternalFrontEntryTotalSizeBytes(Reader& reader);
+ size_t InternalFrontEntryTotalSizeBytes(const Reader& reader) const;
// Internal version of Read used by all the public interface versions. T
// should be of type ReadOutput.
template <typename T>
- Status InternalRead(Reader& reader,
+ Status InternalRead(const Reader& reader,
T read_output,
bool include_preamble_in_output,
- uint32_t* user_preamble_out = nullptr);
+ uint32_t* user_preamble_out = nullptr) const;
- private:
+ // Dering the buffer by reordering entries internally in the buffer by
+ // rotating to have the oldest entry is at the lowest address/index with
+ // newest entry at the highest address. If no readers are attached, the buffer
+ // is deringed at the current write index.
+ //
+ // Return values:
+ // OK - Buffer data successfully deringed.
+ // FAILED_PRECONDITION - Buffer not initialized.
+ Status InternalDering(Reader& reader);
+
struct EntryInfo {
size_t preamble_bytes;
uint32_t user_preamble;
@@ -302,19 +415,30 @@
// and has at least one entry to pop.
void InternalPopFrontAll();
- // Returns the slowest reader in the list.
+ // Returns a the slowest reader in the list.
//
// Precondition: This function requires that at least one reader is attached.
- Reader& GetSlowestReader();
+ const Reader& GetSlowestReader() const;
+ Reader& GetSlowestReaderWritable() {
+ return const_cast<Reader&>(GetSlowestReader());
+ }
+
+ // Get info struct with the size of the preamble and data chunk for the next
+ // entry to be read. Calls RawFrontEntryInfo and asserts on failure.
+ EntryInfo FrontEntryInfo(const Reader& reader) const;
// Get info struct with the size of the preamble and data chunk for the next
// entry to be read.
- EntryInfo FrontEntryInfo(Reader& reader);
+ //
+ // Returns:
+ // Ok - EntryInfo containing the next entry metadata.
+ // DataLoss - Failed to read the metadata at this location.
+ Result<EntryInfo> RawFrontEntryInfo(size_t source_idx) const;
// Get the raw number of available bytes free in the ring buffer. This is
// not available bytes for data, since there is a variable size preamble for
// each entry.
- size_t RawAvailableBytes();
+ size_t RawAvailableBytes() const;
// Do the basic write of the specified number of bytes starting at the last
// write index of the ring buffer to the destination, handing any wrap-around
@@ -324,9 +448,11 @@
// Do the basic read of the specified number of bytes starting at the given
// index of the ring buffer to the destination, handing any wrap-around of
// the ring buffer. This is basic, raw operation with no safety checks.
- void RawRead(std::byte* destination, size_t source_idx, size_t length_bytes);
+ void RawRead(std::byte* destination,
+ size_t source_idx,
+ size_t length_bytes) const;
- size_t IncrementIndex(size_t index, size_t count);
+ size_t IncrementIndex(size_t index, size_t count) const;
std::byte* buffer_;
size_t buffer_bytes_;