SUNRPC: Add a separate spinlock to protect the RPC request receive list

This further reduces contention with the transport_lock, and allows us
to convert to using a non-bh-safe spinlock, since the list is now never
accessed from a bh context.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index a344bea..2b91813 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -969,12 +969,12 @@
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
 	xprt_pin_rqst(rovr);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 	task = rovr->rq_task;
 
 	copied = rovr->rq_private_buf.buflen;
@@ -983,16 +983,16 @@
 
 	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		dprintk("RPC:       sk_buff copy failed\n");
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->recv_lock);
 		goto out_unpin;
 	}
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	xprt_complete_rqst(task, copied);
 out_unpin:
 	xprt_unpin_rqst(rovr);
  out_unlock:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1055,12 +1055,12 @@
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
 	xprt_pin_rqst(rovr);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 	task = rovr->rq_task;
 
 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1069,7 +1069,7 @@
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->recv_lock);
 		goto out_unpin;
 	}
 
@@ -1077,11 +1077,13 @@
 
 	spin_lock_bh(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
+	spin_unlock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	xprt_complete_rqst(task, copied);
 out_unpin:
 	xprt_unpin_rqst(rovr);
  out_unlock:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1344,24 +1346,24 @@
 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
 
 	/* Find and lock the request corresponding to this xid */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
 	if (!req) {
 		dprintk("RPC:       XID %08x request not found!\n",
 				ntohl(transport->tcp_xid));
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->recv_lock);
 		return -1;
 	}
 	xprt_pin_rqst(req);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 
 	xs_tcp_read_common(xprt, desc, req);
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
 	xprt_unpin_rqst(req);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 	return 0;
 }