aio: implement request batching

Hi,

Some workloads issue batches of small I/O, and the performance is poor
due to the call to blk_run_address_space for every single iocb.  Nathan
Roberts pointed this out, and suggested that by deferring this call
until all I/Os in the iocb array are submitted to the block layer, we
can realize some impressive performance gains (up to 30% for sequential
4k reads in batches of 16).

Signed-off-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fs/aio.c b/fs/aio.c
index 02a2c93..cf0bef4 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -32,6 +32,9 @@
 #include <linux/workqueue.h>
 #include <linux/security.h>
 #include <linux/eventfd.h>
+#include <linux/blkdev.h>
+#include <linux/mempool.h>
+#include <linux/hash.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
@@ -60,6 +63,14 @@
 static DEFINE_SPINLOCK(fput_lock);
 static LIST_HEAD(fput_head);
 
+#define AIO_BATCH_HASH_BITS	3 /* allocated on-stack, so don't go crazy */
+#define AIO_BATCH_HASH_SIZE	(1 << AIO_BATCH_HASH_BITS)
+struct aio_batch_entry {
+	struct hlist_node list;
+	struct address_space *mapping;
+};
+mempool_t *abe_pool;
+
 static void aio_kick_handler(struct work_struct *);
 static void aio_queue_work(struct kioctx *);
 
@@ -73,6 +84,8 @@
 	kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
 	aio_wq = create_workqueue("aio");
+	abe_pool = mempool_create_kmalloc_pool(1, sizeof(struct aio_batch_entry));
+	BUG_ON(!abe_pool);
 
 	pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
@@ -1531,8 +1544,44 @@
 	return 1;
 }
 
+static void aio_batch_add(struct address_space *mapping,
+			  struct hlist_head *batch_hash)
+{
+	struct aio_batch_entry *abe;
+	struct hlist_node *pos;
+	unsigned bucket;
+
+	bucket = hash_ptr(mapping, AIO_BATCH_HASH_BITS);
+	hlist_for_each_entry(abe, pos, &batch_hash[bucket], list) {
+		if (abe->mapping == mapping)
+			return;
+	}
+
+	abe = mempool_alloc(abe_pool, GFP_KERNEL);
+	BUG_ON(!igrab(mapping->host));
+	abe->mapping = mapping;
+	hlist_add_head(&abe->list, &batch_hash[bucket]);
+	return;
+}
+
+static void aio_batch_free(struct hlist_head *batch_hash)
+{
+	struct aio_batch_entry *abe;
+	struct hlist_node *pos, *n;
+	int i;
+
+	for (i = 0; i < AIO_BATCH_HASH_SIZE; i++) {
+		hlist_for_each_entry_safe(abe, pos, n, &batch_hash[i], list) {
+			blk_run_address_space(abe->mapping);
+			iput(abe->mapping->host);
+			hlist_del(&abe->list);
+			mempool_free(abe, abe_pool);
+		}
+	}
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-			 struct iocb *iocb)
+			 struct iocb *iocb, struct hlist_head *batch_hash)
 {
 	struct kiocb *req;
 	struct file *file;
@@ -1608,6 +1657,12 @@
 			;
 	}
 	spin_unlock_irq(&ctx->ctx_lock);
+	if (req->ki_opcode == IOCB_CMD_PREAD ||
+	    req->ki_opcode == IOCB_CMD_PREADV ||
+	    req->ki_opcode == IOCB_CMD_PWRITE ||
+	    req->ki_opcode == IOCB_CMD_PWRITEV)
+		aio_batch_add(file->f_mapping, batch_hash);
+
 	aio_put_req(req);	/* drop extra ref to req */
 	return 0;
 
@@ -1635,6 +1690,7 @@
 	struct kioctx *ctx;
 	long ret = 0;
 	int i;
+	struct hlist_head batch_hash[AIO_BATCH_HASH_SIZE] = { { 0, }, };
 
 	if (unlikely(nr < 0))
 		return -EINVAL;
@@ -1666,10 +1722,11 @@
 			break;
 		}
 
-		ret = io_submit_one(ctx, user_iocb, &tmp);
+		ret = io_submit_one(ctx, user_iocb, &tmp, batch_hash);
 		if (ret)
 			break;
 	}
+	aio_batch_free(batch_hash);
 
 	put_ioctx(ctx);
 	return i ? i : ret;