blob: 13e15caf04fd9001a26637ca7f8887daf2aba26c [file] [log] [blame]
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23#include "qemu-common.h"
24#include "android/async-utils.h"
25#include "android/utils/debug.h"
26#include "android/async-socket-connector.h"
27#include "android/async-socket.h"
28#include "utils/panic.h"
29#include "iolooper.h"
30
31#define E(...) derror(__VA_ARGS__)
32#define W(...) dwarning(__VA_ARGS__)
33#define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
34#define D_ACTIVE VERBOSE_CHECK(asyncsocket)
35
36/********************************************************************************
37 * Asynchronous Socket internal API declarations
38 *******************************************************************************/
39
40/* Asynchronous socket I/O (reader, or writer) descriptor. */
41typedef struct AsyncSocketIO AsyncSocketIO;
42
43/* Gets socket's address string. */
44static const char* _async_socket_string(AsyncSocket* as);
45
46/* Gets socket's looper. */
47static Looper* _async_socket_get_looper(AsyncSocket* as);
48
49/* Handler for the I/O time out.
50 * Param:
51 * as - Asynchronous socket for the reader.
52 * asio - Desciptor for the timed out I/O.
53 */
54static void _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio);
55
56/********************************************************************************
57 * Asynchronous Socket Reader / Writer
58 *******************************************************************************/
59
60struct AsyncSocketIO {
61 /* Next I/O in the reader, or writer list. */
62 AsyncSocketIO* next;
63 /* Asynchronous socket for this I/O. */
64 AsyncSocket* as;
65 /* Timer used for time outs on this I/O. */
66 LoopTimer timer[1];
67 /* An opaque pointer associated with this I/O. */
68 void* io_opaque;
69 /* Buffer where to read / write data. */
70 uint8_t* buffer;
71 /* Bytes to transfer through the socket for this I/O. */
72 uint32_t to_transfer;
73 /* Bytes thransferred through the socket in this I/O. */
74 uint32_t transferred;
75 /* I/O callbacks for this I/O. */
76 const ASIOCb* io_cb;
77 /* I/O type selector: 1 - read, 0 - write. */
78 int is_io_read;
79};
80
81/*
82 * Recycling I/O instances.
83 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
84 * them for faster allocation, rather than allocating and freeing them for each
85 * I/O on the socket.
86 */
87
88/* List of recycled I/O descriptors. */
89static AsyncSocketIO* _asio_recycled = NULL;
90/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
91static int _recycled_asio_count = 0;
92/* Maximum number of I/O descriptors that can be recycled. */
93static const int _max_recycled_asio_num = 32;
94
95/* Handler for an I/O time-out timer event.
96 * When this routine is invoked, it indicates that a time out has occurred on an
97 * I/O.
98 * Param:
99 * opaque - AsyncSocketIO instance representing the timed out I/O.
100 */
101static void _on_async_socket_io_timed_out(void* opaque);
102
103/* Creates new I/O descriptor.
104 * Param:
105 * as - Asynchronous socket for the I/O.
106 * is_io_read - I/O type selector: 1 - read, 0 - write.
107 * buffer, len - Reader / writer buffer address.
108 * io_cb - Callbacks for this reader / writer.
109 * io_opaque - An opaque pointer associated with the I/O.
110 * deadline - Deadline to complete the I/O.
111 * Return:
112 * Initialized AsyncSocketIO instance.
113 */
114static AsyncSocketIO*
115_async_socket_rw_new(AsyncSocket* as,
116 int is_io_read,
117 void* buffer,
118 uint32_t len,
119 const ASIOCb* io_cb,
120 void* io_opaque,
121 Duration deadline)
122{
123 /* Lookup in the recycler first. */
124 AsyncSocketIO* asio = _asio_recycled;
125 if (asio != NULL) {
126 /* Pull the descriptor from recycler. */
127 _asio_recycled = asio->next;
128 _recycled_asio_count--;
129 } else {
130 /* No recycled descriptors. Allocate new one. */
131 ANEW0(asio);
132 }
133
134 asio->next = NULL;
135 asio->as = as;
136 asio->is_io_read = is_io_read;
137 asio->buffer = (uint8_t*)buffer;
138 asio->to_transfer = len;
139 asio->transferred = 0;
140 asio->io_cb = io_cb;
141 asio->io_opaque = io_opaque;
142
143 loopTimer_init(asio->timer, _async_socket_get_looper(as),
144 _on_async_socket_io_timed_out, asio);
145 loopTimer_startAbsolute(asio->timer, deadline);
146
147 return asio;
148}
149
150/* Destroys and frees I/O descriptor. */
151static void
152_async_socket_io_destroy(AsyncSocketIO* asio)
153{
154 loopTimer_stop(asio->timer);
155 loopTimer_done(asio->timer);
156
157 /* Try to recycle it first, and free the memory if recycler is full. */
158 if (_recycled_asio_count < _max_recycled_asio_num) {
159 asio->next = _asio_recycled;
160 _asio_recycled = asio;
161 _recycled_asio_count++;
162 } else {
163 AFREE(asio);
164 }
165}
166
167/* Creates new asynchronous socket reader.
168 * Param:
169 * as - Asynchronous socket for the reader.
170 * buffer, len - Reader's buffer.
171 * io_cb - Lists reader's callbacks.
172 * reader_opaque - An opaque pointer associated with the reader.
173 * deadline - Deadline to complete the operation.
174 * Return:
175 * An initialized AsyncSocketIO intance.
176 */
177static AsyncSocketIO*
178_async_socket_reader_new(AsyncSocket* as,
179 void* buffer,
180 uint32_t len,
181 const ASIOCb* io_cb,
182 void* reader_opaque,
183 Duration deadline)
184{
185 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
186 reader_opaque, deadline);
187 return asio;
188}
189
190/* Creates new asynchronous socket writer.
191 * Param:
192 * as - Asynchronous socket for the writer.
193 * buffer, len - Writer's buffer.
194 * io_cb - Lists writer's callbacks.
195 * writer_opaque - An opaque pointer associated with the writer.
196 * deadline - Deadline to complete the operation.
197 * Return:
198 * An initialized AsyncSocketIO intance.
199 */
200static AsyncSocketIO*
201_async_socket_writer_new(AsyncSocket* as,
202 const void* buffer,
203 uint32_t len,
204 const ASIOCb* io_cb,
205 void* writer_opaque,
206 Duration deadline)
207{
208 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
209 io_cb, writer_opaque,
210 deadline);
211 return asio;
212}
213
214/* I/O timed out. */
215static void
216_on_async_socket_io_timed_out(void* opaque)
217{
218 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
219 _async_socket_io_timed_out(asio->as, asio);
220 _async_socket_io_destroy(asio);
221}
222
223/********************************************************************************
224 * Asynchronous Socket internals
225 *******************************************************************************/
226
227struct AsyncSocket {
228 /* TCP address for the socket. */
229 SockAddress address;
230 /* Client callbacks for this socket. */
231 const ASClientCb* client_cb;
232 /* An opaque pointer associated with this socket by the client. */
233 void* client_opaque;
234 /* I/O looper for asynchronous I/O on the socket. */
235 Looper* looper;
236 /* I/O descriptor for asynchronous I/O on the socket. */
237 LoopIo io[1];
238 /* Timer to use for reconnection attempts. */
239 LoopTimer reconnect_timer[1];
240 /* Head of the list of the active readers. */
241 AsyncSocketIO* readers_head;
242 /* Tail of the list of the active readers. */
243 AsyncSocketIO* readers_tail;
244 /* Head of the list of the active writers. */
245 AsyncSocketIO* writers_head;
246 /* Tail of the list of the active writers. */
247 AsyncSocketIO* writers_tail;
248 /* Socket's file descriptor. */
249 int fd;
250 /* Timeout to use for reconnection attempts. */
251 int reconnect_to;
252};
253
254static const char*
255_async_socket_string(AsyncSocket* as)
256{
257 return sock_address_to_string(&as->address);
258}
259
260static Looper*
261_async_socket_get_looper(AsyncSocket* as)
262{
263 return as->looper;
264}
265
266/* Pulls first reader out of the list.
267 * Param:
268 * as - Initialized AsyncSocket instance.
269 * Return:
270 * First I/O pulled out of the list, or NULL if there are no I/O in the list.
271 */
272static AsyncSocketIO*
273_async_socket_pull_first_io(AsyncSocket* as,
274 AsyncSocketIO** list_head,
275 AsyncSocketIO** list_tail)
276{
277 AsyncSocketIO* const ret = *list_head;
278 if (ret != NULL) {
279 *list_head = ret->next;
280 ret->next = NULL;
281 if (*list_head == NULL) {
282 *list_tail = NULL;
283 }
284 }
285 return ret;
286}
287
288/* Pulls first reader out of the list.
289 * Param:
290 * as - Initialized AsyncSocket instance.
291 * Return:
292 * First reader pulled out of the list, or NULL if there are no readers in the
293 * list.
294 */
295static AsyncSocketIO*
296_async_socket_pull_first_reader(AsyncSocket* as)
297{
298 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
299}
300
301/* Pulls first writer out of the list.
302 * Param:
303 * as - Initialized AsyncSocket instance.
304 * Return:
305 * First writer pulled out of the list, or NULL if there are no writers in the
306 * list.
307 */
308static AsyncSocketIO*
309_async_socket_pull_first_writer(AsyncSocket* as)
310{
311 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
312}
313
314/* Removes an I/O descriptor from a list of active I/O.
315 * Param:
316 * as - Initialized AsyncSocket instance.
317 * list_head, list_tail - Pointers to the list head and tail.
318 * io - I/O to remove.
319 * Return:
320 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
321 */
322static int
323_async_socket_remove_io(AsyncSocket* as,
324 AsyncSocketIO** list_head,
325 AsyncSocketIO** list_tail,
326 AsyncSocketIO* io)
327{
328 AsyncSocketIO* prev = NULL;
329
330 while (*list_head != NULL && io != *list_head) {
331 prev = *list_head;
332 list_head = &((*list_head)->next);
333 }
334 if (*list_head == NULL) {
335 D("%s: I/O %p is not found in the list for socket '%s'",
336 __FUNCTION__, io, _async_socket_string(as));
337 return 0;
338 }
339
340 *list_head = io->next;
341 if (prev != NULL) {
342 prev->next = io->next;
343 }
344 if (*list_tail == io) {
345 *list_tail = prev;
346 }
347
348 return 1;
349}
350
351/* Advances to the next I/O in the list.
352 * Param:
353 * as - Initialized AsyncSocket instance.
354 * list_head, list_tail - Pointers to the list head and tail.
355 * Return:
356 * Next I/O at the head of the list, or NULL if I/O list become empty.
357 */
358static AsyncSocketIO*
359_async_socket_advance_io(AsyncSocket* as,
360 AsyncSocketIO** list_head,
361 AsyncSocketIO** list_tail)
362{
363 AsyncSocketIO* first_io = *list_head;
364 if (first_io != NULL) {
365 *list_head = first_io->next;
366 first_io->next = NULL;
367 }
368 if (*list_head == NULL) {
369 *list_tail = NULL;
370 }
371 return *list_head;
372}
373
374/* Advances to the next reader in the list.
375 * Param:
376 * as - Initialized AsyncSocket instance.
377 * Return:
378 * Next reader at the head of the list, or NULL if reader list become empty.
379 */
380static AsyncSocketIO*
381_async_socket_advance_reader(AsyncSocket* as)
382{
383 return _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
384}
385
386/* Advances to the next writer in the list.
387 * Param:
388 * as - Initialized AsyncSocket instance.
389 * Return:
390 * Next writer at the head of the list, or NULL if writer list become empty.
391 */
392static AsyncSocketIO*
393_async_socket_advance_writer(AsyncSocket* as)
394{
395 return _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
396}
397
398/* Completes an I/O.
399 * Param:
400 * as - Initialized AsyncSocket instance.
401 * asio - I/O to complete.
402 */
403static void
404_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
405{
406 /* Stop the timer. */
407 loopTimer_stop(asio->timer);
408
409 /* Report I/O completion. First report via I/O callback, and only if it is
410 * not set, report via client callback. */
411 if (asio->io_cb && asio->io_cb->on_completed) {
412 asio->io_cb->on_completed(as->client_opaque, as, asio->is_io_read,
413 asio->io_opaque, asio->buffer, asio->transferred);
414 } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_completed) {
415 as->client_cb->io_cb->on_completed(as->client_opaque, as, asio->is_io_read,
416 asio->io_opaque, asio->buffer,
417 asio->transferred);
418 }
419}
420
421/* Timeouts an I/O.
422 * Param:
423 * as - Initialized AsyncSocket instance.
424 * asio - An I/O that has timed out.
425 */
426static void
427_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
428{
429 /* Remove the I/O from a list of active I/O. */
430 if (asio->is_io_read) {
431 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
432 } else {
433 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
434 }
435
436 /* Report I/O time out. First report it via I/O callbacks, and only if it is
437 * not set, report it via client callbacks. */
438 if (asio->io_cb && asio->io_cb->on_timed_out) {
439 asio->io_cb->on_timed_out(as->client_opaque, as, asio->is_io_read,
440 asio->io_opaque, asio->buffer,
441 asio->transferred, asio->to_transfer);
442 } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_timed_out) {
443 as->client_cb->io_cb->on_timed_out(as->client_opaque, as, asio->is_io_read,
444 asio->io_opaque, asio->buffer,
445 asio->transferred, asio->to_transfer);
446 }
447}
448
449/* Cancels an I/O.
450 * Param:
451 * as - Initialized AsyncSocket instance.
452 * asio - An I/O to cancel.
453 */
454static void
455_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
456{
457 /* Stop the timer. */
458 loopTimer_stop(asio->timer);
459
460 /* Report I/O cancellation. First report it via I/O callbacks, and only if it
461 * is not set, report it via client callbacks. */
462 if (asio->io_cb && asio->io_cb->on_cancelled) {
463 asio->io_cb->on_cancelled(as->client_opaque, as, asio->is_io_read,
464 asio->io_opaque, asio->buffer,
465 asio->transferred, asio->to_transfer);
466 } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_cancelled) {
467 as->client_cb->io_cb->on_cancelled(as->client_opaque, as, asio->is_io_read,
468 asio->io_opaque, asio->buffer,
469 asio->transferred, asio->to_transfer);
470 }
471}
472
473/* Reports an I/O failure.
474 * Param:
475 * as - Initialized AsyncSocket instance.
476 * asio - An I/O that has failed. Can be NULL for general failures.
477 * failure - Failure (errno) that has occurred.
478 */
479static void
480_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
481{
482 /* Stop the timer. */
483 loopTimer_stop(asio->timer);
484
485 /* Report I/O failure. First report it via I/O callbacks, and only if it
486 * is not set, report it via client callbacks. */
487 if (asio && asio->io_cb && asio->io_cb->on_io_failure) {
488 asio->io_cb->on_io_failure(as->client_opaque, as, asio->is_io_read,
489 asio->io_opaque, asio->buffer,
490 asio->transferred, asio->to_transfer, failure);
491 } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_io_failure) {
492 as->client_cb->io_cb->on_io_failure(as->client_opaque, as,
493 asio->is_io_read, asio->io_opaque,
494 asio->buffer, asio->transferred,
495 asio->to_transfer, failure);
496 }
497}
498
499/* Cancels all the active socket readers.
500 * Param:
501 * as - Initialized AsyncSocket instance.
502 */
503static void
504_async_socket_cancel_readers(AsyncSocket* as)
505{
506 while (as->readers_head != NULL) {
507 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
508 _async_socket_cancel_io(as, to_cancel);
509 _async_socket_io_destroy(to_cancel);
510 }
511}
512
513/* Cancels all the active socket writers.
514 * Param:
515 * as - Initialized AsyncSocket instance.
516 */
517static void
518_async_socket_cancel_writers(AsyncSocket* as)
519{
520 while (as->writers_head != NULL) {
521 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
522 _async_socket_cancel_io(as, to_cancel);
523 _async_socket_io_destroy(to_cancel);
524 }
525}
526
527/* Cancels all the I/O on the socket. */
528static void
529_async_socket_cancel_all_io(AsyncSocket* as)
530{
531 /* Stop the reconnection timer. */
532 loopTimer_stop(as->reconnect_timer);
533
534 /* Stop read / write on the socket. */
535 loopIo_dontWantWrite(as->io);
536 loopIo_dontWantRead(as->io);
537
538 /* Cancel active readers and writers. */
539 _async_socket_cancel_readers(as);
540 _async_socket_cancel_writers(as);
541}
542
543/* Closes socket handle used by the async socket.
544 * Param:
545 * as - Initialized AsyncSocket instance.
546 */
547static void
548_async_socket_close_socket(AsyncSocket* as)
549{
550 if (as->fd >= 0) {
551 loopIo_done(as->io);
552 socket_close(as->fd);
553 as->fd = -1;
554 }
555}
556
557/* Destroys AsyncSocket instance.
558 * Param:
559 * as - Initialized AsyncSocket instance.
560 */
561static void
562_async_socket_destroy(AsyncSocket* as)
563{
564 if (as != NULL) {
565 /* Cancel all the I/O */
566 _async_socket_cancel_all_io(as);
567
568 /* Close socket. */
569 _async_socket_close_socket(as);
570
571 /* Free allocated resources. */
572 if (as->looper != NULL) {
573 loopTimer_done(as->reconnect_timer);
574 looper_free(as->looper);
575 }
576 sock_address_done(&as->address);
577 AFREE(as);
578 }
579}
580
581/* Starts reconnection attempts after connection has been lost.
582 * Param:
583 * as - Initialized AsyncSocket instance.
584 * to - Milliseconds to wait before reconnection attempt.
585 */
586static void
587_async_socket_reconnect(AsyncSocket* as, int to)
588{
589 /* Make sure that no I/O is active, and socket is closed before we
590 * reconnect. */
591 _async_socket_cancel_all_io(as);
592
593 /* Set the timer for reconnection attempt. */
594 loopTimer_startRelative(as->reconnect_timer, to);
595}
596
597/********************************************************************************
598 * Asynchronous Socket callbacks
599 *******************************************************************************/
600
601/* A callback that is invoked when socket gets disconnected.
602 * Param:
603 * as - Initialized AsyncSocket instance.
604 */
605static void
606_on_async_socket_disconnected(AsyncSocket* as)
607{
608 /* Save error to restore it for the client's callback. */
609 const int save_errno = errno;
610 ASConnectAction action = ASCA_ABORT;
611
612 D("Async socket '%s' is disconnected. Error %d -> %s",
613 _async_socket_string(as), errno, strerror(errno));
614
615 /* Cancel all the I/O on this socket. */
616 _async_socket_cancel_all_io(as);
617
618 /* Close the socket. */
619 _async_socket_close_socket(as);
620
621 /* Restore errno, and invoke client's callback. */
622 errno = save_errno;
623 action = as->client_cb->on_connection(as->client_opaque, as,
624 ASCS_DISCONNECTED);
625
626 if (action == ASCA_RETRY) {
627 /* Client requested reconnection. */
628 if (as->reconnect_to) {
629 _async_socket_reconnect(as, as->reconnect_to);
630 }
631 }
632}
633
634/* A callback that is invoked on socket's I/O failure.
635 * Param:
636 * as - Initialized AsyncSocket instance.
637 * asio - Descriptor for the failed I/O. Can be NULL for general failures.
638 */
639static void
640_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
641{
642 D("Async socket '%s' I/O failure %d: %s",
643 _async_socket_string(as), errno, strerror(errno));
644
645 /* Report the failure. */
646 _async_socket_io_failure(as, asio, errno);
647}
648
649/* A callback that is invoked when there is data available to read.
650 * Param:
651 * as - Initialized AsyncSocket instance.
652 * Return:
653 * 0 on success, or -1 on failure. Failure returned from this routine will
654 * skip writes (if awailable) behind this read.
655 */
656static int
657_on_async_socket_recv(AsyncSocket* as)
658{
659 /* Get current reader. */
660 AsyncSocketIO* const asr = as->readers_head;
661 if (asr == NULL) {
662 D("No async socket reader available on IO_READ for '%s'",
663 _async_socket_string(as));
664 loopIo_dontWantRead(as->io);
665 return 0;
666 }
667
668 /* Read next chunk of data. */
669 int res = socket_recv(as->fd, asr->buffer + asr->transferred,
670 asr->to_transfer - asr->transferred);
671 while (res < 0 && errno == EINTR) {
672 res = socket_recv(as->fd, asr->buffer + asr->transferred,
673 asr->to_transfer - asr->transferred);
674 }
675
676 if (res == 0) {
677 /* Socket has been disconnected. */
678 errno = ECONNRESET;
679 _on_async_socket_disconnected(as);
680 return -1;
681 }
682
683 if (res < 0) {
684 if (errno == EWOULDBLOCK || errno == EAGAIN) {
685 /* Yield to writes behind this read. */
686 loopIo_wantRead(as->io);
687 return 0;
688 }
689
690 /* An I/O error. */
691 _on_async_socket_failure(as, asr);
692 return -1;
693 }
694
695 /* Update the reader's descriptor. */
696 asr->transferred += res;
697 if (asr->transferred == asr->to_transfer) {
698 /* This read is completed. Move on to the next reader. */
699 _async_socket_advance_reader(as);
700
701 /* Notify reader completion. */
702 _async_socket_complete_io(as, asr);
703 _async_socket_io_destroy(asr);
704 }
705
706 /* Lets see if there are still active readers, and enable, or disable read
707 * I/O callback accordingly. */
708 if (as->readers_head != NULL) {
709 loopIo_wantRead(as->io);
710 } else {
711 loopIo_dontWantRead(as->io);
712 }
713
714 return 0;
715}
716
717/* A callback that is invoked when there is data available to write.
718 * Param:
719 * as - Initialized AsyncSocket instance.
720 * Return:
721 * 0 on success, or -1 on failure. Failure returned from this routine will
722 * skip reads (if awailable) behind this write.
723 */
724static int
725_on_async_socket_send(AsyncSocket* as)
726{
727 /* Get current writer. */
728 AsyncSocketIO* const asw = as->writers_head;
729 if (asw == NULL) {
730 D("No async socket writer available on IO_WRITE for '%s'",
731 _async_socket_string(as));
732 loopIo_dontWantWrite(as->io);
733 return 0;
734 }
735
736 /* Write next chunk of data. */
737 int res = socket_send(as->fd, asw->buffer + asw->transferred,
738 asw->to_transfer - asw->transferred);
739 while (res < 0 && errno == EINTR) {
740 res = socket_send(as->fd, asw->buffer + asw->transferred,
741 asw->to_transfer - asw->transferred);
742 }
743
744 if (res == 0) {
745 /* Socket has been disconnected. */
746 errno = ECONNRESET;
747 _on_async_socket_disconnected(as);
748 return -1;
749 }
750
751 if (res < 0) {
752 if (errno == EWOULDBLOCK || errno == EAGAIN) {
753 /* Yield to reads behind this write. */
754 loopIo_wantWrite(as->io);
755 return 0;
756 }
757
758 /* An I/O error. */
759 _on_async_socket_failure(as, asw);
760 return -1;
761 }
762
763 /* Update the reader descriptor. */
764 asw->transferred += res;
765 if (asw->transferred == asw->to_transfer) {
766 /* This write is completed. Move on to the next writer. */
767 _async_socket_advance_writer(as);
768
769 /* Notify writer completion. */
770 _async_socket_complete_io(as, asw);
771 _async_socket_io_destroy(asw);
772 }
773
774 /* Lets see if there are still active writers, and enable, or disable write
775 * I/O callback accordingly. */
776 if (as->writers_head != NULL) {
777 loopIo_wantWrite(as->io);
778 } else {
779 loopIo_dontWantWrite(as->io);
780 }
781
782 return 0;
783}
784
785/* A callback that is invoked when an I/O is available on socket.
786 * Param:
787 * as - Initialized AsyncSocket instance.
788 * fd - Socket's file descriptor.
789 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
790 */
791static void
792_on_async_socket_io(void* opaque, int fd, unsigned events)
793{
794 AsyncSocket* const as = (AsyncSocket*)opaque;
795
796 if ((events & LOOP_IO_READ) != 0) {
797 if (_on_async_socket_recv(as) != 0) {
798 return;
799 }
800 }
801
802 if ((events & LOOP_IO_WRITE) != 0) {
803 if (_on_async_socket_send(as) != 0) {
804 return;
805 }
806 }
807}
808
809/* A callback that is invoked by asynchronous socket connector on connection
810 * events.
811 * Param:
812 * opaque - Initialized AsyncSocket instance.
813 * connector - Connector that is used to connect this socket.
814 * event - Connection event.
815 * Return:
816 * One of ASCCbRes values.
817 */
818static ASCCbRes
819_on_connector_events(void* opaque,
820 AsyncSocketConnector* connector,
821 ASCEvent event)
822{
823 ASConnectAction action;
824 ASConnectStatus adsc_status;
825 AsyncSocket* const as = (AsyncSocket*)opaque;
826
827 /* Convert connector event into async socket connection event. */
828 switch (event) {
829 case ASC_CONNECTION_SUCCEEDED:
830 /* Accept the connection. */
831 adsc_status = ASCS_CONNECTED;
832 as->fd = async_socket_connector_pull_fd(connector);
833 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
834 break;
835
836 case ASC_CONNECTION_RETRY:
837 adsc_status = ASCS_RETRY;
838 break;
839
840 case ASC_CONNECTION_FAILED:
841 default:
842 adsc_status = ASCS_FAILURE;
843 break;
844 }
845
846 /* Invoke client's callback. */
847 action = as->client_cb->on_connection(as->client_opaque, as, adsc_status);
848 if (event == ASC_CONNECTION_SUCCEEDED && action != ASCA_KEEP) {
849 /* For whatever reason the client didn't want to keep this connection.
850 * Close it. */
851 _async_socket_close_socket(as);
852 }
853
854 if (action == ASCA_RETRY) {
855 return ASC_CB_RETRY;
856 } else if (action == ASCA_ABORT) {
857 return ASC_CB_ABORT;
858 } else {
859 return ASC_CB_KEEP;
860 }
861}
862
863/* Timer callback invoked to reconnect the lost connection.
864 * Param:
865 * as - Initialized AsyncSocket instance.
866 */
867void
868_on_async_socket_reconnect(void* opaque)
869{
870 AsyncSocket* as = (AsyncSocket*)opaque;
871 async_socket_connect(as, as->reconnect_to);
872}
873
874
875/********************************************************************************
876 * Android Device Socket public API
877 *******************************************************************************/
878
879AsyncSocket*
880async_socket_new(int port,
881 int reconnect_to,
882 const ASClientCb* client_cb,
883 void* client_opaque)
884{
885 AsyncSocket* as;
886
887 if (client_cb == NULL || client_cb->on_connection == NULL) {
888 E("Invalid client_cb parameter");
889 return NULL;
890 }
891
892 ANEW0(as);
893
894 as->fd = -1;
895 as->client_opaque = client_opaque;
896 as->client_cb = client_cb;
897 as->readers_head = as->readers_tail = NULL;
898 as->reconnect_to = reconnect_to;
899 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
900 as->looper = looper_newCore();
901 if (as->looper == NULL) {
902 E("Unable to create I/O looper for async socket '%s'",
903 _async_socket_string(as));
904 _async_socket_destroy(as);
905 return NULL;
906 }
907 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
908
909 return as;
910}
911
912int
913async_socket_connect(AsyncSocket* as, int retry_to)
914{
915 AsyncSocketConnector* const connector =
916 async_socket_connector_new(&as->address, retry_to, _on_connector_events, as);
917 if (connector == NULL) {
918 return -1;
919 }
920 return (async_socket_connector_connect(connector) == ASC_CONNECT_FAILED) ? -1 : 0;
921}
922
923void
924async_socket_disconnect(AsyncSocket* as)
925{
926 if (as != NULL) {
927 _async_socket_cancel_all_io(as);
928 _async_socket_close_socket(as);
929 _async_socket_destroy(as);
930 }
931}
932
933int
934async_socket_reconnect(AsyncSocket* as, int retry_to)
935{
936 _async_socket_cancel_all_io(as);
937 _async_socket_close_socket(as);
938 return async_socket_connect(as, retry_to);
939}
940
941int
942async_socket_read_abs(AsyncSocket* as,
943 void* buffer, uint32_t len,
944 const ASIOCb* reader_cb,
945 void* reader_opaque,
946 Duration deadline)
947{
948 AsyncSocketIO* const asr =
949 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
950 deadline);
951 if (as->readers_head == NULL) {
952 as->readers_head = as->readers_tail = asr;
953 } else {
954 as->readers_tail->next = asr;
955 as->readers_tail = asr;
956 }
957 loopIo_wantRead(as->io);
958 return 0;
959}
960
961int
962async_socket_read_rel(AsyncSocket* as,
963 void* buffer, uint32_t len,
964 const ASIOCb* reader_cb,
965 void* reader_opaque,
966 int to)
967{
968 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
969 DURATION_INFINITE;
970 return async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
971}
972
973int
974async_socket_write_abs(AsyncSocket* as,
975 const void* buffer, uint32_t len,
976 const ASIOCb* writer_cb,
977 void* writer_opaque,
978 Duration deadline)
979{
980 AsyncSocketIO* const asw =
981 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
982 deadline);
983 if (as->writers_head == NULL) {
984 as->writers_head = as->writers_tail = asw;
985 } else {
986 as->writers_tail->next = asw;
987 as->writers_tail = asw;
988 }
989 loopIo_wantWrite(as->io);
990 return 0;
991}
992
993int async_socket_write_rel(AsyncSocket* as,
994 const void* buffer, uint32_t len,
995 const ASIOCb* writer_cb,
996 void* writer_opaque,
997 int to)
998{
999 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1000 DURATION_INFINITE;
1001 return async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1002}