ext4: completed_io locking cleanup

The current unwritten extent conversion state machine is very fuzzy.
- For some unknown reason it performs the conversion under i_mutex. What for?
  My diagnosis:
  We already protect the extent tree with i_data_sem, and truncate and
  punch_hole should wait for DIO, so the only data we have to protect is the
  end_io->flags modification. But only flush_completed_IO and end_io_work
  modify these flags, and we can serialize them via i_completed_io_lock.

  Currently all these games with mutex_trylock result in the following deadlock:
   truncate:                          kworker:
    ext4_setattr                       ext4_end_io_work
    mutex_lock(i_mutex)
    inode_dio_wait(inode)  ->BLOCK
                             DEADLOCK<- mutex_trylock()
                                        inode_dio_done()
  #TEST_CASE1_BEGIN
  MNT=/mnt_scrach
  unlink $MNT/file
  fallocate -l $((1024*1024*1024)) $MNT/file
  aio-stress -I 100000 -O -s 100m -n -t 1 -c 10 -o 2 -o 3 $MNT/file
  sleep 2
  truncate -s 0 $MNT/file
  #TEST_CASE1_END

Or use xfstests test 286: https://github.com/dmonakhov/xfstests/blob/devel/286

This patch makes the state machine simple and clean:

(1) xxx_end_io schedules the final extent conversion simply by calling
    ext4_add_complete_io(), which appends the end_io to
    ei->i_completed_io_list.
    NOTE1: because of (2A), work should be queued only if
    ->i_completed_io_list was empty; otherwise the work is already scheduled.

(2) ext4_flush_completed_IO is responsible for handling all pending
    end_io from ei->i_completed_io_list.
    The flushing sequence consists of the following stages:
    A) LOCKED: Atomically drain completed_io_list to local_list
    B) Perform extents conversion
    C) LOCKED: move converted io's to the to_free list for final deletion.
               This logic depends on the context we were called from.
    D) Final end_io context destruction
    NOTE2: i_mutex is no longer required because end_io->flags modification
    is protected by ei->i_completed_io_lock

Full list of changes:
- Move all completion end_io related routines to page-io.c in order to improve
  logic locality
- Move open coded logic from various xx_end_xx routines to ext4_add_complete_io()
- Remove EXT4_IO_END_IN_FSYNC
- Improve SMP scalability by removing the useless i_mutex, which does not
  protect io->flags anymore.
- Reduce lock contention on i_completed_io_lock by optimizing the list walk.
- Rename ext4_end_io_nolock to ext4_end_io and make it static
- Check the flush completion status in ext4_ext_punch_hole(), because it is
  not a good idea to punch blocks from a corrupted inode.

Changes since V3 (in response to Jan's comments):
  Fall back to the active flush_completed_IO() approach in order to prevent
  performance issues with nolocked DIO reads.
Changes since V2:
  Fix a use-after-free caused by a race between truncate and end_io_work

Signed-off-by: Dmitry Monakhov <dmonakhov@openvz.org>
Signed-off-by: "Theodore Ts'o" <tytso@mit.edu>
diff --git a/fs/ext4/page-io.c b/fs/ext4/page-io.c
index 9970022..5b24c40 100644
--- a/fs/ext4/page-io.c
+++ b/fs/ext4/page-io.c
@@ -71,6 +71,7 @@
 	int i;
 
 	BUG_ON(!io);
+	BUG_ON(!list_empty(&io->list));
 	BUG_ON(io->flag & EXT4_IO_END_UNWRITTEN);
 
 	if (io->page)
@@ -83,21 +84,14 @@
 	kmem_cache_free(io_end_cachep, io);
 }
 
-/*
- * check a range of space and convert unwritten extents to written.
- *
- * Called with inode->i_mutex; we depend on this when we manipulate
- * io->flag, since we could otherwise race with ext4_flush_completed_IO()
- */
-int ext4_end_io_nolock(ext4_io_end_t *io)
+/* check a range of space and convert unwritten extents to written. */
+static int ext4_end_io(ext4_io_end_t *io)
 {
 	struct inode *inode = io->inode;
 	loff_t offset = io->offset;
 	ssize_t size = io->size;
 	int ret = 0;
 
-	BUG_ON(!(io->flag & EXT4_IO_END_UNWRITTEN));
-
 	ext4_debug("ext4_end_io_nolock: io 0x%p from inode %lu,list->next 0x%p,"
 		   "list->prev 0x%p\n",
 		   io, inode->i_ino, io->list.next, io->list.prev);
@@ -110,7 +104,6 @@
 			 "(inode %lu, offset %llu, size %zd, error %d)",
 			 inode->i_ino, offset, size, ret);
 	}
-	io->flag &= ~EXT4_IO_END_UNWRITTEN;
 	if (io->iocb)
 		aio_complete(io->iocb, io->result, 0);
 
@@ -122,51 +115,122 @@
 	return ret;
 }
 
+static void dump_completed_IO(struct inode *inode)
+{
+#ifdef	EXT4FS_DEBUG
+	struct list_head *cur, *before, *after;
+	ext4_io_end_t *io, *io0, *io1;
+	unsigned long flags;
+
+	if (list_empty(&EXT4_I(inode)->i_completed_io_list)) {
+		ext4_debug("inode %lu completed_io list is empty\n",
+			   inode->i_ino);
+		return;
+	}
+
+	ext4_debug("Dump inode %lu completed_io list\n", inode->i_ino);
+	list_for_each_entry(io, &EXT4_I(inode)->i_completed_io_list, list) {
+		cur = &io->list;
+		before = cur->prev;
+		io0 = container_of(before, ext4_io_end_t, list);
+		after = cur->next;
+		io1 = container_of(after, ext4_io_end_t, list);
+
+		ext4_debug("io 0x%p from inode %lu,prev 0x%p,next 0x%p\n",
+			    io, inode->i_ino, io0, io1);
+	}
+#endif
+}
+
+/* Add the io_end to per-inode completed end_io list. */
+void ext4_add_complete_io(ext4_io_end_t *io_end)
+{
+	struct ext4_inode_info *ei = EXT4_I(io_end->inode);
+	struct workqueue_struct *wq;
+	unsigned long flags;
+
+	BUG_ON(!(io_end->flag & EXT4_IO_END_UNWRITTEN));
+	wq = EXT4_SB(io_end->inode->i_sb)->dio_unwritten_wq;
+
+	spin_lock_irqsave(&ei->i_completed_io_lock, flags);
+	if (list_empty(&ei->i_completed_io_list)) {
+		io_end->flag |= EXT4_IO_END_QUEUED;
+		queue_work(wq, &io_end->work);
+	}
+	list_add_tail(&io_end->list, &ei->i_completed_io_list);
+	spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
+}
+
+static int ext4_do_flush_completed_IO(struct inode *inode,
+				      ext4_io_end_t *work_io)
+{
+	ext4_io_end_t *io;
+	struct list_head unwritten, complete, to_free;
+	unsigned long flags;
+	struct ext4_inode_info *ei = EXT4_I(inode);
+	int err, ret = 0;
+
+	INIT_LIST_HEAD(&complete);
+	INIT_LIST_HEAD(&to_free);
+
+	spin_lock_irqsave(&ei->i_completed_io_lock, flags);
+	dump_completed_IO(inode);
+	list_replace_init(&ei->i_completed_io_list, &unwritten);
+	spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
+
+	while (!list_empty(&unwritten)) {
+		io = list_entry(unwritten.next, ext4_io_end_t, list);
+		BUG_ON(!(io->flag & EXT4_IO_END_UNWRITTEN));
+		list_del_init(&io->list);
+
+		err = ext4_end_io(io);
+		if (unlikely(!ret && err))
+			ret = err;
+
+		list_add_tail(&io->list, &complete);
+	}
+	/* It is important to update all flags for all end_io in one shot w/o
+	 * dropping the lock.*/
+	spin_lock_irqsave(&ei->i_completed_io_lock, flags);
+	while (!list_empty(&complete)) {
+		io = list_entry(complete.next, ext4_io_end_t, list);
+		io->flag &= ~EXT4_IO_END_UNWRITTEN;
+		/* end_io context can not be destroyed now because it is still
+		 * used by the queued worker. Worker thread will destroy it later */
+		if (io->flag & EXT4_IO_END_QUEUED)
+			list_del_init(&io->list);
+		else
+			list_move(&io->list, &to_free);
+	}
+	/* If we are called from worker context, it is time to clear queued
+	 * flag, and destroy its end_io if it was converted already */
+	if (work_io) {
+		work_io->flag &= ~EXT4_IO_END_QUEUED;
+		if (!(work_io->flag & EXT4_IO_END_UNWRITTEN))
+			list_add_tail(&work_io->list, &to_free);
+	}
+	spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
+
+	while (!list_empty(&to_free)) {
+		io = list_entry(to_free.next, ext4_io_end_t, list);
+		list_del_init(&io->list);
+		ext4_free_io_end(io);
+	}
+	return ret;
+}
+
 /*
  * work on completed aio dio IO, to convert unwritten extents to extents
  */
 static void ext4_end_io_work(struct work_struct *work)
 {
-	ext4_io_end_t		*io = container_of(work, ext4_io_end_t, work);
-	struct inode		*inode = io->inode;
-	struct ext4_inode_info	*ei = EXT4_I(inode);
-	unsigned long		flags;
+	ext4_io_end_t *io = container_of(work, ext4_io_end_t, work);
+	ext4_do_flush_completed_IO(io->inode, io);
+}
 
-	spin_lock_irqsave(&ei->i_completed_io_lock, flags);
-	if (io->flag & EXT4_IO_END_IN_FSYNC)
-		goto requeue;
-	if (list_empty(&io->list)) {
-		spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
-		goto free;
-	}
-
-	if (!mutex_trylock(&inode->i_mutex)) {
-		bool was_queued;
-requeue:
-		was_queued = !!(io->flag & EXT4_IO_END_QUEUED);
-		io->flag |= EXT4_IO_END_QUEUED;
-		spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
-		/*
-		 * Requeue the work instead of waiting so that the work
-		 * items queued after this can be processed.
-		 */
-		queue_work(EXT4_SB(inode->i_sb)->dio_unwritten_wq, &io->work);
-		/*
-		 * To prevent the ext4-dio-unwritten thread from keeping
-		 * requeueing end_io requests and occupying cpu for too long,
-		 * yield the cpu if it sees an end_io request that has already
-		 * been requeued.
-		 */
-		if (was_queued)
-			yield();
-		return;
-	}
-	list_del_init(&io->list);
-	spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
-	(void) ext4_end_io_nolock(io);
-	mutex_unlock(&inode->i_mutex);
-free:
-	ext4_free_io_end(io);
+int ext4_flush_completed_IO(struct inode *inode)
+{
+	return ext4_do_flush_completed_IO(inode, NULL);
 }
 
 ext4_io_end_t *ext4_init_io_end(struct inode *inode, gfp_t flags)
@@ -199,9 +263,7 @@
 static void ext4_end_bio(struct bio *bio, int error)
 {
 	ext4_io_end_t *io_end = bio->bi_private;
-	struct workqueue_struct *wq;
 	struct inode *inode;
-	unsigned long flags;
 	int i;
 	sector_t bi_sector = bio->bi_sector;
 
@@ -259,14 +321,7 @@
 		return;
 	}
 
-	/* Add the io_end to per-inode completed io list*/
-	spin_lock_irqsave(&EXT4_I(inode)->i_completed_io_lock, flags);
-	list_add_tail(&io_end->list, &EXT4_I(inode)->i_completed_io_list);
-	spin_unlock_irqrestore(&EXT4_I(inode)->i_completed_io_lock, flags);
-
-	wq = EXT4_SB(inode->i_sb)->dio_unwritten_wq;
-	/* queue the work to convert unwritten extents to written */
-	queue_work(wq, &io_end->work);
+	ext4_add_complete_io(io_end);
 }
 
 void ext4_io_submit(struct ext4_io_submit *io)