blob: 6302e7a103b112378b36d87fbacf0c67b5d56b8b [file] [log] [blame]
murgatroid99749666e2015-01-12 18:25:58 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
murgatroid99749666e2015-01-12 18:25:58 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid99013d2032016-11-01 13:05:24 -070034/* I don't like using #ifndef, but I don't see a better way to do this */
35#ifndef GRPC_UV
36
murgatroid99e5061512015-01-12 18:14:35 -080037#include <node.h>
38#include <nan.h>
39
40#include "grpc/grpc.h"
murgatroid9977659062015-02-11 09:26:25 -080041#include "grpc/support/log.h"
murgatroid99e5061512015-01-12 18:14:35 -080042#include "grpc/support/time.h"
murgatroid99013d2032016-11-01 13:05:24 -070043#include "completion_queue.h"
murgatroid99016bb502015-02-09 15:55:10 -080044#include "call.h"
murgatroid99e5061512015-01-12 18:14:35 -080045
46namespace grpc {
47namespace node {
48
murgatroid99013d2032016-11-01 13:05:24 -070049namespace {
50
51/* A worker that asynchronously calls completion_queue_next, and queues onto the
52 node event loop a call to the function stored in the event's tag. */
53class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
54 public:
55 CompletionQueueAsyncWorker();
56
57 ~CompletionQueueAsyncWorker();
58 /* Calls completion_queue_next with the provided deadline, and stores the
59 event if there was one or sets an error message if there was not */
60 void Execute();
61
62 /* Returns the completion queue attached to this class */
63 static grpc_completion_queue *GetQueue();
64
65 /* Convenience function to create a worker with the given arguments and queue
66 it to run asynchronously */
67 static void Next();
68
69 /* Initialize the CompletionQueueAsyncWorker class */
70 static void Init(v8::Local<v8::Object> exports);
71
72 protected:
73 /* Called when Execute has succeeded (completed without setting an error
74 message). Calls the saved callback with the event that came from
75 completion_queue_next */
76 void HandleOKCallback();
77
78 void HandleErrorCallback();
79
80 private:
81 grpc_event result;
82
83 static grpc_completion_queue *queue;
84
85 // Number of grpc_completion_queue_next calls in the thread pool
86 static int current_threads;
87 // Number of grpc_completion_queue_next calls waiting to enter the thread pool
88 static int waiting_next_calls;
89};
90
murgatroid997ab95fb2015-02-02 16:50:07 -080091const int max_queue_threads = 2;
92
murgatroid99e5061512015-01-12 18:14:35 -080093using v8::Function;
murgatroid992b097832015-09-17 13:56:25 -070094using v8::Local;
murgatroid99e5061512015-01-12 18:14:35 -080095using v8::Object;
murgatroid99e5061512015-01-12 18:14:35 -080096using v8::Value;
97
98grpc_completion_queue *CompletionQueueAsyncWorker::queue;
99
murgatroid99e339f6f2015-09-23 11:38:39 -0700100// Invariants: current_threads <= max_queue_threads
101// (current_threads == max_queue_threads) || (waiting_next_calls == 0)
102
murgatroid997ab95fb2015-02-02 16:50:07 -0800103int CompletionQueueAsyncWorker::current_threads;
104int CompletionQueueAsyncWorker::waiting_next_calls;
105
Craig Tillere8e304e2015-01-13 14:41:29 -0800106CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
murgatroid992b097832015-09-17 13:56:25 -0700107 : Nan::AsyncWorker(NULL) {}
murgatroid99e5061512015-01-12 18:14:35 -0800108
Craig Tillere8e304e2015-01-13 14:41:29 -0800109CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
murgatroid99e5061512015-01-12 18:14:35 -0800110
111void CompletionQueueAsyncWorker::Execute() {
Craig Tiller354398f2015-07-13 09:16:03 -0700112 result =
Nicolas "Pixel" Noble8e6bab52015-08-07 01:40:49 +0200113 grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
Craig Tiller64be9f72015-05-04 14:53:51 -0700114 if (!result.success) {
murgatroid99937ac9b2015-08-13 11:00:13 -0700115 SetErrorMessage("The async function encountered an error");
murgatroid99d4d67ad2015-02-09 10:43:21 -0800116 }
murgatroid99e5061512015-01-12 18:14:35 -0800117}
118
Craig Tillere8e304e2015-01-13 14:41:29 -0800119grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
murgatroid99e5061512015-01-12 18:14:35 -0800120
121void CompletionQueueAsyncWorker::Next() {
murgatroid999030c812016-09-16 13:25:08 -0700122#ifndef GRPC_UV
murgatroid992b097832015-09-17 13:56:25 -0700123 Nan::HandleScope scope;
murgatroid997ab95fb2015-02-02 16:50:07 -0800124 if (current_threads < max_queue_threads) {
murgatroid99e339f6f2015-09-23 11:38:39 -0700125 current_threads += 1;
murgatroid997ab95fb2015-02-02 16:50:07 -0800126 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
murgatroid992b097832015-09-17 13:56:25 -0700127 Nan::AsyncQueueWorker(worker);
murgatroid997ab95fb2015-02-02 16:50:07 -0800128 } else {
129 waiting_next_calls += 1;
130 }
murgatroid99e339f6f2015-09-23 11:38:39 -0700131 GPR_ASSERT(current_threads <= max_queue_threads);
132 GPR_ASSERT((current_threads == max_queue_threads) ||
133 (waiting_next_calls == 0));
murgatroid999030c812016-09-16 13:25:08 -0700134#endif
murgatroid99e5061512015-01-12 18:14:35 -0800135}
136
murgatroid992b097832015-09-17 13:56:25 -0700137void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
138 Nan::HandleScope scope;
murgatroid997ab95fb2015-02-02 16:50:07 -0800139 current_threads = 0;
140 waiting_next_calls = 0;
Nicolas "Pixel" Noble8e6bab52015-08-07 01:40:49 +0200141 queue = grpc_completion_queue_create(NULL);
murgatroid99e5061512015-01-12 18:14:35 -0800142}
143
144void CompletionQueueAsyncWorker::HandleOKCallback() {
murgatroid992b097832015-09-17 13:56:25 -0700145 Nan::HandleScope scope;
murgatroid997ab95fb2015-02-02 16:50:07 -0800146 if (waiting_next_calls > 0) {
147 waiting_next_calls -= 1;
murgatroid99e339f6f2015-09-23 11:38:39 -0700148 // Old worker removed, new worker added. current_threads += 0
murgatroid997ab95fb2015-02-02 16:50:07 -0800149 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
murgatroid992b097832015-09-17 13:56:25 -0700150 Nan::AsyncQueueWorker(worker);
murgatroid997ab95fb2015-02-02 16:50:07 -0800151 } else {
152 current_threads -= 1;
153 }
murgatroid99e339f6f2015-09-23 11:38:39 -0700154 GPR_ASSERT(current_threads <= max_queue_threads);
155 GPR_ASSERT((current_threads == max_queue_threads) ||
156 (waiting_next_calls == 0));
murgatroid992b097832015-09-17 13:56:25 -0700157 Nan::Callback *callback = GetTagCallback(result.tag);
158 Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
murgatroid991578c6a2015-02-11 16:19:55 -0800159 callback->Call(2, argv);
murgatroid99e5061512015-01-12 18:14:35 -0800160
Craig Tiller64be9f72015-05-04 14:53:51 -0700161 DestroyTag(result.tag);
murgatroid99d4d67ad2015-02-09 10:43:21 -0800162}
murgatroid99e5061512015-01-12 18:14:35 -0800163
murgatroid99d4d67ad2015-02-09 10:43:21 -0800164void CompletionQueueAsyncWorker::HandleErrorCallback() {
murgatroid99e339f6f2015-09-23 11:38:39 -0700165 if (waiting_next_calls > 0) {
166 waiting_next_calls -= 1;
167 // Old worker removed, new worker added. current_threads += 0
168 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
murgatroid99b58f69d2015-09-23 11:44:01 -0700169 Nan::AsyncQueueWorker(worker);
murgatroid99e339f6f2015-09-23 11:38:39 -0700170 } else {
171 current_threads -= 1;
172 }
173 GPR_ASSERT(current_threads <= max_queue_threads);
174 GPR_ASSERT((current_threads == max_queue_threads) ||
175 (waiting_next_calls == 0));
murgatroid992b097832015-09-17 13:56:25 -0700176 Nan::HandleScope scope;
177 Nan::Callback *callback = GetTagCallback(result.tag);
178 Local<Value> argv[] = {Nan::Error(ErrorMessage())};
murgatroid99d4d67ad2015-02-09 10:43:21 -0800179
murgatroid991578c6a2015-02-11 16:19:55 -0800180 callback->Call(1, argv);
murgatroid9977659062015-02-11 09:26:25 -0800181
Craig Tiller64be9f72015-05-04 14:53:51 -0700182 DestroyTag(result.tag);
murgatroid99e5061512015-01-12 18:14:35 -0800183}
184
murgatroid99013d2032016-11-01 13:05:24 -0700185} // namespace
186
187grpc_completion_queue *GetCompletionQueue() {
188 return CompletionQueueAsyncWorker::GetQueue();
189}
190
191void CompletionQueueNext() {
192 CompletionQueueAsyncWorker::Next();
193}
194
195void CompletionQueueInit(Local<Object> exports) {
196 CompletionQueueAsyncWorker::Init(exports);
197}
198
murgatroid99e5061512015-01-12 18:14:35 -0800199} // namespace node
200} // namespace grpc
murgatroid99013d2032016-11-01 13:05:24 -0700201
202#endif /* GRPC_UV */