/*
 * Copyright (C) 2007 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#define LOG_TAG "mq"
18
19#include <assert.h>
20#include <errno.h>
21#include <fcntl.h>
22#include <pthread.h>
23#include <stdlib.h>
24#include <string.h>
25#include <unistd.h>
26
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <sys/un.h>
30#include <sys/uio.h>
31
32#include <cutils/array.h>
33#include <cutils/hashmap.h>
34#include <cutils/selector.h>
35
36#include "loghack.h"
37#include "buffer.h"
38
/** How many recently deceased peers are kept on record. */
#define PEER_HISTORY (16)

/** Shorthand for the generic socket address structure. */
typedef struct sockaddr SocketAddress;
/** Shorthand for the Unix-domain socket address structure. */
typedef struct sockaddr_un UnixAddress;
44
/**
 * Identity of a process: its process ID together with its user and group
 * IDs. We keep our own structure rather than using ucred directly because
 * ucred is only available on Linux.
 */
typedef struct {
    pid_t pid;
    uid_t uid;
    gid_t gid;
} Credentials;
54
55/** Listens for bytes coming from remote peers. */
56typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
57
58/** Listens for the deaths of remote peers. */
59typedef void DeathListener(pid_t pid);
60
/** Kinds of packets exchanged between peers. */
typedef enum {
    /** Asks for a connection to another peer. */
    CONNECTION_REQUEST = 0,

    /** Carries a connection to another peer. */
    CONNECTION = 1,

    /** Reports that a connection attempt failed. */
    CONNECTION_ERROR = 2,

    /** A generic packet of bytes. */
    BYTES = 3,
} PacketType;
75
/** What a peer proxy is currently expecting on its input side. */
typedef enum {
    /** A packet header is being read. */
    READING_HEADER = 0,

    /** A connection from the master is awaited. */
    ACCEPTING_CONNECTION = 1,

    /** A packet's bytes are being read. */
    READING_BYTES = 2,
} InputState;
86
87/** A packet header. */
88// TODO: Use custom headers for master->peer, peer->master, peer->peer.
89typedef struct {
90 PacketType type;
91 union {
92 /** Packet size. Used for BYTES. */
93 size_t size;
94
95 /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */
96 Credentials credentials;
97 };
98} Header;
99
100/** A packet which will be sent to a peer. */
101typedef struct OutgoingPacket OutgoingPacket;
102struct OutgoingPacket {
103 /** Packet header. */
104 Header header;
105
106 union {
107 /** Connection to peer. Used with CONNECTION. */
108 int socket;
109
110 /** Buffer of bytes. Used with BYTES. */
111 Buffer* bytes;
112 };
113
114 /** Frees all resources associated with this packet. */
115 void (*free)(OutgoingPacket* packet);
116
117 /** Optional context. */
118 void* context;
119
120 /** Next packet in the queue. */
121 OutgoingPacket* nextPacket;
122};
123
124/** Represents a remote peer. */
125typedef struct PeerProxy PeerProxy;
126
127/** Local peer state. You typically have one peer per process. */
128typedef struct {
129 /** This peer's PID. */
130 pid_t pid;
131
132 /**
133 * Map from pid to peer proxy. The peer has a peer proxy for each remote
134 * peer it's connected to.
135 *
136 * Acquire mutex before use.
137 */
138 Hashmap* peerProxies;
139
140 /** Manages I/O. */
141 Selector* selector;
142
143 /** Used to synchronize operations with the selector thread. */
144 pthread_mutex_t mutex;
145
146 /** Is this peer the master? */
147 bool master;
148
149 /** Peer proxy for the master. */
150 PeerProxy* masterProxy;
151
152 /** Listens for packets from remote peers. */
153 BytesListener* onBytes;
154
155 /** Listens for deaths of remote peers. */
156 DeathListener* onDeath;
157
158 /** Keeps track of recently dead peers. Requires mutex. */
159 pid_t deadPeers[PEER_HISTORY];
160 size_t deadPeerCursor;
161} Peer;
162
163struct PeerProxy {
164 /** Credentials of the remote process. */
165 Credentials credentials;
166
167 /** Keeps track of data coming in from the remote peer. */
168 InputState inputState;
169 Buffer* inputBuffer;
170 PeerProxy* connecting;
171
172 /** File descriptor for this peer. */
173 SelectableFd* fd;
174
175 /**
176 * Queue of packets to be written out to the remote peer.
177 *
178 * Requires mutex.
179 */
180 // TODO: Limit queue length.
181 OutgoingPacket* currentPacket;
182 OutgoingPacket* lastPacket;
183
184 /** Used to write outgoing header. */
185 Buffer outgoingHeader;
186
187 /** True if this is the master's proxy. */
188 bool master;
189
190 /** Reference back to the local peer. */
191 Peer* peer;
192
193 /**
194 * Used in master only. Maps this peer proxy to other peer proxies to
195 * which the peer has been connected to. Maps pid to PeerProxy. Helps
196 * keep track of which connections we've sent to whom.
197 */
198 Hashmap* connections;
199};
200
201/** Server socket path. */
202static const char* MASTER_PATH = "/master.peer";
203
204/** Credentials of the master peer. */
205static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
206
207/** Creates a peer proxy and adds it to the peer proxy map. */
208static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
209
/**
 * Puts a descriptor into non-blocking mode by adding O_NONBLOCK to its file
 * status flags. A failure of either fcntl() call is reported through
 * LOG_ALWAYS_FATAL.
 */
static void setNonBlocking(int fd) {
    int currentFlags = fcntl(fd, F_GETFL, 0);
    if (currentFlags < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }

    int updateResult = fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
    if (updateResult < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }
}
220
/** Closes a descriptor; a failed close() is logged as a warning only. */
static void closeWithWarning(int fd) {
    if (close(fd) == -1) {
        LOGW("close() error: %s", strerror(errno));
    }
}
228
/** Hash function for keys that point at a pid_t: the pid itself, as an int. */
static int pidHash(void* key) {
    return (int) *((pid_t*) key);
}
234
/** Equality function for keys that point at a pid_t. */
static bool pidEquals(void* keyA, void* keyB) {
    const pid_t left = *((pid_t*) keyA);
    const pid_t right = *((pid_t*) keyB);
    return left == right;
}
241
242/** Gets the master address. Not thread safe. */
243static UnixAddress* getMasterAddress() {
244 static UnixAddress masterAddress;
245 static bool initialized = false;
246 if (initialized == false) {
247 masterAddress.sun_family = AF_LOCAL;
248 strcpy(masterAddress.sun_path, MASTER_PATH);
249 initialized = true;
250 }
251 return &masterAddress;
252}
253
254/** Gets exclusive access to the peer for this thread. */
255static void peerLock(Peer* peer) {
256 pthread_mutex_lock(&peer->mutex);
257}
258
259/** Releases exclusive access to the peer. */
260static void peerUnlock(Peer* peer) {
261 pthread_mutex_unlock(&peer->mutex);
262}
263
264/** Frees a simple, i.e. header-only, outgoing packet. */
265static void outgoingPacketFree(OutgoingPacket* packet) {
266 LOGD("Freeing outgoing packet.");
267 free(packet);
268}
269
270/**
271 * Prepare to read a new packet from the peer.
272 */
273static void peerProxyExpectHeader(PeerProxy* peerProxy) {
274 peerProxy->inputState = READING_HEADER;
275 bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
276}
277
278/** Sets up the buffer for the outgoing header. */
279static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
280 peerProxy->outgoingHeader.data
281 = (char*) &(peerProxy->currentPacket->header);
282 peerProxy->outgoingHeader.size = sizeof(Header);
283 bufferPrepareForWrite(&peerProxy->outgoingHeader);
284}
285
286/** Adds a packet to the end of the queue. Callers must have the mutex. */
287static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
288 OutgoingPacket* newPacket) {
289 newPacket->nextPacket = NULL; // Just in case.
290 if (peerProxy->currentPacket == NULL) {
291 // The queue is empty.
292 peerProxy->currentPacket = newPacket;
293 peerProxy->lastPacket = newPacket;
294
295 peerProxyPrepareOutgoingHeader(peerProxy);
296 } else {
297 peerProxy->lastPacket->nextPacket = newPacket;
298 }
299}
300
301/** Takes the peer lock and enqueues the given packet. */
302static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
303 OutgoingPacket* newPacket) {
304 Peer* peer = peerProxy->peer;
305 peerLock(peer);
306 peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
307 peerUnlock(peer);
308}
309
310/**
311 * Frees current packet and moves to the next one. Returns true if there is
312 * a next packet or false if the queue is empty.
313 */
314static bool peerProxyNextPacket(PeerProxy* peerProxy) {
315 Peer* peer = peerProxy->peer;
316 peerLock(peer);
317
318 OutgoingPacket* current = peerProxy->currentPacket;
319
320 if (current == NULL) {
321 // The queue is already empty.
322 peerUnlock(peer);
323 return false;
324 }
325
326 OutgoingPacket* next = current->nextPacket;
327 peerProxy->currentPacket = next;
328 current->nextPacket = NULL;
329 current->free(current);
330 if (next == NULL) {
331 // The queue is empty.
332 peerProxy->lastPacket = NULL;
333 peerUnlock(peer);
334 return false;
335 } else {
336 peerUnlock(peer);
337 peerProxyPrepareOutgoingHeader(peerProxy);
338
339 // TODO: Start writing next packet? It would reduce the number of
340 // system calls, but we could also starve other peers.
341 return true;
342 }
343}
344
345/**
346 * Checks whether a peer died recently.
347 */
348static bool peerIsDead(Peer* peer, pid_t pid) {
349 size_t i;
350 for (i = 0; i < PEER_HISTORY; i++) {
351 pid_t deadPeer = peer->deadPeers[i];
352 if (deadPeer == 0) {
353 return false;
354 }
355 if (deadPeer == pid) {
356 return true;
357 }
358 }
359 return false;
360}
361
362/**
363 * Cleans up connection information.
364 */
365static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
366 PeerProxy* deadPeer = (PeerProxy*) context;
367 PeerProxy* otherPeer = (PeerProxy*) value;
368 hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
369 return true;
370}
371
372/**
373 * Called when the peer dies.
374 */
375static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
376 if (errnoIsSet) {
377 LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
378 strerror(errno));
379 } else {
380 LOGI("Peer %d died.", peerProxy->credentials.pid);
381 }
382
383 // If we lost the master, we're up a creek. We can't let this happen.
384 if (peerProxy->master) {
385 LOG_ALWAYS_FATAL("Lost connection to master.");
386 }
387
388 Peer* localPeer = peerProxy->peer;
389 pid_t pid = peerProxy->credentials.pid;
390
391 peerLock(localPeer);
392
393 // Remember for awhile that the peer died.
394 localPeer->deadPeers[localPeer->deadPeerCursor]
395 = peerProxy->credentials.pid;
396 localPeer->deadPeerCursor++;
397 if (localPeer->deadPeerCursor == PEER_HISTORY) {
398 localPeer->deadPeerCursor = 0;
399 }
400
401 // Remove from peer map.
402 hashmapRemove(localPeer->peerProxies, &pid);
403
404 // External threads can no longer get to this peer proxy, so we don't
405 // need the lock anymore.
406 peerUnlock(localPeer);
407
408 // Remove the fd from the selector.
409 if (peerProxy->fd != NULL) {
410 peerProxy->fd->remove = true;
411 }
412
413 // Clear outgoing packet queue.
414 while (peerProxyNextPacket(peerProxy)) {}
415
416 bufferFree(peerProxy->inputBuffer);
417
418 // This only applies to the master.
419 if (peerProxy->connections != NULL) {
420 // We can't leave these other maps pointing to freed memory.
421 hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
422 peerProxy);
423 hashmapFree(peerProxy->connections);
424 }
425
426 // Invoke death listener.
427 localPeer->onDeath(pid);
428
429 // Free the peer proxy itself.
430 free(peerProxy);
431}
432
433static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) {
434 if (errno == EINTR) {
435 // Log interruptions but otherwise ignore them.
436 LOGW("%s() interrupted.", functionName);
437 } else if (errno == EAGAIN) {
438 LOGD("EWOULDBLOCK");
439 // Ignore.
440 } else {
441 LOGW("Error returned by %s().", functionName);
442 peerProxyKill(peerProxy, true);
443 }
444}
445
446/**
447 * Buffers output sent to a peer. May be called multiple times until the entire
448 * buffer is filled. Returns true when the buffer is empty.
449 */
450static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
451 ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
452 if (size < 0) {
453 peerProxyHandleError(peerProxy, "write");
454 return false;
455 } else {
456 return bufferWriteComplete(outgoing);
457 }
458}
459
460/** Writes packet bytes to peer. */
461static void peerProxyWriteBytes(PeerProxy* peerProxy) {
462 Buffer* buffer = peerProxy->currentPacket->bytes;
463 if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
464 LOGD("Bytes written.");
465 peerProxyNextPacket(peerProxy);
466 }
467}
468
469/** Sends a socket to the peer. */
470static void peerProxyWriteConnection(PeerProxy* peerProxy) {
471 int socket = peerProxy->currentPacket->socket;
472
473 // Why does sending and receiving fds have to be such a PITA?
474 struct msghdr msg;
475 struct iovec iov[1];
476
477 union {
478 struct cmsghdr cm;
479 char control[CMSG_SPACE(sizeof(int))];
480 } control_un;
481
482 struct cmsghdr *cmptr;
483
484 msg.msg_control = control_un.control;
485 msg.msg_controllen = sizeof(control_un.control);
486 cmptr = CMSG_FIRSTHDR(&msg);
487 cmptr->cmsg_len = CMSG_LEN(sizeof(int));
488 cmptr->cmsg_level = SOL_SOCKET;
489 cmptr->cmsg_type = SCM_RIGHTS;
490
491 // Store the socket in the message.
492 *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
493
494 msg.msg_name = NULL;
495 msg.msg_namelen = 0;
496 iov[0].iov_base = "";
497 iov[0].iov_len = 1;
498 msg.msg_iov = iov;
499 msg.msg_iovlen = 1;
500
501 ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
502
503 if (result < 0) {
504 peerProxyHandleError(peerProxy, "sendmsg");
505 } else {
506 // Success. Queue up the next packet.
507 peerProxyNextPacket(peerProxy);
508
509 }
510}
511
512/**
513 * Writes some outgoing data.
514 */
515static void peerProxyWrite(SelectableFd* fd) {
516 // TODO: Try to write header and body with one system call.
517
518 PeerProxy* peerProxy = (PeerProxy*) fd->data;
519 OutgoingPacket* current = peerProxy->currentPacket;
520
521 if (current == NULL) {
522 // We have nothing left to write.
523 return;
524 }
525
526 // Write the header.
527 Buffer* outgoingHeader = &peerProxy->outgoingHeader;
528 bool headerWritten = bufferWriteComplete(outgoingHeader);
529 if (!headerWritten) {
530 LOGD("Writing header...");
531 headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
532 if (headerWritten) {
533 LOGD("Header written.");
534 }
535 }
536
537 // Write body.
538 if (headerWritten) {
539 PacketType type = current->header.type;
540 switch (type) {
541 case CONNECTION:
542 peerProxyWriteConnection(peerProxy);
543 break;
544 case BYTES:
545 peerProxyWriteBytes(peerProxy);
546 break;
547 case CONNECTION_REQUEST:
548 case CONNECTION_ERROR:
549 // These packets consist solely of a header.
550 peerProxyNextPacket(peerProxy);
551 break;
552 default:
553 LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
554 }
555 }
556}
557
558/**
559 * Sets up a peer proxy's fd before we try to select() it.
560 */
561static void peerProxyBeforeSelect(SelectableFd* fd) {
562 LOGD("Before select...");
563
564 PeerProxy* peerProxy = (PeerProxy*) fd->data;
565
566 peerLock(peerProxy->peer);
567 bool hasPackets = peerProxy->currentPacket != NULL;
568 peerUnlock(peerProxy->peer);
569
570 if (hasPackets) {
571 LOGD("Packets found. Setting onWritable().");
572
573 fd->onWritable = &peerProxyWrite;
574 } else {
575 // We have nothing to write.
576 fd->onWritable = NULL;
577 }
578}
579
580/** Prepare to read bytes from the peer. */
581static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
582 LOGD("Expecting %d bytes.", header->size);
583
584 peerProxy->inputState = READING_BYTES;
585 if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
586 LOGW("Couldn't allocate memory for incoming data. Size: %u",
587 (unsigned int) header->size);
588
589 // TODO: Ignore the packet and log a warning?
590 peerProxyKill(peerProxy, false);
591 }
592}
593
594/**
595 * Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
596 * Sends a connection request to the master if desired.
597 *
598 * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
599 * or ENOMEM if memory couldn't be allocated.
600 */
601static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
602 bool requestConnection) {
603 if (pid == peer->pid) {
604 errno = EINVAL;
605 return NULL;
606 }
607
608 if (peerIsDead(peer, pid)) {
609 errno = EHOSTDOWN;
610 return NULL;
611 }
612
613 PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
614 if (peerProxy != NULL) {
615 return peerProxy;
616 }
617
618 // If this is the master peer, we already know about all peers.
619 if (peer->master) {
620 errno = EHOSTDOWN;
621 return NULL;
622 }
623
624 // Try to create a peer proxy.
625 Credentials credentials;
626 credentials.pid = pid;
627
628 // Fake gid and uid until we have the real thing. The real creds are
629 // filled in by masterProxyExpectConnection(). These fake creds will
630 // never be exposed to the user.
631 credentials.uid = 0;
632 credentials.gid = 0;
633
634 // Make sure we can allocate the connection request packet.
635 OutgoingPacket* packet = NULL;
636 if (requestConnection) {
637 packet = calloc(1, sizeof(OutgoingPacket));
638 if (packet == NULL) {
639 errno = ENOMEM;
640 return NULL;
641 }
642
643 packet->header.type = CONNECTION_REQUEST;
644 packet->header.credentials = credentials;
645 packet->free = &outgoingPacketFree;
646 }
647
648 peerProxy = peerProxyCreate(peer, credentials);
649 if (peerProxy == NULL) {
650 free(packet);
651 errno = ENOMEM;
652 return NULL;
653 } else {
654 // Send a connection request to the master.
655 if (requestConnection) {
656 PeerProxy* masterProxy = peer->masterProxy;
657 peerProxyEnqueueOutgoingPacket(masterProxy, packet);
658 }
659
660 return peerProxy;
661 }
662}
663
664/**
665 * Switches the master peer proxy into a state where it's waiting for a
666 * connection from the master.
667 */
668static void masterProxyExpectConnection(PeerProxy* masterProxy,
669 Header* header) {
670 // TODO: Restructure things so we don't need this check.
671 // Verify that this really is the master.
672 if (!masterProxy->master) {
673 LOGW("Non-master process %d tried to send us a connection.",
674 masterProxy->credentials.pid);
675 // Kill off the evil peer.
676 peerProxyKill(masterProxy, false);
677 return;
678 }
679
680 masterProxy->inputState = ACCEPTING_CONNECTION;
681 Peer* localPeer = masterProxy->peer;
682
683 // Create a peer proxy so we have somewhere to stash the creds.
684 // See if we already have a proxy set up.
685 pid_t pid = header->credentials.pid;
686 peerLock(localPeer);
687 PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
688 if (peerProxy == NULL) {
689 LOGW("Peer proxy creation failed: %s", strerror(errno));
690 } else {
691 // Fill in full credentials.
692 peerProxy->credentials = header->credentials;
693 }
694 peerUnlock(localPeer);
695
696 // Keep track of which peer proxy we're accepting a connection for.
697 masterProxy->connecting = peerProxy;
698}
699
700/**
701 * Reads input from a peer process.
702 */
703static void peerProxyRead(SelectableFd* fd);
704
705/** Sets up fd callbacks. */
706static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
707 peerProxy->fd = fd;
708 fd->data = peerProxy;
709 fd->onReadable = &peerProxyRead;
710 fd->beforeSelect = &peerProxyBeforeSelect;
711
712 // Make the socket non-blocking.
713 setNonBlocking(fd->fd);
714}
715
716/**
717 * Accepts a connection sent by the master proxy.
718 */
719static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
720 struct msghdr msg;
721 struct iovec iov[1];
722 ssize_t size;
723 char ignored;
724 int incomingFd;
725
726 // TODO: Reuse code which writes the connection. Who the heck designed
727 // this API anyway?
728 union {
729 struct cmsghdr cm;
730 char control[CMSG_SPACE(sizeof(int))];
731 } control_un;
732 struct cmsghdr *cmptr;
733 msg.msg_control = control_un.control;
734 msg.msg_controllen = sizeof(control_un.control);
735
736 msg.msg_name = NULL;
737 msg.msg_namelen = 0;
738
739 // We sent 1 byte of data so we can detect EOF.
740 iov[0].iov_base = &ignored;
741 iov[0].iov_len = 1;
742 msg.msg_iov = iov;
743 msg.msg_iovlen = 1;
744
745 size = recvmsg(masterProxy->fd->fd, &msg, 0);
746 if (size < 0) {
747 if (errno == EINTR) {
748 // Log interruptions but otherwise ignore them.
749 LOGW("recvmsg() interrupted.");
750 return;
751 } else if (errno == EAGAIN) {
752 // Keep waiting for the connection.
753 return;
754 } else {
755 LOG_ALWAYS_FATAL("Error reading connection from master: %s",
756 strerror(errno));
757 }
758 } else if (size == 0) {
759 // EOF.
760 LOG_ALWAYS_FATAL("Received EOF from master.");
761 }
762
763 // Extract fd from message.
764 if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
765 && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
766 if (cmptr->cmsg_level != SOL_SOCKET) {
767 LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
768 }
769 if (cmptr->cmsg_type != SCM_RIGHTS) {
770 LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
771 }
772 incomingFd = *((int*) CMSG_DATA(cmptr));
773 } else {
774 LOG_ALWAYS_FATAL("Expected fd.");
775 }
776
777 // The peer proxy this connection is for.
778 PeerProxy* peerProxy = masterProxy->connecting;
779 if (peerProxy == NULL) {
780 LOGW("Received connection for unknown peer.");
781 closeWithWarning(incomingFd);
782 } else {
783 Peer* peer = masterProxy->peer;
784
785 SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
786 if (selectableFd == NULL) {
787 LOGW("Error adding fd to selector for %d.",
788 peerProxy->credentials.pid);
789 closeWithWarning(incomingFd);
790 peerProxyKill(peerProxy, false);
791 }
792
793 peerProxySetFd(peerProxy, selectableFd);
794 }
795
796 peerProxyExpectHeader(masterProxy);
797}
798
799/**
800 * Frees an outgoing packet containing a connection.
801 */
802static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
803 closeWithWarning(packet->socket);
804 outgoingPacketFree(packet);
805}
806
807/**
808 * Connects two known peers.
809 */
810static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
811 int sockets[2];
812 int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
813 if (result == -1) {
814 LOGW("socketpair() error: %s", strerror(errno));
815 // TODO: Send CONNECTION_FAILED packets to peers.
816 return;
817 }
818
819 OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
820 OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
821 if (packetA == NULL || packetB == NULL) {
822 free(packetA);
823 free(packetB);
824 LOGW("malloc() error. Failed to tell process %d that process %d is"
825 " dead.", peerA->credentials.pid, peerB->credentials.pid);
826 return;
827 }
828
829 packetA->header.type = CONNECTION;
830 packetB->header.type = CONNECTION;
831
832 packetA->header.credentials = peerB->credentials;
833 packetB->header.credentials = peerA->credentials;
834
835 packetA->socket = sockets[0];
836 packetB->socket = sockets[1];
837
838 packetA->free = &outgoingPacketFreeSocket;
839 packetB->free = &outgoingPacketFreeSocket;
840
841 peerLock(peerA->peer);
842 peerProxyEnqueueOutgoingPacket(peerA, packetA);
843 peerProxyEnqueueOutgoingPacket(peerB, packetB);
844 peerUnlock(peerA->peer);
845}
846
847/**
848 * Informs a peer that the peer they're trying to connect to couldn't be
849 * found.
850 */
851static void masterReportConnectionError(PeerProxy* peerProxy,
852 Credentials credentials) {
853 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
854 if (packet == NULL) {
855 LOGW("malloc() error. Failed to tell process %d that process %d is"
856 " dead.", peerProxy->credentials.pid, credentials.pid);
857 return;
858 }
859
860 packet->header.type = CONNECTION_ERROR;
861 packet->header.credentials = credentials;
862 packet->free = &outgoingPacketFree;
863
864 peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);
865}
866
867/**
868 * Handles a request to be connected to another peer.
869 */
870static void masterHandleConnectionRequest(PeerProxy* peerProxy,
871 Header* header) {
872 Peer* master = peerProxy->peer;
873 pid_t targetPid = header->credentials.pid;
874 if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
875 // We haven't connected these peers yet.
876 PeerProxy* targetPeer
877 = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
878 if (targetPeer == NULL) {
879 // Unknown process.
880 masterReportConnectionError(peerProxy, header->credentials);
881 } else {
882 masterConnectPeers(peerProxy, targetPeer);
883 }
884 }
885
886 // This packet is complete. Get ready for the next one.
887 peerProxyExpectHeader(peerProxy);
888}
889
890/**
891 * The master told us this peer is dead.
892 */
893static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
894 Header* header) {
895 Peer* peer = masterProxy->peer;
896
897 // Look up the peer proxy.
898 pid_t pid = header->credentials.pid;
899 PeerProxy* peerProxy = NULL;
900 peerLock(peer);
901 peerProxy = hashmapGet(peer->peerProxies, &pid);
902 peerUnlock(peer);
903
904 if (peerProxy != NULL) {
905 LOGI("Couldn't connect to %d.", pid);
906 peerProxyKill(peerProxy, false);
907 } else {
908 LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
909 }
910
911 peerProxyExpectHeader(masterProxy);
912}
913
914/**
915 * Handles a packet header.
916 */
917static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
918 switch (header->type) {
919 case CONNECTION_REQUEST:
920 masterHandleConnectionRequest(peerProxy, header);
921 break;
922 case CONNECTION:
923 masterProxyExpectConnection(peerProxy, header);
924 break;
925 case CONNECTION_ERROR:
926 masterProxyHandleConnectionError(peerProxy, header);
927 break;
928 case BYTES:
929 peerProxyExpectBytes(peerProxy, header);
930 break;
931 default:
932 LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
933 header->type);
934 peerProxyKill(peerProxy, false);
935 }
936}
937
938/**
939 * Buffers input sent by peer. May be called multiple times until the entire
940 * buffer is filled. Returns true when the buffer is full.
941 */
942static bool peerProxyBufferInput(PeerProxy* peerProxy) {
943 Buffer* in = peerProxy->inputBuffer;
944 ssize_t size = bufferRead(in, peerProxy->fd->fd);
945 if (size < 0) {
946 peerProxyHandleError(peerProxy, "read");
947 return false;
948 } else if (size == 0) {
949 // EOF.
950 LOGI("EOF");
951 peerProxyKill(peerProxy, false);
952 return false;
953 } else if (bufferReadComplete(in)) {
954 // We're done!
955 return true;
956 } else {
957 // Continue reading.
958 return false;
959 }
960}
961
962/**
963 * Reads input from a peer process.
964 */
965static void peerProxyRead(SelectableFd* fd) {
966 LOGD("Reading...");
967 PeerProxy* peerProxy = (PeerProxy*) fd->data;
968 int state = peerProxy->inputState;
969 Buffer* in = peerProxy->inputBuffer;
970 switch (state) {
971 case READING_HEADER:
972 if (peerProxyBufferInput(peerProxy)) {
973 LOGD("Header read.");
974 // We've read the complete header.
975 Header* header = (Header*) in->data;
976 peerProxyHandleHeader(peerProxy, header);
977 }
978 break;
979 case READING_BYTES:
980 LOGD("Reading bytes...");
981 if (peerProxyBufferInput(peerProxy)) {
982 LOGD("Bytes read.");
983 // We have the complete packet. Notify bytes listener.
984 peerProxy->peer->onBytes(peerProxy->credentials,
985 in->data, in->size);
986
987 // Get ready for the next packet.
988 peerProxyExpectHeader(peerProxy);
989 }
990 break;
991 case ACCEPTING_CONNECTION:
992 masterProxyAcceptConnection(peerProxy);
993 break;
994 default:
995 LOG_ALWAYS_FATAL("Unknown state: %d", state);
996 }
997}
998
999static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
1000 PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
1001 if (peerProxy == NULL) {
1002 return NULL;
1003 }
1004
1005 peerProxy->inputBuffer = bufferCreate(sizeof(Header));
1006 if (peerProxy->inputBuffer == NULL) {
1007 free(peerProxy);
1008 return NULL;
1009 }
1010
1011 peerProxy->peer = peer;
1012 peerProxy->credentials = credentials;
1013
1014 // Initial state == expecting a header.
1015 peerProxyExpectHeader(peerProxy);
1016
1017 // Add this proxy to the map. Make sure the key points to the stable memory
1018 // inside of the peer proxy itself.
1019 pid_t* pid = &(peerProxy->credentials.pid);
1020 hashmapPut(peer->peerProxies, pid, peerProxy);
1021 return peerProxy;
1022}
1023
1024/** Accepts a connection to the master peer. */
1025static void masterAcceptConnection(SelectableFd* listenerFd) {
1026 // Accept connection.
1027 int socket = accept(listenerFd->fd, NULL, NULL);
1028 if (socket == -1) {
1029 LOGW("accept() error: %s", strerror(errno));
1030 return;
1031 }
1032
1033 LOGD("Accepted connection as fd %d.", socket);
1034
1035 // Get credentials.
1036 Credentials credentials;
1037 struct ucred ucredentials;
1038 socklen_t credentialsSize = sizeof(struct ucred);
1039 int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
1040 &ucredentials, &credentialsSize);
1041 // We might want to verify credentialsSize.
1042 if (result == -1) {
1043 LOGW("getsockopt() error: %s", strerror(errno));
1044 closeWithWarning(socket);
1045 return;
1046 }
1047
1048 // Copy values into our own structure so we know we have the types right.
1049 credentials.pid = ucredentials.pid;
1050 credentials.uid = ucredentials.uid;
1051 credentials.gid = ucredentials.gid;
1052
1053 LOGI("Accepted connection from process %d.", credentials.pid);
1054
1055 Peer* masterPeer = (Peer*) listenerFd->data;
1056
1057 peerLock(masterPeer);
1058
1059 // Make sure we don't already have a connection from that process.
1060 PeerProxy* peerProxy
1061 = hashmapGet(masterPeer->peerProxies, &credentials.pid);
1062 if (peerProxy != NULL) {
1063 peerUnlock(masterPeer);
1064 LOGW("Alread connected to process %d.", credentials.pid);
1065 closeWithWarning(socket);
1066 return;
1067 }
1068
1069 // Add connection to the selector.
1070 SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
1071 if (socketFd == NULL) {
1072 peerUnlock(masterPeer);
1073 LOGW("malloc() failed.");
1074 closeWithWarning(socket);
1075 return;
1076 }
1077
1078 // Create a peer proxy.
1079 peerProxy = peerProxyCreate(masterPeer, credentials);
1080 peerUnlock(masterPeer);
1081 if (peerProxy == NULL) {
1082 LOGW("malloc() failed.");
1083 socketFd->remove = true;
1084 closeWithWarning(socket);
1085 }
1086 peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
1087 peerProxySetFd(peerProxy, socketFd);
1088}
1089
1090/**
1091 * Creates the local peer.
1092 */
1093static Peer* peerCreate() {
1094 Peer* peer = calloc(1, sizeof(Peer));
1095 if (peer == NULL) {
1096 LOG_ALWAYS_FATAL("malloc() error.");
1097 }
1098 peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
1099 peer->selector = selectorCreate();
1100
1101 pthread_mutexattr_t attributes;
1102 if (pthread_mutexattr_init(&attributes) != 0) {
1103 LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
1104 }
1105 if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
1106 LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
1107 }
1108 if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
1109 LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
1110 }
1111
1112 peer->pid = getpid();
1113 return peer;
1114}
1115
1116/** The local peer. */
1117static Peer* localPeer;
1118
1119/** Frees a packet of bytes. */
1120static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
1121 LOGD("Freeing outgoing packet.");
1122 bufferFree(packet->bytes);
1123 free(packet);
1124}
1125
1126/**
1127 * Sends a packet of bytes to a remote peer. Returns 0 on success.
1128 *
1129 * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1130 * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1131 * to EINVAL if pid is the same as the local pid.
1132 */
1133int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
1134 Peer* peer = localPeer;
1135 assert(peer != NULL);
1136
1137 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1138 if (packet == NULL) {
1139 errno = ENOMEM;
1140 return -1;
1141 }
1142
1143 Buffer* copy = bufferCreate(size);
1144 if (copy == NULL) {
1145 free(packet);
1146 errno = ENOMEM;
1147 return -1;
1148 }
1149
1150 // Copy data.
1151 memcpy(copy->data, bytes, size);
1152 copy->size = size;
1153
1154 packet->bytes = copy;
1155 packet->header.type = BYTES;
1156 packet->header.size = size;
1157 packet->free = outgoingPacketFreeBytes;
1158 bufferPrepareForWrite(packet->bytes);
1159
1160 peerLock(peer);
1161
1162 PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1163 if (peerProxy == NULL) {
1164 // The peer is already dead or we couldn't alloc memory. Either way,
1165 // errno is set.
1166 peerUnlock(peer);
1167 packet->free(packet);
1168 return -1;
1169 } else {
1170 peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1171 peerUnlock(peer);
1172 selectorWakeUp(peer->selector);
1173 return 0;
1174 }
1175}
1176
/** Signature of the callback used to release shared bytes. */
typedef void SharedBytesFreeFunction(void* context);

/**
 * Remembers how to release bytes that were sent without being copied: the
 * callback to invoke and the value to invoke it with.
 */
typedef struct {
    /** Callback that releases the shared bytes. */
    SharedBytesFreeFunction* free;

    /** Value passed to the callback when it is invoked. */
    void* context;
} SharedBytesFreer;
1182
1183/** Frees shared bytes. */
1184static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
1185 SharedBytesFreer* sharedBytesFreer
1186 = (SharedBytesFreer*) packet->context;
1187 sharedBytesFreer->free(sharedBytesFreer->context);
1188 free(sharedBytesFreer);
1189 free(packet);
1190}
1191
1192/**
1193 * Sends a packet of bytes to a remote peer without copying the bytes. Calls
1194 * free() with context after the bytes have been sent.
1195 *
1196 * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1197 * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1198 * to EINVAL if pid is the same as the local pid.
1199 */
1200int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
1201 void (*free)(void* context), void* context) {
1202 Peer* peer = localPeer;
1203 assert(peer != NULL);
1204
1205 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1206 if (packet == NULL) {
1207 errno = ENOMEM;
1208 return -1;
1209 }
1210
1211 Buffer* wrapper = bufferWrap(bytes, size, size);
1212 if (wrapper == NULL) {
1213 free(packet);
1214 errno = ENOMEM;
1215 return -1;
1216 }
1217
1218 SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
1219 if (sharedBytesFreer == NULL) {
1220 free(packet);
1221 free(wrapper);
1222 errno = ENOMEM;
1223 return -1;
1224 }
1225 sharedBytesFreer->free = free;
1226 sharedBytesFreer->context = context;
1227
1228 packet->bytes = wrapper;
1229 packet->context = sharedBytesFreer;
1230 packet->header.type = BYTES;
1231 packet->header.size = size;
1232 packet->free = &outgoingPacketFreeSharedBytes;
1233 bufferPrepareForWrite(packet->bytes);
1234
1235 peerLock(peer);
1236
1237 PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1238 if (peerProxy == NULL) {
1239 // The peer is already dead or we couldn't alloc memory. Either way,
1240 // errno is set.
1241 peerUnlock(peer);
1242 packet->free(packet);
1243 return -1;
1244 } else {
1245 peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1246 peerUnlock(peer);
1247 selectorWakeUp(peer->selector);
1248 return 0;
1249 }
1250}
1251
1252/**
1253 * Starts the master peer. The master peer differs from other peers in that
1254 * it is responsible for connecting the other peers. You can only have one
1255 * master peer.
1256 *
1257 * Goes into an I/O loop and does not return.
1258 */
1259void masterPeerInitialize(BytesListener* bytesListener,
1260 DeathListener* deathListener) {
1261 // Create and bind socket.
1262 int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1263 if (listenerSocket == -1) {
1264 LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1265 }
1266 unlink(MASTER_PATH);
1267 int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
1268 sizeof(UnixAddress));
1269 if (result == -1) {
1270 LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
1271 }
1272
1273 LOGD("Listener socket: %d", listenerSocket);
1274
1275 // Queue up to 16 connections.
1276 result = listen(listenerSocket, 16);
1277 if (result != 0) {
1278 LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
1279 }
1280
1281 // Make socket non-blocking.
1282 setNonBlocking(listenerSocket);
1283
1284 // Create the peer for this process. Fail if we already have one.
1285 if (localPeer != NULL) {
1286 LOG_ALWAYS_FATAL("Peer is already initialized.");
1287 }
1288 localPeer = peerCreate();
1289 if (localPeer == NULL) {
1290 LOG_ALWAYS_FATAL("malloc() failed.");
1291 }
1292 localPeer->master = true;
1293 localPeer->onBytes = bytesListener;
1294 localPeer->onDeath = deathListener;
1295
1296 // Make listener socket selectable.
1297 SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
1298 if (listenerFd == NULL) {
1299 LOG_ALWAYS_FATAL("malloc() error.");
1300 }
1301 listenerFd->data = localPeer;
1302 listenerFd->onReadable = &masterAcceptConnection;
1303}
1304
1305/**
1306 * Starts a local peer.
1307 *
1308 * Goes into an I/O loop and does not return.
1309 */
1310void peerInitialize(BytesListener* bytesListener,
1311 DeathListener* deathListener) {
1312 // Connect to master peer.
1313 int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1314 if (masterSocket == -1) {
1315 LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1316 }
1317 int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
1318 sizeof(UnixAddress));
1319 if (result != 0) {
1320 LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
1321 }
1322
1323 // Create the peer for this process. Fail if we already have one.
1324 if (localPeer != NULL) {
1325 LOG_ALWAYS_FATAL("Peer is already initialized.");
1326 }
1327 localPeer = peerCreate();
1328 if (localPeer == NULL) {
1329 LOG_ALWAYS_FATAL("malloc() failed.");
1330 }
1331 localPeer->onBytes = bytesListener;
1332 localPeer->onDeath = deathListener;
1333
1334 // Make connection selectable.
1335 SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
1336 if (masterFd == NULL) {
1337 LOG_ALWAYS_FATAL("malloc() error.");
1338 }
1339
1340 // Create a peer proxy for the master peer.
1341 PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
1342 if (masterProxy == NULL) {
1343 LOG_ALWAYS_FATAL("malloc() error.");
1344 }
1345 peerProxySetFd(masterProxy, masterFd);
1346 masterProxy->master = true;
1347 localPeer->masterProxy = masterProxy;
1348}
1349
1350/** Starts the master peer I/O loop. Doesn't return. */
1351void peerLoop() {
1352 assert(localPeer != NULL);
1353
1354 // Start selector.
1355 selectorLoop(localPeer->selector);
1356}
1357