blob: 92c2ef7576ea5b039838243567e4ee935ab0c743 [file] [log] [blame]
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070023#include "android/utils/debug.h"
24#include "android/async-socket-connector.h"
25#include "android/async-socket.h"
26#include "utils/panic.h"
David 'Digit' Turnerd413fa52013-12-14 23:35:20 +010027#include "android/iolooper.h"
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070028
29#define E(...) derror(__VA_ARGS__)
30#define W(...) dwarning(__VA_ARGS__)
31#define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
32#define D_ACTIVE VERBOSE_CHECK(asyncsocket)
33
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -070034#define TRACE_ON 0
35
36#if TRACE_ON
37#define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
38#else
39#define T(...)
40#endif
41
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070042/********************************************************************************
43 * Asynchronous Socket internal API declarations
44 *******************************************************************************/
45
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070046/* Gets socket's address string. */
47static const char* _async_socket_string(AsyncSocket* as);
48
49/* Gets socket's looper. */
50static Looper* _async_socket_get_looper(AsyncSocket* as);
51
52/* Handler for the I/O time out.
53 * Param:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070054 * as - Asynchronous socket for the I/O.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070055 * asio - Desciptor for the timed out I/O.
56 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070057static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
58 AsyncSocketIO* asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070059
60/********************************************************************************
61 * Asynchronous Socket Reader / Writer
62 *******************************************************************************/
63
64struct AsyncSocketIO {
65 /* Next I/O in the reader, or writer list. */
66 AsyncSocketIO* next;
67 /* Asynchronous socket for this I/O. */
68 AsyncSocket* as;
69 /* Timer used for time outs on this I/O. */
70 LoopTimer timer[1];
71 /* An opaque pointer associated with this I/O. */
72 void* io_opaque;
73 /* Buffer where to read / write data. */
74 uint8_t* buffer;
75 /* Bytes to transfer through the socket for this I/O. */
76 uint32_t to_transfer;
77 /* Bytes thransferred through the socket in this I/O. */
78 uint32_t transferred;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070079 /* I/O callback for this I/O. */
80 on_as_io_cb on_io;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070081 /* I/O type selector: 1 - read, 0 - write. */
82 int is_io_read;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -070083 /* State of the I/O. */
84 AsyncIOState state;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -070085 /* Number of outstanding references to the I/O. */
86 int ref_count;
87 /* Deadline for this I/O */
88 Duration deadline;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -070089};
90
91/*
92 * Recycling I/O instances.
93 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
94 * them for faster allocation, rather than allocating and freeing them for each
95 * I/O on the socket.
96 */
97
98/* List of recycled I/O descriptors. */
99static AsyncSocketIO* _asio_recycled = NULL;
100/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
101static int _recycled_asio_count = 0;
102/* Maximum number of I/O descriptors that can be recycled. */
103static const int _max_recycled_asio_num = 32;
104
105/* Handler for an I/O time-out timer event.
106 * When this routine is invoked, it indicates that a time out has occurred on an
107 * I/O.
108 * Param:
109 * opaque - AsyncSocketIO instance representing the timed out I/O.
110 */
111static void _on_async_socket_io_timed_out(void* opaque);
112
113/* Creates new I/O descriptor.
114 * Param:
115 * as - Asynchronous socket for the I/O.
116 * is_io_read - I/O type selector: 1 - read, 0 - write.
117 * buffer, len - Reader / writer buffer address.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700118 * io_cb - Callback for this reader / writer.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700119 * io_opaque - An opaque pointer associated with the I/O.
120 * deadline - Deadline to complete the I/O.
121 * Return:
122 * Initialized AsyncSocketIO instance.
123 */
124static AsyncSocketIO*
125_async_socket_rw_new(AsyncSocket* as,
126 int is_io_read,
127 void* buffer,
128 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700129 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700130 void* io_opaque,
131 Duration deadline)
132{
133 /* Lookup in the recycler first. */
134 AsyncSocketIO* asio = _asio_recycled;
135 if (asio != NULL) {
136 /* Pull the descriptor from recycler. */
137 _asio_recycled = asio->next;
138 _recycled_asio_count--;
139 } else {
140 /* No recycled descriptors. Allocate new one. */
141 ANEW0(asio);
142 }
143
144 asio->next = NULL;
145 asio->as = as;
146 asio->is_io_read = is_io_read;
147 asio->buffer = (uint8_t*)buffer;
148 asio->to_transfer = len;
149 asio->transferred = 0;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700150 asio->on_io = io_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700151 asio->io_opaque = io_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700152 asio->state = ASIO_STATE_QUEUED;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700153 asio->ref_count = 1;
154 asio->deadline = deadline;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700155 loopTimer_init(asio->timer, _async_socket_get_looper(as),
156 _on_async_socket_io_timed_out, asio);
157 loopTimer_startAbsolute(asio->timer, deadline);
158
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700159 /* Reference socket that is holding this I/O. */
160 async_socket_reference(as);
161
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700162 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
163 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
164
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700165 return asio;
166}
167
168/* Destroys and frees I/O descriptor. */
169static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700170_async_socket_io_free(AsyncSocketIO* asio)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700171{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700172 AsyncSocket* const as = asio->as;
173
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700174 T("ASocket %s: %s I/O descriptor %p is destroyed.",
175 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
176
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700177 loopTimer_done(asio->timer);
178
179 /* Try to recycle it first, and free the memory if recycler is full. */
180 if (_recycled_asio_count < _max_recycled_asio_num) {
181 asio->next = _asio_recycled;
182 _asio_recycled = asio;
183 _recycled_asio_count++;
184 } else {
185 AFREE(asio);
186 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700187
188 /* Release socket that is holding this I/O. */
189 async_socket_release(as);
190}
191
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700192/* An I/O has been finished and its descriptor is about to be discarded. */
193static void
194_async_socket_io_finished(AsyncSocketIO* asio)
195{
196 /* Notify the client of the I/O that I/O is finished. */
197 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
198}
199
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700200int
201async_socket_io_reference(AsyncSocketIO* asio)
202{
203 assert(asio->ref_count > 0);
204 asio->ref_count++;
205 return asio->ref_count;
206}
207
208int
209async_socket_io_release(AsyncSocketIO* asio)
210{
211 assert(asio->ref_count > 0);
212 asio->ref_count--;
213 if (asio->ref_count == 0) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700214 _async_socket_io_finished(asio);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700215 /* Last reference has been dropped. Destroy this object. */
216 _async_socket_io_free(asio);
217 return 0;
218 }
219 return asio->ref_count;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700220}
221
222/* Creates new asynchronous socket reader.
223 * Param:
224 * as - Asynchronous socket for the reader.
225 * buffer, len - Reader's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700226 * io_cb - Reader's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700227 * reader_opaque - An opaque pointer associated with the reader.
228 * deadline - Deadline to complete the operation.
229 * Return:
230 * An initialized AsyncSocketIO intance.
231 */
232static AsyncSocketIO*
233_async_socket_reader_new(AsyncSocket* as,
234 void* buffer,
235 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700236 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700237 void* reader_opaque,
238 Duration deadline)
239{
240 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
241 reader_opaque, deadline);
242 return asio;
243}
244
245/* Creates new asynchronous socket writer.
246 * Param:
247 * as - Asynchronous socket for the writer.
248 * buffer, len - Writer's buffer.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700249 * io_cb - Writer's callback.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700250 * writer_opaque - An opaque pointer associated with the writer.
251 * deadline - Deadline to complete the operation.
252 * Return:
253 * An initialized AsyncSocketIO intance.
254 */
255static AsyncSocketIO*
256_async_socket_writer_new(AsyncSocket* as,
257 const void* buffer,
258 uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700259 on_as_io_cb io_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700260 void* writer_opaque,
261 Duration deadline)
262{
263 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
264 io_cb, writer_opaque,
265 deadline);
266 return asio;
267}
268
269/* I/O timed out. */
270static void
271_on_async_socket_io_timed_out(void* opaque)
272{
273 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700274 AsyncSocket* const as = asio->as;
275
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700276 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700277 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
278 asio->deadline, async_socket_deadline(as, 0));
279
280 /* Reference while in callback. */
281 async_socket_io_reference(asio);
282 _async_socket_io_timed_out(asio->as, asio);
283 async_socket_io_release(asio);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700284}
285
286/********************************************************************************
287 * Public Asynchronous Socket I/O API
288 *******************************************************************************/
289
290AsyncSocket*
291async_socket_io_get_socket(const AsyncSocketIO* asio)
292{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700293 async_socket_reference(asio->as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700294 return asio->as;
295}
296
297void
298async_socket_io_cancel_time_out(AsyncSocketIO* asio)
299{
300 loopTimer_stop(asio->timer);
301}
302
303void*
304async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
305{
306 return asio->io_opaque;
307}
308
309void*
310async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
311{
312 return async_socket_get_client_opaque(asio->as);
313}
314
315void*
316async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
317 uint32_t* transferred,
318 uint32_t* to_transfer)
319{
320 if (transferred != NULL) {
321 *transferred = asio->transferred;
322 }
323 if (to_transfer != NULL) {
324 *to_transfer = asio->to_transfer;
325 }
326 return asio->buffer;
327}
328
329void*
330async_socket_io_get_buffer(const AsyncSocketIO* asio)
331{
332 return asio->buffer;
333}
334
335uint32_t
336async_socket_io_get_transferred(const AsyncSocketIO* asio)
337{
338 return asio->transferred;
339}
340
341uint32_t
342async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
343{
344 return asio->to_transfer;
345}
346
347int
348async_socket_io_is_read(const AsyncSocketIO* asio)
349{
350 return asio->is_io_read;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700351}
352
353/********************************************************************************
354 * Asynchronous Socket internals
355 *******************************************************************************/
356
357struct AsyncSocket {
358 /* TCP address for the socket. */
359 SockAddress address;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700360 /* Connection callback for this socket. */
361 on_as_connection_cb on_connection;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700362 /* An opaque pointer associated with this socket by the client. */
363 void* client_opaque;
364 /* I/O looper for asynchronous I/O on the socket. */
365 Looper* looper;
366 /* I/O descriptor for asynchronous I/O on the socket. */
367 LoopIo io[1];
368 /* Timer to use for reconnection attempts. */
369 LoopTimer reconnect_timer[1];
370 /* Head of the list of the active readers. */
371 AsyncSocketIO* readers_head;
372 /* Tail of the list of the active readers. */
373 AsyncSocketIO* readers_tail;
374 /* Head of the list of the active writers. */
375 AsyncSocketIO* writers_head;
376 /* Tail of the list of the active writers. */
377 AsyncSocketIO* writers_tail;
378 /* Socket's file descriptor. */
379 int fd;
380 /* Timeout to use for reconnection attempts. */
381 int reconnect_to;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700382 /* Number of outstanding references to the socket. */
383 int ref_count;
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700384 /* Flags whether (1) or not (0) socket owns the looper. */
385 int owns_looper;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700386};
387
388static const char*
389_async_socket_string(AsyncSocket* as)
390{
391 return sock_address_to_string(&as->address);
392}
393
394static Looper*
395_async_socket_get_looper(AsyncSocket* as)
396{
397 return as->looper;
398}
399
400/* Pulls first reader out of the list.
401 * Param:
402 * as - Initialized AsyncSocket instance.
403 * Return:
404 * First I/O pulled out of the list, or NULL if there are no I/O in the list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700405 * Note that the caller is responsible for releasing the I/O object returned
406 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700407 */
408static AsyncSocketIO*
409_async_socket_pull_first_io(AsyncSocket* as,
410 AsyncSocketIO** list_head,
411 AsyncSocketIO** list_tail)
412{
413 AsyncSocketIO* const ret = *list_head;
414 if (ret != NULL) {
415 *list_head = ret->next;
416 ret->next = NULL;
417 if (*list_head == NULL) {
418 *list_tail = NULL;
419 }
420 }
421 return ret;
422}
423
424/* Pulls first reader out of the list.
425 * Param:
426 * as - Initialized AsyncSocket instance.
427 * Return:
428 * First reader pulled out of the list, or NULL if there are no readers in the
429 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700430 * Note that the caller is responsible for releasing the I/O object returned
431 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700432 */
433static AsyncSocketIO*
434_async_socket_pull_first_reader(AsyncSocket* as)
435{
436 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
437}
438
439/* Pulls first writer out of the list.
440 * Param:
441 * as - Initialized AsyncSocket instance.
442 * Return:
443 * First writer pulled out of the list, or NULL if there are no writers in the
444 * list.
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700445 * Note that the caller is responsible for releasing the I/O object returned
446 * from this routine.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700447 */
448static AsyncSocketIO*
449_async_socket_pull_first_writer(AsyncSocket* as)
450{
451 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
452}
453
454/* Removes an I/O descriptor from a list of active I/O.
455 * Param:
456 * as - Initialized AsyncSocket instance.
457 * list_head, list_tail - Pointers to the list head and tail.
458 * io - I/O to remove.
459 * Return:
460 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
461 */
462static int
463_async_socket_remove_io(AsyncSocket* as,
464 AsyncSocketIO** list_head,
465 AsyncSocketIO** list_tail,
466 AsyncSocketIO* io)
467{
468 AsyncSocketIO* prev = NULL;
469
470 while (*list_head != NULL && io != *list_head) {
471 prev = *list_head;
472 list_head = &((*list_head)->next);
473 }
474 if (*list_head == NULL) {
475 D("%s: I/O %p is not found in the list for socket '%s'",
476 __FUNCTION__, io, _async_socket_string(as));
477 return 0;
478 }
479
480 *list_head = io->next;
481 if (prev != NULL) {
482 prev->next = io->next;
483 }
484 if (*list_tail == io) {
485 *list_tail = prev;
486 }
487
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700488 /* Release I/O adjusting reference added when I/O has been saved in the list. */
489 async_socket_io_release(io);
490
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700491 return 1;
492}
493
494/* Advances to the next I/O in the list.
495 * Param:
496 * as - Initialized AsyncSocket instance.
497 * list_head, list_tail - Pointers to the list head and tail.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700498 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700499static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700500_async_socket_advance_io(AsyncSocket* as,
501 AsyncSocketIO** list_head,
502 AsyncSocketIO** list_tail)
503{
504 AsyncSocketIO* first_io = *list_head;
505 if (first_io != NULL) {
506 *list_head = first_io->next;
507 first_io->next = NULL;
508 }
509 if (*list_head == NULL) {
510 *list_tail = NULL;
511 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700512 if (first_io != NULL) {
513 /* Release I/O removed from the head of the list. */
514 async_socket_io_release(first_io);
515 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700516}
517
518/* Advances to the next reader in the list.
519 * Param:
520 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700521 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700522static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700523_async_socket_advance_reader(AsyncSocket* as)
524{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700525 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700526}
527
528/* Advances to the next writer in the list.
529 * Param:
530 * as - Initialized AsyncSocket instance.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700531 */
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700532static void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700533_async_socket_advance_writer(AsyncSocket* as)
534{
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700535 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700536}
537
538/* Completes an I/O.
539 * Param:
540 * as - Initialized AsyncSocket instance.
541 * asio - I/O to complete.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700542 * Return:
543 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700544 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700545static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700546_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
547{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700548 T("ASocket %s: %s I/O %p is completed.",
549 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
550
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700551 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700552 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700553
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700554 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700555}
556
557/* Timeouts an I/O.
558 * Param:
559 * as - Initialized AsyncSocket instance.
560 * asio - An I/O that has timed out.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700561 * Return:
562 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700563 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700564static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700565_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
566{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700567 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
568 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
569 asio->deadline, async_socket_deadline(as, 0));
570
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700571 /* Report to the client. */
572 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
573 ASIO_STATE_TIMED_OUT);
574
575 /* Remove the I/O from a list of active I/O for actions other than retry. */
576 if (action != ASIO_ACTION_RETRY) {
577 if (asio->is_io_read) {
578 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
579 } else {
580 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
581 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700582 }
583
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700584 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700585}
586
587/* Cancels an I/O.
588 * Param:
589 * as - Initialized AsyncSocket instance.
590 * asio - An I/O to cancel.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700591 * Return:
592 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700593 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700594static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700595_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
596{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700597 T("ASocket %s: %s I/O %p is cancelled.",
598 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
599
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700600 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700601 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700602
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700603 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700604}
605
606/* Reports an I/O failure.
607 * Param:
608 * as - Initialized AsyncSocket instance.
609 * asio - An I/O that has failed. Can be NULL for general failures.
610 * failure - Failure (errno) that has occurred.
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700611 * Return:
612 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700613 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700614static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700615_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
616{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700617 T("ASocket %s: %s I/O %p has failed: %d -> %s",
618 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
619 failure, strerror(failure));
620
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700621 /* Stop the timer. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700622 async_socket_io_cancel_time_out(asio);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700623
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700624 errno = failure;
625 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700626}
627
628/* Cancels all the active socket readers.
629 * Param:
630 * as - Initialized AsyncSocket instance.
631 */
632static void
633_async_socket_cancel_readers(AsyncSocket* as)
634{
635 while (as->readers_head != NULL) {
636 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700637 /* We ignore action returned from the cancellation callback, since we're
638 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700639 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700640 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700641 }
642}
643
644/* Cancels all the active socket writers.
645 * Param:
646 * as - Initialized AsyncSocket instance.
647 */
648static void
649_async_socket_cancel_writers(AsyncSocket* as)
650{
651 while (as->writers_head != NULL) {
652 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700653 /* We ignore action returned from the cancellation callback, since we're
654 * in a disconnected state here. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700655 _async_socket_cancel_io(as, to_cancel);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700656 async_socket_io_release(to_cancel);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700657 }
658}
659
660/* Cancels all the I/O on the socket. */
661static void
662_async_socket_cancel_all_io(AsyncSocket* as)
663{
664 /* Stop the reconnection timer. */
665 loopTimer_stop(as->reconnect_timer);
666
667 /* Stop read / write on the socket. */
668 loopIo_dontWantWrite(as->io);
669 loopIo_dontWantRead(as->io);
670
671 /* Cancel active readers and writers. */
672 _async_socket_cancel_readers(as);
673 _async_socket_cancel_writers(as);
674}
675
676/* Closes socket handle used by the async socket.
677 * Param:
678 * as - Initialized AsyncSocket instance.
679 */
680static void
681_async_socket_close_socket(AsyncSocket* as)
682{
683 if (as->fd >= 0) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700684 T("ASocket %s: Socket handle %d is closed.",
685 _async_socket_string(as), as->fd);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -0700686 loopIo_done(as->io);
687 socket_close(as->fd);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700688 as->fd = -1;
689 }
690}
691
692/* Destroys AsyncSocket instance.
693 * Param:
694 * as - Initialized AsyncSocket instance.
695 */
696static void
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700697_async_socket_free(AsyncSocket* as)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700698{
699 if (as != NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700700 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700701
702 /* Close socket. */
703 _async_socket_close_socket(as);
704
705 /* Free allocated resources. */
706 if (as->looper != NULL) {
707 loopTimer_done(as->reconnect_timer);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700708 if (as->owns_looper) {
709 looper_free(as->looper);
710 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700711 }
712 sock_address_done(&as->address);
713 AFREE(as);
714 }
715}
716
717/* Starts reconnection attempts after connection has been lost.
718 * Param:
719 * as - Initialized AsyncSocket instance.
720 * to - Milliseconds to wait before reconnection attempt.
721 */
722static void
723_async_socket_reconnect(AsyncSocket* as, int to)
724{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700725 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
726
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700727 /* Make sure that no I/O is active, and socket is closed before we
728 * reconnect. */
729 _async_socket_cancel_all_io(as);
730
731 /* Set the timer for reconnection attempt. */
732 loopTimer_startRelative(as->reconnect_timer, to);
733}
734
735/********************************************************************************
736 * Asynchronous Socket callbacks
737 *******************************************************************************/
738
739/* A callback that is invoked when socket gets disconnected.
740 * Param:
741 * as - Initialized AsyncSocket instance.
742 */
743static void
744_on_async_socket_disconnected(AsyncSocket* as)
745{
746 /* Save error to restore it for the client's callback. */
747 const int save_errno = errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700748 AsyncIOAction action = ASIO_ACTION_ABORT;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700749
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700750 D("ASocket %s: Disconnected.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700751
752 /* Cancel all the I/O on this socket. */
753 _async_socket_cancel_all_io(as);
754
755 /* Close the socket. */
756 _async_socket_close_socket(as);
757
758 /* Restore errno, and invoke client's callback. */
759 errno = save_errno;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700760 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700761
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700762 if (action == ASIO_ACTION_RETRY) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700763 /* Client requested reconnection. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700764 _async_socket_reconnect(as, as->reconnect_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700765 }
766}
767
768/* A callback that is invoked on socket's I/O failure.
769 * Param:
770 * as - Initialized AsyncSocket instance.
771 * asio - Descriptor for the failed I/O. Can be NULL for general failures.
772 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700773static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700774_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
775{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700776 D("ASocket %s: %s I/O failure: %d -> %s",
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700777 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
778 errno, strerror(errno));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700779
780 /* Report the failure. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700781 return _async_socket_io_failure(as, asio, errno);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700782}
783
784/* A callback that is invoked when there is data available to read.
785 * Param:
786 * as - Initialized AsyncSocket instance.
787 * Return:
788 * 0 on success, or -1 on failure. Failure returned from this routine will
789 * skip writes (if awailable) behind this read.
790 */
791static int
792_on_async_socket_recv(AsyncSocket* as)
793{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700794 AsyncIOAction action;
795
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700796 /* Get current reader. */
797 AsyncSocketIO* const asr = as->readers_head;
798 if (asr == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700799 D("ASocket %s: No reader is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700800 loopIo_dontWantRead(as->io);
801 return 0;
802 }
803
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700804 /* Reference the reader while we're working with it in this callback. */
805 async_socket_io_reference(asr);
806
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700807 /* Bump I/O state, and inform the client that I/O is in progress. */
808 if (asr->state == ASIO_STATE_QUEUED) {
809 asr->state = ASIO_STATE_STARTED;
810 } else {
811 asr->state = ASIO_STATE_CONTINUES;
812 }
813 action = asr->on_io(asr->io_opaque, asr, asr->state);
814 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700815 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700816 /* Move on to the next reader. */
817 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700818 /* Lets see if there are still active readers, and enable, or disable
819 * read I/O callback accordingly. */
820 if (as->readers_head != NULL) {
821 loopIo_wantRead(as->io);
822 } else {
823 loopIo_dontWantRead(as->io);
824 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700825 async_socket_io_release(asr);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700826 return 0;
827 }
828
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700829 /* Read next chunk of data. */
830 int res = socket_recv(as->fd, asr->buffer + asr->transferred,
831 asr->to_transfer - asr->transferred);
832 while (res < 0 && errno == EINTR) {
833 res = socket_recv(as->fd, asr->buffer + asr->transferred,
834 asr->to_transfer - asr->transferred);
835 }
836
837 if (res == 0) {
838 /* Socket has been disconnected. */
839 errno = ECONNRESET;
840 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700841 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700842 return -1;
843 }
844
845 if (res < 0) {
846 if (errno == EWOULDBLOCK || errno == EAGAIN) {
847 /* Yield to writes behind this read. */
848 loopIo_wantRead(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700849 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700850 return 0;
851 }
852
853 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700854 action = _on_async_socket_failure(as, asr);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700855 if (action != ASIO_ACTION_RETRY) {
856 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700857 /* Move on to the next reader. */
858 _async_socket_advance_reader(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700859 /* Lets see if there are still active readers, and enable, or disable
860 * read I/O callback accordingly. */
861 if (as->readers_head != NULL) {
862 loopIo_wantRead(as->io);
863 } else {
864 loopIo_dontWantRead(as->io);
865 }
866 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700867 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700868 return -1;
869 }
870
871 /* Update the reader's descriptor. */
872 asr->transferred += res;
873 if (asr->transferred == asr->to_transfer) {
874 /* This read is completed. Move on to the next reader. */
875 _async_socket_advance_reader(as);
876
877 /* Notify reader completion. */
878 _async_socket_complete_io(as, asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700879 }
880
881 /* Lets see if there are still active readers, and enable, or disable read
882 * I/O callback accordingly. */
883 if (as->readers_head != NULL) {
884 loopIo_wantRead(as->io);
885 } else {
886 loopIo_dontWantRead(as->io);
887 }
888
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700889 async_socket_io_release(asr);
890
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700891 return 0;
892}
893
894/* A callback that is invoked when there is data available to write.
895 * Param:
896 * as - Initialized AsyncSocket instance.
897 * Return:
898 * 0 on success, or -1 on failure. Failure returned from this routine will
899 * skip reads (if awailable) behind this write.
900 */
901static int
902_on_async_socket_send(AsyncSocket* as)
903{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700904 AsyncIOAction action;
905
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700906 /* Get current writer. */
907 AsyncSocketIO* const asw = as->writers_head;
908 if (asw == NULL) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700909 D("ASocket %s: No writer is available.", _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700910 loopIo_dontWantWrite(as->io);
911 return 0;
912 }
913
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700914 /* Reference the writer while we're working with it in this callback. */
915 async_socket_io_reference(asw);
916
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700917 /* Bump I/O state, and inform the client that I/O is in progress. */
918 if (asw->state == ASIO_STATE_QUEUED) {
919 asw->state = ASIO_STATE_STARTED;
920 } else {
921 asw->state = ASIO_STATE_CONTINUES;
922 }
923 action = asw->on_io(asw->io_opaque, asw, asw->state);
924 if (action == ASIO_ACTION_ABORT) {
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700925 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700926 /* Move on to the next writer. */
927 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700928 /* Lets see if there are still active writers, and enable, or disable
929 * write I/O callback accordingly. */
930 if (as->writers_head != NULL) {
931 loopIo_wantWrite(as->io);
932 } else {
933 loopIo_dontWantWrite(as->io);
934 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700935 async_socket_io_release(asw);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700936 return 0;
937 }
938
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700939 /* Write next chunk of data. */
940 int res = socket_send(as->fd, asw->buffer + asw->transferred,
941 asw->to_transfer - asw->transferred);
942 while (res < 0 && errno == EINTR) {
943 res = socket_send(as->fd, asw->buffer + asw->transferred,
944 asw->to_transfer - asw->transferred);
945 }
946
947 if (res == 0) {
948 /* Socket has been disconnected. */
949 errno = ECONNRESET;
950 _on_async_socket_disconnected(as);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700951 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700952 return -1;
953 }
954
955 if (res < 0) {
956 if (errno == EWOULDBLOCK || errno == EAGAIN) {
957 /* Yield to reads behind this write. */
958 loopIo_wantWrite(as->io);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700959 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700960 return 0;
961 }
962
963 /* An I/O error. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700964 action = _on_async_socket_failure(as, asw);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -0700965 if (action != ASIO_ACTION_RETRY) {
966 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700967 /* Move on to the next writer. */
968 _async_socket_advance_writer(as);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -0700969 /* Lets see if there are still active writers, and enable, or disable
970 * write I/O callback accordingly. */
971 if (as->writers_head != NULL) {
972 loopIo_wantWrite(as->io);
973 } else {
974 loopIo_dontWantWrite(as->io);
975 }
976 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700977 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700978 return -1;
979 }
980
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700981 /* Update the writer descriptor. */
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700982 asw->transferred += res;
983 if (asw->transferred == asw->to_transfer) {
984 /* This write is completed. Move on to the next writer. */
985 _async_socket_advance_writer(as);
986
987 /* Notify writer completion. */
988 _async_socket_complete_io(as, asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -0700989 }
990
991 /* Lets see if there are still active writers, and enable, or disable write
992 * I/O callback accordingly. */
993 if (as->writers_head != NULL) {
994 loopIo_wantWrite(as->io);
995 } else {
996 loopIo_dontWantWrite(as->io);
997 }
998
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -0700999 async_socket_io_release(asw);
1000
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001001 return 0;
1002}
1003
1004/* A callback that is invoked when an I/O is available on socket.
1005 * Param:
1006 * as - Initialized AsyncSocket instance.
1007 * fd - Socket's file descriptor.
1008 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1009 */
1010static void
1011_on_async_socket_io(void* opaque, int fd, unsigned events)
1012{
1013 AsyncSocket* const as = (AsyncSocket*)opaque;
1014
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001015 /* Reference the socket while we're working with it in this callback. */
1016 async_socket_reference(as);
1017
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001018 if ((events & LOOP_IO_READ) != 0) {
1019 if (_on_async_socket_recv(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001020 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001021 return;
1022 }
1023 }
1024
1025 if ((events & LOOP_IO_WRITE) != 0) {
1026 if (_on_async_socket_send(as) != 0) {
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001027 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001028 return;
1029 }
1030 }
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001031
1032 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001033}
1034
1035/* A callback that is invoked by asynchronous socket connector on connection
1036 * events.
1037 * Param:
1038 * opaque - Initialized AsyncSocket instance.
1039 * connector - Connector that is used to connect this socket.
1040 * event - Connection event.
1041 * Return:
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001042 * One of AsyncIOAction values.
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001043 */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001044static AsyncIOAction
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001045_on_connector_events(void* opaque,
1046 AsyncSocketConnector* connector,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001047 AsyncIOState event)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001048{
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001049 AsyncIOAction action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001050 AsyncSocket* const as = (AsyncSocket*)opaque;
1051
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001052 /* Reference the socket while we're working with it in this callback. */
1053 async_socket_reference(as);
1054
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001055 if (event == ASIO_STATE_SUCCEEDED) {
1056 /* Accept the connection. */
1057 as->fd = async_socket_connector_pull_fd(connector);
1058 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001059 }
1060
1061 /* Invoke client's callback. */
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001062 action = as->on_connection(as->client_opaque, as, event);
1063 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001064 /* For whatever reason the client didn't want to keep this connection.
1065 * Close it. */
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001066 D("ASocket %s: Connection is discarded by the client.",
1067 _async_socket_string(as));
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001068 _async_socket_close_socket(as);
1069 }
1070
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001071 if (action != ASIO_ACTION_RETRY) {
1072 async_socket_connector_release(connector);
1073 }
1074
1075 async_socket_release(as);
1076
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001077 return action;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001078}
1079
1080/* Timer callback invoked to reconnect the lost connection.
1081 * Param:
1082 * as - Initialized AsyncSocket instance.
1083 */
1084void
1085_on_async_socket_reconnect(void* opaque)
1086{
1087 AsyncSocket* as = (AsyncSocket*)opaque;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001088
1089 /* Reference the socket while we're working with it in this callback. */
1090 async_socket_reference(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001091 async_socket_connect(as, as->reconnect_to);
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001092 async_socket_release(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001093}
1094
1095
1096/********************************************************************************
1097 * Android Device Socket public API
1098 *******************************************************************************/
1099
1100AsyncSocket*
1101async_socket_new(int port,
1102 int reconnect_to,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001103 on_as_connection_cb client_cb,
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001104 void* client_opaque,
1105 Looper* looper)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001106{
1107 AsyncSocket* as;
1108
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001109 if (client_cb == NULL) {
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001110 E("Invalid client_cb parameter");
1111 return NULL;
1112 }
1113
1114 ANEW0(as);
1115
1116 as->fd = -1;
1117 as->client_opaque = client_opaque;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001118 as->on_connection = client_cb;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001119 as->readers_head = as->readers_tail = NULL;
1120 as->reconnect_to = reconnect_to;
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001121 as->ref_count = 1;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001122 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001123 if (looper == NULL) {
1124 as->looper = looper_newCore();
1125 if (as->looper == NULL) {
1126 E("Unable to create I/O looper for async socket '%s'",
1127 _async_socket_string(as));
1128 client_cb(client_opaque, as, ASIO_STATE_FAILED);
1129 _async_socket_free(as);
1130 return NULL;
1131 }
1132 as->owns_looper = 1;
1133 } else {
1134 as->looper = looper;
1135 as->owns_looper = 0;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001136 }
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001137
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001138 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1139
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001140 T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1141
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001142 return as;
1143}
1144
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001145int
1146async_socket_reference(AsyncSocket* as)
1147{
1148 assert(as->ref_count > 0);
1149 as->ref_count++;
1150 return as->ref_count;
1151}
1152
1153int
1154async_socket_release(AsyncSocket* as)
1155{
1156 assert(as->ref_count > 0);
1157 as->ref_count--;
1158 if (as->ref_count == 0) {
1159 /* Last reference has been dropped. Destroy this object. */
1160 _async_socket_cancel_all_io(as);
1161 _async_socket_free(as);
1162 return 0;
1163 }
1164 return as->ref_count;
1165}
1166
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001167void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001168async_socket_connect(AsyncSocket* as, int retry_to)
1169{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001170 T("ASocket %s: Handling connection request for %dms...",
1171 _async_socket_string(as), retry_to);
1172
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001173 AsyncSocketConnector* const connector =
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001174 async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1175 as, as->looper);
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001176 if (connector != NULL) {
1177 async_socket_connector_connect(connector);
1178 } else {
1179 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001180 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001181}
1182
1183void
1184async_socket_disconnect(AsyncSocket* as)
1185{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001186 T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1187
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001188 if (as != NULL) {
1189 _async_socket_cancel_all_io(as);
1190 _async_socket_close_socket(as);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001191 }
1192}
1193
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001194void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001195async_socket_reconnect(AsyncSocket* as, int retry_to)
1196{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001197 T("ASocket %s: Handling reconnection request for %dms...",
1198 _async_socket_string(as), retry_to);
1199
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001200 _async_socket_cancel_all_io(as);
1201 _async_socket_close_socket(as);
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001202 _async_socket_reconnect(as, retry_to);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001203}
1204
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001205void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001206async_socket_read_abs(AsyncSocket* as,
1207 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001208 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001209 void* reader_opaque,
1210 Duration deadline)
1211{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001212 T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1213 _async_socket_string(as), len, deadline);
1214
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001215 AsyncSocketIO* const asr =
1216 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1217 deadline);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001218 if (async_socket_is_connected(as)) {
1219 /* Add new reader to the list. Note that we use initial reference from I/O
1220 * 'new' routine as "in the list" reference counter. */
1221 if (as->readers_head == NULL) {
1222 as->readers_head = as->readers_tail = asr;
1223 } else {
1224 as->readers_tail->next = asr;
1225 as->readers_tail = asr;
1226 }
1227 loopIo_wantRead(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001228 } else {
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001229 D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1230 errno = ECONNRESET;
1231 reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1232 async_socket_io_release(asr);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001233 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001234}
1235
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001236void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001237async_socket_read_rel(AsyncSocket* as,
1238 void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001239 on_as_io_cb reader_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001240 void* reader_opaque,
1241 int to)
1242{
1243 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1244 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001245 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001246}
1247
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001248void
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001249async_socket_write_abs(AsyncSocket* as,
1250 const void* buffer, uint32_t len,
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001251 on_as_io_cb writer_cb,
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001252 void* writer_opaque,
1253 Duration deadline)
1254{
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001255 T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1256 _async_socket_string(as), len, deadline);
1257
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001258 AsyncSocketIO* const asw =
1259 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1260 deadline);
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001261 if (async_socket_is_connected(as)) {
1262 /* Add new writer to the list. Note that we use initial reference from I/O
1263 * 'new' routine as "in the list" reference counter. */
1264 if (as->writers_head == NULL) {
1265 as->writers_head = as->writers_tail = asw;
1266 } else {
1267 as->writers_tail->next = asw;
1268 as->writers_tail = asw;
1269 }
1270 loopIo_wantWrite(as->io);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001271 } else {
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001272 D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1273 errno = ECONNRESET;
1274 writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1275 async_socket_io_release(asw);
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001276 }
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001277}
1278
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001279void
1280async_socket_write_rel(AsyncSocket* as,
1281 const void* buffer, uint32_t len,
1282 on_as_io_cb writer_cb,
1283 void* writer_opaque,
1284 int to)
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001285{
1286 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1287 DURATION_INFINITE;
Vladimir Chtchetkine6dc5c2c2012-04-02 07:48:19 -07001288 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1289}
1290
1291void*
1292async_socket_get_client_opaque(const AsyncSocket* as)
1293{
1294 return as->client_opaque;
1295}
1296
1297Duration
1298async_socket_deadline(AsyncSocket* as, int rel)
1299{
1300 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1301 DURATION_INFINITE;
Vladimir Chtchetkinea7383ef2012-03-29 07:34:07 -07001302}
Vladimir Chtchetkineef4ccd32012-04-03 10:27:12 -07001303
1304int
1305async_socket_get_port(const AsyncSocket* as)
1306{
1307 return sock_address_get_port(&as->address);
1308}
Vladimir Chtchetkine7136b052012-04-10 13:39:24 -07001309
1310int
1311async_socket_is_connected(const AsyncSocket* as)
1312{
1313 return as->fd >= 0;
1314}