blob: 6ebf249fb791a52e0fe81b62c52c89e6e15b9d28 [file] [log] [blame]
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -08001// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#include "pw_multisink/multisink.h"
15
16#include <cstring>
17
Wyatt Heplerf298de42021-03-19 15:06:36 -070018#include "pw_assert/check.h"
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080019#include "pw_status/try.h"
20#include "pw_varint/varint.h"
21
22namespace pw {
23namespace multisink {
24
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070025void MultiSink::HandleEntry(ConstByteSpan entry) {
26 std::lock_guard lock(lock_);
27 PW_DCHECK_OK(ring_buffer_.PushBack(entry, sequence_id_++));
28 NotifyListeners();
29}
30
31void MultiSink::HandleDropped(uint32_t drop_count) {
32 std::lock_guard lock(lock_);
33 sequence_id_ += drop_count;
34 NotifyListeners();
35}
36
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080037Result<ConstByteSpan> MultiSink::GetEntry(Drain& drain,
38 ByteSpan buffer,
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070039 uint32_t& drop_count_out) {
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080040 size_t bytes_read = 0;
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070041 uint32_t entry_sequence_id = 0;
42 drop_count_out = 0;
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080043
Prashanth Swaminathan76cd4812021-05-17 11:11:56 -070044 std::lock_guard lock(lock_);
45 PW_DCHECK_PTR_EQ(drain.multisink_, this);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080046
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070047 const Status peek_status = drain.reader_.PeekFrontWithPreamble(
48 buffer, entry_sequence_id, bytes_read);
49 if (peek_status.IsOutOfRange()) {
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080050 // If the drain has caught up, report the last handled sequence ID so that
51 // it can still process any dropped entries.
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070052 entry_sequence_id = sequence_id_ - 1;
53 } else if (!peek_status.ok()) {
54 // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the
55 // entry_entry_sequence_id cannot be used for computation. Later invocations
56 // to GetEntry will permit readers to determine how far the sequence ID
57 // moved forward.
58 return peek_status;
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080059 }
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070060
61 // Compute the drop count delta by comparing this entry's sequence ID with the
62 // last sequence ID this drain successfully read.
63 //
64 // The drop count calculation simply computes the difference between the
65 // current and last sequence IDs. Consecutive successful reads will always
66 // differ by one at least, so it is subtracted out. If the read was not
67 // successful, the difference is not adjusted.
68 drop_count_out = entry_sequence_id - drain.last_handled_sequence_id_ -
69 (peek_status.ok() ? 1 : 0);
70 drain.last_handled_sequence_id_ = entry_sequence_id;
71
72 // The Peek above may have failed due to OutOfRange, now that we've set the
73 // drop count see if we should return before attempting to pop.
74 if (peek_status.IsOutOfRange()) {
75 return peek_status;
76 }
77
78 // Success, pop the oldest entry!
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080079 PW_CHECK(drain.reader_.PopFront().ok());
80 return std::as_bytes(buffer.first(bytes_read));
81}
82
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070083void MultiSink::AttachDrain(Drain& drain) {
Prashanth Swaminathan76cd4812021-05-17 11:11:56 -070084 std::lock_guard lock(lock_);
85 PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080086 drain.multisink_ = this;
Prashanth Swaminathan099f7162021-07-15 13:42:20 -070087
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070088 PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_));
Prashanth Swaminathan099f7162021-07-15 13:42:20 -070089 if (&drain == &oldest_entry_drain_) {
90 drain.last_handled_sequence_id_ = sequence_id_ - 1;
91 return;
92 }
93 drain.last_handled_sequence_id_ =
94 oldest_entry_drain_.last_handled_sequence_id_;
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -080095}
96
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -070097void MultiSink::DetachDrain(Drain& drain) {
Prashanth Swaminathan76cd4812021-05-17 11:11:56 -070098 std::lock_guard lock(lock_);
99 PW_DCHECK_PTR_EQ(drain.multisink_, this);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800100 drain.multisink_ = nullptr;
Prashanth Swaminathan3b1536d2021-05-18 14:21:30 -0700101 PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_),
102 "The drain wasn't already attached.");
103}
104
105void MultiSink::AttachListener(Listener& listener) {
106 std::lock_guard lock(lock_);
107 listeners_.push_back(listener);
108}
109
110void MultiSink::DetachListener(Listener& listener) {
111 std::lock_guard lock(lock_);
112 [[maybe_unused]] bool was_detached = listeners_.remove(listener);
113 PW_DCHECK(was_detached, "The listener was already attached.");
114}
115
116void MultiSink::Clear() {
117 std::lock_guard lock(lock_);
118 ring_buffer_.Clear();
119}
120
121void MultiSink::NotifyListeners() {
122 for (auto& listener : listeners_) {
123 listener.OnNewEntryAvailable();
124 }
125}
126
127Result<ConstByteSpan> MultiSink::Drain::GetEntry(ByteSpan buffer,
128 uint32_t& drop_count_out) {
129 PW_DCHECK_NOTNULL(multisink_);
130 return multisink_->GetEntry(*this, buffer, drop_count_out);
Prashanth Swaminathanf36832a2021-01-27 16:20:32 -0800131}
132
133} // namespace multisink
134} // namespace pw