pw_rpc: Support cancelling streaming RPCs
Introduce a new packet type, PacketType::CANCEL. Clients send a CANCEL
packet to cancel an ongoing streaming RPC. Servers send CANCEL packets to
indicate or confirm that an RPC was cancelled.
Change-Id: I72e7c5c4859f0c4262565a8c963946e00ec4dd37
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/12462
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/base_server_writer.cc b/pw_rpc/base_server_writer.cc
index 31da1d2..67abc6f 100644
--- a/pw_rpc/base_server_writer.cc
+++ b/pw_rpc/base_server_writer.cc
@@ -43,17 +43,23 @@
return *this;
}
+uint32_t BaseServerWriter::method_id() const { return call_.method().id(); }
+
void BaseServerWriter::Finish() {
if (!open()) {
return;
}
call_.server().RemoveWriter(*this);
-
- // TODO(hepler): Send a control packet indicating that the stream has
- // terminated.
-
state_ = kClosed;
+
+ // Send a CANCEL control packet indicating that the stream has terminated.
+ auto response = call_.channel().AcquireBuffer();
+ call_.channel().Send(response,
+ Packet(PacketType::CANCEL,
+ call_.channel().id(),
+ call_.service().id(),
+ method().id()));
}
std::span<std::byte> BaseServerWriter::AcquirePayloadBuffer() {
@@ -62,7 +68,7 @@
}
response_ = call_.channel().AcquireBuffer();
- return response_.payload(packet());
+ return response_.payload(RpcPacket());
}
Status BaseServerWriter::ReleasePayloadBuffer(
@@ -70,10 +76,10 @@
if (!open()) {
return Status::FAILED_PRECONDITION;
}
- return call_.channel().Send(response_, packet(payload));
+ return call_.channel().Send(response_, RpcPacket(payload));
}
-Packet BaseServerWriter::packet(std::span<const std::byte> payload) const {
+Packet BaseServerWriter::RpcPacket(std::span<const std::byte> payload) const {
return Packet(PacketType::RPC,
call_.channel().id(),
call_.service().id(),