blob: 1f5bea91e778343d09532b0aebf3237d1df6b78c [file] [log] [blame]
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -08001// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15#include "pw_multisink/multisink.h"
16
Carlos Chinchilla64021a82021-08-06 00:18:41 -070017#include <array>
18#include <cstdint>
Carlos Chinchillad2263422022-03-15 17:47:43 -070019#include <cstring>
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -070020#include <optional>
Ewout van Bekkum6937a092022-03-04 08:13:37 -080021#include <span>
Armando Montanezfb7b4782021-09-30 21:38:20 -070022#include <string_view>
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -070023
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080024#include "gtest/gtest.h"
Armando Montanezfb7b4782021-09-30 21:38:20 -070025#include "pw_function/function.h"
Carlos Chinchilla64021a82021-08-06 00:18:41 -070026#include "pw_status/status.h"
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080027
28namespace pw::multisink {
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070029using Drain = MultiSink::Drain;
30using Listener = MultiSink::Listener;
31
32class CountingListener : public Listener {
33 public:
34 void OnNewEntryAvailable() override { notification_count_++; }
35
36 size_t GetNotificationCount() { return notification_count_; }
37
38 void ResetNotificationCount() { notification_count_ = 0; }
39
40 private:
41 size_t notification_count_ = 0;
42};
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080043
44class MultiSinkTest : public ::testing::Test {
45 protected:
46 static constexpr std::byte kMessage[] = {
47 (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
Carlos Chinchilla64021a82021-08-06 00:18:41 -070048 static constexpr std::byte kMessageOther[] = {
49 (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080050 static constexpr size_t kMaxDrains = 3;
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070051 static constexpr size_t kMaxListeners = 3;
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080052 static constexpr size_t kEntryBufferSize = 1024;
53 static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
54
55 MultiSinkTest() : multisink_(buffer_) {}
56
Carlos Chinchilla64021a82021-08-06 00:18:41 -070057 // Expects the peeked or popped message to equal the provided non-empty
58 // message, and the drop count to match. If `expected_message` is empty, the
59 // Pop call status expected is OUT_OF_RANGE.
Carlos Chinchillad2263422022-03-15 17:47:43 -070060 void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
61 uint32_t result_drop_count,
62 uint32_t result_ingress_drop_count,
63 std::optional<ConstByteSpan> expected_message,
64 uint32_t expected_drop_count,
65 uint32_t expected_ingress_drop_count) {
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -070066 if (!expected_message.has_value()) {
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080067 EXPECT_EQ(Status::OutOfRange(), result.status());
68 } else {
Carlos Chinchilla64021a82021-08-06 00:18:41 -070069 ASSERT_EQ(result.status(), OkStatus());
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -070070 if (!expected_message.value().empty()) {
Carlos Chinchilla64021a82021-08-06 00:18:41 -070071 ASSERT_FALSE(result.value().empty());
72 ASSERT_EQ(result.value().size_bytes(),
73 expected_message.value().size_bytes());
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -070074 EXPECT_EQ(memcmp(result.value().data(),
75 expected_message.value().data(),
76 expected_message.value().size_bytes()),
77 0);
78 }
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080079 }
Carlos Chinchilla64021a82021-08-06 00:18:41 -070080 EXPECT_EQ(result_drop_count, expected_drop_count);
Carlos Chinchillad2263422022-03-15 17:47:43 -070081 EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
Carlos Chinchilla64021a82021-08-06 00:18:41 -070082 }
83
84 void VerifyPopEntry(Drain& drain,
85 std::optional<ConstByteSpan> expected_message,
Carlos Chinchillad2263422022-03-15 17:47:43 -070086 uint32_t expected_drop_count,
87 uint32_t expected_ingress_drop_count) {
Carlos Chinchilla64021a82021-08-06 00:18:41 -070088 uint32_t drop_count = 0;
Carlos Chinchillad2263422022-03-15 17:47:43 -070089 uint32_t ingress_drop_count = 0;
90 Result<ConstByteSpan> result =
91 drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
92 ExpectMessageAndDropCounts(result,
93 drop_count,
94 ingress_drop_count,
95 expected_message,
96 expected_drop_count,
97 expected_ingress_drop_count);
Carlos Chinchilla64021a82021-08-06 00:18:41 -070098 }
99
100 void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
101 uint32_t result_drop_count,
Carlos Chinchillad2263422022-03-15 17:47:43 -0700102 uint32_t result_ingress_drop_count,
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700103 std::optional<ConstByteSpan> expected_message,
Carlos Chinchillad2263422022-03-15 17:47:43 -0700104 uint32_t expected_drop_count,
105 uint32_t expected_ingress_drop_count) {
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700106 if (peek_result.ok()) {
107 ASSERT_FALSE(peek_result.value().entry().empty());
108 Result<ConstByteSpan> verify_result(peek_result.value().entry());
Carlos Chinchillad2263422022-03-15 17:47:43 -0700109 ExpectMessageAndDropCounts(verify_result,
110 result_drop_count,
111 result_ingress_drop_count,
112 expected_message,
113 expected_drop_count,
114 expected_ingress_drop_count);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700115 return;
116 }
117 if (expected_message.has_value()) {
118 // Fail since we expected OkStatus.
119 ASSERT_EQ(peek_result.status(), OkStatus());
120 }
121 EXPECT_EQ(Status::OutOfRange(), peek_result.status());
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800122 }
123
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700124 void ExpectNotificationCount(CountingListener& listener,
125 size_t expected_notification_count) {
126 EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
127 listener.ResetNotificationCount();
128 }
129
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800130 std::byte buffer_[kBufferSize];
131 std::byte entry_buffer_[kEntryBufferSize];
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700132 CountingListener listeners_[kMaxListeners];
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800133 Drain drains_[kMaxDrains];
134 MultiSink multisink_;
135};
136
137TEST_F(MultiSinkTest, SingleDrain) {
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700138 multisink_.AttachDrain(drains_[0]);
139 multisink_.AttachListener(listeners_[0]);
Max Koopman696d6862021-09-29 10:48:29 -0700140 ExpectNotificationCount(listeners_[0], 1u);
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700141 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800142
143 // Single entry push and pop.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700144 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700145 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -0700146 // Single empty entry push and pop.
147 multisink_.HandleEntry(ConstByteSpan());
148 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700149 VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
Prashanth Swaminathan3eb97d42021-07-13 10:14:38 -0700150
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800151 // Multiple entries with intermittent drops.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700152 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800153 multisink_.HandleDropped();
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700154 multisink_.HandleEntry(kMessage);
155 ExpectNotificationCount(listeners_[0], 3u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700156 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
157 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800158
159 // Send drops only.
160 multisink_.HandleDropped();
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700161 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700162 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800163
164 // Confirm out-of-range if no entries are expected.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700165 ExpectNotificationCount(listeners_[0], 0u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700166 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800167}
168
169TEST_F(MultiSinkTest, MultipleDrain) {
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700170 multisink_.AttachDrain(drains_[0]);
171 multisink_.AttachDrain(drains_[1]);
172 multisink_.AttachListener(listeners_[0]);
173 multisink_.AttachListener(listeners_[1]);
Max Koopman696d6862021-09-29 10:48:29 -0700174 ExpectNotificationCount(listeners_[0], 1u);
175 ExpectNotificationCount(listeners_[1], 1u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800176
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700177 multisink_.HandleEntry(kMessage);
178 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800179 multisink_.HandleDropped();
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700180 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800181 multisink_.HandleDropped();
182
183 // Drain one drain entirely.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700184 ExpectNotificationCount(listeners_[0], 5u);
185 ExpectNotificationCount(listeners_[1], 5u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700186 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
187 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
188 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
189 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
190 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800191
192 // Confirm the other drain can be drained separately.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700193 ExpectNotificationCount(listeners_[0], 0u);
194 ExpectNotificationCount(listeners_[1], 0u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700195 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
196 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
197 VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
198 VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
199 VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800200}
201
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700202TEST_F(MultiSinkTest, LateDrainRegistration) {
Prashanth Swaminathan099f7162021-07-15 13:42:20 -0700203 // Drains attached after entries are pushed should still observe those entries
204 // if they have not been evicted from the ring buffer.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700205 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800206
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700207 multisink_.AttachDrain(drains_[0]);
208 multisink_.AttachListener(listeners_[0]);
Max Koopman696d6862021-09-29 10:48:29 -0700209 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700210 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
211 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700212
213 multisink_.HandleEntry(kMessage);
214 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700215 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
216 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800217}
218
219TEST_F(MultiSinkTest, DynamicDrainRegistration) {
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700220 multisink_.AttachDrain(drains_[0]);
221 multisink_.AttachListener(listeners_[0]);
Max Koopman696d6862021-09-29 10:48:29 -0700222 ExpectNotificationCount(listeners_[0], 1u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800223
224 multisink_.HandleDropped();
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700225 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800226 multisink_.HandleDropped();
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700227 multisink_.HandleEntry(kMessage);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800228
229 // Drain out one message and detach it.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700230 ExpectNotificationCount(listeners_[0], 4u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700231 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700232 multisink_.DetachDrain(drains_[0]);
233 multisink_.DetachListener(listeners_[0]);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800234
Prashanth Swaminathan099f7162021-07-15 13:42:20 -0700235 // Re-attaching the drain should reproduce the last observed message. Note
236 // that notifications are not expected, nor are drops observed before the
237 // first valid message in the buffer.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700238 multisink_.AttachDrain(drains_[0]);
239 multisink_.AttachListener(listeners_[0]);
Max Koopman696d6862021-09-29 10:48:29 -0700240 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700241 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
242 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
243 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800244
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700245 multisink_.HandleEntry(kMessage);
246 ExpectNotificationCount(listeners_[0], 1u);
Carlos Chinchillad2263422022-03-15 17:47:43 -0700247 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
248 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800249}
250
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700251TEST_F(MultiSinkTest, TooSmallBuffer) {
252 multisink_.AttachDrain(drains_[0]);
253
254 // Insert an entry and a drop, then try to read into an insufficient buffer.
255 uint32_t drop_count = 0;
Carlos Chinchillad2263422022-03-15 17:47:43 -0700256 uint32_t ingress_drop_count = 0;
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700257 multisink_.HandleDropped();
258 multisink_.HandleEntry(kMessage);
259
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700260 // Attempting to acquire an entry with a small buffer should result in
261 // RESOURCE_EXHAUSTED and remove it.
Carlos Chinchillad2263422022-03-15 17:47:43 -0700262 Result<ConstByteSpan> result = drains_[0].PopEntry(
263 std::span(entry_buffer_, 1), drop_count, ingress_drop_count);
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700264 EXPECT_EQ(result.status(), Status::ResourceExhausted());
265
Carlos Chinchillad2263422022-03-15 17:47:43 -0700266 VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700267}
268
Prashanth Swaminathan16541e72021-06-21 09:22:25 -0700269TEST_F(MultiSinkTest, Iterator) {
270 multisink_.AttachDrain(drains_[0]);
271
272 // Insert entries and consume them all.
273 multisink_.HandleEntry(kMessage);
274 multisink_.HandleEntry(kMessage);
275 multisink_.HandleEntry(kMessage);
276
Carlos Chinchillad2263422022-03-15 17:47:43 -0700277 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
278 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
279 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
Prashanth Swaminathan16541e72021-06-21 09:22:25 -0700280
281 // Confirm that the iterator still observes the messages in the ring buffer.
282 size_t iterated_entries = 0;
283 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
284 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
285 iterated_entries++;
286 }
287 EXPECT_EQ(iterated_entries, 3u);
288}
289
290TEST_F(MultiSinkTest, IteratorNoDrains) {
291 // Insert entries with no drains attached. Even though there are no consumers,
292 // iterators should still walk from the oldest entry.
293 multisink_.HandleEntry(kMessage);
294 multisink_.HandleEntry(kMessage);
295 multisink_.HandleEntry(kMessage);
296
297 // Confirm that the iterator still observes the messages in the ring buffer.
298 size_t iterated_entries = 0;
299 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
300 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
301 iterated_entries++;
302 }
303 EXPECT_EQ(iterated_entries, 3u);
304}
305
306TEST_F(MultiSinkTest, IteratorNoEntries) {
307 // Attach a drain, but don't add any entries.
308 multisink_.AttachDrain(drains_[0]);
309 // Confirm that the iterator has no entries.
310 MultiSink::UnsafeIterationWrapper unsafe_iterator =
311 multisink_.UnsafeIteration();
312 EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
313}
314
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700315TEST_F(MultiSinkTest, PeekEntryNoEntries) {
316 multisink_.AttachDrain(drains_[0]);
317
318 // Peek empty multisink.
319 uint32_t drop_count = 0;
Carlos Chinchillad2263422022-03-15 17:47:43 -0700320 uint32_t ingress_drop_count = 0;
321 auto peek_result =
322 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
323 VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700324}
325
326TEST_F(MultiSinkTest, PeekAndPop) {
327 multisink_.AttachDrain(drains_[0]);
328 multisink_.AttachDrain(drains_[1]);
329
330 // Peek entry after multisink has some entries.
331 multisink_.HandleEntry(kMessage);
332 multisink_.HandleEntry(kMessageOther);
333 uint32_t drop_count = 0;
Carlos Chinchillad2263422022-03-15 17:47:43 -0700334 uint32_t ingress_drop_count = 0;
335 auto first_peek_result =
336 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
337 VerifyPeekResult(
338 first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700339
340 // Multiple peeks must return the front message.
Carlos Chinchillad2263422022-03-15 17:47:43 -0700341 auto peek_duplicate =
342 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
343 VerifyPeekResult(
344 peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700345 // A second drain must peek the front message.
Carlos Chinchillad2263422022-03-15 17:47:43 -0700346 auto peek_other_drain =
347 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
348 VerifyPeekResult(
349 peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700350
351 // After a drain pops a peeked entry, the next peek call must return the next
352 // message.
353 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
Carlos Chinchillad2263422022-03-15 17:47:43 -0700354 auto second_peek_result =
355 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
356 VerifyPeekResult(
357 second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700358 // Slower readers must be unchanged.
359 auto peek_other_drain_duplicate =
Carlos Chinchillad2263422022-03-15 17:47:43 -0700360 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
361 VerifyPeekResult(peek_other_drain_duplicate,
362 drop_count,
363 ingress_drop_count,
364 kMessage,
365 0,
366 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700367
368 // PopEntry prior to popping the previously peeked entry.
Carlos Chinchillad2263422022-03-15 17:47:43 -0700369 VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700370 // Popping an entry already handled must not trigger errors.
371 ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
372 // Popping with an old peek context must not trigger errors.
373 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
374
375 // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
Carlos Chinchillad2263422022-03-15 17:47:43 -0700376 VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
377 auto empty_peek_result =
378 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
379 VerifyPeekResult(
380 empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700381
382 // // Slower readers must be unchanged.
383 auto peek_other_drain_unchanged =
Carlos Chinchillad2263422022-03-15 17:47:43 -0700384 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
385 VerifyPeekResult(peek_other_drain_unchanged,
386 drop_count,
387 ingress_drop_count,
388 kMessage,
389 0,
390 0);
391}
392
393TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
394 multisink_.AttachDrain(drains_[0]);
395
396 // Peek entry after multisink has some entries.
397 multisink_.HandleEntry(kMessage);
398 const uint32_t ingress_drops = 10;
399 multisink_.HandleDropped(ingress_drops);
400
401 uint32_t drop_count = 0;
402 uint32_t ingress_drop_count = 0;
403 auto peek_result1 =
404 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
405 // No drops reported until the drain finds a gap in the sequence IDs.
406 VerifyPeekResult(
407 peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
408
409 // Popping the peeked entry advances the drain, and a new peek will find the
410 // gap in sequence IDs.
411 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
412 auto peek_result2 =
413 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
414 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
415 EXPECT_EQ(drop_count, 0u);
416 EXPECT_EQ(ingress_drop_count, ingress_drops);
417}
418
419TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
420 multisink_.AttachDrain(drains_[0]);
421
422 // Add entries until buffer is full and drain has to be advanced.
423 // The sequence ID takes 1 byte when less than 128.
424 const size_t max_multisink_messages = 128;
425 const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
426 // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
427 const size_t message_size = buffer_entry_size - 2;
428 std::array<std::byte, message_size> message;
429 std::memset(message.data(), 'a', message.size());
430 for (size_t i = 0; i < max_multisink_messages; ++i) {
431 multisink_.HandleEntry(message);
432 }
433
434 // At this point the buffer is full, but the sequence ID will take 1 more byte
435 // in the preamble, meaning that adding N new entries, drops N + 1 entries.
436 // Account for that offset.
437 const size_t expected_drops = 5;
438 for (size_t i = 1; i < expected_drops; ++i) {
439 multisink_.HandleEntry(message);
440 }
441
442 uint32_t drop_count = 0;
443 uint32_t ingress_drop_count = 0;
444 auto peek_result =
445 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
446 VerifyPeekResult(
447 peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
448}
449
450TEST_F(MultiSinkTest, IngressDropCountOverflow) {
451 multisink_.AttachDrain(drains_[0]);
452
453 // Make drain's last handled drop larger than multisink drop count, which
454 // overflowed.
455 const uint32_t drop_count_close_to_overflow =
456 std::numeric_limits<uint32_t>::max() - 3;
457 multisink_.HandleDropped(drop_count_close_to_overflow);
458 multisink_.HandleEntry(kMessage);
459
460 // Catch up drain's drop count.
461 uint32_t drop_count = 0;
462 uint32_t ingress_drop_count = 0;
463 auto peek_result1 =
464 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
465 VerifyPeekResult(peek_result1,
466 drop_count,
467 ingress_drop_count,
468 kMessage,
469 0,
470 drop_count_close_to_overflow);
471 // Popping the peeked entry advances the drain, and a new peek will find the
472 // gap in sequence IDs.
473 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
474
475 // Overflow multisink's drop count.
476 const uint32_t expected_ingress_drop_count = 10;
477 multisink_.HandleDropped(expected_ingress_drop_count);
478
479 auto peek_result2 =
480 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
481 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
482 EXPECT_EQ(drop_count, 0u);
483 EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
484
485 multisink_.HandleEntry(kMessage);
486 auto peek_result3 =
487 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
488 VerifyPeekResult(
489 peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
490}
491
492TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
493 multisink_.AttachDrain(drains_[0]);
494
495 const uint32_t ingress_drops = 10;
496 multisink_.HandleDropped(ingress_drops);
497 multisink_.HandleEntry(kMessage);
498 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
499
500 // Detaching and attaching drain should report the same drops.
501 multisink_.DetachDrain(drains_[0]);
502 multisink_.AttachDrain(drains_[0]);
503 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
Carlos Chinchilla64021a82021-08-06 00:18:41 -0700504}
505
Armando Montanezfb7b4782021-09-30 21:38:20 -0700506TEST(UnsafeIteration, NoLimit) {
507 constexpr std::array<std::string_view, 5> kExpectedEntries{
508 "one", "two", "three", "four", "five"};
509 std::array<std::byte, 32> buffer;
510 MultiSink multisink(buffer);
511
512 for (std::string_view entry : kExpectedEntries) {
513 multisink.HandleEntry(std::as_bytes(std::span(entry)));
514 }
515
516 size_t entry_count = 0;
517 struct {
518 size_t& entry_count;
519 std::span<const std::string_view> expected_results;
520 } ctx{entry_count, kExpectedEntries};
521 auto cb = [&ctx](ConstByteSpan data) {
522 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
523 EXPECT_EQ(data.size(), expected_entry.size());
524 const int result =
525 memcmp(data.data(), expected_entry.data(), expected_entry.size());
526 EXPECT_EQ(0, result);
527 ctx.entry_count++;
528 };
529
530 EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
531 EXPECT_EQ(kExpectedEntries.size(), entry_count);
532}
533
534TEST(UnsafeIteration, Subset) {
535 constexpr std::array<std::string_view, 5> kExpectedEntries{
536 "one", "two", "three", "four", "five"};
537 constexpr size_t kStartOffset = 3;
538 constexpr size_t kExpectedEntriesMaxEntries =
539 kExpectedEntries.size() - kStartOffset;
540 std::array<std::byte, 32> buffer;
541 MultiSink multisink(buffer);
542
543 for (std::string_view entry : kExpectedEntries) {
544 multisink.HandleEntry(std::as_bytes(std::span(entry)));
545 }
546
547 size_t entry_count = 0;
548 struct {
549 size_t& entry_count;
550 std::span<const std::string_view> expected_results;
551 } ctx{entry_count, kExpectedEntries};
552 auto cb = [&ctx](ConstByteSpan data) {
553 std::string_view expected_entry =
554 ctx.expected_results[ctx.entry_count + kStartOffset];
555 EXPECT_EQ(data.size(), expected_entry.size());
556 const int result =
557 memcmp(data.data(), expected_entry.data(), expected_entry.size());
558 EXPECT_EQ(0, result);
559 ctx.entry_count++;
560 };
561
562 EXPECT_EQ(
563 OkStatus(),
564 multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
565 EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
566}
567
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800568} // namespace pw::multisink