libbinder: dynamically accept clients

The server listens on a single port and adds clients as they connect.

The server looks like this:

    while True:
        accept client
        read client id
        if new id:
            create new rpc connection
        else:
            attach thread to existing rpc connection

Roadmap:
- having clients add connections only when needed (currently they are
  all added at initialization time) - when this change is made, the
  server will also need to enforce the max threads per client.
- allowing RpcConnection to create reverse connections with a
  threadpool to serve calls in the other direction
- replacing connection IDs with something like TLS
- access controls for who can connect to whom in pKVM context

Bug: 185167543
Test: binderRpcTest
Change-Id: I510d23a50cf839c39bc8107c1b0dae24dee3bc7b
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 5f024ca..4df12ce 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -126,40 +126,61 @@
     {
         std::lock_guard<std::mutex> _l(mLock);
         LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join.");
-        // TODO(b/185167543): support more than one client at once
-        mConnection = RpcConnection::make();
-        mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/);
-
-        mStarted = true;
-            for (size_t i = 0; i < mMaxThreads; i++) {
-                pool.push_back(std::thread([=] {
-                    // TODO(b/185167543): do this dynamically, instead of from a static number
-                    // of threads
-                    unique_fd clientFd(TEMP_FAILURE_RETRY(
-                            accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
-                    if (clientFd < 0) {
-                        // If this log becomes confusing, should save more state from
-                        // setupUnixDomainServer in order to output here.
-                        ALOGE("Could not accept4 socket: %s", strerror(errno));
-                        return;
-                    }
-
-                    LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
-
-                    mConnection->join(std::move(clientFd));
-                }));
-            }
     }
 
-    // TODO(b/185167543): don't waste extra thread for join, and combine threads
-    // between clients
-    for (auto& t : pool) t.join();
+    while (true) {
+        unique_fd clientFd(
+                TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
+
+        if (clientFd < 0) {
+            ALOGE("Could not accept4 socket: %s", strerror(errno));
+            continue;
+        }
+        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
+
+        // TODO(b/183988761): cannot trust this simple ID
+        LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+        int32_t id;
+        if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
+            ALOGE("Could not read ID from fd %d", clientFd.get());
+            continue;
+        }
+
+        {
+            std::lock_guard<std::mutex> _l(mLock);
+
+            sp<RpcConnection> connection;
+            if (id == RPC_CONNECTION_ID_NEW) {
+                // new client!
+                LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs");
+                mConnectionIdCounter++;
+
+                connection = RpcConnection::make();
+                connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter);
+
+                mConnections[mConnectionIdCounter] = connection;
+            } else {
+                auto it = mConnections.find(id);
+                if (it == mConnections.end()) {
+                    ALOGE("Cannot add thread, no record of connection with ID %d", id);
+                    continue;
+                }
+                connection = it->second;
+            }
+
+            connection->startThread(std::move(clientFd));
+        }
+    }
 }
 
 std::vector<sp<RpcConnection>> RpcServer::listConnections() {
     std::lock_guard<std::mutex> _l(mLock);
-    if (mConnection == nullptr) return {};
-    return {mConnection};
+    std::vector<sp<RpcConnection>> connections;
+    for (auto& [id, connection] : mConnections) {
+        (void)id;
+        connections.push_back(connection);
+    }
+    return connections;
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {