pw_multisink: Track & report ingress drops
Track drops of entries that could not be added to the MultiSink and
report this count separately from the drop count when a slow drain is
advanced.
Change-Id: I3f60bc9fd64b2bfc78c6975f5ac0d470ec5b60a7
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/88321
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_multisink/multisink_test.cc b/pw_multisink/multisink_test.cc
index 3076aea..1f5bea9 100644
--- a/pw_multisink/multisink_test.cc
+++ b/pw_multisink/multisink_test.cc
@@ -16,6 +16,7 @@
#include <array>
#include <cstdint>
+#include <cstring>
#include <optional>
#include <span>
#include <string_view>
@@ -56,10 +57,12 @@
// Expects the peeked or popped message to equal the provided non-empty
// message, and the drop count to match. If `expected_message` is empty, the
// Pop call status expected is OUT_OF_RANGE.
- void ExpectMessageAndDropCount(Result<ConstByteSpan>& result,
- uint32_t result_drop_count,
- std::optional<ConstByteSpan> expected_message,
- uint32_t expected_drop_count) {
+ void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
+ uint32_t result_drop_count,
+ uint32_t result_ingress_drop_count,
+ std::optional<ConstByteSpan> expected_message,
+ uint32_t expected_drop_count,
+ uint32_t expected_ingress_drop_count) {
if (!expected_message.has_value()) {
EXPECT_EQ(Status::OutOfRange(), result.status());
} else {
@@ -75,28 +78,40 @@
}
}
EXPECT_EQ(result_drop_count, expected_drop_count);
+ EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
}
void VerifyPopEntry(Drain& drain,
std::optional<ConstByteSpan> expected_message,
- uint32_t expected_drop_count) {
+ uint32_t expected_drop_count,
+ uint32_t expected_ingress_drop_count) {
uint32_t drop_count = 0;
- Result<ConstByteSpan> result = drain.PopEntry(entry_buffer_, drop_count);
- ExpectMessageAndDropCount(
- result, drop_count, expected_message, expected_drop_count);
+ uint32_t ingress_drop_count = 0;
+ Result<ConstByteSpan> result =
+ drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
+ ExpectMessageAndDropCounts(result,
+ drop_count,
+ ingress_drop_count,
+ expected_message,
+ expected_drop_count,
+ expected_ingress_drop_count);
}
void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
uint32_t result_drop_count,
+ uint32_t result_ingress_drop_count,
std::optional<ConstByteSpan> expected_message,
- uint32_t expected_drop_count) {
+ uint32_t expected_drop_count,
+ uint32_t expected_ingress_drop_count) {
if (peek_result.ok()) {
ASSERT_FALSE(peek_result.value().entry().empty());
Result<ConstByteSpan> verify_result(peek_result.value().entry());
- ExpectMessageAndDropCount(verify_result,
- result_drop_count,
- expected_message,
- expected_drop_count);
+ ExpectMessageAndDropCounts(verify_result,
+ result_drop_count,
+ result_ingress_drop_count,
+ expected_message,
+ expected_drop_count,
+ expected_ingress_drop_count);
return;
}
if (expected_message.has_value()) {
@@ -127,29 +142,28 @@
// Single entry push and pop.
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
-
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
// Single empty entry push and pop.
multisink_.HandleEntry(ConstByteSpan());
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], ConstByteSpan(), 0u);
+ VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
// Multiple entries with intermittent drops.
multisink_.HandleEntry(kMessage);
multisink_.HandleDropped();
multisink_.HandleEntry(kMessage);
ExpectNotificationCount(listeners_[0], 3u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], kMessage, 1u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
// Send drops only.
multisink_.HandleDropped();
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], std::nullopt, 1u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
// Confirm out-of-range if no entries are expected.
ExpectNotificationCount(listeners_[0], 0u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
}
TEST_F(MultiSinkTest, MultipleDrain) {
@@ -169,20 +183,20 @@
// Drain one drain entirely.
ExpectNotificationCount(listeners_[0], 5u);
ExpectNotificationCount(listeners_[1], 5u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], kMessage, 1u);
- VerifyPopEntry(drains_[0], std::nullopt, 1u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
// Confirm the other drain can be drained separately.
ExpectNotificationCount(listeners_[0], 0u);
ExpectNotificationCount(listeners_[1], 0u);
- VerifyPopEntry(drains_[1], kMessage, 0u);
- VerifyPopEntry(drains_[1], kMessage, 0u);
- VerifyPopEntry(drains_[1], kMessage, 1u);
- VerifyPopEntry(drains_[1], std::nullopt, 1u);
- VerifyPopEntry(drains_[1], std::nullopt, 0u);
+ VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
+ VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
+ VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
}
TEST_F(MultiSinkTest, LateDrainRegistration) {
@@ -193,13 +207,13 @@
multisink_.AttachDrain(drains_[0]);
multisink_.AttachListener(listeners_[0]);
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
multisink_.HandleEntry(kMessage);
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
}
TEST_F(MultiSinkTest, DynamicDrainRegistration) {
@@ -214,7 +228,7 @@
// Drain out one message and detach it.
ExpectNotificationCount(listeners_[0], 4u);
- VerifyPopEntry(drains_[0], kMessage, 1u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
multisink_.DetachDrain(drains_[0]);
multisink_.DetachListener(listeners_[0]);
@@ -224,14 +238,14 @@
multisink_.AttachDrain(drains_[0]);
multisink_.AttachListener(listeners_[0]);
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], kMessage, 1u);
- VerifyPopEntry(drains_[0], kMessage, 1u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
multisink_.HandleEntry(kMessage);
ExpectNotificationCount(listeners_[0], 1u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], std::nullopt, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
}
TEST_F(MultiSinkTest, TooSmallBuffer) {
@@ -239,16 +253,17 @@
// Insert an entry and a drop, then try to read into an insufficient buffer.
uint32_t drop_count = 0;
+ uint32_t ingress_drop_count = 0;
multisink_.HandleDropped();
multisink_.HandleEntry(kMessage);
// Attempting to acquire an entry with a small buffer should result in
// RESOURCE_EXHAUSTED and remove it.
- Result<ConstByteSpan> result =
- drains_[0].PopEntry(std::span(entry_buffer_, 1), drop_count);
+ Result<ConstByteSpan> result = drains_[0].PopEntry(
+ std::span(entry_buffer_, 1), drop_count, ingress_drop_count);
EXPECT_EQ(result.status(), Status::ResourceExhausted());
- VerifyPopEntry(drains_[0], std::nullopt, 2u);
+ VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
}
TEST_F(MultiSinkTest, Iterator) {
@@ -259,9 +274,9 @@
multisink_.HandleEntry(kMessage);
multisink_.HandleEntry(kMessage);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
- VerifyPopEntry(drains_[0], kMessage, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
+ VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
// Confirm that the iterator still observes the messages in the ring buffer.
size_t iterated_entries = 0;
@@ -302,8 +317,10 @@
// Peek empty multisink.
uint32_t drop_count = 0;
- auto peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(peek_result, drop_count, std::nullopt, 0);
+ uint32_t ingress_drop_count = 0;
+ auto peek_result =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
}
TEST_F(MultiSinkTest, PeekAndPop) {
@@ -314,42 +331,176 @@
multisink_.HandleEntry(kMessage);
multisink_.HandleEntry(kMessageOther);
uint32_t drop_count = 0;
- auto first_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(first_peek_result, drop_count, kMessage, 0);
+ uint32_t ingress_drop_count = 0;
+ auto first_peek_result =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
// Multiple peeks must return the front message.
- auto peek_duplicate = drains_[0].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(peek_duplicate, drop_count, kMessage, 0);
+ auto peek_duplicate =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
// A second drain must peek the front message.
- auto peek_other_drain = drains_[1].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(peek_other_drain, drop_count, kMessage, 0);
+ auto peek_other_drain =
+ drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
// After a drain pops a peeked entry, the next peek call must return the next
// message.
ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
- auto second_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(second_peek_result, drop_count, kMessageOther, 0);
+ auto second_peek_result =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
// Slower readers must be unchanged.
auto peek_other_drain_duplicate =
- drains_[1].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(peek_other_drain_duplicate, drop_count, kMessage, 0);
+ drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(peek_other_drain_duplicate,
+ drop_count,
+ ingress_drop_count,
+ kMessage,
+ 0,
+ 0);
// PopEntry prior to popping the previously peeked entry.
- VerifyPopEntry(drains_[0], kMessageOther, 0);
+ VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
// Popping an entry already handled must not trigger errors.
ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
// Popping with an old peek context must not trigger errors.
ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
// Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
- VerifyPopEntry(drains_[0], std::nullopt, 0);
- auto empty_peek_result = drains_[0].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(empty_peek_result, drop_count, std::nullopt, 0);
+ VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
+ auto empty_peek_result =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
// // Slower readers must be unchanged.
auto peek_other_drain_unchanged =
- drains_[1].PeekEntry(entry_buffer_, drop_count);
- VerifyPeekResult(peek_other_drain_unchanged, drop_count, kMessage, 0);
+ drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(peek_other_drain_unchanged,
+ drop_count,
+ ingress_drop_count,
+ kMessage,
+ 0,
+ 0);
+}
+
+TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
+ multisink_.AttachDrain(drains_[0]);
+
+ // Queue one entry followed by a batch of ingress drops, then peek.
+ multisink_.HandleEntry(kMessage);
+ const uint32_t ingress_drops = 10;
+ multisink_.HandleDropped(ingress_drops);
+
+ uint32_t drop_count = 0;
+ uint32_t ingress_drop_count = 0;
+ auto peek_result1 =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ // No drops reported until the drain finds a gap in the sequence IDs.
+ VerifyPeekResult(
+ peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
+
+ // Popping the peeked entry advances the drain; the next peek finds no entry
+ // (OUT_OF_RANGE) but reports the ingress drops.
+ ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
+ auto peek_result2 =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
+ EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(ingress_drop_count, ingress_drops);
+}
+
+TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
+ multisink_.AttachDrain(drains_[0]);
+
+ // Add entries until buffer is full and drain has to be advanced.
+ // The sequence ID takes 1 byte when less than 128.
+ const size_t max_multisink_messages = 128;
+ const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
+ // Account for 1 byte of preamble (sequence ID) and 1 byte of data size.
+ const size_t message_size = buffer_entry_size - 2;
+ std::array<std::byte, message_size> message;
+ std::memset(message.data(), 'a', message.size());
+ for (size_t i = 0; i < max_multisink_messages; ++i) {
+ multisink_.HandleEntry(message);
+ }
+
+ // At this point the buffer is full, but the sequence ID will take 1 more byte
+ // in the preamble, meaning that adding N new entries drops N + 1 entries.
+ // Account for that offset by starting the loop below at 1.
+ const size_t expected_drops = 5;
+ for (size_t i = 1; i < expected_drops; ++i) {
+ multisink_.HandleEntry(message);
+ }
+
+ uint32_t drop_count = 0;
+ uint32_t ingress_drop_count = 0;
+ auto peek_result =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
+}
+
+TEST_F(MultiSinkTest, IngressDropCountOverflow) {
+ multisink_.AttachDrain(drains_[0]);
+
+ // Bring the multisink's ingress drop count within 3 of the uint32_t maximum
+ // so that the HandleDropped() call further below makes it overflow.
+ const uint32_t drop_count_close_to_overflow =
+ std::numeric_limits<uint32_t>::max() - 3;
+ multisink_.HandleDropped(drop_count_close_to_overflow);
+ multisink_.HandleEntry(kMessage);
+
+ // Peek so the drain catches up with the multisink's ingress drop count.
+ uint32_t drop_count = 0;
+ uint32_t ingress_drop_count = 0;
+ auto peek_result1 =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(peek_result1,
+ drop_count,
+ ingress_drop_count,
+ kMessage,
+ 0,
+ drop_count_close_to_overflow);
+ // Popping the peeked entry advances the drain, and a new peek will find the
+ // gap in sequence IDs.
+ ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
+
+ // Overflow multisink's drop count.
+ const uint32_t expected_ingress_drop_count = 10;
+ multisink_.HandleDropped(expected_ingress_drop_count);
+
+ auto peek_result2 =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
+ EXPECT_EQ(drop_count, 0u);
+ EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
+
+ multisink_.HandleEntry(kMessage);
+ auto peek_result3 =
+ drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
+ VerifyPeekResult(
+ peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
+}
+
+TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
+ multisink_.AttachDrain(drains_[0]);
+
+ const uint32_t ingress_drops = 10;
+ multisink_.HandleDropped(ingress_drops);
+ multisink_.HandleEntry(kMessage);
+ VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
+
+ // Detaching and re-attaching the drain must report the same drops again.
+ multisink_.DetachDrain(drains_[0]);
+ multisink_.AttachDrain(drains_[0]);
+ VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
}
TEST(UnsafeIteration, NoLimit) {