SUNRPC: switchable buffer allocation
Add RPC client transport switch support for replacing buffer management
on a per-transport basis.
In the current IPv4 socket transport implementation, RPC buffers are
allocated as needed for each RPC message that is sent. Some transport
implementations may choose to use pre-allocated buffers for encoding,
sending, receiving, and unmarshalling RPC messages, however. For
transports capable of direct data placement, the buffers can be carved
out of a pre-registered area of memory rather than from a slab cache.
Test-plan:
Millions of fsx operations. Performance characterization with "sio" and
"iozone". Use oprofile and other tools to look for significant regression
in CPU utilization.
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 94b0afa..8b25629 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -52,8 +52,6 @@
* RPC call state
*/
struct rpc_message tk_msg; /* RPC call info */
- __u32 * tk_buffer; /* XDR buffer */
- size_t tk_bufsize;
__u8 tk_garb_retry;
__u8 tk_cred_retry;
@@ -268,6 +266,7 @@
void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long);
void * rpc_malloc(struct rpc_task *, size_t);
+void rpc_free(struct rpc_task *);
int rpciod_up(void);
void rpciod_down(void);
void rpciod_wake_up(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 3b8b6e8..7885b96 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -79,21 +79,19 @@
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list;
+ __u32 * rq_buffer; /* XDR encode buffer */
+ size_t rq_bufsize;
+
struct xdr_buf rq_private_buf; /* The receive buffer
* used in the softirq.
*/
unsigned long rq_majortimeo; /* major timeout alarm */
unsigned long rq_timeout; /* Current timeout value */
unsigned int rq_retries; /* # of retries */
- /*
- * For authentication (e.g. auth_des)
- */
- u32 rq_creddata[2];
/*
* Partial send handling
*/
-
u32 rq_bytes_sent; /* Bytes we have sent */
unsigned long rq_xtime; /* when transmitted */
@@ -107,6 +105,8 @@
int (*reserve_xprt)(struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*connect)(struct rpc_task *task);
+ void * (*buf_alloc)(struct rpc_task *task, size_t size);
+ void (*buf_free)(struct rpc_task *task);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b23c0d3..25cba94 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -644,24 +644,26 @@
/*
* 2. Allocate the buffer. For details, see sched.c:rpc_malloc.
- * (Note: buffer memory is freed in rpc_task_release).
+ * (Note: buffer memory is freed in xprt_release).
*/
static void
call_allocate(struct rpc_task *task)
{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = task->tk_xprt;
unsigned int bufsiz;
dprintk("RPC: %4d call_allocate (status %d)\n",
task->tk_pid, task->tk_status);
task->tk_action = call_bind;
- if (task->tk_buffer)
+ if (req->rq_buffer)
return;
/* FIXME: compute buffer requirements more exactly using
* auth->au_wslack */
bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
- if (rpc_malloc(task, bufsiz << 1) != NULL)
+ if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
return;
printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);
@@ -704,14 +706,14 @@
task->tk_pid, task->tk_status);
/* Default buffer setup */
- bufsiz = task->tk_bufsize >> 1;
- sndbuf->head[0].iov_base = (void *)task->tk_buffer;
+ bufsiz = req->rq_bufsize >> 1;
+ sndbuf->head[0].iov_base = (void *)req->rq_buffer;
sndbuf->head[0].iov_len = bufsiz;
sndbuf->tail[0].iov_len = 0;
sndbuf->page_len = 0;
sndbuf->len = 0;
sndbuf->buflen = bufsiz;
- rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
+ rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
rcvbuf->head[0].iov_len = bufsiz;
rcvbuf->tail[0].iov_len = 0;
rcvbuf->page_len = 0;
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 48510e3..7415406 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -41,8 +41,6 @@
static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
-static void rpc_free(struct rpc_task *task);
-
static void rpc_async_schedule(void *);
/*
@@ -599,7 +597,6 @@
WARN_ON(RPC_ASSASSINATED(task));
/* Always release the RPC slot and buffer memory */
xprt_release(task);
- rpc_free(task);
}
}
}
@@ -724,17 +721,19 @@
__rpc_execute((struct rpc_task *)arg);
}
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
*
* We try to ensure that some NFS reads and writes can always proceed
* by using a mempool when allocating 'small' buffers.
* In order to avoid memory starvation triggering more writebacks of
* NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
*/
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
{
+ struct rpc_rqst *req = task->tk_rqstp;
gfp_t gfp;
if (task->tk_flags & RPC_TASK_SWAPPER)
@@ -743,27 +742,33 @@
gfp = GFP_NOFS;
if (size > RPC_BUFFER_MAXSIZE) {
- task->tk_buffer = kmalloc(size, gfp);
- if (task->tk_buffer)
- task->tk_bufsize = size;
+ req->rq_buffer = kmalloc(size, gfp);
+ if (req->rq_buffer)
+ req->rq_bufsize = size;
} else {
- task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
- if (task->tk_buffer)
- task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+ req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+ if (req->rq_buffer)
+ req->rq_bufsize = RPC_BUFFER_MAXSIZE;
}
- return task->tk_buffer;
+ return req->rq_buffer;
}
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
{
- if (task->tk_buffer) {
- if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
- mempool_free(task->tk_buffer, rpc_buffer_mempool);
+ struct rpc_rqst *req = task->tk_rqstp;
+
+ if (req->rq_buffer) {
+ if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+ mempool_free(req->rq_buffer, rpc_buffer_mempool);
else
- kfree(task->tk_buffer);
- task->tk_buffer = NULL;
- task->tk_bufsize = 0;
+ kfree(req->rq_buffer);
+ req->rq_buffer = NULL;
+ req->rq_bufsize = 0;
}
}
@@ -887,7 +892,6 @@
xprt_release(task);
if (task->tk_msg.rpc_cred)
rpcauth_unbindcred(task);
- rpc_free(task);
if (task->tk_client) {
rpc_release_client(task->tk_client);
task->tk_client = NULL;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6dda386..069a6cb 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -838,6 +838,8 @@
req->rq_timeout = xprt->timeout.to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
+ req->rq_buffer = NULL;
+ req->rq_bufsize = 0;
req->rq_xid = xprt_alloc_xid(xprt);
req->rq_release_snd_buf = NULL;
dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
@@ -867,6 +869,7 @@
mod_timer(&xprt->timer,
xprt->last_used + xprt->idle_timeout);
spin_unlock_bh(&xprt->transport_lock);
+ xprt->ops->buf_free(task);
task->tk_rqstp = NULL;
if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 77e8800..51f07c9 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -28,6 +28,7 @@
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
#include <linux/file.h>
#include <net/sock.h>
@@ -1161,6 +1162,8 @@
.reserve_xprt = xprt_reserve_xprt_cong,
.release_xprt = xprt_release_xprt_cong,
.connect = xs_connect,
+ .buf_alloc = rpc_malloc,
+ .buf_free = rpc_free,
.send_request = xs_udp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_rtt,
.timer = xs_udp_timer,
@@ -1173,6 +1176,8 @@
.reserve_xprt = xprt_reserve_xprt,
.release_xprt = xprt_release_xprt,
.connect = xs_connect,
+ .buf_alloc = rpc_malloc,
+ .buf_free = rpc_free,
.send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
.close = xs_close,