fs: introduce write_begin, write_end, and perform_write aops

These are intended to replace prepare_write and commit_write with more
flexible alternatives that are also able to avoid the buffered write
deadlock problems efficiently (which prepare_write is unable to do).

[mark.fasheh@oracle.com: API design contributions, code review and fixes]
[akpm@linux-foundation.org: various fixes]
[dmonakhov@sw.ru: new aop block_write_begin fix]
Signed-off-by: Nick Piggin <npiggin@suse.de>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Signed-off-by: Dmitriy Monakhov <dmonakhov@openvz.org>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
diff --git a/mm/filemap.c b/mm/filemap.c
index 67a03a0..ec25ba1 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1742,11 +1742,20 @@
 	i->count -= bytes;
 }
 
-int iov_iter_fault_in_readable(struct iov_iter *i)
+/*
+ * Fault in the first iovec of the given iov_iter, to a maximum length
+ * of bytes. Returns 0 on success, or non-zero if the memory could not be
+ * accessed (ie. because it is an invalid address).
+ *
+ * writev-intensive code may want this to prefault several iovecs -- that
+ * would be possible (callers must not rely on the fact that _only_ the
+ * first iovec will be faulted with the current implementation).
+ */
+int iov_iter_fault_in_readable(struct iov_iter *i, size_t bytes)
 {
-	size_t seglen = min(i->iov->iov_len - i->iov_offset, i->count);
 	char __user *buf = i->iov->iov_base + i->iov_offset;
-	return fault_in_pages_readable(buf, seglen);
+	bytes = min(bytes, i->iov->iov_len - i->iov_offset);
+	return fault_in_pages_readable(buf, bytes);
 }
 
 /*
@@ -1843,6 +1852,95 @@
 }
 EXPORT_SYMBOL(generic_write_checks);
 
+int pagecache_write_begin(struct file *file, struct address_space *mapping,
+				loff_t pos, unsigned len, unsigned flags,
+				struct page **pagep, void **fsdata)
+{
+	const struct address_space_operations *aops = mapping->a_ops;
+
+	if (aops->write_begin) {
+		return aops->write_begin(file, mapping, pos, len, flags,
+							pagep, fsdata);
+	} else {
+		int ret;
+		pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+		unsigned offset = pos & (PAGE_CACHE_SIZE - 1);
+		struct inode *inode = mapping->host;
+		struct page *page;
+again:
+		page = __grab_cache_page(mapping, index);
+		*pagep = page;
+		if (!page)
+			return -ENOMEM;
+
+		if (flags & AOP_FLAG_UNINTERRUPTIBLE && !PageUptodate(page)) {
+			/*
+			 * There is no way to resolve a short write situation
+			 * for a !Uptodate page (except by double copying in
+			 * the caller done by generic_perform_write_2copy).
+			 *
+			 * Instead, we have to bring it uptodate here.
+			 */
+			ret = aops->readpage(file, page);
+			page_cache_release(page);
+			if (ret) {
+				if (ret == AOP_TRUNCATED_PAGE)
+					goto again;
+				return ret;
+			}
+			goto again;
+		}
+
+		ret = aops->prepare_write(file, page, offset, offset+len);
+		if (ret) {
+			if (ret != AOP_TRUNCATED_PAGE)
+				unlock_page(page);
+			page_cache_release(page);
+			if (pos + len > inode->i_size)
+				vmtruncate(inode, inode->i_size);
+			if (ret == AOP_TRUNCATED_PAGE)
+				goto again;
+		}
+		return ret;
+	}
+}
+EXPORT_SYMBOL(pagecache_write_begin);
+
+int pagecache_write_end(struct file *file, struct address_space *mapping,
+				loff_t pos, unsigned len, unsigned copied,
+				struct page *page, void *fsdata)
+{
+	const struct address_space_operations *aops = mapping->a_ops;
+	int ret;
+
+	if (aops->write_end) {
+		mark_page_accessed(page);
+		ret = aops->write_end(file, mapping, pos, len, copied,
+							page, fsdata);
+	} else {
+		unsigned offset = pos & (PAGE_CACHE_SIZE - 1);
+		struct inode *inode = mapping->host;
+
+		flush_dcache_page(page);
+		ret = aops->commit_write(file, page, offset, offset+len);
+		unlock_page(page);
+		mark_page_accessed(page);
+		page_cache_release(page);
+		BUG_ON(ret == AOP_TRUNCATED_PAGE); /* can't deal with */
+
+		if (ret < 0) {
+			if (pos + len > inode->i_size)
+				vmtruncate(inode, inode->i_size);
+		} else if (ret > 0)
+			ret = min_t(size_t, copied, ret);
+		else
+			ret = copied;
+	}
+
+	return ret;
+}
+EXPORT_SYMBOL(pagecache_write_end);
+
 ssize_t
 generic_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 		unsigned long *nr_segs, loff_t pos, loff_t *ppos,
@@ -1886,8 +1984,7 @@
  * Find or create a page at the given pagecache position. Return the locked
  * page. This function is specifically for buffered writes.
  */
-static struct page *__grab_cache_page(struct address_space *mapping,
-							pgoff_t index)
+struct page *__grab_cache_page(struct address_space *mapping, pgoff_t index)
 {
 	int status;
 	struct page *page;
@@ -1908,20 +2005,16 @@
 	}
 	return page;
 }
+EXPORT_SYMBOL(__grab_cache_page);
 
-ssize_t
-generic_file_buffered_write(struct kiocb *iocb, const struct iovec *iov,
-		unsigned long nr_segs, loff_t pos, loff_t *ppos,
-		size_t count, ssize_t written)
+static ssize_t generic_perform_write_2copy(struct file *file,
+				struct iov_iter *i, loff_t pos)
 {
-	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	const struct address_space_operations *a_ops = mapping->a_ops;
-	struct inode 	*inode = mapping->host;
-	long		status = 0;
-	struct iov_iter i;
-
-	iov_iter_init(&i, iov, nr_segs, count, written);
+	struct inode *inode = mapping->host;
+	long status = 0;
+	ssize_t written = 0;
 
 	do {
 		struct page *src_page;
@@ -1934,7 +2027,7 @@
 		offset = (pos & (PAGE_CACHE_SIZE - 1));
 		index = pos >> PAGE_CACHE_SHIFT;
 		bytes = min_t(unsigned long, PAGE_CACHE_SIZE - offset,
-						iov_iter_count(&i));
+						iov_iter_count(i));
 
 		/*
 		 * a non-NULL src_page indicates that we're doing the
@@ -1952,7 +2045,7 @@
 		 * to check that the address is actually valid, when atomic
 		 * usercopies are used, below.
 		 */
-		if (unlikely(iov_iter_fault_in_readable(&i))) {
+		if (unlikely(iov_iter_fault_in_readable(i, bytes))) {
 			status = -EFAULT;
 			break;
 		}
@@ -1983,7 +2076,7 @@
 			 * same reason as we can't take a page fault with a
 			 * page locked (as explained below).
 			 */
-			copied = iov_iter_copy_from_user(src_page, &i,
+			copied = iov_iter_copy_from_user(src_page, i,
 								offset, bytes);
 			if (unlikely(copied == 0)) {
 				status = -EFAULT;
@@ -2008,7 +2101,6 @@
 				page_cache_release(src_page);
 				continue;
 			}
-
 		}
 
 		status = a_ops->prepare_write(file, page, offset, offset+bytes);
@@ -2030,7 +2122,7 @@
 			 * really matter.
 			 */
 			pagefault_disable();
-			copied = iov_iter_copy_from_user_atomic(page, &i,
+			copied = iov_iter_copy_from_user_atomic(page, i,
 								offset, bytes);
 			pagefault_enable();
 		} else {
@@ -2056,9 +2148,9 @@
 		if (src_page)
 			page_cache_release(src_page);
 
-		iov_iter_advance(&i, copied);
-		written += copied;
+		iov_iter_advance(i, copied);
 		pos += copied;
+		written += copied;
 
 		balance_dirty_pages_ratelimited(mapping);
 		cond_resched();
@@ -2082,13 +2174,117 @@
 			continue;
 		else
 			break;
-	} while (iov_iter_count(&i));
-	*ppos = pos;
+	} while (iov_iter_count(i));
 
-	/*
-	 * For now, when the user asks for O_SYNC, we'll actually give O_DSYNC
-	 */
+	return written ? written : status;
+}
+
+static ssize_t generic_perform_write(struct file *file,
+				struct iov_iter *i, loff_t pos)
+{
+	struct address_space *mapping = file->f_mapping;
+	const struct address_space_operations *a_ops = mapping->a_ops;
+	long status = 0;
+	ssize_t written = 0;
+
+	do {
+		struct page *page;
+		pgoff_t index;		/* Pagecache index for current page */
+		unsigned long offset;	/* Offset into pagecache page */
+		unsigned long bytes;	/* Bytes to write to page */
+		size_t copied;		/* Bytes copied from user */
+		void *fsdata;
+
+		offset = (pos & (PAGE_CACHE_SIZE - 1));
+		index = pos >> PAGE_CACHE_SHIFT;
+		bytes = min_t(unsigned long, PAGE_CACHE_SIZE - offset,
+						iov_iter_count(i));
+
+again:
+
+		/*
+		 * Bring in the user page that we will copy from _first_.
+		 * Otherwise there's a nasty deadlock on copying from the
+		 * same page as we're writing to, without it being marked
+		 * up-to-date.
+		 *
+		 * Not only is this an optimisation, but it is also required
+		 * to check that the address is actually valid, when atomic
+		 * usercopies are used, below.
+		 */
+		if (unlikely(iov_iter_fault_in_readable(i, bytes))) {
+			status = -EFAULT;
+			break;
+		}
+
+		status = a_ops->write_begin(file, mapping, pos, bytes, 0,
+						&page, &fsdata);
+		if (unlikely(status))
+			break;
+
+		pagefault_disable();
+		copied = iov_iter_copy_from_user_atomic(page, i, offset, bytes);
+		pagefault_enable();
+		flush_dcache_page(page);
+
+		status = a_ops->write_end(file, mapping, pos, bytes, copied,
+						page, fsdata);
+		if (unlikely(status < 0))
+			break;
+		copied = status;
+
+		cond_resched();
+
+		if (unlikely(copied == 0)) {
+			/*
+			 * If we were unable to copy any data at all, we must
+			 * fall back to a single segment length write.
+			 *
+			 * If we didn't fallback here, we could livelock
+			 * because not all segments in the iov can be copied at
+			 * once without a pagefault.
+			 */
+			bytes = min_t(unsigned long, PAGE_CACHE_SIZE - offset,
+						iov_iter_single_seg_count(i));
+			goto again;
+		}
+		iov_iter_advance(i, copied);
+		pos += copied;
+		written += copied;
+
+		balance_dirty_pages_ratelimited(mapping);
+
+	} while (iov_iter_count(i));
+
+	return written ? written : status;
+}
+
+ssize_t
+generic_file_buffered_write(struct kiocb *iocb, const struct iovec *iov,
+		unsigned long nr_segs, loff_t pos, loff_t *ppos,
+		size_t count, ssize_t written)
+{
+	struct file *file = iocb->ki_filp;
+	struct address_space *mapping = file->f_mapping;
+	const struct address_space_operations *a_ops = mapping->a_ops;
+	struct inode *inode = mapping->host;
+	ssize_t status;
+	struct iov_iter i;
+
+	iov_iter_init(&i, iov, nr_segs, count, written);
+	if (a_ops->write_begin)
+		status = generic_perform_write(file, &i, pos);
+	else
+		status = generic_perform_write_2copy(file, &i, pos);
+
 	if (likely(status >= 0)) {
+		written += status;
+		*ppos = pos + status;
+
+		/*
+		 * For now, when the user asks for O_SYNC, we'll actually give
+		 * O_DSYNC
+		 */
 		if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
 			if (!a_ops->writepage || !is_sync_kiocb(iocb))
 				status = generic_osync_inode(inode, mapping,