pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors

If a pNFS/flexfiles file is mirrored and a read from one mirror fails,
then we should bump the mirror index, so that the read is retried
against a different mirror. Only once we have iterated through all the
mirrors and every one of them has failed do we return the layout and
issue a new LAYOUTGET.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index fbc5a56..7fc14b9 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -741,17 +741,17 @@
 }
 
 static struct nfs4_pnfs_ds *
-ff_layout_choose_best_ds_for_read(struct nfs_pageio_descriptor *pgio,
+ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg,
+				  int start_idx,
 				  int *best_idx)
 {
-	struct nfs4_ff_layout_segment *fls;
+	struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
 	struct nfs4_pnfs_ds *ds;
 	int idx;
 
-	fls = FF_LAYOUT_LSEG(pgio->pg_lseg);
 	/* mirrors are sorted by efficiency */
-	for (idx = 0; idx < fls->mirror_array_cnt; idx++) {
-		ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, idx, false);
+	for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) {
+		ds = nfs4_ff_layout_prepare_ds(lseg, idx, false);
 		if (ds) {
 			*best_idx = idx;
 			return ds;
@@ -782,7 +782,7 @@
 	if (pgio->pg_lseg == NULL)
 		goto out_mds;
 
-	ds = ff_layout_choose_best_ds_for_read(pgio, &ds_idx);
+	ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
 	if (!ds)
 		goto out_mds;
 	mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);
@@ -1171,6 +1171,10 @@
 
 	switch (err) {
 	case -NFS4ERR_RESET_TO_PNFS:
+		if (ff_layout_choose_best_ds_for_read(hdr->lseg,
+					hdr->pgio_mirror_idx + 1,
+					&hdr->pgio_mirror_idx))
+			goto out_eagain;
 		set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
 			&hdr->lseg->pls_layout->plh_flags);
 		pnfs_read_resend_pnfs(hdr);
@@ -1179,11 +1183,13 @@
 		ff_layout_reset_read(hdr);
 		return task->tk_status;
 	case -EAGAIN:
-		rpc_restart_call_prepare(task);
-		return -EAGAIN;
+		goto out_eagain;
 	}
 
 	return 0;
+out_eagain:
+	rpc_restart_call_prepare(task);
+	return -EAGAIN;
 }
 
 static bool