[PATCH] FUTEX_WAKE_OP: pthread_cond_signal() speedup

ATM pthread_cond_signal is unnecessarily slow, because it wakes one waiter
(which at least on UP usually means an immediate context switch to one of
the waiter threads).  This waiter wakes up and after a few instructions it
attempts to acquire the cv internal lock, but that lock is still held by
the thread calling pthread_cond_signal.  So it goes to sleep and eventually
the signalling thread is scheduled in, unlocks the internal lock and wakes
the waiter again.

Now, before 2003-09-21 NPTL was using FUTEX_REQUEUE in pthread_cond_signal
to avoid this performance issue, but it was removed when locks were
redesigned to the 3 state scheme (unlocked, locked uncontended, locked
contended).

Following scenario shows why simply using FUTEX_REQUEUE in
pthread_cond_signal together with using lll_mutex_unlock_force in place of
lll_mutex_unlock is not enough and probably why it has been disabled at
that time:

The number is value in cv->__data.__lock.
        thr1            thr2            thr3
0       pthread_cond_wait
1       lll_mutex_lock (cv->__data.__lock)
0       lll_mutex_unlock (cv->__data.__lock)
0       lll_futex_wait (&cv->__data.__futex, futexval)
0                       pthread_cond_signal
1                       lll_mutex_lock (cv->__data.__lock)
1                                       pthread_cond_signal
2                                       lll_mutex_lock (cv->__data.__lock)
2                                         lll_futex_wait (&cv->__data.__lock, 2)
2                       lll_futex_requeue (&cv->__data.__futex, 0, 1, &cv->__data.__lock)
                          # FUTEX_REQUEUE, not FUTEX_CMP_REQUEUE
2                       lll_mutex_unlock_force (cv->__data.__lock)
0                         cv->__data.__lock = 0
0                         lll_futex_wake (&cv->__data.__lock, 1)
1       lll_mutex_lock (cv->__data.__lock)
0       lll_mutex_unlock (cv->__data.__lock)
          # Here, lll_mutex_unlock doesn't know there are threads waiting
          # on the internal cv's lock

Now, I believe it is possible to use FUTEX_REQUEUE in pthread_cond_signal,
but it will cost us not one, but 2 extra syscalls and, what's worse, one of
these extra syscalls will be done for every single waiting loop in
pthread_cond_*wait.

We would need to use lll_mutex_unlock_force in pthread_cond_signal after
requeue and lll_mutex_cond_lock in pthread_cond_*wait after lll_futex_wait.

Another alternative is to do the unlocking pthread_cond_signal needs to do
(the lock can't be unlocked before lll_futex_wake, as that is racy) in the
kernel.

I have implemented both variants, futex-requeue-glibc.patch is the first
one and futex-wake_op{,-glibc}.patch is the unlocking inside of the kernel.
 The kernel interface allows userland to specify how exactly an unlocking
operation should look like (some atomic arithmetic operation with optional
constant argument and comparison of the previous futex value with another
constant).

It has been implemented just for ppc*, x86_64 and i?86, for other
architectures I'm including just a stub header which can be used as a
starting point by maintainers to write support for their arches and ATM
will just return -ENOSYS for FUTEX_WAKE_OP.  The requeue patch has been
(lightly) tested just on x86_64, the wake_op patch on ppc64 kernel running
32-bit and 64-bit NPTL and x86_64 kernel running 32-bit and 64-bit NPTL.

With the following benchmark on UP x86-64 I get:

for i in nptl-orig nptl-requeue nptl-wake_op; do echo time elf/ld.so --library-path .:$i /tmp/bench; \
for j in 1 2; do echo ( time elf/ld.so --library-path .:$i /tmp/bench ) 2>&1; done; done
time elf/ld.so --library-path .:nptl-orig /tmp/bench
real 0m0.655s user 0m0.253s sys 0m0.403s
real 0m0.657s user 0m0.269s sys 0m0.388s
time elf/ld.so --library-path .:nptl-requeue /tmp/bench
real 0m0.496s user 0m0.225s sys 0m0.271s
real 0m0.531s user 0m0.242s sys 0m0.288s
time elf/ld.so --library-path .:nptl-wake_op /tmp/bench
real 0m0.380s user 0m0.176s sys 0m0.204s
real 0m0.382s user 0m0.175s sys 0m0.207s

The benchmark is at:
http://sourceware.org/ml/libc-alpha/2005-03/txt00001.txt
Older futex-requeue-glibc.patch version is at:
http://sourceware.org/ml/libc-alpha/2005-03/txt00002.txt
Older futex-wake_op-glibc.patch version is at:
http://sourceware.org/ml/libc-alpha/2005-03/txt00003.txt
Will post a new version (just x86-64 fixes so that the patch
applies against pthread_cond_signal.S) to libc-hacker ml soon.

Attached is the kernel FUTEX_WAKE_OP patch as well as a simple-minded
testcase that will not test the atomicity of the operation, but at least
check if the threads that should have been woken up are woken up and
whether the arithmetic operation in the kernel gave the expected results.

Acked-by: Ingo Molnar <mingo@redhat.com>
Cc: Ulrich Drepper <drepper@redhat.com>
Cc: Jamie Lokier <jamie@shareable.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Signed-off-by: Yoichi Yuasa <yuasa@hh.iij4u.or.jp>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
diff --git a/kernel/futex.c b/kernel/futex.c
index c7130f8..07ba87d 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -40,6 +40,7 @@
 #include <linux/pagemap.h>
 #include <linux/syscalls.h>
 #include <linux/signal.h>
+#include <asm/futex.h>
 
 #define FUTEX_HASHBITS (CONFIG_BASE_SMALL ? 4 : 8)
 
@@ -327,6 +328,118 @@
 }
 
 /*
+ * Wake up all waiters hashed on the physical page that is mapped
+ * to this virtual address:
+ */
+static int futex_wake_op(unsigned long uaddr1, unsigned long uaddr2, int nr_wake, int nr_wake2, int op)
+{
+	union futex_key key1, key2;
+	struct futex_hash_bucket *bh1, *bh2;
+	struct list_head *head;
+	struct futex_q *this, *next;
+	int ret, op_ret, attempt = 0;
+
+retryfull:
+	down_read(&current->mm->mmap_sem);
+
+	ret = get_futex_key(uaddr1, &key1);
+	if (unlikely(ret != 0))
+		goto out;
+	ret = get_futex_key(uaddr2, &key2);
+	if (unlikely(ret != 0))
+		goto out;
+
+	bh1 = hash_futex(&key1);
+	bh2 = hash_futex(&key2);
+
+retry:
+	if (bh1 < bh2)
+		spin_lock(&bh1->lock);
+	spin_lock(&bh2->lock);
+	if (bh1 > bh2)
+		spin_lock(&bh1->lock);
+
+	op_ret = futex_atomic_op_inuser(op, (int __user *)uaddr2);
+	if (unlikely(op_ret < 0)) {
+		int dummy;
+
+		spin_unlock(&bh1->lock);
+		if (bh1 != bh2)
+			spin_unlock(&bh2->lock);
+
+		/* futex_atomic_op_inuser needs to both read and write
+		 * *(int __user *)uaddr2, but we can't modify it
+		 * non-atomically.  Therefore, if get_user below is not
+		 * enough, we need to handle the fault ourselves, while
+		 * still holding the mmap_sem.  */
+		if (attempt++) {
+			struct vm_area_struct * vma;
+			struct mm_struct *mm = current->mm;
+
+			ret = -EFAULT;
+			if (attempt >= 2 ||
+			    !(vma = find_vma(mm, uaddr2)) ||
+			    vma->vm_start > uaddr2 ||
+			    !(vma->vm_flags & VM_WRITE))
+				goto out;
+
+			switch (handle_mm_fault(mm, vma, uaddr2, 1)) {
+			case VM_FAULT_MINOR:
+				current->min_flt++;
+				break;
+			case VM_FAULT_MAJOR:
+				current->maj_flt++;
+				break;
+			default:
+				goto out;
+			}
+			goto retry;
+		}
+
+		/* If we would have faulted, release mmap_sem,
+		 * fault it in and start all over again.  */
+		up_read(&current->mm->mmap_sem);
+
+		ret = get_user(dummy, (int __user *)uaddr2);
+		if (ret)
+			return ret;
+
+		goto retryfull;
+	}
+
+	head = &bh1->chain;
+
+	list_for_each_entry_safe(this, next, head, list) {
+		if (match_futex (&this->key, &key1)) {
+			wake_futex(this);
+			if (++ret >= nr_wake)
+				break;
+		}
+	}
+
+	if (op_ret > 0) {
+		head = &bh2->chain;
+
+		op_ret = 0;
+		list_for_each_entry_safe(this, next, head, list) {
+			if (match_futex (&this->key, &key2)) {
+				wake_futex(this);
+				if (++op_ret >= nr_wake2)
+					break;
+			}
+		}
+		ret += op_ret;
+	}
+
+	spin_unlock(&bh1->lock);
+	if (bh1 != bh2)
+		spin_unlock(&bh2->lock);
+out:
+	up_read(&current->mm->mmap_sem);
+	return ret;
+}
+
+/*
  * Requeue all waiters hashed on one physical page to another
  * physical page.
  */
@@ -740,6 +853,9 @@
 	case FUTEX_CMP_REQUEUE:
 		ret = futex_requeue(uaddr, uaddr2, val, val2, &val3);
 		break;
+	case FUTEX_WAKE_OP:
+		ret = futex_wake_op(uaddr, uaddr2, val, val2, val3);
+		break;
 	default:
 		ret = -ENOSYS;
 	}