xfs: Don't issue buffer IO direct from AIL push V2

All buffers logged into the AIL are marked as delayed write.
When the AIL needs to push the buffer out, it issues an async write of the
buffer. This means that IO patterns are dependent on the order of
buffers in the AIL.

Instead of flushing the buffer, promote the buffer in the delayed
write list so that the next time the xfsbufd is run the buffer will
be flushed by the xfsbufd. Return the state to the xfsaild that the
buffer was promoted so that the xfsaild knows that it needs to cause
the xfsbufd to run to flush the buffers that were promoted.

Using the xfsbufd for issuing the IO allows us to dispatch all
buffer IO from the one queue. This means that we can make much more
enlightened decisions on what order to flush buffers to disk as
we don't have multiple places issuing IO. Optimisations to xfsbufd
will be in a future patch.

Version 2
- kill XFS_ITEM_FLUSHING as it is now unused.

Signed-off-by: Dave Chinner <david@fromorbit.com>
Reviewed-by: Christoph Hellwig <hch@lst.de>
diff --git a/fs/xfs/quota/xfs_dquot_item.c b/fs/xfs/quota/xfs_dquot_item.c
index 1b56437..dda0fb0 100644
--- a/fs/xfs/quota/xfs_dquot_item.c
+++ b/fs/xfs/quota/xfs_dquot_item.c
@@ -212,66 +212,31 @@
 	xfs_dquot_t	*dqp;
 	xfs_mount_t	*mp;
 	xfs_buf_t	*bp;
-	uint		dopush;
 
 	dqp = qip->qli_dquot;
 	ASSERT(XFS_DQ_IS_LOCKED(dqp));
 
 	/*
-	 * The qli_pushbuf_flag keeps others from
-	 * trying to duplicate our effort.
-	 */
-	ASSERT(qip->qli_pushbuf_flag != 0);
-	ASSERT(qip->qli_push_owner == current_pid());
-
-	/*
 	 * If flushlock isn't locked anymore, chances are that the
 	 * inode flush completed and the inode was taken off the AIL.
 	 * So, just get out.
 	 */
 	if (completion_done(&dqp->q_flush)  ||
 	    ((qip->qli_item.li_flags & XFS_LI_IN_AIL) == 0)) {
-		qip->qli_pushbuf_flag = 0;
 		xfs_dqunlock(dqp);
 		return;
 	}
 	mp = dqp->q_mount;
 	bp = xfs_incore(mp->m_ddev_targp, qip->qli_format.qlf_blkno,
 		    XFS_QI_DQCHUNKLEN(mp), XBF_TRYLOCK);
-	if (bp != NULL) {
-		if (XFS_BUF_ISDELAYWRITE(bp)) {
-			dopush = ((qip->qli_item.li_flags & XFS_LI_IN_AIL) &&
-				  !completion_done(&dqp->q_flush));
-			qip->qli_pushbuf_flag = 0;
-			xfs_dqunlock(dqp);
-
-			if (XFS_BUF_ISPINNED(bp))
-				xfs_log_force(mp, 0);
-
-			if (dopush) {
-				int	error;
-#ifdef XFSRACEDEBUG
-				delay_for_intr();
-				delay(300);
-#endif
-				error = xfs_bawrite(mp, bp);
-				if (error)
-					xfs_fs_cmn_err(CE_WARN, mp,
-	"xfs_qm_dquot_logitem_pushbuf: pushbuf error %d on qip %p, bp %p",
-							error, qip, bp);
-			} else {
-				xfs_buf_relse(bp);
-			}
-		} else {
-			qip->qli_pushbuf_flag = 0;
-			xfs_dqunlock(dqp);
-			xfs_buf_relse(bp);
-		}
-		return;
-	}
-
-	qip->qli_pushbuf_flag = 0;
 	xfs_dqunlock(dqp);
+	if (!bp)
+		return;
+	if (XFS_BUF_ISDELAYWRITE(bp))
+		xfs_buf_delwri_promote(bp);
+	xfs_buf_relse(bp);
+	return;
+
 }
 
 /*
@@ -289,50 +254,24 @@
 	xfs_dq_logitem_t	*qip)
 {
 	xfs_dquot_t		*dqp;
-	uint			retval;
 
 	dqp = qip->qli_dquot;
 	if (atomic_read(&dqp->q_pincount) > 0)
-		return (XFS_ITEM_PINNED);
+		return XFS_ITEM_PINNED;
 
 	if (! xfs_qm_dqlock_nowait(dqp))
-		return (XFS_ITEM_LOCKED);
+		return XFS_ITEM_LOCKED;
 
-	retval = XFS_ITEM_SUCCESS;
 	if (!xfs_dqflock_nowait(dqp)) {
 		/*
-		 * The dquot is already being flushed.	It may have been
-		 * flushed delayed write, however, and we don't want to
-		 * get stuck waiting for that to complete.  So, we want to check
-		 * to see if we can lock the dquot's buffer without sleeping.
-		 * If we can and it is marked for delayed write, then we
-		 * hold it and send it out from the push routine.  We don't
-		 * want to do that now since we might sleep in the device
-		 * strategy routine.  We also don't want to grab the buffer lock
-		 * here because we'd like not to call into the buffer cache
-		 * while holding the AIL lock.
-		 * Make sure to only return PUSHBUF if we set pushbuf_flag
-		 * ourselves.  If someone else is doing it then we don't
-		 * want to go to the push routine and duplicate their efforts.
+		 * dquot has already been flushed to the backing buffer,
+		 * leave it locked, pushbuf routine will unlock it.
 		 */
-		if (qip->qli_pushbuf_flag == 0) {
-			qip->qli_pushbuf_flag = 1;
-			ASSERT(qip->qli_format.qlf_blkno == dqp->q_blkno);
-#ifdef DEBUG
-			qip->qli_push_owner = current_pid();
-#endif
-			/*
-			 * The dquot is left locked.
-			 */
-			retval = XFS_ITEM_PUSHBUF;
-		} else {
-			retval = XFS_ITEM_FLUSHING;
-			xfs_dqunlock_nonotify(dqp);
-		}
+		return XFS_ITEM_PUSHBUF;
 	}
 
 	ASSERT(qip->qli_item.li_flags & XFS_LI_IN_AIL);
-	return (retval);
+	return XFS_ITEM_SUCCESS;
 }