[PATCH] md/bitmap: change md/bitmap file handling to use bmap to file blocks

If md is asked to store a bitmap in a file, it tries to hold onto the page
cache pages for that file, manipulate them directly, and call a cocktail of
operations to write the file out.  I don't believe this is a supportable
approach.

This patch changes the approach to use the same approach as swap files.  i.e.
bmap is used to enumerate all the block address of parts of the file and we
write directly to those blocks of the device.

swapfile only uses parts of the file that provide a full pages at contiguous
addresses.  We don't have that luxury so we have to cope with pages that are
non-contiguous in storage.  To handle this we attach buffers to each page, and
store the addresses in those buffers.

With this approach the pagecache may contain data which is inconsistent with
what is on disk.  To alleviate the problems this can cause, md invalidates the
pagecache when releasing the file.  If the file is to be examined while the
array is active (a non-critical but occasionally useful function), O_DIRECT io
must be used.  And new version of mdadm will have support for this.

This approach simplifies a lot of code:
 - we no longer need to keep a list of pages which we need to wait for,
   as the b_endio function can keep track of how many outstanding
   writes there are.  This saves a mempool.
 - -EAGAIN returns from write_page are no longer possible (not sure if
    they ever were actually).

Signed-off-by: Neil Brown <neilb@suse.de>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
diff --git a/drivers/md/bitmap.c b/drivers/md/bitmap.c
index e40d3e4..bedd66e 100644
--- a/drivers/md/bitmap.c
+++ b/drivers/md/bitmap.c
@@ -67,7 +67,6 @@
 	return bitmap->mddev ? mdname(bitmap->mddev) : "mdX";
 }
 
-#define WRITE_POOL_SIZE 256
 
 /*
  * just a placeholder - calls kmalloc for bitmap pages
@@ -279,75 +278,137 @@
  */
 static int write_page(struct bitmap *bitmap, struct page *page, int wait)
 {
-	int ret = -ENOMEM;
+	struct buffer_head *bh;
 
 	if (bitmap->file == NULL)
 		return write_sb_page(bitmap->mddev, bitmap->offset, page, wait);
 
-	flush_dcache_page(page); /* make sure visible to anyone reading the file */
+	bh = page_buffers(page);
 
-	if (wait)
-		lock_page(page);
-	else {
-		if (TestSetPageLocked(page))
-			return -EAGAIN; /* already locked */
-		if (PageWriteback(page)) {
-			unlock_page(page);
-			return -EAGAIN;
-		}
+	while (bh && bh->b_blocknr) {
+		atomic_inc(&bitmap->pending_writes);
+		set_buffer_locked(bh);
+		set_buffer_mapped(bh);
+		submit_bh(WRITE, bh);
+		bh = bh->b_this_page;
 	}
 
-	ret = page->mapping->a_ops->prepare_write(bitmap->file, page, 0, PAGE_SIZE);
-	if (!ret)
-		ret = page->mapping->a_ops->commit_write(bitmap->file, page, 0,
-			PAGE_SIZE);
-	if (ret) {
-		unlock_page(page);
-		return ret;
+	if (wait) {
+		wait_event(bitmap->write_wait,
+			   atomic_read(&bitmap->pending_writes)==0);
+		return (bitmap->flags & BITMAP_WRITE_ERROR) ? -EIO : 0;
 	}
-
-	set_page_dirty(page); /* force it to be written out */
-
-	if (!wait) {
-		/* add to list to be waited for */
-		struct page_list *item = mempool_alloc(bitmap->write_pool, GFP_NOIO);
-		item->page = page;
-		spin_lock(&bitmap->write_lock);
-		list_add(&item->list, &bitmap->complete_pages);
-		spin_unlock(&bitmap->write_lock);
-	}
-	return write_one_page(page, wait);
+	return 0;
 }
 
-/* read a page from a file, pinning it into cache, and return bytes_read */
-static struct page *read_page(struct file *file, unsigned long index,
-					unsigned long *bytes_read)
+static void end_bitmap_write(struct buffer_head *bh, int uptodate)
 {
-	struct inode *inode = file->f_mapping->host;
+	struct bitmap *bitmap = bh->b_private;
+	unsigned long flags;
+
+	if (!uptodate) {
+		spin_lock_irqsave(&bitmap->lock, flags);
+		bitmap->flags |= BITMAP_WRITE_ERROR;
+		spin_unlock_irqrestore(&bitmap->lock, flags);
+	}
+	if (atomic_dec_and_test(&bitmap->pending_writes))
+		wake_up(&bitmap->write_wait);
+}
+
+/* copied from buffer.c */
+static void
+__clear_page_buffers(struct page *page)
+{
+	ClearPagePrivate(page);
+	set_page_private(page, 0);
+	page_cache_release(page);
+}
+static void free_buffers(struct page *page)
+{
+	struct buffer_head *bh = page_buffers(page);
+
+	while (bh) {
+		struct buffer_head *next = bh->b_this_page;
+		free_buffer_head(bh);
+		bh = next;
+	}
+	__clear_page_buffers(page);
+	put_page(page);
+}
+
+/* read a page from a file.
+ * We both read the page, and attach buffers to the page to record the
+ * address of each block (using bmap).  These addresses will be used
+ * to write the block later, completely bypassing the filesystem.
+ * This usage is similar to how swap files are handled, and allows us
+ * to write to a file with no concerns of memory allocation failing.
+ */
+static struct page *read_page(struct file *file, unsigned long index,
+			      struct bitmap *bitmap,
+			      unsigned long count)
+{
 	struct page *page = NULL;
-	loff_t isize = i_size_read(inode);
-	unsigned long end_index = isize >> PAGE_SHIFT;
+	struct inode *inode = file->f_dentry->d_inode;
+	loff_t pos = index << PAGE_SHIFT;
+	int ret;
+	struct buffer_head *bh;
+	sector_t block;
+	mm_segment_t oldfs;
 
 	PRINTK("read bitmap file (%dB @ %Lu)\n", (int)PAGE_SIZE,
 			(unsigned long long)index << PAGE_SHIFT);
 
-	page = read_cache_page(inode->i_mapping, index,
-			(filler_t *)inode->i_mapping->a_ops->readpage, file);
+	page = alloc_page(GFP_KERNEL);
+	if (!page)
+		page = ERR_PTR(-ENOMEM);
 	if (IS_ERR(page))
 		goto out;
-	wait_on_page_locked(page);
-	if (!PageUptodate(page) || PageError(page)) {
+
+	oldfs = get_fs();
+	set_fs(KERNEL_DS);
+	ret = vfs_read(file, (char __user*) page_address(page), count, &pos);
+	set_fs(oldfs);
+
+	if (ret >= 0 && ret != count)
+		ret = -EIO;
+	if (ret < 0) {
 		put_page(page);
-		page = ERR_PTR(-EIO);
+		page = ERR_PTR(ret);
 		goto out;
 	}
+	bh = alloc_page_buffers(page, 1<<inode->i_blkbits, 0);
+	if (!bh) {
+		put_page(page);
+		page = ERR_PTR(-ENOMEM);
+		goto out;
+	}
+	attach_page_buffers(page, bh);
+	block = index << (PAGE_SHIFT - inode->i_blkbits);
+	while (bh) {
+		if (count == 0)
+			bh->b_blocknr = 0;
+		else {
+			bh->b_blocknr = bmap(inode, block);
+			if (bh->b_blocknr == 0) {
+				/* Cannot use this file! */
+				free_buffers(page);
+				page = ERR_PTR(-EINVAL);
+				goto out;
+			}
+			bh->b_bdev = inode->i_sb->s_bdev;
+			if (count < (1<<inode->i_blkbits))
+				count = 0;
+			else
+				count -= (1<<inode->i_blkbits);
 
-	if (index > end_index) /* we have read beyond EOF */
-		*bytes_read = 0;
-	else if (index == end_index) /* possible short read */
-		*bytes_read = isize & ~PAGE_MASK;
-	else
-		*bytes_read = PAGE_SIZE; /* got a full page */
+			bh->b_end_io = end_bitmap_write;
+			bh->b_private = bitmap;
+		}
+		block++;
+		bh = bh->b_this_page;
+	}
+
+	page->index = index;
 out:
 	if (IS_ERR(page))
 		printk(KERN_ALERT "md: bitmap read error: (%dB @ %Lu): %ld\n",
@@ -418,16 +479,14 @@
 	char *reason = NULL;
 	bitmap_super_t *sb;
 	unsigned long chunksize, daemon_sleep, write_behind;
-	unsigned long bytes_read;
 	unsigned long long events;
 	int err = -EINVAL;
 
 	/* page 0 is the superblock, read it... */
 	if (bitmap->file)
-		bitmap->sb_page = read_page(bitmap->file, 0, &bytes_read);
+		bitmap->sb_page = read_page(bitmap->file, 0, bitmap, PAGE_SIZE);
 	else {
 		bitmap->sb_page = read_sb_page(bitmap->mddev, bitmap->offset, 0);
-		bytes_read = PAGE_SIZE;
 	}
 	if (IS_ERR(bitmap->sb_page)) {
 		err = PTR_ERR(bitmap->sb_page);
@@ -437,13 +496,6 @@
 
 	sb = (bitmap_super_t *)kmap_atomic(bitmap->sb_page, KM_USER0);
 
-	if (bytes_read < sizeof(*sb)) { /* short read */
-		printk(KERN_INFO "%s: bitmap file superblock truncated\n",
-			bmname(bitmap));
-		err = -ENOSPC;
-		goto out;
-	}
-
 	chunksize = le32_to_cpu(sb->chunksize);
 	daemon_sleep = le32_to_cpu(sb->daemon_sleep);
 	write_behind = le32_to_cpu(sb->write_behind);
@@ -589,37 +641,12 @@
 
 	while (pages--)
 		if (map[pages]->index != 0) /* 0 is sb_page, release it below */
-			put_page(map[pages]);
+			free_buffers(map[pages]);
 	kfree(map);
 	kfree(attr);
 
-	safe_put_page(sb_page);
-}
-
-/* dequeue the next item in a page list -- don't call from irq context */
-static struct page_list *dequeue_page(struct bitmap *bitmap)
-{
-	struct page_list *item = NULL;
-	struct list_head *head = &bitmap->complete_pages;
-
-	spin_lock(&bitmap->write_lock);
-	if (list_empty(head))
-		goto out;
-	item = list_entry(head->prev, struct page_list, list);
-	list_del(head->prev);
-out:
-	spin_unlock(&bitmap->write_lock);
-	return item;
-}
-
-static void drain_write_queues(struct bitmap *bitmap)
-{
-	struct page_list *item;
-
-	while ((item = dequeue_page(bitmap))) {
-		/* don't bother to wait */
-		mempool_free(item, bitmap->write_pool);
-	}
+	if (sb_page)
+		free_buffers(sb_page);
 }
 
 static void bitmap_file_put(struct bitmap *bitmap)
@@ -632,12 +659,16 @@
 	bitmap->file = NULL;
 	spin_unlock_irqrestore(&bitmap->lock, flags);
 
-	drain_write_queues(bitmap);
-
+	if (file)
+		wait_event(bitmap->write_wait,
+			   atomic_read(&bitmap->pending_writes)==0);
 	bitmap_file_unmap(bitmap);
 
-	if (file)
+	if (file) {
+		struct inode *inode = file->f_dentry->d_inode;
+		invalidate_inode_pages(inode->i_mapping);
 		fput(file);
+	}
 }
 
 
@@ -728,8 +759,6 @@
 
 }
 
-static void bitmap_writeback(struct bitmap *bitmap);
-
 /* this gets called when the md device is ready to unplug its underlying
  * (slave) device queues -- before we let any writes go down, we need to
  * sync the dirty pages of the bitmap file to disk */
@@ -761,24 +790,18 @@
 			wait = 1;
 		spin_unlock_irqrestore(&bitmap->lock, flags);
 
-		if (dirty | need_write) {
+		if (dirty | need_write)
 			err = write_page(bitmap, page, 0);
-			if (err == -EAGAIN) {
-				if (dirty)
-					err = write_page(bitmap, page, 1);
-				else
-					err = 0;
-			}
-			if (err)
-				return 1;
-		}
 	}
 	if (wait) { /* if any writes were performed, we need to wait on them */
 		if (bitmap->file)
-			bitmap_writeback(bitmap);
+			wait_event(bitmap->write_wait,
+				   atomic_read(&bitmap->pending_writes)==0);
 		else
 			md_super_wait(bitmap->mddev);
 	}
+	if (bitmap->flags & BITMAP_WRITE_ERROR)
+		bitmap_file_kick(bitmap);
 	return 0;
 }
 
@@ -800,7 +823,7 @@
 	struct page *page = NULL, *oldpage = NULL;
 	unsigned long num_pages, bit_cnt = 0;
 	struct file *file;
-	unsigned long bytes, offset, dummy;
+	unsigned long bytes, offset;
 	int outofdate;
 	int ret = -ENOSPC;
 	void *paddr;
@@ -853,7 +876,12 @@
 		index = file_page_index(i);
 		bit = file_page_offset(i);
 		if (index != oldindex) { /* this is a new page, read it in */
+			int count;
 			/* unmap the old page, we're done with it */
+			if (index == num_pages-1)
+				count = bytes - index * PAGE_SIZE;
+			else
+				count = PAGE_SIZE;
 			if (index == 0) {
 				/*
 				 * if we're here then the superblock page
@@ -863,7 +891,7 @@
 				page = bitmap->sb_page;
 				offset = sizeof(bitmap_super_t);
 			} else if (file) {
-				page = read_page(file, index, &dummy);
+				page = read_page(file, index, bitmap, count);
 				offset = 0;
 			} else {
 				page = read_sb_page(bitmap->mddev, bitmap->offset, index);
@@ -999,9 +1027,6 @@
 				spin_unlock_irqrestore(&bitmap->lock, flags);
 				if (need_write) {
 					switch (write_page(bitmap, page, 0)) {
-					case -EAGAIN:
-						set_page_attr(bitmap, page, BITMAP_PAGE_NEEDWRITE);
-						break;
 					case 0:
 						break;
 					default:
@@ -1017,10 +1042,6 @@
 					clear_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
 					spin_unlock_irqrestore(&bitmap->lock, flags);
 					err = write_page(bitmap, lastpage, 0);
-					if (err == -EAGAIN) {
-						err = 0;
-						set_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
-					}
 				} else {
 					set_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
 					spin_unlock_irqrestore(&bitmap->lock, flags);
@@ -1070,10 +1091,6 @@
 			clear_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
 			spin_unlock_irqrestore(&bitmap->lock, flags);
 			err = write_page(bitmap, lastpage, 0);
-			if (err == -EAGAIN) {
-				set_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
-				err = 0;
-			}
 		} else {
 			set_page_attr(bitmap, lastpage, BITMAP_PAGE_NEEDWRITE);
 			spin_unlock_irqrestore(&bitmap->lock, flags);
@@ -1083,32 +1100,6 @@
 	return err;
 }
 
-static void bitmap_writeback(struct bitmap *bitmap)
-{
-	struct page *page;
-	struct page_list *item;
-	int err = 0;
-
-	PRINTK("%s: bitmap writeback daemon woke up...\n", bmname(bitmap));
-	/* wait on bitmap page writebacks */
-	while ((item = dequeue_page(bitmap))) {
-		page = item->page;
-		mempool_free(item, bitmap->write_pool);
-		PRINTK("wait on page writeback: %p\n", page);
-		wait_on_page_writeback(page);
-		PRINTK("finished page writeback: %p\n", page);
-
-		err = PageError(page);
-		if (err) {
-			printk(KERN_WARNING "%s: bitmap file writeback "
-			       "failed (page %lu): %d\n",
-			       bmname(bitmap), page->index, err);
-			bitmap_file_kick(bitmap);
-			break;
-		}
-	}
-}
-
 static bitmap_counter_t *bitmap_get_counter(struct bitmap *bitmap,
 					    sector_t offset, int *blocks,
 					    int create)
@@ -1377,8 +1368,6 @@
 
 	/* free all allocated memory */
 
-	mempool_destroy(bitmap->write_pool);
-
 	if (bp) /* deallocate the page memory */
 		for (k = 0; k < pages; k++)
 			if (bp[k].map && !bp[k].hijacked)
@@ -1428,17 +1417,13 @@
 	spin_lock_init(&bitmap->lock);
 	bitmap->mddev = mddev;
 
-	spin_lock_init(&bitmap->write_lock);
-	INIT_LIST_HEAD(&bitmap->complete_pages);
-	bitmap->write_pool = mempool_create_kmalloc_pool(WRITE_POOL_SIZE,
-						sizeof(struct page_list));
-	err = -ENOMEM;
-	if (!bitmap->write_pool)
-		goto error;
-
 	bitmap->file = file;
 	bitmap->offset = mddev->bitmap_offset;
 	if (file) get_file(file);
+
+	/* Ensure we read fresh data */
+	invalidate_inode_pages(file->f_dentry->d_inode->i_mapping);
+
 	/* read superblock from bitmap file (this sets bitmap->chunksize) */
 	err = bitmap_read_sb(bitmap);
 	if (err)
@@ -1461,6 +1446,9 @@
 
 	bitmap->syncchunk = ~0UL;
 
+	atomic_set(&bitmap->pending_writes, 0);
+	init_waitqueue_head(&bitmap->write_wait);
+
 #ifdef INJECT_FATAL_FAULT_1
 	bitmap->bp = NULL;
 #else
@@ -1503,4 +1491,3 @@
 EXPORT_SYMBOL(bitmap_end_sync);
 EXPORT_SYMBOL(bitmap_unplug);
 EXPORT_SYMBOL(bitmap_close_sync);
-EXPORT_SYMBOL(bitmap_daemon_work);