[XFS] Introduce per-filesystem delwri pagebuf flushing to reduce
contention between filesystems and prevent deadlocks between filesystems
when a flush dependency exists between them.

SGI-PV: 947098
SGI-Modid: xfs-linux-melb:xfs-kern:24844a

Signed-off-by: David Chinner <dgc@sgi.com>
Signed-off-by: Nathan Scott <nathans@sgi.com>
diff --git a/fs/xfs/linux-2.6/xfs_buf.c b/fs/xfs/linux-2.6/xfs_buf.c
index 6fe21d2..2a8acd3 100644
--- a/fs/xfs/linux-2.6/xfs_buf.c
+++ b/fs/xfs/linux-2.6/xfs_buf.c
@@ -33,6 +33,7 @@
 
 STATIC kmem_cache_t *pagebuf_zone;
 STATIC kmem_shaker_t pagebuf_shake;
+STATIC int xfsbufd(void *);
 STATIC int xfsbufd_wakeup(int, gfp_t);
 STATIC void pagebuf_delwri_queue(xfs_buf_t *, int);
 
@@ -1492,6 +1493,30 @@
 	btp->bt_hash = NULL;
 }
 
+/*
+ * List of all buftargs, walked to wake each target's delwrite flusher
+ */
+STATIC LIST_HEAD(xfs_buftarg_list);
+STATIC DEFINE_SPINLOCK(xfs_buftarg_lock);	/* protects xfs_buftarg_list */
+
+/* Link @btp onto xfs_buftarg_list so that xfsbufd_wakeup() will visit it. */
+STATIC void
+xfs_register_buftarg(xfs_buftarg_t *btp)
+{
+	spin_lock(&xfs_buftarg_lock);
+	list_add(&btp->bt_list, &xfs_buftarg_list);
+	spin_unlock(&xfs_buftarg_lock);
+}
+
+/* Unlink @btp from xfs_buftarg_list; xfsbufd_wakeup() stops visiting it. */
+STATIC void
+xfs_unregister_buftarg(xfs_buftarg_t *btp)
+{
+	spin_lock(&xfs_buftarg_lock);
+	list_del(&btp->bt_list);
+	spin_unlock(&xfs_buftarg_lock);
+}
+
 void
 xfs_free_buftarg(
 	xfs_buftarg_t		*btp,
@@ -1502,6 +1527,12 @@
 		xfs_blkdev_put(btp->pbr_bdev);
 	xfs_free_bufhash(btp);
 	iput(btp->pbr_mapping->host);
+
+	/* unregister the buftarg first so that we don't get a
+	 * wakeup finding a non-existent task */
+	xfs_unregister_buftarg(btp);
+	kthread_stop(btp->bt_task);
+
 	kmem_free(btp, sizeof(*btp));
 }
 
@@ -1591,6 +1622,26 @@
 	return 0;
 }
 
+/*
+ * Prepare the per-target delayed write queue and start the xfsbufd thread
+ * that services it.  The target is put on the global buftarg list only once
+ * its thread exists.  Returns 0, or the PTR_ERR() value from kthread_run().
+ */
+STATIC int
+xfs_alloc_delwrite_queue(xfs_buftarg_t *btp)
+{
+	btp->bt_flags = 0;
+	INIT_LIST_HEAD(&btp->bt_list);
+	INIT_LIST_HEAD(&btp->bt_delwrite_queue);
+	spinlock_init(&btp->bt_delwrite_lock, "delwri_lock");
+	btp->bt_task = kthread_run(xfsbufd, btp, "xfsbufd");
+	if (IS_ERR(btp->bt_task))
+		return PTR_ERR(btp->bt_task);
+
+	xfs_register_buftarg(btp);
+	return 0;
+}
+
 xfs_buftarg_t *
 xfs_alloc_buftarg(
 	struct block_device	*bdev,
@@ -1606,6 +1657,8 @@
 		goto error;
 	if (xfs_mapping_buftarg(btp, bdev))
 		goto error;
+	if (xfs_alloc_delwrite_queue(btp))
+		goto error;
 	xfs_alloc_bufhash(btp, external);
 	return btp;
 
@@ -1618,20 +1671,19 @@
 /*
  * Pagebuf delayed write buffer handling
  */
-
-STATIC LIST_HEAD(pbd_delwrite_queue);
-STATIC DEFINE_SPINLOCK(pbd_delwrite_lock);
-
 STATIC void
 pagebuf_delwri_queue(
 	xfs_buf_t		*pb,
 	int			unlock)
 {
+	struct list_head	*dwq = &pb->pb_target->bt_delwrite_queue;
+	spinlock_t		*dwlk = &pb->pb_target->bt_delwrite_lock;
+
 	PB_TRACE(pb, "delwri_q", (long)unlock);
 	ASSERT((pb->pb_flags & (PBF_DELWRI|PBF_ASYNC)) ==
 					(PBF_DELWRI|PBF_ASYNC));
 
-	spin_lock(&pbd_delwrite_lock);
+	spin_lock(dwlk);
 	/* If already in the queue, dequeue and place at tail */
 	if (!list_empty(&pb->pb_list)) {
 		ASSERT(pb->pb_flags & _PBF_DELWRI_Q);
@@ -1642,9 +1694,9 @@
 	}
 
 	pb->pb_flags |= _PBF_DELWRI_Q;
-	list_add_tail(&pb->pb_list, &pbd_delwrite_queue);
+	list_add_tail(&pb->pb_list, dwq);
 	pb->pb_queuetime = jiffies;
-	spin_unlock(&pbd_delwrite_lock);
+	spin_unlock(dwlk);
 
 	if (unlock)
 		pagebuf_unlock(pb);
@@ -1654,16 +1706,17 @@
 pagebuf_delwri_dequeue(
 	xfs_buf_t		*pb)
 {
+	spinlock_t		*dwlk = &pb->pb_target->bt_delwrite_lock;
 	int			dequeued = 0;
 
-	spin_lock(&pbd_delwrite_lock);
+	spin_lock(dwlk);
 	if ((pb->pb_flags & PBF_DELWRI) && !list_empty(&pb->pb_list)) {
 		ASSERT(pb->pb_flags & _PBF_DELWRI_Q);
 		list_del_init(&pb->pb_list);
 		dequeued = 1;
 	}
 	pb->pb_flags &= ~(PBF_DELWRI|_PBF_DELWRI_Q);
-	spin_unlock(&pbd_delwrite_lock);
+	spin_unlock(dwlk);
 
 	if (dequeued)
 		pagebuf_rele(pb);
@@ -1678,21 +1731,22 @@
 	flush_workqueue(queue);
 }
 
-/* Defines for pagebuf daemon */
-STATIC struct task_struct *xfsbufd_task;
-STATIC int xfsbufd_force_flush;
-STATIC int xfsbufd_force_sleep;
-
 STATIC int
 xfsbufd_wakeup(
 	int			priority,
 	gfp_t			mask)
 {
-	if (xfsbufd_force_sleep)
-		return 0;
-	xfsbufd_force_flush = 1;
-	barrier();
-	wake_up_process(xfsbufd_task);
+	xfs_buftarg_t		*btp;
+
+	spin_lock(&xfs_buftarg_lock);	/* guards xfs_buftarg_list */
+	list_for_each_entry(btp, &xfs_buftarg_list, bt_list) {
+		if (!test_bit(BT_FORCE_SLEEP, &btp->bt_flags)) {
+			set_bit(BT_FORCE_FLUSH, &btp->bt_flags);
+			barrier();
+			wake_up_process(btp->bt_task);
+		}
+	}
+	spin_unlock(&xfs_buftarg_lock);
 	return 0;
 }
 
@@ -1702,31 +1756,34 @@
 {
 	struct list_head	tmp;
 	unsigned long		age;
-	xfs_buftarg_t		*target;
+	xfs_buftarg_t		*target = (xfs_buftarg_t *)data;
 	xfs_buf_t		*pb, *n;
+	struct list_head	*dwq = &target->bt_delwrite_queue;
+	spinlock_t		*dwlk = &target->bt_delwrite_lock;
 
 	current->flags |= PF_MEMALLOC;
 
 	INIT_LIST_HEAD(&tmp);
 	do {
 		if (unlikely(freezing(current))) {
-			xfsbufd_force_sleep = 1;
+			set_bit(BT_FORCE_SLEEP, &target->bt_flags);
 			refrigerator();
 		} else {
-			xfsbufd_force_sleep = 0;
+			clear_bit(BT_FORCE_SLEEP, &target->bt_flags);
 		}
 
 		schedule_timeout_interruptible(
 			xfs_buf_timer_centisecs * msecs_to_jiffies(10));
 
 		age = xfs_buf_age_centisecs * msecs_to_jiffies(10);
-		spin_lock(&pbd_delwrite_lock);
-		list_for_each_entry_safe(pb, n, &pbd_delwrite_queue, pb_list) {
+		spin_lock(dwlk);
+		list_for_each_entry_safe(pb, n, dwq, pb_list) {
 			PB_TRACE(pb, "walkq1", (long)pagebuf_ispin(pb));
 			ASSERT(pb->pb_flags & PBF_DELWRI);
 
 			if (!pagebuf_ispin(pb) && !pagebuf_cond_lock(pb)) {
-				if (!xfsbufd_force_flush &&
+				if (!test_bit(BT_FORCE_FLUSH,
+						&target->bt_flags) &&
 				    time_before(jiffies,
 						pb->pb_queuetime + age)) {
 					pagebuf_unlock(pb);
@@ -1738,11 +1795,11 @@
 				list_move(&pb->pb_list, &tmp);
 			}
 		}
-		spin_unlock(&pbd_delwrite_lock);
+		spin_unlock(dwlk);
 
 		while (!list_empty(&tmp)) {
 			pb = list_entry(tmp.next, xfs_buf_t, pb_list);
-			target = pb->pb_target;
+			ASSERT(target == pb->pb_target);
 
 			list_del_init(&pb->pb_list);
 			pagebuf_iostrategy(pb);
@@ -1753,7 +1810,7 @@
 		if (as_list_len > 0)
 			purge_addresses();
 
-		xfsbufd_force_flush = 0;
+		clear_bit(BT_FORCE_FLUSH, &target->bt_flags);
 	} while (!kthread_should_stop());
 
 	return 0;
@@ -1772,17 +1829,17 @@
 	struct list_head	tmp;
 	xfs_buf_t		*pb, *n;
 	int			pincount = 0;
+	struct list_head	*dwq = &target->bt_delwrite_queue;
+	spinlock_t		*dwlk = &target->bt_delwrite_lock;
 
 	pagebuf_runall_queues(xfsdatad_workqueue);
 	pagebuf_runall_queues(xfslogd_workqueue);
 
 	INIT_LIST_HEAD(&tmp);
-	spin_lock(&pbd_delwrite_lock);
-	list_for_each_entry_safe(pb, n, &pbd_delwrite_queue, pb_list) {
+	spin_lock(dwlk);
+	list_for_each_entry_safe(pb, n, dwq, pb_list) {
 
-		if (pb->pb_target != target)
-			continue;
-
+		ASSERT(pb->pb_target == target);
 		ASSERT(pb->pb_flags & (PBF_DELWRI|_PBF_DELWRI_Q));
 		PB_TRACE(pb, "walkq2", (long)pagebuf_ispin(pb));
 		if (pagebuf_ispin(pb)) {
@@ -1792,7 +1849,7 @@
 
 		list_move(&pb->pb_list, &tmp);
 	}
-	spin_unlock(&pbd_delwrite_lock);
+	spin_unlock(dwlk);
 
 	/*
 	 * Dropped the delayed write list lock, now walk the temporary list
@@ -1847,20 +1904,12 @@
 	if (!xfsdatad_workqueue)
 		goto out_destroy_xfslogd_workqueue;
 
-	xfsbufd_task = kthread_run(xfsbufd, NULL, "xfsbufd");
-	if (IS_ERR(xfsbufd_task)) {
-		error = PTR_ERR(xfsbufd_task);
-		goto out_destroy_xfsdatad_workqueue;
-	}
-
 	pagebuf_shake = kmem_shake_register(xfsbufd_wakeup);
 	if (!pagebuf_shake)
-		goto out_stop_xfsbufd;
+		goto out_destroy_xfsdatad_workqueue;
 
 	return 0;
 
- out_stop_xfsbufd:
-	kthread_stop(xfsbufd_task);
  out_destroy_xfsdatad_workqueue:
 	destroy_workqueue(xfsdatad_workqueue);
  out_destroy_xfslogd_workqueue:
@@ -1878,7 +1927,6 @@
 pagebuf_terminate(void)
 {
 	kmem_shake_deregister(pagebuf_shake);
-	kthread_stop(xfsbufd_task);
 	destroy_workqueue(xfsdatad_workqueue);
 	destroy_workqueue(xfslogd_workqueue);
 	kmem_zone_destroy(pagebuf_zone);