RDS: Add flag for silent ops. Do RDMA op before atomic

Add an RDS_RDMA_SILENT flag to the API so users can indicate
that they want silent operations. An explicit flag is needed
because silent ops cannot be used with USE_ONCE MRs, so we
cannot simply assume that every op is silent.

Also, change send_xmit to issue the rdma op before the atomic op
if both are present, and centralize the hairy logic that determines
whether or not we want to attempt a silent send.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 5ba5146..48781fe 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -559,6 +559,7 @@
 	op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
 	op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
 	op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+	op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
 	op->op_active = 1;
 	op->op_recverr = rs->rs_recverr;
 	WARN_ON(!nr_pages);
@@ -747,6 +748,7 @@
 	}
 
 	rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+	rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
 	rm->atomic.op_active = 1;
 	rm->atomic.op_recverr = rs->rs_recverr;
 	rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 46d190d..23b9210 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -319,6 +319,7 @@
 			unsigned int		op_notify:1;
 			unsigned int		op_recverr:1;
 			unsigned int		op_mapped:1;
+			unsigned int		op_silent:1;
 			unsigned int		op_active:1;
 			struct scatterlist	*op_sg;
 			struct rds_notifier	*op_notifier;
@@ -333,6 +334,7 @@
 			unsigned int		op_notify:1;
 			unsigned int		op_recverr:1;
 			unsigned int		op_mapped:1;
+			unsigned int		op_silent:1;
 			unsigned int		op_active:1;
 			unsigned int		op_bytes;
 			unsigned int		op_nents;
diff --git a/net/rds/send.c b/net/rds/send.c
index cdca974..38567f3 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -250,6 +250,18 @@
 			conn->c_xmit_rm = rm;
 		}
 
+		/* The transport either sends the whole rdma or none of it */
+		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
+			if (ret)
+				break;
+			conn->c_xmit_rdma_sent = 1;
+
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+		}
+
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
 			ret = conn->c_trans->xmit_atomic(conn, rm);
 			if (ret)
@@ -258,32 +270,28 @@
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
-
-			/*
-			 * This is evil, muahaha.
-			 * We permit 0-byte sends. (rds-ping depends on this.)
-			 * BUT if there is an atomic op and no sent data,
-			 * we turn off sending the header, to achieve
-			 * "silent" atomics.
-			 * But see below; RDMA op might toggle this back on!
-			 */
-			if (rm->data.op_nents == 0)
-				rm->data.op_active = 0;
 		}
 
-		/* The transport either sends the whole rdma or none of it */
-		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
-			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
-			if (ret)
-				break;
-			conn->c_xmit_rdma_sent = 1;
+		/*
+		 * A number of cases require an RDS header to be sent
+		 * even if there is no data.
+		 * We permit 0-byte sends; rds-ping depends on this.
+		 * However, if every attached op is silent and the message
+		 * has no rdma cookie, we skip the hdr/data send entirely.
+		 */
+		if (rm->data.op_nents == 0) {
+			int ops_present;
+			int all_ops_are_silent = 1;
 
-			/* rdmas need data sent, even if just the header */
-			rm->data.op_active = 1;
+			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+			if (rm->atomic.op_active && !rm->atomic.op_silent)
+				all_ops_are_silent = 0;
+			if (rm->rdma.op_active && !rm->rdma.op_silent)
+				all_ops_are_silent = 0;
 
-			/* The transport owns the mapped memory for now.
-			 * You can't unmap it while it's on the send queue */
-			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+			if (ops_present && all_ops_are_silent
+			    && !rm->m_rdma_cookie)
+				rm->data.op_active = 0;
 		}
 
 		if (rm->data.op_active && !conn->c_xmit_data_sent) {
@@ -1009,8 +1017,7 @@
 	if (ret)
 		goto out;
 
-	if ((rm->m_rdma_cookie || rm->rdma.op_active) &&
-	    !conn->c_trans->xmit_rdma) {
+	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
 		if (printk_ratelimit())
 			printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
 			       &rm->rdma, conn->c_trans->xmit_rdma);