pw_rpc: Support cancelling streaming RPCs

Introduce a new packet type, PacketType::CANCEL. Clients send a CANCEL
packet to cancel an ongoing streaming RPC. Servers send a CANCEL packet
to indicate or confirm that an RPC was cancelled.

Change-Id: I72e7c5c4859f0c4262565a8c963946e00ec4dd37
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/12462
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/base_server_writer.cc b/pw_rpc/base_server_writer.cc
index 31da1d2..67abc6f 100644
--- a/pw_rpc/base_server_writer.cc
+++ b/pw_rpc/base_server_writer.cc
@@ -43,17 +43,23 @@
   return *this;
 }
 
+uint32_t BaseServerWriter::method_id() const { return call_.method().id(); }
+
 void BaseServerWriter::Finish() {
   if (!open()) {
     return;
   }
 
   call_.server().RemoveWriter(*this);
-
-  // TODO(hepler): Send a control packet indicating that the stream has
-  // terminated.
-
   state_ = kClosed;
+
+  // Send a control packet indicating that the stream has terminated.
+  auto response = call_.channel().AcquireBuffer();
+  call_.channel().Send(response,
+                       Packet(PacketType::CANCEL,
+                              call_.channel().id(),
+                              call_.service().id(),
+                              method().id()));
 }
 
 std::span<std::byte> BaseServerWriter::AcquirePayloadBuffer() {
@@ -62,7 +68,7 @@
   }
 
   response_ = call_.channel().AcquireBuffer();
-  return response_.payload(packet());
+  return response_.payload(RpcPacket());
 }
 
 Status BaseServerWriter::ReleasePayloadBuffer(
@@ -70,10 +76,10 @@
   if (!open()) {
     return Status::FAILED_PRECONDITION;
   }
-  return call_.channel().Send(response_, packet(payload));
+  return call_.channel().Send(response_, RpcPacket(payload));
 }
 
-Packet BaseServerWriter::packet(std::span<const std::byte> payload) const {
+Packet BaseServerWriter::RpcPacket(std::span<const std::byte> payload) const {
   return Packet(PacketType::RPC,
                 call_.channel().id(),
                 call_.service().id(),