aio: allocate kiocbs in batches

In testing aio on a fast storage device, I found that the context lock
takes up a fair amount of cpu time in the I/O submission path.  The reason
is that we take it for every I/O submitted (see __aio_get_req).  Since we
know how many I/Os are passed to io_submit, we can preallocate the kiocbs
in batches, reducing the number of times we take and release the lock.

In my testing, I was able to reduce the amount of time spent in
_raw_spin_lock_irq by .56% (average of 3 runs).  The command I used to
test this was:

   aio-stress -O -o 2 -o 3 -r 8 -d 128 -b 32 -i 32 -s 16384 <dev>

I also tested the patch with various numbers of events passed to
io_submit, and I ran the xfstests aio group of tests to ensure I didn't
break anything.

Signed-off-by: Jeff Moyer <jmoyer@redhat.com>
Cc: Daniel Ehrenberg <dehrenberg@google.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
diff --git a/fs/aio.c b/fs/aio.c
index 632b235..78c514c 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -440,8 +440,6 @@
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req = NULL;
-	struct aio_ring *ring;
-	int okay = 0;
 
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
 	if (unlikely(!req))
@@ -459,39 +457,114 @@
 	INIT_LIST_HEAD(&req->ki_run_list);
 	req->ki_eventfd = NULL;
 
-	/* Check if the completion queue has enough free space to
-	 * accept an event from this io.
-	 */
-	spin_lock_irq(&ctx->ctx_lock);
-	ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
-	if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
-		list_add(&req->ki_list, &ctx->active_reqs);
-		ctx->reqs_active++;
-		okay = 1;
-	}
-	kunmap_atomic(ring, KM_USER0);
-	spin_unlock_irq(&ctx->ctx_lock);
-
-	if (!okay) {
-		kmem_cache_free(kiocb_cachep, req);
-		req = NULL;
-	}
-
 	return req;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+/*
+ * struct kiocb's are allocated in batches to reduce the number of
+ * times the ctx lock is acquired and released.
+ */
+#define KIOCB_BATCH_SIZE	32L
+struct kiocb_batch {
+	struct list_head head;
+	long count; /* number of requests left to allocate */
+};
+
+static void kiocb_batch_init(struct kiocb_batch *batch, long total)
+{
+	INIT_LIST_HEAD(&batch->head);
+	batch->count = total;
+}
+
+static void kiocb_batch_free(struct kiocb_batch *batch)
+{
+	struct kiocb *req, *n;
+
+	list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+		list_del(&req->ki_batch);
+		kmem_cache_free(kiocb_cachep, req);
+	}
+}
+
+/*
+ * Allocate a batch of kiocbs.  This avoids taking and dropping the
+ * context lock a lot during setup.
+ */
+static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+	unsigned short allocated, to_alloc;
+	long avail;
+	bool called_fput = false;
+	struct kiocb *req, *n;
+	struct aio_ring *ring;
+
+	to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
+	for (allocated = 0; allocated < to_alloc; allocated++) {
+		req = __aio_get_req(ctx);
+		if (!req)
+			/* allocation failed, go with what we've got */
+			break;
+		list_add(&req->ki_batch, &batch->head);
+	}
+
+	if (allocated == 0)
+		goto out;
+
+retry:
+	spin_lock_irq(&ctx->ctx_lock);
+	ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
+
+	avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+	BUG_ON(avail < 0);
+	if (avail == 0 && !called_fput) {
+		/*
+		 * Handle a potential starvation case.  It is possible that
+		 * we hold the last reference on a struct file, causing us
+		 * to delay the final fput to non-irq context.  In this case,
+		 * ctx->reqs_active is artificially high.  Calling the fput
+		 * routine here may free up a slot in the event completion
+		 * ring, allowing this allocation to succeed.
+		 */
+		kunmap_atomic(ring);
+		spin_unlock_irq(&ctx->ctx_lock);
+		aio_fput_routine(NULL);
+		called_fput = true;
+		goto retry;
+	}
+
+	if (avail < allocated) {
+		/* Trim back the number of requests. */
+		list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+			list_del(&req->ki_batch);
+			kmem_cache_free(kiocb_cachep, req);
+			if (--allocated <= avail)
+				break;
+		}
+	}
+
+	batch->count -= allocated;
+	list_for_each_entry(req, &batch->head, ki_batch) {
+		list_add(&req->ki_list, &ctx->active_reqs);
+		ctx->reqs_active++;
+	}
+
+	kunmap_atomic(ring);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+out:
+	return allocated;
+}
+
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+					struct kiocb_batch *batch)
 {
 	struct kiocb *req;
-	/* Handle a potential starvation case -- should be exceedingly rare as 
-	 * requests will be stuck on fput_head only if the aio_fput_routine is 
-	 * delayed and the requests were the last user of the struct file.
-	 */
-	req = __aio_get_req(ctx);
-	if (unlikely(NULL == req)) {
-		aio_fput_routine(NULL);
-		req = __aio_get_req(ctx);
-	}
+
+	if (list_empty(&batch->head))
+		if (kiocb_batch_refill(ctx, batch) == 0)
+			return NULL;
+	req = list_first_entry(&batch->head, struct kiocb, ki_batch);
+	list_del(&req->ki_batch);
 	return req;
 }
 
@@ -1515,7 +1588,8 @@
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-			 struct iocb *iocb, bool compat)
+			 struct iocb *iocb, struct kiocb_batch *batch,
+			 bool compat)
 {
 	struct kiocb *req;
 	struct file *file;
@@ -1541,7 +1615,7 @@
 	if (unlikely(!file))
 		return -EBADF;
 
-	req = aio_get_req(ctx);		/* returns with 2 references to req */
+	req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
 	if (unlikely(!req)) {
 		fput(file);
 		return -EAGAIN;
@@ -1621,8 +1695,9 @@
 {
 	struct kioctx *ctx;
 	long ret = 0;
-	int i;
+	int i = 0;
 	struct blk_plug plug;
+	struct kiocb_batch batch;
 
 	if (unlikely(nr < 0))
 		return -EINVAL;
@@ -1639,6 +1714,8 @@
 		return -EINVAL;
 	}
 
+	kiocb_batch_init(&batch, nr);
+
 	blk_start_plug(&plug);
 
 	/*
@@ -1659,12 +1736,13 @@
 			break;
 		}
 
-		ret = io_submit_one(ctx, user_iocb, &tmp, compat);
+		ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
 		if (ret)
 			break;
 	}
 	blk_finish_plug(&plug);
 
+	kiocb_batch_free(&batch);
 	put_ioctx(ctx);
 	return i ? i : ret;
 }