pw_stream: add Reader class API
Add stream::Reader class for reading from a stream source.
Add MemoryReader class for Reader on a span.
Modify Writer API and add additional commenting for Writer
Doc update is not included, pending review of API changes.
Change-Id: I413654cead54caaabeafe105c7cef1ccf331f6eb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/14562
Reviewed-by: Armando Montanez <amontanez@google.com>
Commit-Queue: David Rogers <davidrogers@google.com>
diff --git a/pw_stream/BUILD b/pw_stream/BUILD
index 1f3244a..87e639d 100644
--- a/pw_stream/BUILD
+++ b/pw_stream/BUILD
@@ -34,6 +34,13 @@
"buffered_stream.cc",
],
includes = ["public"],
+ deps = [
+ "//pw_assert",
+ "//pw_bytes",
+ "//pw_result",
+ "//pw_span",
+ "//pw_status",
+ ],
)
pw_cc_test(
@@ -56,4 +63,4 @@
":pw_stream",
"//pw_unit_test",
],
-)
\ No newline at end of file
+)
diff --git a/pw_stream/BUILD.gn b/pw_stream/BUILD.gn
index dfb5c57..d5d0ac6 100644
--- a/pw_stream/BUILD.gn
+++ b/pw_stream/BUILD.gn
@@ -31,7 +31,8 @@
sources = [ "memory_stream.cc" ]
public_deps = [
dir_pw_assert,
- dir_pw_preprocessor,
+ dir_pw_bytes,
+ dir_pw_result,
dir_pw_span,
dir_pw_status,
]
diff --git a/pw_stream/memory_stream.cc b/pw_stream/memory_stream.cc
index ca7e276..b0a9c36 100644
--- a/pw_stream/memory_stream.cc
+++ b/pw_stream/memory_stream.cc
@@ -21,16 +21,33 @@
namespace pw::stream {
-Status MemoryWriter::DoWrite(std::span<const std::byte> data) {
- size_t bytes_to_write =
- std::min(data.size_bytes(), dest_.size_bytes() - bytes_written_);
+Status MemoryWriter::DoWrite(ConstByteSpan data) {
+ if (ConservativeWriteLimit() == 0) {
+ return Status::OUT_OF_RANGE;
+ }
+ if (ConservativeWriteLimit() < data.size_bytes()) {
+ return Status::RESOURCE_EXHAUSTED;
+ }
+
+ size_t bytes_to_write = data.size_bytes();
std::memcpy(dest_.data() + bytes_written_, data.data(), bytes_to_write);
bytes_written_ += bytes_to_write;
- if (bytes_to_write != data.size_bytes()) {
- return Status::RESOURCE_EXHAUSTED;
- }
return Status::OK;
}
-} // namespace pw::stream
\ No newline at end of file
+StatusWithSize MemoryReader::DoRead(ByteSpan dest) {
+ if (source_.size_bytes() == bytes_read_) {
+ return StatusWithSize::OUT_OF_RANGE;
+ }
+
+ size_t bytes_to_read =
+ std::min(dest.size_bytes(), source_.size_bytes() - bytes_read_);
+
+ std::memcpy(dest.data(), source_.data() + bytes_read_, bytes_to_read);
+ bytes_read_ += bytes_to_read;
+
+ return StatusWithSize(bytes_to_read);
+}
+
+} // namespace pw::stream
diff --git a/pw_stream/memory_stream_test.cc b/pw_stream/memory_stream_test.cc
index 2aa3e67..1a40d65 100644
--- a/pw_stream/memory_stream_test.cc
+++ b/pw_stream/memory_stream_test.cc
@@ -38,9 +38,9 @@
EXPECT_EQ(memory_writer.bytes_written(), 0u);
Status status =
memory_writer.Write(&kExpectedStruct, sizeof(kExpectedStruct));
- EXPECT_TRUE(status.ok());
+ EXPECT_EQ(status, Status::OK);
EXPECT_EQ(memory_writer.bytes_written(), sizeof(kExpectedStruct));
-}
+} // namespace
TEST(MemoryWriter, ValidateContents) {
MemoryWriter memory_writer(memory_buffer);
@@ -57,22 +57,25 @@
TEST(MemoryWriter, MultipleWrites) {
constexpr size_t kTempBufferSize = 72;
std::byte buffer[kTempBufferSize] = {};
- size_t counter = 0;
+ for (std::byte& value : memory_buffer) {
+ value = std::byte(0);
+ }
MemoryWriter memory_writer(memory_buffer);
- do {
+ size_t counter = 0;
+ while (memory_writer.ConservativeWriteLimit() >= kTempBufferSize) {
for (size_t i = 0; i < sizeof(buffer); ++i) {
buffer[i] = std::byte(counter++);
}
- } while (memory_writer.Write(std::span(buffer)) !=
- Status::RESOURCE_EXHAUSTED);
+ EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::OK);
+ }
- // Ensure that we counted up to at least the sink buffer size. This can be
- // more since we write to the sink via in intermediate buffer.
- EXPECT_GE(counter, kSinkBufferSize);
+ EXPECT_GT(memory_writer.ConservativeWriteLimit(), 0u);
+ EXPECT_LT(memory_writer.ConservativeWriteLimit(), kTempBufferSize);
- EXPECT_EQ(memory_writer.bytes_written(), kSinkBufferSize);
+ EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::RESOURCE_EXHAUSTED);
+ EXPECT_EQ(memory_writer.bytes_written(), counter);
counter = 0;
for (const std::byte& value : memory_writer.WrittenData()) {
@@ -80,11 +83,39 @@
}
}
+TEST(MemoryWriter, FullWriter) {
+ constexpr size_t kTempBufferSize = 32;
+ std::byte buffer[kTempBufferSize] = {};
+ const int fill_byte = 0x25;
+ memset(buffer, fill_byte, sizeof(buffer));
+
+ for (std::byte& value : memory_buffer) {
+ value = std::byte(0);
+ }
+ MemoryWriter memory_writer(memory_buffer);
+
+ while (memory_writer.ConservativeWriteLimit() > 0) {
+ size_t bytes_to_write =
+ std::min(sizeof(buffer), memory_writer.ConservativeWriteLimit());
+ EXPECT_EQ(memory_writer.Write(std::span(buffer, bytes_to_write)),
+ Status::OK);
+ }
+
+ EXPECT_EQ(memory_writer.ConservativeWriteLimit(), 0u);
+
+ EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::OUT_OF_RANGE);
+ EXPECT_EQ(memory_writer.bytes_written(), memory_buffer.size());
+
+ for (const std::byte& value : memory_writer.WrittenData()) {
+ EXPECT_EQ(value, std::byte(fill_byte));
+ }
+}
+
TEST(MemoryWriter, EmptyData) {
std::byte buffer[5] = {};
MemoryWriter memory_writer(memory_buffer);
- EXPECT_TRUE(memory_writer.Write(buffer, 0).ok());
+ EXPECT_EQ(memory_writer.Write(buffer, 0), Status::OK);
EXPECT_EQ(memory_writer.bytes_written(), 0u);
}
@@ -110,4 +141,4 @@
#endif // CHECK_TEST_CRASHES
} // namespace
-} // namespace pw::stream
\ No newline at end of file
+} // namespace pw::stream
diff --git a/pw_stream/public/pw_stream/memory_stream.h b/pw_stream/public/pw_stream/memory_stream.h
index 2dec4a2..c8e0460 100644
--- a/pw_stream/public/pw_stream/memory_stream.h
+++ b/pw_stream/public/pw_stream/memory_stream.h
@@ -17,20 +17,24 @@
#include <cstddef>
#include <span>
+#include "pw_bytes/span.h"
+#include "pw_result/result.h"
#include "pw_stream/stream.h"
namespace pw::stream {
class MemoryWriter : public Writer {
public:
- MemoryWriter(std::span<std::byte> dest) : dest_(dest) {}
+ MemoryWriter(ByteSpan dest) : dest_(dest) {}
size_t bytes_written() const { return bytes_written_; }
- std::span<const std::byte> WrittenData() const {
- return dest_.first(bytes_written_);
+ size_t ConservativeWriteLimit() const override {
+ return dest_.size_bytes() - bytes_written_;
}
+ ConstByteSpan WrittenData() const { return dest_.first(bytes_written_); }
+
const std::byte* data() const { return dest_.data(); }
private:
@@ -38,9 +42,9 @@
//
// If the in-memory buffer is exhausted in the middle of a write, this will
// perform a partial write and Status::RESOURCE_EXHAUSTED will be returned.
- Status DoWrite(std::span<const std::byte> data) override;
+ Status DoWrite(ConstByteSpan data) override;
- std::span<std::byte> dest_;
+ ByteSpan dest_;
size_t bytes_written_ = 0;
};
@@ -53,4 +57,27 @@
std::array<std::byte, size_bytes> buffer_;
};
+class MemoryReader : public Reader {
+ public:
+ MemoryReader(ConstByteSpan source) : source_(source), bytes_read_(0) {}
+
+ size_t ConservativeReadLimit() const override {
+ return source_.size_bytes() - bytes_read_;
+ }
+
+ size_t bytes_read() const { return bytes_read_; }
+
+ const std::byte* data() const { return source_.data(); }
+
+ private:
+ // Implementation for reading data from this stream.
+ //
+ // If the in-memory buffer does not have enough remaining bytes for what was
+ // requested, this will perform a partial read and OK will still be returned.
+ StatusWithSize DoRead(ByteSpan dest) override;
+
+ ConstByteSpan source_;
+ size_t bytes_read_;
+};
+
} // namespace pw::stream
diff --git a/pw_stream/public/pw_stream/stream.h b/pw_stream/public/pw_stream/stream.h
index 85ad410..e497ea3 100644
--- a/pw_stream/public/pw_stream/stream.h
+++ b/pw_stream/public/pw_stream/stream.h
@@ -18,7 +18,10 @@
#include <span>
#include "pw_assert/assert.h"
+#include "pw_bytes/span.h"
+#include "pw_result/result.h"
#include "pw_status/status.h"
+#include "pw_status/status_with_size.h"
namespace pw::stream {
@@ -26,17 +29,37 @@
class Writer {
public:
// There are no requirements around if or how a `Writer` should call Flush()
- // at the time of object destruction. In general, do not assume a `Writer`
- // will automatically call Flush when destroyed.
+ // at the time of object destruction.
virtual ~Writer() = default;
- // Write data to this stream Writer. If the writer runs out of resources, it
- // will return Status::RESOURCE_EXHAUSTED. The derived class choses whether
- // to do partial writes in this case, or abort the entire write operation.
+ // Write data to this stream Writer. Data is
+ // not guaranteed to be fully written out to final resting place on Write
+ // return.
//
- // Derived classes should NOT try to override these public write methods.
+ // If the writer is unable to fully accept the input data size it will abort
+ // the write and return RESOURCE_EXHAUSTED.
+ //
+ // If the writer has been exhausted and is and can no longer accept additional
+ // bytes it will return OUT_OF_RANGE. This is similar to EndOfFile. Write will
+ // only return OUT_OF_RANGE if ConservativeWriteLimit() is and will remain
+ // zero. A Write operation that is successful and also exhausts the writer
+ // returns OK, with all following calls returning OUT_OF_RANGE. When
+ // ConservativeWriteLimit() is greater than zero, a Write that is a number of
+ // bytes beyond what will exhaust the Write will abort and return
+ // RESOURCE_EXHAUSTED rather than OUT_OF_RANGE because the writer is still
+ // able to write bytes.
+ //
+ // Derived classes should NOT try to override the public Write methods.
// Instead, provide an implementation by overriding DoWrite().
- Status Write(std::span<const std::byte> data) {
+ //
+ // Returns: OK, successful write/enqueue of data.
+ // FAILED_PRECONDITION - writer unable/not in state to accept
+ // data.
+ // RESOURCE_EXHAUSTED - unable to write all of requested data at this
+ // time. No data written.
+ // OUT_OF_RANGE - Writer has been exhausted, similar to EOF. No data
+ // written, no more will be written.
+ Status Write(ConstByteSpan data) {
PW_DCHECK(data.empty() || data.data() != nullptr);
return DoWrite(data);
}
@@ -44,15 +67,62 @@
return Write(std::span(static_cast<const std::byte*>(data), size_bytes));
}
Status Write(const std::byte b) { return Write(&b, 1); }
- // Flush any buffered data, finalizing all writes.
- //
- // Generally speaking, the scope that instantiates the concrete `Writer`
- // class should be in charge of calling `Flush()`, and functions that only
- // have access to the Writer interface should avoid calling this function.
- virtual Status Flush() { return Status::OK; }
+
+ // Probable (not guaranteed) minimum number of bytes at this time that can be
+ // written. This number is advisory and not guaranteed to write without a
+ // RESOURCE_EXHAUSTED or OUT_OF_RANGE. As Writer processes/handles enqueued or
+ // other contexts write data this number can go up or down for some Writers.
+ virtual size_t ConservativeWriteLimit() const = 0;
private:
- virtual Status DoWrite(std::span<const std::byte> data) = 0;
+ virtual Status DoWrite(ConstByteSpan data) = 0;
+};
+
+// General-purpose reader interface
+class Reader {
+ public:
+ virtual ~Reader() = default;
+
+ // Read data from this stream Reader. If any number of bytes are read return
+ // OK with a span of the actual byte read.
+ //
+ // If the reader has been exhausted and is and can no longer read additional
+ // bytes it will return OUT_OF_RANGE. This is similar to EndOfFile. Read will
+ // only return OUT_OF_RANGE if ConservativeReadLimit() is and will remain
+ // zero. A Read operation that is successful and also exhausts the reader
+ // returns OK, with all following calls returning OUT_OF_RANGE.
+ //
+ // Derived classes should NOT try to override these public read methods.
+ // Instead, provide an implementation by overriding DoRead().
+ //
+ // Returns: OK with span of bytes read - success, between 1 and
+ // dest.size_bytes() were read.
+ // FAILED_PRECONDITION - Reader unable/not in state to read data.
+ // OUT_OF_RANGE - Reader has been exhausted, similar to EOF. No bytes
+ // read, no more will be read.
+ Result<ByteSpan> Read(ByteSpan dest) {
+ PW_DCHECK(dest.empty() || dest.data() != nullptr);
+ StatusWithSize result = DoRead(dest);
+
+ if (result.ok()) {
+ return dest.first(result.size());
+ } else {
+ return result.status();
+ }
+ }
+ Result<ByteSpan> Read(void* dest, size_t size_bytes) {
+ return Read(std::span(static_cast<std::byte*>(dest), size_bytes));
+ }
+
+ // Probable (not guaranteed) minimum number of bytes at this time that can be
+ // read. This number is advisory and not guaranteed to read full number or
+ // requested bytes or without a OUT_OF_RANGE. As Reader
+ // processes/handles/receives enqueued data or other contexts read data this
+ // number can go up or down for some Readers.
+ virtual size_t ConservativeReadLimit() const = 0;
+
+ private:
+ virtual StatusWithSize DoRead(ByteSpan dest) = 0;
};
} // namespace pw::stream