RDS: Rewrite rds_send_xmit

Simplify rds_send_xmit().

Send a pending congestion map (via xmit_cong_map) to completion in an
inner loop, so that partial map sends no longer decrement send_quota.

Move the reset of the conn xmit state variables to the end of the loop,
keyed off a new c_xmit_data_sent flag.

Update comments.

Add a special case that turns off sending an rds header (by clearing
rm->data.op_active) when there is an atomic op and no other data.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/send.c b/net/rds/send.c
index d60d313..66dc6b0 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -72,8 +72,9 @@
 	conn->c_xmit_sg = 0;
 	conn->c_xmit_hdr_off = 0;
 	conn->c_xmit_data_off = 0;
-	conn->c_xmit_rdma_sent = 0;
 	conn->c_xmit_atomic_sent = 0;
+	conn->c_xmit_rdma_sent = 0;
+	conn->c_xmit_data_sent = 0;
 
 	conn->c_map_queued = 0;
 
@@ -137,69 +138,54 @@
 
 	/*
 	 * spin trying to push headers and data down the connection until
-	 * the connection doens't make forward progress.
+	 * the connection doesn't make forward progress.
 	 */
 	while (--send_quota) {
-		/*
-		 * See if need to send a congestion map update if we're
-		 * between sending messages.  The send_sem protects our sole
-		 * use of c_map_offset and _bytes.
-		 * Note this is used only by transports that define a special
-		 * xmit_cong_map function. For all others, we create allocate
-		 * a cong_map message and treat it just like any other send.
-		 */
-		if (conn->c_map_bytes) {
-			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
-							   conn->c_map_offset);
-			if (ret <= 0)
-				break;
 
-			conn->c_map_offset += ret;
-			conn->c_map_bytes -= ret;
-			if (conn->c_map_bytes)
-				continue;
-		}
-
-		/* If we're done sending the current message, clear the
-		 * offset and S/G temporaries.
-		 */
 		rm = conn->c_xmit_rm;
-		if (rm &&
-		    conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-		    conn->c_xmit_sg == rm->data.op_nents) {
-			conn->c_xmit_rm = NULL;
-			conn->c_xmit_sg = 0;
-			conn->c_xmit_hdr_off = 0;
-			conn->c_xmit_data_off = 0;
-			conn->c_xmit_rdma_sent = 0;
-			conn->c_xmit_atomic_sent = 0;
 
-			/* Release the reference to the previous message. */
-			rds_message_put(rm);
-			rm = NULL;
-		}
-
-		/* If we're asked to send a cong map update, do so.
+		/*
+		 * If between sending messages, we can send a pending congestion
+		 * map update.
+		 *
+		 * Transports either define a special xmit_cong_map function,
+		 * or we allocate a cong_map message and treat it just like any
+		 * other send.
 		 */
 		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 			if (conn->c_trans->xmit_cong_map) {
-				conn->c_map_offset = 0;
-				conn->c_map_bytes = sizeof(struct rds_header) +
+				unsigned long map_offset = 0;
+				unsigned long map_bytes = sizeof(struct rds_header) +
 					RDS_CONG_MAP_BYTES;
-				continue;
-			}
 
-			rm = rds_cong_update_alloc(conn);
-			if (IS_ERR(rm)) {
-				ret = PTR_ERR(rm);
-				break;
-			}
+				while (map_bytes) {
+					ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
+									   map_offset);
+					if (ret <= 0) {
+						/* too far down the rabbithole! */
+						mutex_unlock(&conn->c_send_lock);
+						rds_conn_error(conn, "Cong map xmit failed\n");
+						goto out;
+					}
 
-			conn->c_xmit_rm = rm;
+					map_offset += ret;
+					map_bytes -= ret;
+				}
+			} else {
+				/* send cong update like a normal rm */
+				rm = rds_cong_update_alloc(conn);
+				if (IS_ERR(rm)) {
+					ret = PTR_ERR(rm);
+					break;
+				}
+				rm->data.op_active = 1;
+
+				conn->c_xmit_rm = rm;
+			}
 		}
 
 		/*
-		 * Grab the next message from the send queue, if there is one.
+		 * If not already working on one, grab the next message.
 		 *
 		 * c_xmit_rm holds a ref while we're sending this message down
 		 * the connction.  We can use this ref while holding the
@@ -264,7 +250,6 @@
 			conn->c_xmit_rm = rm;
 		}
 
-
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
 			ret = conn->c_trans->xmit_atomic(conn, rm);
 			if (ret)
@@ -273,13 +258,20 @@
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+
+			/*
+			 * This is evil, muahaha.
+			 * We permit 0-byte sends. (rds-ping depends on this.)
+			 * BUT if there is an atomic op and no sent data,
+			 * we turn off sending the header, to achieve
+			 * "silent" atomics.
+			 * But see below; RDMA op might toggle this back on!
+			 */
+			if (rm->data.op_nents == 0)
+				rm->data.op_active = 0;
 		}
 
-		/*
-		 * Try and send an rdma message.  Let's see if we can
-		 * keep this simple and require that the transport either
-		 * send the whole rdma or none of it.
-		 */
+		/* The transport either sends the whole rdma or none of it */
 		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
 			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 			if (ret)
@@ -294,9 +286,7 @@
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
-		if (rm->data.op_active
-		    && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
-			conn->c_xmit_sg < rm->data.op_nents)) {
+		if (rm->data.op_active && !conn->c_xmit_data_sent) {
 			ret = conn->c_trans->xmit(conn, rm,
 						  conn->c_xmit_hdr_off,
 						  conn->c_xmit_sg,
@@ -326,6 +316,27 @@
 					       conn->c_xmit_sg == rm->data.op_nents);
 				}
 			}
+
+			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
+			    (conn->c_xmit_sg == rm->data.op_nents))
+				conn->c_xmit_data_sent = 1;
+		}
+
+		/*
+		 * A rm will only take multiple times through this loop
+		 * if there is a data op. Thus, if the data is sent (or there was
+		 * none), then we're done with the rm.
+		 */
+		if (!rm->data.op_active || conn->c_xmit_data_sent) {
+			conn->c_xmit_rm = NULL;
+			conn->c_xmit_sg = 0;
+			conn->c_xmit_hdr_off = 0;
+			conn->c_xmit_data_off = 0;
+			conn->c_xmit_rdma_sent = 0;
+			conn->c_xmit_atomic_sent = 0;
+			conn->c_xmit_data_sent = 0;
+
+			rds_message_put(rm);
 		}
 	}
 
@@ -350,7 +361,7 @@
 	 */
 	mutex_unlock(&conn->c_send_lock);
 
-	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
+	if (send_quota == 0 && !was_empty) {
 		/* We exhausted the send quota, but there's work left to
 		 * do. Return and (re-)schedule the send worker.
 		 */