SUNRPC: Fix a race in rpciod_down()

The commit 4ada539ed77c7a2bbcb75cafbbd7bd8d2b9bef7b led to the unpleasant
possibility of an asynchronous rpc_task being required to call
rpciod_down() when it completes. This in turn means that the rpciod
workqueue may end up calling destroy_workqueue() on itself, which hangs.

Change rpciod_up/rpciod_down to just get/put the module, and then
create/destroy the workqueues on module load/unload.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b5723c2..954d7ec 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -50,8 +50,6 @@
 /*
  * rpciod-related stuff
  */
-static DEFINE_MUTEX(rpciod_mutex);
-static atomic_t rpciod_users = ATOMIC_INIT(0);
 struct workqueue_struct *rpciod_workqueue;
 
 /*
@@ -961,60 +959,49 @@
 	spin_unlock(&clnt->cl_lock);
 }
 
+int rpciod_up(void)
+{
+	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
+}
+
+void rpciod_down(void)
+{
+	module_put(THIS_MODULE);
+}
+
 /*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
  */
-int
-rpciod_up(void)
+static int rpciod_start(void)
 {
 	struct workqueue_struct *wq;
-	int error = 0;
 
-	if (atomic_inc_not_zero(&rpciod_users))
-		return 0;
-
-	mutex_lock(&rpciod_mutex);
-
-	/* Guard against races with rpciod_down() */
-	if (rpciod_workqueue != NULL)
-		goto out_ok;
 	/*
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	error = -ENOMEM;
 	wq = create_workqueue("rpciod");
-	if (wq == NULL)
-		goto out;
-
 	rpciod_workqueue = wq;
-	error = 0;
-out_ok:
-	atomic_inc(&rpciod_users);
-out:
-	mutex_unlock(&rpciod_mutex);
-	return error;
+	return rpciod_workqueue != NULL;
 }
 
-void
-rpciod_down(void)
+static void rpciod_stop(void)
 {
-	if (!atomic_dec_and_test(&rpciod_users))
-		return;
+	struct workqueue_struct *wq = NULL;
 
-	mutex_lock(&rpciod_mutex);
+	if (rpciod_workqueue == NULL)
+		return;
 	dprintk("RPC:       destroying workqueue rpciod\n");
 
-	if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
-		destroy_workqueue(rpciod_workqueue);
-		rpciod_workqueue = NULL;
-	}
-	mutex_unlock(&rpciod_mutex);
+	wq = rpciod_workqueue;
+	rpciod_workqueue = NULL;
+	destroy_workqueue(wq);
 }
 
 void
 rpc_destroy_mempool(void)
 {
+	rpciod_stop();
 	if (rpc_buffer_mempool)
 		mempool_destroy(rpc_buffer_mempool);
 	if (rpc_task_mempool)
@@ -1048,6 +1035,8 @@
 						      rpc_buffer_slabp);
 	if (!rpc_buffer_mempool)
 		goto err_nomem;
+	if (!rpciod_start())
+		goto err_nomem;
 	return 0;
 err_nomem:
 	rpc_destroy_mempool();