rxrpc: Kill the client connection bundle concept
Kill off the concept of maintaining a bundle of connections to a particular
target service to increase the number of call slots available for any
beyond four for that service (there are four call slots per connection).
This will make cleaning up the connection handling code easier and
facilitate removal of the rxrpc_transport struct. Bundling can be
reintroduced later if necessary.
Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 45849a6..9b3b48a 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -31,6 +31,8 @@
unsigned int rxrpc_dead_call_expiry = 2 * HZ;
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
+ [RXRPC_CALL_UNINITIALISED] = "Uninit",
+ [RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn",
[RXRPC_CALL_CLIENT_SEND_REQUEST] = "ClSndReq",
[RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl",
[RXRPC_CALL_CLIENT_RECV_REPLY] = "ClRcvRpl",
@@ -261,6 +263,7 @@
(unsigned long) call);
INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
INIT_WORK(&call->processor, &rxrpc_process_call);
+ INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->accept_link);
skb_queue_head_init(&call->rx_queue);
skb_queue_head_init(&call->rx_oos_queue);
@@ -269,7 +272,6 @@
rwlock_init(&call->state_lock);
atomic_set(&call->usage, 1);
call->debug_id = atomic_inc_return(&rxrpc_debug_id);
- call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
@@ -282,55 +284,70 @@
}
/*
- * allocate a new client call and attempt to get a connection slot for it
+ * Allocate a new client call.
*/
static struct rxrpc_call *rxrpc_alloc_client_call(
struct rxrpc_sock *rx,
struct rxrpc_conn_parameters *cp,
- struct rxrpc_transport *trans,
- struct rxrpc_conn_bundle *bundle,
+ struct sockaddr_rxrpc *srx,
gfp_t gfp)
{
struct rxrpc_call *call;
- int ret;
_enter("");
- ASSERT(rx != NULL);
- ASSERT(trans != NULL);
- ASSERT(bundle != NULL);
+ ASSERT(rx->local != NULL);
call = rxrpc_alloc_call(gfp);
if (!call)
return ERR_PTR(-ENOMEM);
+ call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
sock_hold(&rx->sk);
call->socket = rx;
call->rx_data_post = 1;
- ret = rxrpc_connect_call(rx, cp, trans, bundle, call, gfp);
- if (ret < 0) {
- kmem_cache_free(rxrpc_call_jar, call);
- return ERR_PTR(ret);
- }
-
/* Record copies of information for hashtable lookup */
call->family = rx->family;
- call->local = call->conn->params.local;
+ call->local = rx->local;
switch (call->family) {
case AF_INET:
- call->peer_ip.ipv4_addr =
- call->conn->params.peer->srx.transport.sin.sin_addr.s_addr;
+ call->peer_ip.ipv4_addr = srx->transport.sin.sin_addr.s_addr;
break;
case AF_INET6:
memcpy(call->peer_ip.ipv6_addr,
- call->conn->params.peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
+ srx->transport.sin6.sin6_addr.in6_u.u6_addr8,
sizeof(call->peer_ip.ipv6_addr));
break;
}
- call->epoch = call->conn->proto.epoch;
- call->service_id = call->conn->params.service_id;
- call->in_clientflag = call->conn->proto.in_clientflag;
+
+ call->service_id = srx->srx_service;
+ call->in_clientflag = 0;
+
+ _leave(" = %p", call);
+ return call;
+}
+
+/*
+ * Begin client call.
+ */
+static int rxrpc_begin_client_call(struct rxrpc_call *call,
+ struct rxrpc_conn_parameters *cp,
+ struct rxrpc_transport *trans,
+ struct sockaddr_rxrpc *srx,
+ gfp_t gfp)
+{
+ int ret;
+
+ /* Set up or get a connection record and set the protocol parameters,
+ * including channel number and call ID.
+ */
+ ret = rxrpc_connect_call(call, cp, trans, srx, gfp);
+ if (ret < 0)
+ return ret;
+
+ call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
+
/* Add the new call to the hashtable */
rxrpc_call_hash_add(call);
@@ -340,9 +357,7 @@
call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
add_timer(&call->lifetimer);
-
- _leave(" = %p", call);
- return call;
+ return 0;
}
/*
@@ -352,23 +367,23 @@
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
struct rxrpc_conn_parameters *cp,
struct rxrpc_transport *trans,
- struct rxrpc_conn_bundle *bundle,
+ struct sockaddr_rxrpc *srx,
unsigned long user_call_ID,
gfp_t gfp)
{
struct rxrpc_call *call, *xcall;
struct rb_node *parent, **pp;
+ int ret;
- _enter("%p,%d,%d,%lx",
- rx, trans->debug_id, bundle ? bundle->debug_id : -1,
- user_call_ID);
+ _enter("%p,%lx", rx, user_call_ID);
- call = rxrpc_alloc_client_call(rx, cp, trans, bundle, gfp);
+ call = rxrpc_alloc_client_call(rx, cp, srx, gfp);
if (IS_ERR(call)) {
_leave(" = %ld", PTR_ERR(call));
return call;
}
+ /* Publish the call, even though it is incompletely set up as yet */
call->user_call_ID = user_call_ID;
__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
@@ -398,11 +413,29 @@
list_add_tail(&call->link, &rxrpc_calls);
write_unlock_bh(&rxrpc_call_lock);
+ ret = rxrpc_begin_client_call(call, cp, trans, srx, gfp);
+ if (ret < 0)
+ goto error;
+
_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
_leave(" = %p [new]", call);
return call;
+error:
+ write_lock(&rx->call_lock);
+ rb_erase(&call->sock_node, &rx->calls);
+ write_unlock(&rx->call_lock);
+ rxrpc_put_call(call);
+
+ write_lock_bh(&rxrpc_call_lock);
+ list_del(&call->link);
+ write_unlock_bh(&rxrpc_call_lock);
+
+ rxrpc_put_call(call);
+ _leave(" = %d", ret);
+ return ERR_PTR(ret);
+
/* We unexpectedly found the user ID in the list after taking
* the call_lock. This shouldn't happen unless the user races
* with itself and tries to add the same user ID twice at the
@@ -612,40 +645,13 @@
write_unlock_bh(&rx->call_lock);
/* free up the channel for reuse */
- spin_lock(&conn->trans->client_lock);
+ spin_lock(&conn->channel_lock);
write_lock_bh(&conn->lock);
write_lock(&call->state_lock);
- if (conn->channels[call->channel] == call)
- conn->channels[call->channel] = NULL;
+ rxrpc_disconnect_call(call);
- if (conn->out_clientflag && conn->bundle) {
- conn->avail_calls++;
- switch (conn->avail_calls) {
- case 1:
- list_move_tail(&conn->bundle_link,
- &conn->bundle->avail_conns);
- case 2 ... RXRPC_MAXCALLS - 1:
- ASSERT(conn->channels[0] == NULL ||
- conn->channels[1] == NULL ||
- conn->channels[2] == NULL ||
- conn->channels[3] == NULL);
- break;
- case RXRPC_MAXCALLS:
- list_move_tail(&conn->bundle_link,
- &conn->bundle->unused_conns);
- ASSERT(conn->channels[0] == NULL &&
- conn->channels[1] == NULL &&
- conn->channels[2] == NULL &&
- conn->channels[3] == NULL);
- break;
- default:
- pr_err("conn->avail_calls=%d\n", conn->avail_calls);
- BUG();
- }
- }
-
- spin_unlock(&conn->trans->client_lock);
+ spin_unlock(&conn->channel_lock);
if (call->state < RXRPC_CALL_COMPLETE &&
call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {