Simplify ConditionVariable and avoid potential race.
Make waiting threads, rather than Signal/Broadcast, maintain the guard
mutex's contender count (this eliminates the awoken count). This avoids a
race where a spurious Signal wake could remove a Broadcast contender from
the mutex.
Change-Id: I5c3e36878c9fa2db09f5dc31d24a0a6222a61731
diff --git a/src/mutex.cc b/src/mutex.cc
index e1d4db6..c09a4a0 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -679,9 +679,8 @@
ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
: name_(name), guard_(guard) {
#if ART_USE_FUTEXES
- state_ = 0;
+ sequence_ = 0;
num_waiters_ = 0;
- num_awoken_ = 0;
#else
CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
#endif
@@ -693,7 +692,8 @@
MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
Runtime* runtime = Runtime::Current();
bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
- LOG(shutting_down ? WARNING : FATAL) << "~ConditionVariable failed for " << name_;
+ LOG(shutting_down ? WARNING : FATAL) << "ConditionVariable::~ConditionVariable for " << name_
+ << " called with " << num_waiters_ << " waiters.";
}
#else
// We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
@@ -715,28 +715,20 @@
// guard_.AssertExclusiveHeld(self);
DCHECK_EQ(guard_.GetExclusiveOwnerTid(), SafeGetTid(self));
#if ART_USE_FUTEXES
- // Compute number of waiters to be requeued and add to mutex contenders.
- int32_t num_requeued = num_waiters_ - num_awoken_;
- if (num_requeued != 0) {
+ if (num_waiters_ > 0) {
+ android_atomic_inc(&sequence_); // Indicate the broadcast occurred.
bool done = false;
- android_atomic_inc(&state_); // Indicate the broadcast occurred.
do {
- int32_t cur_state = state_;
- // Requeue waiters onto contenders.
- done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+ int32_t cur_sequence = sequence_;
+ // Requeue waiters onto mutex. The waiter holds the contender count on the mutex high ensuring
+ // mutex unlocks will awaken the requeued waiter thread.
+ done = futex(&sequence_, FUTEX_CMP_REQUEUE, 0,
reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
- &guard_.state_, cur_state) != -1;
+ &guard_.state_, cur_sequence) != -1;
if (!done) {
if (errno != EAGAIN) {
PLOG(FATAL) << "futex cmp requeue failed for " << name_;
}
- } else {
- // Successful requeue, add the requeued waiters to the contenders of the guard_ to ensure
- // that unlocks of guard_ will wake the waiters. Reflect that we've requeued the waiters
- // in the awoken count.
- DCHECK_EQ(num_awoken_ + num_requeued, num_waiters_);
- android_atomic_add(num_requeued, &guard_.num_contenders_);
- num_awoken_ = num_waiters_;
}
} while (!done);
}
@@ -749,17 +741,13 @@
DCHECK(self == NULL || self == Thread::Current());
guard_.AssertExclusiveHeld(self);
#if ART_USE_FUTEXES
- if (num_waiters_ > num_awoken_) {
- android_atomic_inc(&state_); // Indicate a signal occurred.
+ if (num_waiters_ > 0) {
+ android_atomic_inc(&sequence_); // Indicate a signal occurred.
// Futex wake 1 waiter who will then come and in contend on mutex. It'd be nice to requeue them
// to avoid this, however, requeueing can only move all waiters.
- int num_woken = futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
- // Check something was woken or else we changed state_ before they had chance to wait.
+ int num_woken = futex(&sequence_, FUTEX_WAKE, 1, NULL, NULL, 0);
+ // Check something was woken or else we changed sequence_ before they had chance to wait.
CHECK((num_woken == 0) || (num_woken == 1));
- // We weren't requeued, however, to make accounting simpler in the Wait code, increment the
- // number of contenders on the mutex.
- num_awoken_++;
- android_atomic_inc(&guard_.num_contenders_);
}
#else
CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
@@ -771,31 +759,26 @@
guard_.AssertExclusiveHeld(self);
unsigned int old_recursion_count = guard_.recursion_count_;
#if ART_USE_FUTEXES
- int32_t cur_state = state_;
num_waiters_++;
+ // Ensure the Mutex is contended so that requeued threads are awoken.
+ android_atomic_inc(&guard_.num_contenders_);
guard_.recursion_count_ = 1;
+ int32_t cur_sequence = sequence_;
guard_.ExclusiveUnlock(self);
- while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
- if ((errno == EINTR) || (errno == EAGAIN)) {
- if (state_ != cur_state) {
- // We failed and a ConditionVariable::Signal has come in.
- break;
- }
- } else {
+ if (futex(&sequence_, FUTEX_WAIT, cur_sequence, NULL, NULL, 0) != 0) {
+ // Futex failed, check it is an expected error.
+ // EAGAIN == EWOULDBLOCK, so we let the caller try again.
+ // EINTR implies a signal was sent to this thread.
+ if ((errno != EINTR) && (errno != EAGAIN)) {
PLOG(FATAL) << "futex wait failed for " << name_;
}
}
guard_.ExclusiveLock(self);
- CHECK_NE(num_waiters_, 0);
+ CHECK_GE(num_waiters_, 0);
num_waiters_--;
- if (num_awoken_ > 0) {
- // Note: there is a subtle race where a single signal may appear to wake multiple threads due
- // to state_ changing. We detect this race using num_awoken_ and the mutual exclusion
- // guaranteed by guard_.
- CHECK_NE(guard_.num_contenders_, 0);
- android_atomic_dec(&guard_.num_contenders_);
- num_awoken_--;
- }
+ // We awoke and so no longer require awakes from the guard_'s unlock.
+ CHECK_GE(guard_.num_contenders_, 0);
+ android_atomic_dec(&guard_.num_contenders_);
#else
guard_.recursion_count_ = 0;
CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
@@ -808,48 +791,29 @@
guard_.AssertExclusiveHeld(self);
unsigned int old_recursion_count = guard_.recursion_count_;
#if ART_USE_FUTEXES
- // Record the original end time so that if the futex call fails we can recompute the appropriate
- // relative time.
- timespec end_abs_ts;
- InitTimeSpec(true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
timespec rel_ts;
InitTimeSpec(false, CLOCK_REALTIME, ms, ns, &rel_ts);
- // Read state so that we can know if a signal comes in during before we sleep.
- int32_t cur_state = state_;
num_waiters_++;
+ // Ensure the Mutex is contended so that requeued threads are awoken.
+ android_atomic_inc(&guard_.num_contenders_);
guard_.recursion_count_ = 1;
+ int32_t cur_sequence = sequence_;
guard_.ExclusiveUnlock(self);
- while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+ if (futex(&sequence_, FUTEX_WAIT, cur_sequence, &rel_ts, NULL, 0) != 0) {
if (errno == ETIMEDOUT) {
- break; // Timed out we're done.
+ // Timed out we're done.
} else if ((errno == EINTR) || (errno == EAGAIN)) {
- if (state_ != cur_state) {
- break; // We failed and a ConditionVariable::Signal has come in.
- }
- timespec now_abs_ts;
- InitTimeSpec(true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
- if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
- // futex failed and we timed out in the meantime.
- break;
- }
+ // A signal or ConditionVariable::Signal/Broadcast has come in.
} else {
PLOG(FATAL) << "timed futex wait failed for " << name_;
}
}
guard_.ExclusiveLock(self);
+ CHECK_GE(num_waiters_, 0);
num_waiters_--;
- if ((cur_state != state_) && (num_awoken_ > 0)) {
- // TODO: We were woken and didn't timeout (ie state_ changed - ignoring overflow). This check
- // is racy given the overflow, however, getting a good causal picture from errno is complex
- // and racy. From practice it hasn't yet been found to work, so we live with the highly
- // unlikely race.
- // Note: there is a subtle race where a single signal may appear to wake multiple threads due
- // to state_ changing. We detect this race using num_awoken_ and the mutual exclusion
- // guaranteed by guard_.
- CHECK_NE(guard_.num_contenders_, 0);
- android_atomic_dec(&guard_.num_contenders_);
- num_awoken_--;
- }
+ // We awoke and so no longer require awakes from the guard_'s unlock.
+ CHECK_GE(guard_.num_contenders_, 0);
+ android_atomic_dec(&guard_.num_contenders_);
#else
#ifdef HAVE_TIMEDWAIT_MONOTONIC
#define TIMEDWAIT pthread_cond_timedwait_monotonic