pw_multisink & pw_ring_buffer: Add Peek APIs

pw_multisink: Add PeekEntry, PopEntry, and CommitEntry APIs.
pw_ring_buffer: Add PeekFrontPreamble to allow a reader peak the user
preamble without the need to copy data. This lets the multisink commit
multiple entries, peeking one at a time to check if it should be popped
without having to get the entry's data.

Change-Id: I2304af3d149c10942e61f2ca65400acfeda978ca
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/56246
Reviewed-by: Ewout van Bekkum <ewout@google.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_multisink/multisink_test.cc b/pw_multisink/multisink_test.cc
index 4cd1c76..9a33486 100644
--- a/pw_multisink/multisink_test.cc
+++ b/pw_multisink/multisink_test.cc
@@ -14,9 +14,12 @@
 
 #include "pw_multisink/multisink.h"
 
+#include <array>
+#include <cstdint>
 #include <optional>
 
 #include "gtest/gtest.h"
+#include "pw_status/status.h"
 
 namespace pw::multisink {
 using Drain = MultiSink::Drain;
@@ -38,6 +41,8 @@
  protected:
   static constexpr std::byte kMessage[] = {
       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
+  static constexpr std::byte kMessageOther[] = {
+      (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
   static constexpr size_t kMaxDrains = 3;
   static constexpr size_t kMaxListeners = 3;
   static constexpr size_t kEntryBufferSize = 1024;
@@ -45,23 +50,57 @@
 
   MultiSinkTest() : multisink_(buffer_) {}
 
-  void ExpectMessageAndDropCount(Drain& drain,
+  // Expects the peeked or popped message to equal the provided non-empty
+  // message, and the drop count to match. If `expected_message` is empty, the
+  // Pop call status expected is OUT_OF_RANGE.
+  void ExpectMessageAndDropCount(Result<ConstByteSpan>& result,
+                                 uint32_t result_drop_count,
                                  std::optional<ConstByteSpan> expected_message,
                                  uint32_t expected_drop_count) {
-    uint32_t drop_count = 0;
-    Result<ConstByteSpan> result = drain.GetEntry(entry_buffer_, drop_count);
     if (!expected_message.has_value()) {
       EXPECT_EQ(Status::OutOfRange(), result.status());
     } else {
-      ASSERT_TRUE(result.ok());
+      ASSERT_EQ(result.status(), OkStatus());
       if (!expected_message.value().empty()) {
+        ASSERT_FALSE(result.value().empty());
+        ASSERT_EQ(result.value().size_bytes(),
+                  expected_message.value().size_bytes());
         EXPECT_EQ(memcmp(result.value().data(),
                          expected_message.value().data(),
                          expected_message.value().size_bytes()),
                   0);
       }
     }
-    EXPECT_EQ(drop_count, expected_drop_count);
+    EXPECT_EQ(result_drop_count, expected_drop_count);
+  }
+
+  void VerifyPopEntry(Drain& drain,
+                      std::optional<ConstByteSpan> expected_message,
+                      uint32_t expected_drop_count) {
+    uint32_t drop_count = 0;
+    Result<ConstByteSpan> result = drain.PopEntry(entry_buffer_, drop_count);
+    ExpectMessageAndDropCount(
+        result, drop_count, expected_message, expected_drop_count);
+  }
+
+  void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
+                        uint32_t result_drop_count,
+                        std::optional<ConstByteSpan> expected_message,
+                        uint32_t expected_drop_count) {
+    if (peek_result.ok()) {
+      ASSERT_FALSE(peek_result.value().entry().empty());
+      Result<ConstByteSpan> verify_result(peek_result.value().entry());
+      ExpectMessageAndDropCount(verify_result,
+                                result_drop_count,
+                                expected_message,
+                                expected_drop_count);
+      return;
+    }
+    if (expected_message.has_value()) {
+      // Fail since we expected OkStatus.
+      ASSERT_EQ(peek_result.status(), OkStatus());
+    }
+    EXPECT_EQ(Status::OutOfRange(), peek_result.status());
   }
 
   void ExpectNotificationCount(CountingListener& listener,
@@ -84,29 +123,29 @@
 
   // Single entry push and pop.
   ExpectNotificationCount(listeners_[0], 1u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
 
   // Single empty entry push and pop.
   multisink_.HandleEntry(ConstByteSpan());
   ExpectNotificationCount(listeners_[0], 1u);
-  ExpectMessageAndDropCount(drains_[0], ConstByteSpan(), 0u);
+  VerifyPopEntry(drains_[0], ConstByteSpan(), 0u);
 
   // Multiple entries with intermittent drops.
   multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 3u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 1u);
 
   // Send drops only.
   multisink_.HandleDropped();
   ExpectNotificationCount(listeners_[0], 1u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 1u);
 
   // Confirm out-of-range if no entries are expected.
   ExpectNotificationCount(listeners_[0], 0u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 }
 
 TEST_F(MultiSinkTest, MultipleDrain) {
@@ -124,20 +163,20 @@
   // Drain one drain entirely.
   ExpectNotificationCount(listeners_[0], 5u);
   ExpectNotificationCount(listeners_[1], 5u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 1u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 
   // Confirm the other drain can be drained separately.
   ExpectNotificationCount(listeners_[0], 0u);
   ExpectNotificationCount(listeners_[1], 0u);
-  ExpectMessageAndDropCount(drains_[1], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[1], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[1], kMessage, 1u);
-  ExpectMessageAndDropCount(drains_[1], std::nullopt, 1u);
-  ExpectMessageAndDropCount(drains_[1], std::nullopt, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 1u);
+  VerifyPopEntry(drains_[1], std::nullopt, 1u);
+  VerifyPopEntry(drains_[1], std::nullopt, 0u);
 }
 
 TEST_F(MultiSinkTest, LateDrainRegistration) {
@@ -148,13 +187,13 @@
   multisink_.AttachDrain(drains_[0]);
   multisink_.AttachListener(listeners_[0]);
   ExpectNotificationCount(listeners_[0], 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], {}, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 1u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 }
 
 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
@@ -168,7 +207,7 @@
 
   // Drain out one message and detach it.
   ExpectNotificationCount(listeners_[0], 4u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 1u);
   multisink_.DetachDrain(drains_[0]);
   multisink_.DetachListener(listeners_[0]);
 
@@ -178,14 +217,14 @@
   multisink_.AttachDrain(drains_[0]);
   multisink_.AttachListener(listeners_[0]);
   ExpectNotificationCount(listeners_[0], 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 1u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u);
 }
 
 TEST_F(MultiSinkTest, TooSmallBuffer) {
@@ -196,15 +235,13 @@
   multisink_.HandleDropped();
   multisink_.HandleEntry(kMessage);
 
-  // Attempting to acquire an entry should result in RESOURCE_EXHAUSTED.
+  // Attempting to acquire an entry with a small buffer should result in
+  // RESOURCE_EXHAUSTED and remove it.
   Result<ConstByteSpan> result =
-      drains_[0].GetEntry(std::span(entry_buffer_, 1), drop_count);
+      drains_[0].PopEntry(std::span(entry_buffer_, 1), drop_count);
   EXPECT_EQ(result.status(), Status::ResourceExhausted());
 
-  // Verify that the multisink does not move the handled sequence ID counter
-  // forward and provides this data on the next call.
-  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
-  ExpectMessageAndDropCount(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 2u);
 }
 
 TEST_F(MultiSinkTest, Iterator) {
@@ -215,9 +252,9 @@
   multisink_.HandleEntry(kMessage);
   multisink_.HandleEntry(kMessage);
 
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
-  ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u);
 
   // Confirm that the iterator still observes the messages in the ring buffer.
   size_t iterated_entries = 0;
@@ -253,4 +290,59 @@
   EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
 }
 
+TEST_F(MultiSinkTest, PeekEntryNoEntries) {
+  multisink_.AttachDrain(drains_[0]);
+
+  // Peek empty multisink.
+  uint32_t drop_count = 0;
+  auto peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(peek_result, drop_count, std::nullopt, 0);
+}
+
+TEST_F(MultiSinkTest, PeekAndPop) {
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachDrain(drains_[1]);
+
+  // Peek entry after multisink has some entries.
+  multisink_.HandleEntry(kMessage);
+  multisink_.HandleEntry(kMessageOther);
+  uint32_t drop_count = 0;
+  auto first_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(first_peek_result, drop_count, kMessage, 0);
+
+  // Multiple peeks must return the front message.
+  auto peek_duplicate = drains_[0].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(peek_duplicate, drop_count, kMessage, 0);
+  // A second drain must peek the front message.
+  auto peek_other_drain = drains_[1].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(peek_other_drain, drop_count, kMessage, 0);
+
+  // After a drain pops a peeked entry, the next peek call must return the next
+  // message.
+  ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
+  auto second_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(second_peek_result, drop_count, kMessageOther, 0);
+  // Slower readers must be unchanged.
+  auto peek_other_drain_duplicate =
+      drains_[1].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(peek_other_drain_duplicate, drop_count, kMessage, 0);
+
+  // PopEntry prior to popping the previously peeked entry.
+  VerifyPopEntry(drains_[0], kMessageOther, 0);
+  // Popping an entry already handled must not trigger errors.
+  ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
+  // Popping with an old peek context must not trigger errors.
+  ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
+
+  // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
+  VerifyPopEntry(drains_[0], std::nullopt, 0);
+  auto empty_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(empty_peek_result, drop_count, std::nullopt, 0);
+
+  // // Slower readers must be unchanged.
+  auto peek_other_drain_unchanged =
+      drains_[1].PeekEntry(entry_buffer_, drop_count);
+  VerifyPeekResult(peek_other_drain_unchanged, drop_count, kMessage, 0);
+}
+
 }  // namespace pw::multisink