rxrpc: Use RCU to access a peer's service connection tree

Switch to RCU-protected access to a peer's service connection tree when
routing an incoming packet.  A seqlock is used to detect concurrent changes
to the tree and trigger a retry of the tree walk.

Further, we no longer take a ref on the connection looked up in the
data_ready handler unless we queue the connection's work item - and then
only if the refcount is greater than zero.


Note that I'm avoiding the use of a hash table for service connections
because each service connection is addressed by a 62-bit number
(constructed from epoch and connection ID >> 2) that would allow the client
to engage in bucket stuffing, given knowledge of the hash algorithm.
Peers, however, are hashed as the network address is less controllable by
the client.  The total number of peers will also be limited in a future
commit.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index c243647..991a20d 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -575,13 +575,13 @@
  * post connection-level events to the connection
  * - this includes challenges, responses and some aborts
  */
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+static bool rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 				      struct sk_buff *skb)
 {
 	_enter("%p,%p", conn, skb);
 
 	skb_queue_tail(&conn->rx_queue, skb);
-	rxrpc_queue_conn(conn);
+	return rxrpc_queue_conn(conn);
 }
 
 /*
@@ -636,6 +636,7 @@
  */
 void rxrpc_data_ready(struct sock *sk)
 {
+	struct rxrpc_connection *conn;
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_local *local = sk->sk_user_data;
 	struct sk_buff *skb;
@@ -699,36 +700,37 @@
 	    (sp->hdr.callNumber == 0 || sp->hdr.seq == 0))
 		goto bad_message;
 
+	rcu_read_lock();
+
+retry_find_conn:
+	conn = rxrpc_find_connection_rcu(local, skb);
+	if (!conn)
+		goto cant_route_call;
+
 	if (sp->hdr.callNumber == 0) {
-		/* This is a connection-level packet. These should be
-		 * fairly rare, so the extra overhead of looking them up the
-		 * old-fashioned way doesn't really hurt */
-		struct rxrpc_connection *conn;
-
-		rcu_read_lock();
-		conn = rxrpc_find_connection(local, skb);
-		rcu_read_unlock();
-		if (!conn)
-			goto cant_route_call;
-
+		/* Connection-level packet */
 		_debug("CONN %p {%d}", conn, conn->debug_id);
-		rxrpc_post_packet_to_conn(conn, skb);
-		rxrpc_put_connection(conn);
+		if (!rxrpc_post_packet_to_conn(conn, skb))
+			goto retry_find_conn;
 	} else {
-		struct rxrpc_call *call;
+		/* Call-bound packets are routed by connection channel. */
+		unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+		struct rxrpc_channel *chan = &conn->channels[channel];
+		struct rxrpc_call *call = rcu_dereference(chan->call);
 
-		call = rxrpc_find_call_hash(&sp->hdr, local,
-					    AF_INET, &ip_hdr(skb)->saddr);
-		if (call)
-			rxrpc_post_packet_to_call(call, skb);
-		else
+		if (!call || atomic_read(&call->usage) == 0)
 			goto cant_route_call;
+
+		rxrpc_post_packet_to_call(call, skb);
 	}
 
+	rcu_read_unlock();
 out:
 	return;
 
 cant_route_call:
+	rcu_read_unlock();
+
 	_debug("can't route call");
 	if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
 	    sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {