pw_rpc: Update protocol for server streams
Update the server-to-client RPC packet types so that the packet type
unambiguously indicates whether it is the first or last packet. This is
already the case for client-to-server RPC packet types.
- Have RESPONSE always be the last packet of an RPC. For RPCs without a
  server stream, it carries the payload. Remove SERVER_STREAM_END,
  which RESPONSE now replaces.
- Introduce the SERVER_STREAM packet type for server stream payloads,
  paralleling the CLIENT_STREAM packet type.
- Update the server and client code and tests. Test that old-style
streaming RPCs still work correctly.
- Refactor the duplicate MessageOutput class into a FakeChannelOutput
used by both Nanopb and raw RPCs.
- In C++, don't encode default-valued payload and status fields.
Change-Id: I218772dad6c2981dda5f032f298ea43ee5e08b4d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/49822
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
index 1c20e8c..be9abb7 100644
--- a/pw_rpc/responder.cc
+++ b/pw_rpc/responder.cc
@@ -20,6 +20,30 @@
#include "pw_rpc/internal/server.h"
namespace pw::rpc::internal {
+namespace {
+
+Packet ResponsePacket(const ServerCall& call,
+ uint32_t method_id,
+ Status status) {
+ return Packet(PacketType::RESPONSE,
+ call.channel().id(),
+ call.service().id(),
+ method_id,
+ {},
+ status);
+}
+
+Packet StreamPacket(const ServerCall& call,
+ uint32_t method_id,
+ std::span<const std::byte> payload) {
+ return Packet(PacketType::SERVER_STREAM,
+ call.channel().id(),
+ call.service().id(),
+ method_id,
+ payload);
+}
+
+} // namespace
Responder::Responder(ServerCall& call) : call_(call), state_(kOpen) {
call_.server().RegisterResponder(*this);
@@ -58,13 +82,8 @@
Close();
- // Send a control packet indicating that the stream (and RPC) has terminated.
- return call_.channel().Send(Packet(PacketType::SERVER_STREAM_END,
- call_.channel().id(),
- call_.service().id(),
- method().id(),
- {},
- status));
+ // Send a packet indicating that the RPC has terminated.
+ return call_.channel().Send(ResponsePacket(call_, method_id(), status));
}
std::span<std::byte> Responder::AcquirePayloadBuffer() {
@@ -75,12 +94,13 @@
response_ = call_.channel().AcquireBuffer();
}
- return response_.payload(ResponsePacket());
+ return response_.payload(StreamPacket(call_, method_id(), {}));
}
Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
PW_DCHECK(open());
- return call_.channel().Send(response_, ResponsePacket(payload));
+ return call_.channel().Send(response_,
+ StreamPacket(call_, method_id(), payload));
}
Status Responder::ReleasePayloadBuffer() {
@@ -98,12 +118,4 @@
state_ = kClosed;
}
-Packet Responder::ResponsePacket(std::span<const std::byte> payload) const {
- return Packet(PacketType::RESPONSE,
- call_.channel().id(),
- call_.service().id(),
- method().id(),
- payload);
-}
-
} // namespace pw::rpc::internal