SUNRPC: Close a race in __rpc_wait_for_completion_task()

Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when we
exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that
rpciod calls rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases between
the waiting process and rpciod.
The common case where nobody is waiting for completion is optimised for by
checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task
reference count is 1: in those cases we drop trying to grab the spin lock,
and immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a new
helper: rpc_put_task_async().

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 1ff76ac..d1ed671 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -4150,7 +4150,7 @@
 		task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
 				data->arg.lock_seqid);
 		if (!IS_ERR(task))
-			rpc_put_task(task);
+			rpc_put_task_async(task);
 		dprintk("%s: cancelling lock!\n", __func__);
 	} else
 		nfs_free_seqid(data->arg.lock_seqid);
@@ -5227,7 +5227,7 @@
 	if (IS_ERR(task))
 		ret = PTR_ERR(task);
 	else
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	dprintk("<-- %s status=%d\n", __func__, ret);
 	return ret;
 }
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index e313a51..6481d53 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -180,7 +180,7 @@
 	task_setup_data.rpc_client = NFS_CLIENT(dir);
 	task = rpc_run_task(&task_setup_data);
 	if (!IS_ERR(task))
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	return 1;
 }
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 88513fd..d81db80 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -212,6 +212,7 @@
 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
 				const struct rpc_call_ops *ops);
 void		rpc_put_task(struct rpc_task *);
+void		rpc_put_task_async(struct rpc_task *);
 void		rpc_exit_task(struct rpc_task *);
 void		rpc_exit(struct rpc_task *, int);
 void		rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/kernel/sched.c b/kernel/sched.c
index 18d38e4..42eab5a 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -4213,6 +4213,7 @@
 {
 	__wake_up_common(q, mode, 1, 0, key);
 }
+EXPORT_SYMBOL_GPL(__wake_up_locked_key);
 
 /**
  * __wake_up_sync_key - wake up threads blocked on a waitqueue.
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09..59e5994 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,37 @@
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-	smp_mb__before_clear_bit();
+	void *m = &task->tk_runstate;
+	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&wq->lock, flags);
 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-	smp_mb__after_clear_bit();
-	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+	ret = atomic_dec_and_test(&task->tk_count);
+	if (waitqueue_active(wq))
+		__wake_up_locked_key(wq, TASK_NORMAL, &k);
+	spin_unlock_irqrestore(&wq->lock, flags);
+	return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
 	if (action == NULL)
 		action = rpc_wait_bit_killable;
-	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
 			action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +871,67 @@
 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-	if (!atomic_dec_and_test(&task->tk_count))
-		return;
-	/* Release resources */
 	if (task->tk_rqstp)
 		xprt_release(task);
 	if (task->tk_msg.rpc_cred)
 		put_rpccred(task->tk_msg.rpc_cred);
 	rpc_task_release_client(task);
-	if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+		struct workqueue_struct *q)
+{
+	if (q != NULL) {
 		INIT_WORK(&task->u.tk_work, rpc_async_release);
-		queue_work(task->tk_workqueue, &task->u.tk_work);
+		queue_work(q, &task->u.tk_work);
 	} else
 		rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+	if (atomic_dec_and_test(&task->tk_count)) {
+		rpc_release_resources_task(task);
+		rpc_final_put_task(task, q);
+	}
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+	rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+	rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
 	dprintk("RPC: %5u release task\n", task->tk_pid);
 
 	BUG_ON (RPC_IS_QUEUED(task));
 
-	/* Wake up anyone who is waiting for task completion */
-	rpc_mark_complete_task(task);
+	rpc_release_resources_task(task);
 
-	rpc_put_task(task);
+	/*
+	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+	 * so it should be safe to use task->tk_count as a test for whether
+	 * or not any other processes still hold references to our rpc_task.
+	 */
+	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+		/* Wake up anyone who may be waiting for task completion */
+		if (!rpc_complete_task(task))
+			return;
+	} else {
+		if (!atomic_dec_and_test(&task->tk_count))
+			return;
+	}
+	rpc_final_put_task(task, task->tk_workqueue);
 }
 
 int rpciod_up(void)