blob: edaa115d0fc30b183c7899dfc6ff820c151c3f7c [file] [log] [blame]
murgatroid99749666e2015-01-12 18:25:58 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid99e5061512015-01-12 18:14:35 -080034var grpc = require('bindings')('grpc.node');
35
36var common = require('./common');
37
38var Duplex = require('stream').Duplex;
39var util = require('util');
40
util.inherits(GrpcClientStream, Duplex);

/**
 * Class for representing a gRPC client side stream as a Node stream. Extends
 * from stream.Duplex.
 *
 * Constructing the stream immediately calls call.startInvoke. Reads and writes
 * requested before the invoke is accepted are deferred until the accept
 * callback fires. The stream emits 'metadata' and 'status' events carrying the
 * data of the corresponding call events, and calls call.writesDone when the
 * writable side emits 'finish'.
 * @constructor
 * @param {grpc.Call} call Call object to proxy
 * @param {object} options Stream options
 */
function GrpcClientStream(call, options) {
  Duplex.call(this, options);
  var self = this;
  // Indicates that the invoke was accepted and no null read has been received
  var can_read = false;
  // Indicates that a read has been requested or is currently in flight. While
  // this is true, _enableRead must not start another read.
  var reading = false;
  // Indicates that we can call startWrite
  var can_write = false;
  // Indicates that a write is currently pending
  var writing = false;
  this._call = call;
  /**
   * Callback to handle receiving a READ event. Pushes the data from that event
   * onto the read queue and starts reading again if applicable.
   * @param {grpc.Event} event The READ event object
   */
  function readCallback(event) {
    var data = event.data;
    if (data == null) {
      // A null read marks the end of the stream. Handle it before looking at
      // the return value of push: push(null) does not report "keep reading",
      // so checking it first would leave can_read set after the end.
      can_read = false;
      reading = false;
      self.push(null);
      return;
    }
    if (self.push(data)) {
      // The consumer can take more data: keep the read loop going. `reading`
      // stays true because a read is in flight again.
      call.startRead(readCallback);
    } else {
      // Backpressure: pause until _enableRead is called again.
      reading = false;
    }
  }
  /**
   * Initiate a read, which continues until self.push returns false (indicating
   * that reading should be paused) or data is null (indicating that there is no
   * more data to read).
   */
  function startReading() {
    call.startRead(readCallback);
  }
  // TODO(mlumish): possibly change queue implementation due to shift slowness
  var write_queue = [];
  /**
   * Write the next chunk of data in the write queue if there is one. Otherwise
   * indicate that there is no pending write. When the write succeeds, this
   * function is called again.
   */
  function writeNext() {
    if (write_queue.length > 0) {
      writing = true;
      var next = write_queue.shift();
      // NOTE(review): the write completion event is not inspected, so a failed
      // write is reported to the stream as a success - confirm against the
      // grpc.Call event contract.
      var writeCallback = function(event) {
        next.callback();
        writeNext();
      };
      call.startWrite(next.chunk, writeCallback, 0);
    } else {
      writing = false;
    }
  }
  call.startInvoke(function(event) {
    can_read = true;
    can_write = true;
    // Record that a read is in flight. Without this, a later _enableRead would
    // see reading === false and start a second, concurrent read.
    reading = true;
    startReading();
    writeNext();
  }, function(event) {
    self.emit('metadata', event.data);
  }, function(event) {
    self.emit('status', event.data);
  }, 0);
  this.on('finish', function() {
    call.writesDone(function() {});
  });
  /**
   * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
   * event has been received and the end of the stream has not been reached.
   */
  this._enableRead = function() {
    if (!reading) {
      reading = true;
      if (can_read) {
        startReading();
      }
    }
  };
  /**
   * Push the chunk onto the write queue, and write from the write queue if
   * there is not a pending write
   * @param {Buffer} chunk The chunk of data to write
   * @param {function(Error=)} callback The callback to call when the write
   *     completes
   */
  this._tryWrite = function(chunk, callback) {
    write_queue.push({chunk: chunk, callback: callback});
    if (can_write && !writing) {
      writeNext();
    }
  };
}
149
/**
 * stream.Readable hook, invoked when the consumer wants more data. Delegates
 * to the _enableRead function that the constructor installs on the instance.
 * @param {number} size Ignored
 */
GrpcClientStream.prototype._read = function(size) {
  var stream = this;
  stream._enableRead();
};
158
/**
 * stream.Writable hook, invoked for each chunk written to the stream. Hands
 * the chunk and its completion callback to the _tryWrite function that the
 * constructor installs on the instance.
 * @param {Buffer} chunk The chunk to write
 * @param {string} encoding Ignored
 * @param {function(Error=)} callback Forwarded to _tryWrite, which calls it
 *     once the chunk has been written
 */
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
  var stream = this;
  stream._tryWrite(chunk, callback);
};
169
/**
 * Open a call to the given method on the channel and wrap it in a client
 * stream.
 * @param {grpc.Channel} channel The channel on which to make the request
 * @param {string} method The method to request
 * @param {array=} metadata Array of metadata key/value pairs to add to the
 *     call. Skipped when falsy.
 * @param {(number|Date)=} deadline The deadline for processing this request.
 *     Defaults to infinite future when undefined.
 * @return {stream=} The stream of responses
 */
function makeRequest(channel, method, metadata, deadline) {
  // Only an omitted (undefined) deadline falls back to the default; any other
  // value is handed to grpc.Call untouched.
  var effective_deadline = (deadline === undefined) ? Infinity : deadline;
  var request_call = new grpc.Call(channel, method, effective_deadline);
  if (metadata) {
    request_call.addMetadata(metadata);
  }
  return new GrpcClientStream(request_call);
}
192
/**
 * Make a request on a channel and get back a client stream. See the
 * documentation for makeRequest above.
 */
exports.makeRequest = makeRequest;

/**
 * Represents a client side gRPC channel associated with a single host.
 * Re-exported from the native grpc binding.
 */
exports.Channel = grpc.Channel;

/**
 * Status name to code number mapping. Re-exported from the native grpc
 * binding.
 */
exports.status = grpc.status;

/**
 * Call error name to code number mapping. Re-exported from the native grpc
 * binding.
 */
exports.callError = grpc.callError;