Fixed hang when using Node gRPC with other async operations
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index bf2cd94..efc611b 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -53,6 +53,9 @@
grpc_completion_queue *CompletionQueueAsyncWorker::queue;
+// Invariants: (1) current_threads <= max_queue_threads, and
+// (2) (current_threads == max_queue_threads) || (waiting_next_calls == 0)
+
int CompletionQueueAsyncWorker::current_threads;
int CompletionQueueAsyncWorker::waiting_next_calls;
@@ -74,11 +77,15 @@
void CompletionQueueAsyncWorker::Next() {
NanScope();
if (current_threads < max_queue_threads) {
+ current_threads += 1;  // count the new worker before it is created and queued
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker);
} else {
waiting_next_calls += 1;
}
+ GPR_ASSERT(current_threads <= max_queue_threads);  // worker count stays within the cap
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));  // calls are only left waiting when every worker slot is taken
}
void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
@@ -92,11 +99,15 @@
NanScope();
if (waiting_next_calls > 0) {
waiting_next_calls -= 1;
+ // Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
NanAsyncQueueWorker(worker);
} else {
current_threads -= 1;
}
+ GPR_ASSERT(current_threads <= max_queue_threads);
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));
NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
@@ -106,6 +117,17 @@
void CompletionQueueAsyncWorker::HandleErrorCallback() {
NanScope();
+ if (waiting_next_calls > 0) {
+ waiting_next_calls -= 1;
+ // Old worker removed, new worker added. current_threads += 0
+ CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
+ NanAsyncQueueWorker(worker);
+ } else {
+ current_threads -= 1;
+ }
+ GPR_ASSERT(current_threads <= max_queue_threads);
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));
NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanError(ErrorMessage())};