/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>

#include "src/cpp/server/dynamic_thread_pool.h"

namespace grpc {

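// A DynamicThread owns one worker thread. Constructing it immediately starts
// the thread on the owning pool's work loop; the object outlives the thread
// and is reclaimed later via DynamicThreadPool::ReapThreads().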
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
    : pool_(pool),
      thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
                            this)) {}
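
// join() waits for the worker to finish; by the time a DynamicThread is
// destroyed (via ReapThreads) its work loop has already exited.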
DynamicThreadPool::DynamicThread::~DynamicThread() {
  thd_->join();
  thd_.reset();
}

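// Thread entry point: run the pool's shared work loop, then record this
// thread's exit so the pool can account for it and reap the object.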
void DynamicThreadPool::DynamicThread::ThreadFunc() {
  pool_->ThreadFunc();
  // The work loop has exited, so this thread is going away; reduce the count.
  grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
  pool_->nthreads_--;
  // Move ourselves to the dead list so the pool can reap this object later.
  pool_->dead_threads_.push_back(this);

  // If the pool is shutting down and this was the last live thread, wake the
  // destructor waiting on shutdown_cv_.
  if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
    pool_->shutdown_cv_.notify_one();
  }
}

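// Shared work loop run by every DynamicThread: execute queued callbacks,
// park when idle, and exit once idle threads exceed the reserve or the pool
// is shutting down with no work left.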
void DynamicThreadPool::ThreadFunc() {
  for (;;) {
    // Wait until work is available or we are shutting down.
    grpc::unique_lock<grpc::mutex> lock(mu_);
    if (!shutdown_ && callbacks_.empty()) {
      // If the reserve is already idle, this thread is surplus: quit.
      if (threads_waiting_ >= reserve_threads_) {
        break;
      }
      threads_waiting_++;
      cv_.wait(lock);
      threads_waiting_--;
    }
    // Drain callbacks before considering shutdown to ensure all work
    // gets completed.
    if (!callbacks_.empty()) {
      auto cb = callbacks_.front();
      callbacks_.pop();
      // Run the callback without holding the pool lock.
      lock.unlock();
      cb();
    } else if (shutdown_) {
      break;
    }
  }
}

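// Pre-spawn the reserve threads so early callbacks do not pay
// thread-creation latency.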
DynamicThreadPool::DynamicThreadPool(int reserve_threads)
    : shutdown_(false),
      reserve_threads_(reserve_threads),
      nthreads_(0),
      threads_waiting_(0) {
  for (int i = 0; i < reserve_threads_; i++) {
    grpc::lock_guard<grpc::mutex> lock(mu_);
    nthreads_++;
    new DynamicThread(this);
  }
}

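// Delete the DynamicThread objects of threads that have already exited their
// work loop. Always called with mu_ held, so the list cannot change underfoot.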
void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
  for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
    delete *t;
  }
}

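// Shutdown sequence: mark the pool as shutting down, wake every idle thread,
// wait until the last thread reports its exit, then free the thread objects.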
DynamicThreadPool::~DynamicThreadPool() {
  grpc::unique_lock<grpc::mutex> lock(mu_);
  shutdown_ = true;
  cv_.notify_all();
  while (nthreads_ != 0) {
    shutdown_cv_.wait(lock);
  }
  ReapThreads(&dead_threads_);
}

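// Queue a callback for execution. If no thread is idle, grow the pool by one
// thread rather than making the callback wait for a free worker.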
void DynamicThreadPool::Add(const std::function<void()>& callback) {
  grpc::lock_guard<grpc::mutex> lock(mu_);
  // Add the work item to the callbacks list
  callbacks_.push(callback);
  // Increase pool size or notify as needed
  if (threads_waiting_ == 0) {
    // Kick off a new thread
    nthreads_++;
    new DynamicThread(this);
  } else {
    cv_.notify_one();
  }
  // Also use this chance to harvest dead threads
  if (!dead_threads_.empty()) {
    ReapThreads(&dead_threads_);
  }
}

}  // namespace grpc
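
// Usage sketch (illustrative only, not part of this file). The pool runs any
// std::function<void()> passed to Add(); the lambda body below is a
// hypothetical work item:
//
//   grpc::DynamicThreadPool pool(4);              // keep up to 4 threads idle
//   pool.Add([] { /* handle one unit of work */ });
//   // The destructor drains queued callbacks, then joins all threads.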