blob: 26c83d230d26d9626faa84b09dbe64f604b5dc76 [file] [log] [blame]
Mathieu Chartier02b6a782012-10-26 13:51:26 -07001#include "casts.h"
Mathieu Chartier0e4627e2012-10-23 16:13:36 -07002#include "runtime.h"
3#include "stl_util.h"
4#include "thread.h"
5#include "thread_pool.h"
6
7namespace art {
8
9ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
10 size_t stack_size)
11 : thread_pool_(thread_pool),
12 name_(name),
13 stack_size_(stack_size) {
14 const char* reason = "new thread pool worker thread";
Brian Carlstrombcc29262012-11-02 11:36:03 -070015 pthread_attr_t attr;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070016 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason);
17 CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stack_size), reason);
18 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason);
19 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason);
20}
21
22ThreadPoolWorker::~ThreadPoolWorker() {
23 CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "thread pool worker shutdown");
24}
25
26void ThreadPoolWorker::Run() {
27 Thread* self = Thread::Current();
Mathieu Chartier02b6a782012-10-26 13:51:26 -070028 Task* task = NULL;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070029 while ((task = thread_pool_->GetTask(self)) != NULL) {
30 task->Run(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070031 task->Finalize();
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070032 }
33}
34
35void* ThreadPoolWorker::Callback(void* arg) {
36 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg);
37 Runtime* runtime = Runtime::Current();
38 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, NULL));
39 // Do work until its time to shut down.
40 worker->Run();
41 runtime->DetachCurrentThread();
42 return NULL;
43}
44
Mathieu Chartier02b6a782012-10-26 13:51:26 -070045void ThreadPool::AddTask(Thread* self, Task* task){
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070046 MutexLock mu(self, task_queue_lock_);
47 tasks_.push_back(task);
48 // If we have any waiters, signal one.
49 if (waiting_count_ != 0) {
50 task_queue_condition_.Signal(self);
51 }
52}
53
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070054ThreadPool::ThreadPool(size_t num_threads)
55 : task_queue_lock_("task queue lock"),
56 task_queue_condition_("task queue condition", task_queue_lock_),
57 completion_condition_("task completion condition", task_queue_lock_),
58 started_(false),
59 shutting_down_(false),
60 waiting_count_(0) {
61 while (GetThreadCount() < num_threads) {
Mathieu Chartier02b6a782012-10-26 13:51:26 -070062 const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount());
63 threads_.push_back(new ThreadPoolWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070064 }
65}
66
67ThreadPool::~ThreadPool() {
Mathieu Chartiere46cd752012-10-31 16:56:18 -070068 {
69 Thread* self = Thread::Current();
70 MutexLock mu(self, task_queue_lock_);
71 // Tell any remaining workers to shut down.
72 shutting_down_ = true;
Mathieu Chartiere46cd752012-10-31 16:56:18 -070073 // Broadcast to everyone waiting.
74 task_queue_condition_.Broadcast(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070075 completion_condition_.Broadcast(self);
Mathieu Chartiere46cd752012-10-31 16:56:18 -070076 }
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070077 // Wait for the threads to finish.
78 STLDeleteElements(&threads_);
79}
80
81void ThreadPool::StartWorkers(Thread* self) {
82 MutexLock mu(self, task_queue_lock_);
83 started_ = true;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070084 task_queue_condition_.Broadcast(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070085 start_time_ = NanoTime();
86 total_wait_time_ = 0;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070087}
88
89void ThreadPool::StopWorkers(Thread* self) {
90 MutexLock mu(self, task_queue_lock_);
91 started_ = false;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070092}
93
Mathieu Chartier02b6a782012-10-26 13:51:26 -070094Task* ThreadPool::GetTask(Thread* self) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070095 MutexLock mu(self, task_queue_lock_);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070096 while (!IsShuttingDown()) {
97 Task* task = TryGetTaskLocked(self);
98 if (task != NULL) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070099 return task;
100 }
101
102 waiting_count_++;
103 if (waiting_count_ == GetThreadCount() && tasks_.empty()) {
104 // We may be done, lets broadcast to the completion condition.
105 completion_condition_.Broadcast(self);
106 }
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700107 const uint64_t wait_start = NanoTime();
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700108 task_queue_condition_.Wait(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700109 const uint64_t wait_end = NanoTime();
110 total_wait_time_ += wait_end - std::max(wait_start, start_time_);
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700111 waiting_count_--;
112 }
113
114 // We are shutting down, return NULL to tell the worker thread to stop looping.
115 return NULL;
116}
117
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700118Task* ThreadPool::TryGetTask(Thread* self) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700119 MutexLock mu(self, task_queue_lock_);
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700120 return TryGetTaskLocked(self);
121}
122
123Task* ThreadPool::TryGetTaskLocked(Thread* self) {
124 if (started_ && !tasks_.empty()) {
125 Task* task = tasks_.front();
126 tasks_.pop_front();
127 return task;
128 }
129 return NULL;
130}
131
132void ThreadPool::Wait(Thread* self, bool do_work) {
133 Task* task = NULL;
134 while ((task = TryGetTask(self)) != NULL) {
135 task->Run(self);
136 task->Finalize();
137 }
138
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700139 // Wait until each thread is waiting and the task list is empty.
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700140 MutexLock mu(self, task_queue_lock_);
141 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700142 completion_condition_.Wait(self);
143 }
144}
145
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700146size_t ThreadPool::GetTaskCount(Thread* self){
147 MutexLock mu(self, task_queue_lock_);
148 return tasks_.size();
149}
150
151WorkStealingWorker::WorkStealingWorker(ThreadPool* thread_pool, const std::string& name,
152 size_t stack_size)
153 : ThreadPoolWorker(thread_pool, name, stack_size),
154 task_(NULL) {
155
156}
157
158void WorkStealingWorker::Run() {
159 Thread* self = Thread::Current();
160 Task* task = NULL;
161 WorkStealingThreadPool* thread_pool = down_cast<WorkStealingThreadPool*>(thread_pool_);
162 while ((task = thread_pool_->GetTask(self)) != NULL) {
163 WorkStealingTask* stealing_task = down_cast<WorkStealingTask*>(task);
164
165 {
166 CHECK(task_ == NULL);
167 MutexLock mu(self, thread_pool->work_steal_lock_);
168 // Register that we are running the task
169 ++stealing_task->ref_count_;
170 task_ = stealing_task;
171 }
172 stealing_task->Run(self);
173 // Mark ourselves as not running a task so that nobody tries to steal from us.
174 // There is a race condition that someone starts stealing from us at this point. This is okay
175 // due to the reference counting.
176 task_ = NULL;
177
178 bool finalize;
179
180 // Steal work from tasks until there is none left to steal. Note: There is a race, but
181 // all that happens when the race occurs is that we steal some work instead of processing a
182 // task from the queue.
183 while (thread_pool->GetTaskCount(self) == 0) {
184 WorkStealingTask* steal_from_task = NULL;
185
186 {
187 MutexLock mu(self, thread_pool->work_steal_lock_);
188 // Try finding a task to steal from.
189 steal_from_task = thread_pool->FindTaskToStealFrom(self);
190 if (steal_from_task != NULL) {
191 CHECK_NE(stealing_task, steal_from_task)
192 << "Attempting to steal from completed self task";
193 steal_from_task->ref_count_++;
194 } else {
195 break;
196 }
197 }
198
199 if (steal_from_task != NULL) {
200 // Task which completed earlier is going to steal some work.
201 stealing_task->StealFrom(self, steal_from_task);
202
203 {
204 // We are done stealing from the task, lets decrement its reference count.
205 MutexLock mu(self, thread_pool->work_steal_lock_);
206 finalize = !--steal_from_task->ref_count_;
207 }
208
209 if (finalize) {
210 steal_from_task->Finalize();
211 }
212 }
213 }
214
215 {
216 MutexLock mu(self, thread_pool->work_steal_lock_);
217 // If nobody is still referencing task_ we can finalize it.
218 finalize = !--stealing_task->ref_count_;
219 }
220
221 if (finalize) {
222 stealing_task->Finalize();
223 }
224 }
225}
226
227WorkStealingWorker::~WorkStealingWorker() {
228
229}
230
231WorkStealingThreadPool::WorkStealingThreadPool(size_t num_threads)
232 : ThreadPool(0),
233 work_steal_lock_("work stealing lock"),
234 steal_index_(0) {
235 while (GetThreadCount() < num_threads) {
236 const std::string name = StringPrintf("Work stealing worker %zu", GetThreadCount());
237 threads_.push_back(new WorkStealingWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
238 }
239}
240
241WorkStealingTask* WorkStealingThreadPool::FindTaskToStealFrom(Thread* self) {
242 const size_t thread_count = GetThreadCount();
243 for (size_t i = 0; i < thread_count; ++i) {
244 // TODO: Use CAS instead of lock.
245 ++steal_index_;
246 if (steal_index_ >= thread_count) {
247 steal_index_-= thread_count;
248 }
249
250 WorkStealingWorker* worker = down_cast<WorkStealingWorker*>(threads_[steal_index_]);
251 WorkStealingTask* task = worker->task_;
252 if (task) {
253 // Not null, we can probably steal from this worker.
254 return task;
255 }
256 }
257 // Couldn't find something to steal.
258 return NULL;
259}
260
261WorkStealingThreadPool::~WorkStealingThreadPool() {
262
263}
264
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700265} // namespace art