[PATCH] NLM: fix a client-side race on blocking locks.

 If the lock blocks, the server may send us a GRANTED message that
 races with the reply to our LOCK request. Make sure that we catch
 the GRANTED by queueing up our request on the nlm_blocked list
 before we send off the first LOCK rpc call.

 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/lockd/clntlock.c b/fs/lockd/clntlock.c
index 44adb84..006bb9e 100644
--- a/fs/lockd/clntlock.c
+++ b/fs/lockd/clntlock.c
@@ -42,23 +42,51 @@
 static LIST_HEAD(nlm_blocked);
 
 /*
+ * Queue up a lock for blocking so that the GRANTED request can see it
+ */
+int nlmclnt_prepare_block(struct nlm_rqst *req, struct nlm_host *host, struct file_lock *fl)
+{
+	struct nlm_wait *block;
+
+	BUG_ON(req->a_block != NULL);
+	block = kmalloc(sizeof(*block), GFP_KERNEL);
+	if (block == NULL)
+		return -ENOMEM;
+	block->b_host = host;
+	block->b_lock = fl;
+	init_waitqueue_head(&block->b_wait);
+	block->b_status = NLM_LCK_BLOCKED;
+
+	list_add(&block->b_list, &nlm_blocked);
+	req->a_block = block;
+
+	return 0;
+}
+
+void nlmclnt_finish_block(struct nlm_rqst *req)
+{
+	struct nlm_wait *block = req->a_block;
+
+	if (block == NULL)
+		return;
+	req->a_block = NULL;
+	list_del(&block->b_list);
+	kfree(block);
+}
+
+/*
  * Block on a lock
  */
-int
-nlmclnt_block(struct nlm_host *host, struct file_lock *fl, u32 *statp)
+long nlmclnt_block(struct nlm_rqst *req, long timeout)
 {
-	struct nlm_wait	block, **head;
-	int		err;
-	u32		pstate;
+	struct nlm_wait	*block = req->a_block;
+	long ret;
 
-	block.b_host   = host;
-	block.b_lock   = fl;
-	init_waitqueue_head(&block.b_wait);
-	block.b_status = NLM_LCK_BLOCKED;
-	list_add(&block.b_list, &nlm_blocked);
-
-	/* Remember pseudo nsm state */
-	pstate = host->h_state;
+	/* A borken server might ask us to block even if we didn't
+	 * request it. Just say no!
+	 */
+	if (!req->a_args.block)
+		return -EAGAIN;
 
 	/* Go to sleep waiting for GRANT callback. Some servers seem
 	 * to lose callbacks, however, so we're going to poll from
@@ -68,23 +96,16 @@
 	 * a 1 minute timeout would do. See the comment before
 	 * nlmclnt_lock for an explanation.
 	 */
-	sleep_on_timeout(&block.b_wait, 30*HZ);
+	ret = wait_event_interruptible_timeout(block->b_wait,
+			block->b_status != NLM_LCK_BLOCKED,
+			timeout);
 
-	list_del(&block.b_list);
-
-	if (!signalled()) {
-		*statp = block.b_status;
-		return 0;
+	if (block->b_status != NLM_LCK_BLOCKED) {
+		req->a_res.status = block->b_status;
+		block->b_status = NLM_LCK_BLOCKED;
 	}
 
-	/* Okay, we were interrupted. Cancel the pending request
-	 * unless the server has rebooted.
-	 */
-	if (pstate == host->h_state && (err = nlmclnt_cancel(host, fl)) < 0)
-		printk(KERN_NOTICE
-			"lockd: CANCEL call failed (errno %d)\n", -err);
-
-	return -ERESTARTSYS;
+	return ret;
 }
 
 /*
@@ -94,27 +115,23 @@
 nlmclnt_grant(struct nlm_lock *lock)
 {
 	struct nlm_wait	*block;
+	u32 res = nlm_lck_denied;
 
 	/*
 	 * Look up blocked request based on arguments. 
 	 * Warning: must not use cookie to match it!
 	 */
 	list_for_each_entry(block, &nlm_blocked, b_list) {
-		if (nlm_compare_locks(block->b_lock, &lock->fl))
-			break;
+		if (nlm_compare_locks(block->b_lock, &lock->fl)) {
+			/* Alright, we found a lock. Set the return status
+			 * and wake up the caller
+			 */
+			block->b_status = NLM_LCK_GRANTED;
+			wake_up(&block->b_wait);
+			res = nlm_granted;
+		}
 	}
-
-	/* Ooops, no blocked request found. */
-	if (block == NULL)
-		return nlm_lck_denied;
-
-	/* Alright, we found the lock. Set the return status and
-	 * wake up the caller.
-	 */
-	block->b_status = NLM_LCK_GRANTED;
-	wake_up(&block->b_wait);
-
-	return nlm_granted;
+	return res;
 }
 
 /*
diff --git a/fs/lockd/clntproc.c b/fs/lockd/clntproc.c
index a440761..fd77ed1 100644
--- a/fs/lockd/clntproc.c
+++ b/fs/lockd/clntproc.c
@@ -21,6 +21,7 @@
 
 #define NLMDBG_FACILITY		NLMDBG_CLIENT
 #define NLMCLNT_GRACE_WAIT	(5*HZ)
+#define NLMCLNT_POLL_TIMEOUT	(30*HZ)
 
 static int	nlmclnt_test(struct nlm_rqst *, struct file_lock *);
 static int	nlmclnt_lock(struct nlm_rqst *, struct file_lock *);
@@ -553,7 +554,8 @@
 {
 	struct nlm_host	*host = req->a_host;
 	struct nlm_res	*resp = &req->a_res;
-	int		status;
+	long timeout;
+	int status;
 
 	if (!host->h_monitored && nsm_monitor(host) < 0) {
 		printk(KERN_NOTICE "lockd: failed to monitor %s\n",
@@ -562,15 +564,32 @@
 		goto out;
 	}
 
-	do {
-		if ((status = nlmclnt_call(req, NLMPROC_LOCK)) >= 0) {
-			if (resp->status != NLM_LCK_BLOCKED)
-				break;
-			status = nlmclnt_block(host, fl, &resp->status);
-		}
+	if (req->a_args.block) {
+		status = nlmclnt_prepare_block(req, host, fl);
 		if (status < 0)
 			goto out;
-	} while (resp->status == NLM_LCK_BLOCKED && req->a_args.block);
+	}
+	for(;;) {
+		status = nlmclnt_call(req, NLMPROC_LOCK);
+		if (status < 0)
+			goto out_unblock;
+		if (resp->status != NLM_LCK_BLOCKED)
+			break;
+		/* Wait on an NLM blocking lock */
+		timeout = nlmclnt_block(req, NLMCLNT_POLL_TIMEOUT);
+		/* Did a reclaimer thread notify us of a server reboot? */
+		if (resp->status ==  NLM_LCK_DENIED_GRACE_PERIOD)
+			continue;
+		if (resp->status != NLM_LCK_BLOCKED)
+			break;
+		if (timeout >= 0)
+			continue;
+		/* We were interrupted. Send a CANCEL request to the server
+		 * and exit
+		 */
+		status = (int)timeout;
+		goto out_unblock;
+	}
 
 	if (resp->status == NLM_LCK_GRANTED) {
 		fl->fl_u.nfs_fl.state = host->h_state;
@@ -579,6 +598,11 @@
 		do_vfs_lock(fl);
 	}
 	status = nlm_stat_to_errno(resp->status);
+out_unblock:
+	nlmclnt_finish_block(req);
+	/* Cancel the blocked request if it is still pending */
+	if (resp->status == NLM_LCK_BLOCKED)
+		nlmclnt_cancel(host, fl);
 out:
 	nlmclnt_release_lockargs(req);
 	return status;
diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h
index 0d9d225..16d4e5a 100644
--- a/include/linux/lockd/lockd.h
+++ b/include/linux/lockd/lockd.h
@@ -72,6 +72,8 @@
 	uint32_t pid;
 };
 
+struct nlm_wait;
+
 /*
  * Memory chunk for NLM client RPC request.
  */
@@ -81,6 +83,7 @@
 	struct nlm_host *	a_host;		/* host handle */
 	struct nlm_args		a_args;		/* arguments */
 	struct nlm_res		a_res;		/* result */
+	struct nlm_wait *	a_block;
 	char			a_owner[NLMCLNT_OHSIZE];
 };
 
@@ -142,7 +145,9 @@
  * Lockd client functions
  */
 struct nlm_rqst * nlmclnt_alloc_call(void);
-int		  nlmclnt_block(struct nlm_host *, struct file_lock *, u32 *);
+int		  nlmclnt_prepare_block(struct nlm_rqst *req, struct nlm_host *host, struct file_lock *fl);
+void		  nlmclnt_finish_block(struct nlm_rqst *req);
+long		  nlmclnt_block(struct nlm_rqst *req, long timeout);
 int		  nlmclnt_cancel(struct nlm_host *, struct file_lock *);
 u32		  nlmclnt_grant(struct nlm_lock *);
 void		  nlmclnt_recovery(struct nlm_host *, u32);