NFS: Allow the nfs_pageio_descriptor to signal that a re-coalesce is needed

If an attempt to do pNFS fails, and we have to fall back to writing through
the MDS, then we may want to re-coalesce the requests that we already have,
since the block size for the MDS reads/writes may differ from that of
the DS reads/writes.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index d421e19..7139dbf 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -240,6 +240,7 @@
 	desc->pg_bsize = bsize;
 	desc->pg_base = 0;
 	desc->pg_moreio = 0;
+	desc->pg_recoalesce = 0;
 	desc->pg_inode = inode;
 	desc->pg_ops = pg_ops;
 	desc->pg_ioflags = io_flags;
@@ -331,7 +332,7 @@
  * Returns true if the request 'req' was successfully coalesced into the
  * existing list of pages 'desc'.
  */
-int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
+static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			   struct nfs_page *req)
 {
 	while (!nfs_pageio_do_add_request(desc, req)) {
@@ -340,17 +341,67 @@
 		if (desc->pg_error < 0)
 			return 0;
 		desc->pg_moreio = 0;
+		if (desc->pg_recoalesce)
+			return 0;
 	}
 	return 1;
 }
 
+static int nfs_do_recoalesce(struct nfs_pageio_descriptor *desc)
+{	/* Re-add the requests queued on desc->pg_list; returns 1, or 0 when desc->pg_error < 0 */
+	LIST_HEAD(head);	/* local list that holds the requests while they are re-added */
+
+	do {
+		list_splice_init(&desc->pg_list, &head);	/* move the queued requests onto 'head'; pg_list is left empty */
+		desc->pg_bytes_written -= desc->pg_count;	/* back out the byte count of the requests just taken off */
+		desc->pg_count = 0;
+		desc->pg_base = 0;
+		desc->pg_recoalesce = 0;	/* clear the flag; the outer loop re-checks it after the re-add pass */
+
+		while (!list_empty(&head)) {
+			struct nfs_page *req;
+
+			req = list_first_entry(&head, struct nfs_page, wb_list);
+			nfs_list_remove_request(req);	/* presumably unlinks req from 'head' before it is re-added */
+			if (__nfs_pageio_add_request(desc, req))
+				continue;	/* req was coalesced; go on to the next one */
+			if (desc->pg_error < 0)
+				return 0;	/* NOTE(review): requests still on 'head' are not put back on desc->pg_list here -- confirm the caller copes */
+			break;	/* not added and no error: leave the inner loop */
+		}
+	} while (desc->pg_recoalesce);	/* run another pass if one was requested during the re-add */
+	return 1;
+}
+
+int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
+		struct nfs_page *req)
+{	/* Add 'req' to 'desc', re-coalescing the queued requests and retrying when the add fails without an error */
+	int ret;
+
+	do {
+		ret = __nfs_pageio_add_request(desc, req);
+		if (ret)
+			break;	/* req was added */
+		if (desc->pg_error < 0)
+			break;	/* error recorded on desc; ret is 0 here */
+		ret = nfs_do_recoalesce(desc);	/* no error: re-coalesce what is queued */
+	} while (ret);	/* retry the add only if the re-coalesce succeeded */
+	return ret;	/* 1 if req was added, 0 otherwise */
+}
+
 /**
  * nfs_pageio_complete - Complete I/O on an nfs_pageio_descriptor
  * @desc: pointer to io descriptor
  */
 void nfs_pageio_complete(struct nfs_pageio_descriptor *desc)
 {
-	nfs_pageio_doio(desc);
+	for (;;) {	/* repeat the I/O pass for as long as a re-coalesce is requested */
+		nfs_pageio_doio(desc);
+		if (!desc->pg_recoalesce)
+			break;	/* no re-coalesce requested: done */
+		if (!nfs_do_recoalesce(desc))
+			break;	/* re-coalesce reported failure: stop */
+	}
 }
 
 /**
@@ -369,7 +420,7 @@
 	if (!list_empty(&desc->pg_list)) {
 		struct nfs_page *prev = nfs_list_entry(desc->pg_list.prev);
 		if (index != prev->wb_index + 1)
-			nfs_pageio_doio(desc);
+			nfs_pageio_complete(desc);
 	}
 }