blob: 1c9e81325aef4d9438077d77a4b812c52d9e2b91 [file] [log] [blame]
// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14
15#include "pw_rpc/server.h"
16
Wyatt Hepler3e2d7192020-06-11 08:28:21 -070017#include <algorithm>
18
Alexei Frolov5d6d3922020-05-08 13:57:02 -070019#include "pw_log/log.h"
20#include "pw_rpc/internal/packet.h"
Wyatt Hepler671946e2020-06-09 14:39:33 -070021#include "pw_rpc/internal/server.h"
Wyatt Hepler80f26ff2020-06-01 09:30:17 -070022#include "pw_rpc/server_context.h"
Alexei Frolov5d6d3922020-05-08 13:57:02 -070023
24namespace pw::rpc {
Wyatt Hepler712d3672020-07-13 15:52:11 -070025namespace {
Alexei Frolov5d6d3922020-05-08 13:57:02 -070026
Wyatt Hepler80f26ff2020-06-01 09:30:17 -070027using std::byte;
28
Alexei Frolov5d6d3922020-05-08 13:57:02 -070029using internal::Packet;
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070030using internal::PacketType;
Alexei Frolov5d6d3922020-05-08 13:57:02 -070031
Wyatt Hepler712d3672020-07-13 15:52:11 -070032bool DecodePacket(ChannelOutput& interface,
33 std::span<const byte> data,
34 Packet& packet) {
Alexei Frolov3f2d0082020-10-04 22:30:39 -070035 Result<Packet> result = Packet::FromBuffer(data);
36 if (!result.ok()) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070037 PW_LOG_WARN("Failed to decode packet on interface %s", interface.name());
38 return false;
39 }
40
Alexei Frolov3f2d0082020-10-04 22:30:39 -070041 packet = result.value();
42
Wyatt Hepler712d3672020-07-13 15:52:11 -070043 // If the packet is malformed, don't try to process it.
44 if (packet.channel_id() == Channel::kUnassignedChannelId ||
45 packet.service_id() == 0 || packet.method_id() == 0) {
46 PW_LOG_WARN("Received incomplete packet on interface %s", interface.name());
47
48 // Only send an ERROR response if a valid channel ID was provided.
49 if (packet.channel_id() != Channel::kUnassignedChannelId) {
50 internal::Channel temp_channel(packet.channel_id(), &interface);
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070051 temp_channel.Send(Packet::ServerError(packet, Status::DataLoss()));
Wyatt Hepler712d3672020-07-13 15:52:11 -070052 }
53 return false;
54 }
55
56 return true;
57}
58
59} // namespace
60
Wyatt Heplerb7643d72020-07-10 10:33:14 -070061Server::~Server() {
62 // Since the writers remove themselves from the server in Finish(), remove the
63 // first writer until no writers remain.
64 while (!writers_.empty()) {
65 writers_.front().Finish();
66 }
67}
68
Wyatt Hepler0f262352020-07-29 09:51:27 -070069Status Server::ProcessPacket(std::span<const byte> data,
70 ChannelOutput& interface) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070071 Packet packet;
72 if (!DecodePacket(interface, data, packet)) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070073 return Status::DataLoss();
Wyatt Hepler0f262352020-07-29 09:51:27 -070074 }
75
76 if (packet.destination() != Packet::kServer) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070077 return Status::InvalidArgument();
Alexei Frolov5d6d3922020-05-08 13:57:02 -070078 }
79
Wyatt Hepler671946e2020-06-09 14:39:33 -070080 internal::Channel* channel = FindChannel(packet.channel_id());
Alexei Frolov5d6d3922020-05-08 13:57:02 -070081 if (channel == nullptr) {
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070082 // If the requested channel doesn't exist, try to dynamically assign one.
83 channel = AssignChannel(packet.channel_id(), interface);
84 if (channel == nullptr) {
Wyatt Hepler712d3672020-07-13 15:52:11 -070085 // If a channel can't be assigned, send a RESOURCE_EXHAUSTED error.
Wyatt Hepler671946e2020-06-09 14:39:33 -070086 internal::Channel temp_channel(packet.channel_id(), &interface);
Wyatt Hepler0f262352020-07-29 09:51:27 -070087 temp_channel.Send(
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070088 Packet::ServerError(packet, Status::ResourceExhausted()));
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -080089 return OkStatus(); // OK since the packet was handled
Alexei Frolov33a1e8f2020-05-26 08:39:32 -070090 }
Alexei Frolov5d6d3922020-05-08 13:57:02 -070091 }
92
Wyatt Hepler0f262352020-07-29 09:51:27 -070093 const auto [service, method] = FindMethod(packet);
94
95 if (method == nullptr) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -070096 channel->Send(Packet::ServerError(packet, Status::NotFound()));
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -080097 return OkStatus();
Wyatt Hepler0f262352020-07-29 09:51:27 -070098 }
99
100 switch (packet.type()) {
101 case PacketType::REQUEST: {
102 internal::ServerCall call(
103 static_cast<internal::Server&>(*this), *channel, *service, *method);
104 method->Invoke(call, packet);
105 break;
106 }
Wyatt Heplera9211162021-06-12 15:40:11 -0700107 case PacketType::CLIENT_STREAM:
Wyatt Hepler0f262352020-07-29 09:51:27 -0700108 // TODO(hepler): Support client streaming RPCs.
109 break;
110 case PacketType::CLIENT_ERROR:
Alexei Frolov062ed182020-09-28 16:23:06 -0700111 HandleClientError(packet);
Wyatt Hepler0f262352020-07-29 09:51:27 -0700112 break;
Wyatt Heplera9211162021-06-12 15:40:11 -0700113 case PacketType::CANCEL:
Wyatt Hepler0f262352020-07-29 09:51:27 -0700114 HandleCancelPacket(packet, *channel);
115 break;
Wyatt Heplera9211162021-06-12 15:40:11 -0700116 case PacketType::CLIENT_STREAM_END:
117 // TODO(hepler): Handle client stream end packets.
118 break;
Wyatt Hepler0f262352020-07-29 09:51:27 -0700119 default:
Wyatt Heplerd78f7c62020-09-28 14:27:32 -0700120 channel->Send(Packet::ServerError(packet, Status::Unimplemented()));
Wyatt Hepler0f262352020-07-29 09:51:27 -0700121 PW_LOG_WARN("Unable to handle packet of type %u",
122 unsigned(packet.type()));
123 }
Wyatt Hepler1b3da3a2021-01-07 13:26:57 -0800124 return OkStatus();
Wyatt Hepler0f262352020-07-29 09:51:27 -0700125}
126
Wyatt Heplercbd09c22020-09-15 11:17:24 -0700127std::tuple<Service*, const internal::Method*> Server::FindMethod(
Wyatt Hepler0f262352020-07-29 09:51:27 -0700128 const internal::Packet& packet) {
Wyatt Hepler712d3672020-07-13 15:52:11 -0700129 // Packets always include service and method IDs.
Wyatt Hepler3e2d7192020-06-11 08:28:21 -0700130 auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
Wyatt Hepler712d3672020-07-13 15:52:11 -0700131 return s.id() == packet.service_id();
Wyatt Hepler3e2d7192020-06-11 08:28:21 -0700132 });
133
134 if (service == services_.end()) {
Wyatt Hepler0f262352020-07-29 09:51:27 -0700135 return {};
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700136 }
137
Wyatt Hepler0f262352020-07-29 09:51:27 -0700138 return {&(*service), service->FindMethod(packet.method_id())};
Wyatt Hepler712d3672020-07-13 15:52:11 -0700139}
Wyatt Hepler80f26ff2020-06-01 09:30:17 -0700140
Wyatt Hepler712d3672020-07-13 15:52:11 -0700141void Server::HandleCancelPacket(const Packet& packet,
142 internal::Channel& channel) {
143 auto writer = std::find_if(writers_.begin(), writers_.end(), [&](auto& w) {
144 return w.channel_id() == packet.channel_id() &&
145 w.service_id() == packet.service_id() &&
146 w.method_id() == packet.method_id();
147 });
Wyatt Hepler7da973a2020-06-09 10:04:48 -0700148
Wyatt Hepler712d3672020-07-13 15:52:11 -0700149 if (writer == writers_.end()) {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -0700150 channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()));
Wyatt Hepler712d3672020-07-13 15:52:11 -0700151 PW_LOG_WARN("Received CANCEL packet for method that is not pending");
152 } else {
Wyatt Heplerd78f7c62020-09-28 14:27:32 -0700153 writer->Finish(Status::Cancelled());
Wyatt Hepler712d3672020-07-13 15:52:11 -0700154 }
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700155}
156
Alexei Frolov062ed182020-09-28 16:23:06 -0700157void Server::HandleClientError(const Packet& packet) {
158 // A client error indicates that the client received a packet that it did not
159 // expect. If the packet belongs to a streaming RPC, cancel the stream without
160 // sending a final SERVER_STREAM_END packet.
161 auto writer = std::find_if(writers_.begin(), writers_.end(), [&](auto& w) {
162 return w.channel_id() == packet.channel_id() &&
163 w.service_id() == packet.service_id() &&
164 w.method_id() == packet.method_id();
165 });
166
167 if (writer != writers_.end()) {
168 writer->Close();
169 }
170}
171
Wyatt Hepler671946e2020-06-09 14:39:33 -0700172internal::Channel* Server::FindChannel(uint32_t id) const {
173 for (internal::Channel& c : channels_) {
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700174 if (c.id() == id) {
175 return &c;
176 }
177 }
178 return nullptr;
179}
180
Wyatt Hepler671946e2020-06-09 14:39:33 -0700181internal::Channel* Server::AssignChannel(uint32_t id,
182 ChannelOutput& interface) {
183 internal::Channel* channel = FindChannel(Channel::kUnassignedChannelId);
Alexei Frolov33a1e8f2020-05-26 08:39:32 -0700184 if (channel == nullptr) {
185 return nullptr;
186 }
187
Wyatt Hepler671946e2020-06-09 14:39:33 -0700188 *channel = internal::Channel(id, &interface);
Alexei Frolov33a1e8f2020-05-26 08:39:32 -0700189 return channel;
190}
191
Alexei Frolov5d6d3922020-05-08 13:57:02 -0700192} // namespace pw::rpc