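NFSv4: Add a recovery marking scheme for state owners

Wherever NFS4CLNT_RECLAIM_REBOOT or NFS4CLNT_RECLAIM_NOGRACE is set on
the client for a given state, also set the matching
NFS_OWNER_RECLAIM_REBOOT or NFS_OWNER_RECLAIM_NOGRACE bit in the so_flags
of that state's owner.

The reclaim loop now walks clp->cl_state_owners under clp->cl_lock and
skips any owner whose ops->owner_flag_bit is clear. For a flagged owner
it clears the bit, takes a reference on so_count, drops the lock, and
calls nfs4_reclaim_open_state(). Because the lock was dropped, the walk
then restarts from the first owner. If reclaim fails, the owner's flag
is set again before the error is passed to nfs4_recovery_handle_error()
and returned.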

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index a780518..7dcca23 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -832,6 +832,7 @@
 		clear_bit(NFS_STATE_RECLAIM_REBOOT, &state->flags);
 		return 0;
 	}
+	set_bit(NFS_OWNER_RECLAIM_REBOOT, &state->owner->so_flags);
 	set_bit(NFS4CLNT_RECLAIM_REBOOT, &clp->cl_state);
 	return 1;
 }
@@ -840,6 +841,7 @@
 {
 	set_bit(NFS_STATE_RECLAIM_NOGRACE, &state->flags);
 	clear_bit(NFS_STATE_RECLAIM_REBOOT, &state->flags);
+	set_bit(NFS_OWNER_RECLAIM_NOGRACE, &state->owner->so_flags);
 	set_bit(NFS4CLNT_RECLAIM_NOGRACE, &clp->cl_state);
 	return 1;
 }
@@ -1043,14 +1045,25 @@
 	struct rb_node *pos;
 	int status = 0;
 
-	/* Note: list is protected by exclusive lock on cl->cl_sem */
+restart:
+	spin_lock(&clp->cl_lock);
 	for (pos = rb_first(&clp->cl_state_owners); pos != NULL; pos = rb_next(pos)) {
 		struct nfs4_state_owner *sp = rb_entry(pos, struct nfs4_state_owner, so_client_node);
+		if (!test_and_clear_bit(ops->owner_flag_bit, &sp->so_flags))
+			continue;
+		atomic_inc(&sp->so_count);
+		spin_unlock(&clp->cl_lock);
 		status = nfs4_reclaim_open_state(sp, ops);
-		if (status < 0)
-			break;
+		if (status < 0) {
+			set_bit(ops->owner_flag_bit, &sp->so_flags);
+			nfs4_put_state_owner(sp);
+			nfs4_recovery_handle_error(clp, status);
+			return status;
+		}
+		nfs4_put_state_owner(sp);
+		goto restart;
 	}
-	nfs4_recovery_handle_error(clp, status);
+	spin_unlock(&clp->cl_lock);
 	return status;
 }