pw_rpc: Handle client streams in the server

- Handle client stream messages in the RPC server. Client streaming RPCs
  cannot yet be created.
- Add callbacks for errors, client stream messages, and client stream
  completion.
- Rename Responder's Finish to CloseAndSendResponse to avoid overlapping
  Reader/Writer implementations' Finish() methods.
- Move Nanopb ServerWriter::Write to the .cc file.

Change-Id: Ie4894e6b2fd47a6fc7efdfba58ebeaddff9002e0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/50362
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
index be9abb7..6f4b78c 100644
--- a/pw_rpc/responder.cc
+++ b/pw_rpc/responder.cc
@@ -23,59 +23,69 @@
 namespace {
 
 Packet ResponsePacket(const ServerCall& call,
-                      uint32_t method_id,
+                      std::span<const std::byte> payload,
                       Status status) {
   return Packet(PacketType::RESPONSE,
                 call.channel().id(),
                 call.service().id(),
-                method_id,
-                {},
+                call.method().id(),
+                payload,
                 status);
 }
 
 Packet StreamPacket(const ServerCall& call,
-                    uint32_t method_id,
                     std::span<const std::byte> payload) {
   return Packet(PacketType::SERVER_STREAM,
                 call.channel().id(),
                 call.service().id(),
-                method_id,
+                call.method().id(),
                 payload);
 }
 
 }  // namespace
 
-Responder::Responder(ServerCall& call) : call_(call), state_(kOpen) {
+Responder::Responder(ServerCall& call, HasClientStream has_client_stream)
+    : call_(call),
+      rpc_state_(kOpen),
+      has_client_stream_(has_client_stream),
+      client_stream_state_(has_client_stream ? kClientStreamOpen
+                                             : kClientStreamClosed) {
   call_.server().RegisterResponder(*this);
 }
 
 Responder& Responder::operator=(Responder&& other) {
-  Finish();
+  CloseAndSendResponse(OkStatus()).IgnoreError();
 
-  state_ = other.state_;
+  // Move the state variables, which may change when the other client closes.
+  rpc_state_ = other.rpc_state_;
+  has_client_stream_ = other.has_client_stream_;
+  client_stream_state_ = other.client_stream_state_;
 
   if (other.open()) {
-    other.call_.server().RemoveResponder(other);
-    other.state_ = kClosed;
-
+    other.Close();
     other.call_.server().RegisterResponder(*this);
   }
 
+  // Move the rest of the member variables.
   call_ = std::move(other.call_);
   response_ = std::move(other.response_);
 
+  on_error_ = std::move(other.on_error_);
+  on_next_ = std::move(other.on_next_);
+  on_client_stream_end_ = std::move(other.on_client_stream_end_);
+
   return *this;
 }
 
 uint32_t Responder::method_id() const { return call_.method().id(); }
 
-Status Responder::Finish(Status status) {
+Status Responder::CloseAndSendResponse(Status status) {
   if (!open()) {
     return Status::FailedPrecondition();
   }
 
-  // If the Responder implementer or user forgets to release an acquired
-  // buffer before finishing, release it here.
+  // If the Responder implementer or user forgets to release an acquired buffer
+  // before finishing, release it here.
   if (!response_.empty()) {
     ReleasePayloadBuffer();
   }
@@ -83,7 +93,7 @@
   Close();
 
   // Send a packet indicating that the RPC has terminated.
-  return call_.channel().Send(ResponsePacket(call_, method_id(), status));
+  return call_.channel().Send(ResponsePacket(call_, {}, status));
 }
 
 std::span<std::byte> Responder::AcquirePayloadBuffer() {
@@ -94,13 +104,12 @@
     response_ = call_.channel().AcquireBuffer();
   }
 
-  return response_.payload(StreamPacket(call_, method_id(), {}));
+  return response_.payload(StreamPacket(call_, {}));
 }
 
 Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
   PW_DCHECK(open());
-  return call_.channel().Send(response_,
-                              StreamPacket(call_, method_id(), payload));
+  return call_.channel().Send(response_, StreamPacket(call_, payload));
 }
 
 Status Responder::ReleasePayloadBuffer() {
@@ -110,12 +119,11 @@
 }
 
 void Responder::Close() {
-  if (!open()) {
-    return;
-  }
+  PW_DCHECK(open());
 
   call_.server().RemoveResponder(*this);
-  state_ = kClosed;
+  rpc_state_ = kClosed;
+  client_stream_state_ = kClientStreamClosed;
 }
 
 }  // namespace pw::rpc::internal