blob: a06f9d3bffc0e8d699884f11f8532d0251bda42a [file] [log] [blame]
/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070017#include "thread_pool.h"
18
Elliott Hughes1aa246d2012-12-13 09:29:36 -080019#include "base/casts.h"
20#include "base/stl_util.h"
21#include "runtime.h"
22#include "thread.h"
23
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070024namespace art {
25
26ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
27 size_t stack_size)
28 : thread_pool_(thread_pool),
29 name_(name),
30 stack_size_(stack_size) {
31 const char* reason = "new thread pool worker thread";
Brian Carlstrombcc29262012-11-02 11:36:03 -070032 pthread_attr_t attr;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070033 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason);
34 CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stack_size), reason);
35 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason);
36 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason);
37}
38
39ThreadPoolWorker::~ThreadPoolWorker() {
40 CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "thread pool worker shutdown");
41}
42
43void ThreadPoolWorker::Run() {
44 Thread* self = Thread::Current();
Mathieu Chartier02b6a782012-10-26 13:51:26 -070045 Task* task = NULL;
Mathieu Chartier35883cc2012-11-13 14:08:12 -080046 thread_pool_->creation_barier_.Wait(self);
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070047 while ((task = thread_pool_->GetTask(self)) != NULL) {
48 task->Run(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070049 task->Finalize();
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070050 }
51}
52
53void* ThreadPoolWorker::Callback(void* arg) {
54 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg);
55 Runtime* runtime = Runtime::Current();
Mathieu Chartier664bebf2012-11-12 16:54:11 -080056 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, NULL, false));
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070057 // Do work until its time to shut down.
58 worker->Run();
59 runtime->DetachCurrentThread();
60 return NULL;
61}
62
Mathieu Chartier02b6a782012-10-26 13:51:26 -070063void ThreadPool::AddTask(Thread* self, Task* task){
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070064 MutexLock mu(self, task_queue_lock_);
65 tasks_.push_back(task);
66 // If we have any waiters, signal one.
67 if (waiting_count_ != 0) {
68 task_queue_condition_.Signal(self);
69 }
70}
71
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070072ThreadPool::ThreadPool(size_t num_threads)
73 : task_queue_lock_("task queue lock"),
74 task_queue_condition_("task queue condition", task_queue_lock_),
75 completion_condition_("task completion condition", task_queue_lock_),
76 started_(false),
77 shutting_down_(false),
Mathieu Chartier35883cc2012-11-13 14:08:12 -080078 waiting_count_(0),
79 // Add one since the caller of constructor waits on the barrier too.
80 creation_barier_(num_threads + 1) {
81 Thread* self = Thread::Current();
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070082 while (GetThreadCount() < num_threads) {
Mathieu Chartier02b6a782012-10-26 13:51:26 -070083 const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount());
84 threads_.push_back(new ThreadPoolWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070085 }
Mathieu Chartier35883cc2012-11-13 14:08:12 -080086 // Wait for all of the threads to attach.
87 creation_barier_.Wait(self);
Mathieu Chartier0e4627e2012-10-23 16:13:36 -070088}
89
90ThreadPool::~ThreadPool() {
Mathieu Chartiere46cd752012-10-31 16:56:18 -070091 {
92 Thread* self = Thread::Current();
93 MutexLock mu(self, task_queue_lock_);
94 // Tell any remaining workers to shut down.
95 shutting_down_ = true;
Mathieu Chartiere46cd752012-10-31 16:56:18 -070096 // Broadcast to everyone waiting.
97 task_queue_condition_.Broadcast(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -070098 completion_condition_.Broadcast(self);
Mathieu Chartiere46cd752012-10-31 16:56:18 -070099 }
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700100 // Wait for the threads to finish.
101 STLDeleteElements(&threads_);
102}
103
104void ThreadPool::StartWorkers(Thread* self) {
105 MutexLock mu(self, task_queue_lock_);
106 started_ = true;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700107 task_queue_condition_.Broadcast(self);
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700108 start_time_ = NanoTime();
109 total_wait_time_ = 0;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700110}
111
112void ThreadPool::StopWorkers(Thread* self) {
113 MutexLock mu(self, task_queue_lock_);
114 started_ = false;
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700115}
116
// Blocks until a task is available or the pool is shutting down. Returns the
// dequeued task, or NULL to tell the calling worker to stop looping.
Task* ThreadPool::GetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  while (!IsShuttingDown()) {
    Task* task = TryGetTaskLocked(self);
    if (task != NULL) {
      return task;
    }

    // No task right now: account ourselves as waiting before sleeping.
    waiting_count_++;
    if (waiting_count_ == GetThreadCount() && tasks_.empty()) {
      // Every worker is idle and the queue is drained — we may be done, so
      // broadcast to wake anyone blocked in Wait() on the completion condition.
      completion_condition_.Broadcast(self);
    }
    const uint64_t wait_start = NanoTime();
    task_queue_condition_.Wait(self);
    const uint64_t wait_end = NanoTime();
    // start_time_ is reset in StartWorkers(); clamping the interval start to
    // it avoids counting wait time from before the workers were (re)started.
    total_wait_time_ += wait_end - std::max(wait_start, start_time_);
    waiting_count_--;
  }

  // We are shutting down, return NULL to tell the worker thread to stop looping.
  return NULL;
}
140
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700141Task* ThreadPool::TryGetTask(Thread* self) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700142 MutexLock mu(self, task_queue_lock_);
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700143 return TryGetTaskLocked(self);
144}
145
146Task* ThreadPool::TryGetTaskLocked(Thread* self) {
147 if (started_ && !tasks_.empty()) {
148 Task* task = tasks_.front();
149 tasks_.pop_front();
150 return task;
151 }
152 return NULL;
153}
154
155void ThreadPool::Wait(Thread* self, bool do_work) {
156 Task* task = NULL;
157 while ((task = TryGetTask(self)) != NULL) {
158 task->Run(self);
159 task->Finalize();
160 }
161
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700162 // Wait until each thread is waiting and the task list is empty.
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700163 MutexLock mu(self, task_queue_lock_);
164 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) {
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700165 completion_condition_.Wait(self);
166 }
167}
168
Mathieu Chartier02b6a782012-10-26 13:51:26 -0700169size_t ThreadPool::GetTaskCount(Thread* self){
170 MutexLock mu(self, task_queue_lock_);
171 return tasks_.size();
172}
173
174WorkStealingWorker::WorkStealingWorker(ThreadPool* thread_pool, const std::string& name,
175 size_t stack_size)
176 : ThreadPoolWorker(thread_pool, name, stack_size),
177 task_(NULL) {
178
179}
180
// Work-stealing worker loop. Each task carries a ref_count_ (guarded by
// work_steal_lock_): one reference for the worker running it plus one per
// thread currently stealing from it. Whichever thread drops the count to zero
// calls Finalize(). When the queue looks empty, an idle worker steals work
// from other workers' in-progress tasks instead of sleeping.
void WorkStealingWorker::Run() {
  Thread* self = Thread::Current();
  Task* task = NULL;
  WorkStealingThreadPool* thread_pool = down_cast<WorkStealingThreadPool*>(thread_pool_);
  while ((task = thread_pool_->GetTask(self)) != NULL) {
    WorkStealingTask* stealing_task = down_cast<WorkStealingTask*>(task);

    {
      // Only this thread writes task_, and it must be clear between tasks.
      CHECK(task_ == NULL);
      MutexLock mu(self, thread_pool->work_steal_lock_);
      // Register that we are running the task
      ++stealing_task->ref_count_;
      task_ = stealing_task;
    }
    stealing_task->Run(self);
    // Mark ourselves as not running a task so that nobody tries to steal from us.
    // There is a race condition that someone starts stealing from us at this point. This is okay
    // due to the reference counting.
    task_ = NULL;

    bool finalize;

    // Steal work from tasks until there is none left to steal. Note: There is a race, but
    // all that happens when the race occurs is that we steal some work instead of processing a
    // task from the queue.
    while (thread_pool->GetTaskCount(self) == 0) {
      WorkStealingTask* steal_from_task = NULL;

      {
        MutexLock mu(self, thread_pool->work_steal_lock_);
        // Try finding a task to steal from.
        steal_from_task = thread_pool->FindTaskToStealFrom(self);
        if (steal_from_task != NULL) {
          CHECK_NE(stealing_task, steal_from_task)
              << "Attempting to steal from completed self task";
          // Pin the victim task so it cannot be finalized while we steal.
          steal_from_task->ref_count_++;
        } else {
          // Nothing to steal from; go back to the regular task queue.
          break;
        }
      }

      if (steal_from_task != NULL) {
        // Task which completed earlier is going to steal some work.
        stealing_task->StealFrom(self, steal_from_task);

        {
          // We are done stealing from the task, lets decrement its reference count.
          MutexLock mu(self, thread_pool->work_steal_lock_);
          finalize = !--steal_from_task->ref_count_;
        }

        if (finalize) {
          // We held the last reference: finalize outside the lock.
          steal_from_task->Finalize();
        }
      }
    }

    {
      MutexLock mu(self, thread_pool->work_steal_lock_);
      // If nobody is still referencing task_ we can finalize it.
      finalize = !--stealing_task->ref_count_;
    }

    if (finalize) {
      stealing_task->Finalize();
    }
  }
}
249
250WorkStealingWorker::~WorkStealingWorker() {
251
252}
253
254WorkStealingThreadPool::WorkStealingThreadPool(size_t num_threads)
255 : ThreadPool(0),
256 work_steal_lock_("work stealing lock"),
257 steal_index_(0) {
258 while (GetThreadCount() < num_threads) {
259 const std::string name = StringPrintf("Work stealing worker %zu", GetThreadCount());
260 threads_.push_back(new WorkStealingWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
261 }
262}
263
264WorkStealingTask* WorkStealingThreadPool::FindTaskToStealFrom(Thread* self) {
265 const size_t thread_count = GetThreadCount();
266 for (size_t i = 0; i < thread_count; ++i) {
267 // TODO: Use CAS instead of lock.
268 ++steal_index_;
269 if (steal_index_ >= thread_count) {
270 steal_index_-= thread_count;
271 }
272
273 WorkStealingWorker* worker = down_cast<WorkStealingWorker*>(threads_[steal_index_]);
274 WorkStealingTask* task = worker->task_;
275 if (task) {
276 // Not null, we can probably steal from this worker.
277 return task;
278 }
279 }
280 // Couldn't find something to steal.
281 return NULL;
282}
283
284WorkStealingThreadPool::~WorkStealingThreadPool() {
285
286}
287
Mathieu Chartier0e4627e2012-10-23 16:13:36 -0700288} // namespace art