pw_stream: add Reader class API

Add stream::Reader class for reading from a stream source.
Add MemoryReader class for Reader on a span.
Modify Writer API and add additional commenting for Writer
Doc update is not included, pending review of API changes.

Change-Id: I413654cead54caaabeafe105c7cef1ccf331f6eb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/14562
Reviewed-by: Armando Montanez <amontanez@google.com>
Commit-Queue: David Rogers <davidrogers@google.com>
diff --git a/pw_stream/BUILD b/pw_stream/BUILD
index 1f3244a..87e639d 100644
--- a/pw_stream/BUILD
+++ b/pw_stream/BUILD
@@ -34,6 +34,13 @@
       "buffered_stream.cc",
     ],
     includes = ["public"],
+    deps = [
+        "//pw_assert",
+        "//pw_bytes",
+        "//pw_result",
+        "//pw_span",
+        "//pw_status",
+    ],
 )
 
 pw_cc_test(
@@ -56,4 +63,4 @@
         ":pw_stream",
         "//pw_unit_test",
     ],
-)
\ No newline at end of file
+)
diff --git a/pw_stream/BUILD.gn b/pw_stream/BUILD.gn
index dfb5c57..d5d0ac6 100644
--- a/pw_stream/BUILD.gn
+++ b/pw_stream/BUILD.gn
@@ -31,7 +31,8 @@
   sources = [ "memory_stream.cc" ]
   public_deps = [
     dir_pw_assert,
-    dir_pw_preprocessor,
+    dir_pw_bytes,
+    dir_pw_result,
     dir_pw_span,
     dir_pw_status,
   ]
diff --git a/pw_stream/memory_stream.cc b/pw_stream/memory_stream.cc
index ca7e276..b0a9c36 100644
--- a/pw_stream/memory_stream.cc
+++ b/pw_stream/memory_stream.cc
@@ -21,16 +21,33 @@
 
 namespace pw::stream {
 
-Status MemoryWriter::DoWrite(std::span<const std::byte> data) {
-  size_t bytes_to_write =
-      std::min(data.size_bytes(), dest_.size_bytes() - bytes_written_);
+Status MemoryWriter::DoWrite(ConstByteSpan data) {
+  if (ConservativeWriteLimit() == 0) {
+    return Status::OUT_OF_RANGE;
+  }
+  if (ConservativeWriteLimit() < data.size_bytes()) {
+    return Status::RESOURCE_EXHAUSTED;
+  }
+
+  size_t bytes_to_write = data.size_bytes();
   std::memcpy(dest_.data() + bytes_written_, data.data(), bytes_to_write);
   bytes_written_ += bytes_to_write;
 
-  if (bytes_to_write != data.size_bytes()) {
-    return Status::RESOURCE_EXHAUSTED;
-  }
   return Status::OK;
 }
 
-}  // namespace pw::stream
\ No newline at end of file
+StatusWithSize MemoryReader::DoRead(ByteSpan dest) {
+  if (source_.size_bytes() == bytes_read_) {
+    return StatusWithSize::OUT_OF_RANGE;
+  }
+
+  size_t bytes_to_read =
+      std::min(dest.size_bytes(), source_.size_bytes() - bytes_read_);
+
+  std::memcpy(dest.data(), source_.data() + bytes_read_, bytes_to_read);
+  bytes_read_ += bytes_to_read;
+
+  return StatusWithSize(bytes_to_read);
+}
+
+}  // namespace pw::stream
diff --git a/pw_stream/memory_stream_test.cc b/pw_stream/memory_stream_test.cc
index 2aa3e67..1a40d65 100644
--- a/pw_stream/memory_stream_test.cc
+++ b/pw_stream/memory_stream_test.cc
@@ -38,9 +38,9 @@
   EXPECT_EQ(memory_writer.bytes_written(), 0u);
   Status status =
       memory_writer.Write(&kExpectedStruct, sizeof(kExpectedStruct));
-  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(status, Status::OK);
   EXPECT_EQ(memory_writer.bytes_written(), sizeof(kExpectedStruct));
-}
+}  // namespace
 
 TEST(MemoryWriter, ValidateContents) {
   MemoryWriter memory_writer(memory_buffer);
@@ -57,22 +57,25 @@
 TEST(MemoryWriter, MultipleWrites) {
   constexpr size_t kTempBufferSize = 72;
   std::byte buffer[kTempBufferSize] = {};
-  size_t counter = 0;
 
+  for (std::byte& value : memory_buffer) {
+    value = std::byte(0);
+  }
   MemoryWriter memory_writer(memory_buffer);
 
-  do {
+  size_t counter = 0;
+  while (memory_writer.ConservativeWriteLimit() >= kTempBufferSize) {
     for (size_t i = 0; i < sizeof(buffer); ++i) {
       buffer[i] = std::byte(counter++);
     }
-  } while (memory_writer.Write(std::span(buffer)) !=
-           Status::RESOURCE_EXHAUSTED);
+    EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::OK);
+  }
 
-  // Ensure that we counted up to at least the sink buffer size. This can be
-  // more since we write to the sink via in intermediate buffer.
-  EXPECT_GE(counter, kSinkBufferSize);
+  EXPECT_GT(memory_writer.ConservativeWriteLimit(), 0u);
+  EXPECT_LT(memory_writer.ConservativeWriteLimit(), kTempBufferSize);
 
-  EXPECT_EQ(memory_writer.bytes_written(), kSinkBufferSize);
+  EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::RESOURCE_EXHAUSTED);
+  EXPECT_EQ(memory_writer.bytes_written(), counter);
 
   counter = 0;
   for (const std::byte& value : memory_writer.WrittenData()) {
@@ -80,11 +83,39 @@
   }
 }
 
+TEST(MemoryWriter, FullWriter) {
+  constexpr size_t kTempBufferSize = 32;
+  std::byte buffer[kTempBufferSize] = {};
+  const int fill_byte = 0x25;
+  memset(buffer, fill_byte, sizeof(buffer));
+
+  for (std::byte& value : memory_buffer) {
+    value = std::byte(0);
+  }
+  MemoryWriter memory_writer(memory_buffer);
+
+  while (memory_writer.ConservativeWriteLimit() > 0) {
+    size_t bytes_to_write =
+        std::min(sizeof(buffer), memory_writer.ConservativeWriteLimit());
+    EXPECT_EQ(memory_writer.Write(std::span(buffer, bytes_to_write)),
+              Status::OK);
+  }
+
+  EXPECT_EQ(memory_writer.ConservativeWriteLimit(), 0u);
+
+  EXPECT_EQ(memory_writer.Write(std::span(buffer)), Status::OUT_OF_RANGE);
+  EXPECT_EQ(memory_writer.bytes_written(), memory_buffer.size());
+
+  for (const std::byte& value : memory_writer.WrittenData()) {
+    EXPECT_EQ(value, std::byte(fill_byte));
+  }
+}
+
 TEST(MemoryWriter, EmptyData) {
   std::byte buffer[5] = {};
 
   MemoryWriter memory_writer(memory_buffer);
-  EXPECT_TRUE(memory_writer.Write(buffer, 0).ok());
+  EXPECT_EQ(memory_writer.Write(buffer, 0), Status::OK);
   EXPECT_EQ(memory_writer.bytes_written(), 0u);
 }
 
@@ -110,4 +141,4 @@
 #endif  // CHECK_TEST_CRASHES
 
 }  // namespace
-}  // namespace pw::stream
\ No newline at end of file
+}  // namespace pw::stream
diff --git a/pw_stream/public/pw_stream/memory_stream.h b/pw_stream/public/pw_stream/memory_stream.h
index 2dec4a2..c8e0460 100644
--- a/pw_stream/public/pw_stream/memory_stream.h
+++ b/pw_stream/public/pw_stream/memory_stream.h
@@ -17,20 +17,24 @@
 #include <cstddef>
 #include <span>
 
+#include "pw_bytes/span.h"
+#include "pw_result/result.h"
 #include "pw_stream/stream.h"
 
 namespace pw::stream {
 
 class MemoryWriter : public Writer {
  public:
-  MemoryWriter(std::span<std::byte> dest) : dest_(dest) {}
+  MemoryWriter(ByteSpan dest) : dest_(dest) {}
 
   size_t bytes_written() const { return bytes_written_; }
 
-  std::span<const std::byte> WrittenData() const {
-    return dest_.first(bytes_written_);
+  size_t ConservativeWriteLimit() const override {
+    return dest_.size_bytes() - bytes_written_;
   }
 
+  ConstByteSpan WrittenData() const { return dest_.first(bytes_written_); }
+
   const std::byte* data() const { return dest_.data(); }
 
  private:
@@ -38,9 +42,9 @@
   //
   // If the in-memory buffer is exhausted in the middle of a write, this will
   // perform a partial write and Status::RESOURCE_EXHAUSTED will be returned.
-  Status DoWrite(std::span<const std::byte> data) override;
+  Status DoWrite(ConstByteSpan data) override;
 
-  std::span<std::byte> dest_;
+  ByteSpan dest_;
   size_t bytes_written_ = 0;
 };
 
@@ -53,4 +57,27 @@
   std::array<std::byte, size_bytes> buffer_;
 };
 
+class MemoryReader : public Reader {
+ public:
+  MemoryReader(ConstByteSpan source) : source_(source), bytes_read_(0) {}
+
+  size_t ConservativeReadLimit() const override {
+    return source_.size_bytes() - bytes_read_;
+  }
+
+  size_t bytes_read() const { return bytes_read_; }
+
+  const std::byte* data() const { return source_.data(); }
+
+ private:
+  // Implementation for reading data from this stream.
+  //
+  // If the in-memory buffer does not have enough remaining bytes for what was
+  // requested, this will perform a partial read and OK will still be returned.
+  StatusWithSize DoRead(ByteSpan dest) override;
+
+  ConstByteSpan source_;
+  size_t bytes_read_;
+};
+
 }  // namespace pw::stream
diff --git a/pw_stream/public/pw_stream/stream.h b/pw_stream/public/pw_stream/stream.h
index 85ad410..e497ea3 100644
--- a/pw_stream/public/pw_stream/stream.h
+++ b/pw_stream/public/pw_stream/stream.h
@@ -18,7 +18,10 @@
 #include <span>
 
 #include "pw_assert/assert.h"
+#include "pw_bytes/span.h"
+#include "pw_result/result.h"
 #include "pw_status/status.h"
+#include "pw_status/status_with_size.h"
 
 namespace pw::stream {
 
@@ -26,17 +29,37 @@
 class Writer {
  public:
   // There are no requirements around if or how a `Writer` should call Flush()
-  // at the time of object destruction. In general, do not assume a `Writer`
-  // will automatically call Flush when destroyed.
+  // at the time of object destruction.
   virtual ~Writer() = default;
 
-  // Write data to this stream Writer. If the writer runs out of resources, it
-  // will return Status::RESOURCE_EXHAUSTED. The derived class choses whether
-  // to do partial writes in this case, or abort the entire write operation.
+  // Write data to this stream Writer. Data is
+  // not guaranteed to be fully written out to final resting place on Write
+  // return.
   //
-  // Derived classes should NOT try to override these public write methods.
+  // If the writer is unable to fully accept the input data size it will abort
+  // the write and return RESOURCE_EXHAUSTED.
+  //
+  // If the writer has been exhausted and is and can no longer accept additional
+  // bytes it will return OUT_OF_RANGE. This is similar to EndOfFile. Write will
+  // only return OUT_OF_RANGE if ConservativeWriteLimit() is and will remain
+  // zero. A Write operation that is successful and also exhausts the writer
+  // returns OK, with all following calls returning OUT_OF_RANGE. When
+  // ConservativeWriteLimit() is greater than zero, a Write that is a number of
+  // bytes beyond what will exhaust the Write will abort and return
+  // RESOURCE_EXHAUSTED rather than OUT_OF_RANGE because the writer is still
+  // able to write bytes.
+  //
+  // Derived classes should NOT try to override the public Write methods.
   // Instead, provide an implementation by overriding DoWrite().
-  Status Write(std::span<const std::byte> data) {
+  //
+  // Returns: OK, successful write/enqueue of data.
+  //          FAILED_PRECONDITION - writer unable/not in state to accept
+  //          data.
+  //          RESOURCE_EXHAUSTED - unable to write all of requested data at this
+  //          time. No data written.
+  //          OUT_OF_RANGE - Writer has been exhausted, similar to EOF. No data
+  //          written, no more will be written.
+  Status Write(ConstByteSpan data) {
     PW_DCHECK(data.empty() || data.data() != nullptr);
     return DoWrite(data);
   }
@@ -44,15 +67,62 @@
     return Write(std::span(static_cast<const std::byte*>(data), size_bytes));
   }
   Status Write(const std::byte b) { return Write(&b, 1); }
-  // Flush any buffered data, finalizing all writes.
-  //
-  // Generally speaking, the scope that instantiates the concrete `Writer`
-  // class should be in charge of calling `Flush()`, and functions that only
-  // have access to the Writer interface should avoid calling this function.
-  virtual Status Flush() { return Status::OK; }
+
+  // Probable (not guaranteed) minimum number of bytes at this time that can be
+  // written. This number is advisory and not guaranteed to write without a
+  // RESOURCE_EXHAUSTED or OUT_OF_RANGE. As Writer processes/handles enqueued or
+  // other contexts write data this number can go up or down for some Writers.
+  virtual size_t ConservativeWriteLimit() const = 0;
 
  private:
-  virtual Status DoWrite(std::span<const std::byte> data) = 0;
+  virtual Status DoWrite(ConstByteSpan data) = 0;
+};
+
+// General-purpose reader interface
+class Reader {
+ public:
+  virtual ~Reader() = default;
+
+  // Read data from this stream Reader. If any number of bytes are read return
+  // OK with a span of the actual byte read.
+  //
+  // If the reader has been exhausted and is and can no longer read additional
+  // bytes it will return OUT_OF_RANGE. This is similar to EndOfFile. Read will
+  // only return OUT_OF_RANGE if ConservativeReadLimit() is and will remain
+  // zero. A Read operation that is successful and also exhausts the reader
+  // returns OK, with all following calls returning OUT_OF_RANGE.
+  //
+  // Derived classes should NOT try to override these public read methods.
+  // Instead, provide an implementation by overriding DoRead().
+  //
+  // Returns: OK with span of bytes read - success, between 1 and
+  //          dest.size_bytes() were read.
+  //          FAILED_PRECONDITION - Reader unable/not in state to read data.
+  //          OUT_OF_RANGE - Reader has been exhausted, similar to EOF. No bytes
+  //          read, no more will be read.
+  Result<ByteSpan> Read(ByteSpan dest) {
+    PW_DCHECK(dest.empty() || dest.data() != nullptr);
+    StatusWithSize result = DoRead(dest);
+
+    if (result.ok()) {
+      return dest.first(result.size());
+    } else {
+      return result.status();
+    }
+  }
+  Result<ByteSpan> Read(void* dest, size_t size_bytes) {
+    return Read(std::span(static_cast<std::byte*>(dest), size_bytes));
+  }
+
+  // Probable (not guaranteed) minimum number of bytes at this time that can be
+  // read. This number is advisory and not guaranteed to read full number or
+  // requested bytes or without a OUT_OF_RANGE. As Reader
+  // processes/handles/receives enqueued data or other contexts read data this
+  // number can go up or down for some Readers.
+  virtual size_t ConservativeReadLimit() const = 0;
+
+ private:
+  virtual StatusWithSize DoRead(ByteSpan dest) = 0;
 };
 
 }  // namespace pw::stream