blob: 107cdbb8874a3fd11ac7e93f8574671c696aeabb [file] [log] [blame]
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070023#include "android/async-socket-connector.h"
24#include "android/async-socket.h"
David 'Digit' Turneraf81d742014-02-03 17:11:18 +010025#include "android/utils/debug.h"
26#include "android/utils/eintr_wrapper.h"
27#include "android/utils/panic.h"
David 'Digit' Turnerd413fa52013-12-14 23:35:20 +010028#include "android/iolooper.h"
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070029
/* Logging shortcuts. E() and W() forward to derror() / dwarning(); D() prints
 * under the 'asyncsocket' verbose tag, and D_ACTIVE checks that tag. */
#define  E(...)    derror(__VA_ARGS__)
#define  W(...)    dwarning(__VA_ARGS__)
#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)

/* Set to a non-zero value to compile the T() trace statements in. */
#define  TRACE_ON  0

#if !TRACE_ON
/* Tracing is compiled out: T() expands to nothing. */
#define  T(...)
#else
#define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#endif
42
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070043/********************************************************************************
44 * Asynchronous Socket internal API declarations
45 *******************************************************************************/
46
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070047/* Gets socket's address string. */
48static const char* _async_socket_string(AsyncSocket* as);
49
50/* Gets socket's looper. */
51static Looper* _async_socket_get_looper(AsyncSocket* as);
52
53/* Handler for the I/O time out.
54 * Param:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070055 * as - Asynchronous socket for the I/O.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070056 * asio - Desciptor for the timed out I/O.
57 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070058static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
59 AsyncSocketIO* asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070060
61/********************************************************************************
62 * Asynchronous Socket Reader / Writer
63 *******************************************************************************/
64
65struct AsyncSocketIO {
66 /* Next I/O in the reader, or writer list. */
67 AsyncSocketIO* next;
68 /* Asynchronous socket for this I/O. */
69 AsyncSocket* as;
70 /* Timer used for time outs on this I/O. */
71 LoopTimer timer[1];
72 /* An opaque pointer associated with this I/O. */
73 void* io_opaque;
74 /* Buffer where to read / write data. */
75 uint8_t* buffer;
76 /* Bytes to transfer through the socket for this I/O. */
77 uint32_t to_transfer;
78 /* Bytes thransferred through the socket in this I/O. */
79 uint32_t transferred;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070080 /* I/O callback for this I/O. */
81 on_as_io_cb on_io;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070082 /* I/O type selector: 1 - read, 0 - write. */
83 int is_io_read;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070084 /* State of the I/O. */
85 AsyncIOState state;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -070086 /* Number of outstanding references to the I/O. */
87 int ref_count;
88 /* Deadline for this I/O */
89 Duration deadline;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070090};
91
92/*
93 * Recycling I/O instances.
94 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
95 * them for faster allocation, rather than allocating and freeing them for each
96 * I/O on the socket.
97 */
98
99/* List of recycled I/O descriptors. */
100static AsyncSocketIO* _asio_recycled = NULL;
101/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
102static int _recycled_asio_count = 0;
103/* Maximum number of I/O descriptors that can be recycled. */
104static const int _max_recycled_asio_num = 32;
105
/* Timer callback for an I/O time out.
 * It is registered with the timer of every I/O descriptor, and is invoked when
 * that timer fires, i.e. when the I/O has timed out.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
 */
static void _on_async_socket_io_timed_out(void* opaque);
113
114/* Creates new I/O descriptor.
115 * Param:
116 * as - Asynchronous socket for the I/O.
117 * is_io_read - I/O type selector: 1 - read, 0 - write.
118 * buffer, len - Reader / writer buffer address.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700119 * io_cb - Callback for this reader / writer.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700120 * io_opaque - An opaque pointer associated with the I/O.
121 * deadline - Deadline to complete the I/O.
122 * Return:
123 * Initialized AsyncSocketIO instance.
124 */
125static AsyncSocketIO*
126_async_socket_rw_new(AsyncSocket* as,
127 int is_io_read,
128 void* buffer,
129 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700130 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700131 void* io_opaque,
132 Duration deadline)
133{
134 /* Lookup in the recycler first. */
135 AsyncSocketIO* asio = _asio_recycled;
136 if (asio != NULL) {
137 /* Pull the descriptor from recycler. */
138 _asio_recycled = asio->next;
139 _recycled_asio_count--;
140 } else {
141 /* No recycled descriptors. Allocate new one. */
142 ANEW0(asio);
143 }
144
145 asio->next = NULL;
146 asio->as = as;
147 asio->is_io_read = is_io_read;
148 asio->buffer = (uint8_t*)buffer;
149 asio->to_transfer = len;
150 asio->transferred = 0;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700151 asio->on_io = io_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700152 asio->io_opaque = io_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700153 asio->state = ASIO_STATE_QUEUED;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700154 asio->ref_count = 1;
155 asio->deadline = deadline;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700156 loopTimer_init(asio->timer, _async_socket_get_looper(as),
157 _on_async_socket_io_timed_out, asio);
158 loopTimer_startAbsolute(asio->timer, deadline);
159
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700160 /* Reference socket that is holding this I/O. */
161 async_socket_reference(as);
162
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700163 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
164 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
165
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700166 return asio;
167}
168
169/* Destroys and frees I/O descriptor. */
170static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700171_async_socket_io_free(AsyncSocketIO* asio)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700172{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700173 AsyncSocket* const as = asio->as;
174
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700175 T("ASocket %s: %s I/O descriptor %p is destroyed.",
176 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
177
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700178 loopTimer_done(asio->timer);
179
180 /* Try to recycle it first, and free the memory if recycler is full. */
181 if (_recycled_asio_count < _max_recycled_asio_num) {
182 asio->next = _asio_recycled;
183 _asio_recycled = asio;
184 _recycled_asio_count++;
185 } else {
186 AFREE(asio);
187 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700188
189 /* Release socket that is holding this I/O. */
190 async_socket_release(as);
191}
192
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700193/* An I/O has been finished and its descriptor is about to be discarded. */
194static void
195_async_socket_io_finished(AsyncSocketIO* asio)
196{
197 /* Notify the client of the I/O that I/O is finished. */
198 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
199}
200
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700201int
202async_socket_io_reference(AsyncSocketIO* asio)
203{
204 assert(asio->ref_count > 0);
205 asio->ref_count++;
206 return asio->ref_count;
207}
208
209int
210async_socket_io_release(AsyncSocketIO* asio)
211{
212 assert(asio->ref_count > 0);
213 asio->ref_count--;
214 if (asio->ref_count == 0) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700215 _async_socket_io_finished(asio);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700216 /* Last reference has been dropped. Destroy this object. */
217 _async_socket_io_free(asio);
218 return 0;
219 }
220 return asio->ref_count;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700221}
222
223/* Creates new asynchronous socket reader.
224 * Param:
225 * as - Asynchronous socket for the reader.
226 * buffer, len - Reader's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700227 * io_cb - Reader's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700228 * reader_opaque - An opaque pointer associated with the reader.
229 * deadline - Deadline to complete the operation.
230 * Return:
231 * An initialized AsyncSocketIO intance.
232 */
233static AsyncSocketIO*
234_async_socket_reader_new(AsyncSocket* as,
235 void* buffer,
236 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700237 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700238 void* reader_opaque,
239 Duration deadline)
240{
241 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
242 reader_opaque, deadline);
243 return asio;
244}
245
246/* Creates new asynchronous socket writer.
247 * Param:
248 * as - Asynchronous socket for the writer.
249 * buffer, len - Writer's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700250 * io_cb - Writer's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700251 * writer_opaque - An opaque pointer associated with the writer.
252 * deadline - Deadline to complete the operation.
253 * Return:
254 * An initialized AsyncSocketIO intance.
255 */
256static AsyncSocketIO*
257_async_socket_writer_new(AsyncSocket* as,
258 const void* buffer,
259 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700260 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700261 void* writer_opaque,
262 Duration deadline)
263{
264 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
265 io_cb, writer_opaque,
266 deadline);
267 return asio;
268}
269
270/* I/O timed out. */
271static void
272_on_async_socket_io_timed_out(void* opaque)
273{
274 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700275 AsyncSocket* const as = asio->as;
276
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700277 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700278 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
279 asio->deadline, async_socket_deadline(as, 0));
280
281 /* Reference while in callback. */
282 async_socket_io_reference(asio);
283 _async_socket_io_timed_out(asio->as, asio);
284 async_socket_io_release(asio);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700285}
286
287/********************************************************************************
288 * Public Asynchronous Socket I/O API
289 *******************************************************************************/
290
291AsyncSocket*
292async_socket_io_get_socket(const AsyncSocketIO* asio)
293{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700294 async_socket_reference(asio->as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700295 return asio->as;
296}
297
298void
299async_socket_io_cancel_time_out(AsyncSocketIO* asio)
300{
301 loopTimer_stop(asio->timer);
302}
303
304void*
305async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
306{
307 return asio->io_opaque;
308}
309
310void*
311async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
312{
313 return async_socket_get_client_opaque(asio->as);
314}
315
316void*
317async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
318 uint32_t* transferred,
319 uint32_t* to_transfer)
320{
321 if (transferred != NULL) {
322 *transferred = asio->transferred;
323 }
324 if (to_transfer != NULL) {
325 *to_transfer = asio->to_transfer;
326 }
327 return asio->buffer;
328}
329
330void*
331async_socket_io_get_buffer(const AsyncSocketIO* asio)
332{
333 return asio->buffer;
334}
335
336uint32_t
337async_socket_io_get_transferred(const AsyncSocketIO* asio)
338{
339 return asio->transferred;
340}
341
342uint32_t
343async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
344{
345 return asio->to_transfer;
346}
347
348int
349async_socket_io_is_read(const AsyncSocketIO* asio)
350{
351 return asio->is_io_read;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700352}
353
354/********************************************************************************
355 * Asynchronous Socket internals
356 *******************************************************************************/
357
358struct AsyncSocket {
359 /* TCP address for the socket. */
360 SockAddress address;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700361 /* Connection callback for this socket. */
362 on_as_connection_cb on_connection;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700363 /* An opaque pointer associated with this socket by the client. */
364 void* client_opaque;
365 /* I/O looper for asynchronous I/O on the socket. */
366 Looper* looper;
367 /* I/O descriptor for asynchronous I/O on the socket. */
368 LoopIo io[1];
369 /* Timer to use for reconnection attempts. */
370 LoopTimer reconnect_timer[1];
371 /* Head of the list of the active readers. */
372 AsyncSocketIO* readers_head;
373 /* Tail of the list of the active readers. */
374 AsyncSocketIO* readers_tail;
375 /* Head of the list of the active writers. */
376 AsyncSocketIO* writers_head;
377 /* Tail of the list of the active writers. */
378 AsyncSocketIO* writers_tail;
379 /* Socket's file descriptor. */
380 int fd;
381 /* Timeout to use for reconnection attempts. */
382 int reconnect_to;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700383 /* Number of outstanding references to the socket. */
384 int ref_count;
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700385 /* Flags whether (1) or not (0) socket owns the looper. */
386 int owns_looper;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700387};
388
389static const char*
390_async_socket_string(AsyncSocket* as)
391{
392 return sock_address_to_string(&as->address);
393}
394
395static Looper*
396_async_socket_get_looper(AsyncSocket* as)
397{
398 return as->looper;
399}
400
401/* Pulls first reader out of the list.
402 * Param:
403 * as - Initialized AsyncSocket instance.
404 * Return:
405 * First I/O pulled out of the list, or NULL if there are no I/O in the list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700406 * Note that the caller is responsible for releasing the I/O object returned
407 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700408 */
409static AsyncSocketIO*
410_async_socket_pull_first_io(AsyncSocket* as,
411 AsyncSocketIO** list_head,
412 AsyncSocketIO** list_tail)
413{
414 AsyncSocketIO* const ret = *list_head;
415 if (ret != NULL) {
416 *list_head = ret->next;
417 ret->next = NULL;
418 if (*list_head == NULL) {
419 *list_tail = NULL;
420 }
421 }
422 return ret;
423}
424
425/* Pulls first reader out of the list.
426 * Param:
427 * as - Initialized AsyncSocket instance.
428 * Return:
429 * First reader pulled out of the list, or NULL if there are no readers in the
430 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700431 * Note that the caller is responsible for releasing the I/O object returned
432 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700433 */
434static AsyncSocketIO*
435_async_socket_pull_first_reader(AsyncSocket* as)
436{
437 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
438}
439
440/* Pulls first writer out of the list.
441 * Param:
442 * as - Initialized AsyncSocket instance.
443 * Return:
444 * First writer pulled out of the list, or NULL if there are no writers in the
445 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700446 * Note that the caller is responsible for releasing the I/O object returned
447 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700448 */
449static AsyncSocketIO*
450_async_socket_pull_first_writer(AsyncSocket* as)
451{
452 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
453}
454
455/* Removes an I/O descriptor from a list of active I/O.
456 * Param:
457 * as - Initialized AsyncSocket instance.
458 * list_head, list_tail - Pointers to the list head and tail.
459 * io - I/O to remove.
460 * Return:
461 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
462 */
463static int
464_async_socket_remove_io(AsyncSocket* as,
465 AsyncSocketIO** list_head,
466 AsyncSocketIO** list_tail,
467 AsyncSocketIO* io)
468{
469 AsyncSocketIO* prev = NULL;
470
471 while (*list_head != NULL && io != *list_head) {
472 prev = *list_head;
473 list_head = &((*list_head)->next);
474 }
475 if (*list_head == NULL) {
476 D("%s: I/O %p is not found in the list for socket '%s'",
477 __FUNCTION__, io, _async_socket_string(as));
478 return 0;
479 }
480
481 *list_head = io->next;
482 if (prev != NULL) {
483 prev->next = io->next;
484 }
485 if (*list_tail == io) {
486 *list_tail = prev;
487 }
488
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700489 /* Release I/O adjusting reference added when I/O has been saved in the list. */
490 async_socket_io_release(io);
491
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700492 return 1;
493}
494
495/* Advances to the next I/O in the list.
496 * Param:
497 * as - Initialized AsyncSocket instance.
498 * list_head, list_tail - Pointers to the list head and tail.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700499 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700500static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700501_async_socket_advance_io(AsyncSocket* as,
502 AsyncSocketIO** list_head,
503 AsyncSocketIO** list_tail)
504{
505 AsyncSocketIO* first_io = *list_head;
506 if (first_io != NULL) {
507 *list_head = first_io->next;
508 first_io->next = NULL;
509 }
510 if (*list_head == NULL) {
511 *list_tail = NULL;
512 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700513 if (first_io != NULL) {
514 /* Release I/O removed from the head of the list. */
515 async_socket_io_release(first_io);
516 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700517}
518
519/* Advances to the next reader in the list.
520 * Param:
521 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700522 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700523static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700524_async_socket_advance_reader(AsyncSocket* as)
525{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700526 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700527}
528
529/* Advances to the next writer in the list.
530 * Param:
531 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700532 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700533static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700534_async_socket_advance_writer(AsyncSocket* as)
535{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700536 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700537}
538
539/* Completes an I/O.
540 * Param:
541 * as - Initialized AsyncSocket instance.
542 * asio - I/O to complete.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700543 * Return:
544 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700545 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700546static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700547_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
548{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700549 T("ASocket %s: %s I/O %p is completed.",
550 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
551
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700552 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700553 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700554
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700555 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700556}
557
558/* Timeouts an I/O.
559 * Param:
560 * as - Initialized AsyncSocket instance.
561 * asio - An I/O that has timed out.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700562 * Return:
563 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700564 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700565static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700566_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
567{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700568 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
569 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
570 asio->deadline, async_socket_deadline(as, 0));
571
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700572 /* Report to the client. */
573 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
574 ASIO_STATE_TIMED_OUT);
575
576 /* Remove the I/O from a list of active I/O for actions other than retry. */
577 if (action != ASIO_ACTION_RETRY) {
578 if (asio->is_io_read) {
579 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
580 } else {
581 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
582 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700583 }
584
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700585 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700586}
587
588/* Cancels an I/O.
589 * Param:
590 * as - Initialized AsyncSocket instance.
591 * asio - An I/O to cancel.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700592 * Return:
593 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700594 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700595static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700596_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
597{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700598 T("ASocket %s: %s I/O %p is cancelled.",
599 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
600
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700601 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700602 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700603
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700604 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700605}
606
607/* Reports an I/O failure.
608 * Param:
609 * as - Initialized AsyncSocket instance.
610 * asio - An I/O that has failed. Can be NULL for general failures.
611 * failure - Failure (errno) that has occurred.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700612 * Return:
613 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700614 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700615static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700616_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
617{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700618 T("ASocket %s: %s I/O %p has failed: %d -> %s",
619 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
620 failure, strerror(failure));
621
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700622 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700623 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700624
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700625 errno = failure;
626 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700627}
628
629/* Cancels all the active socket readers.
630 * Param:
631 * as - Initialized AsyncSocket instance.
632 */
633static void
634_async_socket_cancel_readers(AsyncSocket* as)
635{
636 while (as->readers_head != NULL) {
637 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700638 /* We ignore action returned from the cancellation callback, since we're
639 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700640 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700641 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700642 }
643}
644
645/* Cancels all the active socket writers.
646 * Param:
647 * as - Initialized AsyncSocket instance.
648 */
649static void
650_async_socket_cancel_writers(AsyncSocket* as)
651{
652 while (as->writers_head != NULL) {
653 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700654 /* We ignore action returned from the cancellation callback, since we're
655 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700656 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700657 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700658 }
659}
660
661/* Cancels all the I/O on the socket. */
662static void
663_async_socket_cancel_all_io(AsyncSocket* as)
664{
665 /* Stop the reconnection timer. */
666 loopTimer_stop(as->reconnect_timer);
667
668 /* Stop read / write on the socket. */
669 loopIo_dontWantWrite(as->io);
670 loopIo_dontWantRead(as->io);
671
672 /* Cancel active readers and writers. */
673 _async_socket_cancel_readers(as);
674 _async_socket_cancel_writers(as);
675}
676
677/* Closes socket handle used by the async socket.
678 * Param:
679 * as - Initialized AsyncSocket instance.
680 */
681static void
682_async_socket_close_socket(AsyncSocket* as)
683{
684 if (as->fd >= 0) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700685 T("ASocket %s: Socket handle %d is closed.",
686 _async_socket_string(as), as->fd);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -0700687 loopIo_done(as->io);
688 socket_close(as->fd);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700689 as->fd = -1;
690 }
691}
692
693/* Destroys AsyncSocket instance.
694 * Param:
695 * as - Initialized AsyncSocket instance.
696 */
697static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700698_async_socket_free(AsyncSocket* as)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700699{
700 if (as != NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700701 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700702
703 /* Close socket. */
704 _async_socket_close_socket(as);
705
706 /* Free allocated resources. */
707 if (as->looper != NULL) {
708 loopTimer_done(as->reconnect_timer);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700709 if (as->owns_looper) {
710 looper_free(as->looper);
711 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700712 }
713 sock_address_done(&as->address);
714 AFREE(as);
715 }
716}
717
718/* Starts reconnection attempts after connection has been lost.
719 * Param:
720 * as - Initialized AsyncSocket instance.
721 * to - Milliseconds to wait before reconnection attempt.
722 */
723static void
724_async_socket_reconnect(AsyncSocket* as, int to)
725{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700726 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
727
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700728 /* Make sure that no I/O is active, and socket is closed before we
729 * reconnect. */
730 _async_socket_cancel_all_io(as);
731
732 /* Set the timer for reconnection attempt. */
733 loopTimer_startRelative(as->reconnect_timer, to);
734}
735
736/********************************************************************************
737 * Asynchronous Socket callbacks
738 *******************************************************************************/
739
740/* A callback that is invoked when socket gets disconnected.
741 * Param:
742 * as - Initialized AsyncSocket instance.
743 */
744static void
745_on_async_socket_disconnected(AsyncSocket* as)
746{
747 /* Save error to restore it for the client's callback. */
748 const int save_errno = errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700749 AsyncIOAction action = ASIO_ACTION_ABORT;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700750
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700751 D("ASocket %s: Disconnected.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700752
753 /* Cancel all the I/O on this socket. */
754 _async_socket_cancel_all_io(as);
755
756 /* Close the socket. */
757 _async_socket_close_socket(as);
758
759 /* Restore errno, and invoke client's callback. */
760 errno = save_errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700761 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700762
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700763 if (action == ASIO_ACTION_RETRY) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700764 /* Client requested reconnection. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700765 _async_socket_reconnect(as, as->reconnect_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700766 }
767}
768
769/* A callback that is invoked on socket's I/O failure.
770 * Param:
771 * as - Initialized AsyncSocket instance.
772 * asio - Descriptor for the failed I/O. Can be NULL for general failures.
773 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700774static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700775_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
776{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700777 D("ASocket %s: %s I/O failure: %d -> %s",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700778 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
779 errno, strerror(errno));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700780
781 /* Report the failure. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700782 return _async_socket_io_failure(as, asio, errno);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700783}
784
785/* A callback that is invoked when there is data available to read.
786 * Param:
787 * as - Initialized AsyncSocket instance.
788 * Return:
789 * 0 on success, or -1 on failure. Failure returned from this routine will
790 * skip writes (if awailable) behind this read.
791 */
792static int
793_on_async_socket_recv(AsyncSocket* as)
794{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700795 AsyncIOAction action;
796
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700797 /* Get current reader. */
798 AsyncSocketIO* const asr = as->readers_head;
799 if (asr == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700800 D("ASocket %s: No reader is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700801 loopIo_dontWantRead(as->io);
802 return 0;
803 }
804
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700805 /* Reference the reader while we're working with it in this callback. */
806 async_socket_io_reference(asr);
807
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700808 /* Bump I/O state, and inform the client that I/O is in progress. */
809 if (asr->state == ASIO_STATE_QUEUED) {
810 asr->state = ASIO_STATE_STARTED;
811 } else {
812 asr->state = ASIO_STATE_CONTINUES;
813 }
814 action = asr->on_io(asr->io_opaque, asr, asr->state);
815 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700816 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700817 /* Move on to the next reader. */
818 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700819 /* Lets see if there are still active readers, and enable, or disable
820 * read I/O callback accordingly. */
821 if (as->readers_head != NULL) {
822 loopIo_wantRead(as->io);
823 } else {
824 loopIo_dontWantRead(as->io);
825 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700826 async_socket_io_release(asr);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700827 return 0;
828 }
829
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700830 /* Read next chunk of data. */
David 'Digit' Turneraf81d742014-02-03 17:11:18 +0100831 int res = HANDLE_EINTR(
832 socket_recv(as->fd,
833 asr->buffer + asr->transferred,
834 asr->to_transfer - asr->transferred));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700835 if (res == 0) {
836 /* Socket has been disconnected. */
837 errno = ECONNRESET;
838 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700839 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700840 return -1;
841 }
842
843 if (res < 0) {
844 if (errno == EWOULDBLOCK || errno == EAGAIN) {
845 /* Yield to writes behind this read. */
846 loopIo_wantRead(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700847 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700848 return 0;
849 }
850
851 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700852 action = _on_async_socket_failure(as, asr);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700853 if (action != ASIO_ACTION_RETRY) {
854 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700855 /* Move on to the next reader. */
856 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700857 /* Lets see if there are still active readers, and enable, or disable
858 * read I/O callback accordingly. */
859 if (as->readers_head != NULL) {
860 loopIo_wantRead(as->io);
861 } else {
862 loopIo_dontWantRead(as->io);
863 }
864 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700865 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700866 return -1;
867 }
868
869 /* Update the reader's descriptor. */
870 asr->transferred += res;
871 if (asr->transferred == asr->to_transfer) {
872 /* This read is completed. Move on to the next reader. */
873 _async_socket_advance_reader(as);
874
875 /* Notify reader completion. */
876 _async_socket_complete_io(as, asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700877 }
878
879 /* Lets see if there are still active readers, and enable, or disable read
880 * I/O callback accordingly. */
881 if (as->readers_head != NULL) {
882 loopIo_wantRead(as->io);
883 } else {
884 loopIo_dontWantRead(as->io);
885 }
886
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700887 async_socket_io_release(asr);
888
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700889 return 0;
890}
891
892/* A callback that is invoked when there is data available to write.
893 * Param:
894 * as - Initialized AsyncSocket instance.
895 * Return:
896 * 0 on success, or -1 on failure. Failure returned from this routine will
897 * skip reads (if awailable) behind this write.
898 */
899static int
900_on_async_socket_send(AsyncSocket* as)
901{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700902 AsyncIOAction action;
903
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700904 /* Get current writer. */
905 AsyncSocketIO* const asw = as->writers_head;
906 if (asw == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700907 D("ASocket %s: No writer is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700908 loopIo_dontWantWrite(as->io);
909 return 0;
910 }
911
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700912 /* Reference the writer while we're working with it in this callback. */
913 async_socket_io_reference(asw);
914
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700915 /* Bump I/O state, and inform the client that I/O is in progress. */
916 if (asw->state == ASIO_STATE_QUEUED) {
917 asw->state = ASIO_STATE_STARTED;
918 } else {
919 asw->state = ASIO_STATE_CONTINUES;
920 }
921 action = asw->on_io(asw->io_opaque, asw, asw->state);
922 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700923 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700924 /* Move on to the next writer. */
925 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700926 /* Lets see if there are still active writers, and enable, or disable
927 * write I/O callback accordingly. */
928 if (as->writers_head != NULL) {
929 loopIo_wantWrite(as->io);
930 } else {
931 loopIo_dontWantWrite(as->io);
932 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700933 async_socket_io_release(asw);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700934 return 0;
935 }
936
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700937 /* Write next chunk of data. */
David 'Digit' Turneraf81d742014-02-03 17:11:18 +0100938 int res = HANDLE_EINTR(
939 socket_send(as->fd,
940 asw->buffer + asw->transferred,
941 asw->to_transfer - asw->transferred));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700942 if (res == 0) {
943 /* Socket has been disconnected. */
944 errno = ECONNRESET;
945 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700946 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700947 return -1;
948 }
949
950 if (res < 0) {
951 if (errno == EWOULDBLOCK || errno == EAGAIN) {
952 /* Yield to reads behind this write. */
953 loopIo_wantWrite(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700954 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700955 return 0;
956 }
957
958 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700959 action = _on_async_socket_failure(as, asw);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700960 if (action != ASIO_ACTION_RETRY) {
961 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700962 /* Move on to the next writer. */
963 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700964 /* Lets see if there are still active writers, and enable, or disable
965 * write I/O callback accordingly. */
966 if (as->writers_head != NULL) {
967 loopIo_wantWrite(as->io);
968 } else {
969 loopIo_dontWantWrite(as->io);
970 }
971 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700972 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700973 return -1;
974 }
975
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700976 /* Update the writer descriptor. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700977 asw->transferred += res;
978 if (asw->transferred == asw->to_transfer) {
979 /* This write is completed. Move on to the next writer. */
980 _async_socket_advance_writer(as);
981
982 /* Notify writer completion. */
983 _async_socket_complete_io(as, asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700984 }
985
986 /* Lets see if there are still active writers, and enable, or disable write
987 * I/O callback accordingly. */
988 if (as->writers_head != NULL) {
989 loopIo_wantWrite(as->io);
990 } else {
991 loopIo_dontWantWrite(as->io);
992 }
993
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700994 async_socket_io_release(asw);
995
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700996 return 0;
997}
998
999/* A callback that is invoked when an I/O is available on socket.
1000 * Param:
1001 * as - Initialized AsyncSocket instance.
1002 * fd - Socket's file descriptor.
1003 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1004 */
1005static void
1006_on_async_socket_io(void* opaque, int fd, unsigned events)
1007{
1008 AsyncSocket* const as = (AsyncSocket*)opaque;
1009
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001010 /* Reference the socket while we're working with it in this callback. */
1011 async_socket_reference(as);
1012
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001013 if ((events & LOOP_IO_READ) != 0) {
1014 if (_on_async_socket_recv(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001015 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001016 return;
1017 }
1018 }
1019
1020 if ((events & LOOP_IO_WRITE) != 0) {
1021 if (_on_async_socket_send(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001022 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001023 return;
1024 }
1025 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001026
1027 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001028}
1029
1030/* A callback that is invoked by asynchronous socket connector on connection
1031 * events.
1032 * Param:
1033 * opaque - Initialized AsyncSocket instance.
1034 * connector - Connector that is used to connect this socket.
1035 * event - Connection event.
1036 * Return:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001037 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001038 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001039static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001040_on_connector_events(void* opaque,
1041 AsyncSocketConnector* connector,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001042 AsyncIOState event)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001043{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001044 AsyncIOAction action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001045 AsyncSocket* const as = (AsyncSocket*)opaque;
1046
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001047 /* Reference the socket while we're working with it in this callback. */
1048 async_socket_reference(as);
1049
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001050 if (event == ASIO_STATE_SUCCEEDED) {
1051 /* Accept the connection. */
1052 as->fd = async_socket_connector_pull_fd(connector);
1053 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001054 }
1055
1056 /* Invoke client's callback. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001057 action = as->on_connection(as->client_opaque, as, event);
1058 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001059 /* For whatever reason the client didn't want to keep this connection.
1060 * Close it. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001061 D("ASocket %s: Connection is discarded by the client.",
1062 _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001063 _async_socket_close_socket(as);
1064 }
1065
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001066 if (action != ASIO_ACTION_RETRY) {
1067 async_socket_connector_release(connector);
1068 }
1069
1070 async_socket_release(as);
1071
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001072 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001073}
1074
1075/* Timer callback invoked to reconnect the lost connection.
1076 * Param:
1077 * as - Initialized AsyncSocket instance.
1078 */
1079void
1080_on_async_socket_reconnect(void* opaque)
1081{
1082 AsyncSocket* as = (AsyncSocket*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001083
1084 /* Reference the socket while we're working with it in this callback. */
1085 async_socket_reference(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001086 async_socket_connect(as, as->reconnect_to);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001087 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001088}
1089
1090
/********************************************************************************
 *                      Asynchronous Socket public API
 *******************************************************************************/
1094
1095AsyncSocket*
1096async_socket_new(int port,
1097 int reconnect_to,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001098 on_as_connection_cb client_cb,
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001099 void* client_opaque,
1100 Looper* looper)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001101{
1102 AsyncSocket* as;
1103
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001104 if (client_cb == NULL) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001105 E("Invalid client_cb parameter");
1106 return NULL;
1107 }
1108
1109 ANEW0(as);
1110
1111 as->fd = -1;
1112 as->client_opaque = client_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001113 as->on_connection = client_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001114 as->readers_head = as->readers_tail = NULL;
1115 as->reconnect_to = reconnect_to;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001116 as->ref_count = 1;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001117 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001118 if (looper == NULL) {
1119 as->looper = looper_newCore();
1120 if (as->looper == NULL) {
1121 E("Unable to create I/O looper for async socket '%s'",
1122 _async_socket_string(as));
1123 client_cb(client_opaque, as, ASIO_STATE_FAILED);
1124 _async_socket_free(as);
1125 return NULL;
1126 }
1127 as->owns_looper = 1;
1128 } else {
1129 as->looper = looper;
1130 as->owns_looper = 0;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001131 }
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001132
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001133 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1134
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001135 T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1136
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001137 return as;
1138}
1139
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001140int
1141async_socket_reference(AsyncSocket* as)
1142{
1143 assert(as->ref_count > 0);
1144 as->ref_count++;
1145 return as->ref_count;
1146}
1147
1148int
1149async_socket_release(AsyncSocket* as)
1150{
1151 assert(as->ref_count > 0);
1152 as->ref_count--;
1153 if (as->ref_count == 0) {
1154 /* Last reference has been dropped. Destroy this object. */
1155 _async_socket_cancel_all_io(as);
1156 _async_socket_free(as);
1157 return 0;
1158 }
1159 return as->ref_count;
1160}
1161
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001162void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001163async_socket_connect(AsyncSocket* as, int retry_to)
1164{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001165 T("ASocket %s: Handling connection request for %dms...",
1166 _async_socket_string(as), retry_to);
1167
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001168 AsyncSocketConnector* const connector =
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001169 async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1170 as, as->looper);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001171 if (connector != NULL) {
1172 async_socket_connector_connect(connector);
1173 } else {
1174 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001175 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001176}
1177
1178void
1179async_socket_disconnect(AsyncSocket* as)
1180{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001181 T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1182
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001183 if (as != NULL) {
1184 _async_socket_cancel_all_io(as);
1185 _async_socket_close_socket(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001186 }
1187}
1188
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001189void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001190async_socket_reconnect(AsyncSocket* as, int retry_to)
1191{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001192 T("ASocket %s: Handling reconnection request for %dms...",
1193 _async_socket_string(as), retry_to);
1194
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001195 _async_socket_cancel_all_io(as);
1196 _async_socket_close_socket(as);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001197 _async_socket_reconnect(as, retry_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001198}
1199
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001200void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001201async_socket_read_abs(AsyncSocket* as,
1202 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001203 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001204 void* reader_opaque,
1205 Duration deadline)
1206{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001207 T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1208 _async_socket_string(as), len, deadline);
1209
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001210 AsyncSocketIO* const asr =
1211 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1212 deadline);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001213 if (async_socket_is_connected(as)) {
1214 /* Add new reader to the list. Note that we use initial reference from I/O
1215 * 'new' routine as "in the list" reference counter. */
1216 if (as->readers_head == NULL) {
1217 as->readers_head = as->readers_tail = asr;
1218 } else {
1219 as->readers_tail->next = asr;
1220 as->readers_tail = asr;
1221 }
1222 loopIo_wantRead(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001223 } else {
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001224 D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1225 errno = ECONNRESET;
1226 reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1227 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001228 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001229}
1230
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001231void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001232async_socket_read_rel(AsyncSocket* as,
1233 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001234 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001235 void* reader_opaque,
1236 int to)
1237{
1238 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001240 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001241}
1242
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001243void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001244async_socket_write_abs(AsyncSocket* as,
1245 const void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001246 on_as_io_cb writer_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001247 void* writer_opaque,
1248 Duration deadline)
1249{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001250 T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251 _async_socket_string(as), len, deadline);
1252
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001253 AsyncSocketIO* const asw =
1254 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255 deadline);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001256 if (async_socket_is_connected(as)) {
1257 /* Add new writer to the list. Note that we use initial reference from I/O
1258 * 'new' routine as "in the list" reference counter. */
1259 if (as->writers_head == NULL) {
1260 as->writers_head = as->writers_tail = asw;
1261 } else {
1262 as->writers_tail->next = asw;
1263 as->writers_tail = asw;
1264 }
1265 loopIo_wantWrite(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001266 } else {
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001267 D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1268 errno = ECONNRESET;
1269 writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1270 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001271 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001272}
1273
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001274void
1275async_socket_write_rel(AsyncSocket* as,
1276 const void* buffer, uint32_t len,
1277 on_as_io_cb writer_cb,
1278 void* writer_opaque,
1279 int to)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001280{
1281 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1282 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001283 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1284}
1285
1286void*
1287async_socket_get_client_opaque(const AsyncSocket* as)
1288{
1289 return as->client_opaque;
1290}
1291
1292Duration
1293async_socket_deadline(AsyncSocket* as, int rel)
1294{
1295 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1296 DURATION_INFINITE;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001297}
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001298
1299int
1300async_socket_get_port(const AsyncSocket* as)
1301{
1302 return sock_address_get_port(&as->address);
1303}
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001304
1305int
1306async_socket_is_connected(const AsyncSocket* as)
1307{
1308 return as->fd >= 0;
1309}