Implement mutex requeueing for cv broadcasts.

Make the mutex guarding a condition variable part of the condition
variable's state. On a broadcast, requeue waiters onto the mutex so
that they are woken one at a time as the mutex is unlocked, avoiding
thundering herds. Explicit futex use is still guarded behind
ART_USE_FUTEXES, which remains disabled as I'm unhappy with some warts
of the current mutex usage. Uploading so that the API changes can
stabilize.

Change-Id: Iedb601856ccd8bbc3a64da4ba0cee82246e7bcbf
diff --git a/src/mutex.cc b/src/mutex.cc
index 96e04ef..18dc863 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -17,11 +17,13 @@
 #include "mutex.h"
 
 #include <errno.h>
+#include <sys/time.h>
+
+#include <limits>
 
 #include "cutils/atomic.h"
 #include "cutils/atomic-inline.h"
 #include "logging.h"
 #include "runtime.h"
+#include "scoped_thread_state_change.h"
 #include "thread.h"
 #include "utils.h"
 
@@ -41,8 +43,8 @@
 #ifndef SYS_futex
 #define SYS_futex __NR_futex
 #endif
-int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, int *, int ) {
-  return syscall(SYS_futex, uaddr, op, val, timeout, NULL, NULL);
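+// Pass uaddr2 and val3 through to the kernel so that requeue operations such as FUTEX_CMP_REQUEUE work.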
+int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, volatile int *uaddr2, int val3) {
+  return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
 }
 #endif  // ART_USE_FUTEXES
 
@@ -96,6 +98,51 @@
   }
 }
 
+// Initialize a timespec to either an absolute or relative time.
+static void InitTimeSpec(Thread* self, bool absolute, int clock, int64_t ms, int32_t ns,
+                         timespec* ts) {
+  int64_t end_sec;
+
+  if (absolute) {
+    clock_gettime(clock, ts);
+  } else {
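+    // Relative time: start from zero so that adding ms/ns below yields a plain duration.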
+    ts->tv_sec = 0;
+    ts->tv_nsec = 0;
+  }
+  end_sec = ts->tv_sec + ms / 1000;
+  if (UNLIKELY(end_sec >= 0x7fffffff)) {
+    std::ostringstream ss;
+    ScopedObjectAccess soa(self);
+    self->Dump(ss);
+    LOG(INFO) << "Note: end time exceeds 32-bit epoch: " << ss.str();
+    end_sec = 0x7ffffffe;
+  }
+  ts->tv_sec = end_sec;
+  ts->tv_nsec = (ts->tv_nsec + (ms % 1000) * 1000000) + ns;
+
+  // Catch rollover.
+  if (ts->tv_nsec >= 1000000000L) {
+    ts->tv_sec++;
+    ts->tv_nsec -= 1000000000L;
+  }
+}
+
+#if ART_USE_FUTEXES
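+// Compute result_ts = lhs - rhs, normalizing tv_nsec into [0, one second). Returns true if the
+// result is negative, i.e. the deadline lhs has already passed the current time rhs.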
+static bool ComputeRelativeTimeSpec(timespec* result_ts, const timespec& lhs, const timespec& rhs) {
+  const long int one_sec = 1000 * 1000 * 1000;  // one second in nanoseconds.
+  result_ts->tv_sec = lhs.tv_sec - rhs.tv_sec;
+  result_ts->tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+  if (result_ts->tv_nsec < 0) {
+    result_ts->tv_sec--;
+    result_ts->tv_nsec += one_sec;
+  } else if (result_ts->tv_nsec > one_sec) {
+    result_ts->tv_sec++;
+    result_ts->tv_nsec -= one_sec;
+  }
+  return result_ts->tv_sec < 0;
+}
+#endif
+
 BaseMutex::BaseMutex(const char* name, LockLevel level) : level_(level), name_(name) {}
 
 static void CheckUnattachedThread(LockLevel level) NO_THREAD_SAFETY_ANALYSIS {
@@ -176,7 +223,11 @@
 
 Mutex::Mutex(const char* name, LockLevel level, bool recursive)
     : BaseMutex(name, level), recursive_(recursive), recursion_count_(0) {
-#if defined(__BIONIC__) || defined(__APPLE__)
+#if ART_USE_FUTEXES
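+  // state_ is 0 (unheld) or 1 (held); contenders sleep on &state_ via futex and are counted
+  // in num_contenders_ so that unlock can skip the wake when no one is waiting.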
+  state_ = 0;
+  exclusive_owner_ = 0;
+  num_contenders_ = 0;
+#elif defined(__BIONIC__) || defined(__APPLE__)
   // Use recursive mutexes for bionic and Apple otherwise the
   // non-recursive mutexes don't have TIDs to check lock ownership of.
   pthread_mutexattr_t attributes;
@@ -190,6 +241,17 @@
 }
 
 Mutex::~Mutex() {
+#if ART_USE_FUTEXES
+  if (state_ != 0) {
+    MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
+    Runtime* runtime = Runtime::Current();
+    bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
+    LOG(shutting_down ? WARNING : FATAL) << "destroying mutex with owner: " << exclusive_owner_;
+  } else {
+    CHECK_EQ(exclusive_owner_, 0U) << "unexpectedly found an owner on unlocked mutex " << name_;
+    CHECK_EQ(num_contenders_, 0) << "unexpectedly found a contender on mutex " << name_;
+  }
+#else
   // We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
   // may still be using locks.
   int rc = pthread_mutex_destroy(&mutex_);
@@ -201,6 +263,7 @@
     bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
     PLOG(shutting_down ? WARNING : FATAL) << "pthread_mutex_destroy failed for " << name_;
   }
+#endif
 }
 
 void Mutex::ExclusiveLock(Thread* self) {
@@ -209,7 +272,29 @@
     AssertNotHeld(self);
   }
   if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      if (cur_state == 0) {
+        // Change state from 0 to 1.
+        done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+      } else {
+        // Failed to acquire, hang up.
+        android_atomic_inc(&num_contenders_);
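+        // Sleep until woken; EAGAIN means state_ was no longer 1 when the kernel checked, so just retry.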
+        if (futex(&state_, FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
+          if (errno != EAGAIN) {
+            PLOG(FATAL) << "futex wait failed for " << name_;
+          }
+        }
+        android_atomic_dec(&num_contenders_);
+      }
+    } while (!done);
+    DCHECK_EQ(state_, 1);
+    exclusive_owner_ = SafeGetTid(self);
+#else
     CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
+#endif
     RegisterAsLocked(self);
   }
   recursion_count_++;
@@ -226,6 +311,20 @@
     AssertNotHeld(self);
   }
   if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      if (cur_state == 0) {
+        // Change state from 0 to 1.
+        done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+      } else {
+        return false;
+      }
+    } while (!done);
+    DCHECK_EQ(state_, 1);
+    exclusive_owner_ = SafeGetTid(self);
+#else
     int result = pthread_mutex_trylock(&mutex_);
     if (result == EBUSY) {
       return false;
@@ -234,6 +333,7 @@
       errno = result;
       PLOG(FATAL) << "pthread_mutex_trylock failed for " << name_;
     }
+#endif
     RegisterAsLocked(self);
   }
   recursion_count_++;
@@ -255,7 +355,28 @@
           << name_ << " " << recursion_count_;
     }
     RegisterAsUnlocked(self);
+#if ART_USE_FUTEXES
+  bool done = false;
+  do {
+    int32_t cur_state = state_;
+    if (cur_state == 1) {
+      // We're no longer the owner.
+      exclusive_owner_ = 0;
+      // Change state to 0.
+      done = android_atomic_cmpxchg(cur_state, 0, &state_) == 0;
+      if (done) {  // The CAS can fail spuriously, in which case we loop and retry.
+        // Wake a contender
+        if (num_contenders_ > 0) {
+          futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
+        }
+      }
+    } else {
+      LOG(FATAL) << "Unexpected state_: " << cur_state << " for " << name_;
+    }
+  } while (!done);
+#else
     CHECK_MUTEX_CALL(pthread_mutex_unlock, (&mutex_));
+#endif
   }
 }
 
@@ -272,7 +393,9 @@
 }
 
 uint64_t Mutex::GetExclusiveOwnerTid() const {
-#if defined(__BIONIC__)
+#if ART_USE_FUTEXES
+  return exclusive_owner_;
+#elif defined(__BIONIC__)
   return static_cast<uint64_t>((mutex_.value >> 16) & 0xffff);
 #elif defined(__GLIBC__)
   return reinterpret_cast<const glibc_pthread_mutex_t*>(&mutex_)->owner;
@@ -396,10 +519,12 @@
 }
 
 #if HAVE_TIMED_RWLOCK
-bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout) {
+bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns) {
   DCHECK(self == NULL || self == Thread::Current());
 #if ART_USE_FUTEXES
   bool done = false;
+  timespec end_abs_ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
   do {
     int32_t cur_state = state_;
     if (cur_state == 0) {
@@ -407,12 +532,18 @@
       done = android_atomic_cmpxchg(0, -1, &state_) == 0;
     } else {
       // Failed to acquire, hang up.
+      timespec now_abs_ts;
+      InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+      timespec rel_ts;
+      if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+        return false;  // Timed out.
+      }
       android_atomic_inc(&num_pending_writers_);
-      if (futex(&state_, FUTEX_WAIT, cur_state, &abs_timeout, NULL, 0) != 0) {
+      if (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
         if (errno == ETIMEDOUT) {
           android_atomic_dec(&num_pending_writers_);
-          return false;
-        } else if (errno != EAGAIN) {
+          return false;  // Timed out.
+        } else if (errno != EAGAIN && errno != EINTR) {
           PLOG(FATAL) << "timed futex wait failed for " << name_;
         }
       }
@@ -421,7 +552,9 @@
   } while(!done);
   exclusive_owner_ = SafeGetTid(self);
 #else
-  int result = pthread_rwlock_timedwrlock(&rwlock_, &abs_timeout);
+  timespec ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &ts);
+  int result = pthread_rwlock_timedwrlock(&rwlock_, &ts);
   if (result == ETIMEDOUT) {
     return false;
   }
@@ -576,11 +709,19 @@
   return os << mu.Dump();
 }
 
-ConditionVariable::ConditionVariable(const std::string& name) : name_(name) {
+ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
+    : name_(name), guard_(guard) {
+#if ART_USE_FUTEXES
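+  // state_ is a sequence counter, incremented on each Signal/Broadcast so that a waiter can
+  // detect a wake that raced with its FUTEX_WAIT. num_waiters_ counts threads inside
+  // Wait/TimedWait; num_awoken_ counts waiters that have been woken or requeued but have not
+  // yet finished waiting.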
+  state_ = 0;
+  num_waiters_ = 0;
+  num_awoken_ = 0;
+#else
   CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
+#endif
 }
 
 ConditionVariable::~ConditionVariable() {
+#if !ART_USE_FUTEXES
   // We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
   // may still be using condition variables.
   int rc = pthread_cond_destroy(&cond_);
@@ -591,39 +732,162 @@
     bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
     PLOG(shutting_down ? WARNING : FATAL) << "pthread_cond_destroy failed for " << name_;
   }
+#endif
 }
 
-void ConditionVariable::Broadcast() {
+void ConditionVariable::Broadcast(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  // TODO: enable the assert below; a race in thread creation currently causes false failures.
+  // guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&state_);  // Indicate to incoming waiters that a wake has occurred.
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      // Compute number of waiters requeued and add to mutex contenders.
+      int32_t num_requeued = num_waiters_ - num_awoken_;
+      android_atomic_add(num_requeued, &guard_.num_contenders_);
+      // Requeue waiters onto contenders.
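+      // For FUTEX_CMP_REQUEUE the timeout argument is reinterpreted by the kernel as val2, the
+      // maximum number of waiters to requeue; the INT32_MAX "pointer" below therefore means
+      // "requeue them all" and is never dereferenced.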
+      done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+                   reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
+                   &guard_.state_, cur_state) != -1;
+      if (!done) {
+        if (errno != EAGAIN) {
+          PLOG(FATAL) << "futex cmp requeue failed for " << name_;
+        }
+      } else {
+        num_awoken_ = num_waiters_;
+      }
+    } while (!done);
+  }
+#else
   CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
+#endif
 }
 
-void ConditionVariable::Signal() {
+void ConditionVariable::Signal(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&state_);  // Indicate to incoming waiters that a wake has occurred.
+    // Futex-wake one waiter, who will then come in and contend for the mutex. It would be nicer
+    // to requeue them and avoid the contention, but requeueing can only move all waiters.
+    if (futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0) == 1) {
+      // Wake success.
+      // We weren't requeued; however, to keep the accounting in the Wait code simple, increment
+      // the number of contenders on the mutex.
+      num_awoken_++;
+      android_atomic_inc(&guard_.num_contenders_);
+    }
+  }
+#else
   CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
+#endif
 }
 
-void ConditionVariable::Wait(Thread* self, Mutex& mutex) {
-  mutex.CheckSafeToWait(self);
-  unsigned int old_recursion_count = mutex.recursion_count_;
-  mutex.recursion_count_ = 0;
-  CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &mutex.mutex_));
-  mutex.recursion_count_ = old_recursion_count;
+void ConditionVariable::Wait(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+  guard_.CheckSafeToWait(self);
+  unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+  int32_t cur_state = state_;
+  num_waiters_++;
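+  // Force the recursion count to 1 so that the single ExclusiveUnlock below fully releases the guard.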
+  guard_.recursion_count_ = 1;
+  guard_.ExclusiveUnlock(self);
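+  // Did the futex wait end because we were woken (and hence requeued onto the guard's futex)?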
+  bool woken = true;
+  while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
+    if (errno == EINTR || errno == EAGAIN) {
+      if (state_ != cur_state) {
+        // The futex wait failed because a signal or broadcast came in and changed state_.
+        woken = false;
+        break;
+      }
+    } else {
+      PLOG(FATAL) << "futex wait failed for " << name_;
+    }
+  }
+  guard_.ExclusiveLock(self);
+  num_waiters_--;
+  if (woken) {
+    // If we were woken, we were requeued onto the mutex; decrement the mutex's contender count.
+    android_atomic_dec(&guard_.num_contenders_);
+    num_awoken_--;
+  }
+#else
+  guard_.recursion_count_ = 0;
+  CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
+#endif
+  guard_.recursion_count_ = old_recursion_count;
 }
 
-void ConditionVariable::TimedWait(Thread* self, Mutex& mutex, const timespec& ts) {
+void ConditionVariable::TimedWait(Thread* self, int64_t ms, int32_t ns) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+  guard_.CheckSafeToWait(self);
+  unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+  // Record the original end time so that if the futex call fails we can recompute the appropriate
+  // relative time.
+  timespec end_abs_ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
+  timespec rel_ts;
+  InitTimeSpec(self, false, CLOCK_REALTIME, ms, ns, &rel_ts);
+  // Read the state so that we can tell if a signal comes in before we sleep.
+  int32_t cur_state = state_;
+  num_waiters_++;
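+  // Force the recursion count to 1 so that the single ExclusiveUnlock below fully releases the guard.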
+  guard_.recursion_count_ = 1;
+  guard_.ExclusiveUnlock(self);
+  bool woken = true;  // Did the futex wait end because we were awoken?
+  while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+    if (errno == ETIMEDOUT) {
+      woken = false;
+      break;
+    }
+    if ((errno == EINTR) || (errno == EAGAIN)) {
+      if (state_ != cur_state) {
+        // The futex wait failed because a signal or broadcast came in and changed state_.
+        woken = false;
+        break;
+      }
+      timespec now_abs_ts;
+      InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+      if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+        // futex failed and we timed out in the meantime.
+        woken = false;
+        break;
+      }
+    } else {
+      PLOG(FATAL) << "timed futex wait failed for " << name_;
+    }
+  }
+  guard_.ExclusiveLock(self);
+  num_waiters_--;
+  if (woken) {
+    // If we were woken, we were requeued onto the mutex; decrement the mutex's contender count.
+    android_atomic_dec(&guard_.num_contenders_);
+    num_awoken_--;
+  }
+#else
 #ifdef HAVE_TIMEDWAIT_MONOTONIC
 #define TIMEDWAIT pthread_cond_timedwait_monotonic
+  int clock = CLOCK_MONOTONIC;
 #else
 #define TIMEDWAIT pthread_cond_timedwait
+  int clock = CLOCK_REALTIME;
 #endif
-  mutex.CheckSafeToWait(self);
-  unsigned int old_recursion_count = mutex.recursion_count_;
-  mutex.recursion_count_ = 0;
-  int rc = TIMEDWAIT(&cond_, &mutex.mutex_, &ts);
-  mutex.recursion_count_ = old_recursion_count;
+  guard_.recursion_count_ = 0;
+  timespec ts;
+  InitTimeSpec(self, true, clock, ms, ns, &ts);
+  int rc = TIMEDWAIT(&cond_, &guard_.mutex_, &ts);
   if (rc != 0 && rc != ETIMEDOUT) {
     errno = rc;
     PLOG(FATAL) << "TimedWait failed for " << name_;
   }
+#endif
+  guard_.recursion_count_ = old_recursion_count;
 }
 
 }  // namespace art