blob: 81c78fe739bec8b9b91ef2860f320d240bb14052 [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
Craig Tiller7c70b6c2017-01-23 07:48:42 -080019#include "src/cpp/server/dynamic_thread_pool.h"
20
Vijay Pai320ed132016-11-01 17:16:55 -070021#include <mutex>
22#include <thread>
yang-g3e4bd952015-08-24 15:54:07 -070023
Craig Tiller7c70b6c2017-01-23 07:48:42 -080024#include <grpc/support/log.h>
vjpaib28456b2015-07-23 14:17:10 -070025
26namespace grpc {
Craig Tiller7c70b6c2017-01-23 07:48:42 -080027
Craig Tillerd6c98df2015-08-18 09:33:44 -070028DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
29 : pool_(pool),
Vijay Pai320ed132016-11-01 17:16:55 -070030 thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
Vijay Pai0109d162016-11-01 17:20:42 -070031 this)) {}
vjpaib28456b2015-07-23 14:17:10 -070032DynamicThreadPool::DynamicThread::~DynamicThread() {
33 thd_->join();
34 thd_.reset();
35}
36
37void DynamicThreadPool::DynamicThread::ThreadFunc() {
38 pool_->ThreadFunc();
39 // Now that we have killed ourselves, we should reduce the thread count
Vijay Pai320ed132016-11-01 17:16:55 -070040 std::unique_lock<std::mutex> lock(pool_->mu_);
vjpaib28456b2015-07-23 14:17:10 -070041 pool_->nthreads_--;
vjpai02b80542015-07-23 17:44:45 -070042 // Move ourselves to dead list
43 pool_->dead_threads_.push_back(this);
44
vjpaib76f3ad2015-07-23 14:41:23 -070045 if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
46 pool_->shutdown_cv_.notify_one();
47 }
vjpaib28456b2015-07-23 14:17:10 -070048}
Craig Tillerd6c98df2015-08-18 09:33:44 -070049
vjpaib28456b2015-07-23 14:17:10 -070050void DynamicThreadPool::ThreadFunc() {
51 for (;;) {
52 // Wait until work is available or we are shutting down.
Vijay Pai320ed132016-11-01 17:16:55 -070053 std::unique_lock<std::mutex> lock(mu_);
vjpaib28456b2015-07-23 14:17:10 -070054 if (!shutdown_ && callbacks_.empty()) {
55 // If there are too many threads waiting, then quit this thread
vjpai02b80542015-07-23 17:44:45 -070056 if (threads_waiting_ >= reserve_threads_) {
Craig Tillerd6c98df2015-08-18 09:33:44 -070057 break;
vjpaib28456b2015-07-23 14:17:10 -070058 }
59 threads_waiting_++;
60 cv_.wait(lock);
61 threads_waiting_--;
62 }
63 // Drain callbacks before considering shutdown to ensure all work
64 // gets completed.
65 if (!callbacks_.empty()) {
66 auto cb = callbacks_.front();
67 callbacks_.pop();
68 lock.unlock();
69 cb();
70 } else if (shutdown_) {
71 break;
72 }
73 }
74}
75
Craig Tillerd6c98df2015-08-18 09:33:44 -070076DynamicThreadPool::DynamicThreadPool(int reserve_threads)
77 : shutdown_(false),
78 reserve_threads_(reserve_threads),
79 nthreads_(0),
80 threads_waiting_(0) {
vjpaib28456b2015-07-23 14:17:10 -070081 for (int i = 0; i < reserve_threads_; i++) {
Vijay Pai320ed132016-11-01 17:16:55 -070082 std::lock_guard<std::mutex> lock(mu_);
vjpaib28456b2015-07-23 14:17:10 -070083 nthreads_++;
vjpai02b80542015-07-23 17:44:45 -070084 new DynamicThread(this);
vjpaib28456b2015-07-23 14:17:10 -070085 }
86}
87
88void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
vjpai24b3b7e2015-07-23 18:51:03 -070089 for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
Craig Tillerd6c98df2015-08-18 09:33:44 -070090 delete *t;
vjpaib28456b2015-07-23 14:17:10 -070091 }
92}
Craig Tillerd6c98df2015-08-18 09:33:44 -070093
vjpaib28456b2015-07-23 14:17:10 -070094DynamicThreadPool::~DynamicThreadPool() {
Vijay Pai320ed132016-11-01 17:16:55 -070095 std::unique_lock<std::mutex> lock(mu_);
vjpaib76f3ad2015-07-23 14:41:23 -070096 shutdown_ = true;
97 cv_.notify_all();
98 while (nthreads_ != 0) {
99 shutdown_cv_.wait(lock);
vjpaib28456b2015-07-23 14:17:10 -0700100 }
vjpaib28456b2015-07-23 14:17:10 -0700101 ReapThreads(&dead_threads_);
102}
103
104void DynamicThreadPool::Add(const std::function<void()>& callback) {
Vijay Pai320ed132016-11-01 17:16:55 -0700105 std::lock_guard<std::mutex> lock(mu_);
vjpai02b80542015-07-23 17:44:45 -0700106 // Add works to the callbacks list
107 callbacks_.push(callback);
108 // Increase pool size or notify as needed
vjpaib28456b2015-07-23 14:17:10 -0700109 if (threads_waiting_ == 0) {
110 // Kick off a new thread
111 nthreads_++;
vjpai02b80542015-07-23 17:44:45 -0700112 new DynamicThread(this);
113 } else {
114 cv_.notify_one();
vjpaib28456b2015-07-23 14:17:10 -0700115 }
vjpaib28456b2015-07-23 14:17:10 -0700116 // Also use this chance to harvest dead threads
117 if (!dead_threads_.empty()) {
118 ReapThreads(&dead_threads_);
119 }
120}
121
122} // namespace grpc