ore: Only IO one group at a time (API change)

Usually a single IO is confined to one group of devices
(group_width); only at the boundary of a raid group can it
spill into a second group. The current code allocates a full
device_table-sized array at each io_state so it can satisfy
requests that span two groups. Needless to say, that is very
wasteful, especially since the device_table count can get very
large (hundreds, even thousands) while a group_width is usually
8 or 10. With, say, 1000 devices and a group_width of 10, about
99% of each such array goes unused.

* Change the ore API to trim an IO that spans two raid groups.
  The user passes offset+length to ore_get_rw_state; the ore
  may trim that length if it crosses a group boundary. The
  user must check ios->length or ios->nr_pages to see how much
  IO will actually be performed. It is the responsibility of
  the user to re-issue the remainder of the IO (see the sketch
  below this list).

* Modify exofs to copy spilled pages onto the next IO.
  This means one last kick is needed after all coalescing
  of pages is done.
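
For illustration only, a caller of the new API might loop as in
the following sketch. This is not code from this patch: the
locals (layout, oc, offset, length) are placeholders, the setup
of ios->pages is elided, and error handling is minimal.

	struct ore_io_state *ios;
	int ret;

	while (length) {
		ret = ore_get_rw_state(layout, oc, true /* is_reading */,
				       offset, length, &ios);
		if (unlikely(ret))
			return ret;

		/* ios->length and ios->nr_pages may have been trimmed
		 * at a raid-group boundary; set up ios->pages to match
		 * before issuing the IO.
		 */
		ret = ore_read(ios);
		if (unlikely(ret))
			return ret;

		/* re-issue the remainder, if any, on the next pass */
		offset += ios->length;
		length -= ios->length;
		ore_put_io_state(ios);
	}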

Signed-off-by: Boaz Harrosh <bharrosh@panasas.com>
diff --git a/fs/exofs/inode.c b/fs/exofs/inode.c
index 61b2f7e..d87c1f7 100644
--- a/fs/exofs/inode.c
+++ b/fs/exofs/inode.c
@@ -259,6 +259,47 @@
 	}
 }
 
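+/* Move pages the ore trimmed off this IO into @pcol, for the next IO */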
+static int _maybe_not_all_in_one_io(struct ore_io_state *ios,
+	struct page_collect *pcol_src, struct page_collect *pcol)
+{
+	/* length was wrong or offset was not page aligned */
+	BUG_ON(pcol_src->nr_pages < ios->nr_pages);
+
+	if (pcol_src->nr_pages > ios->nr_pages) {
+		struct page **src_page;
+		unsigned pages_less = pcol_src->nr_pages - ios->nr_pages;
+		unsigned long len_less = pcol_src->length - ios->length;
+		unsigned i;
+		int ret;
+
+		/* This IO was trimmed */
+		pcol_src->nr_pages = ios->nr_pages;
+		pcol_src->length = ios->length;
+
+		/* Left over pages are passed to the next io */
+		pcol->expected_pages += pages_less;
+		pcol->nr_pages = pages_less;
+		pcol->length = len_less;
+		src_page = pcol_src->pages + pcol_src->nr_pages;
+		pcol->pg_first = (*src_page)->index;
+
+		ret = pcol_try_alloc(pcol);
+		if (unlikely(ret))
+			return ret;
+
+		for (i = 0; i < pages_less; ++i)
+			pcol->pages[i] = *src_page++;
+
+		EXOFS_DBGMSG("Length was adjusted nr_pages=0x%x "
+			"pages_less=0x%x expected_pages=0x%x "
+			"next_offset=0x%llx next_len=0x%lx\n",
+			pcol_src->nr_pages, pages_less, pcol->expected_pages,
+			pcol->pg_first * PAGE_SIZE, pcol->length);
+	}
+	return 0;
+}
+
 static int read_exec(struct page_collect *pcol)
 {
 	struct exofs_i_info *oi = exofs_i(pcol->inode);
@@ -280,7 +320,6 @@
 
 	ios = pcol->ios;
 	ios->pages = pcol->pages;
-	ios->nr_pages = pcol->nr_pages;
 
 	if (pcol->read_4_write) {
 		ore_read(pcol->ios);
@@ -296,17 +335,23 @@
 	*pcol_copy = *pcol;
 	ios->done = readpages_done;
 	ios->private = pcol_copy;
+
+	/* pages ownership was passed to pcol_copy */
+	_pcol_reset(pcol);
+
+	ret = _maybe_not_all_in_one_io(ios, pcol_copy, pcol);
+	if (unlikely(ret))
+		goto err;
+
+	EXOFS_DBGMSG2("read_exec(0x%lx) offset=0x%llx length=0x%llx\n",
+		pcol->inode->i_ino, _LLU(ios->offset), _LLU(ios->length));
+
 	ret = ore_read(ios);
 	if (unlikely(ret))
 		goto err;
 
 	atomic_inc(&pcol->sbi->s_curr_pending);
 
-	EXOFS_DBGMSG2("read_exec obj=0x%llx start=0x%llx length=0x%lx\n",
-		  oi->one_comp.obj.id, _LLU(ios->offset), pcol->length);
-
-	/* pages ownership was passed to pcol_copy */
-	_pcol_reset(pcol);
 	return 0;
 
 err:
@@ -429,6 +474,11 @@
 		return ret;
 	}
 
+	ret = read_exec(&pcol);
+	if (unlikely(ret))
+		return ret;
+
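+	/* read_exec() may have left trimmed pages in pcol; one last kick */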
 	return read_exec(&pcol);
 }
 
@@ -519,7 +568,6 @@
 	ret = ore_get_rw_state(&pcol->sbi->layout, &oi->oc, false,
 				 pcol->pg_first << PAGE_CACHE_SHIFT,
 				 pcol->length, &pcol->ios);
-
 	if (unlikely(ret))
 		goto err;
 
@@ -534,10 +582,19 @@
 
 	ios = pcol->ios;
 	ios->pages = pcol_copy->pages;
-	ios->nr_pages = pcol_copy->nr_pages;
 	ios->done = writepages_done;
 	ios->private = pcol_copy;
 
+	/* pages ownership was passed to pcol_copy */
+	_pcol_reset(pcol);
+
+	ret = _maybe_not_all_in_one_io(ios, pcol_copy, pcol);
+	if (unlikely(ret))
+		goto err;
+
+	EXOFS_DBGMSG2("write_exec(0x%lx) offset=0x%llx length=0x%llx\n",
+		pcol->inode->i_ino, _LLU(ios->offset), _LLU(ios->length));
+
 	ret = ore_write(ios);
 	if (unlikely(ret)) {
 		EXOFS_ERR("write_exec: ore_write() Failed\n");
@@ -545,11 +602,6 @@
 	}
 
 	atomic_inc(&pcol->sbi->s_curr_pending);
-	EXOFS_DBGMSG2("write_exec(0x%lx, 0x%llx) start=0x%llx length=0x%lx\n",
-		  pcol->inode->i_ino, pcol->pg_first, _LLU(ios->offset),
-		  pcol->length);
-	/* pages ownership was passed to pcol_copy */
-	_pcol_reset(pcol);
 	return 0;
 
 err:
@@ -689,12 +741,30 @@
 	_pcol_init(&pcol, expected_pages, mapping->host);
 
 	ret = write_cache_pages(mapping, wbc, writepage_strip, &pcol);
-	if (ret) {
+	if (unlikely(ret)) {
 		EXOFS_ERR("write_cache_pages => %d\n", ret);
 		return ret;
 	}
 
-	return write_exec(&pcol);
+	ret = write_exec(&pcol);
+	if (unlikely(ret))
+		return ret;
+
+	if (wbc->sync_mode == WB_SYNC_ALL) {
+		return write_exec(&pcol); /* pump the last remainder */
+	} else if (pcol.nr_pages) {
+		/* not SYNC, let the remainder join the next writeout */
+		unsigned i;
+
+		for (i = 0; i < pcol.nr_pages; i++) {
+			struct page *page = pcol.pages[i];
+
+			end_page_writeback(page);
+			set_page_dirty(page);
+			unlock_page(page);
+		}
+	}
+	return 0;
 }
 
 static int exofs_writepage(struct page *page, struct writeback_control *wbc)