SUNRPC: Don't hold the transport lock across socket copy operations

Instead of holding the transport lock, add a mechanism to ensure that
the request doesn't disappear from underneath us while copying from the
socket. We do this by preventing xprt_release() from freeing the XDR
buffers until the flag RPC_TASK_MSG_RECV has been cleared from the
request.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Reviewed-by: Chuck Lever <chuck.lever@oracle.com>
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 4f154d3..04dbc70 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -973,6 +973,8 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
+	xprt_pin_rqst(rovr);
+	spin_unlock_bh(&xprt->transport_lock);
 	task = rovr->rq_task;
 
 	copied = rovr->rq_private_buf.buflen;
@@ -981,11 +983,14 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 
 	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		dprintk("RPC:       sk_buff copy failed\n");
-		goto out_unlock;
+		spin_lock_bh(&xprt->transport_lock);
+		goto out_unpin;
 	}
 
+	spin_lock_bh(&xprt->transport_lock);
 	xprt_complete_rqst(task, copied);
-
+out_unpin:
+	xprt_unpin_rqst(rovr);
  out_unlock:
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -1054,6 +1059,8 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
+	xprt_pin_rqst(rovr);
+	spin_unlock_bh(&xprt->transport_lock);
 	task = rovr->rq_task;
 
 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1062,14 +1069,17 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
-		goto out_unlock;
+		spin_lock_bh(&xprt->transport_lock);
+		goto out_unpin;
 	}
 
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 
+	spin_lock_bh(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
 	xprt_complete_rqst(task, copied);
-
+out_unpin:
+	xprt_unpin_rqst(rovr);
  out_unlock:
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -1351,12 +1361,15 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 		spin_unlock_bh(&xprt->transport_lock);
 		return -1;
 	}
+	xprt_pin_rqst(req);
+	spin_unlock_bh(&xprt->transport_lock);
 
 	xs_tcp_read_common(xprt, desc, req);
 
+	spin_lock_bh(&xprt->transport_lock);
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-
+	xprt_unpin_rqst(req);
 	spin_unlock_bh(&xprt->transport_lock);
 	return 0;
 }