rds: fix rds_send_xmit() serialization

rds_send_xmit() was changed to hold an interrupt masking spinlock instead of a
mutex so that it could be called from the IB receive tasklet path.  This broke
the TCP transport because its xmit method can block and masks and unmasks
interrupts.

This patch serializes callers to rds_send_xmit() with a simple bit instead of
the current spinlock or previous mutex.  This enables rds_send_xmit() to be
called from any context and to call functions which block.  Getting rid of the
c_send_lock exposes the bare c_lock acquisitions which are changed to block
interrupts.

A waitqueue is added so that rds_conn_shutdown() can wait for callers to leave
rds_send_xmit() before tearing down partial send state.  This lets us get rid
of c_senders.

rds_send_xmit() is changed to check the conn state after acquiring the
RDS_IN_XMIT bit to resolve races with the shutdown path.  Previously both
worked with the conn state and then the lock in the same order, allowing them
to race and execute the paths concurrently.

rds_send_reset() isn't racing with rds_send_xmit() now that rds_conn_shutdown()
properly ensures that rds_send_xmit() can't start once the conn state has been
changed.  We can remove its previous use of the spinlock.

Finally, c_send_generation is redundant.  Callers can race to test the c_flags
bit by simply retrying instead of racing to test the c_send_generation atomic.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 5bb0eec..89871db 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,9 +148,7 @@
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;
 
-	spin_lock_init(&conn->c_send_lock);
-	atomic_set(&conn->c_send_generation, 1);
-	atomic_set(&conn->c_senders, 0);
+	init_waitqueue_head(&conn->c_waitq);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -275,15 +273,8 @@
 		}
 		mutex_unlock(&conn->c_cm_lock);
 
-		/* verify everybody's out of rds_send_xmit() */
-		spin_lock_irq(&conn->c_send_lock);
-		spin_unlock_irq(&conn->c_send_lock);
-
-		while(atomic_read(&conn->c_senders)) {
-			schedule_timeout(1);
-			spin_lock_irq(&conn->c_send_lock);
-			spin_unlock_irq(&conn->c_send_lock);
-		}
+		wait_event(conn->c_waitq,
+			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
 
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
@@ -477,8 +468,8 @@
 		sizeof(cinfo->transport));
 	cinfo->flags = 0;
 
-	rds_conn_info_set(cinfo->flags,
-			  spin_is_locked(&conn->c_send_lock), SENDING);
+	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+			  SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 3f91e79..e88cb4a 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -321,7 +321,7 @@
  * credits (see rds_ib_send_add_credits below).
  *
  * The RDS send code is essentially single-threaded; rds_send_xmit
- * grabs c_send_lock to ensure exclusive access to the send ring.
+ * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
  * However, the ACK sending code is independent and can race with
  * message SENDs.
  *
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 270ded7..4510344 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@
 /* Bits for c_flags */
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
+#define RDS_IN_XMIT		2
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
@@ -91,9 +92,6 @@
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	spinlock_t		c_send_lock;	/* protect send ring */
-	atomic_t		c_send_generation;
-	atomic_t		c_senders;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -120,6 +118,7 @@
 	struct delayed_work	c_conn_w;
 	struct work_struct	c_down_w;
 	struct mutex		c_cm_lock;	/* protect conn state & cm */
+	wait_queue_head_t	c_waitq;
 
 	struct list_head	c_map_item;
 	unsigned long		c_map_queued;
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41af..81471b2 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state.  Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare.  We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@
 			 */
 			if (rm->rdma.op_active &&
 			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-				spin_lock(&conn->c_lock);
+				spin_lock_irqsave(&conn->c_lock, flags);
 				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 					list_move(&rm->m_conn_item, &to_be_dropped);
-				spin_unlock(&conn->c_lock);
+				spin_unlock_irqrestore(&conn->c_lock, flags);
 				continue;
 			}
 
@@ -336,19 +355,7 @@
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock.  If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem.  If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty.  It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+	 * not try and send their newly queued message.  We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 7a8ca7a..2bab9bf 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -61,7 +61,7 @@
  *
  * Transition to state DISCONNECTING/DOWN:
  *  -	Inside the shutdown worker; synchronizes with xmit path
- *	through c_send_lock, and with connection management callbacks
+ *	through RDS_IN_XMIT, and with connection management callbacks
  *	via c_cm_lock.
  *
  *	For receive callbacks, we rely on the underlying transport