nfs: add mirroring support to pgio layer

This patch adds mirrored write support to the pgio layer. The default
is to use one mirror, but pgio callers may define callbacks to change
this to any value up to the (arbitrarily selected) limit of 16.

The basic idea is to break out members of nfs_pageio_descriptor that cannot
be shared between mirrored DSes and put them in a new structure.

Signed-off-by: Weston Andros Adamson <dros@primarydata.com>
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 1c03187..eec12b7 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -46,17 +46,22 @@
 		       struct nfs_pgio_header *hdr,
 		       void (*release)(struct nfs_pgio_header *hdr))
 {
-	hdr->req = nfs_list_entry(desc->pg_list.next);
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
+
+	hdr->req = nfs_list_entry(mirror->pg_list.next);
 	hdr->inode = desc->pg_inode;
 	hdr->cred = hdr->req->wb_context->cred;
 	hdr->io_start = req_offset(hdr->req);
-	hdr->good_bytes = desc->pg_count;
+	hdr->good_bytes = mirror->pg_count;
 	hdr->dreq = desc->pg_dreq;
 	hdr->layout_private = desc->pg_layout_private;
 	hdr->release = release;
 	hdr->completion_ops = desc->pg_completion_ops;
 	if (hdr->completion_ops->init_hdr)
 		hdr->completion_ops->init_hdr(hdr);
+
+	hdr->pgio_mirror_idx = desc->pg_mirror_idx;
 }
 EXPORT_SYMBOL_GPL(nfs_pgheader_init);
 
@@ -480,7 +485,10 @@
 size_t nfs_generic_pg_test(struct nfs_pageio_descriptor *desc,
 			   struct nfs_page *prev, struct nfs_page *req)
 {
-	if (desc->pg_count > desc->pg_bsize) {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
+
+	if (mirror->pg_count > mirror->pg_bsize) {
 		/* should never happen */
 		WARN_ON_ONCE(1);
 		return 0;
@@ -490,11 +498,11 @@
 	 * Limit the request size so that we can still allocate a page array
 	 * for it without upsetting the slab allocator.
 	 */
-	if (((desc->pg_count + req->wb_bytes) >> PAGE_SHIFT) *
+	if (((mirror->pg_count + req->wb_bytes) >> PAGE_SHIFT) *
 			sizeof(struct page) > PAGE_SIZE)
 		return 0;
 
-	return min(desc->pg_bsize - desc->pg_count, (size_t)req->wb_bytes);
+	return min(mirror->pg_bsize - mirror->pg_count, (size_t)req->wb_bytes);
 }
 EXPORT_SYMBOL_GPL(nfs_generic_pg_test);
 
@@ -651,10 +659,18 @@
 static int nfs_pgio_error(struct nfs_pageio_descriptor *desc,
 			  struct nfs_pgio_header *hdr)
 {
+	struct nfs_pgio_mirror *mirror;
+	u32 midx;
+
 	set_bit(NFS_IOHDR_REDO, &hdr->flags);
 	nfs_pgio_data_destroy(hdr);
 	hdr->completion_ops->completion(hdr);
-	desc->pg_completion_ops->error_cleanup(&desc->pg_list);
+	/* TODO: Make sure it's right to clean up all mirrors here
+	 *       and not just hdr->pgio_mirror_idx */
+	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
+		mirror = &desc->pg_mirrors[midx];
+		desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
+	}
 	return -ENOMEM;
 }
 
@@ -671,6 +687,17 @@
 	hdr->completion_ops->completion(hdr);
 }
 
+static void nfs_pageio_mirror_init(struct nfs_pgio_mirror *mirror,
+				   unsigned int bsize)
+{
+	INIT_LIST_HEAD(&mirror->pg_list);
+	mirror->pg_bytes_written = 0;
+	mirror->pg_count = 0;
+	mirror->pg_bsize = bsize;
+	mirror->pg_base = 0;
+	mirror->pg_recoalesce = 0;
+}
+
 /**
  * nfs_pageio_init - initialise a page io descriptor
  * @desc: pointer to descriptor
@@ -687,13 +714,10 @@
 		     size_t bsize,
 		     int io_flags)
 {
-	INIT_LIST_HEAD(&desc->pg_list);
-	desc->pg_bytes_written = 0;
-	desc->pg_count = 0;
-	desc->pg_bsize = bsize;
-	desc->pg_base = 0;
+	struct nfs_pgio_mirror *new;
+	int i;
+
 	desc->pg_moreio = 0;
-	desc->pg_recoalesce = 0;
 	desc->pg_inode = inode;
 	desc->pg_ops = pg_ops;
 	desc->pg_completion_ops = compl_ops;
@@ -703,6 +727,26 @@
 	desc->pg_lseg = NULL;
 	desc->pg_dreq = NULL;
 	desc->pg_layout_private = NULL;
+	desc->pg_bsize = bsize;
+
+	desc->pg_mirror_count = 1;
+	desc->pg_mirror_idx = 0;
+
+	if (pg_ops->pg_get_mirror_count) {
+		/* until we have a request, we don't have an lseg and no
+		 * idea how many mirrors there will be */
+		new = kcalloc(NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX,
+			      sizeof(struct nfs_pgio_mirror), GFP_KERNEL);
+		desc->pg_mirrors_dynamic = new;
+		desc->pg_mirrors = new;
+
+		for (i = 0; i < NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX; i++)
+			nfs_pageio_mirror_init(&desc->pg_mirrors[i], bsize);
+	} else {
+		desc->pg_mirrors_dynamic = NULL;
+		desc->pg_mirrors = desc->pg_mirrors_static;
+		nfs_pageio_mirror_init(&desc->pg_mirrors[0], bsize);
+	}
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init);
 
@@ -738,14 +782,16 @@
 int nfs_generic_pgio(struct nfs_pageio_descriptor *desc,
 		     struct nfs_pgio_header *hdr)
 {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
 	struct nfs_page		*req;
 	struct page		**pages,
 				*last_page;
-	struct list_head *head = &desc->pg_list;
+	struct list_head *head = &mirror->pg_list;
 	struct nfs_commit_info cinfo;
 	unsigned int pagecount, pageused;
 
-	pagecount = nfs_page_array_len(desc->pg_base, desc->pg_count);
+	pagecount = nfs_page_array_len(mirror->pg_base, mirror->pg_count);
 	if (!nfs_pgarray_set(&hdr->page_array, pagecount))
 		return nfs_pgio_error(desc, hdr);
 
@@ -773,7 +819,7 @@
 		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
 
 	/* Set up the argument struct */
-	nfs_pgio_rpcsetup(hdr, desc->pg_count, 0, desc->pg_ioflags, &cinfo);
+	nfs_pgio_rpcsetup(hdr, mirror->pg_count, 0, desc->pg_ioflags, &cinfo);
 	desc->pg_rpc_callops = &nfs_pgio_common_ops;
 	return 0;
 }
@@ -781,12 +827,17 @@
 
 static int nfs_generic_pg_pgios(struct nfs_pageio_descriptor *desc)
 {
+	struct nfs_pgio_mirror *mirror;
 	struct nfs_pgio_header *hdr;
 	int ret;
 
+	mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
 	hdr = nfs_pgio_header_alloc(desc->pg_rw_ops);
 	if (!hdr) {
-		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
+		/* TODO: make sure this is right with mirroring - or
+		 *       should it back out all mirrors? */
+		desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
 		return -ENOMEM;
 	}
 	nfs_pgheader_init(desc, hdr, nfs_pgio_header_free);
@@ -801,6 +852,49 @@
 	return ret;
 }
 
+/*
+ * nfs_pageio_setup_mirroring - determine if mirroring is to be used
+ *				by calling the pg_get_mirror_count op
+ */
+static int nfs_pageio_setup_mirroring(struct nfs_pageio_descriptor *pgio,
+				       struct nfs_page *req)
+{
+	int mirror_count = 1;
+
+	if (!pgio->pg_ops->pg_get_mirror_count)
+		return 0;
+
+	mirror_count = pgio->pg_ops->pg_get_mirror_count(pgio, req);
+
+	if (!mirror_count || mirror_count > NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX)
+		return -EINVAL;
+
+	if (WARN_ON_ONCE(!pgio->pg_mirrors_dynamic))
+		return -EINVAL;
+
+	pgio->pg_mirror_count = mirror_count;
+
+	return 0;
+}
+
+/*
+ * nfs_pageio_stop_mirroring - stop using mirroring (set mirror count to 1)
+ */
+void nfs_pageio_stop_mirroring(struct nfs_pageio_descriptor *pgio)
+{
+	pgio->pg_mirror_count = 1;
+	pgio->pg_mirror_idx = 0;
+}
+
+static void nfs_pageio_cleanup_mirroring(struct nfs_pageio_descriptor *pgio)
+{
+	pgio->pg_mirror_count = 1;
+	pgio->pg_mirror_idx = 0;
+	pgio->pg_mirrors = pgio->pg_mirrors_static;
+	kfree(pgio->pg_mirrors_dynamic);
+	pgio->pg_mirrors_dynamic = NULL;
+}
+
 static bool nfs_match_open_context(const struct nfs_open_context *ctx1,
 		const struct nfs_open_context *ctx2)
 {
@@ -867,19 +961,22 @@
 static int nfs_pageio_do_add_request(struct nfs_pageio_descriptor *desc,
 				     struct nfs_page *req)
 {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
 	struct nfs_page *prev = NULL;
-	if (desc->pg_count != 0) {
-		prev = nfs_list_entry(desc->pg_list.prev);
+
+	if (mirror->pg_count != 0) {
+		prev = nfs_list_entry(mirror->pg_list.prev);
 	} else {
 		if (desc->pg_ops->pg_init)
 			desc->pg_ops->pg_init(desc, req);
-		desc->pg_base = req->wb_pgbase;
+		mirror->pg_base = req->wb_pgbase;
 	}
 	if (!nfs_can_coalesce_requests(prev, req, desc))
 		return 0;
 	nfs_list_remove_request(req);
-	nfs_list_add_request(req, &desc->pg_list);
-	desc->pg_count += req->wb_bytes;
+	nfs_list_add_request(req, &mirror->pg_list);
+	mirror->pg_count += req->wb_bytes;
 	return 1;
 }
 
@@ -888,16 +985,19 @@
  */
 static void nfs_pageio_doio(struct nfs_pageio_descriptor *desc)
 {
-	if (!list_empty(&desc->pg_list)) {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
+
+	if (!list_empty(&mirror->pg_list)) {
 		int error = desc->pg_ops->pg_doio(desc);
 		if (error < 0)
 			desc->pg_error = error;
 		else
-			desc->pg_bytes_written += desc->pg_count;
+			mirror->pg_bytes_written += mirror->pg_count;
 	}
-	if (list_empty(&desc->pg_list)) {
-		desc->pg_count = 0;
-		desc->pg_base = 0;
+	if (list_empty(&mirror->pg_list)) {
+		mirror->pg_count = 0;
+		mirror->pg_base = 0;
 	}
 }
 
@@ -915,10 +1015,14 @@
 static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			   struct nfs_page *req)
 {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
+
 	struct nfs_page *subreq;
 	unsigned int bytes_left = 0;
 	unsigned int offset, pgbase;
 
+	WARN_ON_ONCE(desc->pg_mirror_idx >= desc->pg_mirror_count);
+
 	nfs_page_group_lock(req, false);
 
 	subreq = req;
@@ -938,7 +1042,7 @@
 			nfs_pageio_doio(desc);
 			if (desc->pg_error < 0)
 				return 0;
-			if (desc->pg_recoalesce)
+			if (mirror->pg_recoalesce)
 				return 0;
 			/* retry add_request for this subreq */
 			nfs_page_group_lock(req, false);
@@ -976,14 +1080,16 @@
 
 static int nfs_do_recoalesce(struct nfs_pageio_descriptor *desc)
 {
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[desc->pg_mirror_idx];
 	LIST_HEAD(head);
 
 	do {
-		list_splice_init(&desc->pg_list, &head);
-		desc->pg_bytes_written -= desc->pg_count;
-		desc->pg_count = 0;
-		desc->pg_base = 0;
-		desc->pg_recoalesce = 0;
+		list_splice_init(&mirror->pg_list, &head);
+		mirror->pg_bytes_written -= mirror->pg_count;
+		mirror->pg_count = 0;
+		mirror->pg_base = 0;
+		mirror->pg_recoalesce = 0;
+
 		desc->pg_moreio = 0;
 
 		while (!list_empty(&head)) {
@@ -997,11 +1103,11 @@
 				return 0;
 			break;
 		}
-	} while (desc->pg_recoalesce);
+	} while (mirror->pg_recoalesce);
 	return 1;
 }
 
-int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
+static int nfs_pageio_add_request_mirror(struct nfs_pageio_descriptor *desc,
 		struct nfs_page *req)
 {
 	int ret;
@@ -1014,9 +1120,78 @@
 			break;
 		ret = nfs_do_recoalesce(desc);
 	} while (ret);
+
 	return ret;
 }
 
+int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
+			   struct nfs_page *req)
+{
+	u32 midx;
+	unsigned int pgbase, offset, bytes;
+	struct nfs_page *dupreq, *lastreq;
+
+	pgbase = req->wb_pgbase;
+	offset = req->wb_offset;
+	bytes = req->wb_bytes;
+
+	nfs_pageio_setup_mirroring(desc, req);
+
+	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
+		if (midx) {
+			nfs_page_group_lock(req, false);
+
+			/* find the last request */
+			for (lastreq = req->wb_head;
+			     lastreq->wb_this_page != req->wb_head;
+			     lastreq = lastreq->wb_this_page)
+				;
+
+			dupreq = nfs_create_request(req->wb_context,
+					req->wb_page, lastreq, pgbase, bytes);
+
+			if (IS_ERR(dupreq)) {
+				nfs_page_group_unlock(req);
+				return 0;
+			}
+
+			nfs_lock_request(dupreq);
+			nfs_page_group_unlock(req);
+			dupreq->wb_offset = offset;
+			dupreq->wb_index = req->wb_index;
+		} else
+			dupreq = req;
+
+		desc->pg_mirror_idx = midx;
+		if (!nfs_pageio_add_request_mirror(desc, dupreq))
+			return 0;
+	}
+
+	return 1;
+}
+
+/*
+ * nfs_pageio_complete_mirror - Complete I/O on the current mirror of an
+ *				nfs_pageio_descriptor
+ * @desc: pointer to io descriptor
+ */
+static void nfs_pageio_complete_mirror(struct nfs_pageio_descriptor *desc,
+				       u32 mirror_idx)
+{
+	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[mirror_idx];
+	u32 restore_idx = desc->pg_mirror_idx;
+
+	desc->pg_mirror_idx = mirror_idx;
+	for (;;) {
+		nfs_pageio_doio(desc);
+		if (!mirror->pg_recoalesce)
+			break;
+		if (!nfs_do_recoalesce(desc))
+			break;
+	}
+	desc->pg_mirror_idx = restore_idx;
+}
+
 /*
  * nfs_pageio_resend - Transfer requests to new descriptor and resend
  * @hdr - the pgio header to move request from
@@ -1055,16 +1230,14 @@
  */
 void nfs_pageio_complete(struct nfs_pageio_descriptor *desc)
 {
-	for (;;) {
-		nfs_pageio_doio(desc);
-		if (!desc->pg_recoalesce)
-			break;
-		if (!nfs_do_recoalesce(desc))
-			break;
-	}
+	u32 midx;
+
+	for (midx = 0; midx < desc->pg_mirror_count; midx++)
+		nfs_pageio_complete_mirror(desc, midx);
 
 	if (desc->pg_ops->pg_cleanup)
 		desc->pg_ops->pg_cleanup(desc);
+	nfs_pageio_cleanup_mirroring(desc);
 }
 
 /**
@@ -1080,10 +1253,17 @@
  */
 void nfs_pageio_cond_complete(struct nfs_pageio_descriptor *desc, pgoff_t index)
 {
-	if (!list_empty(&desc->pg_list)) {
-		struct nfs_page *prev = nfs_list_entry(desc->pg_list.prev);
-		if (index != prev->wb_index + 1)
-			nfs_pageio_complete(desc);
+	struct nfs_pgio_mirror *mirror;
+	struct nfs_page *prev;
+	u32 midx;
+
+	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
+		mirror = &desc->pg_mirrors[midx];
+		if (!list_empty(&mirror->pg_list)) {
+			prev = nfs_list_entry(mirror->pg_list.prev);
+			if (index != prev->wb_index + 1)
+				nfs_pageio_complete_mirror(desc, midx);
+		}
 	}
 }