pw_multisink: Lock multisink operations
Guards all multisink transactions with a lock. The new configuration
option PW_MULTISINK_LOCK_INTERRUPT_SAFE allows the project to select
the type of lock used to guard transactions. By default, it is enabled
and makes use of an interrupt spin-lock. If disabled, a mutex is used
instead.
Change-Id: I71ab2729d130c524da27e0d06beb0c3fdf73d145
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/45720
Commit-Queue: Prashanth Swaminathan <prashanthsw@google.com>
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_multisink/BUILD b/pw_multisink/BUILD
index 11f772c..f7840a2 100644
--- a/pw_multisink/BUILD
+++ b/pw_multisink/BUILD
@@ -29,6 +29,7 @@
"multisink.cc",
],
hdrs = [
+ "public/pw_multisink/config.h",
"public/pw_multisink/drain.h",
"public/pw_multisink/multisink.h",
],
@@ -36,6 +37,9 @@
deps = [
"//pw_assert",
"//pw_bytes",
+ "//pw_sync:interrupt_spin_lock",
+ "//pw_sync:lock_annotations",
+ "//pw_sync:mutex",
"//pw_result",
"//pw_ring_buffer",
"//pw_varint",
diff --git a/pw_multisink/BUILD.gn b/pw_multisink/BUILD.gn
index c8446d0..dac134c 100644
--- a/pw_multisink/BUILD.gn
+++ b/pw_multisink/BUILD.gn
@@ -26,6 +26,7 @@
pw_source_set("pw_multisink") {
public_configs = [ ":default_config" ]
public = [
+ "public/pw_multisink/config.h",
"public/pw_multisink/drain.h",
"public/pw_multisink/multisink.h",
]
@@ -34,6 +35,9 @@
"$dir_pw_result",
"$dir_pw_ring_buffer",
"$dir_pw_status",
+ "$dir_pw_sync:interrupt_spin_lock",
+ "$dir_pw_sync:lock_annotations",
+ "$dir_pw_sync:mutex",
]
deps = [
"$dir_pw_assert",
diff --git a/pw_multisink/docs.rst b/pw_multisink/docs.rst
index cf406f3..7052e14 100644
--- a/pw_multisink/docs.rst
+++ b/pw_multisink/docs.rst
@@ -1,9 +1,25 @@
.. _module-pw_multisink:
-------------
+============
pw_multisink
-------------
+============
This is an module that forwards messages to multiple attached sinks, which
consume messages asynchronously. It is not ready for use and is under
construction.
+Module Configuration Options
+============================
+The following configurations can be adjusted via compile-time configuration
+of this module, see the
+:ref:`module documentation <module-structure-compile-time-configuration>` for
+more details.
+
+.. c:macro:: PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+ Whether an interrupt-safe lock is used to guard multisink read/write operations.
+
+ By default, this option is enabled and the multisink uses an interrupt spin-lock
+ to guard its transactions. If disabled, a mutex is used instead.
+
+ Disabling this will alter the entry precondition of the multisink, requiring that
+ it not be called from an interrupt context.
diff --git a/pw_multisink/drain.cc b/pw_multisink/drain.cc
index e4548c5..cc684f1 100644
--- a/pw_multisink/drain.cc
+++ b/pw_multisink/drain.cc
@@ -13,15 +13,19 @@
// the License.
#include "pw_multisink/drain.h"
+#include "pw_assert/check.h"
+
namespace pw {
namespace multisink {
Result<ConstByteSpan> Drain::GetEntry(ByteSpan entry,
uint32_t& drop_count_out) {
+ PW_DCHECK_NOTNULL(multisink_);
uint32_t entry_sequence_id = 0;
drop_count_out = 0;
+
const Result<ConstByteSpan> result =
- MultiSink::GetEntry(*this, entry, entry_sequence_id);
+ multisink_->GetEntry(*this, entry, entry_sequence_id);
// Exit immediately if the result isn't OK or OUT_OF_RANGE, as the
// entry_sequence_id cannot be used for computation. Later invocations to
diff --git a/pw_multisink/multisink.cc b/pw_multisink/multisink.cc
index a956071..c1eca3c 100644
--- a/pw_multisink/multisink.cc
+++ b/pw_multisink/multisink.cc
@@ -28,17 +28,15 @@
uint32_t& sequence_id_out) {
size_t bytes_read = 0;
- // Exit immediately if there's no multisink attached to this drain.
- if (drain.multisink_ == nullptr) {
- return Status::FailedPrecondition();
- }
+ std::lock_guard lock(lock_);
+ PW_DCHECK_PTR_EQ(drain.multisink_, this);
const Status status =
drain.reader_.PeekFrontWithPreamble(buffer, sequence_id_out, bytes_read);
if (status.IsOutOfRange()) {
// If the drain has caught up, report the last handled sequence ID so that
// it can still process any dropped entries.
- sequence_id_out = drain.multisink_->sequence_id_ - 1;
+ sequence_id_out = sequence_id_ - 1;
return status;
}
PW_CHECK(drain.reader_.PopFront().ok());
@@ -46,14 +44,16 @@
}
Status MultiSink::AttachDrain(Drain& drain) {
- PW_DCHECK(drain.multisink_ == nullptr);
+ std::lock_guard lock(lock_);
+ PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
drain.multisink_ = this;
drain.last_handled_sequence_id_ = sequence_id_ - 1;
return ring_buffer_.AttachReader(drain.reader_);
}
Status MultiSink::DetachDrain(Drain& drain) {
- PW_DCHECK(drain.multisink_ == this);
+ std::lock_guard lock(lock_);
+ PW_DCHECK_PTR_EQ(drain.multisink_, this);
drain.multisink_ = nullptr;
return ring_buffer_.DetachReader(drain.reader_);
}
diff --git a/pw_multisink/public/pw_multisink/config.h b/pw_multisink/public/pw_multisink/config.h
new file mode 100644
index 0000000..98de4a7
--- /dev/null
+++ b/pw_multisink/public/pw_multisink/config.h
@@ -0,0 +1,41 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+// PW_MULTISINK_LOCK_INTERRUPT_SAFE controls whether an interrupt-safe lock is
+// used when reading and writing from the underlying ring-buffer. This is
+// enabled by default, using an interrupt spin-lock instead of a mutex.
+// Disabling this alters the entry precondition of the multisink, requiring that
+// it not be invoked from an interrupt context.
+#if !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE)
+#define PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE 1
+#endif // !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE)
+
+#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+#include "pw_sync/interrupt_spin_lock.h"
+#else // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+#include "pw_sync/mutex.h"
+#endif // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+namespace pw {
+namespace multisink {
+
+#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+using LockType = pw::sync::InterruptSpinLock;
+#else // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+using LockType = pw::sync::Mutex;
+#endif // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+} // namespace multisink
+} // namespace pw
diff --git a/pw_multisink/public/pw_multisink/multisink.h b/pw_multisink/public/pw_multisink/multisink.h
index 097e4a3..d86ad35 100644
--- a/pw_multisink/public/pw_multisink/multisink.h
+++ b/pw_multisink/public/pw_multisink/multisink.h
@@ -13,10 +13,14 @@
// the License.
#pragma once
+#include <mutex>
+
#include "pw_bytes/span.h"
+#include "pw_multisink/config.h"
#include "pw_result/result.h"
#include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
#include "pw_status/status.h"
+#include "pw_sync/lock_annotations.h"
namespace pw {
namespace multisink {
@@ -25,10 +29,12 @@
// An asynchronous single-writer multi-reader queue that ensures readers can
// poll for dropped message counts, which is useful for logging or similar
// scenarios where readers need to be aware of the input message sequence.
+//
+// This class is thread-safe but NOT IRQ-safe when
+// PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
+//
// TODO(pwbug/342): Support notifying readers when the queue is readable,
// rather than requiring them to poll to check for new entries.
-// TODO(pwbug/343): Add thread-safety, separate from the thread-safety work
-// planned for the underlying ring buffer.
class MultiSink {
public:
// Constructs a multisink using a ring buffer backed by the provided buffer.
@@ -42,12 +48,16 @@
// The sequence ID of the multisink will always increment as a result of
// calling HandleEntry, regardless of whether pushing the entry succeeds.
//
+ // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
+ // function must not be called from an interrupt context.
+ //
// Return values:
// Ok - Entry was successfully pushed to the ring buffer.
// InvalidArgument - Size of data to write is zero bytes.
// OutOfRange - Size of data is greater than buffer size.
// FailedPrecondition - Buffer was not initialized.
- Status HandleEntry(ConstByteSpan entry) {
+ Status HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_) {
+ std::lock_guard lock(lock_);
return ring_buffer_.PushBack(entry, sequence_id_++);
}
@@ -56,7 +66,10 @@
// before being sent to the multisink (e.g. the writer failed to encode
// the message). This API increments the sequence ID of the multisink by
// the provided `drop_count`.
- void HandleDropped(uint32_t drop_count = 1) { sequence_id_ += drop_count; }
+ void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_) {
+ std::lock_guard lock(lock_);
+ sequence_id_ += drop_count;
+ }
// Attach a drain to the multisink. Drains may not be associated with more
// than one multisink at a time. Entries pushed before the drain was attached
@@ -66,7 +79,7 @@
// Return values:
// Ok - Drain was successfully attached.
// InvalidArgument - Drain is currently associated with another multisink.
- Status AttachDrain(Drain& drain);
+ Status AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
// Detaches a drain from the multisink. Drains may only be detached if they
// were previously attached to this multisink.
@@ -74,11 +87,14 @@
// Return values:
// Ok - Drain was successfully detached.
// InvalidArgument - Drain is not currently associated with this multisink.
- Status DetachDrain(Drain& drain);
+ Status DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
// Removes all data from the internal buffer. The multisink's sequence ID is
// not modified, so readers may interpret this event as droppping entries.
- void Clear() { ring_buffer_.Clear(); }
+ void Clear() PW_LOCKS_EXCLUDED(lock_) {
+ std::lock_guard lock(lock_);
+ ring_buffer_.Clear();
+ }
protected:
friend Drain;
@@ -94,13 +110,15 @@
// the next available entry.
// DataLoss - An entry was read from the multisink, but did not contains an
// encoded sequence ID.
- static Result<ConstByteSpan> GetEntry(Drain& drain,
- ByteSpan buffer,
- uint32_t& sequence_id_out);
+ Result<ConstByteSpan> GetEntry(Drain& drain,
+ ByteSpan buffer,
+ uint32_t& sequence_id_out)
+ PW_LOCKS_EXCLUDED(lock_);
private:
- ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_;
- uint32_t sequence_id_ = 0;
+ ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
+ uint32_t sequence_id_ PW_GUARDED_BY(lock_);
+ LockType lock_;
};
} // namespace multisink