Btrfs: Optimize compressed writeback and reads

When reading compressed extents, try to add pages to the page cache for
any part of the compressed extent that readpages didn't already
preload.

Add an async work queue to handle transformations at delayed allocation processing
time.  Right now this is just compression.  The workflow is:

1) Find offsets in the file marked for delayed allocation
2) Lock the pages
3) Lock the state bits
4) Call the async delalloc code

The async delalloc code clears the state lock bits and delalloc bits.  It is
important this happens before the range goes into the work queue because
otherwise it might deadlock with other work queue items that try to lock
those extent bits.

The file pages are compressed, and if the compression doesn't work the
pages are written back directly.

An ordered work queue is used to make sure the inodes are written in the same
order that pdflush or writepages sent them down.

This changes extent_write_cache_pages to let the writepage function
update the wbc nr_written count.

Signed-off-by: Chris Mason <chris.mason@oracle.com>

diff --git a/fs/btrfs/compression.c b/fs/btrfs/compression.c
index 3549131..284f210 100644
--- a/fs/btrfs/compression.c
+++ b/fs/btrfs/compression.c
@@ -33,6 +33,7 @@
 #include <linux/writeback.h>
 #include <linux/bit_spinlock.h>
 #include <linux/version.h>
+#include <linux/pagevec.h>
 #include "ctree.h"
 #include "disk-io.h"
 #include "transaction.h"
@@ -145,9 +146,9 @@
 	}
 
 	/* do io completion on the original bio */
-	if (cb->errors)
+	if (cb->errors) {
 		bio_io_error(cb->orig_bio);
-	else
+	} else
 		bio_endio(cb->orig_bio, 0);
 
 	/* finally free the cb struct */
@@ -333,6 +334,7 @@
 		}
 		bytes_left -= PAGE_CACHE_SIZE;
 		first_byte += PAGE_CACHE_SIZE;
+		cond_resched();
 	}
 	bio_get(bio);
 
@@ -346,6 +348,130 @@
 	return 0;
 }
 
+static noinline int add_ra_bio_pages(struct inode *inode,
+				     u64 compressed_end,
+				     struct compressed_bio *cb)
+{
+	unsigned long end_index;
+	unsigned long page_index;
+	u64 last_offset;
+	u64 isize = i_size_read(inode);
+	int ret;
+	struct page *page;
+	unsigned long nr_pages = 0;
+	struct extent_map *em;
+	struct address_space *mapping = inode->i_mapping;
+	struct pagevec pvec;
+	struct extent_map_tree *em_tree;
+	struct extent_io_tree *tree;
+	u64 end;
+	int misses = 0;
+
+	page = cb->orig_bio->bi_io_vec[cb->orig_bio->bi_vcnt - 1].bv_page;
+	last_offset = (page_offset(page) + PAGE_CACHE_SIZE);
+	em_tree = &BTRFS_I(inode)->extent_tree;
+	tree = &BTRFS_I(inode)->io_tree;
+
+	if (isize == 0)
+		return 0;
+
+	end_index = (i_size_read(inode) - 1) >> PAGE_CACHE_SHIFT;
+
+	pagevec_init(&pvec, 0);
+	while(last_offset < compressed_end) {
+		page_index = last_offset >> PAGE_CACHE_SHIFT;
+
+		if (page_index > end_index)
+			break;
+
+		rcu_read_lock();
+		page = radix_tree_lookup(&mapping->page_tree, page_index);
+		rcu_read_unlock();
+		if (page) {
+			misses++;
+			if (misses > 4)
+				break;
+			goto next;
+		}
+
+		page = alloc_page(mapping_gfp_mask(mapping) | GFP_NOFS);
+		if (!page)
+			break;
+
+		page->index = page_index;
+		/*
+		 * add_to_page_cache_lru is not exported, so insert the page
+		 * here and do the LRU add by hand below
+		 */
+		if (add_to_page_cache(page, mapping,
+				      page->index, GFP_NOFS)) {
+			page_cache_release(page);
+			goto next;
+		}
+
+		/* open-coded lru_cache_add, which is not exported either */
+		page_cache_get(page);
+		if (!pagevec_add(&pvec, page))
+			__pagevec_lru_add(&pvec);
+
+		end = last_offset + PAGE_CACHE_SIZE - 1;
+		/*
+		 * the page is now locked and in the page cache for this
+		 * file range.  Keep it only if a single extent map covers
+		 * the whole page and starts at the disk sector of this bio.
+		 */
+		set_page_extent_mapped(page);
+		lock_extent(tree, last_offset, end, GFP_NOFS);
+		spin_lock(&em_tree->lock);
+		em = lookup_extent_mapping(em_tree, last_offset,
+					   PAGE_CACHE_SIZE);
+		spin_unlock(&em_tree->lock);
+
+		if (!em || last_offset < em->start ||
+		    (last_offset + PAGE_CACHE_SIZE > extent_map_end(em)) ||
+		    (em->block_start >> 9) != cb->orig_bio->bi_sector) {
+			free_extent_map(em);
+			unlock_extent(tree, last_offset, end, GFP_NOFS);
+			unlock_page(page);
+			page_cache_release(page);
+			break;
+		}
+		free_extent_map(em);
+
+		if (page->index == end_index) {
+			char *userpage;
+			size_t zero_offset = isize & (PAGE_CACHE_SIZE - 1);
+
+			if (zero_offset) {
+				int zeros;
+				zeros = PAGE_CACHE_SIZE - zero_offset;
+				userpage = kmap_atomic(page, KM_USER0);
+				memset(userpage + zero_offset, 0, zeros);
+				flush_dcache_page(page);
+				kunmap_atomic(userpage, KM_USER0);
+			}
+		}
+
+		ret = bio_add_page(cb->orig_bio, page,
+				   PAGE_CACHE_SIZE, 0);
+
+		if (ret == PAGE_CACHE_SIZE) {
+			nr_pages++;
+			page_cache_release(page);
+		} else {
+			unlock_extent(tree, last_offset, end, GFP_NOFS);
+			unlock_page(page);
+			page_cache_release(page);
+			break;
+		}
+next:
+		last_offset += PAGE_CACHE_SIZE;
+	}
+	if (pagevec_count(&pvec))
+		__pagevec_lru_add(&pvec);
+	return 0;
+}
+
 /*
  * for a compressed read, the bio we get passed has all the inode pages
  * in it.  We don't actually do IO on those pages but allocate new ones
@@ -373,6 +499,7 @@
 	struct block_device *bdev;
 	struct bio *comp_bio;
 	u64 cur_disk_byte = (u64)bio->bi_sector << 9;
+	u64 em_len;
 	struct extent_map *em;
 	int ret;
 
@@ -393,6 +520,7 @@
 
 	cb->start = em->start;
 	compressed_len = em->block_len;
+	em_len = em->len;
 	free_extent_map(em);
 
 	cb->len = uncompressed_len;
@@ -411,6 +539,17 @@
 	}
 	cb->nr_pages = nr_pages;
 
+	add_ra_bio_pages(inode, cb->start + em_len, cb);
+
+	if (!btrfs_test_opt(root, NODATASUM) &&
+	    !btrfs_test_flag(inode, NODATASUM)) {
+		btrfs_lookup_bio_sums(root, inode, cb->orig_bio);
+	}
+
+	/* include any pages we added in add_ra_bio_pages */
+	uncompressed_len = bio->bi_vcnt * PAGE_CACHE_SIZE;
+	cb->len = uncompressed_len;
+
 	comp_bio = compressed_bio_alloc(bdev, cur_disk_byte, GFP_NOFS);
 	comp_bio->bi_private = cb;
 	comp_bio->bi_end_io = end_compressed_bio_read;
@@ -442,9 +581,10 @@
 			comp_bio = compressed_bio_alloc(bdev, cur_disk_byte,
 							GFP_NOFS);
 			atomic_inc(&cb->pending_bios);
-			bio->bi_private = cb;
-			bio->bi_end_io = end_compressed_bio_write;
-			bio_add_page(bio, page, PAGE_CACHE_SIZE, 0);
+			comp_bio->bi_private = cb;
+			comp_bio->bi_end_io = end_compressed_bio_read;
+
+			bio_add_page(comp_bio, page, PAGE_CACHE_SIZE, 0);
 		}
 		cur_disk_byte += PAGE_CACHE_SIZE;
 	}