| /* |
| This file is part of Valgrind, a dynamic binary instrumentation |
| framework. |
| |
| Copyright (C) 2008-2008 Google Inc |
| opensource@google.com |
| |
| This program is free software; you can redistribute it and/or |
| modify it under the terms of the GNU General Public License as |
| published by the Free Software Foundation; either version 2 of the |
| License, or (at your option) any later version. |
| |
| This program is distributed in the hope that it will be useful, but |
| WITHOUT ANY WARRANTY; without even the implied warranty of |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with this program; if not, write to the Free Software |
| Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA |
| 02111-1307, USA. |
| |
| The GNU General Public License is contained in the file COPYING. |
| */ |
| |
| // Author: Konstantin Serebryany <opensource@google.com> |
| // |
| // Here we define a few simple classes that wrap pthread primitives. |
| // |
| // We need this to create unit tests for helgrind (or similar tool) |
| // that will work with different threading frameworks. |
| // |
| // If one needs to test helgrind's support for another threading library, |
| // he/she can create a copy of this file and replace pthread_ calls |
| // with appropriate calls to his/her library. |
| // |
| // Note that some of the methods defined here are annotated with |
| // ANNOTATE_* macros defined in dynamic_annotations.h. |
| // |
| // DISCLAIMER: the classes defined in this header file |
| // are NOT intended for general use -- only for unit tests. |
| // |
| |
| #ifndef THREAD_WRAPPERS_PTHREAD_H |
| #define THREAD_WRAPPERS_PTHREAD_H |
| |
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <limits.h>  // INT_MAX
#include <stdint.h>  // int64_t
#include <stdio.h>
#include <queue>
#include <vector>
| |
| #ifdef VGO_darwin |
| #include <libkern/OSAtomic.h> |
| #define NO_BARRIER |
| #define NO_TLS |
| #endif |
| |
| #include <string> |
| using namespace std; |
| |
| #include <sys/time.h> |
| #include <time.h> |
| |
| #include "../../drd/drd.h" |
| #define ANNOTATE_NO_OP(arg) do { } while(0) |
| #define ANNOTATE_EXPECT_RACE(addr, descr) \ |
| ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race") |
| static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; } |
| |
| #include <assert.h> |
| #ifdef NDEBUG |
| # error "Pleeease, do not define NDEBUG" |
| #endif |
| #define CHECK assert |
| |
| /// Set this to true if malloc() uses mutex on your platform as this may |
| /// introduce a happens-before arc for a pure happens-before race detector. |
| const bool kMallocUsesMutex = false; |
| |
/// Current time in milliseconds, as reported by gettimeofday().
///
/// Both fields are widened to int64_t before any arithmetic so that
/// `tv_sec * 1000` cannot overflow on platforms where time_t is 32 bits.
///
/// @return seconds * 1000 + microseconds / 1000.
static inline int64_t GetCurrentTimeMillis() {
  struct timeval now;
  gettimeofday(&now, NULL);
  return static_cast<int64_t>(now.tv_sec) * 1000 +
         static_cast<int64_t>(now.tv_usec) / 1000;
}
| |
/// Copy tv to ts adding offset in milliseconds.
///
/// @param tv            Source time (seconds + microseconds); not modified.
/// @param ts            Destination; receives tv plus the offset.
/// @param offset_milli  Offset in milliseconds; may be negative.
///
/// The result is normalised so that ts->tv_nsec always lies in
/// [0, 1000000000). Plain `%` can yield a negative remainder when the sum
/// is negative, and a negative tv_nsec is not a valid timespec.
static inline void timeval2timespec(const timeval *tv,
                                    timespec *ts,
                                    int64_t offset_milli) {
  const int64_t kNanosPerSecond = 1000000000LL;
  const int64_t kNanosPerMilli  = 1000000LL;
  const int64_t kNanosPerMicro  = 1000LL;

  const int64_t now_nsec =
      static_cast<int64_t>(tv->tv_sec) * kNanosPerSecond +
      static_cast<int64_t>(tv->tv_usec) * kNanosPerMicro;
  const int64_t then_nsec = now_nsec + offset_milli * kNanosPerMilli;

  int64_t sec  = then_nsec / kNanosPerSecond;
  int64_t nsec = then_nsec % kNanosPerSecond;
  if (nsec < 0) {
    // Borrow one second so the nanosecond part becomes non-negative.
    nsec += kNanosPerSecond;
    sec  -= 1;
  }
  ts->tv_sec  = sec;
  ts->tv_nsec = nsec;
}
| |
| |
| class CondVar; |
| |
| #ifndef NO_SPINLOCK |
| /// helgrind does not (yet) support spin locks, so we annotate them. |
| |
| #ifndef VGO_darwin |
| class SpinLock { |
| public: |
| SpinLock() { |
| CHECK(0 == pthread_spin_init(&mu_, 0)); |
| ANNOTATE_RWLOCK_CREATE((void*)&mu_); |
| } |
| ~SpinLock() { |
| ANNOTATE_RWLOCK_DESTROY((void*)&mu_); |
| CHECK(0 == pthread_spin_destroy(&mu_)); |
| } |
| void Lock() { |
| CHECK(0 == pthread_spin_lock(&mu_)); |
| ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); |
| } |
| void Unlock() { |
| ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); |
| CHECK(0 == pthread_spin_unlock(&mu_)); |
| } |
| private: |
| pthread_spinlock_t mu_; |
| }; |
| |
| #else |
| |
| class SpinLock { |
| public: |
| // Mac OS X version. |
| SpinLock() : mu_(OS_SPINLOCK_INIT) { |
| ANNOTATE_RWLOCK_CREATE((void*)&mu_); |
| } |
| ~SpinLock() { |
| ANNOTATE_RWLOCK_DESTROY((void*)&mu_); |
| } |
| void Lock() { |
| OSSpinLockLock(&mu_); |
| ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); |
| } |
| void Unlock() { |
| ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); |
| OSSpinLockUnlock(&mu_); |
| } |
| private: |
| OSSpinLock mu_; |
| }; |
| #endif // VGO_darwin |
| |
| #endif // NO_SPINLOCK |
| |
/// Just a boolean condition. Used by Mutex::LockWhen and similar.
///
/// Holds either a predicate taking a typed pointer plus that pointer, or a
/// predicate taking no arguments. The predicate is stored type-erased and
/// is cast back to its exact original type before it is called: calling a
/// function through a pointer to a different function type is undefined
/// behaviour, so the stored pointer is never invoked as `bool (*)(void*)`.
class Condition {
 public:
  typedef bool (*func_t)(void*);

  /// @param func  Predicate to evaluate.
  /// @param arg   Pointer handed to `func` on every Eval().
  template <typename T>
  Condition(bool (*func)(T*), T* arg)
      : thunk_(&Condition::CallWithArg<T>),
        func_(reinterpret_cast<erased_t>(func)),
        arg_(arg) {}

  /// @param func  Argument-less predicate to evaluate.
  Condition(bool (*func)())
      : thunk_(&Condition::CallNoArg),
        func_(reinterpret_cast<erased_t>(func)),
        arg_(NULL) {}

  /// Calls the stored predicate and returns its result.
  bool Eval() { return thunk_(func_, arg_); }

 private:
  // Generic function-pointer type used only for storage, never for calls.
  typedef void (*erased_t)();
  // Trampoline that knows the real predicate type.
  typedef bool (*thunk_t)(erased_t, void*);

  // Restores the `bool (*)(T*)` type and calls it with the typed argument.
  template <typename T>
  static bool CallWithArg(erased_t func, void *arg) {
    return reinterpret_cast<bool (*)(T*)>(func)(static_cast<T*>(arg));
  }

  // Restores the `bool (*)()` type and calls it; the argument is unused.
  static bool CallNoArg(erased_t func, void *) {
    return reinterpret_cast<bool (*)()>(func)();
  }

  thunk_t  thunk_;
  erased_t func_;
  void    *arg_;
};
| |
| |
| /// Wrapper for pthread_mutex_t. |
| /// |
| /// pthread_mutex_t is *not* a reader-writer lock, |
| /// so the methods like ReaderLock() aren't really reader locks. |
| /// We can not use pthread_rwlock_t because it |
| /// does not work with pthread_cond_t. |
| /// |
| /// TODO: We still need to test reader locks with this class. |
| /// Implement a mode where pthread_rwlock_t will be used |
| /// instead of pthread_mutex_t (only when not used with CondVar or LockWhen). |
| /// |
| class Mutex { |
| friend class CondVar; |
| public: |
| Mutex() { |
| CHECK(0 == pthread_mutex_init(&mu_, NULL)); |
| CHECK(0 == pthread_cond_init(&cv_, NULL)); |
| signal_at_unlock_ = true; // Always signal at Unlock to make |
| // Mutex more friendly to hybrid detectors. |
| } |
| ~Mutex() { |
| CHECK(0 == pthread_cond_destroy(&cv_)); |
| CHECK(0 == pthread_mutex_destroy(&mu_)); |
| } |
| void Lock() { CHECK(0 == pthread_mutex_lock(&mu_));} |
| bool TryLock() { return (0 == pthread_mutex_trylock(&mu_));} |
| void Unlock() { |
| if (signal_at_unlock_) { |
| CHECK(0 == pthread_cond_signal(&cv_)); |
| } |
| CHECK(0 == pthread_mutex_unlock(&mu_)); |
| } |
| void ReaderLock() { Lock(); } |
| bool ReaderTryLock() { return TryLock();} |
| void ReaderUnlock() { Unlock(); } |
| |
| void LockWhen(Condition cond) { Lock(); WaitLoop(cond); } |
| void ReaderLockWhen(Condition cond) { Lock(); WaitLoop(cond); } |
| void Await(Condition cond) { WaitLoop(cond); } |
| |
| bool ReaderLockWhenWithTimeout(Condition cond, int millis) |
| { Lock(); return WaitLoopWithTimeout(cond, millis); } |
| bool LockWhenWithTimeout(Condition cond, int millis) |
| { Lock(); return WaitLoopWithTimeout(cond, millis); } |
| bool AwaitWithTimeout(Condition cond, int millis) |
| { return WaitLoopWithTimeout(cond, millis); } |
| |
| private: |
| |
| void WaitLoop(Condition cond) { |
| signal_at_unlock_ = true; |
| while(cond.Eval() == false) { |
| pthread_cond_wait(&cv_, &mu_); |
| } |
| ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); |
| } |
| |
| bool WaitLoopWithTimeout(Condition cond, int millis) { |
| struct timeval now; |
| struct timespec timeout; |
| int retcode = 0; |
| gettimeofday(&now, NULL); |
| timeval2timespec(&now, &timeout, millis); |
| |
| signal_at_unlock_ = true; |
| while (cond.Eval() == false && retcode == 0) { |
| retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout); |
| } |
| if(retcode == 0) { |
| ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); |
| } |
| return cond.Eval(); |
| } |
| |
| // A hack. cv_ should be the first data member so that |
| // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) works. |
| // (See also racecheck_unittest.cc) |
| pthread_cond_t cv_; |
| pthread_mutex_t mu_; |
| bool signal_at_unlock_; // Set to true if Wait was called. |
| }; |
| |
| |
| class MutexLock { // Scoped Mutex Locker/Unlocker |
| public: |
| MutexLock(Mutex *mu) |
| : mu_(mu) { |
| mu_->Lock(); |
| } |
| ~MutexLock() { |
| mu_->Unlock(); |
| } |
| private: |
| Mutex *mu_; |
| }; |
| |
| |
| /// Wrapper for pthread_cond_t. |
| class CondVar { |
| public: |
| CondVar() { CHECK(0 == pthread_cond_init(&cv_, NULL)); } |
| ~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); } |
| void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); } |
| bool WaitWithTimeout(Mutex *mu, int millis) { |
| struct timeval now; |
| struct timespec timeout; |
| gettimeofday(&now, NULL); |
| timeval2timespec(&now, &timeout, millis); |
| return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout); |
| } |
| void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); } |
| void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); } |
| private: |
| pthread_cond_t cv_; |
| }; |
| |
| |
| // pthreads do not allow to use condvar with rwlock so we can't make |
| // ReaderLock method of Mutex to be the real rw-lock. |
| // So, we need a special lock class to test reader locks. |
| #define NEEDS_SEPERATE_RW_LOCK |
| class RWLock { |
| public: |
| RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); } |
| ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); } |
| void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); } |
| void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); } |
| void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } |
| void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } |
| private: |
| pthread_cond_t dummy; // Damn, this requires some redesign... |
| pthread_rwlock_t mu_; |
| }; |
| |
| class ReaderLockScoped { // Scoped RWLock Locker/Unlocker |
| public: |
| ReaderLockScoped(RWLock *mu) |
| : mu_(mu) { |
| mu_->ReaderLock(); |
| } |
| ~ReaderLockScoped() { |
| mu_->ReaderUnlock(); |
| } |
| private: |
| RWLock *mu_; |
| }; |
| |
| class WriterLockScoped { // Scoped RWLock Locker/Unlocker |
| public: |
| WriterLockScoped(RWLock *mu) |
| : mu_(mu) { |
| mu_->Lock(); |
| } |
| ~WriterLockScoped() { |
| mu_->Unlock(); |
| } |
| private: |
| RWLock *mu_; |
| }; |
| |
| |
| |
| |
| /// Wrapper for pthread_create()/pthread_join(). |
| class MyThread { |
| public: |
| typedef void *(*worker_t)(void*); |
| |
| MyThread(worker_t worker, void *arg = NULL, const char *name = NULL) |
| :w_(worker), arg_(arg), name_(name) {} |
| MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL) |
| :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} |
| MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL) |
| :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} |
| |
| ~MyThread(){ w_ = NULL; arg_ = NULL;} |
| void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));} |
| void Join() { CHECK(0 == pthread_join(t_, NULL));} |
| pthread_t tid() const { return t_; } |
| private: |
| static void ThreadBody(MyThread *my_thread) { |
| if (my_thread->name_) { |
| ANNOTATE_THREAD_NAME(my_thread->name_); |
| } |
| my_thread->w_(my_thread->arg_); |
| } |
| pthread_t t_; |
| worker_t w_; |
| void *arg_; |
| const char *name_; |
| }; |
| |
| |
| /// Just a message queue. |
| class ProducerConsumerQueue { |
| public: |
| ProducerConsumerQueue(int unused) { |
| //ANNOTATE_PCQ_CREATE(this); |
| } |
| ~ProducerConsumerQueue() { |
| CHECK(q_.empty()); |
| //ANNOTATE_PCQ_DESTROY(this); |
| } |
| |
| // Put. |
| void Put(void *item) { |
| mu_.Lock(); |
| q_.push(item); |
| ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get() |
| //ANNOTATE_PCQ_PUT(this); |
| mu_.Unlock(); |
| } |
| |
| // Get. |
| // Blocks if the queue is empty. |
| void *Get() { |
| mu_.LockWhen(Condition(IsQueueNotEmpty, &q_)); |
| void * item = NULL; |
| bool ok = TryGetInternal(&item); |
| CHECK(ok); |
| mu_.Unlock(); |
| return item; |
| } |
| |
| // If queue is not empty, |
| // remove an element from queue, put it into *res and return true. |
| // Otherwise return false. |
| bool TryGet(void **res) { |
| mu_.Lock(); |
| bool ok = TryGetInternal(res); |
| mu_.Unlock(); |
| return ok; |
| } |
| |
| private: |
| Mutex mu_; |
| std::queue<void*> q_; // protected by mu_ |
| |
| // Requires mu_ |
| bool TryGetInternal(void ** item_ptr) { |
| if (q_.empty()) |
| return false; |
| *item_ptr = q_.front(); |
| q_.pop(); |
| //ANNOTATE_PCQ_GET(this); |
| return true; |
| } |
| |
| static bool IsQueueNotEmpty(std::queue<void*> * queue) { |
| return !queue->empty(); |
| } |
| }; |
| |
| |
| |
| /// Function pointer with zero, one or two parameters. |
| struct Closure { |
| typedef void (*F0)(); |
| typedef void (*F1)(void *arg1); |
| typedef void (*F2)(void *arg1, void *arg2); |
| int n_params; |
| void *f; |
| void *param1; |
| void *param2; |
| |
| void Execute() { |
| if (n_params == 0) { |
| (F0(f))(); |
| } else if (n_params == 1) { |
| (F1(f))(param1); |
| } else { |
| CHECK(n_params == 2); |
| (F2(f))(param1, param2); |
| } |
| delete this; |
| } |
| }; |
| |
| Closure *NewCallback(void (*f)()) { |
| Closure *res = new Closure; |
| res->n_params = 0; |
| res->f = (void*)(f); |
| res->param1 = NULL; |
| res->param2 = NULL; |
| return res; |
| } |
| |
| template <class P1> |
| Closure *NewCallback(void (*f)(P1), P1 p1) { |
| CHECK(sizeof(P1) <= sizeof(void*)); |
| Closure *res = new Closure; |
| res->n_params = 1; |
| res->f = (void*)(f); |
| res->param1 = (void*)p1; |
| res->param2 = NULL; |
| return res; |
| } |
| |
| template <class T, class P1, class P2> |
| Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) { |
| CHECK(sizeof(P1) <= sizeof(void*)); |
| Closure *res = new Closure; |
| res->n_params = 2; |
| res->f = (void*)(f); |
| res->param1 = (void*)p1; |
| res->param2 = (void*)p2; |
| return res; |
| } |
| |
| /*! A thread pool that uses ProducerConsumerQueue. |
| Usage: |
| { |
| ThreadPool pool(n_workers); |
| pool.StartWorkers(); |
| pool.Add(NewCallback(func_with_no_args)); |
| pool.Add(NewCallback(func_with_one_arg, arg)); |
| pool.Add(NewCallback(func_with_two_args, arg1, arg2)); |
| ... // more calls to pool.Add() |
| |
| // the ~ThreadPool() is called: we wait workers to finish |
| // and then join all threads in the pool. |
| } |
| */ |
| class ThreadPool { |
| public: |
| //! Create n_threads threads, but do not start. |
| explicit ThreadPool(int n_threads) |
| : queue_(INT_MAX) { |
| for (int i = 0; i < n_threads; i++) { |
| MyThread *thread = new MyThread(&ThreadPool::Worker, this); |
| workers_.push_back(thread); |
| } |
| } |
| |
| //! Start all threads. |
| void StartWorkers() { |
| for (size_t i = 0; i < workers_.size(); i++) { |
| workers_[i]->Start(); |
| } |
| } |
| |
| //! Add a closure. |
| void Add(Closure *closure) { |
| queue_.Put(closure); |
| } |
| |
| int num_threads() { return workers_.size();} |
| |
| //! Wait workers to finish, then join all threads. |
| ~ThreadPool() { |
| for (size_t i = 0; i < workers_.size(); i++) { |
| Add(NULL); |
| } |
| for (size_t i = 0; i < workers_.size(); i++) { |
| workers_[i]->Join(); |
| delete workers_[i]; |
| } |
| } |
| private: |
| std::vector<MyThread*> workers_; |
| ProducerConsumerQueue queue_; |
| |
| static void *Worker(void *p) { |
| ThreadPool *pool = reinterpret_cast<ThreadPool*>(p); |
| while (true) { |
| Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get()); |
| if(closure == NULL) { |
| return NULL; |
| } |
| closure->Execute(); |
| } |
| } |
| }; |
| |
| #ifndef NO_BARRIER |
| /// Wrapper for pthread_barrier_t. |
| class Barrier{ |
| public: |
| explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));} |
| ~Barrier() {CHECK(0 == pthread_barrier_destroy(&b_));} |
| void Block() { |
| // helgrind 3.3.0 does not have an interceptor for barrier. |
| // but our current local version does. |
| // ANNOTATE_CONDVAR_SIGNAL(this); |
| pthread_barrier_wait(&b_); |
| // ANNOTATE_CONDVAR_WAIT(this, this); |
| } |
| private: |
| pthread_barrier_t b_; |
| }; |
| |
| #endif // NO_BARRIER |
| |
| class BlockingCounter { |
| public: |
| explicit BlockingCounter(int initial_count) : |
| count_(initial_count) {} |
| bool DecrementCount() { |
| MutexLock lock(&mu_); |
| count_--; |
| return count_ == 0; |
| } |
| void Wait() { |
| mu_.LockWhen(Condition(&IsZero, &count_)); |
| mu_.Unlock(); |
| } |
| private: |
| static bool IsZero(int *arg) { return *arg == 0; } |
| Mutex mu_; |
| int count_; |
| }; |
| |
/// Atomically adds `increment` to `*value` and returns the result of the
/// underlying primitive (__sync_add_and_fetch, or OSAtomicAdd32 on Darwin).
int AtomicIncrement(volatile int *value, int increment);

#ifdef VGO_darwin

// Mac OS X version.
inline int AtomicIncrement(volatile int *value, int increment) {
  return OSAtomicAdd32(increment, value);
}

// TODO(timurrrr) this is a hack
// The requested alignment (first argument) is dropped.
#define memalign(A,B) malloc(B)

// TODO(timurrrr) this is a hack
// Allocates through the memalign macro defined just before this function.
// Returns 0 on success and 1 if the allocation produced a null pointer.
int posix_memalign(void **out, size_t al, size_t size) {
  *out = memalign(al, size);
  return (*out == 0);
}

#else  // !VGO_darwin

inline int AtomicIncrement(volatile int *value, int increment) {
  return __sync_add_and_fetch(value, increment);
}

#endif  // VGO_darwin
| |
| #endif // THREAD_WRAPPERS_PTHREAD_H |
| // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker |