Simplify ConditionVariable and avoid potential race.

Make waiters, rather than Signal/Broadcast, maintain the mutex's contender
count (this eliminates the awoken count). This avoids a race where a spurious
Signal wake could remove a Broadcast contender from the mutex.

Change-Id: I5c3e36878c9fa2db09f5dc31d24a0a6222a61731
diff --git a/src/mutex.cc b/src/mutex.cc
index e1d4db6..c09a4a0 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -679,9 +679,8 @@
 ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
     : name_(name), guard_(guard) {
 #if ART_USE_FUTEXES
-  state_ = 0;
+  sequence_ = 0;
   num_waiters_ = 0;
-  num_awoken_ = 0;
 #else
   CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
 #endif
@@ -693,7 +692,8 @@
     MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
     Runtime* runtime = Runtime::Current();
     bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
-    LOG(shutting_down ? WARNING : FATAL) << "~ConditionVariable failed for " << name_;
+    LOG(shutting_down ? WARNING : FATAL) << "ConditionVariable::~ConditionVariable for " << name_
+        << " called with " << num_waiters_ << " waiters.";
   }
 #else
   // We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
@@ -715,28 +715,20 @@
   // guard_.AssertExclusiveHeld(self);
   DCHECK_EQ(guard_.GetExclusiveOwnerTid(), SafeGetTid(self));
 #if ART_USE_FUTEXES
-  // Compute number of waiters to be requeued and add to mutex contenders.
-  int32_t num_requeued = num_waiters_ - num_awoken_;
-  if (num_requeued != 0) {
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&sequence_); // Indicate the broadcast occurred.
     bool done = false;
-    android_atomic_inc(&state_); // Indicate the broadcast occurred.
     do {
-      int32_t cur_state = state_;
-      // Requeue waiters onto contenders.
-      done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+      int32_t cur_sequence = sequence_;
+      // Requeue waiters onto mutex. The waiter holds the contender count on the mutex high ensuring
+      // mutex unlocks will awaken the requeued waiter thread.
+      done = futex(&sequence_, FUTEX_CMP_REQUEUE, 0,
                    reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
-                   &guard_.state_, cur_state) != -1;
+                   &guard_.state_, cur_sequence) != -1;
       if (!done) {
         if (errno != EAGAIN) {
           PLOG(FATAL) << "futex cmp requeue failed for " << name_;
         }
-      } else {
-        // Successful requeue, add the requeued waiters to the contenders of the guard_ to ensure
-        // that unlocks of guard_ will wake the waiters. Reflect that we've requeued the waiters
-        // in the awoken count.
-        DCHECK_EQ(num_awoken_ + num_requeued, num_waiters_);
-        android_atomic_add(num_requeued, &guard_.num_contenders_);
-        num_awoken_ = num_waiters_;
       }
     } while (!done);
   }
@@ -749,17 +741,13 @@
   DCHECK(self == NULL || self == Thread::Current());
   guard_.AssertExclusiveHeld(self);
 #if ART_USE_FUTEXES
-  if (num_waiters_ > num_awoken_) {
-    android_atomic_inc(&state_); // Indicate a signal occurred.
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&sequence_); // Indicate a signal occurred.
     // Futex wake 1 waiter who will then come and in contend on mutex. It'd be nice to requeue them
     // to avoid this, however, requeueing can only move all waiters.
-    int num_woken = futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
-    // Check something was woken or else we changed state_ before they had chance to wait.
+    int num_woken = futex(&sequence_, FUTEX_WAKE, 1, NULL, NULL, 0);
+    // Check something was woken or else we changed sequence_ before they had chance to wait.
     CHECK((num_woken == 0) || (num_woken == 1));
-    // We weren't requeued, however, to make accounting simpler in the Wait code, increment the
-    // number of contenders on the mutex.
-    num_awoken_++;
-    android_atomic_inc(&guard_.num_contenders_);
   }
 #else
   CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
@@ -771,31 +759,26 @@
   guard_.AssertExclusiveHeld(self);
   unsigned int old_recursion_count = guard_.recursion_count_;
 #if ART_USE_FUTEXES
-  int32_t cur_state = state_;
   num_waiters_++;
+  // Ensure the Mutex is contended so that requeued threads are awoken.
+  android_atomic_inc(&guard_.num_contenders_);
   guard_.recursion_count_ = 1;
+  int32_t cur_sequence = sequence_;
   guard_.ExclusiveUnlock(self);
-  while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
-    if ((errno == EINTR) || (errno == EAGAIN)) {
-      if (state_ != cur_state) {
-        // We failed and a ConditionVariable::Signal has come in.
-        break;
-      }
-    } else {
+  if (futex(&sequence_, FUTEX_WAIT, cur_sequence, NULL, NULL, 0) != 0) {
+    // Futex failed, check it is an expected error.
+    // EAGAIN == EWOULDBLOCK, so we let the caller try again.
+    // EINTR implies a signal was sent to this thread.
+    if ((errno != EINTR) && (errno != EAGAIN)) {
       PLOG(FATAL) << "futex wait failed for " << name_;
     }
   }
   guard_.ExclusiveLock(self);
-  CHECK_NE(num_waiters_, 0);
+  CHECK_GE(num_waiters_, 0);
   num_waiters_--;
-  if (num_awoken_ > 0) {
-    // Note: there is a subtle race where a single signal may appear to wake multiple threads due
-    // to state_ changing. We detect this race using num_awoken_ and the mutual exclusion
-    // guaranteed by guard_.
-    CHECK_NE(guard_.num_contenders_, 0);
-    android_atomic_dec(&guard_.num_contenders_);
-    num_awoken_--;
-  }
+  // We awoke and so no longer require awakes from the guard_'s unlock.
+  CHECK_GE(guard_.num_contenders_, 0);
+  android_atomic_dec(&guard_.num_contenders_);
 #else
   guard_.recursion_count_ = 0;
   CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
@@ -808,48 +791,29 @@
   guard_.AssertExclusiveHeld(self);
   unsigned int old_recursion_count = guard_.recursion_count_;
 #if ART_USE_FUTEXES
-  // Record the original end time so that if the futex call fails we can recompute the appropriate
-  // relative time.
-  timespec end_abs_ts;
-  InitTimeSpec(true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
   timespec rel_ts;
   InitTimeSpec(false, CLOCK_REALTIME, ms, ns, &rel_ts);
-  // Read state so that we can know if a signal comes in during before we sleep.
-  int32_t cur_state = state_;
   num_waiters_++;
+  // Ensure the Mutex is contended so that requeued threads are awoken.
+  android_atomic_inc(&guard_.num_contenders_);
   guard_.recursion_count_ = 1;
+  int32_t cur_sequence = sequence_;
   guard_.ExclusiveUnlock(self);
-  while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+  if (futex(&sequence_, FUTEX_WAIT, cur_sequence, &rel_ts, NULL, 0) != 0) {
     if (errno == ETIMEDOUT) {
-      break; // Timed out we're done.
+      // Timed out we're done.
     } else if ((errno == EINTR) || (errno == EAGAIN)) {
-      if (state_ != cur_state) {
-        break; // We failed and a ConditionVariable::Signal has come in.
-      }
-      timespec now_abs_ts;
-      InitTimeSpec(true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
-      if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
-        // futex failed and we timed out in the meantime.
-        break;
-      }
+      // A signal or ConditionVariable::Signal/Broadcast has come in.
     } else {
       PLOG(FATAL) << "timed futex wait failed for " << name_;
     }
   }
   guard_.ExclusiveLock(self);
+  CHECK_GE(num_waiters_, 0);
   num_waiters_--;
-  if ((cur_state != state_) && (num_awoken_ > 0)) {
-    // TODO: We were woken and didn't timeout (ie state_ changed - ignoring overflow). This check
-    // is racy given the overflow, however, getting a good causal picture from errno is complex
-    // and racy. From practice it hasn't yet been found to work, so we live with the highly
-    // unlikely race.
-    // Note: there is a subtle race where a single signal may appear to wake multiple threads due
-    // to state_ changing. We detect this race using num_awoken_ and the mutual exclusion
-    // guaranteed by guard_.
-    CHECK_NE(guard_.num_contenders_, 0);
-    android_atomic_dec(&guard_.num_contenders_);
-    num_awoken_--;
-  }
+  // We awoke and so no longer require awakes from the guard_'s unlock.
+  CHECK_GE(guard_.num_contenders_, 0);
+  android_atomic_dec(&guard_.num_contenders_);
 #else
 #ifdef HAVE_TIMEDWAIT_MONOTONIC
 #define TIMEDWAIT pthread_cond_timedwait_monotonic