libbinder: dynamically accept clients
The server listens on a single port and adds clients as they connect.
The server looks like this:

    while True:
        accept client
        read client id
        if new id:
            create new rpc connection
        else:
            attach thread to existing rpc connection
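
From the client's side, this handshake is one connect() plus a 4-byte ID,
matching the read() in the server loop below. A minimal sketch over raw
POSIX sockets (the helper name and socket path are hypothetical;
RPC_CONNECTION_ID_NEW is the sentinel the real headers define):

    #include <cstdint>
    #include <cstring>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <unistd.h>

    // Hypothetical client-side helper. The first socket of a client sends
    // RPC_CONNECTION_ID_NEW to request a new connection; a thread joining an
    // existing connection sends that connection's assigned ID instead.
    int connectToServer(const char* path, int32_t id) {
        int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
        if (fd < 0) return -1;
        sockaddr_un addr = {};
        addr.sun_family = AF_UNIX;
        strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
        if (connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0 ||
            write(fd, &id, sizeof(id)) != static_cast<ssize_t>(sizeof(id))) {
            close(fd);
            return -1;
        }
        return fd;  // ready for the normal RPC wire protocol
    }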
Roadmap:
- having clients add connections only when needed (currently they are
  all added at initialization time) - when this change is made, the
  server will also need to enforce the max threads per client.
- allowing RpcConnection to create reverse connections with a
  threadpool to serve calls in the other direction
- replacing connection IDs with something like TLS
- access controls for who can connect to whom in a pKVM context
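
For context, hosting a server that runs this accept loop looks roughly like
the sketch below. Besides join(), the method names are this tree's
experimental RpcServer API as best understood (the socket path and root
object are placeholders):

    #include <binder/RpcServer.h>

    using android::IBinder;
    using android::RpcServer;
    using android::sp;

    void hostService(const sp<IBinder>& rootObject) {
        sp<RpcServer> server = RpcServer::make();
        // Experimental opt-in; sets the mAgreedExperimental flag that the
        // accept loop checks below.
        server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
        if (!server->setupUnixDomainServer("/some/socket/path")) return;
        server->setRootObject(rootObject);
        server->join();  // runs the accept loop; does not return
    }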
Bug: 185167543
Test: binderRpcTest
Change-Id: I510d23a50cf839c39bc8107c1b0dae24dee3bc7b
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 5f024ca..4df12ce 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -126,40 +126,61 @@
     {
         std::lock_guard<std::mutex> _l(mLock);
         LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join.");
-        // TODO(b/185167543): support more than one client at once
-        mConnection = RpcConnection::make();
-        mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/);
-
-        mStarted = true;
-        for (size_t i = 0; i < mMaxThreads; i++) {
-            pool.push_back(std::thread([=] {
-                // TODO(b/185167543): do this dynamically, instead of from a static number
-                // of threads
-                unique_fd clientFd(TEMP_FAILURE_RETRY(
-                        accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
-                if (clientFd < 0) {
-                    // If this log becomes confusing, should save more state from
-                    // setupUnixDomainServer in order to output here.
-                    ALOGE("Could not accept4 socket: %s", strerror(errno));
-                    return;
-                }
-
-                LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
-
-                mConnection->join(std::move(clientFd));
-            }));
-        }
     }
-    // TODO(b/185167543): don't waste extra thread for join, and combine threads
-    // between clients
-    for (auto& t : pool) t.join();
+    while (true) {
+        unique_fd clientFd(
+                TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
+
+        if (clientFd < 0) {
+            ALOGE("Could not accept4 socket: %s", strerror(errno));
+            continue;
+        }
+        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
+
+        // TODO(b/183988761): cannot trust this simple ID
+        LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+        int32_t id;
+        if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
+            ALOGE("Could not read ID from fd %d", clientFd.get());
+            continue;
+        }
+
+        {
+            std::lock_guard<std::mutex> _l(mLock);
+
+            sp<RpcConnection> connection;
+            if (id == RPC_CONNECTION_ID_NEW) {
+                // new client!
+                LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs");
+                mConnectionIdCounter++;
+
+                connection = RpcConnection::make();
+                connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter);
+
+                mConnections[mConnectionIdCounter] = connection;
+            } else {
+                auto it = mConnections.find(id);
+                if (it == mConnections.end()) {
+                    ALOGE("Cannot add thread, no record of connection with ID %d", id);
+                    continue;
+                }
+                connection = it->second;
+            }
+
+            connection->startThread(std::move(clientFd));
+        }
+    }
 }
 
 std::vector<sp<RpcConnection>> RpcServer::listConnections() {
     std::lock_guard<std::mutex> _l(mLock);
-    if (mConnection == nullptr) return {};
-    return {mConnection};
+    std::vector<sp<RpcConnection>> connections;
+    for (auto& [id, connection] : mConnections) {
+        (void)id;
+        connections.push_back(connection);
+    }
+    return connections;
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {