pw_multisink: Track & report ingress drops

Track drops of entries that could not be added to the MultiSink and
report this count separately from the count of entries dropped when a
slow drain is advanced.

Change-Id: I3f60bc9fd64b2bfc78c6975f5ac0d470ec5b60a7
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/88321
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_multisink/multisink_test.cc b/pw_multisink/multisink_test.cc
index 3076aea..1f5bea9 100644
--- a/pw_multisink/multisink_test.cc
+++ b/pw_multisink/multisink_test.cc
@@ -16,6 +16,7 @@
 
 #include <array>
 #include <cstdint>
+#include <cstring>
 #include <optional>
 #include <span>
 #include <string_view>
@@ -56,10 +57,12 @@
   // Expects the peeked or popped message to equal the provided non-empty
   // message, and the drop count to match. If `expected_message` is empty, the
   // Pop call status expected is OUT_OF_RANGE.
-  void ExpectMessageAndDropCount(Result<ConstByteSpan>& result,
-                                 uint32_t result_drop_count,
-                                 std::optional<ConstByteSpan> expected_message,
-                                 uint32_t expected_drop_count) {
+  void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
+                                  uint32_t result_drop_count,
+                                  uint32_t result_ingress_drop_count,
+                                  std::optional<ConstByteSpan> expected_message,
+                                  uint32_t expected_drop_count,
+                                  uint32_t expected_ingress_drop_count) {
     if (!expected_message.has_value()) {
       EXPECT_EQ(Status::OutOfRange(), result.status());
     } else {
@@ -75,28 +78,40 @@
       }
     }
     EXPECT_EQ(result_drop_count, expected_drop_count);
+    EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
   }
 
   void VerifyPopEntry(Drain& drain,
                       std::optional<ConstByteSpan> expected_message,
-                      uint32_t expected_drop_count) {
+                      uint32_t expected_drop_count,
+                      uint32_t expected_ingress_drop_count) {
     uint32_t drop_count = 0;
-    Result<ConstByteSpan> result = drain.PopEntry(entry_buffer_, drop_count);
-    ExpectMessageAndDropCount(
-        result, drop_count, expected_message, expected_drop_count);
+    uint32_t ingress_drop_count = 0;
+    Result<ConstByteSpan> result =
+        drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
+    ExpectMessageAndDropCounts(result,
+                               drop_count,
+                               ingress_drop_count,
+                               expected_message,
+                               expected_drop_count,
+                               expected_ingress_drop_count);
   }
 
   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
                         uint32_t result_drop_count,
+                        uint32_t result_ingress_drop_count,
                         std::optional<ConstByteSpan> expected_message,
-                        uint32_t expected_drop_count) {
+                        uint32_t expected_drop_count,
+                        uint32_t expected_ingress_drop_count) {
     if (peek_result.ok()) {
       ASSERT_FALSE(peek_result.value().entry().empty());
       Result<ConstByteSpan> verify_result(peek_result.value().entry());
-      ExpectMessageAndDropCount(verify_result,
-                                result_drop_count,
-                                expected_message,
-                                expected_drop_count);
+      ExpectMessageAndDropCounts(verify_result,
+                                 result_drop_count,
+                                 result_ingress_drop_count,
+                                 expected_message,
+                                 expected_drop_count,
+                                 expected_ingress_drop_count);
       return;
     }
     if (expected_message.has_value()) {
@@ -127,29 +142,28 @@
 
   // Single entry push and pop.
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
   // Single empty entry push and pop.
   multisink_.HandleEntry(ConstByteSpan());
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], ConstByteSpan(), 0u);
+  VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
 
   // Multiple entries with intermittent drops.
   multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 3u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
 
   // Send drops only.
   multisink_.HandleDropped();
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], std::nullopt, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
 
   // Confirm out-of-range if no entries are expected.
   ExpectNotificationCount(listeners_[0], 0u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 }
 
 TEST_F(MultiSinkTest, MultipleDrain) {
@@ -169,20 +183,20 @@
   // Drain one drain entirely.
   ExpectNotificationCount(listeners_[0], 5u);
   ExpectNotificationCount(listeners_[1], 5u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], kMessage, 1u);
-  VerifyPopEntry(drains_[0], std::nullopt, 1u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 
   // Confirm the other drain can be drained separately.
   ExpectNotificationCount(listeners_[0], 0u);
   ExpectNotificationCount(listeners_[1], 0u);
-  VerifyPopEntry(drains_[1], kMessage, 0u);
-  VerifyPopEntry(drains_[1], kMessage, 0u);
-  VerifyPopEntry(drains_[1], kMessage, 1u);
-  VerifyPopEntry(drains_[1], std::nullopt, 1u);
-  VerifyPopEntry(drains_[1], std::nullopt, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
+  VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
+  VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
 }
 
 TEST_F(MultiSinkTest, LateDrainRegistration) {
@@ -193,13 +207,13 @@
   multisink_.AttachDrain(drains_[0]);
   multisink_.AttachListener(listeners_[0]);
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 }
 
 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
@@ -214,7 +228,7 @@
 
   // Drain out one message and detach it.
   ExpectNotificationCount(listeners_[0], 4u);
-  VerifyPopEntry(drains_[0], kMessage, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
   multisink_.DetachDrain(drains_[0]);
   multisink_.DetachListener(listeners_[0]);
 
@@ -224,14 +238,14 @@
   multisink_.AttachDrain(drains_[0]);
   multisink_.AttachListener(listeners_[0]);
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], kMessage, 1u);
-  VerifyPopEntry(drains_[0], kMessage, 1u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 
   multisink_.HandleEntry(kMessage);
   ExpectNotificationCount(listeners_[0], 1u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], std::nullopt, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
 }
 
 TEST_F(MultiSinkTest, TooSmallBuffer) {
@@ -239,16 +253,17 @@
 
   // Insert an entry and a drop, then try to read into an insufficient buffer.
   uint32_t drop_count = 0;
+  uint32_t ingress_drop_count = 0;
   multisink_.HandleDropped();
   multisink_.HandleEntry(kMessage);
 
   // Attempting to acquire an entry with a small buffer should result in
   // RESOURCE_EXHAUSTED and remove it.
-  Result<ConstByteSpan> result =
-      drains_[0].PopEntry(std::span(entry_buffer_, 1), drop_count);
+  Result<ConstByteSpan> result = drains_[0].PopEntry(
+      std::span(entry_buffer_, 1), drop_count, ingress_drop_count);
   EXPECT_EQ(result.status(), Status::ResourceExhausted());
 
-  VerifyPopEntry(drains_[0], std::nullopt, 2u);
+  VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
 }
 
 TEST_F(MultiSinkTest, Iterator) {
@@ -259,9 +274,9 @@
   multisink_.HandleEntry(kMessage);
   multisink_.HandleEntry(kMessage);
 
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
-  VerifyPopEntry(drains_[0], kMessage, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
 
   // Confirm that the iterator still observes the messages in the ring buffer.
   size_t iterated_entries = 0;
@@ -302,8 +317,10 @@
 
   // Peek empty multisink.
   uint32_t drop_count = 0;
-  auto peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(peek_result, drop_count, std::nullopt, 0);
+  uint32_t ingress_drops = 0;
+  auto peek_result =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drops);
+  VerifyPeekResult(peek_result, drop_count, ingress_drops, std::nullopt, 0, 0);
 }
 
 TEST_F(MultiSinkTest, PeekAndPop) {
@@ -314,42 +331,176 @@
   multisink_.HandleEntry(kMessage);
   multisink_.HandleEntry(kMessageOther);
   uint32_t drop_count = 0;
-  auto first_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(first_peek_result, drop_count, kMessage, 0);
+  uint32_t ingress_drop_count = 0;
+  auto first_peek_result =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
 
   // Multiple peeks must return the front message.
-  auto peek_duplicate = drains_[0].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(peek_duplicate, drop_count, kMessage, 0);
+  auto peek_duplicate =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
   // A second drain must peek the front message.
-  auto peek_other_drain = drains_[1].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(peek_other_drain, drop_count, kMessage, 0);
+  auto peek_other_drain =
+      drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
 
   // After a drain pops a peeked entry, the next peek call must return the next
   // message.
   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
-  auto second_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(second_peek_result, drop_count, kMessageOther, 0);
+  auto second_peek_result =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
   // Slower readers must be unchanged.
   auto peek_other_drain_duplicate =
-      drains_[1].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(peek_other_drain_duplicate, drop_count, kMessage, 0);
+      drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(peek_other_drain_duplicate,
+                   drop_count,
+                   ingress_drop_count,
+                   kMessage,
+                   0,
+                   0);
 
   // PopEntry prior to popping the previously peeked entry.
-  VerifyPopEntry(drains_[0], kMessageOther, 0);
+  VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
   // Popping an entry already handled must not trigger errors.
   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
   // Popping with an old peek context must not trigger errors.
   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
 
   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
-  VerifyPopEntry(drains_[0], std::nullopt, 0);
-  auto empty_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(empty_peek_result, drop_count, std::nullopt, 0);
+  VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
+  auto empty_peek_result =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
 
   // // Slower readers must be unchanged.
   auto peek_other_drain_unchanged =
-      drains_[1].PeekEntry(entry_buffer_, drop_count);
-  VerifyPeekResult(peek_other_drain_unchanged, drop_count, kMessage, 0);
+      drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(peek_other_drain_unchanged,
+                   drop_count,
+                   ingress_drop_count,
+                   kMessage,
+                   0,
+                   0);
+}
+
+TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
+  multisink_.AttachDrain(drains_[0]);
+
+  // Peek entry after multisink has some entries.
+  multisink_.HandleEntry(kMessage);
+  const uint32_t ingress_drops = 10;
+  multisink_.HandleDropped(ingress_drops);
+
+  uint32_t drop_count = 0;
+  uint32_t ingress_drop_count = 0;
+  auto peek_result1 =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  // No drops reported until the drain finds a gap in the sequence IDs.
+  VerifyPeekResult(
+      peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
+
+  // Popping the peeked entry advances the drain, and a new peek will find the
+  // gap in sequence IDs.
+  ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
+  auto peek_result2 =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
+  EXPECT_EQ(drop_count, 0u);
+  EXPECT_EQ(ingress_drop_count, ingress_drops);
+}
+
+TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
+  multisink_.AttachDrain(drains_[0]);
+
+  // Add entries until buffer is full and drain has to be advanced.
+  // The sequence ID takes 1 byte when less than 128.
+  const size_t max_multisink_messages = 128;
+  const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
+  // Account for 1 byte of preamble (sequence ID) and 1 byte of data size.
+  const size_t message_size = buffer_entry_size - 2;
+  std::array<std::byte, message_size> message;
+  std::memset(message.data(), 'a', message.size());
+  for (size_t i = 0; i < max_multisink_messages; ++i) {
+    multisink_.HandleEntry(message);
+  }
+
+  // At this point the buffer is full, but the sequence ID will take 1 more byte
+  // in the preamble, meaning that adding N new entries drops N + 1 entries.
+  // Account for that offset.
+  const size_t expected_drops = 5;
+  for (size_t i = 1; i < expected_drops; ++i) {
+    multisink_.HandleEntry(message);
+  }
+
+  uint32_t drop_count = 0;
+  uint32_t ingress_drop_count = 0;
+  auto peek_result =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
+}
+
+TEST_F(MultiSinkTest, IngressDropCountOverflow) {
+  multisink_.AttachDrain(drains_[0]);
+
+  // Make drain's last handled drop larger than multisink drop count, which
+  // overflowed.
+  const uint32_t drop_count_close_to_overflow =
+      std::numeric_limits<uint32_t>::max() - 3;
+  multisink_.HandleDropped(drop_count_close_to_overflow);
+  multisink_.HandleEntry(kMessage);
+
+  // Catch up drain's drop count.
+  uint32_t drop_count = 0;
+  uint32_t ingress_drop_count = 0;
+  auto peek_result1 =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(peek_result1,
+                   drop_count,
+                   ingress_drop_count,
+                   kMessage,
+                   0,
+                   drop_count_close_to_overflow);
+  // Popping the peeked entry advances the drain, and a new peek will find the
+  // gap in sequence IDs.
+  ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
+
+  // Overflow multisink's drop count.
+  const uint32_t expected_ingress_drop_count = 10;
+  multisink_.HandleDropped(expected_ingress_drop_count);
+
+  auto peek_result2 =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
+  EXPECT_EQ(drop_count, 0u);
+  EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
+
+  multisink_.HandleEntry(kMessage);
+  auto peek_result3 =
+      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+  VerifyPeekResult(
+      peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
+}
+
+TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
+  multisink_.AttachDrain(drains_[0]);
+
+  const uint32_t ingress_drops = 10;
+  multisink_.HandleDropped(ingress_drops);
+  multisink_.HandleEntry(kMessage);
+  VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
+
+  // Detaching and attaching drain should report the same drops.
+  multisink_.DetachDrain(drains_[0]);
+  multisink_.AttachDrain(drains_[0]);
+  VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
 }
 
 TEST(UnsafeIteration, NoLimit) {