SUNRPC: Move rpc_task->tk_task list into struct rpc_clnt

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 6661142..0801ab5 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -26,6 +26,8 @@
 struct rpc_clnt {
 	atomic_t		cl_count;	/* Number of clones */
 	atomic_t		cl_users;	/* number of references */
+	struct list_head	cl_clients;	/* Global list of clients */
+	struct list_head	cl_tasks;	/* List of tasks */
 	struct rpc_xprt *	cl_xprt;	/* transport */
 	struct rpc_procinfo *	cl_procinfo;	/* procedure info */
 	u32			cl_prog,	/* RPC program number */
@@ -122,6 +124,8 @@
 int		rpc_shutdown_client(struct rpc_clnt *);
 int		rpc_destroy_client(struct rpc_clnt *);
 void		rpc_release_client(struct rpc_clnt *);
+void		rpc_register_client(struct rpc_clnt *);
+void		rpc_unregister_client(struct rpc_clnt *);
 int		rpcb_register(u32, u32, int, unsigned short, int *);
 void		rpcb_getport(struct rpc_task *);
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 2047fb2..3387b00 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -110,11 +110,6 @@
 	if (!list_empty(head) &&  \
 	    ((task=list_entry((head)->next, struct rpc_task, u.tk_wait.list)),1))
 
-/* .. and walking list of all tasks */
-#define	alltask_for_each(task, pos, head) \
-	list_for_each(pos, head) \
-		if ((task=list_entry(pos, struct rpc_task, tk_task)),1)
-
 typedef void			(*rpc_action)(struct rpc_task *);
 
 struct rpc_call_ops {
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d8fbee4..6631ece 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -148,6 +148,7 @@
 	if (clnt->cl_metrics == NULL)
 		goto out_no_stats;
 	clnt->cl_program  = program;
+	INIT_LIST_HEAD(&clnt->cl_tasks);
 
 	if (!xprt_bound(clnt->cl_xprt))
 		clnt->cl_autobind = 1;
@@ -172,6 +173,7 @@
 	if (clnt->cl_nodelen > UNX_MAXNODENAME)
 		clnt->cl_nodelen = UNX_MAXNODENAME;
 	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
+	rpc_register_client(clnt);
 	return clnt;
 
 out_no_auth:
@@ -283,9 +285,11 @@
 	new->cl_autobind = 0;
 	new->cl_oneshot = 0;
 	new->cl_dead = 0;
+	INIT_LIST_HEAD(&new->cl_tasks);
 	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
 	if (new->cl_auth)
 		atomic_inc(&new->cl_auth->au_count);
+	rpc_register_client(new);
 	return new;
 out_no_path:
 	rpc_free_iostats(new->cl_metrics);
@@ -357,6 +361,7 @@
 	if (clnt->cl_server != clnt->cl_inline_name)
 		kfree(clnt->cl_server);
 out_free:
+	rpc_unregister_client(clnt);
 	rpc_free_iostats(clnt->cl_metrics);
 	clnt->cl_metrics = NULL;
 	xprt_put(clnt->cl_xprt);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 944d753..6309f3b 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -50,9 +50,10 @@
 static RPC_WAITQ(delay_queue, "delayq");
 
 /*
- * All RPC tasks are linked into this list
+ * All RPC clients are linked into this list
  */
-static LIST_HEAD(all_tasks);
+static LIST_HEAD(all_clients);
+static DECLARE_WAIT_QUEUE_HEAD(client_kill_wait);
 
 /*
  * rpciod-related stuff
@@ -277,7 +278,8 @@
 	task->tk_pid = rpc_task_id++;
 #endif
 	/* Add to global list of all tasks */
-	list_add_tail(&task->tk_task, &all_tasks);
+	if (task->tk_client)
+		list_add_tail(&task->tk_task, &task->tk_client->cl_tasks);
 	spin_unlock(&rpc_sched_lock);
 }
 
@@ -818,6 +820,7 @@
 	if (tk_ops->rpc_call_prepare != NULL)
 		task->tk_action = rpc_prepare_task;
 	task->tk_calldata = calldata;
+	INIT_LIST_HEAD(&task->tk_task);
 
 	/* Initialize retry counters */
 	task->tk_garb_retry = 2;
@@ -920,11 +923,12 @@
 #endif
 	dprintk("RPC: %5u release task\n", task->tk_pid);
 
-	/* Remove from global task list */
-	spin_lock(&rpc_sched_lock);
-	list_del(&task->tk_task);
-	spin_unlock(&rpc_sched_lock);
-
+	if (!list_empty(&task->tk_task)) {
+		/* Remove from client task list */
+		spin_lock(&rpc_sched_lock);
+		list_del(&task->tk_task);
+		spin_unlock(&rpc_sched_lock);
+	}
 	BUG_ON (RPC_IS_QUEUED(task));
 
 	/* Synchronously delete any running timer */
@@ -966,42 +970,52 @@
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
  */
-void rpc_killall_tasks(struct rpc_clnt *clnt)
+static void rpc_killall_tasks_locked(struct list_head *head)
 {
 	struct rpc_task	*rovr;
-	struct list_head *le;
 
-	dprintk("RPC:       killing all tasks for client %p\n", clnt);
 
-	/*
-	 * Spin lock all_tasks to prevent changes...
-	 */
-	spin_lock(&rpc_sched_lock);
-	alltask_for_each(rovr, le, &all_tasks) {
+	list_for_each_entry(rovr, head, tk_task) {
 		if (! RPC_IS_ACTIVATED(rovr))
 			continue;
-		if (!clnt || rovr->tk_client == clnt) {
+		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 			rovr->tk_flags |= RPC_TASK_KILLED;
 			rpc_exit(rovr, -EIO);
 			rpc_wake_up_task(rovr);
 		}
 	}
+}
+
+void rpc_killall_tasks(struct rpc_clnt *clnt)
+{
+	dprintk("RPC:       killing all tasks for client %p\n", clnt);
+	/*
+	 * Spin lock all_tasks to prevent changes...
+	 */
+	spin_lock(&rpc_sched_lock);
+	rpc_killall_tasks_locked(&clnt->cl_tasks);
 	spin_unlock(&rpc_sched_lock);
 }
 
 static void rpciod_killall(void)
 {
+	struct rpc_clnt *clnt;
 	unsigned long flags;
 
-	while (!list_empty(&all_tasks)) {
+	for(;;) {
 		clear_thread_flag(TIF_SIGPENDING);
-		rpc_killall_tasks(NULL);
+
+		spin_lock(&rpc_sched_lock);
+		list_for_each_entry(clnt, &all_clients, cl_clients)
+			rpc_killall_tasks_locked(&clnt->cl_tasks);
+		spin_unlock(&rpc_sched_lock);
 		flush_workqueue(rpciod_workqueue);
-		if (!list_empty(&all_tasks)) {
-			dprintk("RPC:       rpciod_killall: waiting for tasks "
+		if (!list_empty(&all_clients))
+			break;
+		dprintk("RPC:       rpciod_killall: waiting for tasks "
 					"to exit\n");
-			yield();
-		}
+		wait_event_timeout(client_kill_wait,
+				list_empty(&all_clients), 1*HZ);
 	}
 
 	spin_lock_irqsave(&current->sighand->siglock, flags);
@@ -1009,6 +1023,22 @@
 	spin_unlock_irqrestore(&current->sighand->siglock, flags);
 }
 
+void rpc_register_client(struct rpc_clnt *clnt)
+{
+	spin_lock(&rpc_sched_lock);
+	list_add(&clnt->cl_clients, &all_clients);
+	spin_unlock(&rpc_sched_lock);
+}
+
+void rpc_unregister_client(struct rpc_clnt *clnt)
+{
+	spin_lock(&rpc_sched_lock);
+	list_del(&clnt->cl_clients);
+	if (list_empty(&all_clients))
+		wake_up(&client_kill_wait);
+	spin_unlock(&rpc_sched_lock);
+}
+
 /*
  * Start up the rpciod process if it's not already running.
  */
@@ -1071,32 +1101,33 @@
 #ifdef RPC_DEBUG
 void rpc_show_tasks(void)
 {
-	struct list_head *le;
+	struct rpc_clnt *clnt;
 	struct rpc_task *t;
 
 	spin_lock(&rpc_sched_lock);
-	if (list_empty(&all_tasks)) {
-		spin_unlock(&rpc_sched_lock);
-		return;
-	}
+	if (list_empty(&all_clients))
+		goto out;
 	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
 		"-rpcwait -action- ---ops--\n");
-	alltask_for_each(t, le, &all_tasks) {
-		const char *rpc_waitq = "none";
+	list_for_each_entry(clnt, &all_clients, cl_clients) {
+		list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
+			const char *rpc_waitq = "none";
 
-		if (RPC_IS_QUEUED(t))
-			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+			if (RPC_IS_QUEUED(t))
+				rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
 
-		printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
-			t->tk_pid,
-			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
-			t->tk_flags, t->tk_status,
-			t->tk_client,
-			(t->tk_client ? t->tk_client->cl_prog : 0),
-			t->tk_rqstp, t->tk_timeout,
-			rpc_waitq,
-			t->tk_action, t->tk_ops);
+			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
+				t->tk_pid,
+				(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
+				t->tk_flags, t->tk_status,
+				t->tk_client,
+				(t->tk_client ? t->tk_client->cl_prog : 0),
+				t->tk_rqstp, t->tk_timeout,
+				rpc_waitq,
+				t->tk_action, t->tk_ops);
+		}
 	}
+out:
 	spin_unlock(&rpc_sched_lock);
 }
 #endif