// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/scheduler_worker_pool_impl.h"

#include <stddef.h>

#include <memory>
#include <unordered_set>
#include <vector>

#include "base/atomicops.h"
#include "base/barrier_closure.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/metrics/histogram.h"
#include "base/metrics/histogram_samples.h"
#include "base/metrics/statistics_recorder.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/sequence_sort_key.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/task_scheduler/test_task_factory.h"
#include "base/task_scheduler/test_utils.h"
#include "base/test/gtest_util.h"
#include "base/test/test_simple_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker_impl.h"
#include "base/threading/thread_local_storage.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {
namespace internal {
namespace {

constexpr size_t kNumWorkersInWorkerPool = 4;
constexpr size_t kNumThreadsPostingTasks = 4;
constexpr size_t kNumTasksPostedPerThread = 150;
// This can't be lower because Windows' WaitableEvent wakes up too early when a
// small timeout is used. This results in many spurious wake ups before a
// worker is allowed to clean up.
constexpr TimeDelta kReclaimTimeForCleanupTests =
    TimeDelta::FromMilliseconds(500);
constexpr TimeDelta kExtraTimeToWaitForCleanup = TimeDelta::FromSeconds(1);

class TaskSchedulerWorkerPoolImplTestBase {
 protected:
  TaskSchedulerWorkerPoolImplTestBase()
      : service_thread_("TaskSchedulerServiceThread") {}

  void SetUp() {
    CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
  }

  void TearDown() {
    service_thread_.Stop();
    task_tracker_.Flush();
    worker_pool_->WaitForAllWorkersIdleForTesting();
    worker_pool_->JoinForTesting();
  }

  void CreateWorkerPool() {
    ASSERT_FALSE(worker_pool_);
    service_thread_.Start();
    delayed_task_manager_.Start(service_thread_.task_runner());
    worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
        "TestWorkerPool", ThreadPriority::NORMAL, &task_tracker_,
        &delayed_task_manager_);
    ASSERT_TRUE(worker_pool_);
  }

  void StartWorkerPool(TimeDelta suggested_reclaim_time, size_t num_workers) {
    ASSERT_TRUE(worker_pool_);
    worker_pool_->Start(
        SchedulerWorkerPoolParams(num_workers, suggested_reclaim_time),
        service_thread_.task_runner());
  }

  void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,
                                size_t num_workers) {
    CreateWorkerPool();
    StartWorkerPool(suggested_reclaim_time, num_workers);
  }

  std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;

  TaskTracker task_tracker_;
  Thread service_thread_;

 private:
  DelayedTaskManager delayed_task_manager_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestBase);
};

class TaskSchedulerWorkerPoolImplTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::Test {
 protected:
  TaskSchedulerWorkerPoolImplTest() = default;

  void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::SetUp(); }

  void TearDown() override { TaskSchedulerWorkerPoolImplTestBase::TearDown(); }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest);
};

class TaskSchedulerWorkerPoolImplTestParam
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<test::ExecutionMode> {
 protected:
  TaskSchedulerWorkerPoolImplTestParam() = default;

  void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::SetUp(); }

  void TearDown() override { TaskSchedulerWorkerPoolImplTestBase::TearDown(); }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestParam);
};

using PostNestedTask = test::TestTaskFactory::PostNestedTask;

class ThreadPostingTasksWaitIdle : public SimpleThread {
 public:
  // Constructs a thread that posts tasks to |worker_pool| through an
  // |execution_mode| task runner. The thread waits until all workers in
  // |worker_pool| are idle before posting a new task.
  ThreadPostingTasksWaitIdle(SchedulerWorkerPoolImpl* worker_pool,
                             test::ExecutionMode execution_mode)
      : SimpleThread("ThreadPostingTasksWaitIdle"),
        worker_pool_(worker_pool),
        factory_(CreateTaskRunnerWithExecutionMode(worker_pool, execution_mode),
                 execution_mode) {
    DCHECK(worker_pool_);
  }

  const test::TestTaskFactory* factory() const { return &factory_; }

 private:
  void Run() override {
    EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());

    for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
      worker_pool_->WaitForAllWorkersIdleForTesting();
      EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, Closure()));
    }
  }

  SchedulerWorkerPoolImpl* const worker_pool_;
  const scoped_refptr<TaskRunner> task_runner_;
  test::TestTaskFactory factory_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasksWaitIdle);
};

}  // namespace

TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Create threads to post tasks. To verify that workers can sleep and be
  // woken up when new tasks are posted, wait for all workers to become idle
  // before posting a new task.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>>
      threads_posting_tasks;
  for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
    threads_posting_tasks.push_back(
        MakeUnique<ThreadPostingTasksWaitIdle>(worker_pool_.get(), GetParam()));
    threads_posting_tasks.back()->Start();
  }

  // Wait for all tasks to run.
  for (const auto& thread_posting_tasks : threads_posting_tasks) {
    thread_posting_tasks->Join();
    thread_posting_tasks->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses its
  // TestTaskFactory after |thread_posting_tasks| is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWithOneAvailableWorker) {
  // Post blocking tasks to keep all workers busy except one until |event| is
  // signaled. Use different factories so that tasks are added to different
  // sequences and can run simultaneously when the execution mode is SEQUENCED.
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
  for (size_t i = 0; i < (kNumWorkersInWorkerPool - 1); ++i) {
    blocked_task_factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam()));
    EXPECT_TRUE(blocked_task_factories.back()->PostTask(
        PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
    blocked_task_factories.back()->WaitForAllTasksToRun();
  }

  // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
  // that only one worker in |worker_pool_| isn't busy.
  test::TestTaskFactory short_task_factory(
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
      GetParam());
  for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure()));
  short_task_factory.WaitForAllTasksToRun();

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) {
  // Verify that it is possible to have |kNumWorkersInWorkerPool|
  // tasks/sequences running simultaneously. Use different factories so that
  // the blocking tasks are added to different sequences and can run
  // simultaneously when the execution mode is SEQUENCED.
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam()));
    EXPECT_TRUE(factories.back()->PostTask(
        PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
    factories.back()->WaitForAllTasksToRun();
  }

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

INSTANTIATE_TEST_CASE_P(Parallel,
                        TaskSchedulerWorkerPoolImplTestParam,
                        ::testing::Values(test::ExecutionMode::PARALLEL));
INSTANTIATE_TEST_CASE_P(Sequenced,
                        TaskSchedulerWorkerPoolImplTestParam,
                        ::testing::Values(test::ExecutionMode::SEQUENCED));

namespace {

class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  void SetUp() override {
    CreateWorkerPool();
    // Let the test start the worker pool.
  }
};

277 WaitableEvent* task_scheduled,
278 WaitableEvent* barrier) {
279 *platform_thread_ref = PlatformThread::CurrentRef();
280 task_scheduled->Signal();
281 barrier->Wait();
282}
283
284} // namespace
285
286// Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with
287// more than 2 workers are scheduled on different workers when Start() is
288// called.
289TEST_F(TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest,
290 PostTasksBeforeStart) {
291 PlatformThreadRef task_1_thread_ref;
292 PlatformThreadRef task_2_thread_ref;
293 WaitableEvent task_1_scheduled(WaitableEvent::ResetPolicy::MANUAL,
294 WaitableEvent::InitialState::NOT_SIGNALED);
295 WaitableEvent task_2_scheduled(WaitableEvent::ResetPolicy::MANUAL,
296 WaitableEvent::InitialState::NOT_SIGNALED);
297
298 // This event is used to prevent a task from completing before the other task
299 // is scheduled. If that happened, both tasks could run on the same worker and
300 // this test couldn't verify that the correct number of workers were woken up.
301 WaitableEvent barrier(WaitableEvent::ResetPolicy::MANUAL,
302 WaitableEvent::InitialState::NOT_SIGNALED);
303
fdorayb7013402017-05-09 13:18:32 +0900304 worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
tzik330d83f2017-06-26 15:13:17 +0900305 ->PostTask(
306 FROM_HERE,
307 BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
308 Unretained(&task_1_scheduled), Unretained(&barrier)));
fdorayb7013402017-05-09 13:18:32 +0900309 worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
tzik330d83f2017-06-26 15:13:17 +0900310 ->PostTask(
311 FROM_HERE,
312 BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
313 Unretained(&task_2_scheduled), Unretained(&barrier)));
fdoray2890d2f2017-04-08 09:51:58 +0900314
315 // Workers should not be created and tasks should not run before the pool is
316 // started.
Jeffrey He92b46b42017-08-09 00:05:01 +0900317 EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
fdoray2890d2f2017-04-08 09:51:58 +0900318 EXPECT_FALSE(task_1_scheduled.IsSignaled());
319 EXPECT_FALSE(task_2_scheduled.IsSignaled());
320
321 StartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
322
323 // Tasks should be scheduled shortly after the pool is started.
324 task_1_scheduled.Wait();
325 task_2_scheduled.Wait();
326
327 // Tasks should be scheduled on different threads.
328 EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
329
330 barrier.Signal();
331 task_tracker_.Flush();
332}
333
Jeffrey He2afe9232017-08-11 00:22:35 +0900334// Verify that posting many tasks before Start will cause the number of workers
335// to grow to |worker_capacity_| during Start.
336TEST_F(TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest, PostManyTasks) {
337 scoped_refptr<TaskRunner> task_runner =
338 worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
339 constexpr size_t kNumTasksPosted = 2 * kNumWorkersInWorkerPool;
340 for (size_t i = 0; i < kNumTasksPosted; ++i)
341 task_runner->PostTask(FROM_HERE, BindOnce(&DoNothing));
342
343 EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
344
345 StartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
346 ASSERT_GT(kNumTasksPosted, worker_pool_->GetWorkerCapacityForTesting());
347 EXPECT_EQ(kNumWorkersInWorkerPool,
348 worker_pool_->GetWorkerCapacityForTesting());
349
350 EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
351 worker_pool_->GetWorkerCapacityForTesting());
352}
353
fdoray2890d2f2017-04-08 09:51:58 +0900354namespace {
355
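// Arbitrary non-zero value written to each worker's TLS slot. A worker that
// still sees this value when running a task was reused; a worker whose slot
// reads null is a freshly created thread.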
constexpr size_t kMagicTlsValue = 42;

class TaskSchedulerWorkerPoolCheckTlsReuse
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  void SetTlsValueAndWait() {
    slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
    waiter_.Wait();
  }

  void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
    if (!slot_.Get())
      subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);

    count_waiter->Signal();
    waiter_.Wait();
  }

 protected:
  TaskSchedulerWorkerPoolCheckTlsReuse()
      : waiter_(WaitableEvent::ResetPolicy::MANUAL,
                WaitableEvent::InitialState::NOT_SIGNALED) {}

  void SetUp() override {
    CreateAndStartWorkerPool(kReclaimTimeForCleanupTests,
                             kNumWorkersInWorkerPool);
  }

  subtle::Atomic32 zero_tls_values_ = 0;

  WaitableEvent waiter_;

 private:
  ThreadLocalStorage::Slot slot_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
};

}  // namespace

// Checks that at least one worker has been cleaned up by checking the TLS.
TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckCleanupWorkers) {
  // Saturate the workers and mark each worker's thread with a magic TLS value.
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    factories.push_back(std::make_unique<test::TestTaskFactory>(
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()}),
        test::ExecutionMode::PARALLEL));
    ASSERT_TRUE(factories.back()->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
             Unretained(this))));
    factories.back()->WaitForAllTasksToRun();
  }

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // All workers should be done running by now, so reset for the next phase.
  waiter_.Reset();

  // Give the worker pool a chance to clean up its workers.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests +
                        kExtraTimeToWaitForCleanup);

  worker_pool_->DisallowWorkerCleanupForTesting();

  // Saturate and count the worker threads that do not have the magic TLS
  // value. If the value is not there, that means we're at a new worker.
  std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
  for (auto& factory : factories) {
    count_waiters.push_back(WrapUnique(
        new WaitableEvent(WaitableEvent::ResetPolicy::MANUAL,
                          WaitableEvent::InitialState::NOT_SIGNALED)));
    ASSERT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
             Unretained(this), count_waiters.back().get())));
    factory->WaitForAllTasksToRun();
  }

  // Wait for all counters to complete.
  for (auto& count_waiter : count_waiters)
    count_waiter->Wait();

  EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0);

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
}

namespace {

class TaskSchedulerWorkerPoolHistogramTest
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  TaskSchedulerWorkerPoolHistogramTest() = default;

 protected:
  // Override SetUp() to allow every test case to initialize a worker pool
  // with its own arguments.
  void SetUp() override {}

 private:
  std::unique_ptr<StatisticsRecorder> statistics_recorder_ =
      StatisticsRecorder::CreateTemporaryForTesting();

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest);
};

}  // namespace

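// Verify that a SchedulerWorker records the number of tasks it ran between
// two waits in the TaskScheduler.NumTasksBetweenWaits.* histogram.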
TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) {
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);
  CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
  auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits(
      {WithBaseSyncPrimitives()});

  // Post a task.
  task_runner->PostTask(FROM_HERE,
                        BindOnce(&WaitableEvent::Wait, Unretained(&event)));

  // Post 2 more tasks while the first task hasn't completed its execution. It
  // is guaranteed that these tasks will run immediately after the first task,
  // without allowing the worker to sleep.
  task_runner->PostTask(FROM_HERE, BindOnce(&DoNothing));
  task_runner->PostTask(FROM_HERE, BindOnce(&DoNothing));

  // Allow tasks to run and wait until the SchedulerWorker is idle.
  event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Wake up the SchedulerWorker that just became idle by posting a task and
  // wait until it becomes idle again. The SchedulerWorker should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  task_runner->PostTask(FROM_HERE, BindOnce(&DoNothing));
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Verify that counts were recorded to the histogram as expected.
  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}

namespace {

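// Signals |signal_event| and blocks on |wait_event|, keeping the worker that
// runs it busy until the test releases it.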
void SignalAndWaitEvent(WaitableEvent* signal_event,
                        WaitableEvent* wait_event) {
  signal_event->Signal();
  wait_event->Wait();
}

}  // namespace

TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithCleanup) {
  WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL,
                                     WaitableEvent::InitialState::NOT_SIGNALED);
  CreateAndStartWorkerPool(kReclaimTimeForCleanupTests,
                           kNumWorkersInWorkerPool);
  auto task_runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  // Post tasks to saturate the pool.
  std::vector<std::unique_ptr<WaitableEvent>> task_started_events;
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_started_events.push_back(std::make_unique<WaitableEvent>(
        WaitableEvent::ResetPolicy::MANUAL,
        WaitableEvent::InitialState::NOT_SIGNALED));
    task_runner->PostTask(FROM_HERE,
                          BindOnce(&SignalAndWaitEvent,
                                   Unretained(task_started_events.back().get()),
                                   Unretained(&tasks_can_exit_event)));
  }
  for (const auto& task_started_event : task_started_events)
    task_started_event->Wait();

  // Allow tasks to complete their execution and wait to allow workers to
  // clean up.
  tasks_can_exit_event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();
  PlatformThread::Sleep(kReclaimTimeForCleanupTests +
                        kExtraTimeToWaitForCleanup);

  // Wake up SchedulerWorkers by posting tasks. They should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  tasks_can_exit_event.Reset();
  task_started_events.clear();
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_started_events.push_back(std::make_unique<WaitableEvent>(
        WaitableEvent::ResetPolicy::MANUAL,
        WaitableEvent::InitialState::NOT_SIGNALED));
    task_runner->PostTask(FROM_HERE,
                          BindOnce(&SignalAndWaitEvent,
                                   Unretained(task_started_events.back().get()),
                                   Unretained(&tasks_can_exit_event)));
  }
  for (const auto& task_started_event : task_started_events)
    task_started_event->Wait();

  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();

  // Verify that counts were recorded to the histogram as expected.
  // - The "0" bucket has a count of at least 1 because the SchedulerWorker on
  //   top of the idle stack isn't allowed to clean up when its sleep timeout
  //   expires. Instead, it waits on its WaitableEvent again without running a
  //   task. The count may be higher than 1 because of spurious wake ups before
  //   the sleep timeout expires.
  EXPECT_GE(histogram->SnapshotSamples()->GetCount(0), 1);
  // - The "1" bucket has a count of |kNumWorkersInWorkerPool| because each
  //   SchedulerWorker ran a task before waiting on its WaitableEvent at the
  //   beginning of the test.
  EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool),
            histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  tasks_can_exit_event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();
  worker_pool_->DisallowWorkerCleanupForTesting();
}

TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeCleanup) {
  CreateAndStartWorkerPool(kReclaimTimeForCleanupTests,
                           kNumWorkersInWorkerPool);

  auto histogrammed_thread_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          {WithBaseSyncPrimitives()});

  // Post 3 tasks and hold the thread for idle thread stack ordering.
  // This test assumes |histogrammed_thread_task_runner| gets assigned the same
  // thread for each of its tasks.
  PlatformThreadRef thread_ref;
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_TRUE(thread_ref);
                       *thread_ref = PlatformThread::CurrentRef();
                     },
                     Unretained(&thread_ref)));
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_FALSE(thread_ref->is_null());
                       EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
                     },
                     Unretained(&thread_ref)));

  WaitableEvent cleanup_thread_running(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent cleanup_thread_continue(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](PlatformThreadRef* thread_ref,
             WaitableEvent* cleanup_thread_running,
             WaitableEvent* cleanup_thread_continue) {
            ASSERT_FALSE(thread_ref->is_null());
            EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
            cleanup_thread_running->Signal();
            cleanup_thread_continue->Wait();
          },
          Unretained(&thread_ref), Unretained(&cleanup_thread_running),
          Unretained(&cleanup_thread_continue)));

  cleanup_thread_running.Wait();

  // To allow the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to clean up, make sure it isn't on top
  // of the idle stack by waking up another SchedulerWorker via
  // |task_runner_for_top_idle|. |histogrammed_thread_task_runner| should
  // release and go idle first and then |task_runner_for_top_idle| should
  // release and go idle. This allows the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to clean up.
  WaitableEvent top_idle_thread_running(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent top_idle_thread_continue(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  auto task_runner_for_top_idle =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          {WithBaseSyncPrimitives()});
  task_runner_for_top_idle->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef thread_ref,
                        WaitableEvent* top_idle_thread_running,
                        WaitableEvent* top_idle_thread_continue) {
                       ASSERT_FALSE(thread_ref.is_null());
                       EXPECT_NE(thread_ref, PlatformThread::CurrentRef())
                           << "Worker reused. Worker will not clean up and the "
                              "histogram value will be wrong.";
                       top_idle_thread_running->Signal();
                       top_idle_thread_continue->Wait();
                     },
                     thread_ref, Unretained(&top_idle_thread_running),
                     Unretained(&top_idle_thread_continue)));
  top_idle_thread_running.Wait();
  cleanup_thread_continue.Signal();
  // Wait for the thread processing the |histogrammed_thread_task_runner| work
  // to go to the idle stack.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  top_idle_thread_continue.Signal();
  // Allow the thread processing the |histogrammed_thread_task_runner| work to
  // clean up.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests +
                        kReclaimTimeForCleanupTests);
  worker_pool_->WaitForAllWorkersIdleForTesting();
  worker_pool_->DisallowWorkerCleanupForTesting();

  // Verify that counts were recorded to the histogram as expected.
  const auto* histogram = worker_pool_->num_tasks_before_detach_histogram();
  // Note: There'll be a thread that cleans up after running no tasks. This
  // thread was the one created to maintain an idle thread after posting the
  // task via |task_runner_for_top_idle|.
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(2));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(4));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(5));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(6));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}

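// Verify that the pool creates a single standby worker when started, before
// any task is posted.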
TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) {
  TaskTracker task_tracker;
  DelayedTaskManager delayed_task_manager;
  scoped_refptr<TaskRunner> service_thread_task_runner =
      MakeRefCounted<TestSimpleTaskRunner>();
  delayed_task_manager.Start(service_thread_task_runner);
  auto worker_pool = std::make_unique<SchedulerWorkerPoolImpl>(
      "OnePolicyWorkerPool", ThreadPriority::NORMAL, &task_tracker,
      &delayed_task_manager);
  worker_pool->Start(SchedulerWorkerPoolParams(8U, TimeDelta::Max()),
                     service_thread_task_runner);
  ASSERT_TRUE(worker_pool);
  EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting());
  worker_pool->JoinForTesting();
}

// Verify the SchedulerWorkerPoolImpl keeps at least one idle standby thread,
// capacity permitting.
TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) {
  constexpr size_t worker_capacity = 3;

  TaskTracker task_tracker;
  DelayedTaskManager delayed_task_manager;
  scoped_refptr<TaskRunner> service_thread_task_runner =
      MakeRefCounted<TestSimpleTaskRunner>();
  delayed_task_manager.Start(service_thread_task_runner);
  auto worker_pool = std::make_unique<SchedulerWorkerPoolImpl>(
      "StandbyThreadWorkerPool", ThreadPriority::NORMAL, &task_tracker,
      &delayed_task_manager);
  worker_pool->Start(
      SchedulerWorkerPoolParams(worker_capacity, kReclaimTimeForCleanupTests),
      service_thread_task_runner);
  ASSERT_TRUE(worker_pool);
  EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting());

  auto task_runner =
      worker_pool->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC,
                               WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent thread_continue(WaitableEvent::ResetPolicy::MANUAL,
                                WaitableEvent::InitialState::NOT_SIGNALED);

  RepeatingClosure closure = BindRepeating(
      [](WaitableEvent* thread_running, WaitableEvent* thread_continue) {
        thread_running->Signal();
        thread_continue->Wait();
      },
      Unretained(&thread_running), Unretained(&thread_continue));

  // There should be one idle thread until we reach worker capacity.
  for (size_t i = 0; i < worker_capacity; ++i) {
    EXPECT_EQ(i + 1, worker_pool->NumberOfWorkersForTesting());
    task_runner->PostTask(FROM_HERE, closure);
    thread_running.Wait();
  }

  // There should not be an extra idle thread if it means going above capacity.
  EXPECT_EQ(worker_capacity, worker_pool->NumberOfWorkersForTesting());

  thread_continue.Signal();
  // Give time for a worker to clean up. Verify that the pool attempts to keep
  // one idle active worker.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests +
                        kExtraTimeToWaitForCleanup);
  EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting());

  worker_pool->DisallowWorkerCleanupForTesting();
  worker_pool->JoinForTesting();
}

namespace {

enum class OptionalBlockingType {
  NO_BLOCK,
  MAY_BLOCK,
  WILL_BLOCK,
};

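// Describes a ScopedBlockingCall of type |first|, an optional nested
// ScopedBlockingCall of type |second|, and the BlockingType the combination
// is expected to behave as (|behaves_as|).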
struct NestedBlockingType {
  NestedBlockingType(BlockingType first_in,
                     OptionalBlockingType second_in,
                     BlockingType behaves_as_in)
      : first(first_in), second(second_in), behaves_as(behaves_as_in) {}

  BlockingType first;
  OptionalBlockingType second;
  BlockingType behaves_as;
};

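// Instantiates a ScopedBlockingCall of type |nested_blocking_type.first|,
// optionally followed by a nested ScopedBlockingCall of type
// |nested_blocking_type.second|.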
class NestedScopedBlockingCall {
 public:
  explicit NestedScopedBlockingCall(
      const NestedBlockingType& nested_blocking_type)
      : first_scoped_blocking_call_(nested_blocking_type.first),
        second_scoped_blocking_call_(
            nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
                ? std::make_unique<ScopedBlockingCall>(BlockingType::WILL_BLOCK)
                : (nested_blocking_type.second ==
                           OptionalBlockingType::MAY_BLOCK
                       ? std::make_unique<ScopedBlockingCall>(
                             BlockingType::MAY_BLOCK)
                       : nullptr)) {}

 private:
  ScopedBlockingCall first_scoped_blocking_call_;
  std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;

  DISALLOW_COPY_AND_ASSIGN(NestedScopedBlockingCall);
};

}  // namespace

class TaskSchedulerWorkerPoolBlockingTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<NestedBlockingType> {
 public:
  TaskSchedulerWorkerPoolBlockingTest()
      : blocking_thread_running_(WaitableEvent::ResetPolicy::AUTOMATIC,
                                 WaitableEvent::InitialState::NOT_SIGNALED),
        blocking_thread_continue_(WaitableEvent::ResetPolicy::MANUAL,
                                  WaitableEvent::InitialState::NOT_SIGNALED) {}

  static std::string ParamInfoToString(
      ::testing::TestParamInfo<NestedBlockingType> param_info) {
    std::string str = param_info.param.first == BlockingType::MAY_BLOCK
                          ? "MAY_BLOCK"
                          : "WILL_BLOCK";
    if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
      str += "_MAY_BLOCK";
    else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
      str += "_WILL_BLOCK";
    return str;
  }

  void SetUp() override {
    TaskSchedulerWorkerPoolImplTestBase::SetUp();
    task_runner_ =
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
  }

  void TearDown() override { TaskSchedulerWorkerPoolImplTestBase::TearDown(); }

 protected:
  // Saturates the worker pool with a task that first blocks, waits to be
  // unblocked, then exits.
  void SaturateWithBlockingTasks(
      const NestedBlockingType& nested_blocking_type) {
    RepeatingClosure blocking_thread_running_closure =
        BarrierClosure(kNumWorkersInWorkerPool,
                       BindOnce(&WaitableEvent::Signal,
                                Unretained(&blocking_thread_running_)));

    for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
      task_runner_->PostTask(
          FROM_HERE,
          BindOnce(
              [](Closure* blocking_thread_running_closure,
                 WaitableEvent* blocking_thread_continue_,
                 const NestedBlockingType& nested_blocking_type) {
                NestedScopedBlockingCall nested_scoped_blocking_call(
                    nested_blocking_type);
                blocking_thread_running_closure->Run();

                {
                  // Use ScopedClearBlockingObserverForTesting to avoid
                  // affecting the worker capacity with this WaitableEvent.
                  internal::ScopedClearBlockingObserverForTesting
                      scoped_clear_blocking_observer;
                  blocking_thread_continue_->Wait();
                }
              },
              Unretained(&blocking_thread_running_closure),
              Unretained(&blocking_thread_continue_), nested_blocking_type));
    }
    blocking_thread_running_.Wait();
  }

  // Returns how long we can expect a change to |worker_capacity_| to occur
  // after a task has become blocked.
  TimeDelta GetWorkerCapacityChangeSleepTime() {
    return std::max(SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod,
                    worker_pool_->MayBlockThreshold()) +
           TestTimeouts::tiny_timeout();
  }

  // Waits up to some amount of time until |worker_pool_|'s worker capacity
  // reaches |expected_worker_capacity|.
  void ExpectWorkerCapacityAfterDelay(size_t expected_worker_capacity) {
    constexpr int kMaxAttempts = 4;
    for (int i = 0;
         i < kMaxAttempts && worker_pool_->GetWorkerCapacityForTesting() !=
                                 expected_worker_capacity;
         ++i) {
      PlatformThread::Sleep(GetWorkerCapacityChangeSleepTime());
    }

    EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
              expected_worker_capacity);
  }

  // Unblocks tasks posted by SaturateWithBlockingTasks().
  void UnblockTasks() { blocking_thread_continue_.Signal(); }

  scoped_refptr<TaskRunner> task_runner_;

 private:
  WaitableEvent blocking_thread_running_;
  WaitableEvent blocking_thread_continue_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolBlockingTest);
};

// Verify that BlockingScopeEntered() causes worker capacity to increase and
// creates a worker if needed. Also verify that BlockingScopeExited() decreases
// worker capacity after an increase.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockedUnblocked) {
  ASSERT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            kNumWorkersInWorkerPool);

  SaturateWithBlockingTasks(GetParam());
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectWorkerCapacityAfterDelay(2 * kNumWorkersInWorkerPool);
  // A range of possible number of workers is accepted because of
  // crbug.com/757897.
  EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(),
            kNumWorkersInWorkerPool + 1);
  EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(),
            2 * kNumWorkersInWorkerPool);
  EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            2 * kNumWorkersInWorkerPool);

  UnblockTasks();
  task_tracker_.Flush();
  EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            kNumWorkersInWorkerPool);
}

// Verify that tasks posted in a saturated pool before a ScopedBlockingCall
// will execute after ScopedBlockingCall is instantiated.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
  WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC,
                               WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent thread_can_block(WaitableEvent::ResetPolicy::MANUAL,
                                 WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent thread_continue(WaitableEvent::ResetPolicy::MANUAL,
                                WaitableEvent::InitialState::NOT_SIGNALED);

  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](const NestedBlockingType& nested_blocking_type,
               WaitableEvent* thread_running, WaitableEvent* thread_can_block,
               WaitableEvent* thread_continue) {
              thread_running->Signal();
              {
                // Use ScopedClearBlockingObserverForTesting to avoid affecting
                // the worker capacity with this WaitableEvent.
                internal::ScopedClearBlockingObserverForTesting
                    scoped_clear_blocking_observer;
                thread_can_block->Wait();
              }

              NestedScopedBlockingCall nested_scoped_blocking_call(
                  nested_blocking_type);

              {
                // Use ScopedClearBlockingObserverForTesting to avoid affecting
                // the worker capacity with this WaitableEvent.
                internal::ScopedClearBlockingObserverForTesting
                    scoped_clear_blocking_observer;
                thread_continue->Wait();
              }
            },
            GetParam(), Unretained(&thread_running),
            Unretained(&thread_can_block), Unretained(&thread_continue)));
    thread_running.Wait();
  }

  // All workers should be occupied and the pool should be saturated. Workers
  // have not entered ScopedBlockingCall yet.
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kNumWorkersInWorkerPool);
  EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            kNumWorkersInWorkerPool);

  WaitableEvent extra_thread_running(WaitableEvent::ResetPolicy::MANUAL,
                                     WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent extra_threads_continue(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kNumWorkersInWorkerPool,
      BindOnce(&WaitableEvent::Signal, Unretained(&extra_thread_running)));
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_runner_->PostTask(FROM_HERE,
                           BindOnce(
                               [](Closure* extra_threads_running_barrier,
                                  WaitableEvent* extra_threads_continue) {
                                 extra_threads_running_barrier->Run();
                                 {
                                   // Use ScopedClearBlockingObserverForTesting
                                   // to avoid affecting the worker capacity
                                   // with this WaitableEvent.
                                   internal::
                                       ScopedClearBlockingObserverForTesting
                                           scoped_clear_blocking_observer;
                                   extra_threads_continue->Wait();
                                 }
                               },
                               Unretained(&extra_threads_running_barrier),
                               Unretained(&extra_threads_continue)));
  }

  // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
  // tasks we just posted.
  thread_can_block.Signal();
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectWorkerCapacityAfterDelay(2 * kNumWorkersInWorkerPool);

  // Should not block forever.
  extra_thread_running.Wait();
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
            2 * kNumWorkersInWorkerPool);
  extra_threads_continue.Signal();

  thread_continue.Signal();
  task_tracker_.Flush();
}

// Verify that workers become idle when the pool is over-capacity and that
// those workers do no work.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, WorkersIdleWhenOverCapacity) {
  ASSERT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            kNumWorkersInWorkerPool);

  SaturateWithBlockingTasks(GetParam());
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectWorkerCapacityAfterDelay(2 * kNumWorkersInWorkerPool);
  EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
            2 * kNumWorkersInWorkerPool);
  // A range of possible number of workers is accepted because of
  // crbug.com/757897.
  EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(),
            kNumWorkersInWorkerPool + 1);
  EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(),
            2 * kNumWorkersInWorkerPool);

  WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC,
                               WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent thread_continue(WaitableEvent::ResetPolicy::MANUAL,
                                WaitableEvent::InitialState::NOT_SIGNALED);

  RepeatingClosure thread_running_barrier = BarrierClosure(
      kNumWorkersInWorkerPool,
      BindOnce(&WaitableEvent::Signal, Unretained(&thread_running)));
  // Posting these tasks should cause new workers to be created.
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    auto callback = BindOnce(
        [](Closure* thread_running_barrier, WaitableEvent* thread_continue) {
          thread_running_barrier->Run();
          {
            // Use ScopedClearBlockingObserverForTesting to avoid affecting the
            // worker capacity with this WaitableEvent.
            internal::ScopedClearBlockingObserverForTesting
                scoped_clear_blocking_observer;
            thread_continue->Wait();
          }
        },
        Unretained(&thread_running_barrier), Unretained(&thread_continue));
    task_runner_->PostTask(FROM_HERE, std::move(callback));
  }
  thread_running.Wait();

  ASSERT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(), 0U);
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
            2 * kNumWorkersInWorkerPool);

  AtomicFlag is_exiting;
  // These tasks should not get executed until after other tasks become
  // unblocked.
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_runner_->PostTask(FROM_HERE, BindOnce(
                                          [](AtomicFlag* is_exiting) {
                                            EXPECT_TRUE(is_exiting->IsSet());
                                          },
                                          Unretained(&is_exiting)));
  }

  // The original |kNumWorkersInWorkerPool| will finish their tasks after being
  // unblocked. There will be work in the work queue, but the pool should now
  // be over-capacity and workers will become idle.
  UnblockTasks();
  worker_pool_->WaitForWorkersIdleForTesting(kNumWorkersInWorkerPool);
  EXPECT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(),
            kNumWorkersInWorkerPool);

  // Posting more tasks should not cause workers idle from the pool being over
  // capacity to begin doing work.
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    task_runner_->PostTask(FROM_HERE, BindOnce(
                                          [](AtomicFlag* is_exiting) {
                                            EXPECT_TRUE(is_exiting->IsSet());
                                          },
                                          Unretained(&is_exiting)));
  }

  // Give time for those idle workers to possibly do work (which should not
  // happen).
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  is_exiting.Set();
  // Unblocks the new workers.
  thread_continue.Signal();
  task_tracker_.Flush();
}

INSTANTIATE_TEST_CASE_P(
    ,
    TaskSchedulerWorkerPoolBlockingTest,
    ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::MAY_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::WILL_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::MAY_BLOCK,
                                         BlockingType::WILL_BLOCK)),
    TaskSchedulerWorkerPoolBlockingTest::ParamInfoToString);

Francois Doray7c49b872017-09-12 02:27:50 +09001120// Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1121// but exits the scope before the MayBlockThreshold() is reached, that the
1122// worker capacity does not increase.
1123TEST_F(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockUnblockPremature) {
Francois Dorayb9223752017-09-06 03:44:43 +09001124 ASSERT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1125 kNumWorkersInWorkerPool);
1126
1127 TimeDelta worker_capacity_change_sleep = GetWorkerCapacityChangeSleepTime();
1128 worker_pool_->MaximizeMayBlockThresholdForTesting();
1129
Francois Doray7c49b872017-09-12 02:27:50 +09001130 SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1131 OptionalBlockingType::NO_BLOCK,
1132 BlockingType::MAY_BLOCK));
Francois Dorayb9223752017-09-06 03:44:43 +09001133 PlatformThread::Sleep(worker_capacity_change_sleep);
1134 EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kNumWorkersInWorkerPool);
1135 EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1136 kNumWorkersInWorkerPool);
1137
1138 UnblockTasks();
1139 task_tracker_.Flush();
1140 EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1141 kNumWorkersInWorkerPool);
1142}
1143
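// Illustrative sketch (added for exposition, not part of the original suite):
// the usage pattern exercised by the test above. A MAY_BLOCK
// ScopedBlockingCall only affects worker capacity if the scope outlives
// MayBlockThreshold(); a scope this short should leave capacity untouched.
// The test name is hypothetical.
TEST(TaskSchedulerScopedBlockingCallSketchTest, ShortMayBlockScope) {
  // Entering the scope announces that this thread may block.
  ScopedBlockingCall scoped_blocking_call(BlockingType::MAY_BLOCK);
  // Exiting immediately, before MayBlockThreshold() is reached, means the
  // scheduler never increments worker capacity for this call.
}
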
Francois Doray7c49b872017-09-12 02:27:50 +09001144// Verify that if worker capacity is incremented because of a MAY_BLOCK
1145// ScopedBlockingCall, it isn't incremented again when there is a nested
1146// WILL_BLOCK ScopedBlockingCall.
1147TEST_F(TaskSchedulerWorkerPoolBlockingTest,
1148 MayBlockIncreaseCapacityNestedWillBlock) {
1149 ASSERT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1150 kNumWorkersInWorkerPool);
1151 auto task_runner =
1152 worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1153 WaitableEvent can_return(WaitableEvent::ResetPolicy::MANUAL,
1154 WaitableEvent::InitialState::NOT_SIGNALED);
1155
1156  // Occupy all but one worker so that the next task's MAY_BLOCK
1157  // ScopedBlockingCall increments the worker capacity.
1158 for (size_t i = 0; i < kNumWorkersInWorkerPool - 1; ++i) {
Francois Doraye058b722017-09-12 04:53:29 +09001159 task_runner->PostTask(FROM_HERE,
1160 BindOnce(
1161 [](WaitableEvent* can_return) {
1162 // Use ScopedClearBlockingObserverForTesting to
1163 // avoid affecting the worker capacity with this
1164 // WaitableEvent.
1165 internal::ScopedClearBlockingObserverForTesting
1166 scoped_clear_blocking_observer;
1167 can_return->Wait();
1168 },
1169 Unretained(&can_return)));
Francois Doray7c49b872017-09-12 02:27:50 +09001170 }
1171
1172 WaitableEvent can_instantiate_will_block(
1173 WaitableEvent::ResetPolicy::MANUAL,
1174 WaitableEvent::InitialState::NOT_SIGNALED);
1175 WaitableEvent did_instantiate_will_block(
1176 WaitableEvent::ResetPolicy::MANUAL,
1177 WaitableEvent::InitialState::NOT_SIGNALED);
1178
1179  // Post a task that instantiates a MAY_BLOCK then a nested WILL_BLOCK call.
1180 task_runner->PostTask(
1181 FROM_HERE,
1182 BindOnce(
1183 [](WaitableEvent* can_instantiate_will_block,
1184 WaitableEvent* did_instantiate_will_block,
1185 WaitableEvent* can_return) {
1186 ScopedBlockingCall may_block(BlockingType::MAY_BLOCK);
Francois Doraye058b722017-09-12 04:53:29 +09001187 {
1188 // Use ScopedClearBlockingObserverForTesting to avoid affecting
1189 // the worker capacity with this WaitableEvent.
1190 internal::ScopedClearBlockingObserverForTesting
1191 scoped_clear_blocking_observer;
1192 can_instantiate_will_block->Wait();
1193 }
Francois Doray7c49b872017-09-12 02:27:50 +09001194 ScopedBlockingCall will_block(BlockingType::WILL_BLOCK);
1195 did_instantiate_will_block->Signal();
Francois Doraye058b722017-09-12 04:53:29 +09001196 {
1197 // Use ScopedClearBlockingObserverForTesting to avoid affecting
1198 // the worker capacity with this WaitableEvent.
1199 internal::ScopedClearBlockingObserverForTesting
1200 scoped_clear_blocking_observer;
1201 can_return->Wait();
1202 }
Francois Doray7c49b872017-09-12 02:27:50 +09001203 },
1204 Unretained(&can_instantiate_will_block),
1205 Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1206
1207 // After a short delay, worker capacity should be incremented.
1208 ExpectWorkerCapacityAfterDelay(kNumWorkersInWorkerPool + 1);
1209
1210 // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1211 can_instantiate_will_block.Signal();
1212 did_instantiate_will_block.Wait();
1213
1214 // Worker capacity shouldn't be incremented again.
1215 EXPECT_EQ(kNumWorkersInWorkerPool + 1,
1216 worker_pool_->GetWorkerCapacityForTesting());
1217
1218 // Tear down.
1219 can_return.Signal();
1220 task_tracker_.Flush();
1221 EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1222 kNumWorkersInWorkerPool);
1223}
1224
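// Illustrative sketch (added for exposition, not part of the original suite):
// the nesting verified by the test above. Only the outer MAY_BLOCK call may
// increment worker capacity; the nested WILL_BLOCK call must not increment it
// a second time. The test name is hypothetical.
TEST(TaskSchedulerScopedBlockingCallSketchTest, NestedWillBlockInsideMayBlock) {
  ScopedBlockingCall may_block(BlockingType::MAY_BLOCK);
  {
    // Nested scope: the thread is already inside a blocking call, so the
    // scheduler must not double-count it.
    ScopedBlockingCall will_block(BlockingType::WILL_BLOCK);
  }
}
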
Jeffrey Heb23ff4c2017-08-23 07:32:49 +09001225// Verify that workers that become idle due to the pool being over capacity will
1226// eventually clean up.
1227TEST(TaskSchedulerWorkerPoolOverWorkerCapacityTest, VerifyCleanup) {
1228 constexpr size_t kWorkerCapacity = 3;
1229
1230 TaskTracker task_tracker;
1231 DelayedTaskManager delayed_task_manager;
Francois Dorayb9223752017-09-06 03:44:43 +09001232 scoped_refptr<TaskRunner> service_thread_task_runner =
1233 MakeRefCounted<TestSimpleTaskRunner>();
1234 delayed_task_manager.Start(service_thread_task_runner);
Jeffrey Heb23ff4c2017-08-23 07:32:49 +09001235 SchedulerWorkerPoolImpl worker_pool("OverWorkerCapacityTestWorkerPool",
1236 ThreadPriority::NORMAL, &task_tracker,
1237 &delayed_task_manager);
1238 worker_pool.Start(
Francois Dorayb9223752017-09-06 03:44:43 +09001239 SchedulerWorkerPoolParams(kWorkerCapacity, kReclaimTimeForCleanupTests),
1240 service_thread_task_runner);
Jeffrey Heb23ff4c2017-08-23 07:32:49 +09001241
1242 scoped_refptr<TaskRunner> task_runner =
1243 worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1244
1245 WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC,
1246 WaitableEvent::InitialState::NOT_SIGNALED);
1247 WaitableEvent thread_continue(WaitableEvent::ResetPolicy::MANUAL,
1248 WaitableEvent::InitialState::NOT_SIGNALED);
1249 RepeatingClosure thread_running_barrier = BarrierClosure(
1250 kWorkerCapacity,
1251 BindOnce(&WaitableEvent::Signal, Unretained(&thread_running)));
1252
1253 WaitableEvent blocked_call_continue(
1254 WaitableEvent::ResetPolicy::MANUAL,
1255 WaitableEvent::InitialState::NOT_SIGNALED);
1256
1257 RepeatingClosure closure = BindRepeating(
1258 [](Closure* thread_running_barrier, WaitableEvent* thread_continue,
1259 WaitableEvent* blocked_call_continue) {
1260 thread_running_barrier->Run();
1261 {
1262 ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
1263 blocked_call_continue->Wait();
1264 }
1265 thread_continue->Wait();
1266
1267 },
1268 Unretained(&thread_running_barrier), Unretained(&thread_continue),
1269 Unretained(&blocked_call_continue));
1270
1271 for (size_t i = 0; i < kWorkerCapacity; ++i)
1272 task_runner->PostTask(FROM_HERE, closure);
1273
1274 thread_running.Wait();
1275
1276 WaitableEvent extra_threads_running(
1277 WaitableEvent::ResetPolicy::AUTOMATIC,
1278 WaitableEvent::InitialState::NOT_SIGNALED);
1279 WaitableEvent extra_threads_continue(
1280 WaitableEvent::ResetPolicy::MANUAL,
1281 WaitableEvent::InitialState::NOT_SIGNALED);
1282
1283 RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1284 kWorkerCapacity,
1285 BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
1286  // These tasks should run on new threads created by the capacity increase.
1287 for (size_t i = 0; i < kWorkerCapacity; ++i) {
1288 task_runner->PostTask(FROM_HERE,
1289 BindOnce(
1290 [](Closure* extra_threads_running_barrier,
1291 WaitableEvent* extra_threads_continue) {
1292 extra_threads_running_barrier->Run();
1293 extra_threads_continue->Wait();
1294 },
1295 Unretained(&extra_threads_running_barrier),
1296 Unretained(&extra_threads_continue)));
1297 }
1298 extra_threads_running.Wait();
1299
1300 ASSERT_EQ(kWorkerCapacity * 2, worker_pool.NumberOfWorkersForTesting());
1301 EXPECT_EQ(kWorkerCapacity * 2, worker_pool.GetWorkerCapacityForTesting());
1302 blocked_call_continue.Signal();
1303 extra_threads_continue.Signal();
1304
1305 TimeTicks before_cleanup_start = TimeTicks::Now();
1306 while (TimeTicks::Now() - before_cleanup_start <
1307 kReclaimTimeForCleanupTests + kExtraTimeToWaitForCleanup) {
1308 if (worker_pool.NumberOfWorkersForTesting() <= kWorkerCapacity + 1)
1309 break;
1310
1311 // Periodically post tasks to ensure that posting tasks does not prevent
1312 // workers that are idle due to the pool being over capacity from cleaning
1313 // up.
1314 task_runner->PostTask(FROM_HERE, BindOnce(&DoNothing));
1315 PlatformThread::Sleep(kReclaimTimeForCleanupTests / 2);
1316 }
1317 // Note: one worker above capacity will not get cleaned up since it's on the
1318 // top of the idle stack.
1319 EXPECT_EQ(kWorkerCapacity + 1, worker_pool.NumberOfWorkersForTesting());
1320
1321 thread_continue.Signal();
1322
1323 worker_pool.DisallowWorkerCleanupForTesting();
1324 worker_pool.JoinForTesting();
1325}
1326
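// Illustrative sketch (added for exposition, not part of the original suite):
// the cleanup wait above is a poll-until-deadline loop. This shows its
// generic form as a local lambda, exercised with a trivially-true condition.
// The test name and |poll_until| are hypothetical.
TEST(TaskSchedulerWorkerPoolSketchTest, PollUntilConditionOrDeadline) {
  auto poll_until = [](const RepeatingCallback<bool()>& condition,
                       TimeDelta timeout, TimeDelta poll_interval) {
    const TimeTicks deadline = TimeTicks::Now() + timeout;
    while (TimeTicks::Now() < deadline) {
      if (condition.Run())
        return true;
      PlatformThread::Sleep(poll_interval);
    }
    // Final check at (or past) the deadline.
    return condition.Run();
  };
  bool condition_met = true;
  EXPECT_TRUE(poll_until(BindRepeating([](bool* met) { return *met; },
                                       Unretained(&condition_met)),
                         TestTimeouts::tiny_timeout(),
                         TimeDelta::FromMilliseconds(1)));
}
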
Jeffrey He44264742017-08-27 11:38:05 +09001327// Verify that the maximum number of workers is 256 and that hitting the max
1328// leaves the pool in a valid state with regard to worker capacity.
Francois Doray7c49b872017-09-12 02:27:50 +09001329TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
Jeffrey He44264742017-08-27 11:38:05 +09001330 constexpr size_t kMaxNumberOfWorkers = 256;
1331 constexpr size_t kNumExtraTasks = 10;
1332
1333 WaitableEvent early_blocking_thread_running(
1334 WaitableEvent::ResetPolicy::MANUAL,
1335 WaitableEvent::InitialState::NOT_SIGNALED);
1336 RepeatingClosure early_threads_barrier_closure =
1337 BarrierClosure(kMaxNumberOfWorkers,
1338 BindOnce(&WaitableEvent::Signal,
1339 Unretained(&early_blocking_thread_running)));
1340
1341 WaitableEvent early_threads_finished(
1342 WaitableEvent::ResetPolicy::MANUAL,
1343 WaitableEvent::InitialState::NOT_SIGNALED);
1344 RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1345 kMaxNumberOfWorkers,
1346 BindOnce(&WaitableEvent::Signal, Unretained(&early_threads_finished)));
1347
1348 WaitableEvent early_release_thread_continue(
1349 WaitableEvent::ResetPolicy::MANUAL,
1350 WaitableEvent::InitialState::NOT_SIGNALED);
1351
1352 // Post ScopedBlockingCall tasks to hit the worker cap.
1353 for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1354 task_runner_->PostTask(FROM_HERE,
1355 BindOnce(
1356 [](Closure* early_threads_barrier_closure,
1357 WaitableEvent* early_release_thread_continue,
1358 Closure* early_threads_finished) {
1359 {
1360 ScopedBlockingCall scoped_blocking_call(
1361 BlockingType::WILL_BLOCK);
1362 early_threads_barrier_closure->Run();
1363 early_release_thread_continue->Wait();
1364 }
1365 early_threads_finished->Run();
1366 },
1367 Unretained(&early_threads_barrier_closure),
1368 Unretained(&early_release_thread_continue),
1369 Unretained(&early_threads_finished_barrier)));
1370 }
1371
1372 early_blocking_thread_running.Wait();
1373 EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1374 kNumWorkersInWorkerPool + kMaxNumberOfWorkers);
1375
1376  WaitableEvent late_release_thread_continue(
1377 WaitableEvent::ResetPolicy::MANUAL,
1378 WaitableEvent::InitialState::NOT_SIGNALED);
1379
1380 WaitableEvent late_blocking_thread_running(
1381 WaitableEvent::ResetPolicy::MANUAL,
1382 WaitableEvent::InitialState::NOT_SIGNALED);
1383 RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1384 kNumExtraTasks, BindOnce(&WaitableEvent::Signal,
1385 Unretained(&late_blocking_thread_running)));
1386
1387  // Post additional tasks. Note: |kMaxNumberOfWorkers| tasks should already
1388  // be running. These new tasks should not be able to execute yet because
1389  // the pool is already at its max worker cap.
1390 for (size_t i = 0; i < kNumExtraTasks; ++i) {
1391 task_runner_->PostTask(
1392 FROM_HERE,
1393 BindOnce(
1394 [](Closure* late_threads_barrier_closure,
1395              WaitableEvent* late_release_thread_continue) {
1396 ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
1397 late_threads_barrier_closure->Run();
1398              late_release_thread_continue->Wait();
1399 },
1400 Unretained(&late_threads_barrier_closure),
1401          Unretained(&late_release_thread_continue)));
1402 }
1403
1404 // Give time to see if we exceed the max number of workers.
1405 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1406 EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1407
1408 early_release_thread_continue.Signal();
1409 early_threads_finished.Wait();
1410 late_blocking_thread_running.Wait();
1411
1412 WaitableEvent final_tasks_running(WaitableEvent::ResetPolicy::MANUAL,
1413 WaitableEvent::InitialState::NOT_SIGNALED);
1414 WaitableEvent final_tasks_continue(WaitableEvent::ResetPolicy::MANUAL,
1415 WaitableEvent::InitialState::NOT_SIGNALED);
1416 RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1417 kNumWorkersInWorkerPool,
1418 BindOnce(&WaitableEvent::Signal, Unretained(&final_tasks_running)));
1419
1420 // Verify that we are still able to saturate the pool.
1421 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
1422 task_runner_->PostTask(
1423 FROM_HERE,
1424 BindOnce(
1425 [](Closure* closure, WaitableEvent* final_tasks_continue) {
1426 closure->Run();
1427 final_tasks_continue->Wait();
1428 },
1429 Unretained(&final_tasks_running_barrier),
1430 Unretained(&final_tasks_continue)));
1431 }
1432 final_tasks_running.Wait();
1433 EXPECT_EQ(worker_pool_->GetWorkerCapacityForTesting(),
1434 kNumWorkersInWorkerPool + kNumExtraTasks);
1435  late_release_thread_continue.Signal();
1436 final_tasks_continue.Signal();
1437 task_tracker_.Flush();
1438}
1439
fdoraya2d271b2016-04-15 23:09:08 +09001440} // namespace internal
1441} // namespace base