pw_rpc: Responder & testing improvements

- Fill out the remaining server-side Nanopb client-streaming and
  bidirectional-streaming features, and add tests.
- Use MethodType instead of a custom HasClientStream in Responder.
- Expose RawFakeChannelOutput and NanopbFakeChannelOutput for testing
  use.
- Reorganize test utils headers to avoid the need to add "." as an
  include path.

Change-Id: I3dd59ca593bd6ddd311d91a3de7f1b7eb60e3214
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/57881
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
index 5b13a6a..fadf2c9 100644
--- a/pw_rpc/responder.cc
+++ b/pw_rpc/responder.cc
@@ -44,23 +44,22 @@
 
 }  // namespace
 
-Responder::Responder(ServerCall& call, HasClientStream has_client_stream)
+Responder::Responder(const ServerCall& call, MethodType type)
     : call_(call),
       rpc_state_(kOpen),
-      has_client_stream_(has_client_stream),
-      client_stream_state_(has_client_stream ? kClientStreamOpen
-                                             : kClientStreamClosed) {
+      type_(type),
+      client_stream_state_(HasClientStream(type) ? kClientStreamOpen
+                                                 : kClientStreamClosed) {
   call_.server().RegisterResponder(*this);
 }
 
 Responder& Responder::operator=(Responder&& other) {
   // If this RPC was running, complete it before moving in the other RPC.
-  CloseAndSendResponse(OkStatus())
-      .IgnoreError();  // TODO(pwbug/387): Handle Status properly
+  CloseAndSendResponse(OkStatus()).IgnoreError();
 
   // Move the state variables, which may change when the other client closes.
   rpc_state_ = other.rpc_state_;
-  has_client_stream_ = other.has_client_stream_;
+  type_ = other.type_;
   client_stream_state_ = other.client_stream_state_;
 
   if (other.open()) {
@@ -90,17 +89,18 @@
     return Status::FailedPrecondition();
   }
 
-  // Send a packet indicating that the RPC has terminated.
-  Status packet_status =
-      call_.channel().Send(ResponsePacket(call_, response, status));
+  Status packet_status;
 
-  // If the Responder implementer or user forgets to release an acquired buffer
-  // before finishing, release it here.
-  if (!response_.empty()) {
-    ReleasePayloadBuffer()
-        .IgnoreError();  // TODO(pwbug/387): Handle Status properly
+  // Acquire a buffer to use for the outgoing packet if none is available.
+  if (response_.empty()) {
+    response_ = call_.channel().AcquireBuffer();
   }
 
+  // Send a packet indicating that the RPC has terminated and optionally
+  // containing the final payload.
+  packet_status =
+      call_.channel().Send(response_, ResponsePacket(call_, response, status));
+
   Close();
 
   return packet_status;
@@ -117,15 +117,15 @@
   return response_.payload(StreamPacket(call_, {}));
 }
 
-Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
+Status Responder::SendPayloadBufferClientStream(
+    std::span<const std::byte> payload) {
   PW_DCHECK(open());
   return call_.channel().Send(response_, StreamPacket(call_, payload));
 }
 
-Status Responder::ReleasePayloadBuffer() {
+void Responder::ReleasePayloadBuffer() {
   PW_DCHECK(open());
   call_.channel().Release(response_);
-  return OkStatus();
 }
 
 void Responder::Close() {