Implement mutex requeueing for cv broadcasts.
Make the mutex guarding a condition variable part of the condition
variable's state. On a broadcast, requeue the waiters onto the mutex so
that they are awoken as the mutex is unlocked (thereby avoiding a
thundering herd). Explicit futex use is still guarded by
ART_USE_FUTEXES, which remains disabled as I'm unhappy with some of the
warts of mutex usage. Uploading so that the API changes can stabilize.
Change-Id: Iedb601856ccd8bbc3a64da4ba0cee82246e7bcbf
diff --git a/src/mutex.cc b/src/mutex.cc
index 96e04ef..18dc863 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -17,11 +17,13 @@
#include "mutex.h"
#include <errno.h>
+#include <sys/time.h>
#include "cutils/atomic.h"
#include "cutils/atomic-inline.h"
#include "logging.h"
#include "runtime.h"
+#include "scoped_thread_state_change.h"
#include "thread.h"
#include "utils.h"
@@ -41,8 +43,8 @@
#ifndef SYS_futex
#define SYS_futex __NR_futex
#endif
-int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, int *, int ) {
- return syscall(SYS_futex, uaddr, op, val, timeout, NULL, NULL);
+// Thin wrapper around the raw futex system call. All six arguments, including uaddr2 and val3
+// (which the previous version replaced with NULL), are forwarded unchanged, so two-address
+// operations such as FUTEX_CMP_REQUEUE can be issued. Returns whatever syscall() returns.
+int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, volatile int *uaddr2, int val3) {
+ return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
}
#endif // ART_USE_FUTEXES
@@ -96,6 +98,51 @@
}
}
+// Initialize a timespec to either an absolute or relative time.
+//
+// |ts| is set to a base time plus |ms| milliseconds and |ns| nanoseconds. The base is the current
+// reading of |clock| when |absolute| is true and zero otherwise (|clock| is then unused). The
+// result is normalized so that 0 <= tv_nsec < 1,000,000,000 whatever the magnitude of |ns|, and
+// tv_sec is kept below 0x7fffffff; when that clamp applies a note and a dump of |self| are logged.
+static void InitTimeSpec(Thread* self, bool absolute, int clock, int64_t ms, int32_t ns,
+                         timespec* ts) {
+  const int64_t kNanosPerSecond = 1000000000LL;
+
+  if (absolute) {
+    clock_gettime(clock, ts);
+  } else {
+    ts->tv_sec = 0;
+    ts->tv_nsec = 0;
+  }
+
+  // Accumulate in 64 bits: tv_nsec is a long, which can be too narrow to hold the raw sum.
+  int64_t end_sec = ts->tv_sec + ms / 1000;
+  int64_t end_nsec = ts->tv_nsec + (ms % 1000) * 1000000 + ns;
+
+  // Carry every whole second out of the nanosecond part, not just the first one, so that an |ns|
+  // of one second or more still yields a valid timespec.
+  end_sec += end_nsec / kNanosPerSecond;
+  end_nsec %= kNanosPerSecond;
+  if (end_nsec < 0) {
+    // Negative inputs leave a negative remainder; borrow one second to make it non-negative.
+    end_sec--;
+    end_nsec += kNanosPerSecond;
+  }
+
+  // Clamp after carrying so that the carry cannot push tv_sec past the limit again.
+  if (UNLIKELY(end_sec >= 0x7fffffff)) {
+    std::ostringstream ss;
+    ScopedObjectAccess soa(self);
+    self->Dump(ss);
+    LOG(INFO) << "Note: end time exceeds epoch: " << ss.str();
+    end_sec = 0x7ffffffe;
+  }
+  ts->tv_sec = end_sec;
+  ts->tv_nsec = static_cast<long>(end_nsec);
+}
+
+#if ART_USE_FUTEXES
+// Stores lhs - rhs in |result_ts| and returns true if that difference is negative, i.e. |lhs| is
+// earlier than |rhs|; the callers in this file treat a true return as "already timed out".
+// For normalized inputs the stored tv_nsec is in the range [0, one second).
+static bool ComputeRelativeTimeSpec(timespec* result_ts, const timespec& lhs, const timespec& rhs) {
+  const long int one_sec = 1000 * 1000 * 1000;  // one second in nanoseconds.
+  result_ts->tv_sec = lhs.tv_sec - rhs.tv_sec;
+  result_ts->tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+  if (result_ts->tv_nsec < 0) {
+    // Borrow one second to bring the nanosecond part back into range.
+    result_ts->tv_sec--;
+    result_ts->tv_nsec += one_sec;
+  } else if (result_ts->tv_nsec >= one_sec) {
+    // A tv_nsec of exactly one second is out of range too, hence >= rather than >.
+    result_ts->tv_sec++;
+    result_ts->tv_nsec -= one_sec;
+  }
+  return result_ts->tv_sec < 0;
+}
+#endif
+
+// Records the lock's level and name; the constructor body itself is empty.
BaseMutex::BaseMutex(const char* name, LockLevel level) : level_(level), name_(name) {}
static void CheckUnattachedThread(LockLevel level) NO_THREAD_SAFETY_ANALYSIS {
@@ -176,7 +223,11 @@
Mutex::Mutex(const char* name, LockLevel level, bool recursive)
: BaseMutex(name, level), recursive_(recursive), recursion_count_(0) {
-#if defined(__BIONIC__) || defined(__APPLE__)
+#if ART_USE_FUTEXES
+ state_ = 0;
+ exclusive_owner_ = 0;
+ num_contenders_ = 0;
+#elif defined(__BIONIC__) || defined(__APPLE__)
// Use recursive mutexes for bionic and Apple otherwise the
// non-recursive mutexes don't have TIDs to check lock ownership of.
pthread_mutexattr_t attributes;
@@ -190,6 +241,17 @@
}
Mutex::~Mutex() {
+#if ART_USE_FUTEXES
+ if (state_ != 0) {
+ MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
+ Runtime* runtime = Runtime::Current();
+ bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
+ LOG(shutting_down ? WARNING : FATAL) << "destroying mutex with owner: " << exclusive_owner_;
+ } else {
+ CHECK_EQ(exclusive_owner_, 0U) << "unexpectedly found an owner on unlocked mutex " << name_;
+ CHECK_EQ(num_contenders_, 0) << "unexpectedly found a contender on mutex " << name_;
+ }
+#else
// We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
// may still be using locks.
int rc = pthread_mutex_destroy(&mutex_);
@@ -201,6 +263,7 @@
bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
PLOG(shutting_down ? WARNING : FATAL) << "pthread_mutex_destroy failed for " << name_;
}
+#endif
}
void Mutex::ExclusiveLock(Thread* self) {
@@ -209,7 +272,29 @@
AssertNotHeld(self);
}
if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 0) {
+ // Change state from 0 to 1.
+ done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+ } else {
+ // Failed to acquire, hang up.
+ android_atomic_inc(&num_contenders_);
+ if (futex(&state_, FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
+ if (errno != EAGAIN) {
+ PLOG(FATAL) << "futex wait failed for " << name_;
+ }
+ }
+ android_atomic_dec(&num_contenders_);
+ }
+ } while(!done);
+ DCHECK_EQ(state_, 1);
+ exclusive_owner_ = SafeGetTid(self);
+#else
CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
+#endif
RegisterAsLocked(self);
}
recursion_count_++;
@@ -226,6 +311,20 @@
AssertNotHeld(self);
}
if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 0) {
+ // Change state from 0 to 1.
+ done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+ } else {
+ return false;
+ }
+ } while(!done);
+ DCHECK_EQ(state_, 1);
+ exclusive_owner_ = SafeGetTid(self);
+#else
int result = pthread_mutex_trylock(&mutex_);
if (result == EBUSY) {
return false;
@@ -234,6 +333,7 @@
errno = result;
PLOG(FATAL) << "pthread_mutex_trylock failed for " << name_;
}
+#endif
RegisterAsLocked(self);
}
recursion_count_++;
@@ -255,7 +355,28 @@
<< name_ << " " << recursion_count_;
}
RegisterAsUnlocked(self);
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 1) {
+ // We're no longer the owner.
+ exclusive_owner_ = 0;
+ // Change state to 0.
+ done = android_atomic_cmpxchg(cur_state, 0, &state_) == 0;
+ if (done) { // Spurious fail?
+ // Wake a contender
+ if (num_contenders_ > 0) {
+ futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
+ }
+ }
+ } else {
+ LOG(FATAL) << "Unexpected state_:" << cur_state << " for " << name_;
+ }
+ } while(!done);
+#else
CHECK_MUTEX_CALL(pthread_mutex_unlock, (&mutex_));
+#endif
}
}
@@ -272,7 +393,9 @@
}
uint64_t Mutex::GetExclusiveOwnerTid() const {
-#if defined(__BIONIC__)
+#if ART_USE_FUTEXES
+ return exclusive_owner_;
+#elif defined(__BIONIC__)
return static_cast<uint64_t>((mutex_.value >> 16) & 0xffff);
#elif defined(__GLIBC__)
return reinterpret_cast<const glibc_pthread_mutex_t*>(&mutex_)->owner;
@@ -396,10 +519,12 @@
}
#if HAVE_TIMED_RWLOCK
-bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout) {
+bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns) {
DCHECK(self == NULL || self == Thread::Current());
#if ART_USE_FUTEXES
bool done = false;
+ timespec end_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
do {
int32_t cur_state = state_;
if (cur_state == 0) {
@@ -407,12 +532,18 @@
done = android_atomic_cmpxchg(0, -1, &state_) == 0;
} else {
// Failed to acquire, hang up.
+ timespec now_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+ timespec rel_ts;
+ if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+ return false; // Timed out.
+ }
android_atomic_inc(&num_pending_writers_);
- if (futex(&state_, FUTEX_WAIT, cur_state, &abs_timeout, NULL, 0) != 0) {
+ if (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
if (errno == ETIMEDOUT) {
android_atomic_dec(&num_pending_writers_);
- return false;
- } else if (errno != EAGAIN) {
+ return false; // Timed out.
+ } else if (errno != EAGAIN && errno != EINTR) {
PLOG(FATAL) << "timed futex wait failed for " << name_;
}
}
@@ -421,7 +552,9 @@
} while(!done);
exclusive_owner_ = SafeGetTid(self);
#else
- int result = pthread_rwlock_timedwrlock(&rwlock_, &abs_timeout);
+ timespec ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &ts);
+ int result = pthread_rwlock_timedwrlock(&rwlock_, &ts);
if (result == ETIMEDOUT) {
return false;
}
@@ -576,11 +709,19 @@
return os << mu.Dump();
}
-ConditionVariable::ConditionVariable(const std::string& name) : name_(name) {
+// Creates a condition variable called |name| that is bound to |guard|; a reference to the mutex
+// is kept in guard_ for the lifetime of the condition variable.
+ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
+ : name_(name), guard_(guard) {
+#if ART_USE_FUTEXES
+ // Futex build: the futex word and both waiter counters start at zero.
+ state_ = 0;
+ num_waiters_ = 0;
+ num_awoken_ = 0;
+#else
+ // Otherwise the condition variable is backed by a pthread condition variable.
CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
+#endif
}
ConditionVariable::~ConditionVariable() {
+#if !ART_USE_FUTEXES
// We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
// may still be using condition variables.
int rc = pthread_cond_destroy(&cond_);
@@ -591,39 +732,162 @@
bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
PLOG(shutting_down ? WARNING : FATAL) << "pthread_cond_destroy failed for " << name_;
}
+#endif
}
-void ConditionVariable::Broadcast() {
+// Releases every thread currently waiting on this condition variable.
+//
+// In the futex build the waiters are not woken directly. They are moved from this condition
+// variable's futex onto the guard mutex's futex, so that they are woken as the mutex is unlocked.
+// |self| is the calling thread (or NULL) and is only used for sanity checking.
+void ConditionVariable::Broadcast(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  // TODO: enable below, there's a race in thread creation that causes false failures currently.
+  // guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&state_);  // Indicate a wake has occurred to waiters coming in.
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      // Compute number of waiters requeued and add to mutex contenders.
+      int32_t num_requeued = num_waiters_ - num_awoken_;
+      android_atomic_add(num_requeued, &guard_.num_contenders_);
+      // Requeue waiters onto contenders: wake nobody (val == 0) and move up to INT32_MAX waiters
+      // (passed in the timeout slot) to the guard's futex, provided state_ still equals cur_state.
+      done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+                   reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
+                   &guard_.state_, cur_state) != -1;
+      if (!done) {
+        if (errno != EAGAIN) {
+          PLOG(FATAL) << "futex cmp requeue failed for " << name_;
+        }
+        // state_ changed under us and nothing was requeued. Take the speculative contenders back
+        // out before retrying, otherwise every retry would count the same waiters again and leave
+        // num_contenders_ permanently too high.
+        android_atomic_add(-num_requeued, &guard_.num_contenders_);
+      } else {
+        num_awoken_ = num_waiters_;
+      }
+    } while (!done);
+  }
+#else
CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
+#endif
}
-void ConditionVariable::Signal() {
+// Wakes at most one thread waiting on this condition variable. The guard mutex is asserted to be
+// held by |self|, the calling thread (or NULL).
+void ConditionVariable::Signal(Thread* self) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+ if (num_waiters_ > 0) {
+ android_atomic_inc(&state_); // Indicate a wake has occurred to waiters coming in.
+ // Futex wake 1 waiter who will then come in and contend on the mutex. It'd be nice to requeue
+ // them to avoid this, however, requeueing can only move all waiters.
+ if (futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0) == 1) {
+ // Wake success.
+ // We weren't requeued, however, to make accounting simpler in the Wait code, increment the
+ // number of contenders on the mutex.
+ num_awoken_++;
+ android_atomic_inc(&guard_.num_contenders_);
+ }
+ }
+#else
CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
+#endif
}
-void ConditionVariable::Wait(Thread* self, Mutex& mutex) {
- mutex.CheckSafeToWait(self);
- unsigned int old_recursion_count = mutex.recursion_count_;
- mutex.recursion_count_ = 0;
- CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &mutex.mutex_));
- mutex.recursion_count_ = old_recursion_count;
+// Blocks the calling thread on this condition variable with no timeout. The guard mutex is
+// asserted to be held by |self| on entry; it is released while waiting, re-acquired before
+// returning, and its recursion count is restored to the value it had on entry.
+void ConditionVariable::Wait(Thread* self) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+ guard_.CheckSafeToWait(self);
+ unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+ // Snapshot state_ before releasing the guard; the wait below is keyed on this value so that a
+ // Signal/Broadcast (which increment state_) arriving after the unlock is not missed.
+ int32_t cur_state = state_;
+ num_waiters_++;
+ // NOTE(review): presumably set to 1 so that the single ExclusiveUnlock below fully releases a
+ // recursively held guard -- confirm against Mutex::ExclusiveUnlock.
+ guard_.recursion_count_ = 1;
+ guard_.ExclusiveUnlock(self);
+ bool woken = true;
+ // A zero return from futex means we slept and were woken. Otherwise retry on EINTR/EAGAIN
+ // unless state_ has moved on; any other error is fatal.
+ while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
+ if (errno == EINTR || errno == EAGAIN) {
+ if (state_ != cur_state) {
+ // We failed and a signal has come in.
+ woken = false;
+ break;
+ }
+ } else {
+ PLOG(FATAL) << "futex wait failed for " << name_;
+ }
+ }
+ guard_.ExclusiveLock(self);
+ num_waiters_--;
+ if (woken) {
+ // If we were woken we were requeued on the mutex, decrement the mutex's contender count.
+ android_atomic_dec(&guard_.num_contenders_);
+ num_awoken_--;
+ }
+#else
+ // pthread_cond_wait releases and re-acquires the underlying pthread mutex itself; the recursion
+ // count is zeroed for the duration of the wait and restored below.
+ guard_.recursion_count_ = 0;
+ CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
+#endif
+ guard_.recursion_count_ = old_recursion_count;
}
-void ConditionVariable::TimedWait(Thread* self, Mutex& mutex, const timespec& ts) {
+// Blocks the calling thread on this condition variable for at most |ms| milliseconds plus |ns|
+// nanoseconds. The guard mutex is asserted to be held by |self| on entry; it is released while
+// waiting and re-acquired before returning. The function returns void, so the caller is not told
+// whether the wait ended because of a wake-up or because of the timeout.
+void ConditionVariable::TimedWait(Thread* self, int64_t ms, int32_t ns) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+ guard_.CheckSafeToWait(self);
+ unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+ // Record the original end time so that if the futex call fails we can recompute the appropriate
+ // relative time.
+ // NOTE(review): the deadline is tracked with CLOCK_REALTIME, so a wall-clock adjustment during
+ // the wait would presumably shift the effective timeout -- consider CLOCK_MONOTONIC.
+ timespec end_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
+ // The first futex wait uses the requested duration directly as a relative timeout.
+ timespec rel_ts;
+ InitTimeSpec(self, false, CLOCK_REALTIME, ms, ns, &rel_ts);
+ // Read state so that we can know if a signal comes in before we sleep.
+ int32_t cur_state = state_;
+ num_waiters_++;
+ // NOTE(review): presumably set to 1 so that the single ExclusiveUnlock below fully releases a
+ // recursively held guard -- confirm against Mutex::ExclusiveUnlock.
+ guard_.recursion_count_ = 1;
+ guard_.ExclusiveUnlock(self);
+ bool woken = true; // Did the futex wait end because we were awoken?
+ while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+ if (errno == ETIMEDOUT) {
+ woken = false;
+ break;
+ }
+ if ((errno == EINTR) || (errno == EAGAIN)) {
+ if (state_ != cur_state) {
+ // We failed and a signal has come in.
+ woken = false;
+ break;
+ }
+ // State is unchanged, so go back to sleep for whatever remains until the recorded end time.
+ timespec now_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+ if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+ // futex failed and we timed out in the meantime.
+ woken = false;
+ break;
+ }
+ } else {
+ PLOG(FATAL) << "timed futex wait failed for " << name_;
+ }
+ }
+ guard_.ExclusiveLock(self);
+ num_waiters_--;
+ if (woken) {
+ // If we were woken we were requeued on the mutex, decrement the mutex's contender count.
+ android_atomic_dec(&guard_.num_contenders_);
+ num_awoken_--;
+ }
+#else
+ // pthread build: pick the timed-wait function and the clock its absolute deadline is measured
+ // against, depending on whether the monotonic variant is available.
#ifdef HAVE_TIMEDWAIT_MONOTONIC
#define TIMEDWAIT pthread_cond_timedwait_monotonic
+ int clock = CLOCK_MONOTONIC;
#else
#define TIMEDWAIT pthread_cond_timedwait
+ int clock = CLOCK_REALTIME;
#endif
- mutex.CheckSafeToWait(self);
- unsigned int old_recursion_count = mutex.recursion_count_;
- mutex.recursion_count_ = 0;
- int rc = TIMEDWAIT(&cond_, &mutex.mutex_, &ts);
- mutex.recursion_count_ = old_recursion_count;
+ guard_.recursion_count_ = 0;
+ timespec ts;
+ InitTimeSpec(self, true, clock, ms, ns, &ts);
+ int rc = TIMEDWAIT(&cond_, &guard_.mutex_, &ts);
// A timeout is an expected outcome; any other non-zero result is fatal.
if (rc != 0 && rc != ETIMEDOUT) {
errno = rc;
PLOG(FATAL) << "TimedWait failed for " << name_;
}
+#endif
+ guard_.recursion_count_ = old_recursion_count;
}
} // namespace art