Merge branch 'writeback' of git://git.kernel.dk/linux-2.6-block

* 'writeback' of git://git.kernel.dk/linux-2.6-block:
  writeback: fix possible bdi writeback refcounting problem
  writeback: Fix bdi use after free in wb_work_complete()
  writeback: improve scalability of bdi writeback work queues
  writeback: remove smp_mb(), it's not needed with list_add_tail_rcu()
  writeback: use schedule_timeout_interruptible()
  writeback: add comments to bdi_work structure
  writeback: splice dirty inode entries to default bdi on bdi_destroy()
  writeback: separate starting of sync vs opportunistic writeback
  writeback: inline allocation failure handling in bdi_alloc_queue_work()
  writeback: use RCU to protect bdi_list
  writeback: only use bdi_writeback_all() for WB_SYNC_NONE writeout
  fs: Assign bdi in super_block
  writeback: make wb_writeback() take an argument structure
  writeback: merely wakeup flusher thread if work allocation fails for WB_SYNC_NONE
  writeback: get rid of wbc->for_writepages
  fs: remove bdev->bd_inode_backing_dev_info
diff --git a/fs/afs/write.c b/fs/afs/write.c
index c2e7a7f..c63a3c8 100644
--- a/fs/afs/write.c
+++ b/fs/afs/write.c
@@ -712,7 +712,6 @@
 		.bdi		= mapping->backing_dev_info,
 		.sync_mode	= WB_SYNC_ALL,
 		.nr_to_write	= LONG_MAX,
-		.for_writepages = 1,
 		.range_cyclic	= 1,
 	};
 	int ret;
diff --git a/fs/block_dev.c b/fs/block_dev.c
index 3581a4e..71e7e03 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -420,7 +420,6 @@
 {
 	struct bdev_inode *bdi = BDEV_I(inode);
 
-	bdi->bdev.bd_inode_backing_dev_info = NULL;
 	kmem_cache_free(bdev_cachep, bdi);
 }
 
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 15831d5..8b81927 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -1600,6 +1600,7 @@
 
 	sb->s_blocksize = 4096;
 	sb->s_blocksize_bits = blksize_bits(4096);
+	sb->s_bdi = &fs_info->bdi;
 
 	/*
 	 * we set the i_size on the btree inode to the max possible int.
diff --git a/fs/btrfs/ordered-data.c b/fs/btrfs/ordered-data.c
index d6f0806..7b2f401 100644
--- a/fs/btrfs/ordered-data.c
+++ b/fs/btrfs/ordered-data.c
@@ -740,7 +740,6 @@
 		.nr_to_write = mapping->nrpages * 2,
 		.range_start = start,
 		.range_end = end,
-		.for_writepages = 1,
 	};
 	return btrfs_writepages(mapping, &wbc);
 }
diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
index 628235c..8e1e5e1 100644
--- a/fs/fs-writeback.c
+++ b/fs/fs-writeback.c
@@ -35,21 +35,29 @@
 int nr_pdflush_threads;
 
 /*
+ * Passed into wb_writeback(), essentially a subset of writeback_control
+ */
+struct wb_writeback_args {
+	long nr_pages;
+	struct super_block *sb;
+	enum writeback_sync_modes sync_mode;
+	int for_kupdate;
+	int range_cyclic;
+};
+
+/*
  * Work items for the bdi_writeback threads
  */
 struct bdi_work {
-	struct list_head list;
-	struct list_head wait_list;
-	struct rcu_head rcu_head;
+	struct list_head list;		/* pending work list */
+	struct rcu_head rcu_head;	/* for RCU free/clear of work */
 
-	unsigned long seen;
-	atomic_t pending;
+	unsigned long seen;		/* threads that have seen this work */
+	atomic_t pending;		/* number of threads still to do work */
 
-	struct super_block *sb;
-	unsigned long nr_pages;
-	enum writeback_sync_modes sync_mode;
+	struct wb_writeback_args args;	/* writeback arguments */
 
-	unsigned long state;
+	unsigned long state;		/* flag bits, see WS_* */
 };
 
 enum {
@@ -66,22 +74,13 @@
 }
 
 static inline void bdi_work_init(struct bdi_work *work,
-				 struct writeback_control *wbc)
+				 struct wb_writeback_args *args)
 {
 	INIT_RCU_HEAD(&work->rcu_head);
-	work->sb = wbc->sb;
-	work->nr_pages = wbc->nr_to_write;
-	work->sync_mode = wbc->sync_mode;
+	work->args = *args;
 	work->state = WS_USED;
 }
 
-static inline void bdi_work_init_on_stack(struct bdi_work *work,
-					  struct writeback_control *wbc)
-{
-	bdi_work_init(work, wbc);
-	work->state |= WS_ONSTACK;
-}
-
 /**
  * writeback_in_progress - determine whether there is writeback in progress
  * @bdi: the device's backing_dev_info structure.
@@ -98,6 +97,11 @@
 {
 	clear_bit(WS_USED_B, &work->state);
 	smp_mb__after_clear_bit();
+	/*
+	 * work can have disappeared at this point. bit waitq functions
+	 * should be able to tolerate this, provided bdi_sched_wait does
+	 * not dereference its pointer argument.
+	 */
 	wake_up_bit(&work->state, WS_USED_B);
 }
 
@@ -113,7 +117,8 @@
 
 static void wb_work_complete(struct bdi_work *work)
 {
-	const enum writeback_sync_modes sync_mode = work->sync_mode;
+	const enum writeback_sync_modes sync_mode = work->args.sync_mode;
+	int onstack = bdi_work_on_stack(work);
 
 	/*
 	 * For allocated work, we can clear the done/seen bit right here.
@@ -121,9 +126,9 @@
 	 * to after the RCU grace period, since the stack could be invalidated
 	 * as soon as bdi_work_clear() has done the wakeup.
 	 */
-	if (!bdi_work_on_stack(work))
+	if (!onstack)
 		bdi_work_clear(work);
-	if (sync_mode == WB_SYNC_NONE || bdi_work_on_stack(work))
+	if (sync_mode == WB_SYNC_NONE || onstack)
 		call_rcu(&work->rcu_head, bdi_work_free);
 }
 
@@ -146,21 +151,19 @@
 
 static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
 {
-	if (work) {
-		work->seen = bdi->wb_mask;
-		BUG_ON(!work->seen);
-		atomic_set(&work->pending, bdi->wb_cnt);
-		BUG_ON(!bdi->wb_cnt);
+	work->seen = bdi->wb_mask;
+	BUG_ON(!work->seen);
+	atomic_set(&work->pending, bdi->wb_cnt);
+	BUG_ON(!bdi->wb_cnt);
 
-		/*
-		 * Make sure stores are seen before it appears on the list
-		 */
-		smp_mb();
-
-		spin_lock(&bdi->wb_lock);
-		list_add_tail_rcu(&work->list, &bdi->work_list);
-		spin_unlock(&bdi->wb_lock);
-	}
+	/*
+	 * list_add_tail_rcu() contains the necessary barriers to
+	 * make sure the above stores are seen before the item is
+	 * noticed on the list
+	 */
+	spin_lock(&bdi->wb_lock);
+	list_add_tail_rcu(&work->list, &bdi->work_list);
+	spin_unlock(&bdi->wb_lock);
 
 	/*
 	 * If the default thread isn't there, make sure we add it. When
@@ -171,15 +174,7 @@
 	else {
 		struct bdi_writeback *wb = &bdi->wb;
 
-		/*
-		 * If we failed allocating the bdi work item, wake up the wb
-		 * thread always. As a safety precaution, it'll flush out
-		 * everything
-		 */
-		if (!wb_has_dirty_io(wb)) {
-			if (work)
-				wb_clear_pending(wb, work);
-		} else if (wb->task)
+		if (wb->task)
 			wake_up_process(wb->task);
 	}
 }
@@ -194,48 +189,75 @@
 		    TASK_UNINTERRUPTIBLE);
 }
 
-static struct bdi_work *bdi_alloc_work(struct writeback_control *wbc)
+static void bdi_alloc_queue_work(struct backing_dev_info *bdi,
+				 struct wb_writeback_args *args)
 {
 	struct bdi_work *work;
 
+	/*
+	 * This is WB_SYNC_NONE writeback, so if allocation fails just
+	 * wake up the thread for old dirty data writeback
+	 */
 	work = kmalloc(sizeof(*work), GFP_ATOMIC);
-	if (work)
-		bdi_work_init(work, wbc);
+	if (work) {
+		bdi_work_init(work, args);
+		bdi_queue_work(bdi, work);
+	} else {
+		struct bdi_writeback *wb = &bdi->wb;
 
-	return work;
+		if (wb->task)
+			wake_up_process(wb->task);
+	}
 }
 
-void bdi_start_writeback(struct writeback_control *wbc)
+/**
+ * bdi_sync_writeback - start and wait for writeback
+ * @bdi: the backing device to write from
+ * @sb: write inodes from this super_block
+ *
+ * Description:
+ *   This does WB_SYNC_ALL data integrity writeback and waits for the
+ *   IO to complete. Callers must hold the sb s_umount semaphore for
+ *   reading, to avoid having the super disappear before we are done.
+ */
+static void bdi_sync_writeback(struct backing_dev_info *bdi,
+			       struct super_block *sb)
 {
-	const bool must_wait = wbc->sync_mode == WB_SYNC_ALL;
-	struct bdi_work work_stack, *work = NULL;
+	struct wb_writeback_args args = {
+		.sb		= sb,
+		.sync_mode	= WB_SYNC_ALL,
+		.nr_pages	= LONG_MAX,
+		.range_cyclic	= 0,
+	};
+	struct bdi_work work;
 
-	if (!must_wait)
-		work = bdi_alloc_work(wbc);
+	bdi_work_init(&work, &args);
+	work.state |= WS_ONSTACK;
 
-	if (!work) {
-		work = &work_stack;
-		bdi_work_init_on_stack(work, wbc);
-	}
+	bdi_queue_work(bdi, &work);
+	bdi_wait_on_work_clear(&work);
+}
 
-	bdi_queue_work(wbc->bdi, work);
+/**
+ * bdi_start_writeback - start writeback
+ * @bdi: the backing device to write from
+ * @nr_pages: the number of pages to write
+ *
+ * Description:
+ *   This does WB_SYNC_NONE opportunistic writeback. The IO is only
+ *   started when this function returns; we make no guarantees on
+ *   completion. Caller need not hold sb s_umount semaphore.
+ *
+ */
+void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages)
+{
+	struct wb_writeback_args args = {
+		.sync_mode	= WB_SYNC_NONE,
+		.nr_pages	= nr_pages,
+		.range_cyclic	= 1,
+	};
 
-	/*
-	 * If the sync mode is WB_SYNC_ALL, block waiting for the work to
-	 * complete. If not, we only need to wait for the work to be started,
-	 * if we allocated it on-stack. We use the same mechanism, if the
-	 * wait bit is set in the bdi_work struct, then threads will not
-	 * clear pending until after they are done.
-	 *
-	 * Note that work == &work_stack if must_wait is true, so we don't
-	 * need to do call_rcu() here ever, since the completion path will
-	 * have done that for us.
-	 */
-	if (must_wait || work == &work_stack) {
-		bdi_wait_on_work_clear(work);
-		if (work != &work_stack)
-			call_rcu(&work->rcu_head, bdi_work_free);
-	}
+	bdi_alloc_queue_work(bdi, &args);
 }
 
 /*
@@ -671,17 +693,16 @@
  * older_than_this takes precedence over nr_to_write.  So we'll only write back
  * all dirty pages if they are all attached to "old" mappings.
  */
-static long wb_writeback(struct bdi_writeback *wb, long nr_pages,
-			 struct super_block *sb,
-			 enum writeback_sync_modes sync_mode, int for_kupdate)
+static long wb_writeback(struct bdi_writeback *wb,
+			 struct wb_writeback_args *args)
 {
 	struct writeback_control wbc = {
 		.bdi			= wb->bdi,
-		.sb			= sb,
-		.sync_mode		= sync_mode,
+		.sb			= args->sb,
+		.sync_mode		= args->sync_mode,
 		.older_than_this	= NULL,
-		.for_kupdate		= for_kupdate,
-		.range_cyclic		= 1,
+		.for_kupdate		= args->for_kupdate,
+		.range_cyclic		= args->range_cyclic,
 	};
 	unsigned long oldest_jif;
 	long wrote = 0;
@@ -691,13 +712,18 @@
 		oldest_jif = jiffies -
 				msecs_to_jiffies(dirty_expire_interval * 10);
 	}
+	if (!wbc.range_cyclic) {
+		wbc.range_start = 0;
+		wbc.range_end = LLONG_MAX;
+	}
 
 	for (;;) {
 		/*
 		 * Don't flush anything for non-integrity writeback where
 		 * no nr_pages was given
 		 */
-		if (!for_kupdate && nr_pages <= 0 && sync_mode == WB_SYNC_NONE)
+		if (!args->for_kupdate && args->nr_pages <= 0 &&
+		     args->sync_mode == WB_SYNC_NONE)
 			break;
 
 		/*
@@ -705,7 +731,8 @@
 		 * periodic background writeout and we are below the
 		 * background dirty threshold, don't do anything
 		 */
-		if (for_kupdate && nr_pages <= 0 && !over_bground_thresh())
+		if (args->for_kupdate && args->nr_pages <= 0 &&
+		    !over_bground_thresh())
 			break;
 
 		wbc.more_io = 0;
@@ -713,7 +740,7 @@
 		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
 		wbc.pages_skipped = 0;
 		writeback_inodes_wb(wb, &wbc);
-		nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
+		args->nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
 		wrote += MAX_WRITEBACK_PAGES - wbc.nr_to_write;
 
 		/*
@@ -731,7 +758,11 @@
 
 /*
  * Return the next bdi_work struct that hasn't been processed by this
- * wb thread yet
+ * wb thread yet. ->seen is initially set for each thread that exists
+ * for this device; when a thread first notices a piece of work it
+ * clears its bit. Depending on writeback type, the thread will notify
+ * completion on either receiving the work (WB_SYNC_NONE) or after
+ * it is done (WB_SYNC_ALL).
  */
 static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
 					   struct bdi_writeback *wb)
@@ -741,8 +772,9 @@
 	rcu_read_lock();
 
 	list_for_each_entry_rcu(work, &bdi->work_list, list) {
-		if (!test_and_clear_bit(wb->nr, &work->seen))
+		if (!test_bit(wb->nr, &work->seen))
 			continue;
+		clear_bit(wb->nr, &work->seen);
 
 		ret = work;
 		break;
@@ -767,8 +799,16 @@
 			global_page_state(NR_UNSTABLE_NFS) +
 			(inodes_stat.nr_inodes - inodes_stat.nr_unused);
 
-	if (nr_pages)
-		return wb_writeback(wb, nr_pages, NULL, WB_SYNC_NONE, 1);
+	if (nr_pages) {
+		struct wb_writeback_args args = {
+			.nr_pages	= nr_pages,
+			.sync_mode	= WB_SYNC_NONE,
+			.for_kupdate	= 1,
+			.range_cyclic	= 1,
+		};
+
+		return wb_writeback(wb, &args);
+	}
 
 	return 0;
 }
@@ -780,35 +820,31 @@
 {
 	struct backing_dev_info *bdi = wb->bdi;
 	struct bdi_work *work;
-	long nr_pages, wrote = 0;
+	long wrote = 0;
 
 	while ((work = get_next_work_item(bdi, wb)) != NULL) {
-		enum writeback_sync_modes sync_mode;
-
-		nr_pages = work->nr_pages;
+		struct wb_writeback_args args = work->args;
 
 		/*
 		 * Override sync mode, in case we must wait for completion
 		 */
 		if (force_wait)
-			work->sync_mode = sync_mode = WB_SYNC_ALL;
-		else
-			sync_mode = work->sync_mode;
+			work->args.sync_mode = args.sync_mode = WB_SYNC_ALL;
 
 		/*
 		 * If this isn't a data integrity operation, just notify
 		 * that we have seen this work and we are now starting it.
 		 */
-		if (sync_mode == WB_SYNC_NONE)
+		if (args.sync_mode == WB_SYNC_NONE)
 			wb_clear_pending(wb, work);
 
-		wrote += wb_writeback(wb, nr_pages, work->sb, sync_mode, 0);
+		wrote += wb_writeback(wb, &args);
 
 		/*
 		 * This is a data integrity writeback, so only do the
 		 * notification when we have completed the work.
 		 */
-		if (sync_mode == WB_SYNC_ALL)
+		if (args.sync_mode == WB_SYNC_ALL)
 			wb_clear_pending(wb, work);
 	}
 
@@ -849,8 +885,7 @@
 		}
 
 		wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
-		set_current_state(TASK_INTERRUPTIBLE);
-		schedule_timeout(wait_jiffies);
+		schedule_timeout_interruptible(wait_jiffies);
 		try_to_freeze();
 	}
 
@@ -858,67 +893,28 @@
 }
 
 /*
- * Schedule writeback for all backing devices. Expensive! If this is a data
- * integrity operation, writeback will be complete when this returns. If
- * we are simply called for WB_SYNC_NONE, then writeback will merely be
- * scheduled to run.
+ * Schedule writeback for all backing devices. This does WB_SYNC_NONE
+ * writeback, for integrity writeback see bdi_sync_writeback().
  */
-static void bdi_writeback_all(struct writeback_control *wbc)
+static void bdi_writeback_all(struct super_block *sb, long nr_pages)
 {
-	const bool must_wait = wbc->sync_mode == WB_SYNC_ALL;
+	struct wb_writeback_args args = {
+		.sb		= sb,
+		.nr_pages	= nr_pages,
+		.sync_mode	= WB_SYNC_NONE,
+	};
 	struct backing_dev_info *bdi;
-	struct bdi_work *work;
-	LIST_HEAD(list);
 
-restart:
-	spin_lock(&bdi_lock);
+	rcu_read_lock();
 
-	list_for_each_entry(bdi, &bdi_list, bdi_list) {
-		struct bdi_work *work;
-
+	list_for_each_entry_rcu(bdi, &bdi_list, bdi_list) {
 		if (!bdi_has_dirty_io(bdi))
 			continue;
 
-		/*
-		 * If work allocation fails, do the writes inline. We drop
-		 * the lock and restart the list writeout. This should be OK,
-		 * since this happens rarely and because the writeout should
-		 * eventually make more free memory available.
-		 */
-		work = bdi_alloc_work(wbc);
-		if (!work) {
-			struct writeback_control __wbc;
-
-			/*
-			 * Not a data integrity writeout, just continue
-			 */
-			if (!must_wait)
-				continue;
-
-			spin_unlock(&bdi_lock);
-			__wbc = *wbc;
-			__wbc.bdi = bdi;
-			writeback_inodes_wbc(&__wbc);
-			goto restart;
-		}
-		if (must_wait)
-			list_add_tail(&work->wait_list, &list);
-
-		bdi_queue_work(bdi, work);
+		bdi_alloc_queue_work(bdi, &args);
 	}
 
-	spin_unlock(&bdi_lock);
-
-	/*
-	 * If this is for WB_SYNC_ALL, wait for pending work to complete
-	 * before returning.
-	 */
-	while (!list_empty(&list)) {
-		work = list_entry(list.next, struct bdi_work, wait_list);
-		list_del(&work->wait_list);
-		bdi_wait_on_work_clear(work);
-		call_rcu(&work->rcu_head, bdi_work_free);
-	}
+	rcu_read_unlock();
 }
 
 /*
@@ -927,17 +923,10 @@
  */
 void wakeup_flusher_threads(long nr_pages)
 {
-	struct writeback_control wbc = {
-		.sync_mode	= WB_SYNC_NONE,
-		.older_than_this = NULL,
-		.range_cyclic	= 1,
-	};
-
 	if (nr_pages == 0)
 		nr_pages = global_page_state(NR_FILE_DIRTY) +
 				global_page_state(NR_UNSTABLE_NFS);
-	wbc.nr_to_write = nr_pages;
-	bdi_writeback_all(&wbc);
+	bdi_writeback_all(NULL, nr_pages);
 }
 
 static noinline void block_dump___mark_inode_dirty(struct inode *inode)
@@ -1084,7 +1073,7 @@
  * on the writer throttling path, and we get decent balancing between many
  * throttled threads: we don't want them all piling up on inode_sync_wait.
  */
-static void wait_sb_inodes(struct writeback_control *wbc)
+static void wait_sb_inodes(struct super_block *sb)
 {
 	struct inode *inode, *old_inode = NULL;
 
@@ -1092,7 +1081,7 @@
 	 * We need to be protected against the filesystem going from
 	 * r/o to r/w or vice versa.
 	 */
-	WARN_ON(!rwsem_is_locked(&wbc->sb->s_umount));
+	WARN_ON(!rwsem_is_locked(&sb->s_umount));
 
 	spin_lock(&inode_lock);
 
@@ -1103,7 +1092,7 @@
 	 * In which case, the inode may not be on the dirty list, but
 	 * we still have to wait for that writeout.
 	 */
-	list_for_each_entry(inode, &wbc->sb->s_inodes, i_sb_list) {
+	list_for_each_entry(inode, &sb->s_inodes, i_sb_list) {
 		struct address_space *mapping;
 
 		if (inode->i_state & (I_FREEING|I_CLEAR|I_WILL_FREE|I_NEW))
@@ -1143,14 +1132,8 @@
  * for IO completion of submitted IO. The number of pages submitted is
  * returned.
  */
-long writeback_inodes_sb(struct super_block *sb)
+void writeback_inodes_sb(struct super_block *sb)
 {
-	struct writeback_control wbc = {
-		.sb		= sb,
-		.sync_mode	= WB_SYNC_NONE,
-		.range_start	= 0,
-		.range_end	= LLONG_MAX,
-	};
 	unsigned long nr_dirty = global_page_state(NR_FILE_DIRTY);
 	unsigned long nr_unstable = global_page_state(NR_UNSTABLE_NFS);
 	long nr_to_write;
@@ -1158,9 +1141,7 @@
 	nr_to_write = nr_dirty + nr_unstable +
 			(inodes_stat.nr_inodes - inodes_stat.nr_unused);
 
-	wbc.nr_to_write = nr_to_write;
-	bdi_writeback_all(&wbc);
-	return nr_to_write - wbc.nr_to_write;
+	bdi_writeback_all(sb, nr_to_write);
 }
 EXPORT_SYMBOL(writeback_inodes_sb);
 
@@ -1171,20 +1152,10 @@
  * This function writes and waits on any dirty inode belonging to this
  * super_block. The number of pages synced is returned.
  */
-long sync_inodes_sb(struct super_block *sb)
+void sync_inodes_sb(struct super_block *sb)
 {
-	struct writeback_control wbc = {
-		.sb		= sb,
-		.sync_mode	= WB_SYNC_ALL,
-		.range_start	= 0,
-		.range_end	= LLONG_MAX,
-	};
-	long nr_to_write = LONG_MAX; /* doesn't actually matter */
-
-	wbc.nr_to_write = nr_to_write;
-	bdi_writeback_all(&wbc);
-	wait_sb_inodes(&wbc);
-	return nr_to_write - wbc.nr_to_write;
+	bdi_sync_writeback(sb->s_bdi, sb);
+	wait_sb_inodes(sb);
 }
 EXPORT_SYMBOL(sync_inodes_sb);
 
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 4567db6..e5dbecd 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -894,6 +894,8 @@
 	if (err)
 		goto err_put_conn;
 
+	sb->s_bdi = &fc->bdi;
+
 	/* Handle umasking inside the fuse code */
 	if (sb->s_flags & MS_POSIXACL)
 		fc->dont_mask = 1;
diff --git a/fs/inode.c b/fs/inode.c
index ae7b67e..b2ba83d 100644
--- a/fs/inode.c
+++ b/fs/inode.c
@@ -182,9 +182,7 @@
 	if (sb->s_bdev) {
 		struct backing_dev_info *bdi;
 
-		bdi = sb->s_bdev->bd_inode_backing_dev_info;
-		if (!bdi)
-			bdi = sb->s_bdev->bd_inode->i_mapping->backing_dev_info;
+		bdi = sb->s_bdev->bd_inode->i_mapping->backing_dev_info;
 		mapping->backing_dev_info = bdi;
 	}
 	inode->i_private = NULL;
diff --git a/fs/jbd2/commit.c b/fs/jbd2/commit.c
index 7b4088b..0df600e 100644
--- a/fs/jbd2/commit.c
+++ b/fs/jbd2/commit.c
@@ -220,7 +220,6 @@
 		.nr_to_write = mapping->nrpages * 2,
 		.range_start = 0,
 		.range_end = i_size_read(mapping->host),
-		.for_writepages = 1,
 	};
 
 	ret = generic_writepages(mapping, &wbc);
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 867f705..de93569 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -1918,6 +1918,8 @@
 	if (server->flags & NFS_MOUNT_NOAC)
 		sb->s_flags |= MS_SYNCHRONOUS;
 
+	sb->s_bdi = &server->backing_dev_info;
+
 	nfs_super_set_maxbytes(sb, server->maxfilesize);
 }
 
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 120acad..53eb26c 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1490,7 +1490,6 @@
 		.nr_to_write = LONG_MAX,
 		.range_start = 0,
 		.range_end = LLONG_MAX,
-		.for_writepages = 1,
 	};
 
 	return __nfs_write_mapping(mapping, &wbc, how);
diff --git a/fs/nilfs2/the_nilfs.c b/fs/nilfs2/the_nilfs.c
index d4168e2..ad391a8 100644
--- a/fs/nilfs2/the_nilfs.c
+++ b/fs/nilfs2/the_nilfs.c
@@ -591,9 +591,7 @@
 
 	nilfs->ns_mount_state = le16_to_cpu(sbp->s_state);
 
-	bdi = nilfs->ns_bdev->bd_inode_backing_dev_info;
-	if (!bdi)
-		bdi = nilfs->ns_bdev->bd_inode->i_mapping->backing_dev_info;
+	bdi = nilfs->ns_bdev->bd_inode->i_mapping->backing_dev_info;
 	nilfs->ns_bdi = bdi ? : &default_backing_dev_info;
 
 	/* Finding last segment */
diff --git a/fs/super.c b/fs/super.c
index 9cda337..b03fea8 100644
--- a/fs/super.c
+++ b/fs/super.c
@@ -707,6 +707,12 @@
 {
 	s->s_bdev = data;
 	s->s_dev = s->s_bdev->bd_dev;
+
+	/*
+	 * We set the bdi here to the queue backing, file systems can
+	 * overwrite this in ->fill_super()
+	 */
+	s->s_bdi = &bdev_get_queue(s->s_bdev)->backing_dev_info;
 	return 0;
 }
 
diff --git a/fs/sync.c b/fs/sync.c
index 1923409..c08467a 100644
--- a/fs/sync.c
+++ b/fs/sync.c
@@ -27,6 +27,13 @@
  */
 static int __sync_filesystem(struct super_block *sb, int wait)
 {
+	/*
+	 * This should be safe, as we require bdi backing to actually
+	 * write out data in the first place
+	 */
+	if (!sb->s_bdi)
+		return 0;
+
 	/* Avoid doing twice syncing and cache pruning for quota sync */
 	if (!wait) {
 		writeout_quota_sb(sb, -1);
@@ -101,7 +108,7 @@
 		spin_unlock(&sb_lock);
 
 		down_read(&sb->s_umount);
-		if (!(sb->s_flags & MS_RDONLY) && sb->s_root)
+		if (!(sb->s_flags & MS_RDONLY) && sb->s_root && sb->s_bdi)
 			__sync_filesystem(sb, wait);
 		up_read(&sb->s_umount);
 
diff --git a/fs/ubifs/budget.c b/fs/ubifs/budget.c
index 1c8991b..ee1ce68 100644
--- a/fs/ubifs/budget.c
+++ b/fs/ubifs/budget.c
@@ -54,29 +54,15 @@
  * @nr_to_write: how many dirty pages to write-back
  *
  * This function shrinks UBIFS liability by means of writing back some amount
- * of dirty inodes and their pages. Returns the amount of pages which were
- * written back. The returned value does not include dirty inodes which were
- * synchronized.
+ * of dirty inodes and their pages.
  *
  * Note, this function synchronizes even VFS inodes which are locked
  * (@i_mutex) by the caller of the budgeting function, because write-back does
  * not touch @i_mutex.
  */
-static int shrink_liability(struct ubifs_info *c, int nr_to_write)
+static void shrink_liability(struct ubifs_info *c, int nr_to_write)
 {
-	int nr_written;
-
-	nr_written = writeback_inodes_sb(c->vfs_sb);
-	if (!nr_written) {
-		/*
-		 * Re-try again but wait on pages/inodes which are being
-		 * written-back concurrently (e.g., by pdflush).
-		 */
-		nr_written = sync_inodes_sb(c->vfs_sb);
-	}
-
-	dbg_budg("%d pages were written back", nr_written);
-	return nr_written;
+	writeback_inodes_sb(c->vfs_sb);
 }
 
 /**
diff --git a/fs/ubifs/super.c b/fs/ubifs/super.c
index 51763aa..c4af069 100644
--- a/fs/ubifs/super.c
+++ b/fs/ubifs/super.c
@@ -1980,6 +1980,7 @@
 	if (err)
 		goto out_bdi;
 
+	sb->s_bdi = &c->bdi;
 	sb->s_fs_info = c;
 	sb->s_magic = UBIFS_SUPER_MAGIC;
 	sb->s_blocksize = UBIFS_BLOCK_SIZE;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index f169bcb..0ee33c2 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -59,6 +59,7 @@
 
 struct backing_dev_info {
 	struct list_head bdi_list;
+	struct rcu_head rcu_head;
 	unsigned long ra_pages;	/* max readahead in PAGE_CACHE_SIZE units */
 	unsigned long state;	/* Always use atomic bitops on this */
 	unsigned int capabilities; /* Device capabilities */
@@ -100,7 +101,7 @@
 		const char *fmt, ...);
 int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
 void bdi_unregister(struct backing_dev_info *bdi);
-void bdi_start_writeback(struct writeback_control *wbc);
+void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages);
 int bdi_writeback_task(struct bdi_writeback *wb);
 int bdi_has_dirty_io(struct backing_dev_info *bdi);
 
diff --git a/include/linux/fs.h b/include/linux/fs.h
index b21cf6b..90162fb 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -655,7 +655,6 @@
 	int			bd_invalidated;
 	struct gendisk *	bd_disk;
 	struct list_head	bd_list;
-	struct backing_dev_info *bd_inode_backing_dev_info;
 	/*
 	 * Private data.  You must have bd_claim'ed the block_device
 	 * to use this.  NOTE:  bd_claim allows an owner to claim
@@ -1343,6 +1342,7 @@
 	int			s_nr_dentry_unused;	/* # of dentry on lru */
 
 	struct block_device	*s_bdev;
+	struct backing_dev_info *s_bdi;
 	struct mtd_info		*s_mtd;
 	struct list_head	s_instances;
 	struct quota_info	s_dquot;	/* Diskquota specific options */
diff --git a/include/linux/writeback.h b/include/linux/writeback.h
index d347632..75cf586 100644
--- a/include/linux/writeback.h
+++ b/include/linux/writeback.h
@@ -50,7 +50,6 @@
 	unsigned encountered_congestion:1; /* An output: a queue is full */
 	unsigned for_kupdate:1;		/* A kupdate writeback */
 	unsigned for_reclaim:1;		/* Invoked from the page allocator */
-	unsigned for_writepages:1;	/* This is a writepages() call */
 	unsigned range_cyclic:1;	/* range_start is cyclic */
 	unsigned more_io:1;		/* more io to be dispatched */
 	/*
@@ -69,8 +68,8 @@
  */	
 struct bdi_writeback;
 int inode_wait(void *);
-long writeback_inodes_sb(struct super_block *);
-long sync_inodes_sb(struct super_block *);
+void writeback_inodes_sb(struct super_block *);
+void sync_inodes_sb(struct super_block *);
 void writeback_inodes_wbc(struct writeback_control *wbc);
 long wb_do_writeback(struct bdi_writeback *wb, int force_wait);
 void wakeup_flusher_threads(long nr_pages);
diff --git a/include/trace/events/ext4.h b/include/trace/events/ext4.h
index 7d8b5bc..8d433c4e 100644
--- a/include/trace/events/ext4.h
+++ b/include/trace/events/ext4.h
@@ -227,7 +227,6 @@
 		__field(	char,	nonblocking		)
 		__field(	char,	for_kupdate		)
 		__field(	char,	for_reclaim		)
-		__field(	char,	for_writepages		)
 		__field(	char,	range_cyclic		)
 	),
 
@@ -241,16 +240,15 @@
 		__entry->nonblocking	= wbc->nonblocking;
 		__entry->for_kupdate	= wbc->for_kupdate;
 		__entry->for_reclaim	= wbc->for_reclaim;
-		__entry->for_writepages	= wbc->for_writepages;
 		__entry->range_cyclic	= wbc->range_cyclic;
 	),
 
-	TP_printk("dev %s ino %lu nr_t_write %ld pages_skipped %ld range_start %llu range_end %llu nonblocking %d for_kupdate %d for_reclaim %d for_writepages %d range_cyclic %d",
+	TP_printk("dev %s ino %lu nr_t_write %ld pages_skipped %ld range_start %llu range_end %llu nonblocking %d for_kupdate %d for_reclaim %d range_cyclic %d",
 		  jbd2_dev_to_name(__entry->dev), __entry->ino, __entry->nr_to_write,
 		  __entry->pages_skipped, __entry->range_start,
 		  __entry->range_end, __entry->nonblocking,
 		  __entry->for_kupdate, __entry->for_reclaim,
-		  __entry->for_writepages, __entry->range_cyclic)
+		  __entry->range_cyclic)
 );
 
 TRACE_EVENT(ext4_da_writepages_result,
diff --git a/mm/backing-dev.c b/mm/backing-dev.c
index d3ca0da..3d3accb 100644
--- a/mm/backing-dev.c
+++ b/mm/backing-dev.c
@@ -26,6 +26,12 @@
 EXPORT_SYMBOL_GPL(default_backing_dev_info);
 
 static struct class *bdi_class;
+
+/*
+ * bdi_lock protects updates to bdi_list and bdi_pending_list, as well as
+ * reader side protection for bdi_pending_list. bdi_list has RCU reader side
+ * locking.
+ */
 DEFINE_SPINLOCK(bdi_lock);
 LIST_HEAD(bdi_list);
 LIST_HEAD(bdi_pending_list);
@@ -284,9 +290,9 @@
 	/*
 	 * Add us to the active bdi_list
 	 */
-	spin_lock(&bdi_lock);
-	list_add(&bdi->bdi_list, &bdi_list);
-	spin_unlock(&bdi_lock);
+	spin_lock_bh(&bdi_lock);
+	list_add_rcu(&bdi->bdi_list, &bdi_list);
+	spin_unlock_bh(&bdi_lock);
 
 	bdi_task_init(bdi, wb);
 
@@ -389,7 +395,7 @@
 		if (wb_has_dirty_io(me) || !list_empty(&me->bdi->work_list))
 			wb_do_writeback(me, 0);
 
-		spin_lock(&bdi_lock);
+		spin_lock_bh(&bdi_lock);
 
 		/*
 		 * Check if any existing bdi's have dirty data without
@@ -410,7 +416,7 @@
 		if (list_empty(&bdi_pending_list)) {
 			unsigned long wait;
 
-			spin_unlock(&bdi_lock);
+			spin_unlock_bh(&bdi_lock);
 			wait = msecs_to_jiffies(dirty_writeback_interval * 10);
 			schedule_timeout(wait);
 			try_to_freeze();
@@ -426,7 +432,7 @@
 		bdi = list_entry(bdi_pending_list.next, struct backing_dev_info,
 				 bdi_list);
 		list_del_init(&bdi->bdi_list);
-		spin_unlock(&bdi_lock);
+		spin_unlock_bh(&bdi_lock);
 
 		wb = &bdi->wb;
 		wb->task = kthread_run(bdi_start_fn, wb, "flush-%s",
@@ -445,9 +451,9 @@
 			 * a chance to flush other bdi's to free
 			 * memory.
 			 */
-			spin_lock(&bdi_lock);
+			spin_lock_bh(&bdi_lock);
 			list_add_tail(&bdi->bdi_list, &bdi_pending_list);
-			spin_unlock(&bdi_lock);
+			spin_unlock_bh(&bdi_lock);
 
 			bdi_flush_io(bdi);
 		}
@@ -456,6 +462,24 @@
 	return 0;
 }
 
+static void bdi_add_to_pending(struct rcu_head *head)
+{
+	struct backing_dev_info *bdi;
+
+	bdi = container_of(head, struct backing_dev_info, rcu_head);
+	INIT_LIST_HEAD(&bdi->bdi_list);
+
+	spin_lock(&bdi_lock);
+	list_add_tail(&bdi->bdi_list, &bdi_pending_list);
+	spin_unlock(&bdi_lock);
+
+	/*
+	 * We are now on the pending list, wake up bdi_forker_task()
+	 * to finish the job and add us back to the active bdi_list
+	 */
+	wake_up_process(default_backing_dev_info.wb.task);
+}
+
 /*
  * Add the default flusher task that gets created for any bdi
  * that has dirty data pending writeout
@@ -478,16 +502,29 @@
 	 * waiting for previous additions to finish.
 	 */
 	if (!test_and_set_bit(BDI_pending, &bdi->state)) {
-		list_move_tail(&bdi->bdi_list, &bdi_pending_list);
+		list_del_rcu(&bdi->bdi_list);
 
 		/*
-		 * We are now on the pending list, wake up bdi_forker_task()
-		 * to finish the job and add us back to the active bdi_list
+		 * We must wait for the current RCU period to end before
+		 * moving to the pending list. So schedule that operation
+		 * from an RCU callback.
 		 */
-		wake_up_process(default_backing_dev_info.wb.task);
+		call_rcu(&bdi->rcu_head, bdi_add_to_pending);
 	}
 }
 
+/*
+ * Remove bdi from bdi_list, and ensure that it is no longer visible
+ */
+static void bdi_remove_from_list(struct backing_dev_info *bdi)
+{
+	spin_lock_bh(&bdi_lock);
+	list_del_rcu(&bdi->bdi_list);
+	spin_unlock_bh(&bdi_lock);
+
+	synchronize_rcu();
+}
+
 int bdi_register(struct backing_dev_info *bdi, struct device *parent,
 		const char *fmt, ...)
 {
@@ -506,9 +543,9 @@
 		goto exit;
 	}
 
-	spin_lock(&bdi_lock);
-	list_add_tail(&bdi->bdi_list, &bdi_list);
-	spin_unlock(&bdi_lock);
+	spin_lock_bh(&bdi_lock);
+	list_add_tail_rcu(&bdi->bdi_list, &bdi_list);
+	spin_unlock_bh(&bdi_lock);
 
 	bdi->dev = dev;
 
@@ -526,9 +563,7 @@
 			wb->task = NULL;
 			ret = -ENOMEM;
 
-			spin_lock(&bdi_lock);
-			list_del(&bdi->bdi_list);
-			spin_unlock(&bdi_lock);
+			bdi_remove_from_list(bdi);
 			goto exit;
 		}
 	}
@@ -565,9 +600,7 @@
 	/*
 	 * Make sure nobody finds us on the bdi_list anymore
 	 */
-	spin_lock(&bdi_lock);
-	list_del(&bdi->bdi_list);
-	spin_unlock(&bdi_lock);
+	bdi_remove_from_list(bdi);
 
 	/*
 	 * Finally, kill the kernel threads. We don't need to be RCU
@@ -599,6 +632,7 @@
 	bdi->max_ratio = 100;
 	bdi->max_prop_frac = PROP_FRAC_BASE;
 	spin_lock_init(&bdi->wb_lock);
+	INIT_RCU_HEAD(&bdi->rcu_head);
 	INIT_LIST_HEAD(&bdi->bdi_list);
 	INIT_LIST_HEAD(&bdi->wb_list);
 	INIT_LIST_HEAD(&bdi->work_list);
@@ -634,7 +668,19 @@
 {
 	int i;
 
-	WARN_ON(bdi_has_dirty_io(bdi));
+	/*
+	 * Splice our entries to the default_backing_dev_info, if this
+	 * bdi disappears
+	 */
+	if (bdi_has_dirty_io(bdi)) {
+		struct bdi_writeback *dst = &default_backing_dev_info.wb;
+
+		spin_lock(&inode_lock);
+		list_splice(&bdi->wb.b_dirty, &dst->b_dirty);
+		list_splice(&bdi->wb.b_io, &dst->b_io);
+		list_splice(&bdi->wb.b_more_io, &dst->b_more_io);
+		spin_unlock(&inode_lock);
+	}
 
 	bdi_unregister(bdi);
 
diff --git a/mm/page-writeback.c b/mm/page-writeback.c
index dd73d29..1eea4fa 100644
--- a/mm/page-writeback.c
+++ b/mm/page-writeback.c
@@ -315,7 +315,7 @@
 {
 	int ret = 0;
 
-	spin_lock(&bdi_lock);
+	spin_lock_bh(&bdi_lock);
 	if (min_ratio > bdi->max_ratio) {
 		ret = -EINVAL;
 	} else {
@@ -327,7 +327,7 @@
 			ret = -EINVAL;
 		}
 	}
-	spin_unlock(&bdi_lock);
+	spin_unlock_bh(&bdi_lock);
 
 	return ret;
 }
@@ -339,14 +339,14 @@
 	if (max_ratio > 100)
 		return -EINVAL;
 
-	spin_lock(&bdi_lock);
+	spin_lock_bh(&bdi_lock);
 	if (bdi->min_ratio > max_ratio) {
 		ret = -EINVAL;
 	} else {
 		bdi->max_ratio = max_ratio;
 		bdi->max_prop_frac = (PROP_FRAC_BASE * max_ratio) / 100;
 	}
-	spin_unlock(&bdi_lock);
+	spin_unlock_bh(&bdi_lock);
 
 	return ret;
 }
@@ -582,16 +582,8 @@
 	if ((laptop_mode && pages_written) ||
 	    (!laptop_mode && ((nr_writeback = global_page_state(NR_FILE_DIRTY)
 					  + global_page_state(NR_UNSTABLE_NFS))
-					  > background_thresh))) {
-		struct writeback_control wbc = {
-			.bdi		= bdi,
-			.sync_mode	= WB_SYNC_NONE,
-			.nr_to_write	= nr_writeback,
-		};
-
-
-		bdi_start_writeback(&wbc);
-	}
+					  > background_thresh)))
+		bdi_start_writeback(bdi, nr_writeback);
 }
 
 void set_page_dirty_balance(struct page *page, int page_mkwrite)
@@ -1020,12 +1012,10 @@
 
 	if (wbc->nr_to_write <= 0)
 		return 0;
-	wbc->for_writepages = 1;
 	if (mapping->a_ops->writepages)
 		ret = mapping->a_ops->writepages(mapping, wbc);
 	else
 		ret = generic_writepages(mapping, wbc);
-	wbc->for_writepages = 0;
 	return ret;
 }