Merge branch 'flexfiles'

* flexfiles: (53 commits)
  pnfs: lookup new lseg at lseg boundary
  nfs41: .init_read and .init_write can be called with valid pg_lseg
  pnfs: Update documentation on the Layout Drivers
  pnfs/flexfiles: Add the FlexFile Layout Driver
  nfs: count DIO good bytes correctly with mirroring
  nfs41: wait for LAYOUTRETURN before retrying LAYOUTGET
  nfs: add a helper to set NFS_ODIRECT_RESCHED_WRITES to direct writes
  nfs41: add NFS_LAYOUT_RETRY_LAYOUTGET to layout header flags
  nfs/flexfiles: send layoutreturn before freeing lseg
  nfs41: introduce NFS_LAYOUT_RETURN_BEFORE_CLOSE
  nfs41: allow async version layoutreturn
  nfs41: add range to layoutreturn args
  pnfs: allow LD to ask to resend read through pnfs
  nfs: add nfs_pgio_current_mirror helper
  nfs: only reset desc->pg_mirror_idx when mirroring is supported
  nfs41: add a debug warning if we destroy an unempty layout
  pnfs: fail comparison when bucket verifier not set
  nfs: mirroring support for direct io
  nfs: add mirroring support to pgio layer
  pnfs: pass ds_commit_idx through the commit path
  ...

Conflicts:
	fs/nfs/pnfs.c
	fs/nfs/pnfs.h
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 4d69076..703501d 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -51,6 +51,10 @@
  */
 static LIST_HEAD(pnfs_modules_tbl);
 
+static int
+pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, nfs4_stateid stateid,
+		       enum pnfs_iomode iomode, bool sync);
+
 /* Return the registered pnfs layout driver module matching given id */
 static struct pnfs_layoutdriver_type *
 find_pnfs_driver_locked(u32 id)
@@ -239,6 +243,8 @@
 	struct inode *inode = lo->plh_inode;
 
 	if (atomic_dec_and_lock(&lo->plh_refcount, &inode->i_lock)) {
+		if (!list_empty(&lo->plh_segs))
+			WARN_ONCE(1, "NFS: BUG unfreed layout segments.\n");
 		pnfs_detach_layout_hdr(lo);
 		spin_unlock(&inode->i_lock);
 		pnfs_free_layout_hdr(lo);
@@ -338,6 +344,65 @@
 	rpc_wake_up(&NFS_SERVER(inode)->roc_rpcwaitq);
 }
 
+/* Return true if layoutreturn is needed */
+static bool
+pnfs_layout_need_return(struct pnfs_layout_hdr *lo,
+			struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_layout_segment *s;
+
+	if (!test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags))
+		return false;
+
+	list_for_each_entry(s, &lo->plh_segs, pls_list)
+		if (s != lseg && test_bit(NFS_LSEG_LAYOUTRETURN, &s->pls_flags))
+			return false;
+
+	return true;
+}
+
+static void pnfs_layoutreturn_free_lseg(struct work_struct *work)
+{
+	struct pnfs_layout_segment *lseg;
+	struct pnfs_layout_hdr *lo;
+	struct inode *inode;
+
+	lseg = container_of(work, struct pnfs_layout_segment, pls_work);
+	WARN_ON(atomic_read(&lseg->pls_refcount));
+	lo = lseg->pls_layout;
+	inode = lo->plh_inode;
+
+	spin_lock(&inode->i_lock);
+	if (pnfs_layout_need_return(lo, lseg)) {
+		nfs4_stateid stateid;
+		enum pnfs_iomode iomode;
+
+		stateid = lo->plh_stateid;
+		iomode = lo->plh_return_iomode;
+		/* decreased in pnfs_send_layoutreturn() */
+		lo->plh_block_lgets++;
+		lo->plh_return_iomode = 0;
+		spin_unlock(&inode->i_lock);
+
+		pnfs_send_layoutreturn(lo, stateid, iomode, true);
+		spin_lock(&inode->i_lock);
+	} else
+		/* match pnfs_get_layout_hdr #2 in pnfs_put_lseg */
+		pnfs_put_layout_hdr(lo);
+	pnfs_layout_remove_lseg(lo, lseg);
+	spin_unlock(&inode->i_lock);
+	pnfs_free_lseg(lseg);
+	/* match pnfs_get_layout_hdr #1 in pnfs_put_lseg */
+	pnfs_put_layout_hdr(lo);
+}
+
+static void
+pnfs_layoutreturn_free_lseg_async(struct pnfs_layout_segment *lseg)
+{
+	INIT_WORK(&lseg->pls_work, pnfs_layoutreturn_free_lseg);
+	queue_work(nfsiod_workqueue, &lseg->pls_work);
+}
+
 void
 pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 {
@@ -354,10 +419,17 @@
 	inode = lo->plh_inode;
 	if (atomic_dec_and_lock(&lseg->pls_refcount, &inode->i_lock)) {
 		pnfs_get_layout_hdr(lo);
-		pnfs_layout_remove_lseg(lo, lseg);
-		spin_unlock(&inode->i_lock);
-		pnfs_free_lseg(lseg);
-		pnfs_put_layout_hdr(lo);
+		if (pnfs_layout_need_return(lo, lseg)) {
+			spin_unlock(&inode->i_lock);
+			/* hdr reference dropped in nfs4_layoutreturn_release */
+			pnfs_get_layout_hdr(lo);
+			pnfs_layoutreturn_free_lseg_async(lseg);
+		} else {
+			pnfs_layout_remove_lseg(lo, lseg);
+			spin_unlock(&inode->i_lock);
+			pnfs_free_lseg(lseg);
+			pnfs_put_layout_hdr(lo);
+		}
 	}
 }
 EXPORT_SYMBOL_GPL(pnfs_put_lseg);
@@ -544,6 +616,7 @@
 		pnfs_get_layout_hdr(lo);
 		pnfs_layout_clear_fail_bit(lo, NFS_LAYOUT_RO_FAILED);
 		pnfs_layout_clear_fail_bit(lo, NFS_LAYOUT_RW_FAILED);
+		pnfs_clear_retry_layoutget(lo);
 		spin_unlock(&nfsi->vfs_inode.i_lock);
 		pnfs_free_lseg_list(&tmp_list);
 		pnfs_put_layout_hdr(lo);
@@ -741,25 +814,37 @@
 	return !pnfs_seqid_is_newer(seqid, lo->plh_barrier);
 }
 
+static bool
+pnfs_layout_returning(const struct pnfs_layout_hdr *lo,
+		      struct pnfs_layout_range *range)
+{
+	return test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags) &&
+		(lo->plh_return_iomode == IOMODE_ANY ||
+		 lo->plh_return_iomode == range->iomode);
+}
+
 /* lget is set to 1 if called from inside send_layoutget call chain */
 static bool
-pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo, int lget)
+pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo,
+			struct pnfs_layout_range *range, int lget)
 {
 	return lo->plh_block_lgets ||
 		test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
 		(list_empty(&lo->plh_segs) &&
-		 (atomic_read(&lo->plh_outstanding) > lget));
+		 (atomic_read(&lo->plh_outstanding) > lget)) ||
+		pnfs_layout_returning(lo, range);
 }
 
 int
 pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
+			      struct pnfs_layout_range *range,
 			      struct nfs4_state *open_state)
 {
 	int status = 0;
 
 	dprintk("--> %s\n", __func__);
 	spin_lock(&lo->plh_inode->i_lock);
-	if (pnfs_layoutgets_blocked(lo, 1)) {
+	if (pnfs_layoutgets_blocked(lo, range, 1)) {
 		status = -EAGAIN;
 	} else if (!nfs4_valid_open_stateid(open_state)) {
 		status = -EBADF;
@@ -826,7 +911,9 @@
 			pnfs_layout_io_set_failed(lo, range->iomode);
 		}
 		return NULL;
-	}
+	} else
+		pnfs_layout_clear_fail_bit(lo,
+				pnfs_iomode_to_fail_bit(range->iomode));
 
 	return lseg;
 }
@@ -846,6 +933,49 @@
 	}
 }
 
+void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo)
+{
+	clear_bit_unlock(NFS_LAYOUT_RETURN, &lo->plh_flags);
+	smp_mb__after_atomic();
+	wake_up_bit(&lo->plh_flags, NFS_LAYOUT_RETURN);
+}
+
+static int
+pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, nfs4_stateid stateid,
+		       enum pnfs_iomode iomode, bool sync)
+{
+	struct inode *ino = lo->plh_inode;
+	struct nfs4_layoutreturn *lrp;
+	int status = 0;
+
+	lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
+	if (unlikely(lrp == NULL)) {
+		status = -ENOMEM;
+		spin_lock(&ino->i_lock);
+		lo->plh_block_lgets--;
+		pnfs_clear_layoutreturn_waitbit(lo);
+		rpc_wake_up(&NFS_SERVER(ino)->roc_rpcwaitq);
+		spin_unlock(&ino->i_lock);
+		pnfs_put_layout_hdr(lo);
+		goto out;
+	}
+
+	lrp->args.stateid = stateid;
+	lrp->args.layout_type = NFS_SERVER(ino)->pnfs_curr_ld->id;
+	lrp->args.inode = ino;
+	lrp->args.range.iomode = iomode;
+	lrp->args.range.offset = 0;
+	lrp->args.range.length = NFS4_MAX_UINT64;
+	lrp->args.layout = lo;
+	lrp->clp = NFS_SERVER(ino)->nfs_client;
+	lrp->cred = lo->plh_lc_cred;
+
+	status = nfs4_proc_layoutreturn(lrp, sync);
+out:
+	dprintk("<-- %s status: %d\n", __func__, status);
+	return status;
+}
+
 /*
  * Initiates a LAYOUTRETURN(FILE), and removes the pnfs_layout_hdr
  * when the layout segment list is empty.
@@ -860,7 +990,6 @@
 	struct pnfs_layout_hdr *lo = NULL;
 	struct nfs_inode *nfsi = NFS_I(ino);
 	LIST_HEAD(tmp_list);
-	struct nfs4_layoutreturn *lrp;
 	nfs4_stateid stateid;
 	int status = 0, empty;
 
@@ -902,24 +1031,7 @@
 	spin_unlock(&ino->i_lock);
 	pnfs_free_lseg_list(&tmp_list);
 
-	lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
-	if (unlikely(lrp == NULL)) {
-		status = -ENOMEM;
-		spin_lock(&ino->i_lock);
-		lo->plh_block_lgets--;
-		spin_unlock(&ino->i_lock);
-		pnfs_put_layout_hdr(lo);
-		goto out;
-	}
-
-	lrp->args.stateid = stateid;
-	lrp->args.layout_type = NFS_SERVER(ino)->pnfs_curr_ld->id;
-	lrp->args.inode = ino;
-	lrp->args.layout = lo;
-	lrp->clp = NFS_SERVER(ino)->nfs_client;
-	lrp->cred = lo->plh_lc_cred;
-
-	status = nfs4_proc_layoutreturn(lrp);
+	status = pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, true);
 out:
 	dprintk("<-- %s status: %d\n", __func__, status);
 	return status;
@@ -960,8 +1072,9 @@
 	struct nfs4_state *state;
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg, *tmp;
+	nfs4_stateid stateid;
 	LIST_HEAD(tmp_list);
-	bool found = false;
+	bool found = false, layoutreturn = false;
 
 	spin_lock(&ino->i_lock);
 	lo = nfsi->layout;
@@ -980,6 +1093,8 @@
 			goto out_noroc;
 	}
 
+		goto out_noroc;
+	pnfs_clear_retry_layoutget(lo);
 	list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list)
 		if (test_bit(NFS_LSEG_ROC, &lseg->pls_flags)) {
 			mark_lseg_invalid(lseg, &tmp_list);
@@ -994,7 +1109,19 @@
 	return true;
 
 out_noroc:
+	if (lo) {
+		stateid = lo->plh_stateid;
+		layoutreturn =
+			test_and_clear_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
+					   &lo->plh_flags);
+		if (layoutreturn) {
+			lo->plh_block_lgets++;
+			pnfs_get_layout_hdr(lo);
+		}
+	}
 	spin_unlock(&ino->i_lock);
+	if (layoutreturn)
+		pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, true);
 	return false;
 }
 
@@ -1029,8 +1156,9 @@
 	struct nfs_inode *nfsi = NFS_I(ino);
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg;
+	nfs4_stateid stateid;
 	u32 current_seqid;
-	bool found = false;
+	bool found = false, layoutreturn = false;
 
 	spin_lock(&ino->i_lock);
 	list_for_each_entry(lseg, &nfsi->layout->plh_segs, pls_list)
@@ -1047,7 +1175,21 @@
 	 */
 	*barrier = current_seqid + atomic_read(&lo->plh_outstanding);
 out:
+	if (!found) {
+		stateid = lo->plh_stateid;
+		layoutreturn =
+			test_and_clear_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
+					   &lo->plh_flags);
+		if (layoutreturn) {
+			lo->plh_block_lgets++;
+			pnfs_get_layout_hdr(lo);
+		}
+	}
 	spin_unlock(&ino->i_lock);
+	if (layoutreturn) {
+		rpc_sleep_on(&NFS_SERVER(ino)->roc_rpcwaitq, task, NULL);
+		pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, false);
+	}
 	return found;
 }
 
@@ -1194,6 +1336,7 @@
 
 	list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
 		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) &&
+		    !test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) &&
 		    pnfs_lseg_range_match(&lseg->pls_range, range)) {
 			ret = pnfs_get_lseg(lseg);
 			break;
@@ -1282,6 +1425,35 @@
 	return ret;
 }
 
+/* stop waiting if someone clears NFS_LAYOUT_RETRY_LAYOUTGET bit. */
+static int pnfs_layoutget_retry_bit_wait(struct wait_bit_key *key)
+{
+	if (!test_bit(NFS_LAYOUT_RETRY_LAYOUTGET, key->flags))
+		return 1;
+	return nfs_wait_bit_killable(key);
+}
+
+static bool pnfs_prepare_to_retry_layoutget(struct pnfs_layout_hdr *lo)
+{
+	/*
+	 * send layoutcommit as it can hold up layoutreturn due to lseg
+	 * reference
+	 */
+	pnfs_layoutcommit_inode(lo->plh_inode, false);
+	return !wait_on_bit_action(&lo->plh_flags, NFS_LAYOUT_RETURN,
+				   pnfs_layoutget_retry_bit_wait,
+				   TASK_UNINTERRUPTIBLE);
+}
+
+static void pnfs_clear_first_layoutget(struct pnfs_layout_hdr *lo)
+{
+	unsigned long *bitlock = &lo->plh_flags;
+
+	clear_bit_unlock(NFS_LAYOUT_FIRST_LAYOUTGET, bitlock);
+	smp_mb__after_atomic();
+	wake_up_bit(bitlock, NFS_LAYOUT_FIRST_LAYOUTGET);
+}
+
 /*
  * Layout segment is retreived from the server if not cached.
  * The appropriate layout segment is referenced and returned to the caller.
@@ -1312,6 +1484,8 @@
 	if (pnfs_within_mdsthreshold(ctx, ino, iomode))
 		goto out;
 
+lookup_again:
+	first = false;
 	spin_lock(&ino->i_lock);
 	lo = pnfs_find_alloc_layout(ino, ctx, gfp_flags);
 	if (lo == NULL) {
@@ -1326,27 +1500,62 @@
 	}
 
 	/* if LAYOUTGET already failed once we don't try again */
-	if (pnfs_layout_io_test_failed(lo, iomode))
+	if (pnfs_layout_io_test_failed(lo, iomode) &&
+	    !pnfs_should_retry_layoutget(lo))
 		goto out_unlock;
 
-	/* Check to see if the layout for the given range already exists */
-	lseg = pnfs_find_lseg(lo, &arg);
-	if (lseg)
-		goto out_unlock;
+	first = list_empty(&lo->plh_segs);
+	if (first) {
+		/* The first layoutget for the file. Need to serialize per
+		 * RFC 5661 Errata 3208.
+		 */
+		if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET,
+				     &lo->plh_flags)) {
+			spin_unlock(&ino->i_lock);
+			wait_on_bit(&lo->plh_flags, NFS_LAYOUT_FIRST_LAYOUTGET,
+				    TASK_UNINTERRUPTIBLE);
+			pnfs_put_layout_hdr(lo);
+			goto lookup_again;
+		}
+	} else {
+		/* Check to see if the layout for the given range
+		 * already exists
+		 */
+		lseg = pnfs_find_lseg(lo, &arg);
+		if (lseg)
+			goto out_unlock;
+	}
 
-	if (pnfs_layoutgets_blocked(lo, 0))
+	/*
+	 * Because we free lsegs before sending LAYOUTRETURN, we need to wait
+	 * for LAYOUTRETURN even if first is true.
+	 */
+	if (!lseg && pnfs_should_retry_layoutget(lo) &&
+	    test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags)) {
+		spin_unlock(&ino->i_lock);
+		dprintk("%s wait for layoutreturn\n", __func__);
+		if (pnfs_prepare_to_retry_layoutget(lo)) {
+			if (first)
+				pnfs_clear_first_layoutget(lo);
+			pnfs_put_layout_hdr(lo);
+			dprintk("%s retrying\n", __func__);
+			goto lookup_again;
+		}
+		goto out_put_layout_hdr;
+	}
+
+	if (pnfs_layoutgets_blocked(lo, &arg, 0))
 		goto out_unlock;
 	atomic_inc(&lo->plh_outstanding);
-
-	first = list_empty(&lo->plh_layouts) ? true : false;
 	spin_unlock(&ino->i_lock);
 
-	if (first) {
+	if (list_empty(&lo->plh_layouts)) {
 		/* The lo must be on the clp list if there is any
 		 * chance of a CB_LAYOUTRECALL(FILE) coming in.
 		 */
 		spin_lock(&clp->cl_lock);
-		list_add_tail(&lo->plh_layouts, &server->layouts);
+		if (list_empty(&lo->plh_layouts))
+			list_add_tail(&lo->plh_layouts, &server->layouts);
 		spin_unlock(&clp->cl_lock);
 	}
 
@@ -1359,8 +1568,11 @@
 		arg.length = PAGE_CACHE_ALIGN(arg.length);
 
 	lseg = send_layoutget(lo, ctx, &arg, gfp_flags);
+	pnfs_clear_retry_layoutget(lo);
 	atomic_dec(&lo->plh_outstanding);
 out_put_layout_hdr:
+	if (first)
+		pnfs_clear_first_layoutget(lo);
 	pnfs_put_layout_hdr(lo);
 out:
 	dprintk("%s: inode %s/%llu pNFS layout segment %s for "
@@ -1409,7 +1621,7 @@
 		goto out_forget_reply;
 	}
 
-	if (pnfs_layoutgets_blocked(lo, 1)) {
+	if (pnfs_layoutgets_blocked(lo, &lgp->args.range, 1)) {
 		dprintk("%s forget reply due to state\n", __func__);
 		goto out_forget_reply;
 	}
@@ -1456,24 +1668,79 @@
 	goto out;
 }
 
+static void
+pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
+				struct list_head *tmp_list,
+				struct pnfs_layout_range *return_range)
+{
+	struct pnfs_layout_segment *lseg, *next;
+
+	dprintk("%s:Begin lo %p\n", __func__, lo);
+
+	if (list_empty(&lo->plh_segs))
+		return;
+
+	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
+		if (should_free_lseg(&lseg->pls_range, return_range)) {
+			dprintk("%s: marking lseg %p iomode %d "
+				"offset %llu length %llu\n", __func__,
+				lseg, lseg->pls_range.iomode,
+				lseg->pls_range.offset,
+				lseg->pls_range.length);
+			set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
+			mark_lseg_invalid(lseg, tmp_list);
+		}
+}
+
+void pnfs_error_mark_layout_for_return(struct inode *inode,
+				       struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_layout_hdr *lo = NFS_I(inode)->layout;
+	int iomode = pnfs_iomode_to_fail_bit(lseg->pls_range.iomode);
+	struct pnfs_layout_range range = {
+		.iomode = lseg->pls_range.iomode,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
+	LIST_HEAD(free_me);
+
+	spin_lock(&inode->i_lock);
+	/* set failure bit so that pnfs path will be retried later */
+	pnfs_layout_set_fail_bit(lo, iomode);
+	set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags);
+	if (lo->plh_return_iomode == 0)
+		lo->plh_return_iomode = range.iomode;
+	else if (lo->plh_return_iomode != range.iomode)
+		lo->plh_return_iomode = IOMODE_ANY;
+	/*
+	 * mark all matching lsegs so that we are sure to have no live
+	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
+	 * for how it works.
+	 */
+	pnfs_mark_matching_lsegs_return(lo, &free_me, &range);
+	spin_unlock(&inode->i_lock);
+	pnfs_free_lseg_list(&free_me);
+}
+EXPORT_SYMBOL_GPL(pnfs_error_mark_layout_for_return);
+
 void
 pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
 	u64 rd_size = req->wb_bytes;
 
-	WARN_ON_ONCE(pgio->pg_lseg != NULL);
+	if (pgio->pg_lseg == NULL) {
+		if (pgio->pg_dreq == NULL)
+			rd_size = i_size_read(pgio->pg_inode) - req_offset(req);
+		else
+			rd_size = nfs_dreq_bytes_left(pgio->pg_dreq);
 
-	if (pgio->pg_dreq == NULL)
-		rd_size = i_size_read(pgio->pg_inode) - req_offset(req);
-	else
-		rd_size = nfs_dreq_bytes_left(pgio->pg_dreq);
-
-	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-					   req->wb_context,
-					   req_offset(req),
-					   rd_size,
-					   IOMODE_READ,
-					   GFP_KERNEL);
+		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+						   req->wb_context,
+						   req_offset(req),
+						   rd_size,
+						   IOMODE_READ,
+						   GFP_KERNEL);
+	}
 	/* If no lseg, fall back to read through mds */
 	if (pgio->pg_lseg == NULL)
 		nfs_pageio_reset_read_mds(pgio);
@@ -1485,27 +1752,36 @@
 pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
 			   struct nfs_page *req, u64 wb_size)
 {
-	WARN_ON_ONCE(pgio->pg_lseg != NULL);
-
-	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-					   req->wb_context,
-					   req_offset(req),
-					   wb_size,
-					   IOMODE_RW,
-					   GFP_NOFS);
+	if (pgio->pg_lseg == NULL)
+		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+						   req->wb_context,
+						   req_offset(req),
+						   wb_size,
+						   IOMODE_RW,
+						   GFP_NOFS);
 	/* If no lseg, fall back to write through mds */
 	if (pgio->pg_lseg == NULL)
 		nfs_pageio_reset_write_mds(pgio);
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_init_write);
 
+void
+pnfs_generic_pg_cleanup(struct nfs_pageio_descriptor *desc)
+{
+	if (desc->pg_lseg) {
+		pnfs_put_lseg(desc->pg_lseg);
+		desc->pg_lseg = NULL;
+	}
+}
+EXPORT_SYMBOL_GPL(pnfs_generic_pg_cleanup);
+
 /*
  * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
  * of bytes (maximum @req->wb_bytes) that can be coalesced.
  */
 size_t
-pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
-		     struct nfs_page *req)
+pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio,
+		     struct nfs_page *prev, struct nfs_page *req)
 {
 	unsigned int size;
 	u64 seg_end, req_start, seg_left;
@@ -1529,10 +1805,16 @@
 		seg_end = end_offset(pgio->pg_lseg->pls_range.offset,
 				     pgio->pg_lseg->pls_range.length);
 		req_start = req_offset(req);
-		WARN_ON_ONCE(req_start > seg_end);
+		WARN_ON_ONCE(req_start >= seg_end);
 		/* start of request is past the last byte of this segment */
-		if (req_start >= seg_end)
+		if (req_start >= seg_end) {
+			/* reference the new lseg */
+			if (pgio->pg_ops->pg_cleanup)
+				pgio->pg_ops->pg_cleanup(pgio);
+			if (pgio->pg_ops->pg_init)
+				pgio->pg_ops->pg_init(pgio, req);
 			return 0;
+		}
 
 		/* adjust 'size' iff there are fewer bytes left in the
 		 * segment than what nfs_generic_pg_test returned */
@@ -1587,10 +1869,12 @@
 pnfs_write_through_mds(struct nfs_pageio_descriptor *desc,
 		struct nfs_pgio_header *hdr)
 {
+	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
 	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
-		list_splice_tail_init(&hdr->pages, &desc->pg_list);
+		list_splice_tail_init(&hdr->pages, &mirror->pg_list);
 		nfs_pageio_reset_write_mds(desc);
-		desc->pg_recoalesce = 1;
+		mirror->pg_recoalesce = 1;
 	}
 	nfs_pgio_data_destroy(hdr);
 }
@@ -1624,11 +1908,9 @@
 	struct pnfs_layout_segment *lseg = desc->pg_lseg;
 	enum pnfs_try_status trypnfs;
 
-	desc->pg_lseg = NULL;
 	trypnfs = pnfs_try_to_write_data(hdr, call_ops, lseg, how);
 	if (trypnfs == PNFS_NOT_ATTEMPTED)
 		pnfs_write_through_mds(desc, hdr);
-	pnfs_put_lseg(lseg);
 }
 
 static void pnfs_writehdr_free(struct nfs_pgio_header *hdr)
@@ -1641,24 +1923,23 @@
 int
 pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
 {
+	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
 	struct nfs_pgio_header *hdr;
 	int ret;
 
 	hdr = nfs_pgio_header_alloc(desc->pg_rw_ops);
 	if (!hdr) {
-		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-		pnfs_put_lseg(desc->pg_lseg);
-		desc->pg_lseg = NULL;
+		desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
 		return -ENOMEM;
 	}
 	nfs_pgheader_init(desc, hdr, pnfs_writehdr_free);
+
 	hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
 	ret = nfs_generic_pgio(desc, hdr);
-	if (ret != 0) {
-		pnfs_put_lseg(desc->pg_lseg);
-		desc->pg_lseg = NULL;
-	} else
+	if (!ret)
 		pnfs_do_write(desc, hdr, desc->pg_ioflags);
+
 	return ret;
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_writepages);
@@ -1703,10 +1984,12 @@
 pnfs_read_through_mds(struct nfs_pageio_descriptor *desc,
 		struct nfs_pgio_header *hdr)
 {
+	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
 	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
-		list_splice_tail_init(&hdr->pages, &desc->pg_list);
+		list_splice_tail_init(&hdr->pages, &mirror->pg_list);
 		nfs_pageio_reset_read_mds(desc);
-		desc->pg_recoalesce = 1;
+		mirror->pg_recoalesce = 1;
 	}
 	nfs_pgio_data_destroy(hdr);
 }
@@ -1735,18 +2018,29 @@
 	return trypnfs;
 }
 
+/* Resend all requests through pnfs. */
+int pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr)
+{
+	struct nfs_pageio_descriptor pgio;
+
+	nfs_pageio_init_read(&pgio, hdr->inode, false, hdr->completion_ops);
+	return nfs_pageio_resend(&pgio, hdr);
+}
+EXPORT_SYMBOL_GPL(pnfs_read_resend_pnfs);
+
 static void
 pnfs_do_read(struct nfs_pageio_descriptor *desc, struct nfs_pgio_header *hdr)
 {
 	const struct rpc_call_ops *call_ops = desc->pg_rpc_callops;
 	struct pnfs_layout_segment *lseg = desc->pg_lseg;
 	enum pnfs_try_status trypnfs;
+	int err = 0;
 
-	desc->pg_lseg = NULL;
 	trypnfs = pnfs_try_to_read_data(hdr, call_ops, lseg);
-	if (trypnfs == PNFS_NOT_ATTEMPTED)
+	if (trypnfs == PNFS_TRY_AGAIN)
+		err = pnfs_read_resend_pnfs(hdr);
+	if (trypnfs == PNFS_NOT_ATTEMPTED || err)
 		pnfs_read_through_mds(desc, hdr);
-	pnfs_put_lseg(lseg);
 }
 
 static void pnfs_readhdr_free(struct nfs_pgio_header *hdr)
@@ -1759,24 +2053,20 @@
 int
 pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
 {
+	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
 	struct nfs_pgio_header *hdr;
 	int ret;
 
 	hdr = nfs_pgio_header_alloc(desc->pg_rw_ops);
 	if (!hdr) {
-		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-		ret = -ENOMEM;
-		pnfs_put_lseg(desc->pg_lseg);
-		desc->pg_lseg = NULL;
-		return ret;
+		desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
+		return -ENOMEM;
 	}
 	nfs_pgheader_init(desc, hdr, pnfs_readhdr_free);
 	hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
 	ret = nfs_generic_pgio(desc, hdr);
-	if (ret != 0) {
-		pnfs_put_lseg(desc->pg_lseg);
-		desc->pg_lseg = NULL;
-	} else
+	if (!ret)
 		pnfs_do_read(desc, hdr);
 	return ret;
 }
@@ -1982,6 +2272,7 @@
 	pnfs_clear_layoutcommitting(inode);
 	goto out;
 }
+EXPORT_SYMBOL_GPL(pnfs_layoutcommit_inode);
 
 struct nfs4_threshold *pnfs_mdsthreshold_alloc(void)
 {