blob: aac5cad35eb6e8c3553caacb855e36fe86311b5c [file] [log] [blame]
Alexei Frolov22ee1142022-02-03 13:59:01 -08001// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15#define PW_LOG_MODULE_NAME "TRN"
16
17#include "pw_transfer/transfer_thread.h"
18
19#include "pw_assert/check.h"
20#include "pw_log/log.h"
21#include "pw_transfer/internal/chunk.h"
22
Wyatt Hepler70a62102022-02-11 15:14:35 -080023PW_MODIFY_DIAGNOSTICS_PUSH();
24PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
25
Alexei Frolov22ee1142022-02-03 13:59:01 -080026namespace pw::transfer::internal {
27
28void TransferThread::Terminate() {
29 next_event_ownership_.acquire();
30 next_event_.type = EventType::kTerminate;
31 event_notification_.release();
32}
33
34void TransferThread::SimulateTimeout(EventType type, uint32_t transfer_id) {
35 next_event_ownership_.acquire();
36
37 next_event_.type = type;
38 next_event_.chunk = {};
39 next_event_.chunk.transfer_id = transfer_id;
40
41 event_notification_.release();
42
43 WaitUntilEventIsProcessed();
44}
45
46void TransferThread::Run() {
47 // Next event starts freed.
48 next_event_ownership_.release();
49
50 while (true) {
51 if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
52 if (next_event_.type == EventType::kTerminate) {
53 return;
54 }
55
56 HandleEvent(next_event_);
57
58 // Finished processing the event. Allow the next_event struct to be
59 // overwritten.
60 next_event_ownership_.release();
61 }
62
63 // Regardless of whether an event was received or not, check for any
64 // transfers which have timed out and process them if so.
65 for (Context& context : client_transfers_) {
66 if (context.timed_out()) {
67 context.HandleEvent({.type = EventType::kClientTimeout});
68 }
69 }
70 for (Context& context : server_transfers_) {
71 if (context.timed_out()) {
72 context.HandleEvent({.type = EventType::kServerTimeout});
73 }
74 }
75 }
76}
77
78chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
79 chrono::SystemClock::time_point timeout =
80 chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
81
82 for (Context& context : client_transfers_) {
83 auto ctx_timeout = context.timeout();
84 if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
85 timeout = ctx_timeout.value();
86 }
87 }
88 for (Context& context : server_transfers_) {
89 auto ctx_timeout = context.timeout();
90 if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
91 timeout = ctx_timeout.value();
92 }
93 }
94
95 return timeout;
96}
97
98void TransferThread::StartTransfer(TransferType type,
99 uint32_t transfer_id,
100 uint32_t handler_id,
101 stream::Stream* stream,
102 const TransferParameters& max_parameters,
103 Function<void(Status)>&& on_completion,
104 chrono::SystemClock::duration timeout,
105 uint8_t max_retries) {
106 // Block until the last event has been processed.
107 next_event_ownership_.acquire();
108
109 bool is_client_transfer = stream != nullptr;
110
111 next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
112 : EventType::kNewServerTransfer;
113 next_event_.new_transfer = {
114 .type = type,
115 .transfer_id = transfer_id,
116 .handler_id = handler_id,
117 .max_parameters = &max_parameters,
118 .timeout = timeout,
119 .max_retries = max_retries,
120 .transfer_thread = this,
121 };
122
123 staged_on_completion_ = std::move(on_completion);
124
125 // The transfer is initialized with either a stream (client-side) or a handler
126 // (server-side). If no stream is provided, try to find a registered handler
127 // with the specified ID.
128 if (is_client_transfer) {
129 next_event_.new_transfer.stream = stream;
130 next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
131 type == TransferType::kTransmit ? client_write_stream_
132 : client_read_stream_);
133 } else {
134 auto handler = std::find_if(handlers_.begin(),
135 handlers_.end(),
136 [&](auto& h) { return h.id() == handler_id; });
137 if (handler != handlers_.end()) {
138 next_event_.new_transfer.handler = &*handler;
139 next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
140 type == TransferType::kTransmit ? server_read_stream_
141 : server_write_stream_);
142 } else {
143 // No handler exists for the transfer: return a NOT_FOUND.
144 next_event_.type = EventType::kSendStatusChunk;
145 next_event_.send_status_chunk = {
146 .transfer_id = transfer_id,
147 .status = Status::NotFound().code(),
148 .stream = type == TransferType::kTransmit
149 ? TransferStream::kServerRead
150 : TransferStream::kServerWrite,
151 };
152 }
153 }
154
155 event_notification_.release();
156}
157
158void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
159 // If this assert is hit, there is a bug in the transfer implementation.
160 // Contexts' max_chunk_size_bytes fields should be set based on the size of
161 // chunk_buffer_.
162 PW_CHECK(chunk.size() <= chunk_buffer_.size(),
163 "Transfer received a larger chunk than it can handle.");
164
165 Result<uint32_t> transfer_id = ExtractTransferId(chunk);
166 if (!transfer_id.ok()) {
167 PW_LOG_ERROR("Received a malformed chunk without a transfer ID");
168 return;
169 }
170
171 // Block until the last event has been processed.
172 next_event_ownership_.acquire();
173
174 std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
175
176 next_event_.type = type;
177 next_event_.chunk = {
178 .transfer_id = *transfer_id,
179 .data = chunk_buffer_.data(),
180 .size = chunk.size(),
181 };
182
183 event_notification_.release();
184}
185
186void TransferThread::SetClientStream(TransferStream type,
187 rpc::RawClientReaderWriter& stream) {
188 // Block until the last event has been processed.
189 next_event_ownership_.acquire();
190
191 next_event_.type = EventType::kSetTransferStream;
192 next_event_.set_transfer_stream = type;
193 staged_client_stream_ = std::move(stream);
194
195 event_notification_.release();
196}
197
198void TransferThread::SetServerStream(TransferStream type,
199 rpc::RawServerReaderWriter& stream) {
200 // Block until the last event has been processed.
201 next_event_ownership_.acquire();
202
203 next_event_.type = EventType::kSetTransferStream;
204 next_event_.set_transfer_stream = type;
205 staged_server_stream_ = std::move(stream);
206
207 event_notification_.release();
208}
209
210void TransferThread::TransferHandlerEvent(EventType type,
211 internal::Handler& handler) {
212 // Block until the last event has been processed.
213 next_event_ownership_.acquire();
214
215 next_event_.type = type;
216 if (type == EventType::kAddTransferHandler) {
217 next_event_.add_transfer_handler = &handler;
218 } else {
219 next_event_.remove_transfer_handler = &handler;
220 }
221
222 event_notification_.release();
223}
224
225void TransferThread::HandleEvent(const internal::Event& event) {
226 switch (event.type) {
227 case EventType::kSendStatusChunk:
228 SendStatusChunk(event.send_status_chunk);
229 break;
230
231 case EventType::kSetTransferStream:
232 switch (event.set_transfer_stream) {
233 case TransferStream::kClientRead:
234 client_read_stream_ = std::move(staged_client_stream_);
235 break;
236
237 case TransferStream::kClientWrite:
238 client_write_stream_ = std::move(staged_client_stream_);
239 break;
240
241 case TransferStream::kServerRead:
242 server_read_stream_ = std::move(staged_server_stream_);
243 break;
244
245 case TransferStream::kServerWrite:
246 server_write_stream_ = std::move(staged_server_stream_);
247 break;
248 }
249 return;
250
251 case EventType::kAddTransferHandler:
252 handlers_.push_front(*event.add_transfer_handler);
253 return;
254
255 case EventType::kRemoveTransferHandler:
256 handlers_.remove(*event.remove_transfer_handler);
257 return;
258
259 default:
260 // Other events are handled by individual transfer contexts.
261 break;
262 }
263
264 if (Context* ctx = FindContextForEvent(event); ctx != nullptr) {
265 if (event.type == EventType::kNewClientTransfer) {
266 // TODO(frolv): This is terrible.
267 static_cast<ClientContext*>(ctx)->set_on_completion(
268 std::move(staged_on_completion_));
269 }
270
271 ctx->HandleEvent(event);
272 }
273}
274
275Context* TransferThread::FindContextForEvent(
276 const internal::Event& event) const {
277 switch (event.type) {
278 case EventType::kNewClientTransfer:
279 return FindNewTransfer(client_transfers_, event.new_transfer.transfer_id);
280 case EventType::kNewServerTransfer:
281 return FindNewTransfer(server_transfers_, event.new_transfer.transfer_id);
282 case EventType::kClientChunk:
283 return FindActiveTransfer(client_transfers_, event.chunk.transfer_id);
284 case EventType::kServerChunk:
285 return FindActiveTransfer(server_transfers_, event.chunk.transfer_id);
286 case EventType::kClientTimeout: // Manually triggered client timeout
287 return FindActiveTransfer(client_transfers_, event.chunk.transfer_id);
288 case EventType::kServerTimeout: // Manually triggered server timeout
289 return FindActiveTransfer(server_transfers_, event.chunk.transfer_id);
290 default:
291 return nullptr;
292 }
293}
294
295void TransferThread::SendStatusChunk(
296 const internal::SendStatusChunkEvent& event) {
297 rpc::Writer& destination = stream_for(event.stream);
298
299 internal::Chunk chunk = {};
300 chunk.transfer_id = event.transfer_id;
301 chunk.status = event.status;
302
303 Result<ConstByteSpan> result = internal::EncodeChunk(chunk, chunk_buffer_);
304
305 if (!result.ok()) {
306 PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
307 static_cast<unsigned>(event.transfer_id));
308 return;
309 }
310
311 if (!destination.Write(result.value()).ok()) {
312 PW_LOG_ERROR("Failed to send final chunk for transfer %u",
313 static_cast<unsigned>(event.transfer_id));
314 return;
315 }
316}
317
318} // namespace pw::transfer::internal
Wyatt Hepler70a62102022-02-11 15:14:35 -0800319
320PW_MODIFY_DIAGNOSTICS_POP();