pnfs: add CB_LAYOUTRECALL handling

This is the heart of the wave 2 submission.  Add the code to trigger
drain and forget of any afected layouts.  In addition, we set a
"barrier", below which any LAYOUTGET reply is ignored.  This is to
compensate for the fact that we do not wait for outstanding LAYOUTGETs
to complete as per section 12.5.5.2.1 of RFC 5661.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 32b6646..bf4186b 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -178,7 +178,7 @@
  */
 
 /* Need to hold i_lock if caller does not already hold reference */
-static void
+void
 get_layout_hdr(struct pnfs_layout_hdr *lo)
 {
 	atomic_inc(&lo->plh_refcount);
@@ -254,6 +254,7 @@
 			/* List does not take a reference, so no need for put here */
 			list_del_init(&lseg->pls_layout->plh_layouts);
 			spin_unlock(&clp->cl_lock);
+			clear_bit(NFS_LAYOUT_BULK_RECALL, &lseg->pls_layout->plh_flags);
 		}
 		list_add(&lseg->pls_list, tmp_list);
 		return 1;
@@ -287,7 +288,7 @@
 /* Returns count of number of matching invalid lsegs remaining in list
  * after call.
  */
-static int
+int
 mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
 			    struct list_head *tmp_list,
 			    u32 iomode)
@@ -310,7 +311,7 @@
 	return invalid - removed;
 }
 
-static void
+void
 pnfs_free_lseg_list(struct list_head *free_me)
 {
 	struct pnfs_layout_segment *lseg, *tmp;
@@ -363,23 +364,45 @@
 }
 
 /* update lo->plh_stateid with new if is more recent */
-static void
-pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
-			const nfs4_stateid *new)
+void
+pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
+			bool update_barrier)
 {
 	u32 oldseq, newseq;
 
 	oldseq = be32_to_cpu(lo->plh_stateid.stateid.seqid);
 	newseq = be32_to_cpu(new->stateid.seqid);
-	if ((int)(newseq - oldseq) > 0)
+	if ((int)(newseq - oldseq) > 0) {
 		memcpy(&lo->plh_stateid, &new->stateid, sizeof(new->stateid));
+		if (update_barrier) {
+			u32 new_barrier = be32_to_cpu(new->stateid.seqid);
+
+			if ((int)(new_barrier - lo->plh_barrier))
+				lo->plh_barrier = new_barrier;
+		} else {
+			/* Because of wraparound, we want to keep the barrier
+			 * "close" to the current seqids.  It needs to be
+			 * within 2**31 to count as "behind", so if it
+			 * gets too near that limit, give us a litle leeway
+			 * and bring it to within 2**30.
+			 * NOTE - and yes, this is all unsigned arithmetic.
+			 */
+			if (unlikely((newseq - lo->plh_barrier) > (3 << 29)))
+				lo->plh_barrier = newseq - (1 << 30);
+		}
+	}
 }
 
 /* lget is set to 1 if called from inside send_layoutget call chain */
 static bool
-pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, int lget)
+pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid,
+			int lget)
 {
-	return (list_empty(&lo->plh_segs) &&
+	if ((stateid) &&
+	    (int)(lo->plh_barrier - be32_to_cpu(stateid->stateid.seqid)) >= 0)
+		return true;
+	return test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
+		(list_empty(&lo->plh_segs) &&
 		 (atomic_read(&lo->plh_outstanding) > lget));
 }
 
@@ -391,7 +414,7 @@
 
 	dprintk("--> %s\n", __func__);
 	spin_lock(&lo->plh_inode->i_lock);
-	if (pnfs_layoutgets_blocked(lo, 1)) {
+	if (pnfs_layoutgets_blocked(lo, NULL, 1)) {
 		status = -EAGAIN;
 	} else if (list_empty(&lo->plh_segs)) {
 		int seq;
@@ -510,6 +533,7 @@
 	atomic_set(&lo->plh_refcount, 1);
 	INIT_LIST_HEAD(&lo->plh_layouts);
 	INIT_LIST_HEAD(&lo->plh_segs);
+	INIT_LIST_HEAD(&lo->plh_bulk_recall);
 	lo->plh_inode = ino;
 	return lo;
 }
@@ -561,7 +585,7 @@
  * lookup range in layout
  */
 static struct pnfs_layout_segment *
-pnfs_has_layout(struct pnfs_layout_hdr *lo, u32 iomode)
+pnfs_find_lseg(struct pnfs_layout_hdr *lo, u32 iomode)
 {
 	struct pnfs_layout_segment *lseg, *ret = NULL;
 
@@ -606,19 +630,22 @@
 		goto out_unlock;
 	}
 
-	/* Check to see if the layout for the given range already exists */
-	lseg = pnfs_has_layout(lo, iomode);
-	if (lseg) {
-		dprintk("%s: Using cached lseg %p for iomode %d)\n",
-			__func__, lseg, iomode);
+	/* Do we even need to bother with this? */
+	if (test_bit(NFS4CLNT_LAYOUTRECALL, &clp->cl_state) ||
+	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
+		dprintk("%s matches recall, use MDS\n", __func__);
 		goto out_unlock;
 	}
+	/* Check to see if the layout for the given range already exists */
+	lseg = pnfs_find_lseg(lo, iomode);
+	if (lseg)
+		goto out_unlock;
 
 	/* if LAYOUTGET already failed once we don't try again */
 	if (test_bit(lo_fail_bit(iomode), &nfsi->layout->plh_flags))
 		goto out_unlock;
 
-	if (pnfs_layoutgets_blocked(lo, 0))
+	if (pnfs_layoutgets_blocked(lo, NULL, 0))
 		goto out_unlock;
 	atomic_inc(&lo->plh_outstanding);
 
@@ -641,6 +668,7 @@
 			spin_lock(&clp->cl_lock);
 			list_del_init(&lo->plh_layouts);
 			spin_unlock(&clp->cl_lock);
+			clear_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
 		}
 		spin_unlock(&ino->i_lock);
 	}
@@ -662,6 +690,7 @@
 	struct nfs4_layoutget_res *res = &lgp->res;
 	struct pnfs_layout_segment *lseg;
 	struct inode *ino = lo->plh_inode;
+	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
 	int status = 0;
 
 	/* Verify we got what we asked for.
@@ -688,16 +717,32 @@
 	}
 
 	spin_lock(&ino->i_lock);
+	if (test_bit(NFS4CLNT_LAYOUTRECALL, &clp->cl_state) ||
+	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
+		dprintk("%s forget reply due to recall\n", __func__);
+		goto out_forget_reply;
+	}
+
+	if (pnfs_layoutgets_blocked(lo, &res->stateid, 1)) {
+		dprintk("%s forget reply due to state\n", __func__);
+		goto out_forget_reply;
+	}
 	init_lseg(lo, lseg);
 	lseg->pls_range = res->range;
 	*lgp->lsegpp = lseg;
 	pnfs_insert_layout(lo, lseg);
 
 	/* Done processing layoutget. Set the layout stateid */
-	pnfs_set_layout_stateid(lo, &res->stateid);
+	pnfs_set_layout_stateid(lo, &res->stateid, false);
 	spin_unlock(&ino->i_lock);
 out:
 	return status;
+
+out_forget_reply:
+	spin_unlock(&ino->i_lock);
+	lseg->pls_layout = lo;
+	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
+	goto out;
 }
 
 /*