rxrpc: Rework peer object handling to use hash table and RCU

Rework peer object handling to use a hash table instead of a flat list and
to use RCU.  Peer objects are no longer destroyed by passing them to a
workqueue to process, but rather are just passed to the RCU garbage
collector as kfree'able objects.

The hash function uses the local endpoint plus all the components of the
remote address, except for the RxRPC service ID.  Peers thus represent a
UDP port on the remote machine as contacted by a UDP port on this machine.

The RCU read lock is used to handle non-creating lookups so that they can
be called from bottom half context in the sk_error_report handler without
having to lock the hash table against modification.
rxrpc_lookup_peer_rcu() *does* take a reference on the peer object as in
the future, this will be passed to a work item for error distribution in
the error_report path and this function will cease being used in the
data_ready path.

Creating lookups are done under a spinlock rather than a mutex, as peers
might be set up due to an external stimulus if the local endpoint is a server.

Captured network error messages (ICMP) are handled with respect to the peer
struct, and the MTU size and RTT are cached in it.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 0b54cda..7fc50dc 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -1,6 +1,6 @@
-/* RxRPC remote transport endpoint management
+/* RxRPC remote transport endpoint record management
  *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  *
  * This program is free software; you can redistribute it and/or
@@ -16,20 +16,132 @@
 #include <linux/skbuff.h>
 #include <linux/udp.h>
 #include <linux/in.h>
-#include <linux/in6.h>
-#include <linux/icmp.h>
 #include <linux/slab.h>
+#include <linux/hashtable.h>
 #include <net/sock.h>
 #include <net/af_rxrpc.h>
 #include <net/ip.h>
 #include <net/route.h>
 #include "ar-internal.h"
 
-static LIST_HEAD(rxrpc_peers);
-static DEFINE_RWLOCK(rxrpc_peer_lock);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_peer_wq);
+static DEFINE_HASHTABLE(rxrpc_peer_hash, 10);
+static DEFINE_SPINLOCK(rxrpc_peer_hash_lock);
 
-static void rxrpc_destroy_peer(struct work_struct *work);
+/*
+ * Hash a peer key from the local endpoint and the remote transport address.
+ */
+static unsigned long rxrpc_peer_hash_key(struct rxrpc_local *local,
+					 const struct sockaddr_rxrpc *srx)
+{
+	const u16 *p = NULL;	/* Only set for a handled address family */
+	unsigned int i, size = 0; /* 0 => address loop skipped otherwise */
+	unsigned long hash_key;
+
+	_enter("");
+
+	hash_key = (unsigned long)local / __alignof__(*local);
+	hash_key += srx->transport_type;
+	hash_key += srx->transport_len;
+	hash_key += srx->transport.family;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		hash_key += (u16 __force)srx->transport.sin.sin_port;
+		size = sizeof(srx->transport.sin.sin_addr);
+		p = (const u16 *)&srx->transport.sin.sin_addr;
+		break;
+	}
+
+	/* Step through the peer address in 16-bit portions for speed */
+	for (i = 0; i < size; i += sizeof(*p), p++)
+		hash_key += *p;
+
+	_leave(" 0x%lx", hash_key);
+	return hash_key;
+}
+
+/*
+ * Compare a peer to a key field by field.  Return -ve, 0 or +ve to indicate
+ * less than, same or greater than.
+ *
+ * Unfortunately, the primitives in linux/hashtable.h don't allow for sorted
+ * buckets and mid-bucket insertion, so we don't make full use of this
+ * information at this point.
+ */
+static long rxrpc_peer_cmp_key(const struct rxrpc_peer *peer,
+			       struct rxrpc_local *local,
+			       const struct sockaddr_rxrpc *srx,
+			       unsigned long hash_key)
+{
+	long diff;	/* First non-zero difference among the common fields */
+
+	diff = ((peer->hash_key - hash_key) ?:
+		((unsigned long)peer->local - (unsigned long)local) ?:
+		(peer->srx.transport_type - srx->transport_type) ?:
+		(peer->srx.transport_len - srx->transport_len) ?:
+		(peer->srx.transport.family - srx->transport.family));
+	if (diff != 0)
+		return diff;
+
+	switch (srx->transport.family) {
+	case AF_INET:	/* Compare the port, then the address */
+		return ((u16 __force)peer->srx.transport.sin.sin_port -
+			(u16 __force)srx->transport.sin.sin_port) ?:
+			memcmp(&peer->srx.transport.sin.sin_addr,
+			       &srx->transport.sin.sin_addr,
+			       sizeof(struct in_addr));
+	default:	/* No other address family has a comparison here */
+		BUG();
+	}
+}
+
+/*
+ * Search the peer hash table for the record matching a key, using RCU.
+ */
+static struct rxrpc_peer *__rxrpc_lookup_peer_rcu(
+	struct rxrpc_local *local,
+	const struct sockaddr_rxrpc *srx,
+	unsigned long hash_key)
+{
+	struct rxrpc_peer *peer;
+
+	hash_for_each_possible_rcu(rxrpc_peer_hash, peer, hash_link, hash_key) {
+		if (rxrpc_peer_cmp_key(peer, local, srx, hash_key) == 0) {
+			if (atomic_read(&peer->usage) == 0)
+				return NULL;	/* Matched, but usage is zero */
+			return peer;	/* No ref is taken here */
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Hash the key, look up the matching peer using RCU and trace the result.
+ */
+struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *local,
+					 const struct sockaddr_rxrpc *srx)
+{
+	struct rxrpc_peer *peer;
+	unsigned long hash_key = rxrpc_peer_hash_key(local, srx);
+
+	peer = __rxrpc_lookup_peer_rcu(local, srx, hash_key);
+	if (peer) {
+		switch (srx->transport.family) {
+		case AF_INET:
+			_net("PEER %d {%d,%u,%pI4+%hu}",
+			     peer->debug_id,
+			     peer->srx.transport_type,
+			     peer->srx.transport.family,
+			     &peer->srx.transport.sin.sin_addr,
+			     ntohs(peer->srx.transport.sin.sin_port));
+			break;
+		}
+
+		_leave(" = %p {u=%d}", peer, atomic_read(&peer->usage));
+	}
+	return peer;	/* NULL if no usable match; no ref taken here */
+}
 
 /*
  * assess the MTU size for the network interface through which this peer is
@@ -58,10 +170,9 @@
 }
 
 /*
- * allocate a new peer
+ * Allocate a peer.
  */
-static struct rxrpc_peer *rxrpc_alloc_peer(struct sockaddr_rxrpc *srx,
-					   gfp_t gfp)
+struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 {
 	struct rxrpc_peer *peer;
 
@@ -69,12 +180,32 @@
 
 	peer = kzalloc(sizeof(struct rxrpc_peer), gfp);
 	if (peer) {
-		INIT_WORK(&peer->destroyer, &rxrpc_destroy_peer);
-		INIT_LIST_HEAD(&peer->link);
+		atomic_set(&peer->usage, 1);
+		peer->local = local;
 		INIT_LIST_HEAD(&peer->error_targets);
 		spin_lock_init(&peer->lock);
-		atomic_set(&peer->usage, 1);
 		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
+	}
+
+	_leave(" = %p", peer);
+	return peer;
+}
+
+/*
+ * Set up a new peer.
+ */
+static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_local *local,
+					    struct sockaddr_rxrpc *srx,
+					    unsigned long hash_key,
+					    gfp_t gfp)
+{
+	struct rxrpc_peer *peer;
+
+	_enter("");
+
+	peer = rxrpc_alloc_peer(local, gfp);
+	if (peer) {
+		peer->hash_key = hash_key;
 		memcpy(&peer->srx, srx, sizeof(*srx));
 
 		rxrpc_assess_MTU_size(peer);
@@ -105,11 +236,11 @@
 /*
  * obtain a remote transport endpoint for the specified address
  */
-struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
+struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_local *local,
+				     struct sockaddr_rxrpc *srx, gfp_t gfp)
 {
 	struct rxrpc_peer *peer, *candidate;
-	const char *new = "old";
-	int usage;
+	unsigned long hash_key = rxrpc_peer_hash_key(local, srx);
 
 	_enter("{%d,%d,%pI4+%hu}",
 	       srx->transport_type,
@@ -118,188 +249,60 @@
 	       ntohs(srx->transport.sin.sin_port));
 
 	/* search the peer list first */
-	read_lock_bh(&rxrpc_peer_lock);
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		_debug("check PEER %d { u=%d t=%d l=%d }",
-		       peer->debug_id,
-		       atomic_read(&peer->usage),
-		       peer->srx.transport_type,
-		       peer->srx.transport_len);
+	rcu_read_lock();
+	peer = __rxrpc_lookup_peer_rcu(local, srx, hash_key);
+	if (peer && !rxrpc_get_peer_maybe(peer))
+		peer = NULL;
+	rcu_read_unlock();
 
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_peer;
-	}
-	read_unlock_bh(&rxrpc_peer_lock);
+	if (!peer) {
+		/* The peer is not yet present in hash - create a candidate
+		 * for a new record and then redo the search.
+		 */
+		candidate = rxrpc_create_peer(local, srx, hash_key, gfp);
+		if (!candidate) {
+			_leave(" = NULL [nomem]");
+			return NULL;
+		}
 
-	/* not yet present - create a candidate for a new record and then
-	 * redo the search */
-	candidate = rxrpc_alloc_peer(srx, gfp);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
+		spin_lock(&rxrpc_peer_hash_lock);
+
+		/* Need to check that we aren't racing with someone else */
+		peer = __rxrpc_lookup_peer_rcu(local, srx, hash_key);
+		if (peer && !rxrpc_get_peer_maybe(peer))
+			peer = NULL;
+		if (!peer)
+			hash_add_rcu(rxrpc_peer_hash,
+				     &candidate->hash_link, hash_key);
+
+		spin_unlock(&rxrpc_peer_hash_lock);
+
+		if (peer)
+			kfree(candidate);
+		else
+			peer = candidate;
 	}
 
-	write_lock_bh(&rxrpc_peer_lock);
-
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_second;
-	}
-
-	/* we can now add the new candidate to the list */
-	peer = candidate;
-	candidate = NULL;
-	usage = atomic_read(&peer->usage);
-
-	list_add_tail(&peer->link, &rxrpc_peers);
-	write_unlock_bh(&rxrpc_peer_lock);
-	new = "new";
-
-success:
-	_net("PEER %s %d {%d,%u,%pI4+%hu}",
-	     new,
+	_net("PEER %d {%d,%pI4+%hu}",
 	     peer->debug_id,
 	     peer->srx.transport_type,
-	     peer->srx.transport.family,
 	     &peer->srx.transport.sin.sin_addr,
 	     ntohs(peer->srx.transport.sin.sin_port));
 
-	_leave(" = %p {u=%d}", peer, usage);
+	_leave(" = %p {u=%d}", peer, atomic_read(&peer->usage));
 	return peer;
-
-	/* we found the peer in the list immediately */
-found_extant_peer:
-	usage = atomic_inc_return(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	goto success;
-
-	/* we found the peer on the second time through the list */
-found_extant_second:
-	usage = atomic_inc_return(&peer->usage);
-	write_unlock_bh(&rxrpc_peer_lock);
-	kfree(candidate);
-	goto success;
 }
 
 /*
- * find the peer associated with a packet
+ * Unhash a remote peer record and hand it to RCU to be freed.
  */
-struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *local,
-				   __be32 addr, __be16 port)
+void __rxrpc_put_peer(struct rxrpc_peer *peer)
 {
-	struct rxrpc_peer *peer;
+	ASSERT(list_empty(&peer->error_targets));
 
-	_enter("");
+	spin_lock(&rxrpc_peer_hash_lock);
+	hash_del_rcu(&peer->hash_link);
+	spin_unlock(&rxrpc_peer_hash_lock);
 
-	/* search the peer list */
-	read_lock_bh(&rxrpc_peer_lock);
-
-	if (local->srx.transport.family == AF_INET &&
-	    local->srx.transport_type == SOCK_DGRAM
-	    ) {
-		list_for_each_entry(peer, &rxrpc_peers, link) {
-			if (atomic_read(&peer->usage) > 0 &&
-			    peer->srx.transport_type == SOCK_DGRAM &&
-			    peer->srx.transport.family == AF_INET &&
-			    peer->srx.transport.sin.sin_port == port &&
-			    peer->srx.transport.sin.sin_addr.s_addr == addr)
-				goto found_UDP_peer;
-		}
-
-		goto new_UDP_peer;
-	}
-
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EAFNOSUPPORT");
-	return ERR_PTR(-EAFNOSUPPORT);
-
-found_UDP_peer:
-	_net("Rx UDP DGRAM from peer %d", peer->debug_id);
-	atomic_inc(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = %p", peer);
-	return peer;
-
-new_UDP_peer:
-	_net("Rx UDP DGRAM from NEW peer");
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EBUSY [new]");
-	return ERR_PTR(-EBUSY);
-}
-
-/*
- * release a remote transport endpoint
- */
-void rxrpc_put_peer(struct rxrpc_peer *peer)
-{
-	_enter("%p{u=%d}", peer, atomic_read(&peer->usage));
-
-	ASSERTCMP(atomic_read(&peer->usage), >, 0);
-
-	if (likely(!atomic_dec_and_test(&peer->usage))) {
-		_leave(" [in use]");
-		return;
-	}
-
-	rxrpc_queue_work(&peer->destroyer);
-	_leave("");
-}
-
-/*
- * destroy a remote transport endpoint
- */
-static void rxrpc_destroy_peer(struct work_struct *work)
-{
-	struct rxrpc_peer *peer =
-		container_of(work, struct rxrpc_peer, destroyer);
-
-	_enter("%p{%d}", peer, atomic_read(&peer->usage));
-
-	write_lock_bh(&rxrpc_peer_lock);
-	list_del(&peer->link);
-	write_unlock_bh(&rxrpc_peer_lock);
-
-	_net("DESTROY PEER %d", peer->debug_id);
-	kfree(peer);
-
-	if (list_empty(&rxrpc_peers))
-		wake_up_all(&rxrpc_peer_wq);
-	_leave("");
-}
-
-/*
- * preemptively destroy all the peer records from a transport endpoint rather
- * than waiting for them to time out
- */
-void __exit rxrpc_destroy_all_peers(void)
-{
-	DECLARE_WAITQUEUE(myself,current);
-
-	_enter("");
-
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_peers)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_peer_wq, &myself);
-
-		while (!list_empty(&rxrpc_peers)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&rxrpc_peer_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
-	_leave("");
+	kfree_rcu(peer, rcu);
 }