RDS: Perform unmapping ops in stages

Previously, RDS would wait until the final send WR had completed
and then handle cleanup. With silent ops, we do not know
whether an atomic, rdma, or data op will be last. This patch
handles any of these cases by keeping a pointer to the last
op in the message in m_final_op.
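
A rough sketch of the idea (the m_final_op assignments themselves
live in net/rds/send.c and are not part of this excerpt):

	/* Each op that is handed to the transport overwrites m_final_op,
	 * so it ends up pointing at whichever op was sent last. */
	if (rm->rdma.op_active)
		rm->m_final_op = &rm->rdma;	/* then xmit_rdma() */
	if (rm->atomic.op_active)
		rm->m_final_op = &rm->atomic;	/* then xmit_atomic() */
	if (rm->data.op_active)
		rm->m_final_op = &rm->data;	/* then xmit() */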

When the TX completion event fires, RDS dispatches to per-op-type
cleanup functions, and then does whole-message cleanup if the
completed op matches m_final_op.
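
In the IB completion handler (see the ib_send.c hunk below), that
amounts to roughly:

	rm = rds_ib_send_unmap_op(ic, send, wc.status);	/* per-op cleanup */
	if (rm && send->s_op == rm->m_final_op) {
		rds_message_unmapped(rm);	/* whole-message cleanup */
		rds_message_put(rm);
	}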

This patch also moves towards having op-specific functions take
the op struct, instead of the overall rm struct.
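
For example, rds_ib_xmit_atomic() now takes the atomic op directly
(the matching prototype change in net/rds/ib.h is not shown here):

	-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
	+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);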

rds_ib_connection has a pointer to keep track of a partially-
completed data send operation. This patch changes it from an
rds_message pointer to the narrower rm_data_op pointer, and
modifies places that use this pointer as needed.
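
The corresponding field change in struct rds_ib_connection
(net/rds/ib.h, not part of this excerpt) looks roughly like:

	-	struct rds_message	*i_rm;
	+	struct rm_data_op	*i_data_op;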

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f1524..6461a15 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@
 	complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-			  struct rds_ib_send_work *send,
-			  int wc_status)
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+				   struct rm_data_op *op,
+				   int wc_status)
 {
-	struct rds_message *rm = send->s_rm;
+	if (op->op_nents)
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				DMA_TO_DEVICE);
+}
 
-	rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-	ib_dma_unmap_sg(ic->i_cm_id->device,
-			rm->data.op_sg, rm->data.op_nents,
-			DMA_TO_DEVICE);
-
-	if (rm->rdma.op_active) {
-		struct rm_rdma_op *op = &rm->rdma;
-
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device,
-					op->op_sg, op->op_nents,
-					op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
-
-		/* If the user asked for a completion notification on this
-		 * message, we can implement three different semantics:
-		 *  1.	Notify when we received the ACK on the RDS message
-		 *	that was queued with the RDMA. This provides reliable
-		 *	notification of RDMA status at the expense of a one-way
-		 *	packet delay.
-		 *  2.	Notify when the IB stack gives us the completion event for
-		 *	the RDMA operation.
-		 *  3.	Notify when the IB stack gives us the completion event for
-		 *	the accompanying RDS messages.
-		 * Here, we implement approach #3. To implement approach #2,
-		 * call rds_rdma_send_complete from the cq_handler. To implement #1,
-		 * don't call rds_rdma_send_complete at all, and fall back to the notify
-		 * handling in the ACK processing code.
-		 *
-		 * Note: There's no need to explicitly sync any RDMA buffers using
-		 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-		 * operation itself unmapped the RDMA buffers, which takes care
-		 * of synching.
-		 */
-		rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
-
-		if (rm->rdma.op_write)
-			rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-		else
-			rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+				   struct rm_rdma_op *op,
+				   int wc_status)
+{
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+		op->op_mapped = 0;
 	}
 
-	if (rm->atomic.op_active) {
-		struct rm_atomic_op *op = &rm->atomic;
+	/* If the user asked for a completion notification on this
+	 * message, we can implement three different semantics:
+	 *  1.	Notify when we received the ACK on the RDS message
+	 *	that was queued with the RDMA. This provides reliable
+	 *	notification of RDMA status at the expense of a one-way
+	 *	packet delay.
+	 *  2.	Notify when the IB stack gives us the completion event for
+	 *	the RDMA operation.
+	 *  3.	Notify when the IB stack gives us the completion event for
+	 *	the accompanying RDS messages.
+	 * Here, we implement approach #3. To implement approach #2,
+	 * we would need to take an event for the rdma WR. To implement #1,
+	 * don't call rds_rdma_send_complete at all, and fall back to the notify
+	 * handling in the ACK processing code.
+	 *
+	 * Note: There's no need to explicitly sync any RDMA buffers using
+	 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+	 * operation itself unmapped the RDMA buffers, which takes care
+	 * of synching.
+	 */
+	rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+			     wc_status, rds_rdma_send_complete);
 
-		/* unmap atomic recvbuf */
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-					DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
+	if (op->op_write)
+		rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+	else
+		rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
 
-		rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
-
-		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-			rds_stats_inc(s_atomic_cswp);
-		else
-			rds_stats_inc(s_atomic_fadd);
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+				     struct rm_atomic_op *op,
+				     int wc_status)
+{
+	/* unmap atomic recvbuf */
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+				DMA_FROM_DEVICE);
+		op->op_mapped = 0;
 	}
 
-	/* If anyone waited for this message to get flushed out, wake
-	 * them up now */
-	rds_message_unmapped(rm);
+	rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+			     wc_status, rds_atomic_send_complete);
 
-	rds_message_put(rm);
-	send->s_rm = NULL;
+	if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+		rds_stats_inc(s_atomic_cswp);
+	else
+		rds_stats_inc(s_atomic_fadd);
+}
+
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm because the caller (the completion event handler)
+ * needs it, and currently the only way to recover it here is by
+ * switching on wr.opcode.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+						struct rds_ib_send_work *send,
+						int wc_status)
+{
+	struct rds_message *rm = NULL;
+
+	/* In the error case, wc.opcode sometimes contains garbage */
+	switch (send->s_wr.opcode) {
+	case IB_WR_SEND:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, data);
+			rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_RDMA_WRITE:
+	case IB_WR_RDMA_READ:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, rdma);
+			rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_ATOMIC_FETCH_AND_ADD:
+	case IB_WR_ATOMIC_CMP_AND_SWP:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, atomic);
+			rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+		}
+		break;
+	default:
+		if (printk_ratelimit())
+			printk(KERN_NOTICE
+			       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+			       __func__, send->s_wr.opcode);
+		break;
+	}
+
+	send->s_wr.opcode = 0xdead;
+
+	return rm;
 }
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 		struct ib_sge *sge;
 
-		send->s_rm = NULL;
 		send->s_op = NULL;
 
 		send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@
 	u32 i;
 
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-		if (!send->s_rm || send->s_wr.opcode == 0xdead)
-			continue;
-		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+		if (send->s_op && send->s_wr.opcode != 0xdead)
+			rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 	}
 }
 
@@ -189,6 +229,7 @@
 {
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_message *rm = NULL;
 	struct ib_wc wc;
 	struct rds_ib_send_work *send;
 	u32 completed;
@@ -222,42 +263,18 @@
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
 
-			/* In the error case, wc.opcode sometimes contains garbage */
-			switch (send->s_wr.opcode) {
-			case IB_WR_SEND:
-			case IB_WR_RDMA_WRITE:
-			case IB_WR_RDMA_READ:
-			case IB_WR_ATOMIC_FETCH_AND_ADD:
-			case IB_WR_ATOMIC_CMP_AND_SWP:
-				if (send->s_rm)
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-				break;
-			default:
-				if (printk_ratelimit())
-					printk(KERN_NOTICE
-						"RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-						__func__, send->s_wr.opcode);
-				break;
-			}
+			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-			send->s_wr.opcode = 0xdead;
-			send->s_wr.num_sge = 1;
 			if (send->s_queued + HZ/2 < jiffies)
 				rds_ib_stats_inc(s_ib_tx_stalled);
 
-			/* If a RDMA operation produced an error, signal this right
-			 * away. If we don't, the subsequent SEND that goes with this
-			 * RDMA will be canceled with ERR_WFLUSH, and the application
-			 * never learn that the RDMA failed. */
-			if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-				struct rds_message *rm;
+			if (rm && send->s_op == rm->m_final_op) {
+				/* If anyone waited for this message to get flushed out, wake
+				 * them up now */
+				rds_message_unmapped(rm);
 
-				rm = rds_send_get_message(conn, send->s_op);
-				if (rm) {
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-					rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-					rds_message_put(rm);
-				}
+				rds_message_put(rm);
+				send->s_op = NULL;
 			}
 
 			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@
 	}
 
 	/* map the message the first time we see it */
-	if (!ic->i_rm) {
+	if (!ic->i_data_op) {
 		if (rm->data.op_nents) {
 			rm->data.op_count = ib_dma_map_sg(dev,
 							  rm->data.op_sg,
@@ -530,7 +547,7 @@
 		}
 
 		rds_message_addref(rm);
-		ic->i_rm = rm;
+		ic->i_data_op = &rm->data;
 
 		/* Finalize the header */
 		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@
 	send = &ic->i_sends[pos];
 	first = send;
 	prev = NULL;
-	scat = &rm->data.op_sg[sg];
+	scat = &ic->i_data_op->op_sg[sg];
 	i = 0;
 	do {
 		unsigned int len = 0;
@@ -658,9 +675,9 @@
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
-		prev->s_rm = ic->i_rm;
+		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-		ic->i_rm = NULL;
+		ic->i_data_op = NULL;
 	}
 
 	/* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-		if (prev->s_rm) {
-			ic->i_rm = prev->s_rm;
-			prev->s_rm = NULL;
+		if (prev->s_op) {
+			ic->i_data_op = prev->s_op;
+			prev->s_op = NULL;
 		}
 
 		rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rm_atomic_op *op = &rm->atomic;
 	struct rds_ib_send_work *send = NULL;
 	struct ib_send_wr *failed_wr;
 	struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 	send->s_wr.wr.atomic.rkey = op->op_rkey;
 
-	/*
-	 * If there is no data or rdma ops in the message, then
-	 * we must fill in s_rm ourselves, so we properly clean up
-	 * on completion.
-	 */
-	if (!rm->rdma.op_active && !rm->data.op_active)
-		send->s_rm = rm;
-
 	/* map 8 byte retval buffer to the device */
 	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-	/* map the message the first time we see it */
+	/* map the op the first time we see it */
 	if (!op->op_mapped) {
 		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 					     op->op_sg, op->op_nents, (op->op_write) ?