Merge branch 'xfs-4.8-buf-fixes' into for-next
diff --git a/fs/xfs/xfs_buf.c b/fs/xfs/xfs_buf.c
index 4665ff6..32fc540 100644
--- a/fs/xfs/xfs_buf.c
+++ b/fs/xfs/xfs_buf.c
@@ -80,6 +80,47 @@
 }
 
 /*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for synchronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+	struct xfs_buf	*bp)
+{
+	if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags |= _XBF_IN_FLIGHT;
+	percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+	struct xfs_buf	*bp)
+{
+	if (!(bp->b_flags & _XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags &= ~_XBF_IN_FLIGHT;
+	percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
+/*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
  * reference count falls to zero. If the buffer is already on the LRU, we need
@@ -102,6 +143,14 @@
 	 */
 	bp->b_flags &= ~_XBF_DELWRI_Q;
 
+	/*
+	 * Once the buffer is marked stale and unlocked, a subsequent lookup
+	 * could reset b_flags. There is no guarantee that the buffer is
+	 * unaccounted (released to LRU) before that occurs. Drop in-flight
+	 * status now to preserve accounting consistency.
+	 */
+	xfs_buf_ioacct_dec(bp);
+
 	spin_lock(&bp->b_lock);
 	atomic_set(&bp->b_lru_ref, 0);
 	if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
@@ -815,7 +864,8 @@
 	struct xfs_buf		*bp;
 	DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks);
 
-	bp = _xfs_buf_alloc(target, &map, 1, 0);
+	/* flags might contain irrelevant bits, pass only what we care about */
+	bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT);
 	if (unlikely(bp == NULL))
 		goto fail;
 
@@ -866,63 +916,85 @@
 }
 
 /*
- *	Releases a hold on the specified buffer.  If the
- *	the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
 	xfs_buf_t		*bp)
 {
 	struct xfs_perag	*pag = bp->b_pag;
+	bool			release;
+	bool			freebuf = false;
 
 	trace_xfs_buf_rele(bp, _RET_IP_);
 
 	if (!pag) {
 		ASSERT(list_empty(&bp->b_lru));
 		ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-		if (atomic_dec_and_test(&bp->b_hold))
+		if (atomic_dec_and_test(&bp->b_hold)) {
+			xfs_buf_ioacct_dec(bp);
 			xfs_buf_free(bp);
+		}
 		return;
 	}
 
 	ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
 	ASSERT(atomic_read(&bp->b_hold) > 0);
-	if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-		spin_lock(&bp->b_lock);
-		if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-			/*
-			 * If the buffer is added to the LRU take a new
-			 * reference to the buffer for the LRU and clear the
-			 * (now stale) dispose list state flag
-			 */
-			if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-				bp->b_state &= ~XFS_BSTATE_DISPOSE;
-				atomic_inc(&bp->b_hold);
-			}
-			spin_unlock(&bp->b_lock);
-			spin_unlock(&pag->pag_buf_lock);
-		} else {
-			/*
-			 * most of the time buffers will already be removed from
-			 * the LRU, so optimise that case by checking for the
-			 * XFS_BSTATE_DISPOSE flag indicating the last list the
-			 * buffer was on was the disposal list
-			 */
-			if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-				list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-			} else {
-				ASSERT(list_empty(&bp->b_lru));
-			}
-			spin_unlock(&bp->b_lock);
 
-			ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-			rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-			spin_unlock(&pag->pag_buf_lock);
-			xfs_perag_put(pag);
-			xfs_buf_free(bp);
-		}
+	release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+	spin_lock(&bp->b_lock);
+	if (!release) {
+		/*
+		 * Drop the in-flight state if the buffer is already on the LRU
+		 * and it holds the only reference. This is racy because we
+		 * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+		 * ensures the decrement occurs only once per-buf.
+		 */
+		if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+			xfs_buf_ioacct_dec(bp);
+		goto out_unlock;
 	}
+
+	/* the last reference has been dropped ... */
+	xfs_buf_ioacct_dec(bp);
+	if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+		/*
+		 * If the buffer is added to the LRU take a new reference to the
+		 * buffer for the LRU and clear the (now stale) dispose list
+		 * state flag
+		 */
+		if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+			bp->b_state &= ~XFS_BSTATE_DISPOSE;
+			atomic_inc(&bp->b_hold);
+		}
+		spin_unlock(&pag->pag_buf_lock);
+	} else {
+		/*
+		 * most of the time buffers will already be removed from the
+		 * LRU, so optimise that case by checking for the
+		 * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+		 * was on was the disposal list
+		 */
+		if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+			list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+		} else {
+			ASSERT(list_empty(&bp->b_lru));
+		}
+
+		ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+		rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+		spin_unlock(&pag->pag_buf_lock);
+		xfs_perag_put(pag);
+		freebuf = true;
+	}
+
+out_unlock:
+	spin_unlock(&bp->b_lock);
+
+	if (freebuf)
+		xfs_buf_free(bp);
 }
 
 
@@ -1341,6 +1413,7 @@
 	 * xfs_buf_ioend too early.
 	 */
 	atomic_set(&bp->b_io_remaining, 1);
+	xfs_buf_ioacct_inc(bp);
 	_xfs_buf_ioapply(bp);
 
 	/*
@@ -1526,13 +1599,19 @@
 	int loop = 0;
 
 	/*
-	 * We need to flush the buffer workqueue to ensure that all IO
-	 * completion processing is 100% done. Just waiting on buffer locks is
-	 * not sufficient for async IO as the reference count held over IO is
-	 * not released until after the buffer lock is dropped. Hence we need to
-	 * ensure here that all reference counts have been dropped before we
-	 * start walking the LRU list.
+	 * First wait on the buftarg I/O count for all in-flight buffers to be
+	 * released. This is critical as new buffers do not make the LRU until
+	 * they are released.
+	 *
+	 * Next, flush the buffer workqueue to ensure all completion processing
+	 * has finished. Just waiting on buffer locks is not sufficient for
+	 * async IO as the reference count held over IO is not released until
+	 * after the buffer lock is dropped. Hence we need to ensure here that
+	 * all reference counts have been dropped before we start walking the
+	 * LRU list.
 	 */
+	while (percpu_counter_sum(&btp->bt_io_count))
+		delay(100);
 	drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
 	/* loop until there is nothing left on the lru list. */
@@ -1629,6 +1708,8 @@
 	struct xfs_buftarg	*btp)
 {
 	unregister_shrinker(&btp->bt_shrinker);
+	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+	percpu_counter_destroy(&btp->bt_io_count);
 	list_lru_destroy(&btp->bt_lru);
 
 	if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1693,6 +1774,9 @@
 	if (list_lru_init(&btp->bt_lru))
 		goto error;
 
+	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+		goto error;
+
 	btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
 	btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
 	btp->bt_shrinker.seeks = DEFAULT_SEEKS;
@@ -1834,7 +1918,7 @@
 		 * side. We need to move the buffer onto the io_list
 		 * at this point so the caller can still access it.
 		 */
-		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL);
+		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL);
 		bp->b_flags |= XBF_WRITE | XBF_ASYNC;
 		if (wait_list) {
 			xfs_buf_hold(bp);