rds: don't let RDS shutdown a connection while senders are present

This is the first in a long line of patches that tries to fix races
between RDS connection shutdown and RDS traffic.

Here we are maintaining a count of active senders to make sure
the connection doesn't go away while they are using it.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 7e4e9df..9c249f3 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,6 +148,7 @@
 
 	spin_lock_init(&conn->c_send_lock);
 	atomic_set(&conn->c_send_generation, 1);
+	atomic_set(&conn->c_senders, 0);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -276,6 +277,12 @@
 		spin_lock_irq(&conn->c_send_lock);
 		spin_unlock_irq(&conn->c_send_lock);
 
+		while(atomic_read(&conn->c_senders)) {
+			schedule_timeout(1);
+			spin_lock_irq(&conn->c_send_lock);
+			spin_unlock_irq(&conn->c_send_lock);
+		}
+
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
 
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 8f041f7..24d1461 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -863,18 +863,6 @@
 	int ret = 0;
 
 	rdsdebug("conn %p\n", conn);
-
-	/*
-	 * If we get a temporary posting failure in this context then
-	 * we're really low and we want the caller to back off for a bit.
-	 */
-	mutex_lock(&ic->i_recv_mutex);
-	if (rds_ib_recv_refill(conn, 0))
-		ret = -ENOMEM;
-	else
-		rds_ib_stats_inc(s_ib_rx_refill_from_thread);
-	mutex_unlock(&ic->i_recv_mutex);
-
 	if (rds_conn_up(conn))
 		rds_ib_attempt_ack(ic);
 
diff --git a/net/rds/message.c b/net/rds/message.c
index 96e2bf7..84f937f 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -81,7 +81,10 @@
 void rds_message_put(struct rds_message *rm)
 {
 	rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
-
+	if (atomic_read(&rm->m_refcount) == 0) {
+printk(KERN_CRIT "danger refcount zero on %p\n", rm);
+WARN_ON(1);
+	}
 	if (atomic_dec_and_test(&rm->m_refcount)) {
 		BUG_ON(!list_empty(&rm->m_sock_item));
 		BUG_ON(!list_empty(&rm->m_conn_item));
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 241a085..4ab3d1a 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -93,6 +93,7 @@
 
 	spinlock_t		c_send_lock;	/* protect send ring */
 	atomic_t		c_send_generation;
+	atomic_t		c_senders;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
diff --git a/net/rds/send.c b/net/rds/send.c
index 8e3fd99..d35c43f 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -60,15 +60,23 @@
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
+	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
+		rm = conn->c_xmit_rm;
+		conn->c_xmit_rm = NULL;
 		/* Tell the user the RDMA op is no longer mapped by the
 		 * transport. This isn't entirely true (it's flushed out
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
-		rds_message_unmapped(conn->c_xmit_rm);
-		rds_message_put(conn->c_xmit_rm);
-		conn->c_xmit_rm = NULL;
+printk(KERN_CRIT "send reset unmapping %p\n", rm);
+		rds_message_unmapped(rm);
+		spin_unlock_irqrestore(&conn->c_send_lock, flags);
+
+		rds_message_put(rm);
+	} else {
+		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
+
 	conn->c_xmit_sg = 0;
 	conn->c_xmit_hdr_off = 0;
 	conn->c_xmit_data_off = 0;
@@ -131,6 +139,7 @@
 		ret = -ENOMEM;
 		goto out;
 	}
+	atomic_inc(&conn->c_senders);
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
@@ -350,6 +359,8 @@
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
+	atomic_dec(&conn->c_senders);
+
 	/*
 	 * Other senders will see we have c_send_lock and exit. We
 	 * need to recheck the send queue and race again for c_send_lock