aio: convert the ioctx list to table lookup v3

On Wed, Jun 12, 2013 at 11:14:40AM -0700, Kent Overstreet wrote:
> On Mon, Apr 15, 2013 at 02:40:55PM +0300, Octavian Purdila wrote:
> > When using a large number of threads performing AIO operations the
> > IOCTX list may get a significant number of entries which will cause
> > significant overhead. For example, when running this fio script:
> >
> > rw=randrw; size=256k ;directory=/mnt/fio; ioengine=libaio; iodepth=1
> > blocksize=1024; numjobs=512; thread; loops=100
> >
> > on an EXT2 filesystem mounted on top of a ramdisk we can observe up to
> > 30% CPU time spent by lookup_ioctx:
> >
> >  32.51%  [guest.kernel]  [g] lookup_ioctx
> >   9.19%  [guest.kernel]  [g] __lock_acquire.isra.28
> >   4.40%  [guest.kernel]  [g] lock_release
> >   4.19%  [guest.kernel]  [g] sched_clock_local
> >   3.86%  [guest.kernel]  [g] local_clock
> >   3.68%  [guest.kernel]  [g] native_sched_clock
> >   3.08%  [guest.kernel]  [g] sched_clock_cpu
> >   2.64%  [guest.kernel]  [g] lock_release_holdtime.part.11
> >   2.60%  [guest.kernel]  [g] memcpy
> >   2.33%  [guest.kernel]  [g] lock_acquired
> >   2.25%  [guest.kernel]  [g] lock_acquire
> >   1.84%  [guest.kernel]  [g] do_io_submit
> >
> > This patch converts the ioctx list to a radix tree. For a performance
> > comparison the above FIO script was run on a 2-socket, 8-core
> > machine. These are the results (average and %rsd of 10 runs) for the
> > original list based implementation and for the radix tree based
> > implementation:
> >
> > cores         1         2         4         8         16        32
> > list       109376 ms  69119 ms  35682 ms  22671 ms  19724 ms  16408 ms
> > %rsd         0.69%      1.15%     1.17%     1.21%     1.71%     1.43%
> > radix       73651 ms  41748 ms  23028 ms  16766 ms  15232 ms   13787 ms
> > %rsd         1.19%      0.98%     0.69%     1.13%    0.72%      0.75%
> > % of radix
> > relative    66.12%     65.59%    66.63%    72.31%   77.26%     83.66%
> > to list
> >
> > To consider the impact of the patch on the typical case of having
> > only one ctx per process the following FIO script was run:
> >
> > rw=randrw; size=100m ;directory=/mnt/fio; ioengine=libaio; iodepth=1
> > blocksize=1024; numjobs=1; thread; loops=100
> >
> > on the same system and the results are the following:
> >
> > list        58892 ms
> > %rsd         0.91%
> > radix       59404 ms
> > %rsd         0.81%
> > % of radix
> > relative    100.87%
> > to list
>
> So, I was just doing some benchmarking/profiling to get ready to send
> out the aio patches I've got for 3.11 - and it looks like your patch is
> causing a ~1.5% throughput regression in my testing :/
... <snip>

I've got an alternate approach for fixing this wart in lookup_ioctx()...
Instead of using an rbtree, just use the reserved id in the ring buffer
header to index an array pointing to the ioctx.  It's not finished yet, and
it needs to be tidied up, but is most of the way there.

		-ben
--
"Thought is the essence of where you are now."
--
kmo> And, a rework of Ben's code, but this was entirely his idea
kmo>		-Kent

bcrl> And fix the code to use the right mm_struct in kill_ioctx(), and
bcrl> actually free memory.

Signed-off-by: Benjamin LaHaise <bcrl@kvack.org>
diff --git a/fs/aio.c b/fs/aio.c
index 945dd0d..52f200e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -66,6 +66,12 @@
 
 #define AIO_RING_PAGES	8
 
+struct kioctx_table {
+	struct rcu_head	rcu;	/* for kfree_rcu() when the table is replaced */
+	unsigned	nr;	/* number of slots in table[] */
+	struct kioctx	*table[];	/* slot i: ioctx with id i, or NULL */
+};
+
 struct kioctx_cpu {
 	unsigned		reqs_available;
 };
@@ -74,9 +80,7 @@
 	struct percpu_ref	users;
 	atomic_t		dead;
 
-	/* This needs improving */
 	unsigned long		user_id;
-	struct hlist_node	list;
 
 	struct __percpu kioctx_cpu *cpu;
 
@@ -135,6 +139,8 @@
 
 	struct page		*internal_pages[AIO_RING_PAGES];
 	struct file		*aio_ring_file;
+
+	unsigned		id;
 };
 
 /*------ sysctl variables----*/
@@ -326,7 +332,7 @@
 
 	ring = kmap_atomic(ctx->ring_pages[0]);
 	ring->nr = nr_events;	/* user copy */
-	ring->id = ctx->user_id;
+	ring->id = ~0U;
 	ring->head = ring->tail = 0;
 	ring->magic = AIO_RING_MAGIC;
 	ring->compat_features = AIO_RING_COMPAT_FEATURES;
@@ -462,6 +468,73 @@
 	schedule_work(&ctx->free_work);
 }
 
+/* ioctx_add_table
+ *	Installs ctx in the first free slot of mm->ioctx_table, creating or
+ *	growing the table when no slot is free.  The slot index is stored in
+ *	ctx->id and written to the id field of the ring header page.
+ *	Returns 0 on success or -ENOMEM if a new table cannot be allocated.
+ */
+static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
+{
+	unsigned i, new_nr;
+	struct kioctx_table *table, *old;
+	struct aio_ring *ring;
+
+	spin_lock(&mm->ioctx_lock);
+	/* Updaters hold ioctx_lock rather than rcu_read_lock(). */
+	table = rcu_dereference_protected(mm->ioctx_table,
+					  lockdep_is_held(&mm->ioctx_lock));
+
+	while (1) {
+		if (table)
+			for (i = 0; i < table->nr; i++)
+				if (!table->table[i]) {
+					ctx->id = i;
+					table->table[i] = ctx;
+					spin_unlock(&mm->ioctx_lock);
+
+					ring = kmap_atomic(ctx->ring_pages[0]);
+					ring->id = ctx->id;
+					kunmap_atomic(ring);
+					return 0;
+				}
+
+		/* No table yet (start with 4 slots) or table full (grow 4x). */
+		new_nr = (table ? table->nr : 1) * 4;
+
+		/* GFP_KERNEL may sleep, so allocate with the lock dropped. */
+		spin_unlock(&mm->ioctx_lock);
+
+		table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) *
+				new_nr, GFP_KERNEL);
+		if (!table)
+			return -ENOMEM;
+
+		table->nr = new_nr;
+
+		spin_lock(&mm->ioctx_lock);
+		old = rcu_dereference_protected(mm->ioctx_table,
+					lockdep_is_held(&mm->ioctx_lock));
+
+		if (!old) {
+			rcu_assign_pointer(mm->ioctx_table, table);
+		} else if (table->nr > old->nr) {
+			memcpy(table->table, old->table,
+			       old->nr * sizeof(struct kioctx *));
+
+			rcu_assign_pointer(mm->ioctx_table, table);
+			kfree_rcu(old, rcu);
+		} else {
+			/*
+			 * The table was replaced by one at least as large
+			 * while the lock was dropped; use that one instead.
+			 */
+			kfree(table);
+			table = old;
+		}
+	}
+}
+
 /* ioctx_alloc
  *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  */
@@ -520,6 +578,10 @@
 	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
 	BUG_ON(!ctx->req_batch);
 
+	err = ioctx_add_table(ctx, mm);
+	if (err)
+		goto out_cleanup_noerr;
+
 	/* limit the number of system wide aios */
 	spin_lock(&aio_nr_lock);
 	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
@@ -532,17 +594,13 @@
 
 	percpu_ref_get(&ctx->users); /* io_setup() will drop this ref */
 
-	/* now link into global list. */
-	spin_lock(&mm->ioctx_lock);
-	hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
-	spin_unlock(&mm->ioctx_lock);
-
 	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 		 ctx, ctx->user_id, mm, ctx->nr_events);
 	return ctx;
 
 out_cleanup:
 	err = -EAGAIN;
+out_cleanup_noerr:
 	aio_free_ring(ctx);
 out_freepcpu:
 	free_percpu(ctx->cpu);
@@ -561,10 +619,18 @@
  *	when the processes owning a context have all exited to encourage
  *	the rapid destruction of the kioctx.
  */
-static void kill_ioctx(struct kioctx *ctx)
+static void kill_ioctx(struct mm_struct *mm, struct kioctx *ctx)
 {
 	if (!atomic_xchg(&ctx->dead, 1)) {
-		hlist_del_rcu(&ctx->list);
+		struct kioctx_table *table;
+
+		spin_lock(&mm->ioctx_lock);
+		table = rcu_dereference(mm->ioctx_table);
+
+		WARN_ON(ctx != table->table[ctx->id]);
+		table->table[ctx->id] = NULL;
+		spin_unlock(&mm->ioctx_lock);
+
 		/* percpu_ref_kill() will do the necessary call_rcu() */
 		wake_up_all(&ctx->wait);
 
@@ -613,10 +679,28 @@
  */
 void exit_aio(struct mm_struct *mm)
 {
+	struct kioctx_table *table;
 	struct kioctx *ctx;
-	struct hlist_node *n;
+	unsigned i = 0;
 
-	hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
+	while (1) {
+		rcu_read_lock();
+		table = rcu_dereference(mm->ioctx_table);
+
+		do {
+			if (!table || i >= table->nr) {
+				rcu_read_unlock();
+				rcu_assign_pointer(mm->ioctx_table, NULL);
+				if (table)
+					kfree(table);
+				return;
+			}
+
+			ctx = table->table[i++];
+		} while (!ctx);
+
+		rcu_read_unlock();
+
 		/*
 		 * We don't need to bother with munmap() here -
 		 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -627,7 +711,7 @@
 		 */
 		ctx->mmap_size = 0;
 
-		kill_ioctx(ctx);
+		kill_ioctx(mm, ctx);
 	}
 }
 
@@ -710,19 +794,37 @@
 
+/* lookup_ioctx
+ *	Looks up the ioctx of the current mm that matches ctx_id.  ctx_id is
+ *	used as the user address of the ring header; the id read from it
+ *	indexes mm->ioctx_table.  On a match a reference is taken on
+ *	ctx->users and the ctx is returned, otherwise NULL is returned.
+ */
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 {
+	struct aio_ring __user *ring = (void __user *)ctx_id;
 	struct mm_struct *mm = current->mm;
 	struct kioctx *ctx, *ret = NULL;
+	struct kioctx_table *table;
+	unsigned id;
+
+	if (get_user(id, &ring->id))
+		return NULL;
 
 	rcu_read_lock();
+	table = rcu_dereference(mm->ioctx_table);
 
-	hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
-		if (ctx->user_id == ctx_id) {
-			percpu_ref_get(&ctx->users);
-			ret = ctx;
-			break;
-		}
+	if (!table || id >= table->nr)
+		goto out;
+
+	/*
+	 * The id was read from userspace and slots are cleared by
+	 * kill_ioctx(), so the entry may be NULL.
+	 */
+	ctx = table->table[id];
+	if (ctx && ctx->user_id == ctx_id) {
+		percpu_ref_get(&ctx->users);
+		ret = ctx;
 	}
-
+out:
 	rcu_read_unlock();
 	return ret;
 }
@@ -998,7 +1090,7 @@
 	if (!IS_ERR(ioctx)) {
 		ret = put_user(ioctx->user_id, ctxp);
 		if (ret)
-			kill_ioctx(ioctx);
+			kill_ioctx(current->mm, ioctx);
 		percpu_ref_put(&ioctx->users);
 	}
 
@@ -1016,7 +1108,7 @@
 {
 	struct kioctx *ioctx = lookup_ioctx(ctx);
 	if (likely(NULL != ioctx)) {
-		kill_ioctx(ioctx);
+		kill_ioctx(current->mm, ioctx);
 		percpu_ref_put(&ioctx->users);
 		return 0;
 	}