direct-io: Implement generic deferred AIO completions

Add support to the core direct-io code to defer AIO completions to user
context using a workqueue.  This replaces opencoded and less efficient
code in XFS and ext4 (we save a memory allocation for each direct IO)
and will be needed to properly support O_(D)SYNC for AIO.

The communication between the filesystem and the direct I/O code requires
a new buffer head flag, which is a bit ugly but not avoidable until the
direct I/O code stops abusing the buffer_head structure for communicating
with the filesystems.

Currently this creates a per-superblock unbound workqueue for these
completions, which is taken from an earlier patch by Jan Kara.  I'm
not really convinced about this use and would prefer a "normal" global
workqueue with a high concurrency limit, but this needs further discussion.

JK: Fixed ext4 part, dynamic allocation of the workqueue.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
index 0ab26fb..b247fbb 100644
--- a/fs/ext4/ext4.h
+++ b/fs/ext4/ext4.h
@@ -180,7 +180,6 @@
  * Flags for ext4_io_end->flags
  */
 #define	EXT4_IO_END_UNWRITTEN	0x0001
-#define EXT4_IO_END_DIRECT	0x0002
 
 /*
  * For converting uninitialized extents on a work queue. 'handle' is used for
@@ -196,8 +195,6 @@
 	unsigned int		flag;		/* unwritten or not */
 	loff_t			offset;		/* offset in the file */
 	ssize_t			size;		/* size of the extent */
-	struct kiocb		*iocb;		/* iocb struct for AIO */
-	int			result;		/* error value for AIO */
 	atomic_t		count;		/* reference counter */
 } ext4_io_end_t;
 
@@ -900,11 +897,9 @@
 	 * Completed IOs that need unwritten extents handling and don't have
 	 * transaction reserved
 	 */
-	struct list_head i_unrsv_conversion_list;
 	atomic_t i_ioend_count;	/* Number of outstanding io_end structs */
 	atomic_t i_unwritten; /* Nr. of inflight conversions pending */
 	struct work_struct i_rsv_conversion_work;
-	struct work_struct i_unrsv_conversion_work;
 
 	spinlock_t i_block_reservation_lock;
 
@@ -1276,8 +1271,6 @@
 	struct flex_groups *s_flex_groups;
 	ext4_group_t s_flex_groups_allocated;
 
-	/* workqueue for unreserved extent convertions (dio) */
-	struct workqueue_struct *unrsv_conversion_wq;
 	/* workqueue for reserved extent conversions (buffered io) */
 	struct workqueue_struct *rsv_conversion_wq;
 
@@ -1340,9 +1333,6 @@
 					      struct ext4_io_end *io_end)
 {
 	if (!(io_end->flag & EXT4_IO_END_UNWRITTEN)) {
-		/* Writeback has to have coversion transaction reserved */
-		WARN_ON(EXT4_SB(inode->i_sb)->s_journal && !io_end->handle &&
-			!(io_end->flag & EXT4_IO_END_DIRECT));
 		io_end->flag |= EXT4_IO_END_UNWRITTEN;
 		atomic_inc(&EXT4_I(inode)->i_unwritten);
 	}
@@ -2716,7 +2706,6 @@
 extern void ext4_io_submit_init(struct ext4_io_submit *io,
 				struct writeback_control *wbc);
 extern void ext4_end_io_rsv_work(struct work_struct *work);
-extern void ext4_end_io_unrsv_work(struct work_struct *work);
 extern void ext4_io_submit(struct ext4_io_submit *io);
 extern int ext4_bio_write_page(struct ext4_io_submit *io,
 			       struct page *page,
diff --git a/fs/ext4/inode.c b/fs/ext4/inode.c
index c2ca04e..123bd81 100644
--- a/fs/ext4/inode.c
+++ b/fs/ext4/inode.c
@@ -727,8 +727,12 @@
 
 	ret = ext4_map_blocks(handle, inode, &map, flags);
 	if (ret > 0) {
+		ext4_io_end_t *io_end = ext4_inode_aio(inode);
+
 		map_bh(bh, inode->i_sb, map.m_pblk);
 		bh->b_state = (bh->b_state & ~EXT4_MAP_FLAGS) | map.m_flags;
+		if (io_end && io_end->flag & EXT4_IO_END_UNWRITTEN)
+			set_buffer_defer_completion(bh);
 		bh->b_size = inode->i_sb->s_blocksize * map.m_len;
 		ret = 0;
 	}
@@ -2991,19 +2995,13 @@
 }
 
 static void ext4_end_io_dio(struct kiocb *iocb, loff_t offset,
-			    ssize_t size, void *private, int ret,
-			    bool is_async)
+			    ssize_t size, void *private)
 {
-	struct inode *inode = file_inode(iocb->ki_filp);
         ext4_io_end_t *io_end = iocb->private;
 
 	/* if not async direct IO just return */
-	if (!io_end) {
-		inode_dio_done(inode);
-		if (is_async)
-			aio_complete(iocb, ret, 0);
+	if (!io_end)
 		return;
-	}
 
 	ext_debug("ext4_end_io_dio(): io_end 0x%p "
 		  "for inode %lu, iocb 0x%p, offset %llu, size %zd\n",
@@ -3013,11 +3011,7 @@
 	iocb->private = NULL;
 	io_end->offset = offset;
 	io_end->size = size;
-	if (is_async) {
-		io_end->iocb = iocb;
-		io_end->result = ret;
-	}
-	ext4_put_io_end_defer(io_end);
+	ext4_put_io_end(io_end);
 }
 
 /*
@@ -3102,7 +3096,6 @@
 			ret = -ENOMEM;
 			goto retake_lock;
 		}
-		io_end->flag |= EXT4_IO_END_DIRECT;
 		/*
 		 * Grab reference for DIO. Will be dropped in ext4_end_io_dio()
 		 */
@@ -3147,13 +3140,6 @@
 		if (ret <= 0 && ret != -EIOCBQUEUED && iocb->private) {
 			WARN_ON(iocb->private != io_end);
 			WARN_ON(io_end->flag & EXT4_IO_END_UNWRITTEN);
-			WARN_ON(io_end->iocb);
-			/*
-			 * Generic code already did inode_dio_done() so we
-			 * have to clear EXT4_IO_END_DIRECT to not do it for
-			 * the second time.
-			 */
-			io_end->flag = 0;
 			ext4_put_io_end(io_end);
 			iocb->private = NULL;
 		}
diff --git a/fs/ext4/page-io.c b/fs/ext4/page-io.c
index 6625d21..d7d0c7b 100644
--- a/fs/ext4/page-io.c
+++ b/fs/ext4/page-io.c
@@ -123,10 +123,6 @@
 		ext4_finish_bio(bio);
 		bio_put(bio);
 	}
-	if (io_end->flag & EXT4_IO_END_DIRECT)
-		inode_dio_done(io_end->inode);
-	if (io_end->iocb)
-		aio_complete(io_end->iocb, io_end->result, 0);
 	kmem_cache_free(io_end_cachep, io_end);
 }
 
@@ -204,19 +200,14 @@
 	struct workqueue_struct *wq;
 	unsigned long flags;
 
-	BUG_ON(!(io_end->flag & EXT4_IO_END_UNWRITTEN));
+	/* Only reserved conversions from writeback should enter here */
+	WARN_ON(!(io_end->flag & EXT4_IO_END_UNWRITTEN));
+	WARN_ON(!io_end->handle);
 	spin_lock_irqsave(&ei->i_completed_io_lock, flags);
-	if (io_end->handle) {
-		wq = EXT4_SB(io_end->inode->i_sb)->rsv_conversion_wq;
-		if (list_empty(&ei->i_rsv_conversion_list))
-			queue_work(wq, &ei->i_rsv_conversion_work);
-		list_add_tail(&io_end->list, &ei->i_rsv_conversion_list);
-	} else {
-		wq = EXT4_SB(io_end->inode->i_sb)->unrsv_conversion_wq;
-		if (list_empty(&ei->i_unrsv_conversion_list))
-			queue_work(wq, &ei->i_unrsv_conversion_work);
-		list_add_tail(&io_end->list, &ei->i_unrsv_conversion_list);
-	}
+	wq = EXT4_SB(io_end->inode->i_sb)->rsv_conversion_wq;
+	if (list_empty(&ei->i_rsv_conversion_list))
+		queue_work(wq, &ei->i_rsv_conversion_work);
+	list_add_tail(&io_end->list, &ei->i_rsv_conversion_list);
 	spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
 }
 
@@ -256,13 +247,6 @@
 	ext4_do_flush_completed_IO(&ei->vfs_inode, &ei->i_rsv_conversion_list);
 }
 
-void ext4_end_io_unrsv_work(struct work_struct *work)
-{
-	struct ext4_inode_info *ei = container_of(work, struct ext4_inode_info,
-						  i_unrsv_conversion_work);
-	ext4_do_flush_completed_IO(&ei->vfs_inode, &ei->i_unrsv_conversion_list);
-}
-
 ext4_io_end_t *ext4_init_io_end(struct inode *inode, gfp_t flags)
 {
 	ext4_io_end_t *io = kmem_cache_zalloc(io_end_cachep, flags);
diff --git a/fs/ext4/super.c b/fs/ext4/super.c
index b59373b..5db4f0d 100644
--- a/fs/ext4/super.c
+++ b/fs/ext4/super.c
@@ -762,9 +762,7 @@
 	ext4_unregister_li_request(sb);
 	dquot_disable(sb, -1, DQUOT_USAGE_ENABLED | DQUOT_LIMITS_ENABLED);
 
-	flush_workqueue(sbi->unrsv_conversion_wq);
 	flush_workqueue(sbi->rsv_conversion_wq);
-	destroy_workqueue(sbi->unrsv_conversion_wq);
 	destroy_workqueue(sbi->rsv_conversion_wq);
 
 	if (sbi->s_journal) {
@@ -875,14 +873,12 @@
 #endif
 	ei->jinode = NULL;
 	INIT_LIST_HEAD(&ei->i_rsv_conversion_list);
-	INIT_LIST_HEAD(&ei->i_unrsv_conversion_list);
 	spin_lock_init(&ei->i_completed_io_lock);
 	ei->i_sync_tid = 0;
 	ei->i_datasync_tid = 0;
 	atomic_set(&ei->i_ioend_count, 0);
 	atomic_set(&ei->i_unwritten, 0);
 	INIT_WORK(&ei->i_rsv_conversion_work, ext4_end_io_rsv_work);
-	INIT_WORK(&ei->i_unrsv_conversion_work, ext4_end_io_unrsv_work);
 
 	return &ei->vfs_inode;
 }
@@ -3954,14 +3950,6 @@
 		goto failed_mount4;
 	}
 
-	EXT4_SB(sb)->unrsv_conversion_wq =
-		alloc_workqueue("ext4-unrsv-conversion", WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
-	if (!EXT4_SB(sb)->unrsv_conversion_wq) {
-		printk(KERN_ERR "EXT4-fs: failed to create workqueue\n");
-		ret = -ENOMEM;
-		goto failed_mount4;
-	}
-
 	/*
 	 * The jbd2_journal_load will have done any necessary log recovery,
 	 * so we can safely mount the rest of the filesystem now.
@@ -4115,8 +4103,6 @@
 	ext4_msg(sb, KERN_ERR, "mount failed");
 	if (EXT4_SB(sb)->rsv_conversion_wq)
 		destroy_workqueue(EXT4_SB(sb)->rsv_conversion_wq);
-	if (EXT4_SB(sb)->unrsv_conversion_wq)
-		destroy_workqueue(EXT4_SB(sb)->unrsv_conversion_wq);
 failed_mount_wq:
 	if (sbi->s_journal) {
 		jbd2_journal_destroy(sbi->s_journal);
@@ -4564,7 +4550,6 @@
 
 	trace_ext4_sync_fs(sb, wait);
 	flush_workqueue(sbi->rsv_conversion_wq);
-	flush_workqueue(sbi->unrsv_conversion_wq);
 	/*
 	 * Writeback quota in non-journalled quota case - journalled quota has
 	 * no dirty dquots
@@ -4600,7 +4585,6 @@
 
 	trace_ext4_sync_fs(sb, wait);
 	flush_workqueue(EXT4_SB(sb)->rsv_conversion_wq);
-	flush_workqueue(EXT4_SB(sb)->unrsv_conversion_wq);
 	dquot_writeback_dquots(sb, -1);
 	if (wait && test_opt(sb, BARRIER))
 		ret = blkdev_issue_flush(sb->s_bdev, GFP_KERNEL, NULL);