SUNRPC: rpc_timeout_upcall_queue should not sleep

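 The function rpc_timeout_upcall_queue runs from a workqueue, and hence
 sleeping is not recommended. Convert the protection of the upcall queue
 from being mutex-based (inode->i_mutex) to being spinlock-based
 (inode->i_lock). Queued messages are spliced onto a private list while
 the spinlock is held, and their destroy_msg callbacks are invoked only
 after the spinlock has been released.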

 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 9764c80..7281746 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -38,44 +38,42 @@
 
 #define RPC_UPCALL_TIMEOUT (30*HZ)
 
-static void
-__rpc_purge_list(struct rpc_inode *rpci, struct list_head *head, int err)
+static void rpc_purge_list(struct rpc_inode *rpci, struct list_head *head,
+		void (*destroy_msg)(struct rpc_pipe_msg *), int err)
 {
 	struct rpc_pipe_msg *msg;
-	void (*destroy_msg)(struct rpc_pipe_msg *);
 
-	destroy_msg = rpci->ops->destroy_msg;
-	while (!list_empty(head)) {
+	if (list_empty(head))
+		return;
+	do {
 		msg = list_entry(head->next, struct rpc_pipe_msg, list);
-		list_del_init(&msg->list);
+		list_del(&msg->list);
 		msg->errno = err;
 		destroy_msg(msg);
-	}
-}
-
-static void
-__rpc_purge_upcall(struct inode *inode, int err)
-{
-	struct rpc_inode *rpci = RPC_I(inode);
-
-	__rpc_purge_list(rpci, &rpci->pipe, err);
-	rpci->pipelen = 0;
+	} while (!list_empty(head));
 	wake_up(&rpci->waitq);
 }
 
 static void
 rpc_timeout_upcall_queue(void *data)
 {
+	LIST_HEAD(free_list);
 	struct rpc_inode *rpci = (struct rpc_inode *)data;
 	struct inode *inode = &rpci->vfs_inode;
+	void (*destroy_msg)(struct rpc_pipe_msg *);
 
-	mutex_lock(&inode->i_mutex);
-	if (rpci->ops == NULL)
-		goto out;
-	if (rpci->nreaders == 0 && !list_empty(&rpci->pipe))
-		__rpc_purge_upcall(inode, -ETIMEDOUT);
-out:
-	mutex_unlock(&inode->i_mutex);
+	spin_lock(&inode->i_lock);
+	if (rpci->ops == NULL) {
+		spin_unlock(&inode->i_lock);
+		return;
+	}
+	destroy_msg = rpci->ops->destroy_msg;
+	if (rpci->nreaders == 0) {
+		list_splice_init(&rpci->pipe, &free_list);
+		rpci->pipelen = 0;
+	}
+	spin_unlock(&inode->i_lock);
+	rpc_purge_list(rpci, &free_list, destroy_msg, -ETIMEDOUT);
 }
 
 int
@@ -84,7 +82,7 @@
 	struct rpc_inode *rpci = RPC_I(inode);
 	int res = -EPIPE;
 
-	mutex_lock(&inode->i_mutex);
+	spin_lock(&inode->i_lock);
 	if (rpci->ops == NULL)
 		goto out;
 	if (rpci->nreaders) {
@@ -100,7 +98,7 @@
 		res = 0;
 	}
 out:
-	mutex_unlock(&inode->i_mutex);
+	spin_unlock(&inode->i_lock);
 	wake_up(&rpci->waitq);
 	return res;
 }
@@ -115,21 +113,29 @@
 rpc_close_pipes(struct inode *inode)
 {
 	struct rpc_inode *rpci = RPC_I(inode);
+	struct rpc_pipe_ops *ops;
 
 	mutex_lock(&inode->i_mutex);
-	if (rpci->ops != NULL) {
+	ops = rpci->ops;
+	if (ops != NULL) {
+		LIST_HEAD(free_list);
+
+		spin_lock(&inode->i_lock);
 		rpci->nreaders = 0;
-		__rpc_purge_list(rpci, &rpci->in_upcall, -EPIPE);
-		__rpc_purge_upcall(inode, -EPIPE);
-		rpci->nwriters = 0;
-		if (rpci->ops->release_pipe)
-			rpci->ops->release_pipe(inode);
+		list_splice_init(&rpci->in_upcall, &free_list);
+		list_splice_init(&rpci->pipe, &free_list);
+		rpci->pipelen = 0;
 		rpci->ops = NULL;
+		spin_unlock(&inode->i_lock);
+		rpc_purge_list(rpci, &free_list, ops->destroy_msg, -EPIPE);
+		rpci->nwriters = 0;
+		if (ops->release_pipe)
+			ops->release_pipe(inode);
+		cancel_delayed_work(&rpci->queue_timeout);
+		flush_scheduled_work();
 	}
 	rpc_inode_setowner(inode, NULL);
 	mutex_unlock(&inode->i_mutex);
-	cancel_delayed_work(&rpci->queue_timeout);
-	flush_scheduled_work();
 }
 
 static struct inode *
@@ -177,16 +183,26 @@
 		goto out;
 	msg = (struct rpc_pipe_msg *)filp->private_data;
 	if (msg != NULL) {
+		spin_lock(&inode->i_lock);
 		msg->errno = -EAGAIN;
-		list_del_init(&msg->list);
+		list_del(&msg->list);
+		spin_unlock(&inode->i_lock);
 		rpci->ops->destroy_msg(msg);
 	}
 	if (filp->f_mode & FMODE_WRITE)
 		rpci->nwriters --;
-	if (filp->f_mode & FMODE_READ)
+	if (filp->f_mode & FMODE_READ) {
 		rpci->nreaders --;
-	if (!rpci->nreaders)
-		__rpc_purge_upcall(inode, -EAGAIN);
+		if (rpci->nreaders == 0) {
+			LIST_HEAD(free_list);
+			spin_lock(&inode->i_lock);
+			list_splice_init(&rpci->pipe, &free_list);
+			rpci->pipelen = 0;
+			spin_unlock(&inode->i_lock);
+			rpc_purge_list(rpci, &free_list,
+					rpci->ops->destroy_msg, -EAGAIN);
+		}
+	}
 	if (rpci->ops->release_pipe)
 		rpci->ops->release_pipe(inode);
 out:
@@ -209,6 +225,7 @@
 	}
 	msg = filp->private_data;
 	if (msg == NULL) {
+		spin_lock(&inode->i_lock);
 		if (!list_empty(&rpci->pipe)) {
 			msg = list_entry(rpci->pipe.next,
 					struct rpc_pipe_msg,
@@ -218,6 +235,7 @@
 			filp->private_data = msg;
 			msg->copied = 0;
 		}
+		spin_unlock(&inode->i_lock);
 		if (msg == NULL)
 			goto out_unlock;
 	}
@@ -225,7 +243,9 @@
 	res = rpci->ops->upcall(filp, msg, buf, len);
 	if (res < 0 || msg->len == msg->copied) {
 		filp->private_data = NULL;
-		list_del_init(&msg->list);
+		spin_lock(&inode->i_lock);
+		list_del(&msg->list);
+		spin_unlock(&inode->i_lock);
 		rpci->ops->destroy_msg(msg);
 	}
 out_unlock: