pw_rpc: Rename base_server_writer files
This file rename was done in a separate change from the class rename so
Git would detect these as file moves rather than new files.
Change-Id: I37b6168b142197628b6694736ce0fbe36e44b263
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/49201
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/responder.cc b/pw_rpc/responder.cc
new file mode 100644
index 0000000..1c20e8c
--- /dev/null
+++ b/pw_rpc/responder.cc
@@ -0,0 +1,109 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/responder.h"
+
+#include "pw_assert/check.h"
+#include "pw_rpc/internal/method.h"
+#include "pw_rpc/internal/packet.h"
+#include "pw_rpc/internal/server.h"
+
+namespace pw::rpc::internal {
+
+// Creates an open Responder for `call` and registers it with the server that
+// owns the call.
+Responder::Responder(ServerCall& call) : call_(call), state_(kOpen) {
+  auto& owning_server = call_.server();
+  owning_server.RegisterResponder(*this);
+}
+
+// Move-assigns `other` into this Responder.
+//
+// Whatever call this Responder currently has open is finished first. If
+// `other` is open, it is removed from its server and marked closed, and this
+// Responder is registered with that server in its place. The call context and
+// any held response buffer are then moved over from `other`.
+//
+// Self-move-assignment is a no-op.
+Responder& Responder::operator=(Responder&& other) {
+  // Without this guard, the Finish() call below would close the responder and
+  // send a stream-end packet for the very object that is meant to be kept.
+  if (this == &other) {
+    return *this;
+  }
+
+  // Terminate this responder's current call, if any. Finish() reports
+  // FailedPrecondition when already closed; there is no way to surface a
+  // status from an assignment operator, so the result is not used.
+  Finish();
+
+  state_ = other.state_;
+
+  if (other.open()) {
+    // Swap the server's registration from `other` to this object.
+    other.call_.server().RemoveResponder(other);
+    other.state_ = kClosed;
+
+    other.call_.server().RegisterResponder(*this);
+  }
+
+  call_ = std::move(other.call_);
+  response_ = std::move(other.response_);
+
+  return *this;
+}
+
+// Returns the ID of the method associated with this responder's call.
+uint32_t Responder::method_id() const {
+  return call_.method().id();
+}
+
+// Ends the RPC with the given `status`.
+//
+// Returns FailedPrecondition if the responder is not open. Otherwise closes
+// the responder and returns the result of sending a SERVER_STREAM_END packet
+// carrying `status` on the call's channel.
+Status Responder::Finish(Status status) {
+  if (!open()) {
+    return Status::FailedPrecondition();
+  }
+
+  // A payload buffer may still be held if the implementer or user never
+  // released it; give it back before closing (releasing requires open()).
+  const bool buffer_outstanding = !response_.empty();
+  if (buffer_outstanding) {
+    ReleasePayloadBuffer();
+  }
+
+  Close();
+
+  // Control packet telling the peer that the stream (and RPC) has terminated.
+  const Packet stream_end(PacketType::SERVER_STREAM_END,
+                          call_.channel().id(),
+                          call_.service().id(),
+                          method().id(),
+                          {},
+                          status);
+  return call_.channel().Send(stream_end);
+}
+
+// Returns the payload span of the response buffer for a response packet with
+// no payload set. A buffer is acquired from the channel only if none is
+// currently held, so at most one buffer is active at a time.
+//
+// The responder must be open (checked with PW_DCHECK).
+std::span<std::byte> Responder::AcquirePayloadBuffer() {
+  PW_DCHECK(open());
+
+  const bool needs_buffer = response_.empty();
+  if (needs_buffer) {
+    response_ = call_.channel().AcquireBuffer();
+  }
+
+  const Packet empty_response = ResponsePacket();
+  return response_.payload(empty_response);
+}
+
+// Sends `payload` in a RESPONSE packet using the held response buffer and
+// returns the channel's send status.
+//
+// The responder must be open (checked with PW_DCHECK).
+Status Responder::ReleasePayloadBuffer(std::span<const std::byte> payload) {
+  PW_DCHECK(open());
+
+  const Packet response_packet = ResponsePacket(payload);
+  return call_.channel().Send(response_, response_packet);
+}
+
+// Hands the held response buffer back to the channel without sending
+// anything. Always returns OkStatus().
+//
+// The responder must be open (checked with PW_DCHECK).
+Status Responder::ReleasePayloadBuffer() {
+  PW_DCHECK(open());
+
+  auto& channel = call_.channel();
+  channel.Release(response_);
+
+  return OkStatus();
+}
+
+// Removes this responder from its server and marks it closed. Does nothing if
+// the responder is already closed.
+void Responder::Close() {
+  if (open()) {
+    call_.server().RemoveResponder(*this);
+    state_ = kClosed;
+  }
+}
+
+// Builds a RESPONSE packet addressed with this call's channel, service, and
+// method IDs, carrying `payload`.
+Packet Responder::ResponsePacket(std::span<const std::byte> payload) const {
+  const auto channel_id = call_.channel().id();
+  const auto service_id = call_.service().id();
+  const auto rpc_method_id = method().id();
+
+  return Packet(
+      PacketType::RESPONSE, channel_id, service_id, rpc_method_id, payload);
+}
+
+} // namespace pw::rpc::internal