blob: d1a4647616c7d79a69bd59ddc4a3ac236081f36a [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14
15#include "pw_transfer/transfer.h"
16
17#include "pw_assert/check.h"
18#include "pw_log/log.h"
19#include "pw_status/try.h"
20#include "pw_transfer/transfer.pwpb.h"
21#include "pw_transfer_private/chunk.h"
Alexei Frolov563946f2021-08-05 18:58:48 -070022#include "pw_varint/varint.h"
Alexei Frolovf93cb262021-07-14 16:05:15 -070023
24namespace pw::transfer {
Alexei Frolovf93cb262021-07-14 16:05:15 -070025
26void TransferService::Read(ServerContext&,
27 RawServerReaderWriter& reader_writer) {
28 read_stream_ = std::move(reader_writer);
29
30 read_stream_.set_on_next(
31 [this](ConstByteSpan message) { OnReadMessage(message); });
32}
33
34void TransferService::Write(ServerContext&,
35 RawServerReaderWriter& reader_writer) {
Alexei Frolov563946f2021-08-05 18:58:48 -070036 write_stream_ = std::move(reader_writer);
37
38 write_stream_.set_on_next(
39 [this](ConstByteSpan message) { OnWriteMessage(message); });
Alexei Frolovf93cb262021-07-14 16:05:15 -070040}
41
42void TransferService::SendStatusChunk(RawServerReaderWriter& stream,
43 uint32_t transfer_id,
44 Status status) {
45 internal::Chunk chunk = {};
46 chunk.transfer_id = transfer_id;
47 chunk.status = status.code();
48
49 Result<ConstByteSpan> result =
50 internal::EncodeChunk(chunk, stream.PayloadBuffer());
51 if (result.ok()) {
Adrien Larbanetd1ca56c2021-06-10 14:20:45 +000052 stream.Write(result.value())
53 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
Alexei Frolovf93cb262021-07-14 16:05:15 -070054 }
55}
56
57bool TransferService::SendNextReadChunk(internal::Context& context) {
58 if (context.pending_bytes() == 0) {
59 return false;
60 }
61
62 ByteSpan buffer = read_stream_.PayloadBuffer();
63
64 // Begin by doing a partial encode of all the metadata fields, leaving the
65 // buffer with usable space for the chunk data at the end.
Armando Montanez9f3708c2021-08-02 14:37:36 -070066 Chunk::MemoryEncoder encoder(buffer);
Adrien Larbanetd1ca56c2021-06-10 14:20:45 +000067 encoder.WriteTransferId(context.transfer_id())
68 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
69 encoder.WriteOffset(context.offset())
70 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
Alexei Frolovf93cb262021-07-14 16:05:15 -070071
72 // Reserve space for the data proto field overhead and use the remainder of
73 // the buffer for the chunk data.
74 size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;
75
76 ByteSpan data_buffer = buffer.subspan(reserved_size);
77 size_t max_bytes_to_send =
78 std::min(context.pending_bytes(), context.max_chunk_size_bytes());
79
80 if (max_bytes_to_send < data_buffer.size()) {
81 data_buffer = data_buffer.first(max_bytes_to_send);
82 }
83
84 Result<ByteSpan> data = context.reader().Read(data_buffer);
85 if (data.status().IsOutOfRange()) {
86 // No more data to read.
Adrien Larbanetd1ca56c2021-06-10 14:20:45 +000087 encoder.WriteRemainingBytes(0)
88 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
Alexei Frolovf93cb262021-07-14 16:05:15 -070089 context.set_pending_bytes(0);
90 } else if (!data.ok()) {
91 read_stream_.ReleaseBuffer();
92 return false;
93 } else {
Adrien Larbanetd1ca56c2021-06-10 14:20:45 +000094 encoder.WriteData(data.value())
95 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
Alexei Frolovf93cb262021-07-14 16:05:15 -070096 context.set_offset(context.offset() + data.value().size());
97 context.set_pending_bytes(context.pending_bytes() - data.value().size());
98 }
99
100 return read_stream_.Write(encoder).ok();
101}
102
Alexei Frolovf93cb262021-07-14 16:05:15 -0700103void TransferService::OnReadMessage(ConstByteSpan message) {
104 // All incoming chunks in a client read transfer are transfer parameter
105 // updates, except for the final chunk, which is an acknowledgement of
106 // completion.
107 //
108 // Transfer parameters may contain the following fields:
109 //
110 // - transfer_id (required)
111 // - pending_bytes (required)
112 // - offset (required)
113 // - max_chunk_size_bytes
114 // - min_delay_microseconds (not yet supported)
115 //
116 internal::Chunk parameters;
117
118 if (Status status = internal::DecodeChunk(message, parameters);
119 !status.ok()) {
120 // No special handling required here. The client will retransmit the chunk
121 // when no response is received.
122 PW_LOG_ERROR("Failed to decode incoming read transfer chunk");
123 return;
124 }
125
126 Result<internal::Context*> result =
Alexei Frolov563946f2021-08-05 18:58:48 -0700127 read_transfers_.GetOrStartTransfer(parameters.transfer_id);
Alexei Frolovf93cb262021-07-14 16:05:15 -0700128 if (!result.ok()) {
129 PW_LOG_ERROR("Error handling read transfer %u: %d",
130 static_cast<unsigned>(parameters.transfer_id),
131 static_cast<int>(result.status().code()));
132 SendStatusChunk(read_stream_, parameters.transfer_id, result.status());
133 return;
134 }
135
136 internal::Context& transfer = *result.value();
137
138 if (parameters.status.has_value()) {
139 // Transfer has been terminated (successfully or not).
140 Status status = parameters.status.value();
141 if (!status.ok()) {
142 PW_LOG_ERROR("Transfer %u failed with status %d",
143 static_cast<unsigned>(parameters.transfer_id),
144 static_cast<int>(status.code()));
145 }
146 transfer.Finish(status);
147 return;
148 }
149
150 if (!parameters.pending_bytes.has_value()) {
151 // Malformed chunk.
152 SendStatusChunk(
153 read_stream_, parameters.transfer_id, Status::InvalidArgument());
154 transfer.Finish(Status::InvalidArgument());
155 return;
156 }
157
158 // Update local transfer fields based on the received chunk.
159 if (transfer.offset() != parameters.offset) {
160 // TODO(frolv): pw_stream does not yet support seeking, so this temporarily
161 // cancels the transfer. Once seeking is added, this should be updated.
162 //
163 // transfer.set_offset(parameters.offset.value());
164 // transfer.Seek(transfer.offset());
165 //
166 SendStatusChunk(
167 read_stream_, parameters.transfer_id, Status::Unimplemented());
168 transfer.Finish(Status::Unimplemented());
169 return;
170 }
171
172 if (parameters.max_chunk_size_bytes.has_value()) {
Alexei Frolov563946f2021-08-05 18:58:48 -0700173 transfer.set_max_chunk_size_bytes(
174 std::min(static_cast<size_t>(parameters.max_chunk_size_bytes.value()),
175 max_chunk_size_bytes_));
Alexei Frolovf93cb262021-07-14 16:05:15 -0700176 }
177
178 transfer.set_pending_bytes(parameters.pending_bytes.value());
179 while (SendNextReadChunk(transfer)) {
180 // Empty.
181 }
182}
183
Alexei Frolov563946f2021-08-05 18:58:48 -0700184void TransferService::OnWriteMessage(ConstByteSpan message) {
185 // Process an incoming chunk during a client write transfer. The chunk may
186 // either be the initial "start write" chunk (which only contains the transfer
187 // ID), or a data chunk.
188 internal::Chunk chunk;
189
190 if (Status status = internal::DecodeChunk(message, chunk); !status.ok()) {
191 PW_LOG_ERROR("Failed to decode incoming write transfer chunk");
192 return;
193 }
194
195 // Try to find an active write transfer for the requested ID, or start a new
196 // one if a writable TransferHandler is registered for it.
197 Result<internal::Context*> maybe_context =
198 write_transfers_.GetOrStartTransfer(chunk.transfer_id);
199 if (!maybe_context.ok()) {
200 PW_LOG_ERROR("Error handling write transfer %u: %d",
201 static_cast<unsigned>(chunk.transfer_id),
202 static_cast<int>(maybe_context.status().code()));
203 SendStatusChunk(write_stream_, chunk.transfer_id, maybe_context.status());
204 return;
205 }
206
207 internal::Context& transfer = *maybe_context.value();
208
209 // Check for a client-side error terminating the transfer.
210 if (chunk.status.has_value()) {
211 transfer.Finish(chunk.status.value());
212 return;
213 }
214
215 // Copy data from the chunk into the transfer handler's Writer, if it is at
216 // the offset the transfer is currently expecting. Under some circumstances,
217 // the chunk's data may be empty (e.g. a zero-length transfer). In that case,
218 // handle the chunk as if the data exists.
219 bool chunk_data_processed = false;
220
221 if (chunk.offset == transfer.offset()) {
222 if (chunk.data.empty()) {
223 chunk_data_processed = true;
224 } else if (chunk.data.size() <= transfer.pending_bytes()) {
225 if (Status status = transfer.writer().Write(chunk.data); !status.ok()) {
226 SendStatusChunk(write_stream_, chunk.transfer_id, status);
227 transfer.Finish(status);
228 return;
229 }
230 transfer.set_offset(transfer.offset() + chunk.data.size());
231 transfer.set_pending_bytes(transfer.pending_bytes() - chunk.data.size());
232 chunk_data_processed = true;
233 }
234 } else {
235 // Bad offset; reset pending_bytes to send another parameters chunk.
236 transfer.set_pending_bytes(0);
237 }
238
239 // When the client sets remaining_bytes to 0, it indicates completion of the
240 // transfer. Acknowledge the completion through a status chunk and clean up.
241 if (chunk_data_processed && chunk.remaining_bytes == 0) {
242 SendStatusChunk(write_stream_, chunk.transfer_id, OkStatus());
243 transfer.Finish(OkStatus());
244 return;
245 }
246
247 if (transfer.pending_bytes() > 0) {
248 // Expecting more data to be sent by the client. Wait for the next chunk.
249 return;
250 }
251
252 // All pending data has been received. Send a new parameters chunk to start
253 // the next batch.
254 transfer.set_pending_bytes(
255 std::min(default_max_bytes_to_receive_,
256 transfer.writer().ConservativeWriteLimit()));
257
258 internal::Chunk parameters = {};
259 parameters.transfer_id = transfer.transfer_id();
260 parameters.offset = transfer.offset();
261 parameters.pending_bytes = transfer.pending_bytes();
262 parameters.max_chunk_size_bytes = MaxWriteChunkSize(transfer);
263
264 if (auto data =
265 internal::EncodeChunk(parameters, write_stream_.PayloadBuffer());
266 data.ok()) {
267 write_stream_.Write(*data);
268 }
269}
270
271// Calculates the maximum size of actual data that can be sent within a single
272// client write transfer chunk, accounting for the overhead of the transfer
273// protocol and RPC system.
274//
275// Note: This function relies on RPC protocol internals. This is generally a
276// *bad* idea, but is necessary here due to limitations of the RPC system and
277// its asymmetric ingress and egress paths.
278//
279// TODO(frolv): This should be investigated further and perhaps addressed within
280// the RPC system, at the least through a helper function.
281size_t TransferService::MaxWriteChunkSize(
282 const internal::Context& transfer) const {
283 // Start with the user-provided maximum chunk size, which should be the usable
284 // payload length on the RPC ingress path after any transport overhead.
285 ssize_t max_size = max_chunk_size_bytes_;
286
287 // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
288 //
289 // type: 1 byte key, 1 byte value (CLIENT_STREAM)
290 // channel_id: 1 byte key, varint value (calculate from stream)
291 // service_id: 1 byte key, 4 byte value
292 // method_id: 1 byte key, 4 byte value
293 // payload: 1 byte key, varint length (remaining space)
294 // status: 0 bytes (not set in stream packets)
295 //
296 // TOTAL: 14 bytes + encoded channel_id size + encoded payload length
297 //
298 max_size -= 14;
299 max_size -= varint::EncodedSize(write_stream_.channel_id());
300 max_size -= varint::EncodedSize(max_size);
301
302 // Subtract the transfer service overhead for a client write chunk
303 // (pw_transfer/transfer.proto).
304 //
305 // transfer_id: 1 byte key, varint value (calculate)
306 // offset: 1 byte key, varint value (calculate)
307 // data: 1 byte key, varint length (remaining space)
308 //
309 // TOTAL: 3 + encoded transfer_id + encoded offset + encoded data length
310 //
311 size_t max_offset_in_window = transfer.offset() + transfer.pending_bytes();
312 max_size -= 3;
313 max_size -= varint::EncodedSize(transfer.transfer_id());
314 max_size -= varint::EncodedSize(max_offset_in_window);
315 max_size -= varint::EncodedSize(max_size);
316
317 // A resulting value of zero (or less) renders write transfers unusable, as
318 // there is no space to send any payload. This should be considered a
319 // programmer error in the transfer service setup.
320 PW_CHECK_INT_GT(
321 max_size,
322 0,
323 "Transfer service maximum chunk size is too small to fit a payload. "
324 "Increase max_chunk_size_bytes to support write transfers.");
325
326 return max_size;
327}
328
Alexei Frolovf93cb262021-07-14 16:05:15 -0700329} // namespace pw::transfer