svcrdma: Fix race with dto_tasklet in svc_rdma_send

When svc_rdma_send finds the SQ full, it attempts to reap completed SQ WR
to make room for the new request. This races with the dto_tasklet, which
also reaps SQ WR. To avoid polling and arming the CQ unnecessarily, move
the test_and_clear_bit of the RDMAXPRT_SQ_PENDING flag and the arming of
the CQ into the sq_cq_reap function itself.
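
With the flag test and CQ arming folded into the reap function, whichever
caller wins the test_and_clear_bit does the work and the loser returns
immediately. A minimal sketch of the resulting sq_cq_reap, abridged from
the hunk below (the ib_poll_cq() loop is unchanged):

	static void sq_cq_reap(struct svcxprt_rdma *xprt)
	{
		/* Whichever caller clears the bit does the reaping;
		 * the race loser returns without touching the CQ.
		 */
		if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
			return;

		/* Re-arm before polling so a completion that arrives
		 * during the poll still raises a new dto event.
		 */
		ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
		atomic_inc(&rdma_stat_sq_poll);

		/* ... ib_poll_cq() loop as before ... */
	}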

Refactor the rq_cq_reap function to match sq_cq_reap so that the
code is easier to follow.
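
After the refactor, rq_cq_reap takes the same shape, with the deferred-
enqueue logic kept at the tail (again abridged from the hunks below):

	static void rq_cq_reap(struct svcxprt_rdma *xprt)
	{
		if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
			return;

		ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
		atomic_inc(&rdma_stat_rq_poll);

		/* ... ib_poll_cq() loop as before ... */

		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
		/* Defer RPC I/O until the RDMA connection is complete. */
		if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
			svc_xprt_enqueue(&xprt->sc_xprt);
	}

This lets dto_tasklet call rq_cq_reap() and sq_cq_reap() unconditionally.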

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 1e0af2f..7373417 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -228,23 +228,8 @@
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-			rq_cq_reap(xprt);
-			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-			/*
-			 * If data arrived before established event,
-			 * don't enqueue. This defers RPC I/O until the
-			 * RDMA connection is complete.
-			 */
-			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-				svc_xprt_enqueue(&xprt->sc_xprt);
-		}
-
-		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-			sq_cq_reap(xprt);
-		}
+		rq_cq_reap(xprt);
+		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
 		spin_lock_irqsave(&dto_lock, flags);
@@ -297,6 +282,10 @@
 	struct ib_wc wc;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 
+	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_rq_poll);
 
 	spin_lock_bh(&xprt->sc_rq_dto_lock);
@@ -316,6 +305,15 @@
 
 	if (ctxt)
 		atomic_inc(&rdma_stat_rq_prod);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	/*
+	 * If data arrived before established event,
+	 * don't enqueue. This defers RPC I/O until the
+	 * RDMA connection is complete.
+	 */
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
@@ -328,6 +326,10 @@
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_sq_poll);
 	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -1010,7 +1012,8 @@
 		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
-			/* See if we can reap some SQ WR */
+
+			/* See if we can opportunistically reap SQ WR to make room */
 			sq_cq_reap(xprt);
 
 			/* Wait until SQ WR available if SQ still full */