direct-io: Implement generic deferred AIO completions

Add support to the core direct-io code to defer AIO completions to user
context using a workqueue.  This replaces opencoded and less efficient
code in XFS and ext4 (we save a memory allocation for each direct IO)
and will be needed to properly support O_(D)SYNC for AIO.

The communication between the filesystem and the direct I/O code requires
a new buffer head flag, which is a bit ugly but not avoidable until the
direct I/O code stops abusing the buffer_head structure for communicating
with the filesystems.

Currently this creates a per-superblock unbound workqueue for these
completions, which is taken from an earlier patch by Jan Kara.  I'm
not really convinced about this use and would prefer a "normal" global
workqueue with a high concurrency limit, but this needs further discussion.

JK: Fixed ext4 part, dynamic allocation of the workqueue.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 7ab90f5..8b31b9f 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -127,6 +127,7 @@
 	spinlock_t bio_lock;		/* protects BIO fields below */
 	int page_errors;		/* errno from get_user_pages() */
 	int is_async;			/* is IO async ? */
+	bool defer_completion;		/* defer AIO completion to workqueue? */
 	int io_error;			/* IO error in completion path */
 	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
@@ -141,7 +142,10 @@
 	 * allocation time.  Don't add new fields after pages[] unless you
 	 * wish that they not be zeroed.
 	 */
-	struct page *pages[DIO_PAGES];	/* page buffer */
+	union {
+		struct page *pages[DIO_PAGES];	/* page buffer */
+		struct work_struct complete_work;/* deferred AIO completion */
+	};
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+		bool is_async)
 {
 	ssize_t transferred = 0;
 
@@ -258,19 +262,26 @@
 	if (ret == 0)
 		ret = transferred;
 
-	if (dio->end_io && dio->result) {
-		dio->end_io(dio->iocb, offset, transferred,
-			    dio->private, ret, is_async);
-	} else {
-		inode_dio_done(dio->inode);
-		if (is_async)
-			aio_complete(dio->iocb, ret, 0);
-	}
+	if (dio->end_io && dio->result)
+		dio->end_io(dio->iocb, offset, transferred, dio->private);
 
+	inode_dio_done(dio->inode);
+	if (is_async)
+		aio_complete(dio->iocb, ret, 0);
+
+	kmem_cache_free(dio_cache, dio);
 	return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+	struct dio *dio = container_of(work, struct dio, complete_work);
+
+	dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback. 
  */
@@ -290,8 +301,13 @@
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
-		dio_complete(dio, dio->iocb->ki_pos, 0, true);
-		kmem_cache_free(dio_cache, dio);
+		if (dio->result && dio->defer_completion) {
+			INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+			queue_work(dio->inode->i_sb->s_dio_done_wq,
+				   &dio->complete_work);
+		} else {
+			dio_complete(dio, dio->iocb->ki_pos, 0, true);
+		}
 	}
 }
 
@@ -511,6 +527,41 @@
 }
 
 /*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so the we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+	struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+						      WQ_MEM_RECLAIM, 0,
+						      sb->s_id);
+	if (!wq)
+		return -ENOMEM;
+	/*
+	 * This has to be atomic as more DIOs can race to create the workqueue
+	 */
+	cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+	/* Someone created workqueue before us? Free ours... */
+	if (wq != sb->s_dio_done_wq)
+		destroy_workqueue(wq);
+	return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+	struct super_block *sb = dio->inode->i_sb;
+
+	if (dio->defer_completion)
+		return 0;
+	dio->defer_completion = true;
+	if (!sb->s_dio_done_wq)
+		return sb_init_dio_done_wq(sb);
+	return 0;
+}
+
+/*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
  * fs blocksize, (1 << inode->i_blkbits).
@@ -581,6 +632,9 @@
 
 		/* Store for completion */
 		dio->private = map_bh->b_private;
+
+		if (ret == 0 && buffer_defer_completion(map_bh))
+			ret = dio_set_defer_completion(dio);
 	}
 	return ret;
 }
@@ -1269,7 +1323,6 @@
 
 	if (drop_refcount(dio) == 0) {
 		retval = dio_complete(dio, offset, retval, false);
-		kmem_cache_free(dio_cache, dio);
 	} else
 		BUG_ON(retval != -EIOCBQUEUED);