blob: cbf82d7baff7bdb81e86e9e3ac18267b51158275 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14
15#include "pw_transfer/client.h"
Alexei Frolov3d99d922021-11-04 13:13:42 -070016
17#include <cstring>
18
19#include "gtest/gtest.h"
20#include "pw_assert/check.h"
21#include "pw_bytes/array.h"
22#include "pw_rpc/raw/client_testing.h"
Alexei Frolov22ee1142022-02-03 13:59:01 -080023#include "pw_rpc/thread_testing.h"
Alexei Frolov4fd8c802021-11-09 09:49:25 -080024#include "pw_thread/sleep.h"
25#include "pw_thread/thread.h"
26#include "pw_thread_stl/options.h"
Alexei Frolov3d99d922021-11-04 13:13:42 -070027#include "pw_transfer_private/chunk_testing.h"
28
29namespace pw::transfer::test {
30namespace {
31
32using internal::Chunk;
33using pw_rpc::raw::Transfer;
34
Alexei Frolov22ee1142022-02-03 13:59:01 -080035using namespace std::chrono_literals;
36
Wyatt Hepler70a62102022-02-11 15:14:35 -080037PW_MODIFY_DIAGNOSTICS_PUSH();
38PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
39
Alexei Frolov22ee1142022-02-03 13:59:01 -080040thread::Options& TransferThreadOptions() {
Alexei Frolov4fd8c802021-11-09 09:49:25 -080041 static thread::stl::Options options;
42 return options;
43}
44
Alexei Frolov3d99d922021-11-04 13:13:42 -070045class ReadTransfer : public ::testing::Test {
46 protected:
47 ReadTransfer(size_t max_bytes_to_receive = 0)
Alexei Frolov22ee1142022-02-03 13:59:01 -080048 : transfer_thread_(chunk_buffer_, encode_buffer_),
49 client_(context_.client(),
Alexei Frolov3d99d922021-11-04 13:13:42 -070050 context_.channel().id(),
Alexei Frolov22ee1142022-02-03 13:59:01 -080051 transfer_thread_,
Alexei Frolov3d99d922021-11-04 13:13:42 -070052 max_bytes_to_receive),
Alexei Frolov22ee1142022-02-03 13:59:01 -080053 system_thread_(TransferThreadOptions(), transfer_thread_) {}
Alexei Frolov4fd8c802021-11-09 09:49:25 -080054
55 ~ReadTransfer() {
Alexei Frolov22ee1142022-02-03 13:59:01 -080056 transfer_thread_.Terminate();
57 system_thread_.join();
Alexei Frolov4fd8c802021-11-09 09:49:25 -080058 }
Alexei Frolov3d99d922021-11-04 13:13:42 -070059
60 rpc::RawClientTestContext<> context_;
61
Alexei Frolov22ee1142022-02-03 13:59:01 -080062 Thread<1, 1> transfer_thread_;
Alexei Frolov3d99d922021-11-04 13:13:42 -070063 Client client_;
Alexei Frolov3d99d922021-11-04 13:13:42 -070064
Alexei Frolov22ee1142022-02-03 13:59:01 -080065 std::array<std::byte, 64> chunk_buffer_;
66 std::array<std::byte, 64> encode_buffer_;
67
68 thread::Thread system_thread_;
Alexei Frolov3d99d922021-11-04 13:13:42 -070069};
70
71constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
72constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
73
74TEST_F(ReadTransfer, SingleChunk) {
75 stream::MemoryWriterBuffer<64> writer;
76 Status transfer_status = Status::Unknown();
77
Wyatt Heplerbad6d272022-02-16 07:15:07 -080078 ASSERT_EQ(OkStatus(),
79 client_.Read(3, writer, [&transfer_status](Status status) {
80 transfer_status = status;
81 }));
Alexei Frolov3d99d922021-11-04 13:13:42 -070082
Alexei Frolov22ee1142022-02-03 13:59:01 -080083 transfer_thread_.WaitUntilEventIsProcessed();
84
Alexei Frolov3d99d922021-11-04 13:13:42 -070085 // First transfer parameters chunk is sent.
86 rpc::PayloadsView payloads =
87 context_.output().payloads<Transfer::Read>(context_.channel().id());
88 ASSERT_EQ(payloads.size(), 1u);
89 EXPECT_EQ(transfer_status, Status::Unknown());
90
91 Chunk c0 = DecodeChunk(payloads[0]);
92 EXPECT_EQ(c0.transfer_id, 3u);
93 EXPECT_EQ(c0.offset, 0u);
94 EXPECT_EQ(c0.pending_bytes.value(), 64u);
95
96 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
97 {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -080098 transfer_thread_.WaitUntilEventIsProcessed();
99
Alexei Frolov3d99d922021-11-04 13:13:42 -0700100 ASSERT_EQ(payloads.size(), 2u);
101
102 Chunk c1 = DecodeChunk(payloads[1]);
103 EXPECT_EQ(c1.transfer_id, 3u);
104 ASSERT_TRUE(c1.status.has_value());
105 EXPECT_EQ(c1.status.value(), OkStatus());
106
107 EXPECT_EQ(transfer_status, OkStatus());
108 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
109 0);
110}
111
112TEST_F(ReadTransfer, MultiChunk) {
113 stream::MemoryWriterBuffer<64> writer;
114 Status transfer_status = Status::Unknown();
115
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800116 ASSERT_EQ(OkStatus(),
117 client_.Read(4, writer, [&transfer_status](Status status) {
118 transfer_status = status;
119 }));
Alexei Frolov3d99d922021-11-04 13:13:42 -0700120
Alexei Frolov22ee1142022-02-03 13:59:01 -0800121 transfer_thread_.WaitUntilEventIsProcessed();
122
Alexei Frolov3d99d922021-11-04 13:13:42 -0700123 // First transfer parameters chunk is sent.
124 rpc::PayloadsView payloads =
125 context_.output().payloads<Transfer::Read>(context_.channel().id());
126 ASSERT_EQ(payloads.size(), 1u);
127 EXPECT_EQ(transfer_status, Status::Unknown());
128
129 Chunk c0 = DecodeChunk(payloads[0]);
130 EXPECT_EQ(c0.transfer_id, 4u);
131 EXPECT_EQ(c0.offset, 0u);
132 EXPECT_EQ(c0.pending_bytes.value(), 64u);
133
134 constexpr ConstByteSpan data(kData32);
135 context_.server().SendServerStream<Transfer::Read>(
136 EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800137 transfer_thread_.WaitUntilEventIsProcessed();
138
Alexei Frolov3d99d922021-11-04 13:13:42 -0700139 ASSERT_EQ(payloads.size(), 1u);
140
141 context_.server().SendServerStream<Transfer::Read>(
142 EncodeChunk({.transfer_id = 4u,
143 .offset = 16,
144 .data = data.subspan(16),
145 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800146 transfer_thread_.WaitUntilEventIsProcessed();
147
Alexei Frolov3d99d922021-11-04 13:13:42 -0700148 ASSERT_EQ(payloads.size(), 2u);
149
150 Chunk c1 = DecodeChunk(payloads[1]);
151 EXPECT_EQ(c1.transfer_id, 4u);
152 ASSERT_TRUE(c1.status.has_value());
153 EXPECT_EQ(c1.status.value(), OkStatus());
154
155 EXPECT_EQ(transfer_status, OkStatus());
156 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
157 0);
158}
159
Wyatt Heplerf7c59382021-11-17 14:02:36 -0800160TEST_F(ReadTransfer, MultipleTransfers) {
161 stream::MemoryWriterBuffer<64> writer;
162 Status transfer_status = Status::Unknown();
163
164 ASSERT_EQ(OkStatus(),
165 client_.Read(3, writer, [&transfer_status](Status status) {
166 transfer_status = status;
167 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800168 transfer_thread_.WaitUntilEventIsProcessed();
Wyatt Heplerf7c59382021-11-17 14:02:36 -0800169
170 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
171 {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800172 transfer_thread_.WaitUntilEventIsProcessed();
Wyatt Heplerf7c59382021-11-17 14:02:36 -0800173
174 ASSERT_EQ(transfer_status, OkStatus());
175 transfer_status = Status::Unknown();
176
177 ASSERT_EQ(OkStatus(),
178 client_.Read(3, writer, [&transfer_status](Status status) {
179 transfer_status = status;
180 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800181 transfer_thread_.WaitUntilEventIsProcessed();
Wyatt Heplerf7c59382021-11-17 14:02:36 -0800182
183 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
184 {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800185 transfer_thread_.WaitUntilEventIsProcessed();
Wyatt Heplerf7c59382021-11-17 14:02:36 -0800186
187 EXPECT_EQ(transfer_status, OkStatus());
188}
189
Alexei Frolov3d99d922021-11-04 13:13:42 -0700190class ReadTransferMaxBytes32 : public ReadTransfer {
191 protected:
192 ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
193};
194
195TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
196 stream::MemoryWriterBuffer<64> writer;
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800197 EXPECT_EQ(OkStatus(), client_.Read(5, writer, [](Status) {}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800198 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700199
200 // First transfer parameters chunk is sent.
201 rpc::PayloadsView payloads =
202 context_.output().payloads<Transfer::Read>(context_.channel().id());
203 ASSERT_EQ(payloads.size(), 1u);
204
205 Chunk c0 = DecodeChunk(payloads[0]);
206 EXPECT_EQ(c0.transfer_id, 5u);
207 EXPECT_EQ(c0.offset, 0u);
208 ASSERT_EQ(c0.pending_bytes.value(), 32u);
209}
210
211TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
212 stream::MemoryWriterBuffer<16> small_writer;
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800213 EXPECT_EQ(OkStatus(), client_.Read(5, small_writer, [](Status) {}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800214 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700215
216 // First transfer parameters chunk is sent.
217 rpc::PayloadsView payloads =
218 context_.output().payloads<Transfer::Read>(context_.channel().id());
219 ASSERT_EQ(payloads.size(), 1u);
220
221 Chunk c0 = DecodeChunk(payloads[0]);
222 EXPECT_EQ(c0.transfer_id, 5u);
223 EXPECT_EQ(c0.offset, 0u);
224 ASSERT_EQ(c0.pending_bytes.value(), 16u);
225}
226
227TEST_F(ReadTransferMaxBytes32, MultiParameters) {
228 stream::MemoryWriterBuffer<64> writer;
229 Status transfer_status = Status::Unknown();
230
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800231 ASSERT_EQ(OkStatus(),
232 client_.Read(6, writer, [&transfer_status](Status status) {
233 transfer_status = status;
234 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800235 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700236
237 // First transfer parameters chunk is sent.
238 rpc::PayloadsView payloads =
239 context_.output().payloads<Transfer::Read>(context_.channel().id());
240 ASSERT_EQ(payloads.size(), 1u);
241 EXPECT_EQ(transfer_status, Status::Unknown());
242
243 Chunk c0 = DecodeChunk(payloads[0]);
244 EXPECT_EQ(c0.transfer_id, 6u);
245 EXPECT_EQ(c0.offset, 0u);
246 ASSERT_EQ(c0.pending_bytes.value(), 32u);
247
248 constexpr ConstByteSpan data(kData64);
249 context_.server().SendServerStream<Transfer::Read>(
250 EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800251 transfer_thread_.WaitUntilEventIsProcessed();
252
Alexei Frolov3d99d922021-11-04 13:13:42 -0700253 ASSERT_EQ(payloads.size(), 2u);
254 EXPECT_EQ(transfer_status, Status::Unknown());
255
256 // Second parameters chunk.
257 Chunk c1 = DecodeChunk(payloads[1]);
258 EXPECT_EQ(c1.transfer_id, 6u);
259 EXPECT_EQ(c1.offset, 32u);
260 ASSERT_EQ(c1.pending_bytes.value(), 32u);
261
262 context_.server().SendServerStream<Transfer::Read>(
263 EncodeChunk({.transfer_id = 6u,
264 .offset = 32,
265 .data = data.subspan(32),
266 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800267 transfer_thread_.WaitUntilEventIsProcessed();
268
Alexei Frolov3d99d922021-11-04 13:13:42 -0700269 ASSERT_EQ(payloads.size(), 3u);
270
271 Chunk c2 = DecodeChunk(payloads[2]);
272 EXPECT_EQ(c2.transfer_id, 6u);
273 ASSERT_TRUE(c2.status.has_value());
274 EXPECT_EQ(c2.status.value(), OkStatus());
275
276 EXPECT_EQ(transfer_status, OkStatus());
277 EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
278}
279
280TEST_F(ReadTransfer, UnexpectedOffset) {
281 stream::MemoryWriterBuffer<64> writer;
282 Status transfer_status = Status::Unknown();
283
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800284 ASSERT_EQ(OkStatus(),
285 client_.Read(7, writer, [&transfer_status](Status status) {
286 transfer_status = status;
287 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800288 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700289
290 // First transfer parameters chunk is sent.
291 rpc::PayloadsView payloads =
292 context_.output().payloads<Transfer::Read>(context_.channel().id());
293 ASSERT_EQ(payloads.size(), 1u);
294 EXPECT_EQ(transfer_status, Status::Unknown());
295
296 Chunk c0 = DecodeChunk(payloads[0]);
297 EXPECT_EQ(c0.transfer_id, 7u);
298 EXPECT_EQ(c0.offset, 0u);
299 EXPECT_EQ(c0.pending_bytes.value(), 64u);
300
301 constexpr ConstByteSpan data(kData32);
302 context_.server().SendServerStream<Transfer::Read>(
303 EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800304 transfer_thread_.WaitUntilEventIsProcessed();
305
Alexei Frolov3d99d922021-11-04 13:13:42 -0700306 ASSERT_EQ(payloads.size(), 1u);
307 EXPECT_EQ(transfer_status, Status::Unknown());
308
309 // Send a chunk with an incorrect offset. The client should resend parameters.
310 context_.server().SendServerStream<Transfer::Read>(
311 EncodeChunk({.transfer_id = 7u,
312 .offset = 8, // wrong!
313 .data = data.subspan(16),
314 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800315 transfer_thread_.WaitUntilEventIsProcessed();
316
Alexei Frolov3d99d922021-11-04 13:13:42 -0700317 ASSERT_EQ(payloads.size(), 2u);
318 EXPECT_EQ(transfer_status, Status::Unknown());
319
320 Chunk c1 = DecodeChunk(payloads[1]);
321 EXPECT_EQ(c1.transfer_id, 7u);
322 EXPECT_EQ(c1.offset, 16u);
323 EXPECT_EQ(c1.pending_bytes.value(), 48u);
324
325 // Send the correct chunk, completing the transfer.
326 context_.server().SendServerStream<Transfer::Read>(
327 EncodeChunk({.transfer_id = 7u,
328 .offset = 16,
329 .data = data.subspan(16),
330 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800331 transfer_thread_.WaitUntilEventIsProcessed();
332
Alexei Frolov3d99d922021-11-04 13:13:42 -0700333 ASSERT_EQ(payloads.size(), 3u);
334
335 Chunk c2 = DecodeChunk(payloads[2]);
336 EXPECT_EQ(c2.transfer_id, 7u);
337 ASSERT_TRUE(c2.status.has_value());
338 EXPECT_EQ(c2.status.value(), OkStatus());
339
340 EXPECT_EQ(transfer_status, OkStatus());
341 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
342 0);
343}
344
345TEST_F(ReadTransferMaxBytes32, TooMuchData) {
Alexei Frolov22ee1142022-02-03 13:59:01 -0800346 stream::MemoryWriterBuffer<32> writer;
Alexei Frolov3d99d922021-11-04 13:13:42 -0700347 Status transfer_status = Status::Unknown();
348
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800349 ASSERT_EQ(OkStatus(),
350 client_.Read(8, writer, [&transfer_status](Status status) {
351 transfer_status = status;
352 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800353 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700354
355 // First transfer parameters chunk is sent.
356 rpc::PayloadsView payloads =
357 context_.output().payloads<Transfer::Read>(context_.channel().id());
358 ASSERT_EQ(payloads.size(), 1u);
359 EXPECT_EQ(transfer_status, Status::Unknown());
360
361 Chunk c0 = DecodeChunk(payloads[0]);
362 EXPECT_EQ(c0.transfer_id, 8u);
363 EXPECT_EQ(c0.offset, 0u);
364 ASSERT_EQ(c0.pending_bytes.value(), 32u);
365
366 constexpr ConstByteSpan data(kData64);
367
368 // pending_bytes == 32
369 context_.server().SendServerStream<Transfer::Read>(
370 EncodeChunk({.transfer_id = 8u, .offset = 0, .data = data.first(16)}));
371
372 // pending_bytes == 16
373 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
374 {.transfer_id = 8u, .offset = 16, .data = data.subspan(16, 8)}));
375
376 // pending_bytes == 8, send 16 instead.
377 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
378 {.transfer_id = 8u, .offset = 24, .data = data.subspan(24, 16)}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800379 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700380
Alexei Frolov22ee1142022-02-03 13:59:01 -0800381 ASSERT_EQ(payloads.size(), 4u);
Alexei Frolov3d99d922021-11-04 13:13:42 -0700382
Alexei Frolov22ee1142022-02-03 13:59:01 -0800383 Chunk c1 = DecodeChunk(payloads[3]);
Alexei Frolov3d99d922021-11-04 13:13:42 -0700384 EXPECT_EQ(c1.transfer_id, 8u);
385 ASSERT_TRUE(c1.status.has_value());
386 EXPECT_EQ(c1.status.value(), Status::Internal());
387
388 EXPECT_EQ(transfer_status, Status::Internal());
389}
390
391TEST_F(ReadTransfer, ServerError) {
392 stream::MemoryWriterBuffer<64> writer;
393 Status transfer_status = Status::Unknown();
394
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800395 ASSERT_EQ(OkStatus(),
396 client_.Read(9, writer, [&transfer_status](Status status) {
397 transfer_status = status;
398 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800399 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700400
401 // First transfer parameters chunk is sent.
402 rpc::PayloadsView payloads =
403 context_.output().payloads<Transfer::Read>(context_.channel().id());
404 ASSERT_EQ(payloads.size(), 1u);
405 EXPECT_EQ(transfer_status, Status::Unknown());
406
407 Chunk c0 = DecodeChunk(payloads[0]);
408 EXPECT_EQ(c0.transfer_id, 9u);
409 EXPECT_EQ(c0.offset, 0u);
410 ASSERT_EQ(c0.pending_bytes.value(), 64u);
411
412 // Server sends an error. Client should not respond and terminate the
413 // transfer.
414 context_.server().SendServerStream<Transfer::Read>(
415 EncodeChunk({.transfer_id = 9u, .status = Status::NotFound()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800416 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700417
Alexei Frolov22ee1142022-02-03 13:59:01 -0800418 ASSERT_EQ(payloads.size(), 1u);
Alexei Frolov3d99d922021-11-04 13:13:42 -0700419 EXPECT_EQ(transfer_status, Status::NotFound());
420}
421
422TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
423 stream::MemoryWriterBuffer<64> writer;
424 Status transfer_status = Status::Unknown();
425
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800426 ASSERT_EQ(OkStatus(),
427 client_.Read(10, writer, [&transfer_status](Status status) {
428 transfer_status = status;
429 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800430 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700431
432 // First transfer parameters chunk is sent.
433 rpc::PayloadsView payloads =
434 context_.output().payloads<Transfer::Read>(context_.channel().id());
435 ASSERT_EQ(payloads.size(), 1u);
436 EXPECT_EQ(transfer_status, Status::Unknown());
437
438 Chunk c0 = DecodeChunk(payloads[0]);
439 EXPECT_EQ(c0.transfer_id, 10u);
440 EXPECT_EQ(c0.offset, 0u);
441 ASSERT_EQ(c0.pending_bytes.value(), 64u);
442
443 constexpr ConstByteSpan data(kData64);
444
445 // Send the first 8 bytes of the transfer.
446 context_.server().SendServerStream<Transfer::Read>(
447 EncodeChunk({.transfer_id = 10u, .offset = 0, .data = data.first(8)}));
448
449 // Skip offset 8, send the rest starting from 16.
450 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
451 context_.server().SendServerStream<Transfer::Read>(
452 EncodeChunk({.transfer_id = 10u,
453 .offset = offset,
454 .data = data.subspan(offset, 8)}));
455 }
Alexei Frolov22ee1142022-02-03 13:59:01 -0800456 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700457
458 // Only one parameters update should be sent, with the offset of the initial
459 // dropped packet.
460 ASSERT_EQ(payloads.size(), 2u);
461
462 Chunk c1 = DecodeChunk(payloads[1]);
463 EXPECT_EQ(c1.transfer_id, 10u);
464 EXPECT_EQ(c1.offset, 8u);
465 ASSERT_EQ(c1.pending_bytes.value(), 56u);
466
467 // Send the remaining data to complete the transfer.
468 context_.server().SendServerStream<Transfer::Read>(
469 EncodeChunk({.transfer_id = 10u,
470 .offset = 8,
471 .data = data.subspan(8, 56),
472 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800473 transfer_thread_.WaitUntilEventIsProcessed();
474
Alexei Frolov3d99d922021-11-04 13:13:42 -0700475 ASSERT_EQ(payloads.size(), 3u);
476
477 Chunk c2 = DecodeChunk(payloads[2]);
478 EXPECT_EQ(c2.transfer_id, 10u);
479 ASSERT_TRUE(c2.status.has_value());
480 EXPECT_EQ(c2.status.value(), OkStatus());
481
482 EXPECT_EQ(transfer_status, OkStatus());
483}
484
485TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
486 stream::MemoryWriterBuffer<64> writer;
487 Status transfer_status = Status::Unknown();
488
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800489 ASSERT_EQ(OkStatus(),
490 client_.Read(11, writer, [&transfer_status](Status status) {
491 transfer_status = status;
492 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800493 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700494
495 // First transfer parameters chunk is sent.
496 rpc::PayloadsView payloads =
497 context_.output().payloads<Transfer::Read>(context_.channel().id());
498 ASSERT_EQ(payloads.size(), 1u);
499 EXPECT_EQ(transfer_status, Status::Unknown());
500
501 Chunk c0 = DecodeChunk(payloads[0]);
502 EXPECT_EQ(c0.transfer_id, 11u);
503 EXPECT_EQ(c0.offset, 0u);
504 ASSERT_EQ(c0.pending_bytes.value(), 64u);
505
506 constexpr ConstByteSpan data(kData64);
507
508 // Send the first 8 bytes of the transfer.
509 context_.server().SendServerStream<Transfer::Read>(
510 EncodeChunk({.transfer_id = 11u, .offset = 0, .data = data.first(8)}));
511
512 // Skip offset 8, send the rest starting from 16.
513 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
514 context_.server().SendServerStream<Transfer::Read>(
515 EncodeChunk({.transfer_id = 11u,
516 .offset = offset,
517 .data = data.subspan(offset, 8)}));
518 }
Alexei Frolov22ee1142022-02-03 13:59:01 -0800519 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700520
521 // Only one parameters update should be sent, with the offset of the initial
522 // dropped packet.
523 ASSERT_EQ(payloads.size(), 2u);
524
525 const Chunk last_chunk = {
526 .transfer_id = 11u, .offset = 56, .data = data.subspan(56)};
527
528 // Re-send the final chunk of the block.
529 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800530 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700531
532 // The original drop parameters should be re-sent.
533 ASSERT_EQ(payloads.size(), 3u);
534 Chunk c2 = DecodeChunk(payloads[2]);
535 EXPECT_EQ(c2.transfer_id, 11u);
536 EXPECT_EQ(c2.offset, 8u);
537 ASSERT_EQ(c2.pending_bytes.value(), 56u);
538
539 // Do it again.
540 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800541 transfer_thread_.WaitUntilEventIsProcessed();
542
Alexei Frolov3d99d922021-11-04 13:13:42 -0700543 ASSERT_EQ(payloads.size(), 4u);
544 Chunk c3 = DecodeChunk(payloads[3]);
545 EXPECT_EQ(c3.transfer_id, 11u);
546 EXPECT_EQ(c3.offset, 8u);
547 ASSERT_EQ(c3.pending_bytes.value(), 56u);
548
549 // Finish the transfer normally.
550 context_.server().SendServerStream<Transfer::Read>(
551 EncodeChunk({.transfer_id = 11u,
552 .offset = 8,
553 .data = data.subspan(8, 56),
554 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800555 transfer_thread_.WaitUntilEventIsProcessed();
556
Alexei Frolov3d99d922021-11-04 13:13:42 -0700557 ASSERT_EQ(payloads.size(), 5u);
558
559 Chunk c4 = DecodeChunk(payloads[4]);
560 EXPECT_EQ(c4.transfer_id, 11u);
561 ASSERT_TRUE(c4.status.has_value());
562 EXPECT_EQ(c4.status.value(), OkStatus());
563
564 EXPECT_EQ(transfer_status, OkStatus());
565}
566
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800567constexpr chrono::SystemClock::duration kTestTimeout =
568 std::chrono::milliseconds(50);
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800569constexpr uint8_t kTestRetries = 3;
570
571TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
572 stream::MemoryWriterBuffer<64> writer;
573 Status transfer_status = Status::Unknown();
574
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800575 ASSERT_EQ(OkStatus(),
576 client_.Read(
577 12,
578 writer,
579 [&transfer_status](Status status) { transfer_status = status; },
580 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800581 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800582
583 // First transfer parameters chunk is sent.
584 rpc::PayloadsView payloads =
585 context_.output().payloads<Transfer::Read>(context_.channel().id());
586 ASSERT_EQ(payloads.size(), 1u);
587 EXPECT_EQ(transfer_status, Status::Unknown());
588
589 Chunk c0 = DecodeChunk(payloads.back());
590 EXPECT_EQ(c0.transfer_id, 12u);
591 EXPECT_EQ(c0.offset, 0u);
592 EXPECT_EQ(c0.pending_bytes.value(), 64u);
593
594 // Wait for the timeout to expire without doing anything. The client should
595 // resend its parameters chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800596 transfer_thread_.SimulateClientTimeout(12);
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800597 ASSERT_EQ(payloads.size(), 2u);
598
599 Chunk c = DecodeChunk(payloads.back());
600 EXPECT_EQ(c.transfer_id, 12u);
601 EXPECT_EQ(c.offset, 0u);
602 EXPECT_EQ(c.pending_bytes.value(), 64u);
603
604 // Transfer has not yet completed.
605 EXPECT_EQ(transfer_status, Status::Unknown());
606
607 // Finish the transfer following the timeout.
608 context_.server().SendServerStream<Transfer::Read>(
609 EncodeChunk({.transfer_id = 12u,
610 .offset = 0,
611 .data = kData32,
612 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800613 transfer_thread_.WaitUntilEventIsProcessed();
614
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800615 ASSERT_EQ(payloads.size(), 3u);
616
617 Chunk c4 = DecodeChunk(payloads.back());
618 EXPECT_EQ(c4.transfer_id, 12u);
619 ASSERT_TRUE(c4.status.has_value());
620 EXPECT_EQ(c4.status.value(), OkStatus());
621
622 EXPECT_EQ(transfer_status, OkStatus());
623}
624
625TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
626 stream::MemoryWriterBuffer<64> writer;
627 Status transfer_status = Status::Unknown();
628
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800629 ASSERT_EQ(OkStatus(),
630 client_.Read(
631 13,
632 writer,
633 [&transfer_status](Status status) { transfer_status = status; },
634 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800635 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800636
637 // First transfer parameters chunk is sent.
638 rpc::PayloadsView payloads =
639 context_.output().payloads<Transfer::Read>(context_.channel().id());
640 ASSERT_EQ(payloads.size(), 1u);
641 EXPECT_EQ(transfer_status, Status::Unknown());
642
643 Chunk c0 = DecodeChunk(payloads.back());
644 EXPECT_EQ(c0.transfer_id, 13u);
645 EXPECT_EQ(c0.offset, 0u);
646 EXPECT_EQ(c0.pending_bytes.value(), 64u);
647
648 constexpr ConstByteSpan data(kData32);
649
650 // Send some data, but not everything.
651 context_.server().SendServerStream<Transfer::Read>(
652 EncodeChunk({.transfer_id = 13u, .offset = 0, .data = data.first(16)}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800653 transfer_thread_.WaitUntilEventIsProcessed();
654
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800655 ASSERT_EQ(payloads.size(), 1u);
656
657 // Wait for the timeout to expire without sending more data. The client should
658 // send an updated parameters chunk, accounting for the data already received.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800659 transfer_thread_.SimulateClientTimeout(13);
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800660 ASSERT_EQ(payloads.size(), 2u);
661
662 Chunk c = DecodeChunk(payloads.back());
663 EXPECT_EQ(c.transfer_id, 13u);
664 EXPECT_EQ(c.offset, 16u);
665 EXPECT_EQ(c.pending_bytes.value(), 48u);
666
667 // Transfer has not yet completed.
668 EXPECT_EQ(transfer_status, Status::Unknown());
669
670 // Send the rest of the data, finishing the transfer.
671 context_.server().SendServerStream<Transfer::Read>(
672 EncodeChunk({.transfer_id = 13u,
673 .offset = 16,
674 .data = data.subspan(16),
675 .remaining_bytes = 0}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800676 transfer_thread_.WaitUntilEventIsProcessed();
677
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800678 ASSERT_EQ(payloads.size(), 3u);
679
680 Chunk c4 = DecodeChunk(payloads.back());
681 EXPECT_EQ(c4.transfer_id, 13u);
682 ASSERT_TRUE(c4.status.has_value());
683 EXPECT_EQ(c4.status.value(), OkStatus());
684
685 EXPECT_EQ(transfer_status, OkStatus());
686}
687
688TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
689 stream::MemoryWriterBuffer<64> writer;
690 Status transfer_status = Status::Unknown();
691
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800692 ASSERT_EQ(OkStatus(),
693 client_.Read(
694 14,
695 writer,
696 [&transfer_status](Status status) { transfer_status = status; },
697 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800698 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800699
700 // First transfer parameters chunk is sent.
701 rpc::PayloadsView payloads =
702 context_.output().payloads<Transfer::Read>(context_.channel().id());
703 ASSERT_EQ(payloads.size(), 1u);
704 EXPECT_EQ(transfer_status, Status::Unknown());
705
706 Chunk c0 = DecodeChunk(payloads.back());
707 EXPECT_EQ(c0.transfer_id, 14u);
708 EXPECT_EQ(c0.offset, 0u);
709 EXPECT_EQ(c0.pending_bytes.value(), 64u);
710
711 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
712 // Wait for the timeout to expire without doing anything. The client should
713 // resend its parameters chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800714 transfer_thread_.SimulateClientTimeout(14);
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800715 ASSERT_EQ(payloads.size(), retry + 1);
716
717 Chunk c = DecodeChunk(payloads.back());
718 EXPECT_EQ(c.transfer_id, 14u);
719 EXPECT_EQ(c.offset, 0u);
720 EXPECT_EQ(c.pending_bytes.value(), 64u);
721
722 // Transfer has not yet completed.
723 EXPECT_EQ(transfer_status, Status::Unknown());
724 }
725
726 // Sleep one more time after the final retry. The client should cancel the
727 // transfer at this point and send a DEADLINE_EXCEEDED chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800728 transfer_thread_.SimulateClientTimeout(14);
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800729 ASSERT_EQ(payloads.size(), 5u);
730
731 Chunk c4 = DecodeChunk(payloads.back());
732 EXPECT_EQ(c4.transfer_id, 14u);
733 ASSERT_TRUE(c4.status.has_value());
734 EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
735
736 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
737
738 // After finishing the transfer, nothing else should be sent. Verify this by
739 // waiting for a bit.
740 this_thread::sleep_for(kTestTimeout * 4);
741 ASSERT_EQ(payloads.size(), 5u);
742}
743
Alexei Frolova66909a2022-03-08 11:37:26 -0800744TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
745 stream::MemoryWriterBuffer<64> writer;
746 Status transfer_status = Status::Unknown();
747
748 constexpr ConstByteSpan data(kData32);
749
750 ASSERT_EQ(OkStatus(),
751 client_.Read(
752 14,
753 writer,
754 [&transfer_status](Status status) { transfer_status = status; },
755 kTestTimeout));
756 transfer_thread_.WaitUntilEventIsProcessed();
757
758 // First transfer parameters chunk is sent.
759 rpc::PayloadsView payloads =
760 context_.output().payloads<Transfer::Read>(context_.channel().id());
761 ASSERT_EQ(payloads.size(), 1u);
762 EXPECT_EQ(transfer_status, Status::Unknown());
763
764 Chunk c0 = DecodeChunk(payloads.back());
765 EXPECT_EQ(c0.transfer_id, 14u);
766 EXPECT_EQ(c0.offset, 0u);
767 EXPECT_EQ(c0.window_end_offset, 64u);
768
769 // Simulate one less timeout than the maximum amount of retries.
770 for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
771 transfer_thread_.SimulateClientTimeout(14);
772 ASSERT_EQ(payloads.size(), retry + 1);
773
774 Chunk c = DecodeChunk(payloads.back());
775 EXPECT_EQ(c.transfer_id, 14u);
776 EXPECT_EQ(c.offset, 0u);
777 EXPECT_EQ(c.window_end_offset, 64u);
778
779 // Transfer has not yet completed.
780 EXPECT_EQ(transfer_status, Status::Unknown());
781 }
782
783 // Send some data.
784 context_.server().SendServerStream<Transfer::Read>(
785 EncodeChunk({.transfer_id = 14u, .offset = 0, .data = data.first(16)}));
786 transfer_thread_.WaitUntilEventIsProcessed();
787 ASSERT_EQ(payloads.size(), 3u);
788
789 // Time out a couple more times. The context's retry count should have been
790 // reset, so it should go through the standard retry flow instead of
791 // terminating the transfer.
792 transfer_thread_.SimulateClientTimeout(14);
793 ASSERT_EQ(payloads.size(), 4u);
794
795 Chunk c = DecodeChunk(payloads.back());
796 EXPECT_FALSE(c.status.has_value());
797 EXPECT_EQ(c.transfer_id, 14u);
798 EXPECT_EQ(c.offset, 16u);
799 EXPECT_EQ(c.window_end_offset, 64u);
800
801 transfer_thread_.SimulateClientTimeout(14);
802 ASSERT_EQ(payloads.size(), 5u);
803
804 c = DecodeChunk(payloads.back());
805 EXPECT_FALSE(c.status.has_value());
806 EXPECT_EQ(c.transfer_id, 14u);
807 EXPECT_EQ(c.offset, 16u);
808 EXPECT_EQ(c.window_end_offset, 64u);
809}
810
Wyatt Hepler384c54d2022-02-09 12:02:36 -0800811TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
812 stream::MemoryWriterBuffer<64> writer;
813 Status transfer_status = Status::Unknown();
814
815 context_.output().set_send_status(Status::Unauthenticated());
816
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800817 ASSERT_EQ(OkStatus(),
818 client_.Read(
819 14,
820 writer,
821 [&transfer_status](Status status) {
822 ASSERT_EQ(transfer_status,
823 Status::Unknown()); // Must only call once
824 transfer_status = status;
825 },
826 kTestTimeout));
Wyatt Hepler384c54d2022-02-09 12:02:36 -0800827 transfer_thread_.WaitUntilEventIsProcessed();
828
829 EXPECT_EQ(transfer_status, Status::Internal());
830}
831
Alexei Frolov3d99d922021-11-04 13:13:42 -0700832class WriteTransfer : public ::testing::Test {
833 protected:
834 WriteTransfer()
Alexei Frolov22ee1142022-02-03 13:59:01 -0800835 : transfer_thread_(chunk_buffer_, encode_buffer_),
836 client_(context_.client(), context_.channel().id(), transfer_thread_),
837 system_thread_(TransferThreadOptions(), transfer_thread_) {}
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800838
839 ~WriteTransfer() {
Alexei Frolov22ee1142022-02-03 13:59:01 -0800840 transfer_thread_.Terminate();
841 system_thread_.join();
Alexei Frolov4fd8c802021-11-09 09:49:25 -0800842 }
Alexei Frolov3d99d922021-11-04 13:13:42 -0700843
844 rpc::RawClientTestContext<> context_;
845
Alexei Frolov22ee1142022-02-03 13:59:01 -0800846 Thread<1, 1> transfer_thread_;
Alexei Frolov3d99d922021-11-04 13:13:42 -0700847 Client client_;
Alexei Frolov3d99d922021-11-04 13:13:42 -0700848
Alexei Frolov22ee1142022-02-03 13:59:01 -0800849 std::array<std::byte, 64> chunk_buffer_;
850 std::array<std::byte, 64> encode_buffer_;
851
852 thread::Thread system_thread_;
Alexei Frolov3d99d922021-11-04 13:13:42 -0700853};
854
855TEST_F(WriteTransfer, SingleChunk) {
856 stream::MemoryReader reader(kData32);
857 Status transfer_status = Status::Unknown();
858
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800859 ASSERT_EQ(OkStatus(),
860 client_.Write(3, reader, [&transfer_status](Status status) {
861 transfer_status = status;
862 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800863 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700864
865 // The client begins by just sending the transfer ID.
866 rpc::PayloadsView payloads =
867 context_.output().payloads<Transfer::Write>(context_.channel().id());
868 ASSERT_EQ(payloads.size(), 1u);
869 EXPECT_EQ(transfer_status, Status::Unknown());
870
871 Chunk c0 = DecodeChunk(payloads[0]);
872 EXPECT_EQ(c0.transfer_id, 3u);
873
Alexei Frolov22ee1142022-02-03 13:59:01 -0800874 // Send transfer parameters. Client should send a data chunk and the final
875 // chunk.
876 rpc::test::WaitForPackets(context_.output(), 2, [this] {
877 context_.server().SendServerStream<Transfer::Write>(
878 EncodeChunk({.transfer_id = 3,
879 .pending_bytes = 64,
880 .max_chunk_size_bytes = 32,
881 .offset = 0}));
882 });
Alexei Frolov3d99d922021-11-04 13:13:42 -0700883
Alexei Frolov3d99d922021-11-04 13:13:42 -0700884 ASSERT_EQ(payloads.size(), 3u);
885
886 Chunk c1 = DecodeChunk(payloads[1]);
887 EXPECT_EQ(c1.transfer_id, 3u);
888 EXPECT_EQ(c1.offset, 0u);
889 EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
890
891 Chunk c2 = DecodeChunk(payloads[2]);
892 EXPECT_EQ(c2.transfer_id, 3u);
893 ASSERT_TRUE(c2.remaining_bytes.has_value());
894 EXPECT_EQ(c2.remaining_bytes.value(), 0u);
895
896 EXPECT_EQ(transfer_status, Status::Unknown());
897
898 // Send the final status chunk to complete the transfer.
899 context_.server().SendServerStream<Transfer::Write>(
900 EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800901 transfer_thread_.WaitUntilEventIsProcessed();
902
Alexei Frolov3d99d922021-11-04 13:13:42 -0700903 EXPECT_EQ(payloads.size(), 3u);
904 EXPECT_EQ(transfer_status, OkStatus());
905}
906
907TEST_F(WriteTransfer, MultiChunk) {
908 stream::MemoryReader reader(kData32);
909 Status transfer_status = Status::Unknown();
910
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800911 ASSERT_EQ(OkStatus(),
912 client_.Write(4, reader, [&transfer_status](Status status) {
913 transfer_status = status;
914 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800915 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700916
917 // The client begins by just sending the transfer ID.
918 rpc::PayloadsView payloads =
919 context_.output().payloads<Transfer::Write>(context_.channel().id());
920 ASSERT_EQ(payloads.size(), 1u);
921 EXPECT_EQ(transfer_status, Status::Unknown());
922
923 Chunk c0 = DecodeChunk(payloads[0]);
924 EXPECT_EQ(c0.transfer_id, 4u);
925
926 // Send transfer parameters with a chunk size smaller than the data.
Alexei Frolov3d99d922021-11-04 13:13:42 -0700927
928 // Client should send two data chunks and the final chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800929 rpc::test::WaitForPackets(context_.output(), 3, [this] {
930 context_.server().SendServerStream<Transfer::Write>(
931 EncodeChunk({.transfer_id = 4,
932 .pending_bytes = 64,
933 .max_chunk_size_bytes = 16,
934 .offset = 0}));
935 });
936
Alexei Frolov3d99d922021-11-04 13:13:42 -0700937 ASSERT_EQ(payloads.size(), 4u);
938
939 Chunk c1 = DecodeChunk(payloads[1]);
940 EXPECT_EQ(c1.transfer_id, 4u);
941 EXPECT_EQ(c1.offset, 0u);
942 EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
943
944 Chunk c2 = DecodeChunk(payloads[2]);
945 EXPECT_EQ(c2.transfer_id, 4u);
946 EXPECT_EQ(c2.offset, 16u);
947 EXPECT_EQ(
948 std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
949 0);
950
951 Chunk c3 = DecodeChunk(payloads[3]);
952 EXPECT_EQ(c3.transfer_id, 4u);
953 ASSERT_TRUE(c3.remaining_bytes.has_value());
954 EXPECT_EQ(c3.remaining_bytes.value(), 0u);
955
956 EXPECT_EQ(transfer_status, Status::Unknown());
957
958 // Send the final status chunk to complete the transfer.
959 context_.server().SendServerStream<Transfer::Write>(
960 EncodeChunk({.transfer_id = 4, .status = OkStatus()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800961 transfer_thread_.WaitUntilEventIsProcessed();
962
Alexei Frolov3d99d922021-11-04 13:13:42 -0700963 EXPECT_EQ(payloads.size(), 4u);
964 EXPECT_EQ(transfer_status, OkStatus());
965}
966
967TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
968 stream::MemoryReader reader(kData32);
969 Status transfer_status = Status::Unknown();
970
Wyatt Heplerbad6d272022-02-16 07:15:07 -0800971 ASSERT_EQ(OkStatus(),
972 client_.Write(5, reader, [&transfer_status](Status status) {
973 transfer_status = status;
974 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -0800975 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -0700976
977 // The client begins by just sending the transfer ID.
978 rpc::PayloadsView payloads =
979 context_.output().payloads<Transfer::Write>(context_.channel().id());
980 ASSERT_EQ(payloads.size(), 1u);
981 EXPECT_EQ(transfer_status, Status::Unknown());
982
983 Chunk c0 = DecodeChunk(payloads[0]);
984 EXPECT_EQ(c0.transfer_id, 5u);
985
986 // Send transfer parameters with a nonzero offset, requesting a seek.
Alexei Frolov3d99d922021-11-04 13:13:42 -0700987 // Client should send a data chunk and the final chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -0800988 rpc::test::WaitForPackets(context_.output(), 2, [this] {
989 context_.server().SendServerStream<Transfer::Write>(
990 EncodeChunk({.transfer_id = 5,
991 .pending_bytes = 64,
992 .max_chunk_size_bytes = 32,
993 .offset = 16}));
994 });
995
Alexei Frolov3d99d922021-11-04 13:13:42 -0700996 ASSERT_EQ(payloads.size(), 3u);
997
998 Chunk c1 = DecodeChunk(payloads[1]);
999 EXPECT_EQ(c1.transfer_id, 5u);
1000 EXPECT_EQ(c1.offset, 16u);
1001 EXPECT_EQ(
1002 std::memcmp(c1.data.data(), kData32.data() + c1.offset, c1.data.size()),
1003 0);
1004
1005 Chunk c2 = DecodeChunk(payloads[2]);
1006 EXPECT_EQ(c2.transfer_id, 5u);
1007 ASSERT_TRUE(c2.remaining_bytes.has_value());
1008 EXPECT_EQ(c2.remaining_bytes.value(), 0u);
1009
1010 EXPECT_EQ(transfer_status, Status::Unknown());
1011
1012 // Send the final status chunk to complete the transfer.
1013 context_.server().SendServerStream<Transfer::Write>(
1014 EncodeChunk({.transfer_id = 5, .status = OkStatus()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001015 transfer_thread_.WaitUntilEventIsProcessed();
1016
Alexei Frolov3d99d922021-11-04 13:13:42 -07001017 EXPECT_EQ(payloads.size(), 3u);
1018 EXPECT_EQ(transfer_status, OkStatus());
1019}
1020
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001021class FakeNonSeekableReader final : public stream::NonSeekableReader {
1022 public:
1023 FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1024
Alexei Frolov3d99d922021-11-04 13:13:42 -07001025 private:
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001026 StatusWithSize DoRead(ByteSpan out) final {
1027 if (position_ == data_.size()) {
1028 return StatusWithSize::OutOfRange();
1029 }
1030
1031 size_t to_copy = std::min(out.size(), data_.size() - position_);
1032 std::memcpy(out.data(), data_.data() + position_, to_copy);
1033 position_ += to_copy;
1034
1035 return StatusWithSize(to_copy);
Alexei Frolov3d99d922021-11-04 13:13:42 -07001036 }
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001037
1038 ConstByteSpan data_;
1039 size_t position_;
Alexei Frolov3d99d922021-11-04 13:13:42 -07001040};
1041
1042TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001043 FakeNonSeekableReader reader(kData32);
Alexei Frolov3d99d922021-11-04 13:13:42 -07001044 Status transfer_status = Status::Unknown();
1045
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001046 ASSERT_EQ(OkStatus(),
1047 client_.Write(6, reader, [&transfer_status](Status status) {
1048 transfer_status = status;
1049 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001050 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001051
1052 // The client begins by just sending the transfer ID.
1053 rpc::PayloadsView payloads =
1054 context_.output().payloads<Transfer::Write>(context_.channel().id());
1055 ASSERT_EQ(payloads.size(), 1u);
1056 EXPECT_EQ(transfer_status, Status::Unknown());
1057
1058 Chunk c0 = DecodeChunk(payloads[0]);
1059 EXPECT_EQ(c0.transfer_id, 6u);
1060
1061 // Send transfer parameters with a nonzero offset, requesting a seek.
1062 context_.server().SendServerStream<Transfer::Write>(
1063 EncodeChunk({.transfer_id = 6,
1064 .pending_bytes = 64,
1065 .max_chunk_size_bytes = 32,
1066 .offset = 16}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001067 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001068
1069 // Client should send a status chunk and end the transfer.
1070 ASSERT_EQ(payloads.size(), 2u);
1071
1072 Chunk c1 = DecodeChunk(payloads[1]);
1073 EXPECT_EQ(c1.transfer_id, 6u);
1074 ASSERT_TRUE(c1.status.has_value());
1075 EXPECT_EQ(c1.status.value(), Status::Unimplemented());
1076
1077 EXPECT_EQ(transfer_status, Status::Unimplemented());
1078}
1079
1080TEST_F(WriteTransfer, ServerError) {
1081 stream::MemoryReader reader(kData32);
1082 Status transfer_status = Status::Unknown();
1083
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001084 ASSERT_EQ(OkStatus(),
1085 client_.Write(7, reader, [&transfer_status](Status status) {
1086 transfer_status = status;
1087 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001088 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001089
1090 // The client begins by just sending the transfer ID.
1091 rpc::PayloadsView payloads =
1092 context_.output().payloads<Transfer::Write>(context_.channel().id());
1093 ASSERT_EQ(payloads.size(), 1u);
1094 EXPECT_EQ(transfer_status, Status::Unknown());
1095
1096 Chunk c0 = DecodeChunk(payloads[0]);
1097 EXPECT_EQ(c0.transfer_id, 7u);
1098
1099 // Send an error from the server.
1100 context_.server().SendServerStream<Transfer::Write>(
1101 EncodeChunk({.transfer_id = 7, .status = Status::NotFound()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001102 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001103
1104 // Client should not respond and terminate the transfer.
1105 EXPECT_EQ(payloads.size(), 1u);
1106 EXPECT_EQ(transfer_status, Status::NotFound());
1107}
1108
1109TEST_F(WriteTransfer, MalformedParametersChunk) {
1110 stream::MemoryReader reader(kData32);
1111 Status transfer_status = Status::Unknown();
1112
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001113 ASSERT_EQ(OkStatus(),
1114 client_.Write(8, reader, [&transfer_status](Status status) {
1115 transfer_status = status;
1116 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001117 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001118
1119 // The client begins by just sending the transfer ID.
1120 rpc::PayloadsView payloads =
1121 context_.output().payloads<Transfer::Write>(context_.channel().id());
1122 ASSERT_EQ(payloads.size(), 1u);
1123 EXPECT_EQ(transfer_status, Status::Unknown());
1124
1125 Chunk c0 = DecodeChunk(payloads[0]);
1126 EXPECT_EQ(c0.transfer_id, 8u);
1127
1128 // Send an invalid transfer parameters chunk without pending_bytes.
1129 context_.server().SendServerStream<Transfer::Write>(
1130 EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001131 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001132
1133 // Client should send a status chunk and end the transfer.
1134 ASSERT_EQ(payloads.size(), 2u);
1135
1136 Chunk c1 = DecodeChunk(payloads[1]);
1137 EXPECT_EQ(c1.transfer_id, 8u);
1138 ASSERT_TRUE(c1.status.has_value());
1139 EXPECT_EQ(c1.status.value(), Status::InvalidArgument());
1140
1141 EXPECT_EQ(transfer_status, Status::InvalidArgument());
1142}
1143
1144TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
1145 stream::MemoryReader reader(kData32);
1146 Status transfer_status = Status::Unknown();
1147
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001148 ASSERT_EQ(OkStatus(),
1149 client_.Write(9, reader, [&transfer_status](Status status) {
1150 transfer_status = status;
1151 }));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001152 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001153
1154 // The client begins by just sending the transfer ID.
1155 rpc::PayloadsView payloads =
1156 context_.output().payloads<Transfer::Write>(context_.channel().id());
1157 ASSERT_EQ(payloads.size(), 1u);
1158 EXPECT_EQ(transfer_status, Status::Unknown());
1159
1160 Chunk c0 = DecodeChunk(payloads[0]);
1161 EXPECT_EQ(c0.transfer_id, 9u);
1162
1163 // Send an invalid transfer parameters chunk with 0 pending_bytes.
1164 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1165 {.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001166 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov3d99d922021-11-04 13:13:42 -07001167
1168 // Client should send a status chunk and end the transfer.
1169 ASSERT_EQ(payloads.size(), 2u);
1170
1171 Chunk c1 = DecodeChunk(payloads[1]);
1172 EXPECT_EQ(c1.transfer_id, 9u);
1173 ASSERT_TRUE(c1.status.has_value());
Wyatt Heplera0e07d62022-03-17 11:11:54 -07001174 EXPECT_EQ(c1.status.value(), Status::ResourceExhausted());
Alexei Frolov3d99d922021-11-04 13:13:42 -07001175
Wyatt Heplera0e07d62022-03-17 11:11:54 -07001176 EXPECT_EQ(transfer_status, Status::ResourceExhausted());
Alexei Frolov3d99d922021-11-04 13:13:42 -07001177}
1178
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001179TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
1180 stream::MemoryReader reader(kData32);
1181 Status transfer_status = Status::Unknown();
1182
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001183 ASSERT_EQ(OkStatus(),
1184 client_.Write(
1185 10,
1186 reader,
1187 [&transfer_status](Status status) { transfer_status = status; },
1188 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001189 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001190
1191 // The client begins by just sending the transfer ID.
1192 rpc::PayloadsView payloads =
1193 context_.output().payloads<Transfer::Write>(context_.channel().id());
1194 ASSERT_EQ(payloads.size(), 1u);
1195 EXPECT_EQ(transfer_status, Status::Unknown());
1196
1197 Chunk c0 = DecodeChunk(payloads.back());
1198 EXPECT_EQ(c0.transfer_id, 10u);
1199
1200 // Wait for the timeout to expire without doing anything. The client should
1201 // resend the initial transmit chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001202 transfer_thread_.SimulateClientTimeout(10);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001203 ASSERT_EQ(payloads.size(), 2u);
1204
1205 Chunk c = DecodeChunk(payloads.back());
1206 EXPECT_EQ(c.transfer_id, 10u);
1207
1208 // Transfer has not yet completed.
1209 EXPECT_EQ(transfer_status, Status::Unknown());
1210}
1211
1212TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1213 stream::MemoryReader reader(kData32);
1214 Status transfer_status = Status::Unknown();
1215
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001216 ASSERT_EQ(OkStatus(),
1217 client_.Write(
1218 11,
1219 reader,
1220 [&transfer_status](Status status) { transfer_status = status; },
1221 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001222 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001223
1224 // The client begins by just sending the transfer ID.
1225 rpc::PayloadsView payloads =
1226 context_.output().payloads<Transfer::Write>(context_.channel().id());
1227 ASSERT_EQ(payloads.size(), 1u);
1228 EXPECT_EQ(transfer_status, Status::Unknown());
1229
1230 Chunk c0 = DecodeChunk(payloads.back());
1231 EXPECT_EQ(c0.transfer_id, 11u);
1232
1233 // Send the first parameters chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001234 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1235 context_.server().SendServerStream<Transfer::Write>(
1236 EncodeChunk({.transfer_id = 11,
1237 .pending_bytes = 16,
1238 .max_chunk_size_bytes = 8,
1239 .offset = 0}));
1240 });
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001241 ASSERT_EQ(payloads.size(), 3u);
Alexei Frolov22ee1142022-02-03 13:59:01 -08001242
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001243 EXPECT_EQ(transfer_status, Status::Unknown());
1244
1245 Chunk c1 = DecodeChunk(payloads[1]);
1246 EXPECT_EQ(c1.transfer_id, 11u);
1247 EXPECT_EQ(c1.offset, 0u);
1248 EXPECT_EQ(c1.data.size(), 8u);
1249 EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1250
1251 Chunk c2 = DecodeChunk(payloads[2]);
1252 EXPECT_EQ(c2.transfer_id, 11u);
1253 EXPECT_EQ(c2.offset, 8u);
1254 EXPECT_EQ(c2.data.size(), 8u);
1255 EXPECT_EQ(
1256 std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
1257 0);
1258
1259 // Wait for the timeout to expire without doing anything. The client should
1260 // resend the most recently sent chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001261 transfer_thread_.SimulateClientTimeout(11);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001262 ASSERT_EQ(payloads.size(), 4u);
1263
1264 Chunk c3 = DecodeChunk(payloads[3]);
1265 EXPECT_EQ(c3.transfer_id, c2.transfer_id);
1266 EXPECT_EQ(c3.offset, c2.offset);
1267 EXPECT_EQ(c3.data.size(), c2.data.size());
1268 EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0);
1269
1270 // Transfer has not yet completed.
1271 EXPECT_EQ(transfer_status, Status::Unknown());
1272}
1273
1274TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1275 stream::MemoryReader reader(kData32);
1276 Status transfer_status = Status::Unknown();
1277
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001278 ASSERT_EQ(OkStatus(),
1279 client_.Write(
1280 12,
1281 reader,
1282 [&transfer_status](Status status) { transfer_status = status; },
1283 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001284 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001285
1286 // The client begins by just sending the transfer ID.
1287 rpc::PayloadsView payloads =
1288 context_.output().payloads<Transfer::Write>(context_.channel().id());
1289 ASSERT_EQ(payloads.size(), 1u);
1290 EXPECT_EQ(transfer_status, Status::Unknown());
1291
1292 Chunk c0 = DecodeChunk(payloads.back());
1293 EXPECT_EQ(c0.transfer_id, 12u);
1294
1295 // Send the first parameters chunk, requesting all the data. The client should
1296 // respond with one data chunk and a remaining_bytes = 0 chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001297 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1298 context_.server().SendServerStream<Transfer::Write>(
1299 EncodeChunk({.transfer_id = 12,
1300 .pending_bytes = 64,
1301 .max_chunk_size_bytes = 64,
1302 .offset = 0}));
1303 });
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001304 ASSERT_EQ(payloads.size(), 3u);
Alexei Frolov22ee1142022-02-03 13:59:01 -08001305
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001306 EXPECT_EQ(transfer_status, Status::Unknown());
1307
1308 Chunk c1 = DecodeChunk(payloads[1]);
1309 EXPECT_EQ(c1.transfer_id, 12u);
1310 EXPECT_EQ(c1.offset, 0u);
1311 EXPECT_EQ(c1.data.size(), 32u);
1312 EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1313
1314 Chunk c2 = DecodeChunk(payloads[2]);
1315 EXPECT_EQ(c2.transfer_id, 12u);
1316 ASSERT_TRUE(c2.remaining_bytes.has_value());
1317 EXPECT_EQ(c2.remaining_bytes.value(), 0u);
1318
1319 // Wait for the timeout to expire without doing anything. The client should
1320 // resend the data chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001321 transfer_thread_.SimulateClientTimeout(12);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001322 ASSERT_EQ(payloads.size(), 4u);
1323
1324 Chunk c3 = DecodeChunk(payloads[3]);
1325 EXPECT_EQ(c3.transfer_id, c1.transfer_id);
1326 EXPECT_EQ(c3.offset, c1.offset);
1327 EXPECT_EQ(c3.data.size(), c1.data.size());
1328 EXPECT_EQ(std::memcmp(c3.data.data(), c1.data.data(), c3.data.size()), 0);
1329
1330 // The remaining_bytes = 0 chunk should be resent on the next parameters.
1331 context_.server().SendServerStream<Transfer::Write>(
1332 EncodeChunk({.transfer_id = 12,
1333 .pending_bytes = 64,
1334 .max_chunk_size_bytes = 64,
1335 .offset = 32}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001336 transfer_thread_.WaitUntilEventIsProcessed();
1337
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001338 ASSERT_EQ(payloads.size(), 5u);
1339
1340 Chunk c4 = DecodeChunk(payloads[4]);
1341 EXPECT_EQ(c4.transfer_id, 12u);
1342 ASSERT_TRUE(c4.remaining_bytes.has_value());
1343 EXPECT_EQ(c4.remaining_bytes.value(), 0u);
1344
1345 context_.server().SendServerStream<Transfer::Write>(
1346 EncodeChunk({.transfer_id = 12, .status = OkStatus()}));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001347 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001348
1349 EXPECT_EQ(transfer_status, OkStatus());
1350}
1351
1352TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
1353 stream::MemoryReader reader(kData32);
1354 Status transfer_status = Status::Unknown();
1355
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001356 ASSERT_EQ(OkStatus(),
1357 client_.Write(
1358 13,
1359 reader,
1360 [&transfer_status](Status status) { transfer_status = status; },
1361 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001362 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001363
1364 // The client begins by just sending the transfer ID.
1365 rpc::PayloadsView payloads =
1366 context_.output().payloads<Transfer::Write>(context_.channel().id());
1367 ASSERT_EQ(payloads.size(), 1u);
1368 EXPECT_EQ(transfer_status, Status::Unknown());
1369
1370 Chunk c0 = DecodeChunk(payloads.back());
1371 EXPECT_EQ(c0.transfer_id, 13u);
1372
1373 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
1374 // Wait for the timeout to expire without doing anything. The client should
1375 // resend the initial transmit chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001376 transfer_thread_.SimulateClientTimeout(13);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001377 ASSERT_EQ(payloads.size(), retry + 1);
1378
1379 Chunk c = DecodeChunk(payloads.back());
1380 EXPECT_EQ(c.transfer_id, 13u);
1381
1382 // Transfer has not yet completed.
1383 EXPECT_EQ(transfer_status, Status::Unknown());
1384 }
1385
1386 // Sleep one more time after the final retry. The client should cancel the
1387 // transfer at this point and send a DEADLINE_EXCEEDED chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001388 transfer_thread_.SimulateClientTimeout(13);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001389 ASSERT_EQ(payloads.size(), 5u);
1390
1391 Chunk c4 = DecodeChunk(payloads.back());
1392 EXPECT_EQ(c4.transfer_id, 13u);
1393 ASSERT_TRUE(c4.status.has_value());
1394 EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
1395
1396 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1397
1398 // After finishing the transfer, nothing else should be sent. Verify this by
1399 // waiting for a bit.
1400 this_thread::sleep_for(kTestTimeout * 4);
1401 ASSERT_EQ(payloads.size(), 5u);
1402}
1403
1404TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1405 FakeNonSeekableReader reader(kData32);
1406 Status transfer_status = Status::Unknown();
1407
Wyatt Heplerbad6d272022-02-16 07:15:07 -08001408 ASSERT_EQ(OkStatus(),
1409 client_.Write(
1410 14,
1411 reader,
1412 [&transfer_status](Status status) { transfer_status = status; },
1413 kTestTimeout));
Alexei Frolov22ee1142022-02-03 13:59:01 -08001414 transfer_thread_.WaitUntilEventIsProcessed();
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001415
1416 // The client begins by just sending the transfer ID.
1417 rpc::PayloadsView payloads =
1418 context_.output().payloads<Transfer::Write>(context_.channel().id());
1419 ASSERT_EQ(payloads.size(), 1u);
1420 EXPECT_EQ(transfer_status, Status::Unknown());
1421
1422 Chunk c0 = DecodeChunk(payloads.back());
1423 EXPECT_EQ(c0.transfer_id, 14u);
1424
1425 // Send the first parameters chunk.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001426 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1427 context_.server().SendServerStream<Transfer::Write>(
1428 EncodeChunk({.transfer_id = 14,
1429 .pending_bytes = 16,
1430 .max_chunk_size_bytes = 8,
1431 .offset = 0}));
1432 });
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001433 ASSERT_EQ(payloads.size(), 3u);
Alexei Frolov22ee1142022-02-03 13:59:01 -08001434
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001435 EXPECT_EQ(transfer_status, Status::Unknown());
1436
1437 Chunk c1 = DecodeChunk(payloads[1]);
1438 EXPECT_EQ(c1.transfer_id, 14u);
1439 EXPECT_EQ(c1.offset, 0u);
1440 EXPECT_EQ(c1.data.size(), 8u);
1441 EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1442
1443 Chunk c2 = DecodeChunk(payloads[2]);
1444 EXPECT_EQ(c2.transfer_id, 14u);
1445 EXPECT_EQ(c2.offset, 8u);
1446 EXPECT_EQ(c2.data.size(), 8u);
1447 EXPECT_EQ(
1448 std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
1449 0);
1450
1451 // Wait for the timeout to expire without doing anything. The client should
1452 // fail to seek back and end the transfer.
Alexei Frolov22ee1142022-02-03 13:59:01 -08001453 transfer_thread_.SimulateClientTimeout(14);
Alexei Frolov4fd8c802021-11-09 09:49:25 -08001454 ASSERT_EQ(payloads.size(), 4u);
1455
1456 Chunk c3 = DecodeChunk(payloads[3]);
1457 EXPECT_EQ(c3.transfer_id, 14u);
1458 ASSERT_TRUE(c3.status.has_value());
1459 EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded());
1460
1461 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1462}
1463
Wyatt Hepler70a62102022-02-11 15:14:35 -08001464PW_MODIFY_DIAGNOSTICS_POP();
1465
Alexei Frolov3d99d922021-11-04 13:13:42 -07001466} // namespace
1467} // namespace pw::transfer::test