blob: a3001692133a35e562383c8706cf3c85f2880d92 [file] [log] [blame]
Vladimir Chtchetkinec8aa2c52012-04-05 16:22:55 -07001/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23#include "qemu-common.h"
24#include "android/async-utils.h"
25#include "android/utils/debug.h"
26#include "android/async-socket-connector.h"
27#include "android/async-socket.h"
28#include "android/sdk-controller-socket.h"
29#include "utils/panic.h"
30#include "iolooper.h"
31
32#define E(...) derror(__VA_ARGS__)
33#define W(...) dwarning(__VA_ARGS__)
34#define D(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__)
35#define D_ACTIVE VERBOSE_CHECK(sdkctlsocket)
36
37#define TRACE_ON 1
38
39#if TRACE_ON
40#define T(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__)
41#else
42#define T(...)
43#endif
44
45/* Recycling memory descriptor. */
46typedef struct SDKCtlRecycled SDKCtlRecycled;
47struct SDKCtlRecycled {
48 union {
49 /* Next recycled descriptor (while listed in recycler). */
50 SDKCtlRecycled* next;
51 /* Allocated memory size (while outside of the recycler). */
52 uint32_t size;
53 };
54};
55
56/********************************************************************************
57 * SDKCtlPacket declarations
58 *******************************************************************************/
59
60/*
61 * Types of the packets of data sent via SDK controller socket.
62 */
63
64/* The packet is a message. */
65#define SDKCTL_PACKET_MESSAGE 1
66/* The packet is a query. */
67#define SDKCTL_PACKET_QUERY 2
68/* The packet is a response to a query. */
69#define SDKCTL_PACKET_QUERY_RESPONSE 3
70
71/* Data packet descriptor.
72 *
73 * All packets, sent and received via SDK controller socket begin with this
74 * header, with packet data immediately following this header.
75 */
76typedef struct SDKCtlPacketHeader {
77 /* Total size of the data to transfer with this packet, including this
78 * header. The transferring data should immediatelly follow this header. */
79 int size;
80 /* Encodes packet type. See SDKCTL_PACKET_XXX for the list of packet types
81 * used by SDK controller. */
82 int type;
83} SDKCtlPacketHeader;
84
85/* Packet descriptor, allocated by this API for data packets to be sent to SDK
86 * controller service on the device.
87 *
88 * When packet descriptors are allocated by this API, they are allocated large
89 * enough to contain this header, and packet data to send to the service,
90 * immediately following this descriptor.
91 */
92struct SDKCtlPacket {
93 /* Supports recycling. Don't put anything in front: recycler expects this
94 * to be the first field in recyclable descriptor. */
95 SDKCtlRecycled recycling;
96
97 /* Next packet in the list of packets to send. */
98 SDKCtlPacket* next;
99 /* SDK controller socket that transmits this packet. */
100 SDKCtlSocket* sdkctl;
101 /* Number of outstanding references to the packet. */
102 int ref_count;
103
104 /* Common packet header. Packet data immediately follows this header, so it
105 * must be last field in SDKCtlPacket descriptor. */
106 SDKCtlPacketHeader header;
107};
108
109/********************************************************************************
110 * SDKCtlQuery declarations
111 *******************************************************************************/
112
113/*
114 * Types of queries sent via SDK controller socket.
115 */
116
117/* Handshake query.
118 * This query is sent to SDK controller service as part of the connection
119 * protocol implementation.
120 */
121#define SDKCTL_QUERY_HANDSHAKE -1
122
123/* Query packet descriptor.
124 *
125 * All queries, sent and received via SDK controller socket begin with this
126 * header, with query data immediately following this header.
127 */
128typedef struct SDKCtlQueryHeader {
129 /* Data packet header for this query. */
130 SDKCtlPacketHeader packet;
131 /* A unique query identifier. This ID is used to track the query in the
132 * asynchronous environment in whcih SDK controller socket operates. */
133 int query_id;
134 /* Query type. See SDKCTL_QUERY_XXX for the list of query types used by SDK
135 * controller. */
136 int query_type;
137} SDKCtlQueryHeader;
138
139/* Query descriptor, allocated by this API for queries to be sent to SDK
140 * controller service on the device.
141 *
142 * When query descriptors are allocated by this API, they are allocated large
143 * enough to contain this header, and query data to send to the service,
144 * immediately following this descriptor.
145 */
146struct SDKCtlQuery {
147 /* Supports recycling. Don't put anything in front: recycler expects this
148 * to be the first field in recyclable descriptor. */
149 SDKCtlRecycled recycling;
150
151 /* Next query in the list of active, or recycled queries. */
152 SDKCtlQuery* next;
153 /* A timer to run time out on this query after it has been sent. */
154 LoopTimer timer[1];
155 /* Absolute time for this query's deadline. This is the value that query's
156 * timer is set for after query has been transmitted to the service. */
157 Duration deadline;
158 /* SDK controller socket that owns the query. */
159 SDKCtlSocket* sdkctl;
160 /* A callback to invoke on query state changes. */
161 on_sdkctl_query_cb query_cb;
162 /* An opaque pointer associated with this query. */
163 void* query_opaque;
164 /* Points to an address of a buffer where to save query response. */
165 void** response_buffer;
166 /* Points to a variable containing size of the response buffer (on the way in),
167 * or actual query response size (when query is completed). */
168 uint32_t* response_size;
169 /* Internal response buffer, allocated if query creator didn't provide its
170 * own. This field is valid only if response_buffer field is NULL, or is
171 * pointing to this field. */
172 void* internal_resp_buffer;
173 /* Internal response buffer size This field is valid only if response_size
174 * field is NULL, or is pointing to this field. */
175 uint32_t internal_resp_size;
176 /* Number of outstanding references to the query. */
177 int ref_count;
178
179 /* Common packet header. Query data immediately follows this header, so it
180 * must be last field in SDKCtlQuery descriptor. */
181 SDKCtlQueryHeader header;
182};
183
184/* Query reply descriptor.
185 *
186 * All replies to a query, sent and received via SDK controller socket begin with
187 * this header, with query reply data immediately following this header.
188 */
189typedef struct SDKCtlQueryReplyHeader {
190 /* Data packet header for this reply. */
191 SDKCtlPacketHeader packet;
192
193 /* An identifier for the query that is addressed with this reply. */
194 int query_id;
195} SDKCtlQueryReplyHeader;
196
197/********************************************************************************
198 * SDK Control Socket declarations
199 *******************************************************************************/
200
201/* Enumerates SDKCtlSocket states. */
202typedef enum SDKCtlSocketState {
203 /* Socket is disconnected from SDK controller. */
204 SDKCTL_SOCKET_DISCONNECTED,
205 /* Connection to SDK controller is in progress. */
206 SDKCTL_SOCKET_CONNECTING,
207 /* Socket is connected to an SDK controller service. */
208 SDKCTL_SOCKET_CONNECTED
209} SDKCtlSocketState;
210
211/* Enumerates SDKCtlSocket I/O dispatcher states. */
212typedef enum SDKCtlIODispatcherState {
213 /* I/O dispatcher expects a packet header. */
214 SDKCTL_IODISP_EXPECT_HEADER,
215 /* I/O dispatcher expects packet data. */
216 SDKCTL_IODISP_EXPECT_DATA,
217 /* I/O dispatcher expects query response header. */
218 SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER,
219 /* I/O dispatcher expects query response data. */
220 SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA,
221} SDKCtlIODispatcherState;
222
223/* SDKCtlSocket I/O dispatcher descriptor. */
224typedef struct SDKCtlIODispatcher {
225 /* SDKCtlSocket instance for this dispatcher. */
226 SDKCtlSocket* sdkctl;
227 /* Dispatcher state. */
228 SDKCtlIODispatcherState state;
229 /* Unites all types of headers used in SDK controller data exchange. */
230 union {
231 /* Common packet header. */
232 SDKCtlPacketHeader header;
233 /* Header for a query packet. */
234 SDKCtlQueryHeader query_header;
235 /* Header for a query response packet. */
236 SDKCtlQueryReplyHeader query_reply_header;
237 };
238 /* Descriptor of a packet packet received from SDK controller. */
239 SDKCtlPacket* packet;
240 /* A query for which a reply is currently being received. */
241 SDKCtlQuery* current_query;
242} SDKCtlIODispatcher;
243
244/* SDK controller socket descriptor. */
245struct SDKCtlSocket {
246 /* SDK controller socket state */
247 SDKCtlSocketState state;
248 /* I/O dispatcher for the socket. */
249 SDKCtlIODispatcher io_dispatcher;
250 /* Asynchronous socket connected to SDK Controller on the device. */
251 AsyncSocket* as;
252 /* Client callback that monitors this socket connection. */
253 on_sdkctl_connection_cb on_connection;
254 /* A callback to invoke when handshake message is received from the
255 * SDK controller. */
256 on_sdkctl_handshake_cb on_handshake;
257 /* A callback to invoke when a message is received from the SDK controller. */
258 on_sdkctl_message_cb on_message;
259 /* An opaque pointer associated with this socket. */
260 void* opaque;
261 /* Name of an SDK controller service this socket is connected to. */
262 char* service_name;
263 /* I/O looper for timers. */
264 Looper* looper;
265 /* Head of the active query list. */
266 SDKCtlQuery* query_head;
267 /* Tail of the active query list. */
268 SDKCtlQuery* query_tail;
269 /* Query ID generator that gets incremented for each new query. */
270 int next_query_id;
271 /* Timeout before trying to reconnect after disconnection. */
272 int reconnect_to;
273 /* Number of outstanding references to this descriptor. */
274 int ref_count;
275 /* Head of the recycled memory */
276 SDKCtlRecycled* recycler;
277 /* Recyclable block size. */
278 uint32_t recycler_block_size;
279 /* Maximum number of blocks to recycle. */
280 int recycler_max;
281 /* Number of blocs in the recycler. */
282 int recycler_count;
283};
284
285/********************************************************************************
286 * SDKCtlSocket recycling management
287 *******************************************************************************/
288
289/* Gets a recycled block for a given SDKCtlSocket, or allocates new memory
290 * block. */
291static void*
292_sdkctl_socket_alloc_recycler(SDKCtlSocket* sdkctl, uint32_t size)
293{
294 SDKCtlRecycled* block = NULL;
295
296 if (sdkctl->recycler != NULL && size <= sdkctl->recycler_block_size) {
297 /* There are blocks in the recycler, and requested size fits. */
298 block = sdkctl->recycler;
299 sdkctl->recycler = block->next;
300 block->size = sdkctl->recycler_block_size;
301 sdkctl->recycler_count--;
302 } else if (size <= sdkctl->recycler_block_size) {
303 /* There are no blocks in the recycler, but requested size fits. */
304 block = malloc(sdkctl->recycler_block_size);
305 if (block == NULL) {
306 APANIC("SDKCtl %s: Unable to allocate %d bytes block",
307 sdkctl->service_name, sdkctl->recycler_block_size);
308 }
309 block->size = sdkctl->recycler_block_size;
310 } else {
311 /* Requested size doesn't fit the recycler. */
312 block = malloc(size);
313 if (block == NULL) {
314 APANIC("SDKCtl %s: Unable to allocate %d bytes block",
315 sdkctl->service_name, size);
316 }
317 block->size = size;
318 }
319
320 return block;
321}
322
323/* Recycles, or frees a block of memory for a given SDKCtlSocket. */
324static void
325_sdkctl_socket_free_recycler(SDKCtlSocket* sdkctl, void* mem)
326{
327 SDKCtlRecycled* block = (SDKCtlRecycled*)mem;
328
329 if (sdkctl->recycler_count == sdkctl->recycler_max ||
330 block->size != sdkctl->recycler_block_size) {
331 /* Recycler is full, or block cannot be recycled. */
332 free(mem);
333 return;
334 }
335
336 block->next = sdkctl->recycler;
337 sdkctl->recycler = block;
338 sdkctl->recycler_count++;
339}
340
341/* Empties the recycler for a given SDKCtlSocket. */
342static void
343_sdkctl_socket_empty_recycler(SDKCtlSocket* sdkctl)
344{
345 SDKCtlRecycled* block = sdkctl->recycler;
346 while (block != NULL) {
347 void* to_free = block;
348 block = block->next;
349 free(to_free);
350 }
351 sdkctl->recycler = NULL;
352 sdkctl->recycler_count = 0;
353}
354
355/********************************************************************************
356 * SDKCtlSocket query list management
357 *******************************************************************************/
358
359/* Adds a query to the list of active queries.
360 * Param:
361 * sdkctl - SDKCtlSocket instance for the query.
362 * query - Query to add to the list.
363 */
364static void
365_sdkctl_socket_add_query(SDKCtlQuery* query)
366{
367 SDKCtlSocket* const sdkctl = query->sdkctl;
368 if (sdkctl->query_head == NULL) {
369 sdkctl->query_head = sdkctl->query_tail = query;
370 } else {
371 sdkctl->query_tail->next = query;
372 sdkctl->query_tail = query;
373 }
374
375 /* Keep the query referenced while it's in the list. */
376 sdkctl_query_reference(query);
377}
378
379/* Removes a query from the list of active queries.
380 * Param:
381 * query - Query to remove from the list of active queries.
382 * Return:
383 * Boolean: 1 if query has been removed, or 0 if query has not been found in the
384 * list of active queries.
385 */
386static int
387_sdkctl_socket_remove_query(SDKCtlQuery* query)
388{
389 SDKCtlSocket* const sdkctl = query->sdkctl;
390 SDKCtlQuery* prev = NULL;
391 SDKCtlQuery* head = sdkctl->query_head;
392
393 /* Quick check: the query could be currently handled by dispatcher. */
394 if (sdkctl->io_dispatcher.current_query == query) {
395 /* Release the query from dispatcher. */
396 sdkctl_query_release(query);
397 sdkctl->io_dispatcher.current_query = NULL;
398 return 1;
399 }
400
401 /* Remove query from the list. */
402 while (head != NULL && query != head) {
403 prev = head;
404 head = head->next;
405 }
406 if (head != NULL) {
407 if (prev == NULL) {
408 /* Query is at the head of the list. */
409 assert(query == sdkctl->query_head);
410 sdkctl->query_head = query->next;
411 } else {
412 /* Query is in the middle / at the end of the list. */
413 assert(query != sdkctl->query_head);
414 prev->next = query->next;
415 }
416 if (sdkctl->query_tail == query) {
417 /* Query is at the tail of the list. */
418 assert(query->next == NULL);
419 sdkctl->query_tail = prev;
420 }
421 query->next = NULL;
422
423 /* Release query that is now removed from the list. Note that query
424 * passed to this routine should hold an extra reference, owned by the
425 * caller. */
426 sdkctl_query_release(query);
427 return 1;
428 } else {
429 D("%s: Query %p is not found in the list.", sdkctl->service_name, query);
430 return 0;
431 }
432}
433
434/* Removes a query (based on query ID) from the list of active queries.
435 * Param:
436 * sdkctl - SDKCtlSocket instance that owns the query.
437 * query_id - Identifies the query to remove.
438 * Return:
439 * A query removed from the list of active queries, or NULL if query with the
440 * given ID has not been found in the list.
441 */
442static SDKCtlQuery*
443_sdkctl_socket_remove_query_id(SDKCtlSocket* sdkctl, int query_id)
444{
445 SDKCtlQuery* prev = NULL;
446 SDKCtlQuery* head = sdkctl->query_head;
447
448 /* Quick check: the query could be currently handled by dispatcher. */
449 if (sdkctl->io_dispatcher.current_query != NULL &&
450 sdkctl->io_dispatcher.current_query->header.query_id == query_id) {
451 /* Release the query from dispatcher. */
452 SDKCtlQuery* const query = sdkctl->io_dispatcher.current_query;
453 sdkctl->io_dispatcher.current_query = NULL;
454 return query;
455 }
456
457 /* Remove query from the list. */
458 while (head != NULL && head->header.query_id != query_id) {
459 prev = head;
460 head = head->next;
461 }
462 if (head != NULL) {
463 /* Query is found in the list. */
464 SDKCtlQuery* const query = head;
465 if (prev == NULL) {
466 /* Query is at the head of the list. */
467 assert(query == sdkctl->query_head);
468 sdkctl->query_head = query->next;
469 } else {
470 /* Query is in the middle, or at the end of the list. */
471 assert(query != sdkctl->query_head);
472 prev->next = query->next;
473 }
474 if (sdkctl->query_tail == query) {
475 /* Query is at the tail of the list. */
476 assert(query->next == NULL);
477 sdkctl->query_tail = prev;
478 }
479 query->next = NULL;
480 return query;
481 } else {
482 D("%s: Query ID %d is not found in the list.",
483 sdkctl->service_name, query_id);
484 return NULL;
485 }
486}
487
488/* Pulls the first query from the list of active queries.
489 * Param:
490 * sdkctl - SDKCtlSocket instance that owns the query.
491 * Return:
492 * A query removed pulled from the list of active queries, or NULL if query
493 * list is empty.
494 */
495static SDKCtlQuery*
496_sdkctl_socket_pull_first_query(SDKCtlSocket* sdkctl)
497{
498 SDKCtlQuery* const query = sdkctl->query_head;
499
500 if (query != NULL) {
501 sdkctl->query_head = query->next;
502 if (sdkctl->query_head == NULL) {
503 sdkctl->query_tail = NULL;
504 }
505 }
506 return query;
507}
508
509/* Generates new query ID for the given SDKCtl. */
510static int
511_sdkctl_socket_next_query_id(SDKCtlSocket* sdkctl)
512{
513 return ++sdkctl->next_query_id;
514}
515
516/********************************************************************************
517 * SDKCtlPacket implementation
518 *******************************************************************************/
519
520/* Alocates a packet. */
521static SDKCtlPacket*
522_sdkctl_packet_new(SDKCtlSocket* sdkctl, int size, int type)
523{
524 const uint32_t total_size = sizeof(SDKCtlPacket) + size;
525 SDKCtlPacket* const packet = _sdkctl_socket_alloc_recycler(sdkctl, total_size);
526
527 packet->sdkctl = sdkctl;
528 packet->ref_count = 1;
529 packet->header.size = size;
530 packet->header.type = type;
531
532 /* Refence SDKCTlSocket that owns this packet. */
533 sdkctl_socket_reference(sdkctl);
534
535 return packet;
536}
537
538/* Frees a packet. */
539static void
540_sdkctl_packet_free(SDKCtlPacket* packet)
541{
542 SDKCtlSocket* const sdkctl = packet->sdkctl;
543
544 /* Free allocated resources. */
545 _sdkctl_socket_free_recycler(packet->sdkctl, packet);
546
547 /* Release SDKCTlSocket that owned this packet. */
548 sdkctl_socket_release(sdkctl);
549}
550
551int
552sdkctl_packet_reference(SDKCtlPacket* packet)
553{
554 assert(packet->ref_count > 0);
555 packet->ref_count++;
556 return packet->ref_count;
557}
558
559int
560sdkctl_packet_release(SDKCtlPacket* packet)
561{
562 assert(packet->ref_count > 0);
563 packet->ref_count--;
564 if (packet->ref_count == 0) {
565 /* Last reference has been dropped. Destroy this object. */
566 _sdkctl_packet_free(packet);
567 return 0;
568 }
569 return packet->ref_count;
570}
571
572/********************************************************************************
573 * SDKCtlQuery implementation
574 *******************************************************************************/
575
576/* Frees query descriptor. */
577static void
578_sdkctl_query_free(SDKCtlQuery* query)
579{
580 if (query != NULL) {
581 SDKCtlSocket* const sdkctl = query->sdkctl;
582 T("SDKCtl %s: Query %p ID %d is freed.",
583 sdkctl->service_name, query, query->header.query_id);
584
585 /* Free allocated resources. */
586 if (query->internal_resp_buffer != NULL &&
587 (query->response_buffer == NULL ||
588 query->response_buffer == &query->internal_resp_buffer)) {
589 free(query->internal_resp_buffer);
590 }
591
592 loopTimer_done(query->timer);
593 _sdkctl_socket_free_recycler(sdkctl, query);
594
595 /* Release socket that owned this query. */
596 sdkctl_socket_release(sdkctl);
597 }
598}
599
600/* Cancels timeout for the query.
601 *
602 * For the simplicity of implementation, the dispatcher will cancel query timer
603 * when query response data begins to flow in. If we let the timer to expire at
604 * that stage, we will end up with data flowing in without real place to
605 * accomodate it.
606 */
607static void
608_sdkctl_query_cancel_timeout(SDKCtlQuery* query)
609{
610 loopTimer_stop(query->timer);
611
612 T("SDKCtl %s: Query %p ID %d deadline is cancelled.",
613 query->sdkctl->service_name, query, query->header.query_id);
614}
615
616/*
617 * Query I/O callbacks.
618 */
619
620/* Callback that is invoked by the I/O dispatcher when query is successfuly
621 * completed (i.e. response to the query is received).
622 */
623static void
624_on_sdkctl_query_completed(SDKCtlQuery* query)
625{
626 T("SDKCtl %s: Query %p ID %d is completed.",
627 query->sdkctl->service_name, query, query->header.query_id);
628
629 /* Cancel deadline, and inform the client about query completion. */
630 _sdkctl_query_cancel_timeout(query);
631 query->query_cb(query->query_opaque, query, ASIO_STATE_SUCCEEDED);
632}
633
634/* A callback that is invoked on query cancellation. */
635static void
636_on_sdkctl_query_cancelled(SDKCtlQuery* query)
637{
638 /*
639 * Query cancellation means that SDK controller is disconnected. In turn,
640 * this means that SDK controller socket will handle disconnection in its
641 * connection callback. So, at this point all we need to do here is to inform
642 * the client, and then unlist the query.
643 */
644
645 /* Cancel deadline, and inform the client about query cancellation. */
646 _sdkctl_query_cancel_timeout(query);
647 query->query_cb(query->query_opaque, query, ASIO_STATE_CANCELLED);
648}
649
650/* A timer callback that is invoked on query timeout.
651 * Param:
652 * opaque - SDKCtlQuery instance.
653 */
654static void
655_on_skdctl_query_timeout(void* opaque)
656{
657 SDKCtlQuery* const query = (SDKCtlQuery*)opaque;
658
659 D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out at %lld",
660 query->sdkctl->service_name, query, query->header.query_id,
661 query->deadline, async_socket_deadline(query->sdkctl->as, 0));
662
663 /* Reference the query while we're in this callback. */
664 sdkctl_query_reference(query);
665
666 /* Inform the client about deadline expiration. Note that client may
667 * extend the deadline, and retry the query. */
668 const AsyncIOAction action =
669 query->query_cb(query->query_opaque, query, ASIO_STATE_TIMED_OUT);
670
671 /* For actions other than retry we will destroy the query. */
672 if (action != ASIO_ACTION_RETRY) {
673 _sdkctl_socket_remove_query(query);
674 }
675
676 sdkctl_query_release(query);
677}
678
679/* A callback that is invoked when query has been sent to the SDK controller
680 * service. */
681static void
682_on_sdkctl_query_sent(SDKCtlQuery* query)
683{
684 T("SDKCtl %s: sent %d bytes of query %p ID %d of type %d",
685 query->sdkctl->service_name, query->header.packet.size, query,
686 query->header.query_id, query->header.query_type);
687
688 /* Inform the client about the event. */
689 query->query_cb(query->query_opaque, query, ASIO_STATE_CONTINUES);
690
691 /* Set a timer to expire at query's deadline, and let the response to come
692 * through the dispatcher loop. */
693 loopTimer_startAbsolute(query->timer, query->deadline);
694}
695
696/* An I/O callback invoked on query transmission.
697 * Param:
698 * io_opaque SDKCtlQuery instance of the query that's being sent with this I/O.
699 * asio - Write I/O descriptor.
700 * status - I/O status.
701 */
702static AsyncIOAction
703_on_sdkctl_query_send_io(void* io_opaque,
704 AsyncSocketIO* asio,
705 AsyncIOState status)
706{
707 SDKCtlQuery* const query = (SDKCtlQuery*)io_opaque;
708 AsyncIOAction action = ASIO_ACTION_DONE;
709
710 /* Reference the query while we're in this callback. */
711 sdkctl_query_reference(query);
712
713 if (status == ASIO_STATE_SUCCEEDED) {
714 /* Query has been sent to the service. */
715 _on_sdkctl_query_sent(query);
716
717 sdkctl_query_release(query);
718
719 return ASIO_ACTION_DONE;
720 }
721
722 /* Lets see what's going on with query transmission. */
723 switch (status) {
724 case ASIO_STATE_CANCELLED:
725 T("SDKCtl %s: Query %p ID %d is cancelled in %s I/O.",
726 query->sdkctl->service_name, query, query->header.query_id,
727 async_socket_io_is_read(asio) ? "READ" : "WRITE");
728 /* Remove the query from the list of active queries. */
729 _sdkctl_socket_remove_query(query);
730 _on_sdkctl_query_cancelled(query);
731 break;
732
733 case ASIO_STATE_TIMED_OUT:
734 D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out in %s I/O at %lld",
735 query->sdkctl->service_name, query, query->header.query_id,
736 query->deadline, async_socket_io_is_read(asio) ? "READ" : "WRITE",
737 async_socket_deadline(query->sdkctl->as, 0));
738 /* Invoke query's callback. */
739 action = query->query_cb(query->query_opaque, query, status);
740 /* For actions other than retry we need to stop the query. */
741 if (action != ASIO_ACTION_RETRY) {
742 _sdkctl_socket_remove_query(query);
743 }
744 break;
745
746 case ASIO_STATE_FAILED:
747 T("SDKCtl %s: Query %p ID %d failed in %s I/O: %d -> %s",
748 query->sdkctl->service_name, query, query->header.query_id,
749 async_socket_io_is_read(asio) ? "READ" : "WRITE",
750 errno, strerror(errno));
751 /* Invoke query's callback. Note that we will let the client to
752 * decide what to do on I/O failure. */
753 action = query->query_cb(query->query_opaque, query, status);
754 /* For actions other than retry we need to stop the query. */
755 if (action != ASIO_ACTION_RETRY) {
756 _sdkctl_socket_remove_query(query);
757 }
758 break;
759
760 case ASIO_STATE_FINISHED:
761 /* Time to disassociate with the I/O. */
762 sdkctl_query_release(query);
763 break;
764
765 default:
766 /* Transitional state. */
767 break;
768 }
769
770 sdkctl_query_release(query);
771
772 return action;
773}
774
775/********************************************************************************
776 * SDKCtlQuery public API implementation
777 ********************************************************************************/
778
779SDKCtlQuery*
780sdkctl_query_new(SDKCtlSocket* sdkctl, int query_type, uint32_t in_data_size)
781{
782 const uint32_t total_size = sizeof(SDKCtlQuery) + in_data_size;
783
784 SDKCtlQuery* const query = _sdkctl_socket_alloc_recycler(sdkctl, total_size);
785 query->next = NULL;
786 query->sdkctl = sdkctl;
787 query->response_buffer = NULL;
788 query->response_size = NULL;
789 query->internal_resp_buffer = NULL;
790 query->internal_resp_size = 0;
791 query->query_cb = NULL;
792 query->query_opaque = NULL;
793 query->deadline = DURATION_INFINITE;
794 query->ref_count = 1;
795 query->header.packet.size = sizeof(SDKCtlQueryHeader) + in_data_size;
796 query->header.packet.type = SDKCTL_PACKET_QUERY;
797 query->header.query_id = _sdkctl_socket_next_query_id(sdkctl);
798 query->header.query_type = query_type;
799
800 /* Initialize timer to fire up on query deadline expiration. */
801 loopTimer_init(query->timer, sdkctl->looper, _on_skdctl_query_timeout, query);
802
803 /* Reference socket that owns this query. */
804 sdkctl_socket_reference(sdkctl);
805
806 T("SDKCtl %s: Query %p ID %d type %d is created for %d bytes of data.",
807 query->sdkctl->service_name, query, query->header.query_id,
808 query_type, in_data_size);
809
810 return query;
811}
812
813SDKCtlQuery*
814sdkctl_query_new_ex(SDKCtlSocket* sdkctl,
815 int query_type,
816 uint32_t in_data_size,
817 const void* in_data,
818 void** response_buffer,
819 uint32_t* response_size,
820 on_sdkctl_query_cb query_cb,
821 void* query_opaque)
822{
823 SDKCtlQuery* const query = sdkctl_query_new(sdkctl, query_type, in_data_size);
824
825 query->response_buffer = response_buffer;
826 if (query->response_buffer == NULL) {
827 /* Creator didn't supply a buffer. Use internal one instead. */
828 query->response_buffer = &query->internal_resp_buffer;
829 query->internal_resp_buffer = NULL;
830 }
831 query->response_size = response_size;
832 if (query->response_size == NULL) {
833 /* Creator didn't supply a buffer for response size. Use internal one
834 * instead. */
835 query->response_size = &query->internal_resp_size;
836 query->internal_resp_size = 0;
837 }
838 query->query_cb = query_cb;
839 query->query_opaque = query_opaque;
840 /* Init query's input buffer. */
841 if (in_data_size != 0 && in_data != NULL) {
842 memcpy(query + 1, in_data, in_data_size);
843 }
844
845 return query;
846}
847
848void
849sdkctl_query_send(SDKCtlQuery* query, int to)
850{
851 SDKCtlSocket* const sdkctl = query->sdkctl;
852
853 /* Initialize the deadline. */
854 query->deadline = async_socket_deadline(query->sdkctl->as, to);
855
856 /* List the query in the list of active queries. */
857 _sdkctl_socket_add_query(query);
858
859 /* Reference query associated with write I/O. */
860 sdkctl_query_reference(query);
861
862 /* Transmit the query to SDK controller. */
863 async_socket_write_abs(sdkctl->as, &query->header, query->header.packet.size,
864 _on_sdkctl_query_send_io, query, query->deadline);
865
866 T("SDKCtl %s: Query %p ID %d type %d is sent with deadline at %lld",
867 query->sdkctl->service_name, query, query->header.query_id,
868 query->header.query_type, query->deadline);
869}
870
871SDKCtlQuery*
872sdkctl_query_build_and_send(SDKCtlSocket* sdkctl,
873 int query_type,
874 uint32_t in_data_size,
875 const void* in_data,
876 void** response_buffer,
877 uint32_t* response_size,
878 on_sdkctl_query_cb query_cb,
879 void* query_opaque,
880 int to)
881{
882 SDKCtlQuery* const query =
883 sdkctl_query_new_ex(sdkctl, query_type, in_data_size, in_data,
884 response_buffer, response_size, query_cb,
885 query_opaque);
886 sdkctl_query_send(query, to);
887 return query;
888}
889
890int
891sdkctl_query_reference(SDKCtlQuery* query)
892{
893 assert(query->ref_count > 0);
894 query->ref_count++;
895 return query->ref_count;
896}
897
898int
899sdkctl_query_release(SDKCtlQuery* query)
900{
901 assert(query->ref_count > 0);
902 query->ref_count--;
903 if (query->ref_count == 0) {
904 /* Last reference has been dropped. Destroy this object. */
905 _sdkctl_query_free(query);
906 return 0;
907 }
908 return query->ref_count;
909}
910
911/********************************************************************************
912 * SDKCtlPacket implementation
913 *******************************************************************************/
914
915/* A packet has been received from SDK controller. */
916static void
917_on_sdkctl_packet_received(SDKCtlSocket* sdkctl, SDKCtlPacket* packet)
918{
919 T("SDKCtl %s: Received packet size: %d, type: %d",
920 sdkctl->service_name, packet->header.size, packet->header.type);
921
922 /* Dispatch received packet to the client. */
923 sdkctl->on_message(sdkctl->opaque, sdkctl, packet, packet->header.type,
924 packet + 1, packet->header.size - sizeof(SDKCtlPacketHeader));
925}
926
927/********************************************************************************
928 * SDKCtlIODispatcher implementation
929 *******************************************************************************/
930
931/* An I/O callback invoked when data gets received from the socket.
932 * Param:
933 * io_opaque SDKCtlIODispatcher instance associated with the reader.
934 * asio - Read I/O descriptor.
935 * status - I/O status.
936 */
937static AsyncIOAction _on_sdkctl_io_dispatcher_io(void* io_opaque,
938 AsyncSocketIO* asio,
939 AsyncIOState status);
940
941/* Starts I/O dispatcher for SDK controller socket. */
942static void
943_sdkctl_io_dispatcher_start(SDKCtlSocket* sdkctl) {
944 SDKCtlIODispatcher* const dispatcher = &sdkctl->io_dispatcher;
945
946 dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
947 dispatcher->sdkctl = sdkctl;
948 dispatcher->packet = NULL;
949 dispatcher->current_query = NULL;
950
951 /* Register a packet header reader with the socket. */
952 async_socket_read_rel(dispatcher->sdkctl->as, &dispatcher->header,
953 sizeof(SDKCtlPacketHeader), _on_sdkctl_io_dispatcher_io,
954 dispatcher, -1);
955}
956
957/* Resets I/O dispatcher for SDK controller socket. */
958static void
959_sdkctl_io_dispatcher_reset(SDKCtlSocket* sdkctl) {
960 SDKCtlIODispatcher* const dispatcher = &sdkctl->io_dispatcher;
961
962 /* Cancel current query. */
963 if (dispatcher->current_query != NULL) {
964 SDKCtlQuery* const query = dispatcher->current_query;
965 dispatcher->current_query = NULL;
966 _on_sdkctl_query_cancelled(query);
967 sdkctl_query_release(query);
968 }
969
970 /* Free packet data buffer. */
971 if (dispatcher->packet != NULL) {
972 sdkctl_packet_release(dispatcher->packet);
973 dispatcher->packet = NULL;
974 }
975
976 /* Reset dispatcher state. */
977 dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
978}
979
980/*
981 * I/O dispatcher callbacks.
982 */
983
984/* A callback that is invoked when a failure occurred while dispatcher was
985 * reading data from the socket.
986 */
987static void
988_on_io_dispatcher_io_failure(SDKCtlIODispatcher* dispatcher,
989 AsyncSocketIO* asio)
990{
991 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
992
993 D("SDKCtl %s: Dispatcher I/O failure: %d -> %s",
994 sdkctl->service_name, errno, strerror(errno));
995
996 /* We treat all I/O failures same way we treat disconnection. Just cancel
997 * everything, disconnect, and let the client to decide what to do next. */
998 sdkctl_socket_disconnect(sdkctl);
999
1000 /* Report disconnection to the client, and let it restore connection in this
1001 * callback. */
1002 sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED);
1003}
1004
1005/* A callback that is invoked when dispatcher's reader has been cancelled. */
1006static void
1007_on_io_dispatcher_io_cancelled(SDKCtlIODispatcher* dispatcher,
1008 AsyncSocketIO* asio)
1009{
1010 T("SDKCtl %s: Dispatcher I/O cancelled.", dispatcher->sdkctl->service_name);
1011
1012 /* If we're in the middle of receiving query reply we need to cancel the
1013 * query. */
1014 if (dispatcher->current_query != NULL) {
1015 SDKCtlQuery* const query = dispatcher->current_query;
1016 dispatcher->current_query = NULL;
1017 _on_sdkctl_query_cancelled(query);
1018 sdkctl_query_release(query);
1019 }
1020
1021 /* Discard packet data we've received so far. */
1022 if (dispatcher->packet != NULL) {
1023 sdkctl_packet_release(dispatcher->packet);
1024 dispatcher->packet = NULL;
1025 }
1026}
1027
1028/* A generic packet header has been received by I/O dispatcher. */
1029static AsyncIOAction
1030_on_io_dispatcher_packet_header(SDKCtlIODispatcher* dispatcher,
1031 AsyncSocketIO* asio)
1032{
1033 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
1034
1035 T("SDKCtl %s: Packet header type %d, size %d is received.",
1036 dispatcher->sdkctl->service_name, dispatcher->header.type,
1037 dispatcher->header.size);
1038
1039 /* Here we have three choices for the packet, that define the rest of
1040 * the data that follow it:
1041 * - Regular packet,
1042 * - Response to a query that has been sent to SDK controller,
1043 * - A query from SDK controller.
1044 * Update the state accordingly, and initiate reading of the
1045 * remaining of the packet.
1046 */
1047 if (dispatcher->header.type == SDKCTL_PACKET_QUERY_RESPONSE) {
1048 /* This is a response to the query. Before receiving response data we
1049 * need to locate the relevant query, and use its response buffer to read
1050 * the data. For that we need to obtain query ID firts. So, initiate
1051 * reading of the remaining part of SDKCtlQueryReplyHeader. */
1052 dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER;
1053 async_socket_read_rel(sdkctl->as, &dispatcher->query_reply_header.query_id,
1054 sizeof(SDKCtlQueryReplyHeader) - sizeof(SDKCtlPacketHeader),
1055 _on_sdkctl_io_dispatcher_io, dispatcher, -1);
1056 } else {
1057 /* For regular packets, as well as queries, we simply allocate buffer,
1058 * that fits the entire packet, and read the remainder of the data in
1059 * there. */
1060 dispatcher->state = SDKCTL_IODISP_EXPECT_DATA;
1061 dispatcher->packet =
1062 _sdkctl_packet_new(sdkctl, dispatcher->header.size,
1063 dispatcher->header.type);
1064 /* Initiate reading of the packet data. */
1065 async_socket_read_rel(sdkctl->as, dispatcher->packet + 1,
1066 dispatcher->header.size - sizeof(SDKCtlPacketHeader),
1067 _on_sdkctl_io_dispatcher_io, dispatcher, -1);
1068 }
1069
1070 return ASIO_ACTION_DONE;
1071}
1072
1073/* A generic packet has been received by I/O dispatcher. */
1074static AsyncIOAction
1075_on_io_dispatcher_packet(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asio)
1076{
1077 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
1078
1079 T("SDKCtl %s: Packet type %d, size %d is received.",
1080 dispatcher->sdkctl->service_name, dispatcher->header.type,
1081 dispatcher->header.size);
1082
1083 _on_sdkctl_packet_received(sdkctl, dispatcher->packet);
1084 sdkctl_packet_release(dispatcher->packet);
1085 dispatcher->packet = NULL;
1086
1087 /* Get ready for the next I/O cycle. */
1088 dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
1089 async_socket_read_rel(sdkctl->as, &dispatcher->header, sizeof(SDKCtlPacketHeader),
1090 _on_sdkctl_io_dispatcher_io, dispatcher, -1);
1091 return ASIO_ACTION_DONE;
1092}
1093
1094/* A query reply header has been received by I/O dispatcher. */
1095static AsyncIOAction
1096_on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher,
1097 AsyncSocketIO* asio)
1098{
1099 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
1100 SDKCtlQuery* query;
1101
1102 T("SDKCtl %s: Query reply header is received for query ID %d",
1103 dispatcher->sdkctl->service_name, dispatcher->query_reply_header.query_id);
1104
1105 /* Pull the query out of the list of active queries. It's the dispatcher that
1106 * owns this query now. */
1107 dispatcher->current_query =
1108 _sdkctl_socket_remove_query_id(sdkctl, dispatcher->query_reply_header.query_id);
1109 query = dispatcher->current_query;
1110
1111 if (query == NULL) {
1112 D("%s: Query #%d is not found by dispatcher",
1113 dispatcher->sdkctl->service_name, dispatcher->query_reply_header.query_id);
1114
1115 /* Query is not found. Just read the remainder of reply up in the air,
1116 * and then discard when it's over. */
1117 dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA;
1118 dispatcher->packet =
1119 _sdkctl_packet_new(sdkctl, dispatcher->header.size,
1120 dispatcher->header.type);
1121 /* Copy query reply info to the packet. */
1122 memcpy(&dispatcher->packet->header, &dispatcher->query_reply_header,
1123 sizeof(SDKCtlQueryReplyHeader));
1124 async_socket_read_rel(sdkctl->as, &dispatcher->query_header + 1,
1125 dispatcher->header.size - sizeof(SDKCtlQueryReplyHeader),
1126 _on_sdkctl_io_dispatcher_io, dispatcher, -1);
1127 } else {
1128 /* Prepare to receive query reply. For the simplicity sake, cancel query
1129 * time out, so it doesn't expire on us while we're in the middle of
1130 * receiving query's reply. */
1131 _sdkctl_query_cancel_timeout(query);
1132
1133 /* Adjust the reply buffer set for the query (if needed). */
1134 const uint32_t query_data_size =
1135 dispatcher->header.size - sizeof(SDKCtlQueryReplyHeader);
1136 if (*query->response_size < query_data_size) {
1137 *query->response_buffer = malloc(query_data_size);
1138 if (*query->response_buffer == NULL) {
1139 APANIC("%s: Unable to allocate %d bytes for query response",
1140 sdkctl->service_name, query_data_size);
1141 }
1142 }
1143 /* Save the actual query response size. */
1144 *query->response_size = query_data_size;
1145
1146 /* Start reading query response. */
1147 dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA;
1148 async_socket_read_rel(sdkctl->as, *query->response_buffer,
1149 *query->response_size, _on_sdkctl_io_dispatcher_io,
1150 dispatcher, -1);
1151 }
1152
1153 return ASIO_ACTION_DONE;
1154}
1155
1156/* A query reply header has been received by I/O dispatcher. */
1157static AsyncIOAction
1158_on_io_dispatcher_query_reply(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asio)
1159{
1160 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
1161 SDKCtlQuery* const query = dispatcher->current_query;
1162 dispatcher->current_query = NULL;
1163
1164 if (query != NULL) {
1165 _ANDROID_ASSERT(query->header.query_id == dispatcher->query_reply_header.query_id,
1166 "SDKCtl %s: Query ID mismatch in I/O dispatcher",
1167 sdkctl->service_name);
1168 T("SDKCtl %s: Query reply is received for query %p ID %d. Reply size is %d",
1169 dispatcher->sdkctl->service_name, query, query->header.query_id,
1170 *query->response_size);
1171
1172 /* Complete the query, and release it from the dispatcher. */
1173 _on_sdkctl_query_completed(query);
1174 sdkctl_query_release(query);
1175 } else {
1176 /* This was "read up in the air" for a cancelled query. Just discard the
1177 * read data. */
1178 if (dispatcher->packet != NULL) {
1179 sdkctl_packet_release(dispatcher->packet);
1180 dispatcher->packet = NULL;
1181 }
1182 }
1183
1184 /* Get ready for the next I/O cycle. */
1185 dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
1186 async_socket_read_rel(sdkctl->as, &dispatcher->header, sizeof(SDKCtlPacketHeader),
1187 _on_sdkctl_io_dispatcher_io, dispatcher, -1);
1188 return ASIO_ACTION_DONE;
1189}
1190
1191/* An I/O callback invoked when data gets received from the socket.
1192 * This is main I/O dispatcher loop.
1193 * Param:
1194 * io_opaque SDKCtlIODispatcher instance associated with the reader.
1195 * asio - Read I/O descriptor.
1196 * status - I/O status.
1197 */
1198static AsyncIOAction
1199_on_sdkctl_io_dispatcher_io(void* io_opaque,
1200 AsyncSocketIO* asio,
1201 AsyncIOState status)
1202{
1203 AsyncIOAction action = ASIO_ACTION_DONE;
1204 SDKCtlIODispatcher* const dispatcher = (SDKCtlIODispatcher*)io_opaque;
1205 SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
1206
1207 /* Reference SDKCtlSocket while we're in this callback. */
1208 sdkctl_socket_reference(sdkctl);
1209
1210 if (status != ASIO_STATE_SUCCEEDED) {
1211 /* Something going on with I/O other than receiving data.. */
1212 switch (status) {
1213 case ASIO_STATE_STARTED:
1214 /* Data has started flowing in. Cancel timeout on I/O that has
1215 * started, so we can complete the current state of the
1216 * dispatcher without interruptions other than I/O failures. */
1217 async_socket_io_cancel_time_out(asio);
1218 break;
1219
1220 case ASIO_STATE_FAILED:
1221 /* I/O failure has occurred. Handle the failure. */
1222 _on_io_dispatcher_io_failure(dispatcher, asio);
1223 break;
1224
1225 case ASIO_STATE_TIMED_OUT:
1226 /* The way I/O dispatcher is implemented, this should never
1227 * happen, because dispatcher doesn't set I/O expiration time
1228 * when registering its readers. */
1229 _ANDROID_ASSERT(0,
1230 "SDKCtl %s: We should never receive ASIO_STATE_TIMED_OUT in SDKCtl I/O dispatcher.",
1231 sdkctl->service_name);
1232 break;
1233
1234 case ASIO_STATE_CANCELLED:
1235 /* Cancellation means that we're in the middle of handling
1236 * disconnection. Sooner or later, this dispatcher will be reset,
1237 * so we don't really care about keeping its state at this point.
1238 */
1239 _on_io_dispatcher_io_cancelled(dispatcher, asio);
1240 break;
1241
1242 case ASIO_STATE_FINISHED:
1243 break;
1244
1245 default:
1246 _ANDROID_ASSERT(0, "SDKCtl %s: Unexpected I/O status %d in the dispatcher",
1247 sdkctl->service_name, status);
1248 /* Handle this as protocol failure. */
1249 errno = EINVAL;
1250 _on_io_dispatcher_io_failure(dispatcher, asio);
1251 action = ASIO_ACTION_ABORT;
1252 break;
1253 }
1254
1255 sdkctl_socket_release(sdkctl);
1256
1257 return action;
1258 }
1259
1260 /* Requested data has been read. Handle the chunk depending on dispatcher's
1261 * state. */
1262 switch (dispatcher->state) {
1263 case SDKCTL_IODISP_EXPECT_HEADER:
1264 /* A generic packet header is received. */
1265 action = _on_io_dispatcher_packet_header(dispatcher, asio);
1266 break;
1267
1268 case SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER:
1269 /* Query reply header is received. */
1270 action = _on_io_dispatcher_query_reply_header(dispatcher, asio);
1271 break;
1272
1273 case SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA:
1274 /* Query reply is received. Complete the query. */
1275 action = _on_io_dispatcher_query_reply(dispatcher, asio);
1276 break;
1277
1278 case SDKCTL_IODISP_EXPECT_DATA:
1279 /* A generic packet is received. */
1280 action = _on_io_dispatcher_packet(dispatcher, asio);
1281 break;
1282
1283 default:
1284 _ANDROID_ASSERT(0, "SDKCtl %s: Unexpected I/O dispacher state %d",
1285 sdkctl->service_name, dispatcher->state);
1286 break;
1287 }
1288
1289 sdkctl_socket_release(sdkctl);
1290
1291 return action;
1292}
1293
1294/********************************************************************************
1295 * SDKCtlSocket internals.
1296 *******************************************************************************/
1297
1298/* Cancels all queries that is active on this socket. */
1299static void
1300_sdkctl_socket_cancel_all_queries(SDKCtlSocket* sdkctl)
1301{
1302 SDKCtlIODispatcher* const dispatcher = &sdkctl->io_dispatcher;
1303 SDKCtlQuery* query;
1304
1305 /* Cancel query that is being completed in dispatcher. */
1306 if (dispatcher->current_query != NULL) {
1307 SDKCtlQuery* const query = dispatcher->current_query;
1308 dispatcher->current_query = NULL;
1309 _on_sdkctl_query_cancelled(query);
1310 sdkctl_query_release(query);
1311 }
1312
1313 /* One by one empty query list cancelling pulled queries. */
1314 query = _sdkctl_socket_pull_first_query(sdkctl);
1315 while (query != NULL) {
1316 _sdkctl_query_cancel_timeout(query);
1317 query->query_cb(query->query_opaque, query, ASIO_STATE_CANCELLED);
1318 sdkctl_query_release(query);
1319 query = _sdkctl_socket_pull_first_query(sdkctl);
1320 }
1321}
1322
1323/* Cancels all packets that is active on this socket. */
1324static void
1325_sdkctl_socket_cancel_all_packets(SDKCtlSocket* sdkctl)
1326{
1327}
1328
1329/* Cancels all I/O that is active on this socket. */
1330static void
1331_sdkctl_socket_cancel_all_io(SDKCtlSocket* sdkctl)
1332{
1333 /* Cancel all queries, and packets that are active for this I/O. */
1334 _sdkctl_socket_cancel_all_queries(sdkctl);
1335 _sdkctl_socket_cancel_all_packets(sdkctl);
1336}
1337
1338/* Disconnects AsyncSocket for SDKCtlSocket. */
1339static void
1340_sdkctl_socket_disconnect_socket(SDKCtlSocket* sdkctl)
1341{
1342 if (sdkctl->as != NULL) {
1343 /* Disconnect the socket. This will trigger I/O cancellation callbacks. */
1344 async_socket_disconnect(sdkctl->as);
1345
1346 /* Cancel all I/O that is active on this socket. */
1347 _sdkctl_socket_cancel_all_io(sdkctl);
1348
1349 /* Reset I/O dispatcher. */
1350 _sdkctl_io_dispatcher_reset(sdkctl);
1351 }
1352
1353 sdkctl->state = SDKCTL_SOCKET_DISCONNECTED;
1354}
1355
1356/* Frees SDKCtlSocket instance. */
1357static void
1358_sdkctl_socket_free(SDKCtlSocket* sdkctl)
1359{
1360 if (sdkctl != NULL) {
1361 /* Disconnect, and release the socket. */
1362 if (sdkctl->as != NULL) {
1363 async_socket_disconnect(sdkctl->as);
1364 async_socket_release(sdkctl->as);
1365 }
1366
1367 /* Free allocated resources. */
1368 if (sdkctl->looper != NULL) {
1369 looper_free(sdkctl->looper);
1370 }
1371 if (sdkctl->service_name != NULL) {
1372 free(sdkctl->service_name);
1373 }
1374 _sdkctl_socket_empty_recycler(sdkctl);
1375
1376 AFREE(sdkctl);
1377 }
1378}
1379
1380/********************************************************************************
1381 * SDK Control Socket connection callbacks.
1382 *******************************************************************************/
1383
1384/* Initiates handshake query when SDK controller socket is connected. */
1385static void _sdkctl_do_handshake(SDKCtlSocket* sdkctl);
1386
1387/* A socket connection is established.
1388 * Here we will start I/O dispatcher, and will initiate a handshake with
1389 * the SdkController service for this socket. */
1390static AsyncIOAction
1391_on_async_socket_connected(SDKCtlSocket* sdkctl)
1392{
1393 D("SDKCtl %s: Socket is connected.", sdkctl->service_name);
1394
1395 /* Notify the client that connection is established. */
1396 const AsyncIOAction action =
1397 sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_SUCCEEDED);
1398
1399 if (action == ASIO_ACTION_DONE) {
1400 /* Initialize, and start main I/O dispatcher. */
1401 _sdkctl_io_dispatcher_start(sdkctl);
1402
1403 /* Initiate handshake. */
1404 _sdkctl_do_handshake(sdkctl);
1405
1406 return action;
1407 } else {
1408 /* Client didn't like something about this connection. */
1409 return action;
1410 }
1411}
1412
1413/* Handles lost connection with SdkController service. */
1414static AsyncIOAction
1415_on_async_socket_disconnected(SDKCtlSocket* sdkctl)
1416{
1417 D("SDKCtl %s: Socket has been disconnected.", sdkctl->service_name);
1418
1419 _sdkctl_socket_disconnect_socket(sdkctl);
1420
1421 AsyncIOAction action = sdkctl->on_connection(sdkctl->opaque, sdkctl,
1422 ASIO_STATE_FAILED);
1423 if (action == ASIO_ACTION_DONE) {
1424 /* Default action for disconnect is to reestablish the connection. */
1425 action = ASIO_ACTION_RETRY;
1426 }
1427 if (action == ASIO_ACTION_RETRY) {
1428 sdkctl->state = SDKCTL_SOCKET_CONNECTING;
1429 }
1430 return action;
1431}
1432
1433/* An entry point for all socket connection events.
1434 * Here we will dispatch connection events to appropriate handlers.
1435 * Param:
1436 * client_opaque - SDKCtlSocket isntance.
1437 */
1438static AsyncIOAction
1439_on_async_socket_connection(void* client_opaque,
1440 AsyncSocket* as,
1441 AsyncIOState status)
1442{
1443 AsyncIOAction action = ASIO_ACTION_DONE;
1444 SDKCtlSocket* const sdkctl = (SDKCtlSocket*)client_opaque;
1445
1446 /* Reference the socket while in this callback. */
1447 sdkctl_socket_reference(sdkctl);
1448
1449 switch (status) {
1450 case ASIO_STATE_SUCCEEDED:
1451 sdkctl->state = SDKCTL_SOCKET_CONNECTED;
1452 _on_async_socket_connected(sdkctl);
1453 break;
1454
1455 case ASIO_STATE_FAILED:
1456 if (sdkctl->state == SDKCTL_SOCKET_CONNECTED) {
1457 /* This is disconnection condition. */
1458 action = _on_async_socket_disconnected(sdkctl);
1459 } else {
1460 /* An error has occurred while attempting to connect to socket.
1461 * Lets try again... */
1462 action = ASIO_ACTION_RETRY;
1463 }
1464 break;
1465
1466 case ASIO_STATE_RETRYING:
1467 default:
1468 action = ASIO_ACTION_RETRY;
1469 break;
1470 }
1471
1472 sdkctl_socket_release(sdkctl);
1473
1474 return action;
1475}
1476
1477/********************************************************************************
1478 * SDK Control Socket public API
1479 *******************************************************************************/
1480
1481SDKCtlSocket*
1482sdkctl_socket_new(int reconnect_to,
1483 const char* service_name,
1484 on_sdkctl_connection_cb on_connection,
1485 on_sdkctl_handshake_cb on_handshake,
1486 on_sdkctl_message_cb on_message,
1487 void* opaque)
1488{
1489 SDKCtlSocket* sdkctl;
1490 ANEW0(sdkctl);
1491
1492 sdkctl->state = SDKCTL_SOCKET_DISCONNECTED;
1493 sdkctl->opaque = opaque;
1494 sdkctl->service_name = ASTRDUP(service_name);
1495 sdkctl->on_connection = on_connection;
1496 sdkctl->on_handshake = on_handshake;
1497 sdkctl->on_message = on_message;
1498 sdkctl->reconnect_to = reconnect_to;
1499 sdkctl->as = NULL;
1500 sdkctl->next_query_id = 0;
1501 sdkctl->query_head = sdkctl->query_tail = NULL;
1502 sdkctl->ref_count = 1;
1503 sdkctl->recycler = NULL;
1504 sdkctl->recycler_block_size = 0;
1505 sdkctl->recycler_max = 0;
1506 sdkctl->recycler_count = 0;
1507
1508 sdkctl->looper = looper_newCore();
1509 if (sdkctl->looper == NULL) {
1510 E("Unable to create I/O looper for SDKCtl socket '%s'",
1511 service_name);
1512 on_connection(opaque, sdkctl, ASIO_STATE_FAILED);
1513 _sdkctl_socket_free(sdkctl);
1514 return NULL;
1515 }
1516
1517 return sdkctl;
1518}
1519
1520int sdkctl_socket_reference(SDKCtlSocket* sdkctl)
1521{
1522 assert(sdkctl->ref_count > 0);
1523 sdkctl->ref_count++;
1524 return sdkctl->ref_count;
1525}
1526
1527int
1528sdkctl_socket_release(SDKCtlSocket* sdkctl)
1529{
1530 assert(sdkctl->ref_count > 0);
1531 sdkctl->ref_count--;
1532 if (sdkctl->ref_count == 0) {
1533 /* Last reference has been dropped. Destroy this object. */
1534 _sdkctl_socket_free(sdkctl);
1535 return 0;
1536 }
1537 return sdkctl->ref_count;
1538}
1539
1540void
1541sdkctl_init_recycler(SDKCtlSocket* sdkctl,
1542 uint32_t data_size,
1543 int max_recycled_num)
1544{
1545 if (sdkctl->recycler != NULL) {
1546 D("SDKCtl %s: Recycler is already initialized. Ignoring recycler init.",
1547 sdkctl->service_name);
1548 return;
1549 }
1550
1551 /* SDKCtlQuery is max descriptor sizeof. */
1552 data_size += sizeof(SDKCtlQuery);
1553
1554 sdkctl->recycler_block_size = data_size;
1555 sdkctl->recycler_max = max_recycled_num;
1556 sdkctl->recycler_count = 0;
1557}
1558
1559void
1560sdkctl_socket_connect(SDKCtlSocket* sdkctl, int port, int retry_to)
1561{
1562 T("SDKCtl %s: Handling connect request to port %d, retrying in %dms...",
1563 sdkctl->service_name, port, retry_to);
1564
1565 sdkctl->state = SDKCTL_SOCKET_CONNECTING;
1566 sdkctl->as = async_socket_new(port, sdkctl->reconnect_to,
1567 _on_async_socket_connection, sdkctl,
1568 sdkctl->looper);
1569 if (sdkctl->as == NULL) {
1570 E("Unable to allocate AsyncSocket for SDKCtl socket '%s'",
1571 sdkctl->service_name);
1572 sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED);
1573 } else {
1574 async_socket_connect(sdkctl->as, retry_to);
1575 }
1576}
1577
1578void
1579sdkctl_socket_reconnect(SDKCtlSocket* sdkctl, int port, int retry_to)
1580{
1581 T("SDKCtl %s: Handling reconnection request to port %d, retrying in %dms...",
1582 sdkctl->service_name, port, retry_to);
1583
1584 _sdkctl_socket_disconnect_socket(sdkctl);
1585
1586 if (sdkctl->as == NULL) {
1587 sdkctl_socket_connect(sdkctl, port, retry_to);
1588 } else {
1589 sdkctl->state = SDKCTL_SOCKET_CONNECTING;
1590 async_socket_reconnect(sdkctl->as, retry_to);
1591 }
1592}
1593
1594void
1595sdkctl_socket_disconnect(SDKCtlSocket* sdkctl)
1596{
1597 T("SDKCtl %s: Handling disconnect request.", sdkctl->service_name);
1598
1599 _sdkctl_socket_disconnect_socket(sdkctl);
1600}
1601
1602
1603/********************************************************************************
1604 * Handshake query
1605 *******************************************************************************/
1606
1607/* A callback that is ivoked on handshake I/O events. */
1608static AsyncIOAction
1609_on_handshake_io(void* query_opaque,
1610 SDKCtlQuery* query,
1611 AsyncIOState status)
1612{
1613 SDKCtlSocket* const sdkctl = (SDKCtlSocket*)query_opaque;
1614
1615 if (status == ASIO_STATE_SUCCEEDED) {
1616 D("SDKCtl %s: %d bytes of handshake reply is received.",
1617 sdkctl->service_name, *query->response_size);
1618
1619 /* Handshake is received. Inform the client. */
1620 sdkctl->on_handshake(sdkctl->opaque, sdkctl, *query->response_buffer,
1621 *query->response_size, status);
1622 } else {
1623 /* Something is going on with the handshake... */
1624 switch (status) {
1625 case ASIO_STATE_FAILED:
1626 case ASIO_STATE_TIMED_OUT:
1627 case ASIO_STATE_CANCELLED:
1628 D("SDKCtl %s: Handshake failed: I/O state %d. Error: %d -> %s",
1629 sdkctl->service_name, status, errno, strerror(errno));
1630 sdkctl->on_handshake(sdkctl->opaque, sdkctl,
1631 *query->response_buffer,
1632 *query->response_size, status);
1633 break;
1634
1635 default:
1636 break;
1637 }
1638 }
1639 return ASIO_ACTION_DONE;
1640}
1641
1642static AsyncIOAction
1643_on_sdkctl_endianness_io(void* io_opaque,
1644 AsyncSocketIO* asio,
1645 AsyncIOState status) {
1646 SDKCtlSocket* const sdkctl = (SDKCtlSocket*)io_opaque;
1647
1648 if (status == ASIO_STATE_SUCCEEDED) {
1649 /* Now it's time to initiate handshake message. */
1650 D("SDKCtl %s: Sending handshake query...", sdkctl->service_name);
1651 SDKCtlQuery* query =
1652 sdkctl_query_build_and_send(sdkctl, SDKCTL_QUERY_HANDSHAKE,
1653 strlen(sdkctl->service_name),
1654 sdkctl->service_name, NULL, NULL,
1655 _on_handshake_io, sdkctl, 3000);
1656 sdkctl_query_release(query);
1657 return ASIO_ACTION_DONE;
1658 } else {
1659 /* Something is going on with the endianness... */
1660 switch (status) {
1661 case ASIO_STATE_FAILED:
1662 case ASIO_STATE_TIMED_OUT:
1663 case ASIO_STATE_CANCELLED:
1664 D("SDKCtl %s: endianness failed: I/O state %d. Error: %d -> %s",
1665 sdkctl->service_name, status, errno, strerror(errno));
1666 sdkctl->on_handshake(sdkctl->opaque, sdkctl, NULL, 0, status);
1667 break;
1668
1669 default:
1670 break;
1671 }
1672 }
1673 return ASIO_ACTION_DONE;
1674}
1675
1676static void
1677_sdkctl_do_handshake(SDKCtlSocket* sdkctl)
1678{
1679#ifndef HOST_WORDS_BIGENDIAN
1680static const char _host_end = 0;
1681#else
1682static const char _host_end = 1;
1683#endif
1684
1685 D("SDKCtl %s: Sending endianness: %d...", sdkctl->service_name, _host_end);
1686
1687 /* Before we can send any structured data to the SDK controller we need to
1688 * report endianness of the host. */
1689 async_socket_write_rel(sdkctl->as, &_host_end, 1,
1690 _on_sdkctl_endianness_io, sdkctl, 3000);
1691}