/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define TRACE_TAG USB
18
19#include "sysdeps.h"
20
21#include <errno.h>
22#include <stdio.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/ioctl.h>
26#include <sys/types.h>
27#include <unistd.h>
28
29#include <linux/usb/functionfs.h>
30#include <sys/eventfd.h>
31
32#include <array>
33#include <future>
34#include <memory>
35#include <mutex>
36#include <optional>
37#include <vector>
38
39#include <asyncio/AsyncIO.h>
40
41#include <android-base/logging.h>
42#include <android-base/macros.h>
43#include <android-base/properties.h>
44#include <android-base/thread_annotations.h>
45
46#include <adbd/usb.h>
47
48#include "adb_unique_fd.h"
49#include "adb_utils.h"
50#include "sysdeps/chrono.h"
51#include "transport.h"
52#include "types.h"
53
54using android::base::StringPrintf;
55
// Number of read requests kept in flight on the bulk-out endpoint, and the
// size of each read buffer.
static constexpr size_t kUsbReadQueueDepth = 16;
static constexpr size_t kUsbReadSize = 16384;

// Maximum number of write requests submitted to the kernel at once.
static constexpr size_t kUsbWriteQueueDepth = 16;
60
// Returns a printable name for a functionfs lifecycle event.
// The switch is deliberately default-free so the compiler can warn when a new
// event type is added to the enum; the trailing return handles out-of-range
// values delivered by a newer kernel at runtime (previously control fell off
// the end of the function, which is undefined behavior).
static const char* to_string(enum usb_functionfs_event_type type) {
    switch (type) {
        case FUNCTIONFS_BIND:
            return "FUNCTIONFS_BIND";
        case FUNCTIONFS_UNBIND:
            return "FUNCTIONFS_UNBIND";
        case FUNCTIONFS_ENABLE:
            return "FUNCTIONFS_ENABLE";
        case FUNCTIONFS_DISABLE:
            return "FUNCTIONFS_DISABLE";
        case FUNCTIONFS_SETUP:
            return "FUNCTIONFS_SETUP";
        case FUNCTIONFS_SUSPEND:
            return "FUNCTIONFS_SUSPEND";
        case FUNCTIONFS_RESUME:
            return "FUNCTIONFS_RESUME";
    }
    return "FUNCTIONFS_UNKNOWN";
}
79
// Direction tag stored in the low bit of an aio request's user-data word.
enum class TransferDirection : uint64_t {
    READ = 0,
    WRITE = 1,
};

// Identifier packed into iocb::aio_data: one direction bit plus a 63-bit
// sequence number, converted to/from a raw uint64_t via memcpy so the exact
// bit layout round-trips.
struct TransferId {
    TransferDirection direction : 1;
    uint64_t id : 63;

    TransferId() : TransferId(TransferDirection::READ, 0) {}

  private:
    TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {}

  public:
    // Packs this id into the raw 64-bit form stored in the kernel iocb.
    explicit operator uint64_t() const {
        uint64_t value;
        static_assert(sizeof(*this) == sizeof(value));
        memcpy(&value, this, sizeof(value));
        return value;
    }

    // Factory helpers, one per direction.
    static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); }
    static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); }

    // Unpacks an id previously produced by operator uint64_t().
    static TransferId from_value(uint64_t value) {
        TransferId unpacked;
        memcpy(&unpacked, &value, sizeof(value));
        return unpacked;
    }
};
111
// One aio transfer: the kernel control block plus the buffer it reads into /
// writes from. |pending| is true while the request has been submitted but not
// yet reaped.
struct IoBlock {
    bool pending;
    struct iocb control;
    Block payload;

    // Recovers the TransferId stashed in the iocb's user-data field.
    TransferId id() const { return TransferId::from_value(control.aio_data); }
};
119
120struct ScopedAioContext {
121 ScopedAioContext() = default;
122 ~ScopedAioContext() { reset(); }
123
124 ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
125 ScopedAioContext(const ScopedAioContext& copy) = delete;
126
127 ScopedAioContext& operator=(ScopedAioContext&& move) {
128 reset(move.release());
129 return *this;
130 }
131 ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;
132
133 static ScopedAioContext Create(size_t max_events) {
134 aio_context_t ctx = 0;
135 if (io_setup(max_events, &ctx) != 0) {
136 PLOG(FATAL) << "failed to create aio_context_t";
137 }
138 ScopedAioContext result;
139 result.reset(ctx);
140 return result;
141 }
142
143 aio_context_t release() {
144 aio_context_t result = context_;
145 context_ = 0;
146 return result;
147 }
148
149 void reset(aio_context_t new_context = 0) {
150 if (context_ != 0) {
151 io_destroy(context_);
152 }
153
154 context_ = new_context;
155 }
156
157 aio_context_t get() { return context_; }
158
159 private:
160 aio_context_t context_ = 0;
161};
162
163struct UsbFfsConnection : public Connection {
164 UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
165 std::promise<void> destruction_notifier)
166 : stopped_(false),
167 destruction_notifier_(std::move(destruction_notifier)),
168 control_fd_(std::move(control)),
169 read_fd_(std::move(read)),
170 write_fd_(std::move(write)) {
171 LOG(INFO) << "UsbFfsConnection constructed";
172 event_fd_.reset(eventfd(0, EFD_CLOEXEC));
173 if (event_fd_ == -1) {
174 PLOG(FATAL) << "failed to create eventfd";
175 }
176
177 aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
178 }
179
180 ~UsbFfsConnection() {
181 LOG(INFO) << "UsbFfsConnection being destroyed";
182 Stop();
183 monitor_thread_.join();
184 destruction_notifier_.set_value();
185 }
186
187 virtual bool Write(std::unique_ptr<apacket> packet) override final {
188 LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
189 Block header(sizeof(packet->msg));
190 memcpy(header.data(), &packet->msg, sizeof(packet->msg));
191
192 std::lock_guard<std::mutex> lock(write_mutex_);
193 write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
194 if (!packet->payload.empty()) {
195 write_requests_.push_back(
196 CreateWriteBlock(std::move(packet->payload), next_write_id_++));
197 }
198 SubmitWrites();
199 return true;
200 }
201
202 virtual void Start() override final { StartMonitor(); }
203
204 virtual void Stop() override final {
205 if (stopped_.exchange(true)) {
206 return;
207 }
208 stopped_ = true;
209 uint64_t notify = 1;
210 ssize_t rc = adb_write(event_fd_.get(), &notify, sizeof(notify));
211 if (rc < 0) {
212 PLOG(FATAL) << "failed to notify eventfd to stop UsbFfsConnection";
213 }
214 CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
215 }
216
217 private:
218 void StartMonitor() {
219 // This is a bit of a mess.
220 // It's possible for io_submit to end up blocking, if we call it as the endpoint
221 // becomes disabled. Work around this by having a monitor thread to listen for functionfs
222 // lifecycle events. If we notice an error condition (either we've become disabled, or we
223 // were never enabled in the first place), we send interruption signals to the worker thread
224 // until it dies, and then report failure to the transport via HandleError, which will
225 // eventually result in the transport being destroyed, which will result in UsbFfsConnection
226 // being destroyed, which unblocks the open thread and restarts this entire process.
227 static constexpr int kInterruptionSignal = SIGUSR1;
228 static std::once_flag handler_once;
229 std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });
230
231 monitor_thread_ = std::thread([this]() {
232 adb_thread_setname("UsbFfs-monitor");
233
234 bool bound = false;
235 bool started = false;
236 bool running = true;
237 while (running) {
238 if (!bound || !started) {
239 adb_pollfd pfd = {.fd = control_fd_.get(), .events = POLLIN, .revents = 0};
240 int rc = TEMP_FAILURE_RETRY(adb_poll(&pfd, 1, 5000 /*ms*/));
241 if (rc == -1) {
242 PLOG(FATAL) << "poll on USB control fd failed";
243 } else if (rc == 0) {
244 // Something in the kernel presumably went wrong.
245 // Close our endpoints, wait for a bit, and then try again.
246 aio_context_.reset();
247 read_fd_.reset();
248 write_fd_.reset();
249 control_fd_.reset();
250 std::this_thread::sleep_for(5s);
251 HandleError("didn't receive FUNCTIONFS_ENABLE, retrying");
252 return;
253 }
254 }
255
256 struct usb_functionfs_event event;
257 if (TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event))) !=
258 sizeof(event)) {
259 PLOG(FATAL) << "failed to read functionfs event";
260 }
261
262 LOG(INFO) << "USB event: "
263 << to_string(static_cast<usb_functionfs_event_type>(event.type));
264
265 switch (event.type) {
266 case FUNCTIONFS_BIND:
267 CHECK(!started) << "received FUNCTIONFS_ENABLE while already bound?";
268 bound = true;
269 break;
270
271 case FUNCTIONFS_ENABLE:
272 CHECK(!started) << "received FUNCTIONFS_ENABLE while already running?";
273 started = true;
274 StartWorker();
275 break;
276
277 case FUNCTIONFS_DISABLE:
278 running = false;
279 break;
280 }
281 }
282
283 pthread_t worker_thread_handle = worker_thread_.native_handle();
284 while (true) {
285 int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
286 if (rc != 0) {
287 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
288 break;
289 }
290
291 std::this_thread::sleep_for(100ms);
292
293 rc = pthread_kill(worker_thread_handle, 0);
294 if (rc == 0) {
295 continue;
296 } else if (rc == ESRCH) {
297 break;
298 } else {
299 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
300 }
301 }
302
303 worker_thread_.join();
304
305 aio_context_.reset();
306 read_fd_.reset();
307 write_fd_.reset();
308 });
309 }
310
311 void StartWorker() {
312 worker_thread_ = std::thread([this]() {
313 adb_thread_setname("UsbFfs-worker");
314 for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
315 read_requests_[i] = CreateReadBlock(next_read_id_++);
316 SubmitRead(&read_requests_[i]);
317 }
318
319 while (!stopped_) {
320 uint64_t dummy;
321 ssize_t rc = adb_read(event_fd_.get(), &dummy, sizeof(dummy));
322 if (rc == -1) {
323 PLOG(FATAL) << "failed to read from eventfd";
324 } else if (rc == 0) {
325 LOG(FATAL) << "hit EOF on eventfd";
326 }
327
328 WaitForEvents();
329 }
330 });
331 }
332
333 void PrepareReadBlock(IoBlock* block, uint64_t id) {
334 block->pending = false;
335 block->payload.resize(kUsbReadSize);
336 block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
337 block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
338 block->control.aio_nbytes = block->payload.size();
339 }
340
341 IoBlock CreateReadBlock(uint64_t id) {
342 IoBlock block;
343 PrepareReadBlock(&block, id);
344 block.control.aio_rw_flags = 0;
345 block.control.aio_lio_opcode = IOCB_CMD_PREAD;
346 block.control.aio_reqprio = 0;
347 block.control.aio_fildes = read_fd_.get();
348 block.control.aio_offset = 0;
349 block.control.aio_flags = IOCB_FLAG_RESFD;
350 block.control.aio_resfd = event_fd_.get();
351 return block;
352 }
353
354 void WaitForEvents() {
355 static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
356 struct io_event events[kMaxEvents];
357 struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
358 int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
359 if (rc == -1) {
360 HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
361 return;
362 }
363
364 for (int event_idx = 0; event_idx < rc; ++event_idx) {
365 auto& event = events[event_idx];
366 TransferId id = TransferId::from_value(event.data);
367
368 if (event.res < 0) {
369 std::string error =
370 StringPrintf("%s %" PRIu64 " failed with error %s",
371 id.direction == TransferDirection::READ ? "read" : "write",
372 id.id, strerror(-event.res));
373 HandleError(error);
374 return;
375 }
376
377 if (id.direction == TransferDirection::READ) {
378 HandleRead(id, event.res);
379 } else {
380 HandleWrite(id);
381 }
382 }
383 }
384
385 void HandleRead(TransferId id, int64_t size) {
386 uint64_t read_idx = id.id % kUsbReadQueueDepth;
387 IoBlock* block = &read_requests_[read_idx];
388 block->pending = false;
389 block->payload.resize(size);
390
391 // Notification for completed reads can be received out of order.
392 if (block->id().id != needed_read_id_) {
393 LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
394 << needed_read_id_;
395 return;
396 }
397
398 for (uint64_t id = needed_read_id_;; ++id) {
399 size_t read_idx = id % kUsbReadQueueDepth;
400 IoBlock* current_block = &read_requests_[read_idx];
401 if (current_block->pending) {
402 break;
403 }
404 ProcessRead(current_block);
405 ++needed_read_id_;
406 }
407 }
408
409 void ProcessRead(IoBlock* block) {
410 if (!block->payload.empty()) {
411 if (!incoming_header_.has_value()) {
412 CHECK_EQ(sizeof(amessage), block->payload.size());
413 amessage msg;
414 memcpy(&msg, block->payload.data(), sizeof(amessage));
415 LOG(DEBUG) << "USB read:" << dump_header(&msg);
416 incoming_header_ = msg;
417 } else {
418 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
419 Block payload = std::move(block->payload);
420 CHECK_LE(payload.size(), bytes_left);
421 incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
422 }
423
424 if (incoming_header_->data_length == incoming_payload_.size()) {
425 auto packet = std::make_unique<apacket>();
426 packet->msg = *incoming_header_;
427
428 // TODO: Make apacket contain an IOVector so we don't have to coalesce.
429 packet->payload = incoming_payload_.coalesce();
430 read_callback_(this, std::move(packet));
431
432 incoming_header_.reset();
433 incoming_payload_.clear();
434 }
435 }
436
437 PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
438 SubmitRead(block);
439 }
440
441 void SubmitRead(IoBlock* block) {
442 block->pending = true;
443 struct iocb* iocb = &block->control;
444 if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
445 HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
446 return;
447 }
448 }
449
450 void HandleWrite(TransferId id) {
451 std::lock_guard<std::mutex> lock(write_mutex_);
452 auto it =
453 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
454 return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id);
455 });
456 CHECK(it != write_requests_.end());
457
458 write_requests_.erase(it);
459 size_t outstanding_writes = --writes_submitted_;
460 LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
461
462 SubmitWrites();
463 }
464
465 std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
466 auto block = std::make_unique<IoBlock>();
467 block->payload = std::move(payload);
468 block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
469 block->control.aio_rw_flags = 0;
470 block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
471 block->control.aio_reqprio = 0;
472 block->control.aio_fildes = write_fd_.get();
473 block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
474 block->control.aio_nbytes = block->payload.size();
475 block->control.aio_offset = 0;
476 block->control.aio_flags = IOCB_FLAG_RESFD;
477 block->control.aio_resfd = event_fd_.get();
478 return block;
479 }
480
481 void SubmitWrites() REQUIRES(write_mutex_) {
482 if (writes_submitted_ == kUsbWriteQueueDepth) {
483 return;
484 }
485
486 ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
487 write_requests_.size() - writes_submitted_);
488 CHECK_GE(writes_to_submit, 0);
489 if (writes_to_submit == 0) {
490 return;
491 }
492
493 struct iocb* iocbs[kUsbWriteQueueDepth];
494 for (int i = 0; i < writes_to_submit; ++i) {
495 CHECK(!write_requests_[writes_submitted_ + i]->pending);
496 write_requests_[writes_submitted_ + i]->pending = true;
497 iocbs[i] = &write_requests_[writes_submitted_ + i]->control;
498 LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
499 }
500
501 int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
502 if (rc == -1) {
503 HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
504 return;
505 } else if (rc != writes_to_submit) {
506 LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
507 << ", actually submitted " << rc;
508 }
509
510 writes_submitted_ += rc;
511 }
512
513 void HandleError(const std::string& error) {
514 std::call_once(error_flag_, [&]() {
515 error_callback_(this, error);
516 if (!stopped_) {
517 Stop();
518 }
519 });
520 }
521
522 std::thread monitor_thread_;
523 std::thread worker_thread_;
524
525 std::atomic<bool> stopped_;
526 std::promise<void> destruction_notifier_;
527 std::once_flag error_flag_;
528
529 unique_fd event_fd_;
530
531 ScopedAioContext aio_context_;
532 unique_fd control_fd_;
533 unique_fd read_fd_;
534 unique_fd write_fd_;
535
536 std::optional<amessage> incoming_header_;
537 IOVector incoming_payload_;
538
539 std::array<IoBlock, kUsbReadQueueDepth> read_requests_;
540 IOVector read_data_;
541
542 // ID of the next request that we're going to send out.
543 size_t next_read_id_ = 0;
544
545 // ID of the next packet we're waiting for.
546 size_t needed_read_id_ = 0;
547
548 std::mutex write_mutex_;
549 std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_);
550 size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
551 size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;
552};
553
554static void usb_ffs_open_thread() {
555 adb_thread_setname("usb ffs open");
556
557 while (true) {
558 unique_fd control;
559 unique_fd bulk_out;
560 unique_fd bulk_in;
561 if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
562 std::this_thread::sleep_for(1s);
563 continue;
564 }
565
566 atransport* transport = new atransport();
567 transport->serial = "UsbFfs";
568 std::promise<void> destruction_notifier;
569 std::future<void> future = destruction_notifier.get_future();
570 transport->SetConnection(std::make_unique<UsbFfsConnection>(
571 std::move(control), std::move(bulk_out), std::move(bulk_in),
572 std::move(destruction_notifier)));
573 register_transport(transport);
574 future.wait();
575 }
576}
577
578void usb_init_legacy();
579void usb_init() {
580 if (!android::base::GetBoolProperty("persist.adb.nonblocking_ffs", false)) {
581 usb_init_legacy();
582 } else {
583 std::thread(usb_ffs_open_thread).detach();
584 }
585}