Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 1 | // Copyright 2021 The Pigweed Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| 4 | // use this file except in compliance with the License. You may obtain a copy of |
| 5 | // the License at |
| 6 | // |
| 7 | // https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | // License for the specific language governing permissions and limitations under |
| 13 | // the License. |
| 14 | #include "pw_multisink/multisink.h" |
| 15 | |
| 16 | #include <cstring> |
| 17 | |
Wyatt Hepler | f298de4 | 2021-03-19 15:06:36 -0700 | [diff] [blame] | 18 | #include "pw_assert/check.h" |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 19 | #include "pw_status/try.h" |
| 20 | #include "pw_varint/varint.h" |
| 21 | |
| 22 | namespace pw { |
| 23 | namespace multisink { |
| 24 | |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 25 | void MultiSink::HandleEntry(ConstByteSpan entry) { |
| 26 | std::lock_guard lock(lock_); |
| 27 | PW_DCHECK_OK(ring_buffer_.PushBack(entry, sequence_id_++)); |
| 28 | NotifyListeners(); |
| 29 | } |
| 30 | |
| 31 | void MultiSink::HandleDropped(uint32_t drop_count) { |
| 32 | std::lock_guard lock(lock_); |
| 33 | sequence_id_ += drop_count; |
| 34 | NotifyListeners(); |
| 35 | } |
| 36 | |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 37 | Result<ConstByteSpan> MultiSink::GetEntry(Drain& drain, |
| 38 | ByteSpan buffer, |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 39 | uint32_t& drop_count_out) { |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 40 | size_t bytes_read = 0; |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 41 | uint32_t entry_sequence_id = 0; |
| 42 | drop_count_out = 0; |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 43 | |
Prashanth Swaminathan | 76cd481 | 2021-05-17 11:11:56 -0700 | [diff] [blame] | 44 | std::lock_guard lock(lock_); |
| 45 | PW_DCHECK_PTR_EQ(drain.multisink_, this); |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 46 | |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 47 | const Status peek_status = drain.reader_.PeekFrontWithPreamble( |
| 48 | buffer, entry_sequence_id, bytes_read); |
| 49 | if (peek_status.IsOutOfRange()) { |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 50 | // If the drain has caught up, report the last handled sequence ID so that |
| 51 | // it can still process any dropped entries. |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 52 | entry_sequence_id = sequence_id_ - 1; |
| 53 | } else if (!peek_status.ok()) { |
| 54 | // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the |
| 55 | // entry_entry_sequence_id cannot be used for computation. Later invocations |
| 56 | // to GetEntry will permit readers to determine how far the sequence ID |
| 57 | // moved forward. |
| 58 | return peek_status; |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 59 | } |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 60 | |
| 61 | // Compute the drop count delta by comparing this entry's sequence ID with the |
| 62 | // last sequence ID this drain successfully read. |
| 63 | // |
| 64 | // The drop count calculation simply computes the difference between the |
| 65 | // current and last sequence IDs. Consecutive successful reads will always |
| 66 | // differ by one at least, so it is subtracted out. If the read was not |
| 67 | // successful, the difference is not adjusted. |
| 68 | drop_count_out = entry_sequence_id - drain.last_handled_sequence_id_ - |
| 69 | (peek_status.ok() ? 1 : 0); |
| 70 | drain.last_handled_sequence_id_ = entry_sequence_id; |
| 71 | |
| 72 | // The Peek above may have failed due to OutOfRange, now that we've set the |
| 73 | // drop count see if we should return before attempting to pop. |
| 74 | if (peek_status.IsOutOfRange()) { |
| 75 | return peek_status; |
| 76 | } |
| 77 | |
| 78 | // Success, pop the oldest entry! |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 79 | PW_CHECK(drain.reader_.PopFront().ok()); |
| 80 | return std::as_bytes(buffer.first(bytes_read)); |
| 81 | } |
| 82 | |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 83 | void MultiSink::AttachDrain(Drain& drain) { |
Prashanth Swaminathan | 76cd481 | 2021-05-17 11:11:56 -0700 | [diff] [blame] | 84 | std::lock_guard lock(lock_); |
| 85 | PW_DCHECK_PTR_EQ(drain.multisink_, nullptr); |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 86 | drain.multisink_ = this; |
Prashanth Swaminathan | 099f716 | 2021-07-15 13:42:20 -0700 | [diff] [blame] | 87 | |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 88 | PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_)); |
Prashanth Swaminathan | 099f716 | 2021-07-15 13:42:20 -0700 | [diff] [blame] | 89 | if (&drain == &oldest_entry_drain_) { |
| 90 | drain.last_handled_sequence_id_ = sequence_id_ - 1; |
| 91 | return; |
| 92 | } |
| 93 | drain.last_handled_sequence_id_ = |
| 94 | oldest_entry_drain_.last_handled_sequence_id_; |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 95 | } |
| 96 | |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 97 | void MultiSink::DetachDrain(Drain& drain) { |
Prashanth Swaminathan | 76cd481 | 2021-05-17 11:11:56 -0700 | [diff] [blame] | 98 | std::lock_guard lock(lock_); |
| 99 | PW_DCHECK_PTR_EQ(drain.multisink_, this); |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 100 | drain.multisink_ = nullptr; |
Prashanth Swaminathan | 3b1536d | 2021-05-18 14:21:30 -0700 | [diff] [blame] | 101 | PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_), |
| 102 | "The drain wasn't already attached."); |
| 103 | } |
| 104 | |
| 105 | void MultiSink::AttachListener(Listener& listener) { |
| 106 | std::lock_guard lock(lock_); |
| 107 | listeners_.push_back(listener); |
| 108 | } |
| 109 | |
| 110 | void MultiSink::DetachListener(Listener& listener) { |
| 111 | std::lock_guard lock(lock_); |
| 112 | [[maybe_unused]] bool was_detached = listeners_.remove(listener); |
| 113 | PW_DCHECK(was_detached, "The listener was already attached."); |
| 114 | } |
| 115 | |
| 116 | void MultiSink::Clear() { |
| 117 | std::lock_guard lock(lock_); |
| 118 | ring_buffer_.Clear(); |
| 119 | } |
| 120 | |
| 121 | void MultiSink::NotifyListeners() { |
| 122 | for (auto& listener : listeners_) { |
| 123 | listener.OnNewEntryAvailable(); |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | Result<ConstByteSpan> MultiSink::Drain::GetEntry(ByteSpan buffer, |
| 128 | uint32_t& drop_count_out) { |
| 129 | PW_DCHECK_NOTNULL(multisink_); |
| 130 | return multisink_->GetEntry(*this, buffer, drop_count_out); |
Prashanth Swaminathan | f36832a | 2021-01-27 16:20:32 -0800 | [diff] [blame] | 131 | } |
| 132 | |
| 133 | } // namespace multisink |
| 134 | } // namespace pw |