| /* |
| * |
| * Copyright 2014, Google Inc. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| * |
| */ |
| |
| var grpc = require('bindings')('grpc.node'); |
| |
| var common = require('./common'); |
| |
| var Duplex = require('stream').Duplex; |
| var util = require('util'); |
| |
| util.inherits(GrpcClientStream, Duplex); |
| |
| /** |
| * Class for representing a gRPC client side stream as a Node stream. Extends |
| * from stream.Duplex. |
| * @constructor |
| * @param {grpc.Call} call Call object to proxy |
| * @param {object} options Stream options |
| */ |
| function GrpcClientStream(call, options) { |
| Duplex.call(this, options); |
| var self = this; |
| // Indicates that we can start reading and have not received a null read |
| var can_read = false; |
| // Indicates that a read is currently pending |
| var reading = false; |
| // Indicates that we can call startWrite |
| var can_write = false; |
| // Indicates that a write is currently pending |
| var writing = false; |
| this._call = call; |
| /** |
| * Callback to handle receiving a READ event. Pushes the data from that event |
| * onto the read queue and starts reading again if applicable. |
| * @param {grpc.Event} event The READ event object |
| */ |
| function readCallback(event) { |
| var data = event.data; |
| if (self.push(data)) { |
| if (data == null) { |
| // Disable starting to read after null read was received |
| can_read = false; |
| reading = false; |
| } else { |
| call.startRead(readCallback); |
| } |
| } else { |
| // Indicate that reading can be resumed by calling startReading |
| reading = false; |
| } |
| }; |
| /** |
| * Initiate a read, which continues until self.push returns false (indicating |
| * that reading should be paused) or data is null (indicating that there is no |
| * more data to read). |
| */ |
| function startReading() { |
| call.startRead(readCallback); |
| } |
| // TODO(mlumish): possibly change queue implementation due to shift slowness |
| var write_queue = []; |
| /** |
| * Write the next chunk of data in the write queue if there is one. Otherwise |
| * indicate that there is no pending write. When the write succeeds, this |
| * function is called again. |
| */ |
| function writeNext() { |
| if (write_queue.length > 0) { |
| writing = true; |
| var next = write_queue.shift(); |
| var writeCallback = function(event) { |
| next.callback(); |
| writeNext(); |
| }; |
| call.startWrite(next.chunk, writeCallback, 0); |
| } else { |
| writing = false; |
| } |
| } |
| call.startInvoke(function(event) { |
| can_read = true; |
| can_write = true; |
| startReading(); |
| writeNext(); |
| }, function(event) { |
| self.emit('metadata', event.data); |
| }, function(event) { |
| self.emit('status', event.data); |
| }, 0); |
| this.on('finish', function() { |
| call.writesDone(function() {}); |
| }); |
| /** |
| * Indicate that reads should start, and start them if the INVOKE_ACCEPTED |
| * event has been received. |
| */ |
| this._enableRead = function() { |
| if (!reading) { |
| reading = true; |
| if (can_read) { |
| startReading(); |
| } |
| } |
| }; |
| /** |
| * Push the chunk onto the write queue, and write from the write queue if |
| * there is not a pending write |
| * @param {Buffer} chunk The chunk of data to write |
| * @param {function(Error=)} callback The callback to call when the write |
| * completes |
| */ |
| this._tryWrite = function(chunk, callback) { |
| write_queue.push({chunk: chunk, callback: callback}); |
| if (can_write && !writing) { |
| writeNext(); |
| } |
| }; |
| } |
| |
| /** |
| * Start reading. This is an implementation of a method needed for implementing |
| * stream.Readable. |
| * @param {number} size Ignored |
| */ |
| GrpcClientStream.prototype._read = function(size) { |
| this._enableRead(); |
| }; |
| |
| /** |
| * Attempt to write the given chunk. Calls the callback when done. This is an |
| * implementation of a method needed for implementing stream.Writable. |
| * @param {Buffer} chunk The chunk to write |
| * @param {string} encoding Ignored |
| * @param {function(Error=)} callback Ignored |
| */ |
| GrpcClientStream.prototype._write = function(chunk, encoding, callback) { |
| this._tryWrite(chunk, callback); |
| }; |
| |
| /** |
| * Make a request on the channel to the given method with the given arguments |
| * @param {grpc.Channel} channel The channel on which to make the request |
| * @param {string} method The method to request |
| * @param {array=} metadata Array of metadata key/value pairs to add to the call |
| * @param {(number|Date)=} deadline The deadline for processing this request. |
| * Defaults to infinite future. |
| * @return {stream=} The stream of responses |
| */ |
| function makeRequest(channel, |
| method, |
| metadata, |
| deadline) { |
| if (deadline === undefined) { |
| deadline = Infinity; |
| } |
| var call = new grpc.Call(channel, method, deadline); |
| if (metadata) { |
| call.addMetadata(metadata); |
| } |
| return new GrpcClientStream(call); |
| } |
| |
| /** |
| * See documentation for makeRequest above |
| */ |
| exports.makeRequest = makeRequest; |
| |
| /** |
| * Represents a client side gRPC channel associated with a single host. |
| */ |
| exports.Channel = grpc.Channel; |
| /** |
| * Status name to code number mapping |
| */ |
| exports.status = grpc.status; |
| /** |
| * Call error name to code number mapping |
| */ |
| exports.callError = grpc.callError; |