RDS: Rewrite rds_send_drop_to() for clarity

This function has been the source of numerous bugs; it's just
too complicated. Simplified to nest spinlocks cleanly within
the second loop body, and kick out early if there are no
rms to drop.

This will be a little slower because conn lock is grabbed for
each entry instead of "caching" the lock across rms, but this
should be entirely irrelevant to fastpath performance.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/send.c b/net/rds/send.c
index 9c1c6bc..aee58f9 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -619,9 +619,8 @@
 {
 	struct rds_message *rm, *tmp;
 	struct rds_connection *conn;
-	unsigned long flags, flags2;
+	unsigned long flags;
 	LIST_HEAD(list);
-	int wake = 0;
 
 	/* get all the messages we're dropping under the rs lock */
 	spin_lock_irqsave(&rs->rs_lock, flags);
@@ -631,59 +630,54 @@
 			     dest->sin_port != rm->m_inc.i_hdr.h_dport))
 			continue;
 
-		wake = 1;
 		list_move(&rm->m_sock_item, &list);
 		rds_send_sndbuf_remove(rs, rm);
 		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 	}
 
 	/* order flag updates with the rs lock */
-	if (wake)
-		smp_mb__after_clear_bit();
+	smp_mb__after_clear_bit();
 
 	spin_unlock_irqrestore(&rs->rs_lock, flags);
 
-	conn = NULL;
+	if (list_empty(&list))
+		return;
 
-	/* now remove the messages from the conn list as needed */
+	/* Remove the messages from the conn */
 	list_for_each_entry(rm, &list, m_sock_item) {
-		/* We do this here rather than in the loop above, so that
-		 * we don't have to nest m_rs_lock under rs->rs_lock */
-		spin_lock_irqsave(&rm->m_rs_lock, flags2);
-		/* If this is a RDMA operation, notify the app. */
+
+		conn = rm->m_inc.i_conn;
+		spin_lock_irqsave(&conn->c_lock, flags);
+
+		/*
+		 * Maybe someone else beat us to removing rm from the conn.
+		 * If we race with their flag update we'll get the lock and
+		 * then really see that the flag has been cleared.
+		 */
+		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
+			spin_unlock_irqrestore(&conn->c_lock, flags);
+			continue;
+		}
+
+		/*
+		 * Couldn't grab m_rs_lock in top loop (lock ordering),
+		 * but we can now.
+		 */
+		spin_lock(&rm->m_rs_lock);
+
 		spin_lock(&rs->rs_lock);
 		__rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
 		spin_unlock(&rs->rs_lock);
+
 		rm->m_rs = NULL;
-		spin_unlock_irqrestore(&rm->m_rs_lock, flags2);
+		spin_unlock(&rm->m_rs_lock);
 
-		/*
-		 * If we see this flag cleared then we're *sure* that someone
-		 * else beat us to removing it from the conn.  If we race
-		 * with their flag update we'll get the lock and then really
-		 * see that the flag has been cleared.
-		 */
-		if (!test_bit(RDS_MSG_ON_CONN, &rm->m_flags))
-			continue;
-
-		if (conn != rm->m_inc.i_conn) {
-			if (conn)
-				spin_unlock_irqrestore(&conn->c_lock, flags);
-			conn = rm->m_inc.i_conn;
-			spin_lock_irqsave(&conn->c_lock, flags);
-		}
-
-		if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
-			list_del_init(&rm->m_conn_item);
-			rds_message_put(rm);
-		}
+		list_del_init(&rm->m_conn_item);
+		rds_message_put(rm);
+		spin_unlock_irqrestore(&conn->c_lock, flags);
 	}
 
-	if (conn)
-		spin_unlock_irqrestore(&conn->c_lock, flags);
-
-	if (wake)
-		rds_wake_sk_sleep(rs);
+	rds_wake_sk_sleep(rs);
 
 	while (!list_empty(&list)) {
 		rm = list_entry(list.next, struct rds_message, m_sock_item);