pw_rpc: Handle client streams in the server
- Handle client stream messages in the RPC server. Client streaming RPCs
cannot yet be created.
- Add callbacks for errors, client stream messages, and client stream
completion.
- Rename Responder's Finish to CloseAndSendStatus to avoid overlapping
Reader/Writer implementations' Finish() methods.
- Move Nanopb ServerWrite::Write to the .cc file.
Change-Id: Ie4894e6b2fd47a6fc7efdfba58ebeaddff9002e0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/50362
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
index be9abb7..6f4b78c 100644
--- a/pw_rpc/responder.cc
+++ b/pw_rpc/responder.cc
@@ -23,59 +23,69 @@
namespace {
Packet ResponsePacket(const ServerCall& call,
- uint32_t method_id,
+ std::span<const std::byte> payload,
Status status) {
return Packet(PacketType::RESPONSE,
call.channel().id(),
call.service().id(),
- method_id,
- {},
+ call.method().id(),
+ payload,
status);
}
Packet StreamPacket(const ServerCall& call,
- uint32_t method_id,
std::span<const std::byte> payload) {
return Packet(PacketType::SERVER_STREAM,
call.channel().id(),
call.service().id(),
- method_id,
+ call.method().id(),
payload);
}
} // namespace
-Responder::Responder(ServerCall& call) : call_(call), state_(kOpen) {
+Responder::Responder(ServerCall& call, HasClientStream has_client_stream)
+ : call_(call),
+ rpc_state_(kOpen),
+ has_client_stream_(has_client_stream),
+ client_stream_state_(has_client_stream ? kClientStreamOpen
+ : kClientStreamClosed) {
call_.server().RegisterResponder(*this);
}
Responder& Responder::operator=(Responder&& other) {
- Finish();
+ CloseAndSendResponse(OkStatus()).IgnoreError();
- state_ = other.state_;
+ // Move the state variables, which may change when the other client closes.
+ rpc_state_ = other.rpc_state_;
+ has_client_stream_ = other.has_client_stream_;
+ client_stream_state_ = other.client_stream_state_;
if (other.open()) {
- other.call_.server().RemoveResponder(other);
- other.state_ = kClosed;
-
+ other.Close();
other.call_.server().RegisterResponder(*this);
}
+ // Move the rest of the member variables.
call_ = std::move(other.call_);
response_ = std::move(other.response_);
+ on_error_ = std::move(other.on_error_);
+ on_next_ = std::move(other.on_next_);
+ on_client_stream_end_ = std::move(other.on_client_stream_end_);
+
return *this;
}
uint32_t Responder::method_id() const { return call_.method().id(); }
-Status Responder::Finish(Status status) {
+Status Responder::CloseAndSendResponse(Status status) {
if (!open()) {
return Status::FailedPrecondition();
}
- // If the Responder implementer or user forgets to release an acquired
- // buffer before finishing, release it here.
+ // If the Responder implementer or user forgets to release an acquired buffer
+ // before finishing, release it here.
if (!response_.empty()) {
ReleasePayloadBuffer();
}
@@ -83,7 +93,7 @@
Close();
// Send a packet indicating that the RPC has terminated.
- return call_.channel().Send(ResponsePacket(call_, method_id(), status));
+ return call_.channel().Send(ResponsePacket(call_, {}, status));
}
std::span<std::byte> Responder::AcquirePayloadBuffer() {
@@ -94,13 +104,12 @@
response_ = call_.channel().AcquireBuffer();
}
- return response_.payload(StreamPacket(call_, method_id(), {}));
+ return response_.payload(StreamPacket(call_, {}));
}
Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
PW_DCHECK(open());
- return call_.channel().Send(response_,
- StreamPacket(call_, method_id(), payload));
+ return call_.channel().Send(response_, StreamPacket(call_, payload));
}
Status Responder::ReleasePayloadBuffer() {
@@ -110,12 +119,11 @@
}
void Responder::Close() {
- if (!open()) {
- return;
- }
+ PW_DCHECK(open());
call_.server().RemoveResponder(*this);
- state_ = kClosed;
+ rpc_state_ = kClosed;
+ client_stream_state_ = kClientStreamClosed;
}
} // namespace pw::rpc::internal