SUNRPC: Allow the rpc_release() callback to be run on another workqueue

A lot of the work done by the rpc_release() callback is inappropriate for
rpciod as it will often involve things like starting a new rpc call in
order to clean up state after an interrupted NFSv4 open() call, or
calls to mntput(), etc.

This patch allows the caller of rpc_run_task() to specify that the
rpc_release() callback should run on a different workqueue than the default
rpciod_workqueue. If no workqueue is specified, the callback is still run
directly from rpc_put_task(), as before.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4c66912..3e0b223 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -326,7 +326,7 @@
 		int status;
 
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		status = queue_work(task->tk_workqueue, &task->u.tk_work);
+		status = queue_work(rpciod_workqueue, &task->u.tk_work);
 		if (status < 0) {
 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 			task->tk_status = status;
@@ -832,7 +832,7 @@
 	task->tk_owner = current->tgid;
 
 	/* Initialize workqueue for async tasks */
-	task->tk_workqueue = rpciod_workqueue;
+	task->tk_workqueue = task_setup_data->workqueue;
 
 	task->tk_client = task_setup_data->rpc_client;
 	if (task->tk_client != NULL) {
@@ -868,7 +868,7 @@
 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void rpc_free_task(struct rcu_head *rcu)
+static void rpc_free_task_rcu(struct rcu_head *rcu)
 {
 	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
 	dprintk("RPC: %5u freeing task\n", task->tk_pid);
@@ -898,12 +898,23 @@
 	return task;
 }
 
-
-void rpc_put_task(struct rpc_task *task)
+static void rpc_free_task(struct rpc_task *task)
 {
 	const struct rpc_call_ops *tk_ops = task->tk_ops;
 	void *calldata = task->tk_calldata;
 
+	if (task->tk_flags & RPC_TASK_DYNAMIC)
+		call_rcu_bh(&task->u.tk_rcu, rpc_free_task_rcu);
+	rpc_release_calldata(tk_ops, calldata);
+}
+
+static void rpc_async_release(struct work_struct *work)
+{
+	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
 	if (!atomic_dec_and_test(&task->tk_count))
 		return;
 	/* Release resources */
@@ -915,9 +926,11 @@
 		rpc_release_client(task->tk_client);
 		task->tk_client = NULL;
 	}
-	if (task->tk_flags & RPC_TASK_DYNAMIC)
-		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
-	rpc_release_calldata(tk_ops, calldata);
+	if (task->tk_workqueue != NULL) {
+		INIT_WORK(&task->u.tk_work, rpc_async_release);
+		queue_work(task->tk_workqueue, &task->u.tk_work);
+	} else
+		rpc_free_task(task);
 }
 EXPORT_SYMBOL_GPL(rpc_put_task);