blob: c1783baa60743742f4a7ba11074c723676eb19a1 [file] [log] [blame]
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2016 gRPC authors.
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070016 *
17 */
18
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070019#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H
20#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070021
Vijay Pai320ed132016-11-01 17:16:55 -070022#include <condition_variable>
Vijay Pai92e267e2018-01-10 00:39:06 +000023#include <functional>
Sree Kuchibhotla86004382016-07-18 22:27:39 -070024#include <list>
25#include <memory>
Vijay Pai320ed132016-11-01 17:16:55 -070026#include <mutex>
Sree Kuchibhotla86004382016-07-18 22:27:39 -070027
Sree Kuchibhotla33d54942016-10-25 10:03:52 -070028#include <grpc++/support/config.h>
Vijay Pai5dd32262017-11-14 19:04:02 -080029#include <grpc/support/thd.h>
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070030
31namespace grpc {
32
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070033class ThreadManager {
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070034 public:
Vijay Pai5dd32262017-11-14 19:04:02 -080035 ThreadManager(int min_pollers, int max_pollers,
36 std::function<int(gpr_thd_id*, const char*, void (*)(void*),
37 void*, const gpr_thd_options*)>
38 thread_creator,
39 std::function<void(gpr_thd_id)> thread_joiner);
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070040 virtual ~ThreadManager();
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070041
Sree Kuchibhotla33382d02016-10-03 15:08:48 -070042 // Initializes and Starts the Rpc Manager threads
Sree Kuchibhotla86004382016-07-18 22:27:39 -070043 void Initialize();
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070044
Sree Kuchibhotlaacd64db2016-10-03 14:29:47 -070045 // The return type of PollForWork() function
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070046 enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
Sree Kuchibhotla86004382016-07-18 22:27:39 -070047
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070048 // "Polls" for new work.
49 // If the return value is WORK_FOUND:
50 // - The implementaion of PollForWork() MAY set some opaque identifier to
51 // (identify the work item found) via the '*tag' parameter
52 // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
53 // value of 'false' indicates some implemenation specific error (that is
54 // neither SHUTDOWN nor TIMEOUT)
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070055 // - ThreadManager does not interpret the values of 'tag' and 'ok'
56 // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070057 // DoWork()
Vijay Pai5dd32262017-11-14 19:04:02 -080058 // - ThreadManager will also pass DoWork a bool saying if there are actually
59 // resources to do the work
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070060 //
61 // If the return value is SHUTDOWN:,
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070062 // - ThreadManager WILL NOT call DoWork() and terminates the thead
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070063 //
64 // If the return value is TIMEOUT:,
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070065 // - ThreadManager WILL NOT call DoWork()
66 // - ThreadManager MAY terminate the thread depending on the current number
67 // of active poller threads and mix_pollers/max_pollers settings
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070068 // - Also, the value of timeout is specific to the derived class
69 // implementation
70 virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
71
72 // The implementation of DoWork() is supposed to perform the work found by
73 // PollForWork(). The tag and ok parameters are the same as returned by
74 // PollForWork()
75 //
76 // The implementation of DoWork() should also do any setup needed to ensure
77 // that the next call to PollForWork() (not necessarily by the current thread)
78 // actually finds some work
Vijay Pai5dd32262017-11-14 19:04:02 -080079 virtual void DoWork(void* tag, bool ok, bool resources) = 0;
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070080
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070081 // Mark the ThreadManager as shutdown and begin draining the work. This is a
82 // non-blocking call and the caller should call Wait(), a blocking call which
83 // returns only once the shutdown is complete
Craig Tiller991c1012017-04-18 19:43:14 +000084 virtual void Shutdown();
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070085
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070086 // Has Shutdown() been called
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070087 bool IsShutdown();
88
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070089 // A blocking call that returns only after the ThreadManager has shutdown and
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070090 // all the threads have drained all the outstanding work
Craig Tillera3e87892017-04-18 13:08:08 -070091 virtual void Wait();
Sree Kuchibhotlaaabada92016-08-24 10:01:13 -070092
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -070093 private:
Vijay Pai5dd32262017-11-14 19:04:02 -080094 // Helper wrapper class around thread. This takes a ThreadManager object
95 // and starts a new thread to calls the Run() function.
Sree Kuchibhotla86004382016-07-18 22:27:39 -070096 //
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -070097 // The Run() function calls ThreadManager::MainWorkLoop() function and once
98 // that completes, it marks the WorkerThread completed by calling
99 // ThreadManager::MarkAsCompleted()
100 class WorkerThread {
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700101 public:
Vijay Pai5dd32262017-11-14 19:04:02 -0800102 WorkerThread(ThreadManager* thd_mgr, bool* valid);
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700103 ~WorkerThread();
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700104
105 private:
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700106 // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
107 // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700108 void Run();
109
David Garcia Quintasaef267b2017-07-14 18:35:50 -0700110 ThreadManager* const thd_mgr_;
111 std::mutex wt_mu_;
Vijay Pai5dd32262017-11-14 19:04:02 -0800112 gpr_thd_id thd_;
113 bool valid_;
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700114 };
115
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700116 // The main funtion in ThreadManager
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700117 void MainWorkLoop();
118
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700119 void MarkAsCompleted(WorkerThread* thd);
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700120 void CleanupCompletedThreads();
121
122 // Protects shutdown_, num_pollers_ and num_threads_
123 // TODO: sreek - Change num_pollers and num_threads_ to atomics
Vijay Pai320ed132016-11-01 17:16:55 -0700124 std::mutex mu_;
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700125
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -0700126 bool shutdown_;
Vijay Pai320ed132016-11-01 17:16:55 -0700127 std::condition_variable shutdown_cv_;
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700128
129 // Number of threads doing polling
130 int num_pollers_;
131
132 // The minimum and maximum number of threads that should be doing polling
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -0700133 int min_pollers_;
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700134 int max_pollers_;
135
136 // The total number of threads (includes threads includes the threads that are
137 // currently polling i.e num_pollers_)
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -0700138 int num_threads_;
Sree Kuchibhotla86004382016-07-18 22:27:39 -0700139
Vijay Pai5dd32262017-11-14 19:04:02 -0800140 // Functions for creating/joining threads. Normally, these should
141 // be gpr_thd_new/gpr_thd_join but they are overridable
142 std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
143 const gpr_thd_options*)>
144 thread_creator_;
145 std::function<void(gpr_thd_id)> thread_joiner_;
146
Vijay Pai320ed132016-11-01 17:16:55 -0700147 std::mutex list_mu_;
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700148 std::list<WorkerThread*> completed_threads_;
Sree Kuchibhotla9e656a42016-07-18 13:01:42 -0700149};
150
151} // namespace grpc
152
Sree Kuchibhotla8f7739b2016-10-13 15:12:55 -0700153#endif // GRPC_INTERNAL_CPP_THREAD_MANAGER_H