NFSv4: Add functions to order RPC calls

 NFSv4 file state-changing functions such as OPEN, CLOSE, LOCK,... are all
 labelled with "sequence identifiers" in order to prevent the server from
 reordering RPC requests, as this could cause its file state to
 become out of sync with the client.

 Currently the NFS client code enforces this ordering locally using
 semaphores to restrict access to structures until the RPC call is done.
 This, of course, only works with synchronous RPC calls, since the
 user process must first grab the semaphore.
 By dropping semaphores, and instead teaching the RPC engine to hold
 the RPC calls until they are ready to be sent, we can extend this
 process to work nicely with asynchronous RPC calls too.

 This patch adds a new list called "rpc_sequence" that defines the order
 of the RPC calls to be sent. We add one such list for each state_owner.
 When an RPC call is ready to be sent, it checks if it is top of the
 rpc_sequence list. If so, it proceeds. If not, it goes back to sleep,
 and loops until it hits top of the list.
 Once the RPC call has completed, it can then bump the sequence id counter,
 and remove itself from the rpc_sequence list, and then wake up the next
 sleeper.

 Note that the state_owner sequence ids and lock_owner sequence ids are
 all indexed to the same rpc_sequence list, so OPEN, LOCK,... requests
 are all ordered w.r.t. each other.

 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 9701ca8..9ba89e7 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -218,7 +218,6 @@
 	struct nfs_delegation *delegation = NFS_I(inode)->delegation;
 	struct nfs_openargs o_arg = {
 		.fh = NFS_FH(inode),
-		.seqid = sp->so_seqid,
 		.id = sp->so_id,
 		.open_flags = state->state,
 		.clientid = server->nfs4_state->cl_clientid,
@@ -245,8 +244,13 @@
 		}
 		o_arg.u.delegation_type = delegation->type;
 	}
+	o_arg.seqid = nfs_alloc_seqid(&sp->so_seqid);
+	if (o_arg.seqid == NULL)
+		return -ENOMEM;
 	status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-	nfs4_increment_seqid(status, sp);
+	/* Confirm the sequence as being established */
+	nfs_confirm_seqid(&sp->so_seqid, status);
+	nfs_increment_open_seqid(status, o_arg.seqid);
 	if (status == 0) {
 		memcpy(&state->stateid, &o_res.stateid, sizeof(state->stateid));
 		if (o_res.delegation_type != 0) {
@@ -256,6 +260,7 @@
 				nfs_async_inode_return_delegation(inode, &o_res.stateid);
 		}
 	}
+	nfs_free_seqid(o_arg.seqid);
 	clear_bit(NFS_DELEGATED_STATE, &state->flags);
 	/* Ensure we update the inode attributes */
 	NFS_CACHEINV(inode);
@@ -307,16 +312,20 @@
 		goto out;
 	if (state->state == 0)
 		goto out;
-	arg.seqid = sp->so_seqid;
+	arg.seqid = nfs_alloc_seqid(&sp->so_seqid);
+	status = -ENOMEM;
+	if (arg.seqid == NULL)
+		goto out;
 	arg.open_flags = state->state;
 	memcpy(arg.u.delegation.data, state->stateid.data, sizeof(arg.u.delegation.data));
 	status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-	nfs4_increment_seqid(status, sp);
+	nfs_increment_open_seqid(status, arg.seqid);
 	if (status >= 0) {
 		memcpy(state->stateid.data, res.stateid.data,
 				sizeof(state->stateid.data));
 		clear_bit(NFS_DELEGATED_STATE, &state->flags);
 	}
+	nfs_free_seqid(arg.seqid);
 out:
 	up(&sp->so_sema);
 	dput(parent);
@@ -345,11 +354,11 @@
 	return err;
 }
 
-static inline int _nfs4_proc_open_confirm(struct rpc_clnt *clnt, const struct nfs_fh *fh, struct nfs4_state_owner *sp, nfs4_stateid *stateid)
+static inline int _nfs4_proc_open_confirm(struct rpc_clnt *clnt, const struct nfs_fh *fh, struct nfs4_state_owner *sp, nfs4_stateid *stateid, struct nfs_seqid *seqid)
 {
 	struct nfs_open_confirmargs arg = {
 		.fh             = fh,
-		.seqid          = sp->so_seqid,
+		.seqid          = seqid,
 		.stateid	= *stateid,
 	};
 	struct nfs_open_confirmres res;
@@ -362,7 +371,9 @@
 	int status;
 
 	status = rpc_call_sync(clnt, &msg, RPC_TASK_NOINTR);
-	nfs4_increment_seqid(status, sp);
+	/* Confirm the sequence as being established */
+	nfs_confirm_seqid(&sp->so_seqid, status);
+	nfs_increment_open_seqid(status, seqid);
 	if (status >= 0)
 		memcpy(stateid, &res.stateid, sizeof(*stateid));
 	return status;
@@ -380,21 +391,21 @@
 	int status;
 
 	/* Update sequence id. The caller must serialize! */
-	o_arg->seqid = sp->so_seqid;
 	o_arg->id = sp->so_id;
 	o_arg->clientid = sp->so_client->cl_clientid;
 
 	status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-	nfs4_increment_seqid(status, sp);
+	nfs_increment_open_seqid(status, o_arg->seqid);
 	if (status != 0)
 		goto out;
 	update_changeattr(dir, &o_res->cinfo);
 	if(o_res->rflags & NFS4_OPEN_RESULT_CONFIRM) {
 		status = _nfs4_proc_open_confirm(server->client, &o_res->fh,
-				sp, &o_res->stateid);
+				sp, &o_res->stateid, o_arg->seqid);
 		if (status != 0)
 			goto out;
 	}
+	nfs_confirm_seqid(&sp->so_seqid, 0);
 	if (!(o_res->f_attr->valid & NFS_ATTR_FATTR))
 		status = server->rpc_ops->getattr(server, &o_res->fh, o_res->f_attr);
 out:
@@ -465,6 +476,10 @@
 		set_bit(NFS_DELEGATED_STATE, &state->flags);
 		goto out;
 	}
+	o_arg.seqid = nfs_alloc_seqid(&sp->so_seqid);
+	status = -ENOMEM;
+	if (o_arg.seqid == NULL)
+		goto out;
 	status = _nfs4_proc_open(dir, sp, &o_arg, &o_res);
 	if (status != 0)
 		goto out_nodeleg;
@@ -490,6 +505,7 @@
 			nfs_inode_reclaim_delegation(inode, sp->so_cred, &o_res);
 	}
 out_nodeleg:
+	nfs_free_seqid(o_arg.seqid);
 	clear_bit(NFS_DELEGATED_STATE, &state->flags);
 out:
 	dput(parent);
@@ -667,6 +683,9 @@
 	/* Serialization for the sequence id */
 	down(&sp->so_sema);
 
+	o_arg.seqid = nfs_alloc_seqid(&sp->so_seqid);
+	if (o_arg.seqid == NULL)
+		return -ENOMEM;
 	status = _nfs4_proc_open(dir, sp, &o_arg, &o_res);
 	if (status != 0)
 		goto out_err;
@@ -681,6 +700,7 @@
 	update_open_stateid(state, &o_res.stateid, flags);
 	if (o_res.delegation_type != 0)
 		nfs_inode_set_delegation(inode, cred, &o_res);
+	nfs_free_seqid(o_arg.seqid);
 	up(&sp->so_sema);
 	nfs4_put_state_owner(sp);
 	up_read(&clp->cl_sem);
@@ -690,6 +710,7 @@
 	if (sp != NULL) {
 		if (state != NULL)
 			nfs4_put_open_state(state);
+		nfs_free_seqid(o_arg.seqid);
 		up(&sp->so_sema);
 		nfs4_put_state_owner(sp);
 	}
@@ -718,7 +739,7 @@
 		 * It is actually a sign of a bug on the client or on the server.
 		 *
 		 * If we receive a BAD_SEQID error in the particular case of
-		 * doing an OPEN, we assume that nfs4_increment_seqid() will
+		 * doing an OPEN, we assume that nfs_increment_open_seqid() will
 		 * have unhashed the old state_owner for us, and that we can
 		 * therefore safely retry using a new one. We should still warn
 		 * the user though...
@@ -799,7 +820,7 @@
         /* hmm. we are done with the inode, and in the process of freeing
 	 * the state_owner. we keep this around to process errors
 	 */
-	nfs4_increment_seqid(task->tk_status, sp);
+	nfs_increment_open_seqid(task->tk_status, calldata->arg.seqid);
 	switch (task->tk_status) {
 		case 0:
 			memcpy(&state->stateid, &calldata->res.stateid,
@@ -818,6 +839,7 @@
 	}
 	state->state = calldata->arg.open_flags;
 	nfs4_put_open_state(state);
+	nfs_free_seqid(calldata->arg.seqid);
 	up(&sp->so_sema);
 	nfs4_put_state_owner(sp);
 	up_read(&server->nfs4_state->cl_sem);
@@ -865,7 +887,11 @@
 	calldata->state = state;
 	calldata->arg.fh = NFS_FH(inode);
 	/* Serialization for the sequence id */
-	calldata->arg.seqid = state->owner->so_seqid;
+	calldata->arg.seqid = nfs_alloc_seqid(&state->owner->so_seqid);
+	if (calldata->arg.seqid == NULL) {
+		kfree(calldata);
+		return -ENOMEM;
+	}
 	calldata->arg.open_flags = mode;
 	memcpy(&calldata->arg.stateid, &state->stateid,
 			sizeof(calldata->arg.stateid));
@@ -2729,15 +2755,19 @@
 	/* We might have lost the locks! */
 	if ((lsp->ls_flags & NFS_LOCK_INITIALIZED) == 0)
 		goto out;
-	luargs.seqid = lsp->ls_seqid;
-	memcpy(&luargs.stateid, &lsp->ls_stateid, sizeof(luargs.stateid));
+	luargs.seqid = nfs_alloc_seqid(&lsp->ls_seqid);
+	status = -ENOMEM;
+	if (luargs.seqid == NULL)
+		goto out;
+	memcpy(luargs.stateid.data, lsp->ls_stateid.data, sizeof(luargs.stateid.data));
 	arg.u.locku = &luargs;
 	status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-	nfs4_increment_lock_seqid(status, lsp);
+	nfs_increment_lock_seqid(status, luargs.seqid);
 
 	if (status == 0)
-		memcpy(&lsp->ls_stateid,  &res.u.stateid, 
-				sizeof(lsp->ls_stateid));
+		memcpy(lsp->ls_stateid.data, res.u.stateid.data, 
+				sizeof(lsp->ls_stateid.data));
+	nfs_free_seqid(luargs.seqid);
 out:
 	up(&state->lock_sema);
 	if (status == 0)
@@ -2783,9 +2813,13 @@
 		.reclaim = reclaim,
 		.new_lock_owner = 0,
 	};
-	int status;
+	struct nfs_seqid *lock_seqid;
+	int status = -ENOMEM;
 
-	if (!(lsp->ls_flags & NFS_LOCK_INITIALIZED)) {
+	lock_seqid = nfs_alloc_seqid(&lsp->ls_seqid);
+	if (lock_seqid == NULL)
+		return -ENOMEM;
+	if (!(lsp->ls_seqid.flags & NFS_SEQID_CONFIRMED)) {
 		struct nfs4_state_owner *owner = state->owner;
 		struct nfs_open_to_lock otl = {
 			.lock_owner = {
@@ -2793,39 +2827,40 @@
 			},
 		};
 
-		otl.lock_seqid = lsp->ls_seqid;
+		otl.lock_seqid = lock_seqid;
 		otl.lock_owner.id = lsp->ls_id;
 		memcpy(&otl.open_stateid, &state->stateid, sizeof(otl.open_stateid));
 		largs.u.open_lock = &otl;
 		largs.new_lock_owner = 1;
 		arg.u.lock = &largs;
 		down(&owner->so_sema);
-		otl.open_seqid = owner->so_seqid;
-		status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-		/* increment open_owner seqid on success, and 
-		* seqid mutating errors */
-		nfs4_increment_seqid(status, owner);
-		up(&owner->so_sema);
-		if (status == 0) {
-			lsp->ls_flags |= NFS_LOCK_INITIALIZED;
-			lsp->ls_seqid++;
+		otl.open_seqid = nfs_alloc_seqid(&owner->so_seqid);
+		if (otl.open_seqid != NULL) {
+			status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
+			/* increment seqid on success, and seqid mutating errors */
+			nfs_increment_open_seqid(status, otl.open_seqid);
+			nfs_free_seqid(otl.open_seqid);
 		}
+		up(&owner->so_sema);
+		if (status == 0)
+			nfs_confirm_seqid(&lsp->ls_seqid, 0);
 	} else {
-		struct nfs_exist_lock el = {
-			.seqid = lsp->ls_seqid,
-		};
+		struct nfs_exist_lock el;
 		memcpy(&el.stateid, &lsp->ls_stateid, sizeof(el.stateid));
 		largs.u.exist_lock = &el;
 		arg.u.lock = &largs;
+		el.seqid = lock_seqid;
 		status = rpc_call_sync(server->client, &msg, RPC_TASK_NOINTR);
-		/* increment seqid on success, and * seqid mutating errors*/
-		nfs4_increment_lock_seqid(status, lsp);
 	}
+	/* increment seqid on success, and seqid mutating errors*/
+	nfs_increment_lock_seqid(status, lock_seqid);
 	/* save the returned stateid. */
-	if (status == 0)
-		memcpy(&lsp->ls_stateid, &res.u.stateid, sizeof(nfs4_stateid));
-	else if (status == -NFS4ERR_DENIED)
+	if (status == 0) {
+		memcpy(lsp->ls_stateid.data, res.u.stateid.data, sizeof(lsp->ls_stateid.data));
+		lsp->ls_flags |= NFS_LOCK_INITIALIZED;
+	} else if (status == -NFS4ERR_DENIED)
 		status = -EAGAIN;
+	nfs_free_seqid(lock_seqid);
 	return status;
 }