pnfs: serialize LAYOUTGET(openstateid)

We shouldn't send a LAYOUTGET(openstateid) unless all outstanding RPCs
using the previous stateid are completed.  This requires choosing the
stateid to encode earlier, so we can abort if one is not available (we
want to use the open stateid, but a LAYOUTGET is already out using
it), and adding a count of the number of outstanding rpc calls using
layout state (which for now consist solely of LAYOUTGETs).

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 5bee453..a3549ce 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -5304,6 +5304,12 @@
 	if (nfs4_setup_sequence(server, &lgp->args.seq_args,
 				&lgp->res.seq_res, 0, task))
 		return;
+	if (pnfs_choose_layoutget_stateid(&lgp->args.stateid,
+					  NFS_I(lgp->args.inode)->layout,
+					  lgp->args.ctx->state)) {
+		rpc_exit(task, NFS4_OK);
+		return;
+	}
 	rpc_call_start(task);
 }
 
@@ -5338,7 +5344,6 @@
 	struct nfs4_layoutget *lgp = calldata;
 
 	dprintk("--> %s\n", __func__);
-	put_layout_hdr(lgp->args.inode);
 	if (lgp->res.layout.buf != NULL)
 		free_page((unsigned long) lgp->res.layout.buf);
 	put_nfs_open_context(lgp->args.ctx);
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 4e28242..3cbdd0c 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1787,7 +1787,6 @@
 		      const struct nfs4_layoutget_args *args,
 		      struct compound_hdr *hdr)
 {
-	nfs4_stateid stateid;
 	__be32 *p;
 
 	p = reserve_space(xdr, 44 + NFS4_STATEID_SIZE);
@@ -1798,9 +1797,7 @@
 	p = xdr_encode_hyper(p, args->range.offset);
 	p = xdr_encode_hyper(p, args->range.length);
 	p = xdr_encode_hyper(p, args->minlength);
-	pnfs_choose_layoutget_stateid(&stateid, NFS_I(args->inode)->layout,
-				args->ctx->state);
-	p = xdr_encode_opaque_fixed(p, &stateid.data, NFS4_STATEID_SIZE);
+	p = xdr_encode_opaque_fixed(p, &args->stateid.data, NFS4_STATEID_SIZE);
 	*p = cpu_to_be32(args->maxcount);
 
 	dprintk("%s: 1st type:0x%x iomode:%d off:%lu len:%lu mc:%d\n",
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 212cbc2..59ed68b 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -371,6 +371,14 @@
 		memcpy(&lo->plh_stateid, &new->stateid, sizeof(new->stateid));
 }
 
+/* lget is set to 1 if called from inside send_layoutget call chain */
+static bool
+pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, int lget)
+{
+	return (list_empty(&lo->plh_segs) &&
+		 (atomic_read(&lo->plh_outstanding) > lget));
+}
+
 int
 pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
 			      struct nfs4_state *open_state)
@@ -379,7 +387,9 @@
 
 	dprintk("--> %s\n", __func__);
 	spin_lock(&lo->plh_inode->i_lock);
-	if (list_empty(&lo->plh_segs)) {
+	if (pnfs_layoutgets_blocked(lo, 1)) {
+		status = -EAGAIN;
+	} else if (list_empty(&lo->plh_segs)) {
 		int seq;
 
 		do {
@@ -414,10 +424,8 @@
 
 	BUG_ON(ctx == NULL);
 	lgp = kzalloc(sizeof(*lgp), GFP_KERNEL);
-	if (lgp == NULL) {
-		put_layout_hdr(lo->plh_inode);
+	if (lgp == NULL)
 		return NULL;
-	}
 	lgp->args.minlength = NFS4_MAX_UINT64;
 	lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
 	lgp->args.range.iomode = iomode;
@@ -613,10 +621,16 @@
 	if (test_bit(lo_fail_bit(iomode), &nfsi->layout->plh_flags))
 		goto out_unlock;
 
-	get_layout_hdr_locked(lo); /* Matched in nfs4_layoutget_release */
+	if (pnfs_layoutgets_blocked(lo, 0))
+		goto out_unlock;
+	atomic_inc(&lo->plh_outstanding);
+
+	get_layout_hdr_locked(lo);
 	spin_unlock(&ino->i_lock);
 
 	lseg = send_layoutget(lo, ctx, iomode);
+	atomic_dec(&lo->plh_outstanding);
+	put_layout_hdr(ino);
 out:
 	dprintk("%s end, state 0x%lx lseg %p\n", __func__,
 		nfsi->layout->plh_flags, lseg);
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 787253e..698380d 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -69,6 +69,7 @@
 	struct list_head	plh_layouts;   /* other client layouts */
 	struct list_head	plh_segs;      /* layout segments list */
 	nfs4_stateid		plh_stateid;
+	atomic_t		plh_outstanding; /* number of RPCs out */
 	unsigned long		plh_flags;
 	struct inode		*plh_inode;
 };