RDS: IB: split send completion handling and do batch ack

Similar to what we did with receive CQ completion handling, we split
the transmit completion handler so that it lets us implement batched
work completion handling.

We re-use the cq_poll routine and makes use of RDS_IB_SEND_OP to
identify the send vs receive completion event handler invocation.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 28e0979..8f51d0d 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -250,11 +250,34 @@
 			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 				 (unsigned long long)wc->wr_id, wc->status,
 				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
-			rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+			if (wc->wr_id & RDS_IB_SEND_OP)
+				rds_ib_send_cqe_handler(ic, wc);
+			else
+				rds_ib_recv_cqe_handler(ic, wc, ack_state);
 		}
 	}
 }
 
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_ack_state state;
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+	ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+	if (rds_conn_up(conn) &&
+	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+	    test_bit(0, &conn->c_map_queued)))
+		rds_send_xmit(ic->conn);
+}
+
 static void rds_ib_tasklet_fn_recv(unsigned long data)
 {
 	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
@@ -304,6 +327,18 @@
 	}
 }
 
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -337,7 +372,8 @@
 	ic->i_pd = rds_ibdev->pd;
 
 	cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-	ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+	ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_send_cq)) {
@@ -703,6 +739,7 @@
 		wait_event(rds_ib_ring_empty_wait,
 			   rds_ib_ring_empty(&ic->i_recv_ring) &&
 			   (atomic_read(&ic->i_signaled_sends) == 0));
+		tasklet_kill(&ic->i_send_tasklet);
 		tasklet_kill(&ic->i_recv_tasklet);
 
 		/* first destroy the ib state that generates callbacks */
@@ -809,8 +846,10 @@
 	}
 
 	INIT_LIST_HEAD(&ic->ib_node);
+	tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+		     (unsigned long)ic);
 	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
-		     (unsigned long) ic);
+		     (unsigned long)ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);