blob: 1917074dc2dfe85ddd238513b6ae2199291b52ef [file] [log] [blame]
murgatroid99749666e2015-01-12 18:25:58 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
murgatroid99749666e2015-01-12 18:25:58 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid99013d2032016-11-01 13:05:24 -070034/* I don't like using #ifndef, but I don't see a better way to do this */
35#ifndef GRPC_UV
36
murgatroid99e5061512015-01-12 18:14:35 -080037#include <node.h>
38#include <nan.h>
39
40#include "grpc/grpc.h"
murgatroid9977659062015-02-11 09:26:25 -080041#include "grpc/support/log.h"
murgatroid99e5061512015-01-12 18:14:35 -080042#include "grpc/support/time.h"
murgatroid99013d2032016-11-01 13:05:24 -070043#include "completion_queue.h"
murgatroid99016bb502015-02-09 15:55:10 -080044#include "call.h"
murgatroid99e5061512015-01-12 18:14:35 -080045
46namespace grpc {
47namespace node {
48
murgatroid99013d2032016-11-01 13:05:24 -070049namespace {
50
51/* A worker that asynchronously calls completion_queue_next, and queues onto the
52 node event loop a call to the function stored in the event's tag. */
53class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
54 public:
55 CompletionQueueAsyncWorker();
56
57 ~CompletionQueueAsyncWorker();
58 /* Calls completion_queue_next with the provided deadline, and stores the
59 event if there was one or sets an error message if there was not */
60 void Execute();
61
62 /* Returns the completion queue attached to this class */
63 static grpc_completion_queue *GetQueue();
64
65 /* Convenience function to create a worker with the given arguments and queue
66 it to run asynchronously */
67 static void Next();
68
69 /* Initialize the CompletionQueueAsyncWorker class */
70 static void Init(v8::Local<v8::Object> exports);
71
72 protected:
73 /* Called when Execute has succeeded (completed without setting an error
74 message). Calls the saved callback with the event that came from
75 completion_queue_next */
76 void HandleOKCallback();
77
78 void HandleErrorCallback();
79
80 private:
Michael Lumishfe6b3282017-02-07 11:20:16 -080081 static void TryAddWorker();
82
murgatroid99013d2032016-11-01 13:05:24 -070083 grpc_event result;
84
85 static grpc_completion_queue *queue;
86
87 // Number of grpc_completion_queue_next calls in the thread pool
88 static int current_threads;
89 // Number of grpc_completion_queue_next calls waiting to enter the thread pool
90 static int waiting_next_calls;
91};
92
murgatroid997ab95fb2015-02-02 16:50:07 -080093const int max_queue_threads = 2;
94
murgatroid99e5061512015-01-12 18:14:35 -080095using v8::Function;
murgatroid992b097832015-09-17 13:56:25 -070096using v8::Local;
murgatroid99e5061512015-01-12 18:14:35 -080097using v8::Object;
murgatroid99e5061512015-01-12 18:14:35 -080098using v8::Value;
99
100grpc_completion_queue *CompletionQueueAsyncWorker::queue;
101
murgatroid99e339f6f2015-09-23 11:38:39 -0700102// Invariants: current_threads <= max_queue_threads
103// (current_threads == max_queue_threads) || (waiting_next_calls == 0)
104
murgatroid997ab95fb2015-02-02 16:50:07 -0800105int CompletionQueueAsyncWorker::current_threads;
106int CompletionQueueAsyncWorker::waiting_next_calls;
107
Craig Tillere8e304e2015-01-13 14:41:29 -0800108CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
murgatroid992b097832015-09-17 13:56:25 -0700109 : Nan::AsyncWorker(NULL) {}
murgatroid99e5061512015-01-12 18:14:35 -0800110
Craig Tillere8e304e2015-01-13 14:41:29 -0800111CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
murgatroid99e5061512015-01-12 18:14:35 -0800112
113void CompletionQueueAsyncWorker::Execute() {
Craig Tiller354398f2015-07-13 09:16:03 -0700114 result =
Nicolas "Pixel" Noble8e6bab52015-08-07 01:40:49 +0200115 grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
Craig Tiller64be9f72015-05-04 14:53:51 -0700116 if (!result.success) {
murgatroid99937ac9b2015-08-13 11:00:13 -0700117 SetErrorMessage("The async function encountered an error");
murgatroid99d4d67ad2015-02-09 10:43:21 -0800118 }
murgatroid99e5061512015-01-12 18:14:35 -0800119}
120
Craig Tillere8e304e2015-01-13 14:41:29 -0800121grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
murgatroid99e5061512015-01-12 18:14:35 -0800122
Michael Lumishfe6b3282017-02-07 11:20:16 -0800123void CompletionQueueAsyncWorker::TryAddWorker() {
124 if (current_threads < max_queue_threads && waiting_next_calls > 0) {
murgatroid99e339f6f2015-09-23 11:38:39 -0700125 current_threads += 1;
Michael Lumishfe6b3282017-02-07 11:20:16 -0800126 waiting_next_calls -= 1;
murgatroid997ab95fb2015-02-02 16:50:07 -0800127 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
murgatroid992b097832015-09-17 13:56:25 -0700128 Nan::AsyncQueueWorker(worker);
murgatroid997ab95fb2015-02-02 16:50:07 -0800129 }
murgatroid99e339f6f2015-09-23 11:38:39 -0700130 GPR_ASSERT(current_threads <= max_queue_threads);
131 GPR_ASSERT((current_threads == max_queue_threads) ||
132 (waiting_next_calls == 0));
Michael Lumishfe6b3282017-02-07 11:20:16 -0800133}
134
135void CompletionQueueAsyncWorker::Next() {
136 waiting_next_calls += 1;
137 TryAddWorker();
murgatroid99e5061512015-01-12 18:14:35 -0800138}
139
murgatroid992b097832015-09-17 13:56:25 -0700140void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
141 Nan::HandleScope scope;
murgatroid997ab95fb2015-02-02 16:50:07 -0800142 current_threads = 0;
143 waiting_next_calls = 0;
Nicolas "Pixel" Noble8e6bab52015-08-07 01:40:49 +0200144 queue = grpc_completion_queue_create(NULL);
murgatroid99e5061512015-01-12 18:14:35 -0800145}
146
147void CompletionQueueAsyncWorker::HandleOKCallback() {
murgatroid992b097832015-09-17 13:56:25 -0700148 Nan::HandleScope scope;
Michael Lumishfe6b3282017-02-07 11:20:16 -0800149 current_threads -= 1;
150 TryAddWorker();
murgatroid992b097832015-09-17 13:56:25 -0700151 Nan::Callback *callback = GetTagCallback(result.tag);
152 Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
murgatroid991578c6a2015-02-11 16:19:55 -0800153 callback->Call(2, argv);
murgatroid99e5061512015-01-12 18:14:35 -0800154
Craig Tiller64be9f72015-05-04 14:53:51 -0700155 DestroyTag(result.tag);
murgatroid99d4d67ad2015-02-09 10:43:21 -0800156}
murgatroid99e5061512015-01-12 18:14:35 -0800157
murgatroid99d4d67ad2015-02-09 10:43:21 -0800158void CompletionQueueAsyncWorker::HandleErrorCallback() {
murgatroid992b097832015-09-17 13:56:25 -0700159 Nan::HandleScope scope;
Michael Lumishfe6b3282017-02-07 11:20:16 -0800160 current_threads -= 1;
161 TryAddWorker();
murgatroid992b097832015-09-17 13:56:25 -0700162 Nan::Callback *callback = GetTagCallback(result.tag);
163 Local<Value> argv[] = {Nan::Error(ErrorMessage())};
murgatroid99d4d67ad2015-02-09 10:43:21 -0800164
murgatroid991578c6a2015-02-11 16:19:55 -0800165 callback->Call(1, argv);
murgatroid9977659062015-02-11 09:26:25 -0800166
Craig Tiller64be9f72015-05-04 14:53:51 -0700167 DestroyTag(result.tag);
murgatroid99e5061512015-01-12 18:14:35 -0800168}
169
murgatroid99013d2032016-11-01 13:05:24 -0700170} // namespace
171
172grpc_completion_queue *GetCompletionQueue() {
173 return CompletionQueueAsyncWorker::GetQueue();
174}
175
176void CompletionQueueNext() {
177 CompletionQueueAsyncWorker::Next();
178}
179
180void CompletionQueueInit(Local<Object> exports) {
181 CompletionQueueAsyncWorker::Init(exports);
182}
183
murgatroid99e5061512015-01-12 18:14:35 -0800184} // namespace node
185} // namespace grpc
murgatroid99013d2032016-11-01 13:05:24 -0700186
187#endif /* GRPC_UV */