blob: 26874edaed8b67699d4bd30f3a44c06f3faa6e7b [file] [log] [blame]
// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14
15#include "pw_rpc/server.h"
16
Wyatt Hepler3e2d7192020-06-11 08:28:21 -070017#include <algorithm>
18
Alexei Frolov5d6d3922020-05-08 13:57:02 -070019#include "pw_log/log.h"
20#include "pw_rpc/internal/packet.h"
Wyatt Hepler671946e2020-06-09 14:39:33 -070021#include "pw_rpc/internal/server.h"
Wyatt Hepler80f26ff2020-06-01 09:30:17 -070022#include "pw_rpc/server_context.h"
Alexei Frolov5d6d3922020-05-08 13:57:02 -070023
24namespace pw::rpc {
Wyatt Hepler712d3672020-07-13 15:52:11 -070025namespace {
Alexei Frolov5d6d3922020-05-08 13:57:02 -070026
Wyatt Hepler80f26ff2020-06-01 09:30:17 -070027using std::byte;
28
Alexei Frolov5d6d3922020-05-08 13:57:02 -070029using internal::Packet;
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070030using internal::PacketType;
Alexei Frolov5d6d3922020-05-08 13:57:02 -070031
Wyatt Hepler712d3672020-07-13 15:52:11 -070032bool DecodePacket(ChannelOutput& interface,
33 std::span<const byte> data,
34 Packet& packet) {
Alexei Frolov3f2d0082020-10-04 22:30:39 -070035 Result<Packet> result = Packet::FromBuffer(data);
36 if (!result.ok()) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070037 PW_LOG_WARN("Failed to decode packet on interface %s", interface.name());
38 return false;
39 }
40
Alexei Frolov3f2d0082020-10-04 22:30:39 -070041 packet = result.value();
42
Wyatt Hepler712d3672020-07-13 15:52:11 -070043 // If the packet is malformed, don't try to process it.
44 if (packet.channel_id() == Channel::kUnassignedChannelId ||
45 packet.service_id() == 0 || packet.method_id() == 0) {
46 PW_LOG_WARN("Received incomplete packet on interface %s", interface.name());
47
48 // Only send an ERROR response if a valid channel ID was provided.
49 if (packet.channel_id() != Channel::kUnassignedChannelId) {
50 internal::Channel temp_channel(packet.channel_id(), &interface);
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070051 temp_channel.Send(Packet::ServerError(packet, Status::DataLoss()));
Wyatt Hepler712d3672020-07-13 15:52:11 -070052 }
53 return false;
54 }
55
56 return true;
57}
58
59} // namespace
60
Wyatt Heplerb7643d72020-07-10 10:33:14 -070061Server::~Server() {
Wyatt Hepler01fc15b2021-06-10 18:15:59 -070062 // Since the responders remove themselves from the server in
63 // CloseAndSendResponse(), close responders until no responders remain.
64 while (!responders_.empty()) {
65 responders_.front().CloseAndSendResponse(OkStatus());
Wyatt Heplerb7643d72020-07-10 10:33:14 -070066 }
67}
68
Wyatt Hepler0f262352020-07-29 09:51:27 -070069Status Server::ProcessPacket(std::span<const byte> data,
70 ChannelOutput& interface) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070071 Packet packet;
72 if (!DecodePacket(interface, data, packet)) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070073 return Status::DataLoss();
Wyatt Hepler0f262352020-07-29 09:51:27 -070074 }
75
76 if (packet.destination() != Packet::kServer) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070077 return Status::InvalidArgument();
Alexei Frolov5d6d3922020-05-08 13:57:02 -070078 }
79
Wyatt Hepler671946e2020-06-09 14:39:33 -070080 internal::Channel* channel = FindChannel(packet.channel_id());
Alexei Frolov5d6d3922020-05-08 13:57:02 -070081 if (channel == nullptr) {
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070082 // If the requested channel doesn't exist, try to dynamically assign one.
83 channel = AssignChannel(packet.channel_id(), interface);
84 if (channel == nullptr) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070085 // If a channel can't be assigned, send a RESOURCE_EXHAUSTED error.
Wyatt Hepler671946e2020-06-09 14:39:33 -070086 internal::Channel temp_channel(packet.channel_id(), &interface);
Wyatt Hepler0f262352020-07-29 09:51:27 -070087 temp_channel.Send(
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070088 Packet::ServerError(packet, Status::ResourceExhausted()));
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -080089 return OkStatus(); // OK since the packet was handled
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070090 }
Alexei Frolov5d6d3922020-05-08 13:57:02 -070091 }
92
Wyatt Hepler0f262352020-07-29 09:51:27 -070093 const auto [service, method] = FindMethod(packet);
94
95 if (method == nullptr) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070096 channel->Send(Packet::ServerError(packet, Status::NotFound()));
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -080097 return OkStatus();
Wyatt Hepler0f262352020-07-29 09:51:27 -070098 }
99
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700100 // Find an existing reader/writer for this RPC, if any.
101 auto responder =
102 std::find_if(responders_.begin(), responders_.end(), [&](auto& w) {
103 return packet.channel_id() == w.channel_id() &&
104 packet.service_id() == w.service_id() &&
105 packet.method_id() == w.method_id();
106 });
107
Wyatt Hepler0f262352020-07-29 09:51:27 -0700108 switch (packet.type()) {
109 case PacketType::REQUEST: {
Wyatt Hepler85e91402021-08-05 12:48:03 -0700110 // If the REQUEST is for an ongoing RPC, cancel it, then call it again.
111 if (responder != responders_.end()) {
112 responder->HandleError(Status::Cancelled());
113 }
114
Wyatt Hepler0f262352020-07-29 09:51:27 -0700115 internal::ServerCall call(
116 static_cast<internal::Server&>(*this), *channel, *service, *method);
117 method->Invoke(call, packet);
118 break;
119 }
Wyatt Heplera9211162021-06-12 15:40:11 -0700120 case PacketType::CLIENT_STREAM:
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700121 HandleClientStreamPacket(packet, *channel, responder);
Wyatt Hepler0f262352020-07-29 09:51:27 -0700122 break;
123 case PacketType::CLIENT_ERROR:
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700124 if (responder != responders_.end()) {
125 responder->HandleError(packet.status());
126 }
Wyatt Hepler0f262352020-07-29 09:51:27 -0700127 break;
Wyatt Heplera9211162021-06-12 15:40:11 -0700128 case PacketType::CANCEL:
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700129 HandleCancelPacket(packet, *channel, responder);
Wyatt Hepler0f262352020-07-29 09:51:27 -0700130 break;
Wyatt Heplera9211162021-06-12 15:40:11 -0700131 case PacketType::CLIENT_STREAM_END:
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700132 HandleClientStreamPacket(packet, *channel, responder);
Wyatt Heplera9211162021-06-12 15:40:11 -0700133 break;
Wyatt Hepler0f262352020-07-29 09:51:27 -0700134 default:
Wyatt Heplerd78f7c62020-09-28 14:27:32 -0700135 channel->Send(Packet::ServerError(packet, Status::Unimplemented()));
Wyatt Hepler0f262352020-07-29 09:51:27 -0700136 PW_LOG_WARN("Unable to handle packet of type %u",
137 unsigned(packet.type()));
138 }
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -0800139 return OkStatus();
Wyatt Hepler0f262352020-07-29 09:51:27 -0700140}
141
Wyatt Heplercbd09c22020-09-15 11:17:24 -0700142std::tuple<Service*, const internal::Method*> Server::FindMethod(
Wyatt Hepler0f262352020-07-29 09:51:27 -0700143 const internal::Packet& packet) {
Wyatt Hepler712d3672020-07-13 15:52:11 -0700144 // Packets always include service and method IDs.
Wyatt Hepler3e2d7192020-06-11 08:28:21 -0700145 auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
Wyatt Hepler712d3672020-07-13 15:52:11 -0700146 return s.id() == packet.service_id();
Wyatt Hepler3e2d7192020-06-11 08:28:21 -0700147 });
148
149 if (service == services_.end()) {
Wyatt Hepler0f262352020-07-29 09:51:27 -0700150 return {};
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700151 }
152
Wyatt Hepler0f262352020-07-29 09:51:27 -0700153 return {&(*service), service->FindMethod(packet.method_id())};
Wyatt Hepler712d3672020-07-13 15:52:11 -0700154}
Wyatt Hepler80f26ff2020-06-01 09:30:17 -0700155
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700156void Server::HandleClientStreamPacket(const internal::Packet& packet,
157 internal::Channel& channel,
158 ResponderIterator responder) const {
159 if (responder == responders_.end()) {
160 PW_LOG_DEBUG(
161 "Received client stream packet for method that is not pending");
Wyatt Heplerd78f7c62020-09-28 14:27:32 -0700162 channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()));
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700163 return;
164 }
165
166 if (!responder->has_client_stream()) {
167 channel.Send(Packet::ServerError(packet, Status::InvalidArgument()));
168 return;
169 }
170
171 if (!responder->client_stream_open()) {
172 channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()));
173 return;
174 }
175
176 if (packet.type() == PacketType::CLIENT_STREAM) {
177 responder->HandleClientStream(packet.payload());
178 } else { // Handle PacketType::CLIENT_STREAM_END.
179 responder->EndClientStream();
Wyatt Hepler712d3672020-07-13 15:52:11 -0700180 }
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700181}
182
Wyatt Hepler01fc15b2021-06-10 18:15:59 -0700183void Server::HandleCancelPacket(const Packet& packet,
184 internal::Channel& channel,
185 ResponderIterator responder) const {
186 if (responder == responders_.end()) {
187 channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()));
188 PW_LOG_DEBUG("Received CANCEL packet for method that is not pending");
189 } else {
190 responder->HandleError(Status::Cancelled());
Alexei Frolov062ed182020-09-28 16:23:06 -0700191 }
192}
193
Wyatt Hepler671946e2020-06-09 14:39:33 -0700194internal::Channel* Server::FindChannel(uint32_t id) const {
195 for (internal::Channel& c : channels_) {
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700196 if (c.id() == id) {
197 return &c;
198 }
199 }
200 return nullptr;
201}
202
Wyatt Hepler671946e2020-06-09 14:39:33 -0700203internal::Channel* Server::AssignChannel(uint32_t id,
204 ChannelOutput& interface) {
205 internal::Channel* channel = FindChannel(Channel::kUnassignedChannelId);
Alexei Frolov33a1e8f2020-05-26 08:39:32 -0700206 if (channel == nullptr) {
207 return nullptr;
208 }
209
Wyatt Hepler671946e2020-06-09 14:39:33 -0700210 *channel = internal::Channel(id, &interface);
Alexei Frolov33a1e8f2020-05-26 08:39:32 -0700211 return channel;
212}
213
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700214} // namespace pw::rpc