RDS: rewrite rds_ib_xmit

Now that the header always goes first, it is possible to
simplify rds_ib_xmit(). Instead of having one path to handle 0-byte
dgrams and another path to handle dgrams that carry data, both can be
handled in one path. This lets us eliminate rds_ib_xmit_populate_wr().

Rename "sent" to "bytes_sent" to better differentiate it from the
other variable, named "send".

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 46026d9..06c1d7e 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -425,38 +425,6 @@
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void
-rds_ib_xmit_populate_wr(struct rds_ib_connection *ic,
-		struct rds_ib_send_work *send, unsigned int pos,
-		unsigned long buffer, unsigned int length,
-		int send_flags)
-{
-	struct ib_sge *sge;
-
-	WARN_ON(pos != send - ic->i_sends);
-
-	send->s_wr.send_flags = send_flags;
-	send->s_wr.opcode = IB_WR_SEND;
-	send->s_wr.num_sge = 1;
-	send->s_wr.next = NULL;
-	send->s_queued = jiffies;
-	send->s_op = NULL;
-
-	sge = &send->s_sge[0];
-	sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
-	sge->length = sizeof(struct rds_header);
-	sge->lkey = ic->i_mr->lkey;
-
-	if (length != 0) {
-		send->s_wr.num_sge = 2;
-
-		sge = &send->s_sge[1];
-		sge->addr = buffer;
-		sge->length = length;
-		sge->lkey = ic->i_mr->lkey;
-	}
-}
-
 /*
  * This can be called multiple times for a given message.  The first time
  * we see a message we map its scatterlist into the IB device so that
@@ -483,11 +451,11 @@
 	u32 pos;
 	u32 i;
 	u32 work_alloc;
-	u32 credit_alloc;
+	u32 credit_alloc = 0;
 	u32 posted;
 	u32 adv_credits = 0;
 	int send_flags = 0;
-	int sent;
+	int bytes_sent = 0;
 	int ret;
 	int flow_controlled = 0;
 
@@ -515,7 +483,6 @@
 		goto out;
 	}
 
-	credit_alloc = work_alloc;
 	if (ic->i_flowctl) {
 		credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
 		adv_credits += posted;
@@ -591,13 +558,6 @@
 		BUG_ON(adv_credits > 255);
 	}
 
-	send = &ic->i_sends[pos];
-	first = send;
-	prev = NULL;
-	scat = &rm->data.m_sg[sg];
-	sent = 0;
-	i = 0;
-
 	/* Sometimes you want to put a fence between an RDMA
 	 * READ and the following SEND.
 	 * We could either do this all the time
@@ -607,31 +567,45 @@
 	if (rm->rdma.m_rdma_op.r_active && rm->rdma.m_rdma_op.r_fence)
 		send_flags = IB_SEND_FENCE;
 
-	/*
-	 * We could be copying the header into the unused tail of the page.
-	 * That would need to be changed in the future when those pages might
-	 * be mapped userspace pages or page cache pages.  So instead we always
-	 * use a second sge and our long-lived ring of mapped headers.  We send
-	 * the header after the data so that the data payload can be aligned on
-	 * the receiver.
-	 */
+	/* Each frag gets a header. Msgs may be 0 bytes */
+	send = &ic->i_sends[pos];
+	first = send;
+	prev = NULL;
+	scat = &rm->data.m_sg[sg];
+	i = 0;
+	do {
+		unsigned int len = 0;
 
-	/* handle a 0-len message */
-	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
-		rds_ib_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
-		goto add_header;
-	}
+		/* Set up the header */
+		send->s_wr.send_flags = send_flags;
+		send->s_wr.opcode = IB_WR_SEND;
+		send->s_wr.num_sge = 1;
+		send->s_wr.next = NULL;
+		send->s_queued = jiffies;
+		send->s_op = NULL;
 
-	/* if there's data reference it with a chain of work reqs */
-	for (; i < work_alloc && scat != &rm->data.m_sg[rm->data.m_count]; i++) {
-		unsigned int len;
+		send->s_sge[0].addr = ic->i_send_hdrs_dma
+			+ (pos * sizeof(struct rds_header));
+		send->s_sge[0].length = sizeof(struct rds_header);
 
-		send = &ic->i_sends[pos];
+		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
 
-		len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
-		rds_ib_xmit_populate_wr(ic, send, pos,
-				ib_sg_dma_address(dev, scat) + off, len,
-				send_flags);
+		/* Set up the data, if present */
+		if (i < work_alloc
+		    && scat != &rm->data.m_sg[rm->data.m_count]) {
+			len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
+			send->s_wr.num_sge = 2;
+
+			send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off;
+			send->s_sge[1].length = len;
+
+			bytes_sent += len;
+			off += len;
+			if (off == ib_sg_dma_len(dev, scat)) {
+				scat++;
+				off = 0;
+			}
+		}
 
 		/*
 		 * We want to delay signaling completions just enough to get
@@ -658,18 +632,6 @@
 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
-		sent += len;
-		off += len;
-		if (off == ib_sg_dma_len(dev, scat)) {
-			scat++;
-			off = 0;
-		}
-
-add_header:
-		/* Tack on the header after the data. The header SGE should already
-		 * have been set up to point to the right header buffer. */
-		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
-
 		if (adv_credits) {
 			struct rds_header *hdr = &ic->i_send_hdrs[pos];
 
@@ -685,12 +647,16 @@
 		prev = send;
 
 		pos = (pos + 1) % ic->i_send_ring.w_nr;
-	}
+		send = &ic->i_sends[pos];
+		i++;
+
+	} while (i < work_alloc
+		 && scat != &rm->data.m_sg[rm->data.m_count]);
 
 	/* Account the RDS header in the number of bytes we sent, but just once.
 	 * The caller has no concept of fragmentation. */
 	if (hdr_off == 0)
-		sent += sizeof(struct rds_header);
+		bytes_sent += sizeof(struct rds_header);
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.m_sg[rm->data.m_count]) {
@@ -699,6 +665,7 @@
 		ic->i_rm = NULL;
 	}
 
+	/* Put back wrs & credits we didn't use */
 	if (i < work_alloc) {
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 		work_alloc = i;
@@ -725,7 +692,7 @@
 		goto out;
 	}
 
-	ret = sent;
+	ret = bytes_sent;
 out:
 	BUG_ON(adv_credits);
 	return ret;