nfs41: fix state manager deadlock in session reset

If the session is reset during state recovery, the state manager thread can
sleep on the slot_tbl_waitq, causing a deadlock.

Add a completion to the session.  Have the state manager thread set a new
client state bit (NFS4CLNT_SESSION_DRAINING) and wait for the session slot
table to drain.

Signal the state manager thread in nfs41_sequence_free_slot when the
NFS4CLNT_SESSION_DRAINING bit is set and the session is drained.

Reported-by: Trond Myklebust <trond@netapp.com>
Signed-off-by: Andy Adamson <andros@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index e9ecd6b..5c77401 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -45,6 +45,7 @@
 	NFS4CLNT_RECLAIM_NOGRACE,
 	NFS4CLNT_DELEGRETURN,
 	NFS4CLNT_SESSION_RESET,
+	NFS4CLNT_SESSION_DRAINING,
 };
 
 /*
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index fb62919..c1bc9ca 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -361,6 +361,16 @@
 	}
 	nfs4_free_slot(tbl, res->sr_slotid);
 	res->sr_slotid = NFS4_MAX_SLOT_TABLE;
+
+	/* Signal state manager thread if session is drained */
+	if (test_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state)) {
+		spin_lock(&tbl->slot_tbl_lock);
+		if (tbl->highest_used_slotid == -1) {
+			dprintk("%s COMPLETE: Session Drained\n", __func__);
+			complete(&clp->cl_session->complete);
+		}
+		spin_unlock(&tbl->slot_tbl_lock);
+	}
 }
 
 static void nfs41_sequence_done(struct nfs_client *clp,
@@ -457,15 +467,11 @@
 
 	spin_lock(&tbl->slot_tbl_lock);
 	if (test_bit(NFS4CLNT_SESSION_RESET, &session->clp->cl_state)) {
-		if (tbl->highest_used_slotid != -1) {
-			rpc_sleep_on(&tbl->slot_tbl_waitq, task, NULL);
-			spin_unlock(&tbl->slot_tbl_lock);
-			dprintk("<-- %s: Session reset: draining\n", __func__);
-			return -EAGAIN;
-		}
-
-		/* The slot table is empty; start the reset thread */
-		dprintk("%s Session Reset\n", __func__);
+		/*
+		 * The state manager will wait until the slot table is empty.
+		 * Schedule the reset thread
+		 */
+		dprintk("%s Schedule Session Reset\n", __func__);
 		rpc_sleep_on(&tbl->slot_tbl_waitq, task, NULL);
 		nfs4_schedule_state_manager(session->clp);
 		spin_unlock(&tbl->slot_tbl_lock);
@@ -4506,6 +4512,7 @@
 			1);
 	if (status)
 		return status;
+	init_completion(&session->complete);
 
 	status = nfs4_reset_slot_table(&session->bc_slot_table,
 			session->bc_attrs.max_reqs,
@@ -4608,6 +4615,7 @@
 	 * nfs_client struct
 	 */
 	clp->cl_cons_state = NFS_CS_SESSION_INITING;
+	init_completion(&session->complete);
 
 	tbl = &session->fc_slot_table;
 	spin_lock_init(&tbl->slot_tbl_lock);
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index f56f6be..3c14335 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -1181,8 +1181,23 @@
 
 static int nfs4_reset_session(struct nfs_client *clp)
 {
+	struct nfs4_session *ses = clp->cl_session;
+	struct nfs4_slot_table *tbl = &ses->fc_slot_table;
 	int status;
 
+	INIT_COMPLETION(ses->complete);
+	spin_lock(&tbl->slot_tbl_lock);
+	if (tbl->highest_used_slotid != -1) {
+		set_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state);
+		spin_unlock(&tbl->slot_tbl_lock);
+		status = wait_for_completion_interruptible(&ses->complete);
+		clear_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state);
+		if (status) /* -ERESTARTSYS */
+			goto out;
+	} else {
+		spin_unlock(&tbl->slot_tbl_lock);
+	}
+
 	status = nfs4_proc_destroy_session(clp->cl_session);
 	if (status && status != -NFS4ERR_BADSESSION &&
 	    status != -NFS4ERR_DEADSESSION) {