SUNRPC: Add a separate spinlock to protect the RPC request receive list

This further reduces contention with the transport_lock, and allows us
to convert to using a non-bh-safe spinlock, since the list is now never
accessed from a bh context.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3eb9ec1..2af189c 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -872,17 +872,17 @@
 }
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->transport_lock)
+__must_hold(&req->rq_xprt->recv_lock)
 {
 	struct rpc_task *task = req->rq_task;
 	
 	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
-		spin_unlock_bh(&req->rq_xprt->transport_lock);
+		spin_unlock(&req->rq_xprt->recv_lock);
 		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
 		wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
 				TASK_UNINTERRUPTIBLE);
 		clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-		spin_lock_bh(&req->rq_xprt->transport_lock);
+		spin_lock(&req->rq_xprt->recv_lock);
 	}
 }
 
@@ -1008,13 +1008,13 @@
 			/*
 			 * Add to the list only if we're expecting a reply
 			 */
-			spin_lock_bh(&xprt->transport_lock);
 			/* Update the softirq receive buffer */
 			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
 					sizeof(req->rq_private_buf));
 			/* Add request to the receive list */
+			spin_lock(&xprt->recv_lock);
 			list_add_tail(&req->rq_list, &xprt->recv);
-			spin_unlock_bh(&xprt->transport_lock);
+			spin_unlock(&xprt->recv_lock);
 			xprt_reset_majortimeo(req);
 			/* Turn off autodisconnect */
 			del_singleshot_timer_sync(&xprt->timer);
@@ -1329,15 +1329,18 @@
 		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
 	else if (task->tk_client)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
+	spin_lock(&xprt->recv_lock);
+	if (!list_empty(&req->rq_list)) {
+		list_del(&req->rq_list);
+		xprt_wait_on_pinned_rqst(req);
+	}
+	spin_unlock(&xprt->recv_lock);
 	spin_lock_bh(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)
 		xprt->ops->release_request(task);
-	if (!list_empty(&req->rq_list))
-		list_del(&req->rq_list);
 	xprt->last_used = jiffies;
 	xprt_schedule_autodisconnect(xprt);
-	xprt_wait_on_pinned_rqst(req);
 	spin_unlock_bh(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(task);
@@ -1361,6 +1364,7 @@
 
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
+	spin_lock_init(&xprt->recv_lock);
 
 	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv);