SUNRPC: switchable buffer allocation

 Add RPC client transport switch support for replacing buffer management
 on a per-transport basis.

 In the current IPv4 socket transport implementation, RPC buffers are
 allocated as needed for each RPC message that is sent.  Some transport
 implementations may choose to use pre-allocated buffers for encoding,
 sending, receiving, and unmarshalling RPC messages, however.  For
 transports capable of direct data placement, the buffers can be carved
 out of a pre-registered area of memory rather than from a slab cache.

 Test-plan:
 Millions of fsx operations.  Performance characterization with "sio" and
 "iozone".  Use oprofile and other tools to look for significant regression
 in CPU utilization.

 Signed-off-by: Chuck Lever <cel@netapp.com>
 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 94b0afa..8b25629 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -52,8 +52,6 @@
 	 * RPC call state
 	 */
 	struct rpc_message	tk_msg;		/* RPC call info */
-	__u32 *			tk_buffer;	/* XDR buffer */
-	size_t			tk_bufsize;
 	__u8			tk_garb_retry;
 	__u8			tk_cred_retry;
 
@@ -268,6 +266,7 @@
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
 void		rpc_delay(struct rpc_task *, unsigned long);
 void *		rpc_malloc(struct rpc_task *, size_t);
+void		rpc_free(struct rpc_task *);
 int		rpciod_up(void);
 void		rpciod_down(void);
 void		rpciod_wake_up(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 3b8b6e8..7885b96 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -79,21 +79,19 @@
 	void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
 	struct list_head	rq_list;
 
+	__u32 *			rq_buffer;	/* XDR encode buffer */
+	size_t			rq_bufsize;
+
 	struct xdr_buf		rq_private_buf;		/* The receive buffer
 							 * used in the softirq.
 							 */
 	unsigned long		rq_majortimeo;	/* major timeout alarm */
 	unsigned long		rq_timeout;	/* Current timeout value */
 	unsigned int		rq_retries;	/* # of retries */
-	/*
-	 * For authentication (e.g. auth_des)
-	 */
-	u32			rq_creddata[2];
 	
 	/*
 	 * Partial send handling
 	 */
-	
 	u32			rq_bytes_sent;	/* Bytes we have sent */
 
 	unsigned long		rq_xtime;	/* when transmitted */
@@ -107,6 +105,8 @@
 	int		(*reserve_xprt)(struct rpc_task *task);
 	void		(*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
 	void		(*connect)(struct rpc_task *task);
+	void *		(*buf_alloc)(struct rpc_task *task, size_t size);
+	void		(*buf_free)(struct rpc_task *task);
 	int		(*send_request)(struct rpc_task *task);
 	void		(*set_retrans_timeout)(struct rpc_task *task);
 	void		(*timer)(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b23c0d3..25cba94 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -644,24 +644,26 @@
 
 /*
  * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
- *	(Note: buffer memory is freed in rpc_task_release).
+ *	(Note: buffer memory is freed in xprt_release).
  */
 static void
 call_allocate(struct rpc_task *task)
 {
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = task->tk_xprt;
 	unsigned int	bufsiz;
 
 	dprintk("RPC: %4d call_allocate (status %d)\n", 
 				task->tk_pid, task->tk_status);
 	task->tk_action = call_bind;
-	if (task->tk_buffer)
+	if (req->rq_buffer)
 		return;
 
 	/* FIXME: compute buffer requirements more exactly using
 	 * auth->au_wslack */
 	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
 
-	if (rpc_malloc(task, bufsiz << 1) != NULL)
+	if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
 		return;
 	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 
 
@@ -704,14 +706,14 @@
 				task->tk_pid, task->tk_status);
 
 	/* Default buffer setup */
-	bufsiz = task->tk_bufsize >> 1;
-	sndbuf->head[0].iov_base = (void *)task->tk_buffer;
+	bufsiz = req->rq_bufsize >> 1;
+	sndbuf->head[0].iov_base = (void *)req->rq_buffer;
 	sndbuf->head[0].iov_len  = bufsiz;
 	sndbuf->tail[0].iov_len  = 0;
 	sndbuf->page_len	 = 0;
 	sndbuf->len		 = 0;
 	sndbuf->buflen		 = bufsiz;
-	rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
+	rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
 	rcvbuf->head[0].iov_len  = bufsiz;
 	rcvbuf->tail[0].iov_len  = 0;
 	rcvbuf->page_len	 = 0;
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 48510e3..7415406 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -41,8 +41,6 @@
 
 static void			__rpc_default_timer(struct rpc_task *task);
 static void			rpciod_killall(void);
-static void			rpc_free(struct rpc_task *task);
-
 static void			rpc_async_schedule(void *);
 
 /*
@@ -599,7 +597,6 @@
 			WARN_ON(RPC_ASSASSINATED(task));
 			/* Always release the RPC slot and buffer memory */
 			xprt_release(task);
-			rpc_free(task);
 		}
 	}
 }
@@ -724,17 +721,19 @@
 	__rpc_execute((struct rpc_task *)arg);
 }
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
+	struct rpc_rqst *req = task->tk_rqstp;
 	gfp_t	gfp;
 
 	if (task->tk_flags & RPC_TASK_SWAPPER)
@@ -743,27 +742,33 @@
 		gfp = GFP_NOFS;
 
 	if (size > RPC_BUFFER_MAXSIZE) {
-		task->tk_buffer =  kmalloc(size, gfp);
-		if (task->tk_buffer)
-			task->tk_bufsize = size;
+		req->rq_buffer = kmalloc(size, gfp);
+		if (req->rq_buffer)
+			req->rq_bufsize = size;
 	} else {
-		task->tk_buffer =  mempool_alloc(rpc_buffer_mempool, gfp);
-		if (task->tk_buffer)
-			task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+		req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+		if (req->rq_buffer)
+			req->rq_bufsize = RPC_BUFFER_MAXSIZE;
 	}
-	return task->tk_buffer;
+	return req->rq_buffer;
 }
 
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-	if (task->tk_buffer) {
-		if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-			mempool_free(task->tk_buffer, rpc_buffer_mempool);
+	struct rpc_rqst *req = task->tk_rqstp;
+
+	if (req->rq_buffer) {
+		if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+			mempool_free(req->rq_buffer, rpc_buffer_mempool);
 		else
-			kfree(task->tk_buffer);
-		task->tk_buffer = NULL;
-		task->tk_bufsize = 0;
+			kfree(req->rq_buffer);
+		req->rq_buffer = NULL;
+		req->rq_bufsize = 0;
 	}
 }
 
@@ -887,7 +892,6 @@
 		xprt_release(task);
 	if (task->tk_msg.rpc_cred)
 		rpcauth_unbindcred(task);
-	rpc_free(task);
 	if (task->tk_client) {
 		rpc_release_client(task->tk_client);
 		task->tk_client = NULL;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6dda386..069a6cb 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -838,6 +838,8 @@
 	req->rq_timeout = xprt->timeout.to_initval;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
+	req->rq_buffer  = NULL;
+	req->rq_bufsize = 0;
 	req->rq_xid     = xprt_alloc_xid(xprt);
 	req->rq_release_snd_buf = NULL;
 	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
@@ -867,6 +869,7 @@
 		mod_timer(&xprt->timer,
 				xprt->last_used + xprt->idle_timeout);
 	spin_unlock_bh(&xprt->transport_lock);
+	xprt->ops->buf_free(task);
 	task->tk_rqstp = NULL;
 	if (req->rq_release_snd_buf)
 		req->rq_release_snd_buf(req);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 77e8800..51f07c9 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -28,6 +28,7 @@
 #include <linux/udp.h>
 #include <linux/tcp.h>
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
 #include <linux/file.h>
 
 #include <net/sock.h>
@@ -1161,6 +1162,8 @@
 	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong,
 	.connect		= xs_connect,
+	.buf_alloc		= rpc_malloc,
+	.buf_free		= rpc_free,
 	.send_request		= xs_udp_send_request,
 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
 	.timer			= xs_udp_timer,
@@ -1173,6 +1176,8 @@
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xprt_release_xprt,
 	.connect		= xs_connect,
+	.buf_alloc		= rpc_malloc,
+	.buf_free		= rpc_free,
 	.send_request		= xs_tcp_send_request,
 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
 	.close			= xs_close,