NFSv4: Add functions to order RPC calls

 NFSv4 file state-changing functions such as OPEN, CLOSE, LOCK,... are all
 labelled with "sequence identifiers" in order to prevent the server from
 reordering RPC requests, as this could cause its file state to
 become out of sync with the client.

 Currently the NFS client code enforces this ordering locally using
 semaphores to restrict access to structures until the RPC call is done.
 This, of course, only works with synchronous RPC calls, since the
 user process must first grab the semaphore.
 By dropping semaphores, and instead teaching the RPC engine to hold
 the RPC calls until they are ready to be sent, we can extend this
 process to work nicely with asynchronous RPC calls too.

 This patch adds a new list called "rpc_sequence" that defines the order
 of the RPC calls to be sent. We add one such list for each state_owner.
 When an RPC call is ready to be sent, it checks whether it is at the head
 of the rpc_sequence list. If so, it proceeds. If not, it goes back to
 sleep, and loops until it reaches the head of the list.
 Once the RPC call has completed, it can then bump the sequence id counter,
 remove itself from the rpc_sequence list, and then wake up the next
 sleeper.

 Note that the state_owner sequence ids and lock_owner sequence ids are
 all indexed to the same rpc_sequence list, so OPEN, LOCK,... requests
 are all ordered w.r.t. each other.

 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index afe587d..f535c21 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -264,13 +264,16 @@
 {
 	struct nfs4_state_owner *sp;
 
-	sp = kmalloc(sizeof(*sp),GFP_KERNEL);
+	sp = kzalloc(sizeof(*sp),GFP_KERNEL);
 	if (!sp)
 		return NULL;
 	init_MUTEX(&sp->so_sema);
-	sp->so_seqid = 0;                 /* arbitrary */
 	INIT_LIST_HEAD(&sp->so_states);
 	INIT_LIST_HEAD(&sp->so_delegations);
+	rpc_init_wait_queue(&sp->so_sequence.wait, "Seqid_waitqueue");
+	sp->so_seqid.sequence = &sp->so_sequence;
+	spin_lock_init(&sp->so_sequence.lock);
+	INIT_LIST_HEAD(&sp->so_sequence.list);
 	atomic_set(&sp->so_count, 1);
 	return sp;
 }
@@ -553,12 +556,10 @@
 	struct nfs4_lock_state *lsp;
 	struct nfs4_client *clp = state->owner->so_client;
 
-	lsp = kmalloc(sizeof(*lsp), GFP_KERNEL);
+	lsp = kzalloc(sizeof(*lsp), GFP_KERNEL);
 	if (lsp == NULL)
 		return NULL;
-	lsp->ls_flags = 0;
-	lsp->ls_seqid = 0;	/* arbitrary */
-	memset(lsp->ls_stateid.data, 0, sizeof(lsp->ls_stateid.data));
+	lsp->ls_seqid.sequence = &state->owner->so_sequence;
 	atomic_set(&lsp->ls_count, 1);
 	lsp->ls_owner = fl_owner;
 	spin_lock(&clp->cl_lock);
@@ -673,29 +674,102 @@
 	nfs4_put_lock_state(lsp);
 }
 
-/*
-* Called with state->lock_sema and clp->cl_sem held.
-*/
-void nfs4_increment_lock_seqid(int status, struct nfs4_lock_state *lsp)
+struct nfs_seqid *nfs_alloc_seqid(struct nfs_seqid_counter *counter)
 {
-	if (status == NFS_OK || seqid_mutating_err(-status))
-		lsp->ls_seqid++;
+	struct rpc_sequence *sequence = counter->sequence;
+	struct nfs_seqid *new;	/* entry to be queued on counter's rpc_sequence */
+
+	new = kmalloc(sizeof(*new), GFP_KERNEL);
+	if (new != NULL) {
+		new->sequence = counter;
+		new->task = NULL;	/* no task recorded yet; nfs_wait_on_sequence() sets it */
+		spin_lock(&sequence->lock);
+		list_add_tail(&new->list, &sequence->list);	/* join the back of the queue */
+		spin_unlock(&sequence->lock);
+	}
+	return new;	/* NULL if kmalloc() failed; nothing was queued in that case */
+}
+
+void nfs_free_seqid(struct nfs_seqid *seqid)
+{
+	struct rpc_sequence *sequence = seqid->sequence->sequence;
+	struct rpc_task *next = NULL;
+
+	spin_lock(&sequence->lock);
+	list_del(&seqid->list);	/* take this entry off the sequence list */
+	if (!list_empty(&sequence->list)) {	/* some other entry is now at the head */
+		next = list_entry(sequence->list.next, struct nfs_seqid, list)->task;
+		if (next)	/* only wake a task if the new head entry recorded one */
+			rpc_wake_up_task(next);
+	}
+	spin_unlock(&sequence->lock);
+	kfree(seqid);	/* entry is unlinked, so it is freed outside the lock */
 }
 
 /*
-* Called with sp->so_sema and clp->cl_sem held.
-*
-* Increment the seqid if the OPEN/OPEN_DOWNGRADE/CLOSE succeeded, or
-* failed with a seqid incrementing error -
-* see comments nfs_fs.h:seqid_mutating_error()
-*/
-void nfs4_increment_seqid(int status, struct nfs4_state_owner *sp)
+ * Called with sp->so_sema and clp->cl_sem held.
+ *
+ * Increment the seqid if the OPEN/OPEN_DOWNGRADE/CLOSE succeeded, or
+ * failed with a seqid incrementing error -
+ * see comments nfs_fs.h:seqid_mutating_err()
+ */
+static inline void nfs_increment_seqid(int status, struct nfs_seqid *seqid)
 {
-	if (status == NFS_OK || seqid_mutating_err(-status))
-		sp->so_seqid++;
-	/* If the server returns BAD_SEQID, unhash state_owner here */
-	if (status == -NFS4ERR_BAD_SEQID)
+	switch (status) {
+		case 0:
+			break;	/* success: go on to bump the counter */
+		case -NFS4ERR_BAD_SEQID:
+		case -NFS4ERR_STALE_CLIENTID:
+		case -NFS4ERR_STALE_STATEID:
+		case -NFS4ERR_BAD_STATEID:
+		case -NFS4ERR_BADXDR:
+		case -NFS4ERR_RESOURCE:
+		case -NFS4ERR_NOFILEHANDLE:
+			/* Non-seqid mutating errors: leave the counter untouched */
+			return;
+	};
+	/*
+	 * Note: no locking needed as we are guaranteed to be first
+	 * on the sequence list
+	 */
+	seqid->sequence->counter++;	/* reached on success and on any status not listed above */
+}
+
+void nfs_increment_open_seqid(int status, struct nfs_seqid *seqid)
+{
+	if (status == -NFS4ERR_BAD_SEQID) {	/* on BAD_SEQID, drop the owning state_owner */
+		struct nfs4_state_owner *sp = container_of(seqid->sequence,
+				struct nfs4_state_owner, so_seqid);
+		nfs4_drop_state_owner(sp);
+	}
+	nfs_increment_seqid(status, seqid);	/* bumps the counter unless status is non-mutating */
+}
+
+/*
+ * Called with ls->lock_sema and clp->cl_sem held.
+ *
+ * Increment the seqid if the LOCK/LOCKU succeeded, or
+ * failed with a seqid incrementing error -
+ * see comments nfs_fs.h:seqid_mutating_err()
+ */
+void nfs_increment_lock_seqid(int status, struct nfs_seqid *seqid)
+{
+	nfs_increment_seqid(status, seqid);	/* same counter rules as the open seqid path */
+}
+
+int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task)
+{
+	struct rpc_sequence *sequence = seqid->sequence->sequence;
+	int status = 0;	/* 0 is returned when seqid is already at the head of the list */
+
+	spin_lock(&sequence->lock);
+	if (sequence->list.next != &seqid->list) {	/* an earlier entry is still queued */
+		seqid->task = task;	/* recorded so nfs_free_seqid() can wake this task */
+		rpc_sleep_on(&sequence->wait, task, NULL, NULL);
+		status = -EAGAIN;	/* task was queued on sequence->wait instead of proceeding */
+	}
+	spin_unlock(&sequence->lock);
+	return status;
 }
 
 static int reclaimer(void *);
@@ -791,8 +865,6 @@
 		if (state->state == 0)
 			continue;
 		status = ops->recover_open(sp, state);
-		list_for_each_entry(lock, &state->lock_states, ls_locks)
-			lock->ls_flags &= ~NFS_LOCK_INITIALIZED;
 		if (status >= 0) {
 			status = nfs4_reclaim_locks(ops, state);
 			if (status < 0)
@@ -831,6 +903,26 @@
 	return status;
 }
 
+static void nfs4_state_mark_reclaim(struct nfs4_client *clp)
+{
+	struct nfs4_lock_state *lsp;
+	struct nfs4_state_owner *owner;
+	struct nfs4_state *open_state;
+
+	/* Zero every state_owner and lock_state seqid; clear NFS_LOCK_INITIALIZED */
+	list_for_each_entry(owner, &clp->cl_state_owners, so_list) {
+		owner->so_seqid.flags = 0;
+		owner->so_seqid.counter = 0;
+		list_for_each_entry(open_state, &owner->so_states, open_states) {
+			list_for_each_entry(lsp, &open_state->lock_states, ls_locks) {
+				lsp->ls_flags &= ~NFS_LOCK_INITIALIZED;
+				lsp->ls_seqid.flags = 0;
+				lsp->ls_seqid.counter = 0;
+			}
+		}
+	}
+}
+
 static int reclaimer(void *ptr)
 {
 	struct reclaimer_args *args = (struct reclaimer_args *)ptr;
@@ -864,6 +956,7 @@
 		default:
 			ops = &nfs4_network_partition_recovery_ops;
 	};
+	nfs4_state_mark_reclaim(clp);
 	status = __nfs4_init_client(clp);
 	if (status)
 		goto out_error;