Deprecate grpc::thread and sync in favor of std::thread,mutex,etc
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index ca4515d..c073741 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -93,7 +93,7 @@
void ClientContext::set_call(grpc_call* call,
const std::shared_ptr<Channel>& channel) {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
GPR_ASSERT(call_ == nullptr);
call_ = call;
channel_ = channel;
@@ -119,7 +119,7 @@
}
void ClientContext::TryCancel() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (call_) {
grpc_call_cancel(call_, nullptr);
} else {
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index 4b226c2..95a819d 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -31,15 +31,15 @@
*
*/
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
+#include <mutex>
+#include <thread>
#include "src/cpp/server/dynamic_thread_pool.h"
namespace grpc {
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
: pool_(pool),
- thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
+ thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
this)) {}
DynamicThreadPool::DynamicThread::~DynamicThread() {
thd_->join();
@@ -49,7 +49,7 @@
void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->ThreadFunc();
// Now that we have killed ourselves, we should reduce the thread count
- grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
+ std::unique_lock<std::mutex> lock(pool_->mu_);
pool_->nthreads_--;
// Move ourselves to dead list
pool_->dead_threads_.push_back(this);
@@ -62,7 +62,7 @@
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
@@ -91,7 +91,7 @@
nthreads_(0),
threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
- grpc::lock_guard<grpc::mutex> lock(mu_);
+ std::lock_guard<std::mutex> lock(mu_);
nthreads_++;
new DynamicThread(this);
}
@@ -104,7 +104,7 @@
}
DynamicThreadPool::~DynamicThreadPool() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
shutdown_ = true;
cv_.notify_all();
while (nthreads_ != 0) {
@@ -114,7 +114,7 @@
}
void DynamicThreadPool::Add(const std::function<void()>& callback) {
- grpc::lock_guard<grpc::mutex> lock(mu_);
+ std::lock_guard<std::mutex> lock(mu_);
// Add works to the callbacks list
callbacks_.push(callback);
// Increase pool size or notify as needed
diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h
index 61b1cc9..4f8c411 100644
--- a/src/cpp/server/dynamic_thread_pool.h
+++ b/src/cpp/server/dynamic_thread_pool.h
@@ -34,12 +34,13 @@
#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
+#include <condition_variable>
#include <list>
#include <memory>
+#include <mutex>
#include <queue>
+#include <thread>
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
#include <grpc++/support/config.h>
#include "src/cpp/server/thread_pool_interface.h"
@@ -61,12 +62,12 @@
private:
DynamicThreadPool* pool_;
- std::unique_ptr<grpc::thread> thd_;
+ std::unique_ptr<std::thread> thd_;
void ThreadFunc();
};
- grpc::mutex mu_;
- grpc::condition_variable cv_;
- grpc::condition_variable shutdown_cv_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ std::condition_variable shutdown_cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
int reserve_threads_;
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 7f32848..b7cfd6d 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -379,7 +379,7 @@
Server::~Server() {
{
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
@@ -501,7 +501,7 @@
}
void Server::ShutdownInternal(gpr_timespec deadline) {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
@@ -549,7 +549,7 @@
}
void Server::Wait() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
while (started_ && !shutdown_notified_) {
shutdown_cv_.wait(lock);
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 559b5bf8..a66ec4a 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -33,9 +33,10 @@
#include <grpc++/server_context.h>
+#include <mutex>
+
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
-#include <grpc++/impl/sync.h>
#include <grpc++/support/time.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
@@ -76,20 +77,20 @@
private:
bool CheckCancelledNoPluck() {
- grpc::lock_guard<grpc::mutex> g(mu_);
+ std::lock_guard<std::mutex> g(mu_);
return finalized_ ? (cancelled_ != 0) : false;
}
bool has_tag_;
void* tag_;
- grpc::mutex mu_;
+ std::mutex mu_;
int refs_;
bool finalized_;
int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
delete this;
@@ -105,7 +106,7 @@
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
finalized_ = true;
bool ret = false;
if (has_tag_) {
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index caae4c4..1450d00 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -31,13 +31,14 @@
*
*/
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-#include <grpc/support/log.h>
-#include <climits>
-
#include "src/cpp/thread_manager/thread_manager.h"
+#include <climits>
+#include <mutex>
+#include <thread>
+
+#include <grpc/support/log.h>
+
namespace grpc {
ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
@@ -59,7 +60,7 @@
ThreadManager::~ThreadManager() {
{
- std::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
GPR_ASSERT(num_threads_ == 0);
}
@@ -67,29 +68,29 @@
}
void ThreadManager::Wait() {
- std::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
while (num_threads_ != 0) {
shutdown_cv_.wait(lock);
}
}
void ThreadManager::Shutdown() {
- std::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
shutdown_ = true;
}
bool ThreadManager::IsShutdown() {
- std::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
return shutdown_;
}
void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{
- std::unique_lock<grpc::mutex> list_lock(list_mu_);
+ std::unique_lock<std::mutex> list_lock(list_mu_);
completed_threads_.push_back(thd);
}
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
num_threads_--;
if (num_threads_ == 0) {
shutdown_cv_.notify_one();
@@ -97,7 +98,7 @@
}
void ThreadManager::CleanupCompletedThreads() {
- std::unique_lock<grpc::mutex> lock(list_mu_);
+ std::unique_lock<std::mutex> lock(list_mu_);
for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
thd = completed_threads_.erase(thd)) {
delete *thd;
@@ -114,7 +115,7 @@
// less than max threshold (i.e max_pollers_) and the total number of threads is
// below the maximum threshold, we can let the current thread continue as poller
bool ThreadManager::MaybeContinueAsPoller() {
- std::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (shutdown_ || num_pollers_ > max_pollers_) {
return false;
}
@@ -127,7 +128,7 @@
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the total number of threads is below the maximum threshold
void ThreadManager::MaybeCreatePoller() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
@@ -156,7 +157,7 @@
WorkStatus work_status = PollForWork(&tag, &ok);
{
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
num_pollers_--;
if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 9cfdb8a..9c0569c 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -34,11 +34,12 @@
#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H
#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H
+#include <condition_variable>
#include <list>
#include <memory>
+#include <mutex>
+#include <thread>
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
#include <grpc++/support/config.h>
namespace grpc {
@@ -115,7 +116,7 @@
void Run();
ThreadManager* thd_mgr_;
- grpc::thread thd_;
+ std::thread thd_;
};
// The main funtion in ThreadManager
@@ -134,10 +135,10 @@
// Protects shutdown_, num_pollers_ and num_threads_
// TODO: sreek - Change num_pollers and num_threads_ to atomics
- grpc::mutex mu_;
+ std::mutex mu_;
bool shutdown_;
- grpc::condition_variable shutdown_cv_;
+ std::condition_variable shutdown_cv_;
// Number of threads doing polling
int num_pollers_;
@@ -150,7 +151,7 @@
// currently polling i.e num_pollers_)
int num_threads_;
- grpc::mutex list_mu_;
+ std::mutex list_mu_;
std::list<WorkerThread*> completed_threads_;
};