pw_rpc: Update protocol for server streams

Update the server-to-client RPC packet types so that the packet type
unambiguously indicates whether it is the first or last packet. This is
already the case for client-to-server RPC packet types.

- Have RESPONSE always be the last packet in the stream. For RPCs
  without a server stream, it includes a payload. Remove
  SERVER_STREAM_END.
- Introduce the SERVER_STREAM packet, to parallel the CLIENT_STREAM
  packet.
- Update the server and client code and tests. Test that old-style
  streaming RPCs still work correctly.
- Refactor the duplicate MessageOutput class into a FakeChannelOutput
  used by both Nanopb and raw RPCs.
- In C++, don't encode default-valued payload and status fields.

Change-Id: I218772dad6c2981dda5f032f298ea43ee5e08b4d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/49822
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
index 1c20e8c..be9abb7 100644
--- a/pw_rpc/responder.cc
+++ b/pw_rpc/responder.cc
@@ -20,6 +20,30 @@
 #include "pw_rpc/internal/server.h"
 
 namespace pw::rpc::internal {
+namespace {
+
+Packet ResponsePacket(const ServerCall& call,
+                      uint32_t method_id,
+                      Status status) {
+  return Packet(PacketType::RESPONSE,
+                call.channel().id(),
+                call.service().id(),
+                method_id,
+                {},  // Empty payload; this packet carries only the status.
+                status);
+}
+
+Packet StreamPacket(const ServerCall& call,
+                    uint32_t method_id,
+                    std::span<const std::byte> payload) {
+  return Packet(PacketType::SERVER_STREAM,
+                call.channel().id(),
+                call.service().id(),
+                method_id,
+                payload);  // Unlike ResponsePacket, no status is passed.
+}
+
+}  // namespace
 
 Responder::Responder(ServerCall& call) : call_(call), state_(kOpen) {
   call_.server().RegisterResponder(*this);
@@ -58,13 +82,8 @@
 
   Close();
 
-  // Send a control packet indicating that the stream (and RPC) has terminated.
-  return call_.channel().Send(Packet(PacketType::SERVER_STREAM_END,
-                                     call_.channel().id(),
-                                     call_.service().id(),
-                                     method().id(),
-                                     {},
-                                     status));
+  // Send a packet indicating that the RPC has terminated.
+  return call_.channel().Send(ResponsePacket(call_, method_id(), status));
 }
 
 std::span<std::byte> Responder::AcquirePayloadBuffer() {
@@ -75,12 +94,13 @@
     response_ = call_.channel().AcquireBuffer();
   }
 
-  return response_.payload(ResponsePacket());
+  return response_.payload(StreamPacket(call_, method_id(), {}));
 }
 
 Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
   PW_DCHECK(open());
-  return call_.channel().Send(response_, ResponsePacket(payload));
+  return call_.channel().Send(response_,
+                              StreamPacket(call_, method_id(), payload));
 }
 
 Status Responder::ReleasePayloadBuffer() {
@@ -98,12 +118,4 @@
   state_ = kClosed;
 }
 
-Packet Responder::ResponsePacket(std::span<const std::byte> payload) const {
-  return Packet(PacketType::RESPONSE,
-                call_.channel().id(),
-                call_.service().id(),
-                method().id(),
-                payload);
-}
-
 }  // namespace pw::rpc::internal