Merge branch 'bugfixes' of git://git.linux-nfs.org/projects/trondmy/nfs-2.6

* 'bugfixes' of git://git.linux-nfs.org/projects/trondmy/nfs-2.6:
  NFSv4: Fix a regression in the NFSv4 state manager
  NFSv4: Release the sequence id before restarting a CLOSE rpc call
  nfs41: fix session fore channel negotiation
  nfs41: do not zero seqid portion of stateid on close
  nfs: run state manager in privileged mode
  nfs: make recovery state manager operations privileged
  nfs: enforce FIFO ordering of operations trying to acquire slot
  rpc: add a new priority in RPC task
  nfs: remove rpc_task argument from nfs4_find_slot
  rpc: add rpc_queue_empty function
  nfs: change nfs4_do_setlk params to identify recovery type
  nfs: do not do a LOOKUP after open
  nfs: minor cleanup of session draining
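
Taken together, these commits let the NFSv4.1 state manager issue its recovery operations at a new privileged RPC priority, so they can obtain session slots while the slot table is draining, and they make ordinary requests queue behind earlier waiters in FIFO order. Below is a minimal userspace sketch of that gating decision as it ends up in nfs41_setup_sequence(); the function name and the boolean inputs are illustrative stand-ins, not the kernel API.

#include <stdbool.h>
#include <stdio.h>

enum rpc_priority { PRIO_LOW = -1, PRIO_NORMAL = 0, PRIO_HIGH = 1, PRIO_PRIVILEGED = 2 };

/* Return true if the task may try to claim a slot now, false if it must wait. */
static bool may_acquire_slot(bool session_draining, bool waiters_queued,
			     enum rpc_priority prio)
{
	/* While draining, only privileged (state manager) tasks proceed. */
	if (session_draining && prio != PRIO_PRIVILEGED)
		return false;
	/* FIFO: a non-privileged task queues behind earlier waiters. */
	if (waiters_queued && prio != PRIO_PRIVILEGED)
		return false;
	return true;
}

int main(void)
{
	printf("normal task, session draining:    %d\n",
	       may_acquire_slot(true, false, PRIO_NORMAL));	/* 0 */
	printf("privileged recovery op, draining: %d\n",
	       may_acquire_slot(true, false, PRIO_PRIVILEGED));	/* 1 */
	printf("normal task, waiters queued:      %d\n",
	       may_acquire_slot(false, true, PRIO_NORMAL));	/* 0 */
	return 0;
}

In the patches themselves, the privileged priority is applied from the rpc_call_prepare callbacks of the recovery variants (nfs4_recover_open_prepare, nfs4_recover_lock_prepare, nfs41_call_priv_sync_prepare, and the lease-time and reclaim-complete prepare routines).
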
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 7e57b04..865265b 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -108,6 +108,10 @@
 	NFS_OWNER_RECLAIM_NOGRACE
 };
 
+#define NFS_LOCK_NEW		0
+#define NFS_LOCK_RECLAIM	1
+#define NFS_LOCK_EXPIRED	2
+
 /*
  * struct nfs4_state maintains the client-side state for a given
  * (state_owner,inode) tuple (OPEN) or state_owner (LOCK).
@@ -282,6 +286,7 @@
 extern int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task);
 extern void nfs_increment_open_seqid(int status, struct nfs_seqid *seqid);
 extern void nfs_increment_lock_seqid(int status, struct nfs_seqid *seqid);
+extern void nfs_release_seqid(struct nfs_seqid *seqid);
 extern void nfs_free_seqid(struct nfs_seqid *seqid);
 
 extern const nfs4_stateid zero_stateid;
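
The NFS_LOCK_NEW/NFS_LOCK_RECLAIM/NFS_LOCK_EXPIRED constants added above replace the old boolean reclaim argument of _nfs4_do_setlk(); the matching hunk appears further down in fs/nfs/nfs4proc.c. A standalone sketch of how the new argument is consumed, with the setup struct and the callback-ops tables reduced to simple placeholders:

#include <stdio.h>

#define NFS_LOCK_NEW		0
#define NFS_LOCK_RECLAIM	1
#define NFS_LOCK_EXPIRED	2

struct setlk_args {
	int reclaim;			/* set on the wire only for reclaims */
	const char *callback_ops;	/* which rpc_call_ops table is used */
};

static void setup_setlk(struct setlk_args *args, int recovery_type)
{
	args->reclaim = 0;
	args->callback_ops = "nfs4_lock_ops";
	if (recovery_type > NFS_LOCK_NEW) {
		/* Both reclaim and expired-lease recovery run privileged. */
		args->callback_ops = "nfs4_recover_lock_ops";
		if (recovery_type == NFS_LOCK_RECLAIM)
			args->reclaim = 1;
	}
}

int main(void)
{
	struct setlk_args a;

	setup_setlk(&a, NFS_LOCK_EXPIRED);
	printf("expired: reclaim=%d ops=%s\n", a.reclaim, a.callback_ops);
	setup_setlk(&a, NFS_LOCK_RECLAIM);
	printf("reclaim: reclaim=%d ops=%s\n", a.reclaim, a.callback_ops);
	return 0;
}
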
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 9f5f11e..198d51d 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -64,6 +64,7 @@
 
 struct nfs4_opendata;
 static int _nfs4_proc_open(struct nfs4_opendata *data);
+static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
 static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *);
 static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *);
 static int _nfs4_proc_lookup(struct inode *dir, const struct qstr *name, struct nfs_fh *fhandle, struct nfs_fattr *fattr);
@@ -341,6 +342,27 @@
 		free_slotid, tbl->highest_used_slotid);
 }
 
+/*
+ * Signal state manager thread if session is drained
+ */
+static void nfs41_check_drain_session_complete(struct nfs4_session *ses)
+{
+	struct rpc_task *task;
+
+	if (!test_bit(NFS4CLNT_SESSION_DRAINING, &ses->clp->cl_state)) {
+		task = rpc_wake_up_next(&ses->fc_slot_table.slot_tbl_waitq);
+		if (task)
+			rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
+		return;
+	}
+
+	if (ses->fc_slot_table.highest_used_slotid != -1)
+		return;
+
+	dprintk("%s COMPLETE: Session Drained\n", __func__);
+	complete(&ses->complete);
+}
+
 static void nfs41_sequence_free_slot(const struct nfs_client *clp,
 			      struct nfs4_sequence_res *res)
 {
@@ -356,15 +378,7 @@
 
 	spin_lock(&tbl->slot_tbl_lock);
 	nfs4_free_slot(tbl, res->sr_slotid);
-
-	/* Signal state manager thread if session is drained */
-	if (test_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state)) {
-		if (tbl->highest_used_slotid == -1) {
-			dprintk("%s COMPLETE: Session Drained\n", __func__);
-			complete(&clp->cl_session->complete);
-		}
-	} else
-		rpc_wake_up_next(&tbl->slot_tbl_waitq);
+	nfs41_check_drain_session_complete(clp->cl_session);
 	spin_unlock(&tbl->slot_tbl_lock);
 	res->sr_slotid = NFS4_MAX_SLOT_TABLE;
 }
@@ -421,7 +435,7 @@
  * Note: must be called while holding the slot_tbl_lock.
  */
 static u8
-nfs4_find_slot(struct nfs4_slot_table *tbl, struct rpc_task *task)
+nfs4_find_slot(struct nfs4_slot_table *tbl)
 {
 	int slotid;
 	u8 ret_id = NFS4_MAX_SLOT_TABLE;
@@ -463,7 +477,8 @@
 	tbl = &session->fc_slot_table;
 
 	spin_lock(&tbl->slot_tbl_lock);
-	if (test_bit(NFS4CLNT_SESSION_DRAINING, &session->clp->cl_state)) {
+	if (test_bit(NFS4CLNT_SESSION_DRAINING, &session->clp->cl_state) &&
+	    !rpc_task_has_priority(task, RPC_PRIORITY_PRIVILEGED)) {
 		/*
 		 * The state manager will wait until the slot table is empty.
 		 * Schedule the reset thread
@@ -474,7 +489,15 @@
 		return -EAGAIN;
 	}
 
-	slotid = nfs4_find_slot(tbl, task);
+	if (!rpc_queue_empty(&tbl->slot_tbl_waitq) &&
+	    !rpc_task_has_priority(task, RPC_PRIORITY_PRIVILEGED)) {
+		rpc_sleep_on(&tbl->slot_tbl_waitq, task, NULL);
+		spin_unlock(&tbl->slot_tbl_lock);
+		dprintk("%s enforce FIFO order\n", __func__);
+		return -EAGAIN;
+	}
+
+	slotid = nfs4_find_slot(tbl);
 	if (slotid == NFS4_MAX_SLOT_TABLE) {
 		rpc_sleep_on(&tbl->slot_tbl_waitq, task, NULL);
 		spin_unlock(&tbl->slot_tbl_lock);
@@ -483,6 +506,7 @@
 	}
 	spin_unlock(&tbl->slot_tbl_lock);
 
+	rpc_task_set_priority(task, RPC_PRIORITY_NORMAL);
 	slot = tbl->slots + slotid;
 	args->sa_session = session;
 	args->sa_slotid = slotid;
@@ -545,6 +569,12 @@
 	rpc_call_start(task);
 }
 
+static void nfs41_call_priv_sync_prepare(struct rpc_task *task, void *calldata)
+{
+	rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
+	nfs41_call_sync_prepare(task, calldata);
+}
+
 static void nfs41_call_sync_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs41_call_sync_data *data = calldata;
@@ -557,12 +587,18 @@
 	.rpc_call_done = nfs41_call_sync_done,
 };
 
+struct rpc_call_ops nfs41_call_priv_sync_ops = {
+	.rpc_call_prepare = nfs41_call_priv_sync_prepare,
+	.rpc_call_done = nfs41_call_sync_done,
+};
+
 static int nfs4_call_sync_sequence(struct nfs_client *clp,
 				   struct rpc_clnt *clnt,
 				   struct rpc_message *msg,
 				   struct nfs4_sequence_args *args,
 				   struct nfs4_sequence_res *res,
-				   int cache_reply)
+				   int cache_reply,
+				   int privileged)
 {
 	int ret;
 	struct rpc_task *task;
@@ -580,6 +616,8 @@
 	};
 
 	res->sr_slotid = NFS4_MAX_SLOT_TABLE;
+	if (privileged)
+		task_setup.callback_ops = &nfs41_call_priv_sync_ops;
 	task = rpc_run_task(&task_setup);
 	if (IS_ERR(task))
 		ret = PTR_ERR(task);
@@ -597,7 +635,7 @@
 			    int cache_reply)
 {
 	return nfs4_call_sync_sequence(server->nfs_client, server->client,
-				       msg, args, res, cache_reply);
+				       msg, args, res, cache_reply, 0);
 }
 
 #endif /* CONFIG_NFS_V4_1 */
@@ -1035,7 +1073,7 @@
 	memset(&opendata->o_res, 0, sizeof(opendata->o_res));
 	memset(&opendata->c_res, 0, sizeof(opendata->c_res));
 	nfs4_init_opendata_res(opendata);
-	ret = _nfs4_proc_open(opendata);
+	ret = _nfs4_recover_proc_open(opendata);
 	if (ret != 0)
 		return ret; 
 	newstate = nfs4_opendata_to_nfs4_state(opendata);
@@ -1326,6 +1364,12 @@
 
 }
 
+static void nfs4_recover_open_prepare(struct rpc_task *task, void *calldata)
+{
+	rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
+	nfs4_open_prepare(task, calldata);
+}
+
 static void nfs4_open_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_opendata *data = calldata;
@@ -1384,10 +1428,13 @@
 	.rpc_release = nfs4_open_release,
 };
 
-/*
- * Note: On error, nfs4_proc_open will free the struct nfs4_opendata
- */
-static int _nfs4_proc_open(struct nfs4_opendata *data)
+static const struct rpc_call_ops nfs4_recover_open_ops = {
+	.rpc_call_prepare = nfs4_recover_open_prepare,
+	.rpc_call_done = nfs4_open_done,
+	.rpc_release = nfs4_open_release,
+};
+
+static int nfs4_run_open_task(struct nfs4_opendata *data, int isrecover)
 {
 	struct inode *dir = data->dir->d_inode;
 	struct nfs_server *server = NFS_SERVER(dir);
@@ -1414,21 +1461,57 @@
 	data->rpc_done = 0;
 	data->rpc_status = 0;
 	data->cancelled = 0;
+	if (isrecover)
+		task_setup_data.callback_ops = &nfs4_recover_open_ops;
 	task = rpc_run_task(&task_setup_data);
-	if (IS_ERR(task))
-		return PTR_ERR(task);
-	status = nfs4_wait_for_completion_rpc_task(task);
-	if (status != 0) {
-		data->cancelled = 1;
-		smp_wmb();
-	} else
-		status = data->rpc_status;
-	rpc_put_task(task);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+	status = nfs4_wait_for_completion_rpc_task(task);
+	if (status != 0) {
+		data->cancelled = 1;
+		smp_wmb();
+	} else
+		status = data->rpc_status;
+	rpc_put_task(task);
+
+	return status;
+}
+
+static int _nfs4_recover_proc_open(struct nfs4_opendata *data)
+{
+	struct inode *dir = data->dir->d_inode;
+	struct nfs_openres *o_res = &data->o_res;
+	int status;
+
+	status = nfs4_run_open_task(data, 1);
 	if (status != 0 || !data->rpc_done)
 		return status;
 
-	if (o_res->fh.size == 0)
-		_nfs4_proc_lookup(dir, o_arg->name, &o_res->fh, o_res->f_attr);
+	nfs_refresh_inode(dir, o_res->dir_attr);
+
+	if (o_res->rflags & NFS4_OPEN_RESULT_CONFIRM) {
+		status = _nfs4_proc_open_confirm(data);
+		if (status != 0)
+			return status;
+	}
+
+	return status;
+}
+
+/*
+ * Note: On error, nfs4_proc_open will free the struct nfs4_opendata
+ */
+static int _nfs4_proc_open(struct nfs4_opendata *data)
+{
+	struct inode *dir = data->dir->d_inode;
+	struct nfs_server *server = NFS_SERVER(dir);
+	struct nfs_openargs *o_arg = &data->o_arg;
+	struct nfs_openres *o_res = &data->o_res;
+	int status;
+
+	status = nfs4_run_open_task(data, 0);
+	if (status != 0 || !data->rpc_done)
+		return status;
 
 	if (o_arg->open_flags & O_CREAT) {
 		update_changeattr(dir, &o_res->cinfo);
@@ -1752,11 +1835,10 @@
 			if (calldata->arg.fmode == 0)
 				break;
 		default:
-			if (nfs4_async_handle_error(task, server, state) == -EAGAIN) {
-				nfs_restart_rpc(task, server->nfs_client);
-				return;
-			}
+			if (nfs4_async_handle_error(task, server, state) == -EAGAIN)
+				rpc_restart_call_prepare(task);
 	}
+	nfs_release_seqid(calldata->arg.seqid);
 	nfs_refresh_inode(calldata->inode, calldata->res.fattr);
 }
 
@@ -1848,8 +1930,6 @@
 	calldata->state = state;
 	calldata->arg.fh = NFS_FH(state->inode);
 	calldata->arg.stateid = &state->open_stateid;
-	if (nfs4_has_session(server->nfs_client))
-		memset(calldata->arg.stateid->data, 0, 4);    /* clear seqid */
 	/* Serialization for the sequence id */
 	calldata->arg.seqid = nfs_alloc_seqid(&state->owner->so_seqid);
 	if (calldata->arg.seqid == NULL)
@@ -3941,6 +4021,12 @@
 	dprintk("%s: done!, ret = %d\n", __func__, data->rpc_status);
 }
 
+static void nfs4_recover_lock_prepare(struct rpc_task *task, void *calldata)
+{
+	rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
+	nfs4_lock_prepare(task, calldata);
+}
+
 static void nfs4_lock_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_lockdata *data = calldata;
@@ -3996,7 +4082,13 @@
 	.rpc_release = nfs4_lock_release,
 };
 
-static int _nfs4_do_setlk(struct nfs4_state *state, int cmd, struct file_lock *fl, int reclaim)
+static const struct rpc_call_ops nfs4_recover_lock_ops = {
+	.rpc_call_prepare = nfs4_recover_lock_prepare,
+	.rpc_call_done = nfs4_lock_done,
+	.rpc_release = nfs4_lock_release,
+};
+
+static int _nfs4_do_setlk(struct nfs4_state *state, int cmd, struct file_lock *fl, int recovery_type)
 {
 	struct nfs4_lockdata *data;
 	struct rpc_task *task;
@@ -4020,8 +4112,11 @@
 		return -ENOMEM;
 	if (IS_SETLKW(cmd))
 		data->arg.block = 1;
-	if (reclaim != 0)
-		data->arg.reclaim = 1;
+	if (recovery_type > NFS_LOCK_NEW) {
+		if (recovery_type == NFS_LOCK_RECLAIM)
+			data->arg.reclaim = NFS_LOCK_RECLAIM;
+		task_setup_data.callback_ops = &nfs4_recover_lock_ops;
+	}
 	msg.rpc_argp = &data->arg,
 	msg.rpc_resp = &data->res,
 	task_setup_data.callback_data = data;
@@ -4048,7 +4143,7 @@
 		/* Cache the lock if possible... */
 		if (test_bit(NFS_DELEGATED_STATE, &state->flags) != 0)
 			return 0;
-		err = _nfs4_do_setlk(state, F_SETLK, request, 1);
+		err = _nfs4_do_setlk(state, F_SETLK, request, NFS_LOCK_RECLAIM);
 		if (err != -NFS4ERR_DELAY)
 			break;
 		nfs4_handle_exception(server, err, &exception);
@@ -4068,7 +4163,7 @@
 	do {
 		if (test_bit(NFS_DELEGATED_STATE, &state->flags) != 0)
 			return 0;
-		err = _nfs4_do_setlk(state, F_SETLK, request, 0);
+		err = _nfs4_do_setlk(state, F_SETLK, request, NFS_LOCK_EXPIRED);
 		switch (err) {
 		default:
 			goto out;
@@ -4104,7 +4199,7 @@
 		status = do_vfs_lock(request->fl_file, request);
 		goto out_unlock;
 	}
-	status = _nfs4_do_setlk(state, cmd, request, 0);
+	status = _nfs4_do_setlk(state, cmd, request, NFS_LOCK_NEW);
 	if (status != 0)
 		goto out_unlock;
 	/* Note: we always want to sleep here! */
@@ -4187,7 +4282,7 @@
 	if (err != 0)
 		goto out;
 	do {
-		err = _nfs4_do_setlk(state, F_SETLK, fl, 0);
+		err = _nfs4_do_setlk(state, F_SETLK, fl, NFS_LOCK_NEW);
 		switch (err) {
 			default:
 				printk(KERN_ERR "%s: unhandled error %d.\n",
@@ -4395,11 +4490,12 @@
 			(struct nfs4_get_lease_time_data *)calldata;
 
 	dprintk("--> %s\n", __func__);
+	rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
 	/* just setup sequence, do not trigger session recovery
 	   since we're invoked within one */
 	ret = nfs41_setup_sequence(data->clp->cl_session,
-					&data->args->la_seq_args,
-					&data->res->lr_seq_res, 0, task);
+				   &data->args->la_seq_args,
+				   &data->res->lr_seq_res, 0, task);
 
 	BUG_ON(ret == -EAGAIN);
 	rpc_call_start(task);
@@ -4619,7 +4715,7 @@
 	tbl = &session->fc_slot_table;
 	tbl->highest_used_slotid = -1;
 	spin_lock_init(&tbl->slot_tbl_lock);
-	rpc_init_wait_queue(&tbl->slot_tbl_waitq, "ForeChannel Slot table");
+	rpc_init_priority_wait_queue(&tbl->slot_tbl_waitq, "ForeChannel Slot table");
 
 	tbl = &session->bc_slot_table;
 	tbl->highest_used_slotid = -1;
@@ -4838,14 +4934,22 @@
 {
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs4_session *session;
+	unsigned int rsize, wsize;
 	int ret;
 
 	if (!nfs4_has_session(clp))
 		return 0;
 
+	rsize = server->rsize;
+	if (rsize == 0)
+		rsize = NFS_MAX_FILE_IO_SIZE;
+	wsize = server->wsize;
+	if (wsize == 0)
+		wsize = NFS_MAX_FILE_IO_SIZE;
+
 	session = clp->cl_session;
-	session->fc_attrs.max_rqst_sz = server->wsize + nfs41_maxwrite_overhead;
-	session->fc_attrs.max_resp_sz = server->rsize + nfs41_maxread_overhead;
+	session->fc_attrs.max_rqst_sz = wsize + nfs41_maxwrite_overhead;
+	session->fc_attrs.max_resp_sz = rsize + nfs41_maxread_overhead;
 
 	ret = nfs4_recover_expired_lease(server);
 	if (!ret)
@@ -4871,7 +4975,7 @@
 	args.sa_cache_this = 0;
 
 	return nfs4_call_sync_sequence(clp, clp->cl_rpcclient, &msg, &args,
-				       &res, 0);
+				       &res, args.sa_cache_this, 1);
 }
 
 void nfs41_sequence_call_done(struct rpc_task *task, void *data)
@@ -4953,6 +5057,7 @@
 {
 	struct nfs4_reclaim_complete_data *calldata = data;
 
+	rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
 	if (nfs4_setup_sequence(calldata->clp, &calldata->arg.seq_args,
 				&calldata->res.seq_res, 0, task))
 		return;
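
One small but easy-to-miss piece of the hunk above is the fore-channel negotiation fix in nfs4_init_session(): server->rsize and server->wsize may still be zero at session-creation time, so the code now falls back to NFS_MAX_FILE_IO_SIZE before adding the fixed RPC overheads. A standalone sketch of that computation; the overhead values below are placeholders, not the kernel's nfs41_maxread_overhead/nfs41_maxwrite_overhead constants.

#include <stdio.h>

#define NFS_MAX_FILE_IO_SIZE	(1048576U)	/* 1 MB, as in the kernel headers */
#define MAXWRITE_OVERHEAD	512U		/* placeholder overhead */
#define MAXREAD_OVERHEAD	512U		/* placeholder overhead */

static void negotiate_fore_channel(unsigned int rsize, unsigned int wsize,
				   unsigned int *max_rqst_sz,
				   unsigned int *max_resp_sz)
{
	if (rsize == 0)
		rsize = NFS_MAX_FILE_IO_SIZE;
	if (wsize == 0)
		wsize = NFS_MAX_FILE_IO_SIZE;
	*max_rqst_sz = wsize + MAXWRITE_OVERHEAD;	/* requests carry WRITE data */
	*max_resp_sz = rsize + MAXREAD_OVERHEAD;	/* replies carry READ data */
}

int main(void)
{
	unsigned int rqst, resp;

	negotiate_fore_channel(0, 0, &rqst, &resp);	/* rsize/wsize not set yet */
	printf("max_rqst_sz=%u max_resp_sz=%u\n", rqst, resp);
	return 0;
}
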
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index e76427e..6d263ed 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -135,16 +135,30 @@
 	return status;
 }
 
-static void nfs41_end_drain_session(struct nfs_client *clp,
-		struct nfs4_session *ses)
+static void nfs4_end_drain_session(struct nfs_client *clp)
 {
-	if (test_and_clear_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state))
-		rpc_wake_up(&ses->fc_slot_table.slot_tbl_waitq);
+	struct nfs4_session *ses = clp->cl_session;
+	int max_slots;
+
+	if (test_and_clear_bit(NFS4CLNT_SESSION_DRAINING, &clp->cl_state)) {
+		spin_lock(&ses->fc_slot_table.slot_tbl_lock);
+		max_slots = ses->fc_slot_table.max_slots;
+		while (max_slots--) {
+			struct rpc_task *task;
+
+			task = rpc_wake_up_next(&ses->fc_slot_table.
+						slot_tbl_waitq);
+			if (!task)
+				break;
+			rpc_task_set_priority(task, RPC_PRIORITY_PRIVILEGED);
+		}
+		spin_unlock(&ses->fc_slot_table.slot_tbl_lock);
+	}
 }
 
-static int nfs41_begin_drain_session(struct nfs_client *clp,
-		struct nfs4_session *ses)
+static int nfs4_begin_drain_session(struct nfs_client *clp)
 {
+	struct nfs4_session *ses = clp->cl_session;
 	struct nfs4_slot_table *tbl = &ses->fc_slot_table;
 
 	spin_lock(&tbl->slot_tbl_lock);
@@ -162,16 +176,13 @@
 {
 	int status;
 
-	status = nfs41_begin_drain_session(clp, clp->cl_session);
-	if (status != 0)
-		goto out;
+	nfs4_begin_drain_session(clp);
 	status = nfs4_proc_exchange_id(clp, cred);
 	if (status != 0)
 		goto out;
 	status = nfs4_proc_create_session(clp);
 	if (status != 0)
 		goto out;
-	nfs41_end_drain_session(clp, clp->cl_session);
 	nfs41_setup_state_renewal(clp);
 	nfs_mark_client_ready(clp, NFS_CS_READY);
 out:
@@ -755,16 +766,21 @@
 	return new;
 }
 
-void nfs_free_seqid(struct nfs_seqid *seqid)
+void nfs_release_seqid(struct nfs_seqid *seqid)
 {
 	if (!list_empty(&seqid->list)) {
 		struct rpc_sequence *sequence = seqid->sequence->sequence;
 
 		spin_lock(&sequence->lock);
-		list_del(&seqid->list);
+		list_del_init(&seqid->list);
 		spin_unlock(&sequence->lock);
 		rpc_wake_up(&sequence->wait);
 	}
+}
+
+void nfs_free_seqid(struct nfs_seqid *seqid)
+{
+	nfs_release_seqid(seqid);
 	kfree(seqid);
 }
 
@@ -1257,13 +1273,9 @@
 
 static int nfs4_reset_session(struct nfs_client *clp)
 {
-	struct nfs4_session *ses = clp->cl_session;
 	int status;
 
-	status = nfs41_begin_drain_session(clp, ses);
-	if (status != 0)
-		return status;
-
+	nfs4_begin_drain_session(clp);
 	status = nfs4_proc_destroy_session(clp->cl_session);
 	if (status && status != -NFS4ERR_BADSESSION &&
 	    status != -NFS4ERR_DEADSESSION) {
@@ -1279,19 +1291,17 @@
 out:
 	/*
 	 * Let the state manager reestablish state
-	 * without waking other tasks yet.
 	 */
-	if (!test_bit(NFS4CLNT_LEASE_EXPIRED, &clp->cl_state)) {
-		/* Wake up the next rpc task */
-		nfs41_end_drain_session(clp, ses);
-		if (status == 0)
-			nfs41_setup_state_renewal(clp);
-	}
+	if (!test_bit(NFS4CLNT_LEASE_EXPIRED, &clp->cl_state) &&
+	    status == 0)
+		nfs41_setup_state_renewal(clp);
+
 	return status;
 }
 
 #else /* CONFIG_NFS_V4_1 */
 static int nfs4_reset_session(struct nfs_client *clp) { return 0; }
+static int nfs4_end_drain_session(struct nfs_client *clp) { return 0; }
 #endif /* CONFIG_NFS_V4_1 */
 
 /* Set NFS4CLNT_LEASE_EXPIRED for all v4.0 errors and for recoverable errors
@@ -1382,6 +1392,7 @@
 				goto out_error;
 		}
 
+		nfs4_end_drain_session(clp);
 		if (test_and_clear_bit(NFS4CLNT_DELEGRETURN, &clp->cl_state)) {
 			nfs_client_return_marked_delegations(clp);
 			continue;
@@ -1398,6 +1409,7 @@
 out_error:
 	printk(KERN_WARNING "Error: state manager failed on NFSv4 server %s"
 			" with error %d\n", clp->cl_hostname, -status);
+	nfs4_end_drain_session(clp);
 	nfs4_clear_state_manager_bit(clp);
 }
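
The seqid change above splits nfs_release_seqid() out of nfs_free_seqid(): releasing drops the seqid's place in the serialization list and wakes the next waiter but keeps the object alive, so a restarted CLOSE can reuse it; freeing releases and then frees. A userspace model of that split; the list and the wakeup are simplified stand-ins for the kernel's rpc_sequence machinery.

#include <stdio.h>
#include <stdlib.h>

struct seqid {
	int linked;	/* does this seqid still hold its serialization slot? */
	unsigned id;
};

static void wake_next_waiter(void)
{
	printf("woke next waiter\n");
}

static void release_seqid(struct seqid *s)
{
	if (s->linked) {
		s->linked = 0;		/* give up the serialization slot ... */
		wake_next_waiter();	/* ... and let the next task proceed */
	}
}

static void free_seqid(struct seqid *s)
{
	release_seqid(s);
	free(s);
}

int main(void)
{
	struct seqid *s = calloc(1, sizeof(*s));

	if (!s)
		return 1;
	s->linked = 1;
	s->id = 42;
	release_seqid(s);	/* e.g. before restarting a CLOSE rpc call */
	printf("seqid %u still usable after release\n", s->id);
	free_seqid(s);		/* no double wakeup: already unlinked */
	return 0;
}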
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 1906782..7bc7fd5 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -173,7 +173,8 @@
 #define RPC_PRIORITY_LOW	(-1)
 #define RPC_PRIORITY_NORMAL	(0)
 #define RPC_PRIORITY_HIGH	(1)
-#define RPC_NR_PRIORITY		(1 + RPC_PRIORITY_HIGH - RPC_PRIORITY_LOW)
+#define RPC_PRIORITY_PRIVILEGED	(2)
+#define RPC_NR_PRIORITY		(1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)
 
 struct rpc_timer {
 	struct timer_list timer;
@@ -229,6 +230,7 @@
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
+int		rpc_queue_empty(struct rpc_wait_queue *);
 void		rpc_delay(struct rpc_task *, unsigned long);
 void *		rpc_malloc(struct rpc_task *, size_t);
 void		rpc_free(void *);
@@ -254,6 +256,16 @@
 	return __rpc_wait_for_completion_task(task, NULL);
 }
 
+static inline void rpc_task_set_priority(struct rpc_task *task, unsigned char prio)
+{
+	task->tk_priority = prio - RPC_PRIORITY_LOW;
+}
+
+static inline int rpc_task_has_priority(struct rpc_task *task, unsigned char prio)
+{
+	return (task->tk_priority + RPC_PRIORITY_LOW == prio);
+}
+
 #ifdef RPC_DEBUG
 static inline const char * rpc_qname(struct rpc_wait_queue *q)
 {
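
The helpers above store tk_priority offset by RPC_PRIORITY_LOW, so the value directly indexes the per-queue array of RPC_NR_PRIORITY (now four) task lists. A standalone check of the arithmetic that reproduces only the constants and the two inline helpers; the rpc_task struct is reduced to a stub.

#include <assert.h>
#include <stdio.h>

#define RPC_PRIORITY_LOW	(-1)
#define RPC_PRIORITY_NORMAL	(0)
#define RPC_PRIORITY_HIGH	(1)
#define RPC_PRIORITY_PRIVILEGED	(2)
#define RPC_NR_PRIORITY		(1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)

struct rpc_task { unsigned char tk_priority; };

static void rpc_task_set_priority(struct rpc_task *task, unsigned char prio)
{
	task->tk_priority = prio - RPC_PRIORITY_LOW;
}

static int rpc_task_has_priority(struct rpc_task *task, unsigned char prio)
{
	return (task->tk_priority + RPC_PRIORITY_LOW == prio);
}

int main(void)
{
	struct rpc_task t;
	int prio;

	assert(RPC_NR_PRIORITY == 4);	/* one list per priority level */
	for (prio = RPC_PRIORITY_LOW; prio <= RPC_PRIORITY_PRIVILEGED; prio++) {
		rpc_task_set_priority(&t, prio);
		printf("prio %d -> tk_priority %d\n", prio, t.tk_priority);
	}
	rpc_task_set_priority(&t, RPC_PRIORITY_PRIVILEGED);
	assert(rpc_task_has_priority(&t, RPC_PRIORITY_PRIVILEGED));
	return 0;
}

Keeping the offset encoding means no existing user of the LOW/NORMAL/HIGH levels needs to change when the privileged level is appended.
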
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index cef74ba..aae6907 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -210,6 +210,7 @@
 {
 	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
 }
+EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
 
 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 {
@@ -385,6 +386,20 @@
 }
 
 /*
+ * Tests whether rpc queue is empty
+ */
+int rpc_queue_empty(struct rpc_wait_queue *queue)
+{
+	int res;
+
+	spin_lock_bh(&queue->lock);
+	res = queue->qlen;
+	spin_unlock_bh(&queue->lock);
+	return (res == 0);
+}
+EXPORT_SYMBOL_GPL(rpc_queue_empty);
+
+/*
  * Wake up a task on a specific queue
  */
 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
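
rpc_queue_empty() above samples qlen under the queue's lock, so callers such as the FIFO check added to nfs41_setup_sequence() see a consistent count of sleeping tasks. A userspace analogue with a pthread mutex standing in for the spinlock:

#include <pthread.h>
#include <stdio.h>

struct wait_queue {
	pthread_mutex_t lock;
	unsigned int qlen;	/* number of tasks currently sleeping here */
};

static int queue_empty(struct wait_queue *q)
{
	unsigned int len;

	pthread_mutex_lock(&q->lock);
	len = q->qlen;
	pthread_mutex_unlock(&q->lock);
	return len == 0;
}

int main(void)
{
	struct wait_queue q = { PTHREAD_MUTEX_INITIALIZER, 0 };

	printf("empty: %d\n", queue_empty(&q));	/* 1: nothing queued */
	q.qlen = 3;
	printf("empty: %d\n", queue_empty(&q));	/* 0: waiters present */
	return 0;
}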