blob: 5e2ae293e7372d5ede05df139ae1ca9bc3a13415 [file] [log] [blame]
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23#include "qemu-common.h"
24#include "android/async-utils.h"
25#include "android/utils/debug.h"
26#include "android/async-socket-connector.h"
27#include "android/async-socket.h"
28#include "utils/panic.h"
29#include "iolooper.h"
30
/* Error-level message: forwards to derror(). */
#define E(...) derror(__VA_ARGS__)
/* Warning-level message: forwards to dwarning(). */
#define W(...) dwarning(__VA_ARGS__)
/* Verbose message under the 'asyncsocket' verbose tag. */
#define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
/* Checks the 'asyncsocket' verbose tag via VERBOSE_CHECK(). */
#define D_ACTIVE VERBOSE_CHECK(asyncsocket)

/* Set to a non-zero value to compile in the detailed T() trace messages. */
#define TRACE_ON 0

#if TRACE_ON
/* Tracing enabled: T() prints under the 'asyncsocket' verbose tag. */
#define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#else
/* Tracing disabled: T() expands to nothing, so its arguments are never
 * evaluated. */
#define T(...)
#endif
43
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070044/********************************************************************************
45 * Asynchronous Socket internal API declarations
46 *******************************************************************************/
47
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070048/* Gets socket's address string. */
49static const char* _async_socket_string(AsyncSocket* as);
50
51/* Gets socket's looper. */
52static Looper* _async_socket_get_looper(AsyncSocket* as);
53
54/* Handler for the I/O time out.
55 * Param:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070056 * as - Asynchronous socket for the I/O.
 *  asio - Descriptor for the timed out I/O.
58 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070059static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
60 AsyncSocketIO* asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070061
62/********************************************************************************
63 * Asynchronous Socket Reader / Writer
64 *******************************************************************************/
65
/* Descriptor of a single read or write request queued on an AsyncSocket.
 * Instances are reference counted (see ref_count) and are linked through
 * 'next' into the socket's reader / writer list, or into the recycler list. */
struct AsyncSocketIO {
    /* Next I/O in the reader, or writer list. */
    AsyncSocketIO* next;
    /* Asynchronous socket for this I/O. */
    AsyncSocket* as;
    /* Timer used for time outs on this I/O. */
    LoopTimer timer[1];
    /* An opaque pointer associated with this I/O. */
    void* io_opaque;
    /* Buffer where to read / write data. */
    uint8_t* buffer;
    /* Bytes to transfer through the socket for this I/O. */
    uint32_t to_transfer;
    /* Bytes transferred through the socket in this I/O. */
    uint32_t transferred;
    /* I/O callback for this I/O. */
    on_as_io_cb on_io;
    /* I/O type selector: 1 - read, 0 - write. */
    int is_io_read;
    /* State of the I/O. */
    AsyncIOState state;
    /* Number of outstanding references to the I/O. */
    int ref_count;
    /* Deadline for this I/O. */
    Duration deadline;
};
92
/*
 * Recycling I/O instances.
 * Since AsyncSocketIO instances are not that large, it makes sense to recycle
 * them for faster allocation, rather than allocating and freeing them for each
 * I/O on the socket.
 */

/* List of recycled I/O descriptors (singly linked through their 'next'). */
static AsyncSocketIO* _asio_recycled = NULL;
/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
static int _recycled_asio_count = 0;
/* Maximum number of I/O descriptors that can be recycled. */
static const int _max_recycled_asio_num = 32;
106
107/* Handler for an I/O time-out timer event.
108 * When this routine is invoked, it indicates that a time out has occurred on an
109 * I/O.
110 * Param:
111 * opaque - AsyncSocketIO instance representing the timed out I/O.
112 */
113static void _on_async_socket_io_timed_out(void* opaque);
114
115/* Creates new I/O descriptor.
116 * Param:
117 * as - Asynchronous socket for the I/O.
118 * is_io_read - I/O type selector: 1 - read, 0 - write.
119 * buffer, len - Reader / writer buffer address.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700120 * io_cb - Callback for this reader / writer.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700121 * io_opaque - An opaque pointer associated with the I/O.
122 * deadline - Deadline to complete the I/O.
123 * Return:
124 * Initialized AsyncSocketIO instance.
125 */
126static AsyncSocketIO*
127_async_socket_rw_new(AsyncSocket* as,
128 int is_io_read,
129 void* buffer,
130 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700131 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700132 void* io_opaque,
133 Duration deadline)
134{
135 /* Lookup in the recycler first. */
136 AsyncSocketIO* asio = _asio_recycled;
137 if (asio != NULL) {
138 /* Pull the descriptor from recycler. */
139 _asio_recycled = asio->next;
140 _recycled_asio_count--;
141 } else {
142 /* No recycled descriptors. Allocate new one. */
143 ANEW0(asio);
144 }
145
146 asio->next = NULL;
147 asio->as = as;
148 asio->is_io_read = is_io_read;
149 asio->buffer = (uint8_t*)buffer;
150 asio->to_transfer = len;
151 asio->transferred = 0;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700152 asio->on_io = io_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700153 asio->io_opaque = io_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700154 asio->state = ASIO_STATE_QUEUED;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700155 asio->ref_count = 1;
156 asio->deadline = deadline;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700157 loopTimer_init(asio->timer, _async_socket_get_looper(as),
158 _on_async_socket_io_timed_out, asio);
159 loopTimer_startAbsolute(asio->timer, deadline);
160
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700161 /* Reference socket that is holding this I/O. */
162 async_socket_reference(as);
163
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700164 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
165 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
166
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700167 return asio;
168}
169
170/* Destroys and frees I/O descriptor. */
171static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700172_async_socket_io_free(AsyncSocketIO* asio)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700173{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700174 AsyncSocket* const as = asio->as;
175
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700176 T("ASocket %s: %s I/O descriptor %p is destroyed.",
177 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
178
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700179 loopTimer_done(asio->timer);
180
181 /* Try to recycle it first, and free the memory if recycler is full. */
182 if (_recycled_asio_count < _max_recycled_asio_num) {
183 asio->next = _asio_recycled;
184 _asio_recycled = asio;
185 _recycled_asio_count++;
186 } else {
187 AFREE(asio);
188 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700189
190 /* Release socket that is holding this I/O. */
191 async_socket_release(as);
192}
193
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700194/* An I/O has been finished and its descriptor is about to be discarded. */
195static void
196_async_socket_io_finished(AsyncSocketIO* asio)
197{
198 /* Notify the client of the I/O that I/O is finished. */
199 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
200}
201
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700202int
203async_socket_io_reference(AsyncSocketIO* asio)
204{
205 assert(asio->ref_count > 0);
206 asio->ref_count++;
207 return asio->ref_count;
208}
209
210int
211async_socket_io_release(AsyncSocketIO* asio)
212{
213 assert(asio->ref_count > 0);
214 asio->ref_count--;
215 if (asio->ref_count == 0) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700216 _async_socket_io_finished(asio);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700217 /* Last reference has been dropped. Destroy this object. */
218 _async_socket_io_free(asio);
219 return 0;
220 }
221 return asio->ref_count;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700222}
223
224/* Creates new asynchronous socket reader.
225 * Param:
226 * as - Asynchronous socket for the reader.
227 * buffer, len - Reader's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700228 * io_cb - Reader's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700229 * reader_opaque - An opaque pointer associated with the reader.
230 * deadline - Deadline to complete the operation.
231 * Return:
232 * An initialized AsyncSocketIO intance.
233 */
234static AsyncSocketIO*
235_async_socket_reader_new(AsyncSocket* as,
236 void* buffer,
237 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700238 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700239 void* reader_opaque,
240 Duration deadline)
241{
242 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
243 reader_opaque, deadline);
244 return asio;
245}
246
247/* Creates new asynchronous socket writer.
248 * Param:
249 * as - Asynchronous socket for the writer.
250 * buffer, len - Writer's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700251 * io_cb - Writer's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700252 * writer_opaque - An opaque pointer associated with the writer.
253 * deadline - Deadline to complete the operation.
254 * Return:
255 * An initialized AsyncSocketIO intance.
256 */
257static AsyncSocketIO*
258_async_socket_writer_new(AsyncSocket* as,
259 const void* buffer,
260 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700261 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700262 void* writer_opaque,
263 Duration deadline)
264{
265 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
266 io_cb, writer_opaque,
267 deadline);
268 return asio;
269}
270
271/* I/O timed out. */
272static void
273_on_async_socket_io_timed_out(void* opaque)
274{
275 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700276 AsyncSocket* const as = asio->as;
277
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700278 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700279 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
280 asio->deadline, async_socket_deadline(as, 0));
281
282 /* Reference while in callback. */
283 async_socket_io_reference(asio);
284 _async_socket_io_timed_out(asio->as, asio);
285 async_socket_io_release(asio);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700286}
287
288/********************************************************************************
289 * Public Asynchronous Socket I/O API
290 *******************************************************************************/
291
292AsyncSocket*
293async_socket_io_get_socket(const AsyncSocketIO* asio)
294{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700295 async_socket_reference(asio->as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700296 return asio->as;
297}
298
299void
300async_socket_io_cancel_time_out(AsyncSocketIO* asio)
301{
302 loopTimer_stop(asio->timer);
303}
304
305void*
306async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
307{
308 return asio->io_opaque;
309}
310
311void*
312async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
313{
314 return async_socket_get_client_opaque(asio->as);
315}
316
317void*
318async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
319 uint32_t* transferred,
320 uint32_t* to_transfer)
321{
322 if (transferred != NULL) {
323 *transferred = asio->transferred;
324 }
325 if (to_transfer != NULL) {
326 *to_transfer = asio->to_transfer;
327 }
328 return asio->buffer;
329}
330
331void*
332async_socket_io_get_buffer(const AsyncSocketIO* asio)
333{
334 return asio->buffer;
335}
336
337uint32_t
338async_socket_io_get_transferred(const AsyncSocketIO* asio)
339{
340 return asio->transferred;
341}
342
343uint32_t
344async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
345{
346 return asio->to_transfer;
347}
348
349int
350async_socket_io_is_read(const AsyncSocketIO* asio)
351{
352 return asio->is_io_read;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700353}
354
355/********************************************************************************
356 * Asynchronous Socket internals
357 *******************************************************************************/
358
/* Descriptor of an asynchronous socket.
 * Keeps the connection parameters, the looper objects used to drive the
 * socket, and two singly linked queues (readers and writers) of pending
 * AsyncSocketIO requests. Instances are reference counted (see ref_count). */
struct AsyncSocket {
    /* TCP address for the socket. */
    SockAddress address;
    /* Connection callback for this socket. */
    on_as_connection_cb on_connection;
    /* An opaque pointer associated with this socket by the client. */
    void* client_opaque;
    /* I/O looper for asynchronous I/O on the socket. */
    Looper* looper;
    /* I/O descriptor for asynchronous I/O on the socket. */
    LoopIo io[1];
    /* Timer to use for reconnection attempts. */
    LoopTimer reconnect_timer[1];
    /* Head of the list of the active readers. */
    AsyncSocketIO* readers_head;
    /* Tail of the list of the active readers. */
    AsyncSocketIO* readers_tail;
    /* Head of the list of the active writers. */
    AsyncSocketIO* writers_head;
    /* Tail of the list of the active writers. */
    AsyncSocketIO* writers_tail;
    /* Socket's file descriptor. A negative value means there is no open
     * socket handle. */
    int fd;
    /* Timeout to use for reconnection attempts. */
    int reconnect_to;
    /* Number of outstanding references to the socket. */
    int ref_count;
    /* Flags whether (1) or not (0) socket owns the looper. */
    int owns_looper;
};
389
390static const char*
391_async_socket_string(AsyncSocket* as)
392{
393 return sock_address_to_string(&as->address);
394}
395
396static Looper*
397_async_socket_get_looper(AsyncSocket* as)
398{
399 return as->looper;
400}
401
402/* Pulls first reader out of the list.
403 * Param:
404 * as - Initialized AsyncSocket instance.
405 * Return:
406 * First I/O pulled out of the list, or NULL if there are no I/O in the list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700407 * Note that the caller is responsible for releasing the I/O object returned
408 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700409 */
410static AsyncSocketIO*
411_async_socket_pull_first_io(AsyncSocket* as,
412 AsyncSocketIO** list_head,
413 AsyncSocketIO** list_tail)
414{
415 AsyncSocketIO* const ret = *list_head;
416 if (ret != NULL) {
417 *list_head = ret->next;
418 ret->next = NULL;
419 if (*list_head == NULL) {
420 *list_tail = NULL;
421 }
422 }
423 return ret;
424}
425
426/* Pulls first reader out of the list.
427 * Param:
428 * as - Initialized AsyncSocket instance.
429 * Return:
430 * First reader pulled out of the list, or NULL if there are no readers in the
431 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700432 * Note that the caller is responsible for releasing the I/O object returned
433 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700434 */
435static AsyncSocketIO*
436_async_socket_pull_first_reader(AsyncSocket* as)
437{
438 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
439}
440
441/* Pulls first writer out of the list.
442 * Param:
443 * as - Initialized AsyncSocket instance.
444 * Return:
445 * First writer pulled out of the list, or NULL if there are no writers in the
446 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700447 * Note that the caller is responsible for releasing the I/O object returned
448 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700449 */
450static AsyncSocketIO*
451_async_socket_pull_first_writer(AsyncSocket* as)
452{
453 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
454}
455
456/* Removes an I/O descriptor from a list of active I/O.
457 * Param:
458 * as - Initialized AsyncSocket instance.
459 * list_head, list_tail - Pointers to the list head and tail.
460 * io - I/O to remove.
461 * Return:
462 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
463 */
464static int
465_async_socket_remove_io(AsyncSocket* as,
466 AsyncSocketIO** list_head,
467 AsyncSocketIO** list_tail,
468 AsyncSocketIO* io)
469{
470 AsyncSocketIO* prev = NULL;
471
472 while (*list_head != NULL && io != *list_head) {
473 prev = *list_head;
474 list_head = &((*list_head)->next);
475 }
476 if (*list_head == NULL) {
477 D("%s: I/O %p is not found in the list for socket '%s'",
478 __FUNCTION__, io, _async_socket_string(as));
479 return 0;
480 }
481
482 *list_head = io->next;
483 if (prev != NULL) {
484 prev->next = io->next;
485 }
486 if (*list_tail == io) {
487 *list_tail = prev;
488 }
489
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700490 /* Release I/O adjusting reference added when I/O has been saved in the list. */
491 async_socket_io_release(io);
492
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700493 return 1;
494}
495
496/* Advances to the next I/O in the list.
497 * Param:
498 * as - Initialized AsyncSocket instance.
499 * list_head, list_tail - Pointers to the list head and tail.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700500 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700501static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700502_async_socket_advance_io(AsyncSocket* as,
503 AsyncSocketIO** list_head,
504 AsyncSocketIO** list_tail)
505{
506 AsyncSocketIO* first_io = *list_head;
507 if (first_io != NULL) {
508 *list_head = first_io->next;
509 first_io->next = NULL;
510 }
511 if (*list_head == NULL) {
512 *list_tail = NULL;
513 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700514 if (first_io != NULL) {
515 /* Release I/O removed from the head of the list. */
516 async_socket_io_release(first_io);
517 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700518}
519
520/* Advances to the next reader in the list.
521 * Param:
522 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700523 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700524static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700525_async_socket_advance_reader(AsyncSocket* as)
526{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700527 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700528}
529
530/* Advances to the next writer in the list.
531 * Param:
532 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700533 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700534static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700535_async_socket_advance_writer(AsyncSocket* as)
536{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700537 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700538}
539
540/* Completes an I/O.
541 * Param:
542 * as - Initialized AsyncSocket instance.
543 * asio - I/O to complete.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700544 * Return:
545 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700546 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700547static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700548_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
549{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700550 T("ASocket %s: %s I/O %p is completed.",
551 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
552
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700553 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700554 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700555
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700556 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700557}
558
559/* Timeouts an I/O.
560 * Param:
561 * as - Initialized AsyncSocket instance.
562 * asio - An I/O that has timed out.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700563 * Return:
564 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700565 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700566static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700567_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
568{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700569 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
570 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
571 asio->deadline, async_socket_deadline(as, 0));
572
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700573 /* Report to the client. */
574 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
575 ASIO_STATE_TIMED_OUT);
576
577 /* Remove the I/O from a list of active I/O for actions other than retry. */
578 if (action != ASIO_ACTION_RETRY) {
579 if (asio->is_io_read) {
580 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
581 } else {
582 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
583 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700584 }
585
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700586 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700587}
588
589/* Cancels an I/O.
590 * Param:
591 * as - Initialized AsyncSocket instance.
592 * asio - An I/O to cancel.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700593 * Return:
594 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700595 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700596static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700597_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
598{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700599 T("ASocket %s: %s I/O %p is cancelled.",
600 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
601
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700602 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700603 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700604
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700605 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700606}
607
608/* Reports an I/O failure.
609 * Param:
610 * as - Initialized AsyncSocket instance.
611 * asio - An I/O that has failed. Can be NULL for general failures.
612 * failure - Failure (errno) that has occurred.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700613 * Return:
614 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700615 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700616static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700617_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
618{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700619 T("ASocket %s: %s I/O %p has failed: %d -> %s",
620 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
621 failure, strerror(failure));
622
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700623 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700624 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700625
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700626 errno = failure;
627 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700628}
629
630/* Cancels all the active socket readers.
631 * Param:
632 * as - Initialized AsyncSocket instance.
633 */
634static void
635_async_socket_cancel_readers(AsyncSocket* as)
636{
637 while (as->readers_head != NULL) {
638 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700639 /* We ignore action returned from the cancellation callback, since we're
640 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700641 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700642 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700643 }
644}
645
646/* Cancels all the active socket writers.
647 * Param:
648 * as - Initialized AsyncSocket instance.
649 */
650static void
651_async_socket_cancel_writers(AsyncSocket* as)
652{
653 while (as->writers_head != NULL) {
654 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700655 /* We ignore action returned from the cancellation callback, since we're
656 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700657 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700658 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700659 }
660}
661
662/* Cancels all the I/O on the socket. */
663static void
664_async_socket_cancel_all_io(AsyncSocket* as)
665{
666 /* Stop the reconnection timer. */
667 loopTimer_stop(as->reconnect_timer);
668
669 /* Stop read / write on the socket. */
670 loopIo_dontWantWrite(as->io);
671 loopIo_dontWantRead(as->io);
672
673 /* Cancel active readers and writers. */
674 _async_socket_cancel_readers(as);
675 _async_socket_cancel_writers(as);
676}
677
678/* Closes socket handle used by the async socket.
679 * Param:
680 * as - Initialized AsyncSocket instance.
681 */
682static void
683_async_socket_close_socket(AsyncSocket* as)
684{
685 if (as->fd >= 0) {
686 loopIo_done(as->io);
687 socket_close(as->fd);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700688 T("ASocket %s: Socket handle %d is closed.",
689 _async_socket_string(as), as->fd);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700690 as->fd = -1;
691 }
692}
693
694/* Destroys AsyncSocket instance.
695 * Param:
696 * as - Initialized AsyncSocket instance.
697 */
698static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700699_async_socket_free(AsyncSocket* as)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700700{
701 if (as != NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700702 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700703
704 /* Close socket. */
705 _async_socket_close_socket(as);
706
707 /* Free allocated resources. */
708 if (as->looper != NULL) {
709 loopTimer_done(as->reconnect_timer);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700710 if (as->owns_looper) {
711 looper_free(as->looper);
712 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700713 }
714 sock_address_done(&as->address);
715 AFREE(as);
716 }
717}
718
719/* Starts reconnection attempts after connection has been lost.
720 * Param:
721 * as - Initialized AsyncSocket instance.
722 * to - Milliseconds to wait before reconnection attempt.
723 */
724static void
725_async_socket_reconnect(AsyncSocket* as, int to)
726{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700727 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
728
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700729 /* Make sure that no I/O is active, and socket is closed before we
730 * reconnect. */
731 _async_socket_cancel_all_io(as);
732
733 /* Set the timer for reconnection attempt. */
734 loopTimer_startRelative(as->reconnect_timer, to);
735}
736
737/********************************************************************************
738 * Asynchronous Socket callbacks
739 *******************************************************************************/
740
741/* A callback that is invoked when socket gets disconnected.
742 * Param:
743 * as - Initialized AsyncSocket instance.
744 */
745static void
746_on_async_socket_disconnected(AsyncSocket* as)
747{
748 /* Save error to restore it for the client's callback. */
749 const int save_errno = errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700750 AsyncIOAction action = ASIO_ACTION_ABORT;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700751
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700752 D("ASocket %s: Disconnected.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700753
754 /* Cancel all the I/O on this socket. */
755 _async_socket_cancel_all_io(as);
756
757 /* Close the socket. */
758 _async_socket_close_socket(as);
759
760 /* Restore errno, and invoke client's callback. */
761 errno = save_errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700762 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700763
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700764 if (action == ASIO_ACTION_RETRY) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700765 /* Client requested reconnection. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700766 _async_socket_reconnect(as, as->reconnect_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700767 }
768}
769
770/* A callback that is invoked on socket's I/O failure.
771 * Param:
772 * as - Initialized AsyncSocket instance.
773 * asio - Descriptor for the failed I/O. Can be NULL for general failures.
774 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700775static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700776_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
777{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700778 D("ASocket %s: %s I/O failure: %d -> %s",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700779 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
780 errno, strerror(errno));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700781
782 /* Report the failure. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700783 return _async_socket_io_failure(as, asio, errno);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700784}
785
786/* A callback that is invoked when there is data available to read.
787 * Param:
788 * as - Initialized AsyncSocket instance.
789 * Return:
790 * 0 on success, or -1 on failure. Failure returned from this routine will
791 * skip writes (if awailable) behind this read.
792 */
793static int
794_on_async_socket_recv(AsyncSocket* as)
795{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700796 AsyncIOAction action;
797
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700798 /* Get current reader. */
799 AsyncSocketIO* const asr = as->readers_head;
800 if (asr == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700801 D("ASocket %s: No reader is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700802 loopIo_dontWantRead(as->io);
803 return 0;
804 }
805
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700806 /* Reference the reader while we're working with it in this callback. */
807 async_socket_io_reference(asr);
808
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700809 /* Bump I/O state, and inform the client that I/O is in progress. */
810 if (asr->state == ASIO_STATE_QUEUED) {
811 asr->state = ASIO_STATE_STARTED;
812 } else {
813 asr->state = ASIO_STATE_CONTINUES;
814 }
815 action = asr->on_io(asr->io_opaque, asr, asr->state);
816 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700817 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700818 /* Move on to the next reader. */
819 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700820 /* Lets see if there are still active readers, and enable, or disable
821 * read I/O callback accordingly. */
822 if (as->readers_head != NULL) {
823 loopIo_wantRead(as->io);
824 } else {
825 loopIo_dontWantRead(as->io);
826 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700827 async_socket_io_release(asr);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700828 return 0;
829 }
830
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700831 /* Read next chunk of data. */
832 int res = socket_recv(as->fd, asr->buffer + asr->transferred,
833 asr->to_transfer - asr->transferred);
834 while (res < 0 && errno == EINTR) {
835 res = socket_recv(as->fd, asr->buffer + asr->transferred,
836 asr->to_transfer - asr->transferred);
837 }
838
839 if (res == 0) {
840 /* Socket has been disconnected. */
841 errno = ECONNRESET;
842 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700843 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700844 return -1;
845 }
846
847 if (res < 0) {
848 if (errno == EWOULDBLOCK || errno == EAGAIN) {
849 /* Yield to writes behind this read. */
850 loopIo_wantRead(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700851 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700852 return 0;
853 }
854
855 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700856 action = _on_async_socket_failure(as, asr);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700857 if (action != ASIO_ACTION_RETRY) {
858 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700859 /* Move on to the next reader. */
860 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700861 /* Lets see if there are still active readers, and enable, or disable
862 * read I/O callback accordingly. */
863 if (as->readers_head != NULL) {
864 loopIo_wantRead(as->io);
865 } else {
866 loopIo_dontWantRead(as->io);
867 }
868 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700869 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700870 return -1;
871 }
872
873 /* Update the reader's descriptor. */
874 asr->transferred += res;
875 if (asr->transferred == asr->to_transfer) {
876 /* This read is completed. Move on to the next reader. */
877 _async_socket_advance_reader(as);
878
879 /* Notify reader completion. */
880 _async_socket_complete_io(as, asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700881 }
882
883 /* Lets see if there are still active readers, and enable, or disable read
884 * I/O callback accordingly. */
885 if (as->readers_head != NULL) {
886 loopIo_wantRead(as->io);
887 } else {
888 loopIo_dontWantRead(as->io);
889 }
890
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700891 async_socket_io_release(asr);
892
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700893 return 0;
894}
895
896/* A callback that is invoked when there is data available to write.
897 * Param:
898 * as - Initialized AsyncSocket instance.
899 * Return:
900 * 0 on success, or -1 on failure. Failure returned from this routine will
901 * skip reads (if awailable) behind this write.
902 */
903static int
904_on_async_socket_send(AsyncSocket* as)
905{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700906 AsyncIOAction action;
907
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700908 /* Get current writer. */
909 AsyncSocketIO* const asw = as->writers_head;
910 if (asw == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700911 D("ASocket %s: No writer is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700912 loopIo_dontWantWrite(as->io);
913 return 0;
914 }
915
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700916 /* Reference the writer while we're working with it in this callback. */
917 async_socket_io_reference(asw);
918
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700919 /* Bump I/O state, and inform the client that I/O is in progress. */
920 if (asw->state == ASIO_STATE_QUEUED) {
921 asw->state = ASIO_STATE_STARTED;
922 } else {
923 asw->state = ASIO_STATE_CONTINUES;
924 }
925 action = asw->on_io(asw->io_opaque, asw, asw->state);
926 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700927 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700928 /* Move on to the next writer. */
929 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700930 /* Lets see if there are still active writers, and enable, or disable
931 * write I/O callback accordingly. */
932 if (as->writers_head != NULL) {
933 loopIo_wantWrite(as->io);
934 } else {
935 loopIo_dontWantWrite(as->io);
936 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700937 async_socket_io_release(asw);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700938 return 0;
939 }
940
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700941 /* Write next chunk of data. */
942 int res = socket_send(as->fd, asw->buffer + asw->transferred,
943 asw->to_transfer - asw->transferred);
944 while (res < 0 && errno == EINTR) {
945 res = socket_send(as->fd, asw->buffer + asw->transferred,
946 asw->to_transfer - asw->transferred);
947 }
948
949 if (res == 0) {
950 /* Socket has been disconnected. */
951 errno = ECONNRESET;
952 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700953 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700954 return -1;
955 }
956
957 if (res < 0) {
958 if (errno == EWOULDBLOCK || errno == EAGAIN) {
959 /* Yield to reads behind this write. */
960 loopIo_wantWrite(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700961 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700962 return 0;
963 }
964
965 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700966 action = _on_async_socket_failure(as, asw);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700967 if (action != ASIO_ACTION_RETRY) {
968 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700969 /* Move on to the next writer. */
970 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700971 /* Lets see if there are still active writers, and enable, or disable
972 * write I/O callback accordingly. */
973 if (as->writers_head != NULL) {
974 loopIo_wantWrite(as->io);
975 } else {
976 loopIo_dontWantWrite(as->io);
977 }
978 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700979 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700980 return -1;
981 }
982
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700983 /* Update the writer descriptor. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700984 asw->transferred += res;
985 if (asw->transferred == asw->to_transfer) {
986 /* This write is completed. Move on to the next writer. */
987 _async_socket_advance_writer(as);
988
989 /* Notify writer completion. */
990 _async_socket_complete_io(as, asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700991 }
992
993 /* Lets see if there are still active writers, and enable, or disable write
994 * I/O callback accordingly. */
995 if (as->writers_head != NULL) {
996 loopIo_wantWrite(as->io);
997 } else {
998 loopIo_dontWantWrite(as->io);
999 }
1000
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001001 async_socket_io_release(asw);
1002
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001003 return 0;
1004}
1005
1006/* A callback that is invoked when an I/O is available on socket.
1007 * Param:
1008 * as - Initialized AsyncSocket instance.
1009 * fd - Socket's file descriptor.
1010 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1011 */
1012static void
1013_on_async_socket_io(void* opaque, int fd, unsigned events)
1014{
1015 AsyncSocket* const as = (AsyncSocket*)opaque;
1016
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001017 /* Reference the socket while we're working with it in this callback. */
1018 async_socket_reference(as);
1019
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001020 if ((events & LOOP_IO_READ) != 0) {
1021 if (_on_async_socket_recv(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001022 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001023 return;
1024 }
1025 }
1026
1027 if ((events & LOOP_IO_WRITE) != 0) {
1028 if (_on_async_socket_send(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001029 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001030 return;
1031 }
1032 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001033
1034 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001035}
1036
1037/* A callback that is invoked by asynchronous socket connector on connection
1038 * events.
1039 * Param:
1040 * opaque - Initialized AsyncSocket instance.
1041 * connector - Connector that is used to connect this socket.
1042 * event - Connection event.
1043 * Return:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001044 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001045 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001046static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001047_on_connector_events(void* opaque,
1048 AsyncSocketConnector* connector,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001049 AsyncIOState event)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001050{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001051 AsyncIOAction action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001052 AsyncSocket* const as = (AsyncSocket*)opaque;
1053
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001054 /* Reference the socket while we're working with it in this callback. */
1055 async_socket_reference(as);
1056
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001057 if (event == ASIO_STATE_SUCCEEDED) {
1058 /* Accept the connection. */
1059 as->fd = async_socket_connector_pull_fd(connector);
1060 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001061 }
1062
1063 /* Invoke client's callback. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001064 action = as->on_connection(as->client_opaque, as, event);
1065 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001066 /* For whatever reason the client didn't want to keep this connection.
1067 * Close it. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001068 D("ASocket %s: Connection is discarded by the client.",
1069 _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001070 _async_socket_close_socket(as);
1071 }
1072
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001073 if (action != ASIO_ACTION_RETRY) {
1074 async_socket_connector_release(connector);
1075 }
1076
1077 async_socket_release(as);
1078
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001079 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001080}
1081
1082/* Timer callback invoked to reconnect the lost connection.
1083 * Param:
1084 * as - Initialized AsyncSocket instance.
1085 */
1086void
1087_on_async_socket_reconnect(void* opaque)
1088{
1089 AsyncSocket* as = (AsyncSocket*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001090
1091 /* Reference the socket while we're working with it in this callback. */
1092 async_socket_reference(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001093 async_socket_connect(as, as->reconnect_to);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001094 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001095}
1096
1097
1098/********************************************************************************
1099 * Android Device Socket public API
1100 *******************************************************************************/
1101
1102AsyncSocket*
1103async_socket_new(int port,
1104 int reconnect_to,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001105 on_as_connection_cb client_cb,
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001106 void* client_opaque,
1107 Looper* looper)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001108{
1109 AsyncSocket* as;
1110
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001111 if (client_cb == NULL) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001112 E("Invalid client_cb parameter");
1113 return NULL;
1114 }
1115
1116 ANEW0(as);
1117
1118 as->fd = -1;
1119 as->client_opaque = client_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001120 as->on_connection = client_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001121 as->readers_head = as->readers_tail = NULL;
1122 as->reconnect_to = reconnect_to;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001123 as->ref_count = 1;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001124 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001125 if (looper == NULL) {
1126 as->looper = looper_newCore();
1127 if (as->looper == NULL) {
1128 E("Unable to create I/O looper for async socket '%s'",
1129 _async_socket_string(as));
1130 client_cb(client_opaque, as, ASIO_STATE_FAILED);
1131 _async_socket_free(as);
1132 return NULL;
1133 }
1134 as->owns_looper = 1;
1135 } else {
1136 as->looper = looper;
1137 as->owns_looper = 0;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001138 }
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001139
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001140 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1141
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001142 T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1143
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001144 return as;
1145}
1146
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001147int
1148async_socket_reference(AsyncSocket* as)
1149{
1150 assert(as->ref_count > 0);
1151 as->ref_count++;
1152 return as->ref_count;
1153}
1154
1155int
1156async_socket_release(AsyncSocket* as)
1157{
1158 assert(as->ref_count > 0);
1159 as->ref_count--;
1160 if (as->ref_count == 0) {
1161 /* Last reference has been dropped. Destroy this object. */
1162 _async_socket_cancel_all_io(as);
1163 _async_socket_free(as);
1164 return 0;
1165 }
1166 return as->ref_count;
1167}
1168
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001169void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001170async_socket_connect(AsyncSocket* as, int retry_to)
1171{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001172 T("ASocket %s: Handling connection request for %dms...",
1173 _async_socket_string(as), retry_to);
1174
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001175 AsyncSocketConnector* const connector =
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001176 async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1177 as, as->looper);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001178 if (connector != NULL) {
1179 async_socket_connector_connect(connector);
1180 } else {
1181 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001182 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001183}
1184
1185void
1186async_socket_disconnect(AsyncSocket* as)
1187{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001188 T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1189
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001190 if (as != NULL) {
1191 _async_socket_cancel_all_io(as);
1192 _async_socket_close_socket(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001193 }
1194}
1195
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001196void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001197async_socket_reconnect(AsyncSocket* as, int retry_to)
1198{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001199 T("ASocket %s: Handling reconnection request for %dms...",
1200 _async_socket_string(as), retry_to);
1201
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001202 _async_socket_cancel_all_io(as);
1203 _async_socket_close_socket(as);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001204 _async_socket_reconnect(as, retry_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001205}
1206
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001207void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001208async_socket_read_abs(AsyncSocket* as,
1209 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001210 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001211 void* reader_opaque,
1212 Duration deadline)
1213{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001214 T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1215 _async_socket_string(as), len, deadline);
1216
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001217 AsyncSocketIO* const asr =
1218 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1219 deadline);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001220 /* Add new reader to the list. Note that we use initial reference from I/O
1221 * 'new' routine as "in the list" reference counter. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001222 if (as->readers_head == NULL) {
1223 as->readers_head = as->readers_tail = asr;
1224 } else {
1225 as->readers_tail->next = asr;
1226 as->readers_tail = asr;
1227 }
1228 loopIo_wantRead(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001229}
1230
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001231void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001232async_socket_read_rel(AsyncSocket* as,
1233 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001234 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001235 void* reader_opaque,
1236 int to)
1237{
1238 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001240 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001241}
1242
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001243void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001244async_socket_write_abs(AsyncSocket* as,
1245 const void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001246 on_as_io_cb writer_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001247 void* writer_opaque,
1248 Duration deadline)
1249{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001250 T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251 _async_socket_string(as), len, deadline);
1252
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001253 AsyncSocketIO* const asw =
1254 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255 deadline);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001256 /* Add new writer to the list. Note that we use initial reference from I/O
1257 * 'new' routine as "in the list" reference counter. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001258 if (as->writers_head == NULL) {
1259 as->writers_head = as->writers_tail = asw;
1260 } else {
1261 as->writers_tail->next = asw;
1262 as->writers_tail = asw;
1263 }
1264 loopIo_wantWrite(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001265}
1266
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001267void
1268async_socket_write_rel(AsyncSocket* as,
1269 const void* buffer, uint32_t len,
1270 on_as_io_cb writer_cb,
1271 void* writer_opaque,
1272 int to)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001273{
1274 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1275 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001276 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1277}
1278
1279void*
1280async_socket_get_client_opaque(const AsyncSocket* as)
1281{
1282 return as->client_opaque;
1283}
1284
1285Duration
1286async_socket_deadline(AsyncSocket* as, int rel)
1287{
1288 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1289 DURATION_INFINITE;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001290}
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001291
1292int
1293async_socket_get_port(const AsyncSocket* as)
1294{
1295 return sock_address_get_port(&as->address);
1296}