RDS/IB: track signaled sends

We're seeing bugs today where IB connection shutdown clears the send
ring while the tasklet is processing completed sends.  Implementation
details cause this to dereference a null pointer.  Shutdown needs to
wait for send completion to stop before tearing down the connection.  We
can't simply wait for the ring to empty because it may contain
unsignaled sends that will never be processed.

This patch tracks the number of signaled sends that we've posted and
waits for them to complete.  It also makes sure that the tasklet has
finished executing.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4a..15f7569 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@
 }
 
 /*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+	if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+	    waitqueue_active(&rds_ib_ring_empty_wait))
+		wake_up(&rds_ib_ring_empty_wait);
+	BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
+/*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
  * unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@
 	u32 oldest;
 	u32 i = 0;
 	int ret;
+	int nr_sig = 0;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
 	rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@
 
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
+			if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+				nr_sig++;
 
 			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
@@ -282,6 +297,8 @@
 		}
 
 		rds_ib_ring_free(&ic->i_send_ring, completed);
+		rds_ib_sub_signaled(ic, nr_sig);
+		nr_sig = 0;
 
 		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 		    test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-					      struct rds_ib_send_work *send,
-					      bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+					     struct rds_ib_send_work *send,
+					     bool notify)
 {
 	/*
 	 * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@
 	if (ic->i_unsignaled_wrs-- == 0 || notify) {
 		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 		send->s_wr.send_flags |= IB_SEND_SIGNALED;
+		return 1;
 	}
+	return 0;
 }
 
 /*
@@ -488,6 +507,7 @@
 	int bytes_sent = 0;
 	int ret;
 	int flow_controlled = 0;
+	int nr_sig = 0;
 
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@
 		if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
 			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+		if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+			nr_sig++;
+
 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -689,6 +712,9 @@
 	if (ic->i_flowctl && i < credit_alloc)
 		rds_ib_send_add_credits(conn, credit_alloc - i);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	/* XXX need to worry about failed_wr and partial sends. */
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		if (prev->s_op) {
 			ic->i_data_op = prev->s_op;
 			prev->s_op = NULL;
@@ -728,6 +755,7 @@
 	u32 pos;
 	u32 work_alloc;
 	int ret;
+	int nr_sig = 0;
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -752,7 +780,7 @@
 		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
 		send->s_wr.wr.atomic.swap = 0;
 	}
-	rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+	nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 	send->s_wr.num_sge = 1;
 	send->s_wr.next = NULL;
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@
 	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
 		 send->s_sge[0].addr, send->s_sge[0].length);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &send->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
 	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@
 		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}
 
@@ -817,6 +849,7 @@
 	int sent;
 	int ret;
 	int num_sge;
+	int nr_sig = 0;
 
 	/* map the op the first time we see it */
 	if (!op->op_mapped) {
@@ -859,7 +892,7 @@
 		send->s_queued = jiffies;
 		send->s_op = NULL;
 
-		rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
 		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@
 		work_alloc = i;
 	}
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@
 		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}