xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers

xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-
registered.

The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backwards-direction call. Any receive
buffer can be used for either a forward reply or a backward call.
Thus both types of RPC message must all be the same size.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index baa0523..7f0ed30 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -831,7 +831,21 @@
 		}
 		rc = ep->rep_connected;
 	} else {
+		struct rpcrdma_xprt *r_xprt;
+		unsigned int extras;
+
 		dprintk("RPC:       %s: connected\n", __func__);
+
+		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+		if (extras) {
+			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+			if (rc)
+				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+					__func__, rc);
+				rc = 0;
+		}
 	}
 
 out:
@@ -868,20 +882,25 @@
 	}
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
 
 	req = kzalloc(sizeof(*req), GFP_KERNEL);
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	INIT_LIST_HEAD(&req->rl_free);
+	spin_lock(&buffer->rb_reqslock);
+	list_add(&req->rl_all, &buffer->rb_allreqs);
+	spin_unlock(&buffer->rb_reqslock);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -920,6 +939,7 @@
 	int i, rc;
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
@@ -927,6 +947,8 @@
 		goto out;
 
 	INIT_LIST_HEAD(&buf->rb_send_bufs);
+	INIT_LIST_HEAD(&buf->rb_allreqs);
+	spin_lock_init(&buf->rb_reqslock);
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 
@@ -937,6 +959,7 @@
 			rc = PTR_ERR(req);
 			goto out;
 		}
+		req->rl_backchannel = false;
 		list_add(&req->rl_free, &buf->rb_send_bufs);
 	}
 
@@ -985,19 +1008,13 @@
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-	if (!rep)
-		return;
-
 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 	kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-	if (!req)
-		return;
-
 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 	kfree(req);
@@ -1015,12 +1032,19 @@
 		rpcrdma_destroy_rep(ia, rep);
 	}
 
-	while (!list_empty(&buf->rb_send_bufs)) {
+	spin_lock(&buf->rb_reqslock);
+	while (!list_empty(&buf->rb_allreqs)) {
 		struct rpcrdma_req *req;
 
-		req = rpcrdma_buffer_get_req_locked(buf);
+		req = list_first_entry(&buf->rb_allreqs,
+				       struct rpcrdma_req, rl_all);
+		list_del(&req->rl_all);
+
+		spin_unlock(&buf->rb_reqslock);
 		rpcrdma_destroy_req(ia, req);
+		spin_lock(&buf->rb_reqslock);
 	}
+	spin_unlock(&buf->rb_reqslock);
 
 	ia->ri_ops->ro_destroy(buf);
 }
@@ -1288,6 +1312,47 @@
 	return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc;
+
+	while (count--) {
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		if (list_empty(&buffers->rb_recv_bufs))
+			goto out_reqbuf;
+		rep = rpcrdma_buffer_get_rep_locked(buffers);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc)
+			goto out_rc;
+	}
+
+	return 0;
+
+out_reqbuf:
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	pr_warn("%s: no extra receive buffers\n", __func__);
+	return -ENOMEM;
+
+out_rc:
+	rpcrdma_recv_buffer_put(rep);
+	return rc;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int