blob: e947032b297c6d30a0e94752fced527f2ed1806c [file] [log] [blame]
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var _ = require('underscore');
var grpc = require('bindings')('grpc.node');
var common = require('./common');
var Duplex = require('stream').Duplex;
var util = require('util');
util.inherits(GrpcServerStream, Duplex);
/**
* Class for representing a gRPC server side stream as a Node stream. Extends
* from stream.Duplex.
* @constructor
* @param {grpc.Call} call Call object to proxy
* @param {object} options Stream options
*/
function GrpcServerStream(call, options) {
Duplex.call(this, options);
this._call = call;
// Indicate that a status has been sent
var finished = false;
var self = this;
var status = {
'code' : grpc.status.OK,
'details' : 'OK'
};
/**
* Send the pending status
*/
function sendStatus() {
call.startWriteStatus(status.code, status.details, function() {
});
finished = true;
}
this.on('finish', sendStatus);
/**
* Set the pending status to a given error status. If the error does not have
* code or details properties, the code will be set to grpc.status.INTERNAL
* and the details will be set to 'Unknown Error'.
* @param {Error} err The error object
*/
function setStatus(err) {
console.log('Server setting status to', err);
var code = grpc.status.INTERNAL;
var details = 'Unknown Error';
if (err.hasOwnProperty('code')) {
code = err.code;
if (err.hasOwnProperty('details')) {
details = err.details;
}
}
status = {'code': code, 'details': details};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
* all pending writes, and sending the given error as a status
* @param {Error} err The error object
* @this GrpcServerStream
*/
function terminateCall(err) {
// Drain readable data
this.on('data', function() {});
setStatus(err);
this.end();
}
this.on('error', terminateCall);
// Indicates that a read is pending
var reading = false;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
function readCallback(event) {
if (finished) {
self.push(null);
return;
}
var data = event.data;
if (self.push(data) && data != null) {
self._call.startRead(readCallback);
} else {
reading = false;
}
}
/**
* Start reading if there is not already a pending read. Reading will
* continue until self.push returns false (indicating reads should slow
* down) or the read data is null (indicating that there is no more data).
*/
this.startReading = function() {
if (finished) {
self.push(null);
} else {
if (!reading) {
reading = true;
self._call.startRead(readCallback);
}
}
};
}
/**
* Start reading from the gRPC data source. This is an implementation of a
* method required for implementing stream.Readable
* @param {number} size Ignored
*/
GrpcServerStream.prototype._read = function(size) {
this.startReading();
};
/**
* Start writing a chunk of data. This is an implementation of a method required
* for implementing stream.Writable.
* @param {Buffer} chunk The chunk of data to write
* @param {string} encoding Ignored
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
var self = this;
self._call.startWrite(chunk, function(event) {
callback();
}, 0);
};
/**
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
* @param {Array} options Options that should be passed to the internal server
* implementation
*/
function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
var started = false;
/**
* Start the server and begin handling requests
* @this Server
*/
this.start = function() {
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
});
if (this.started) {
throw 'Server is already running';
}
server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
* the requested method, use that handler to respond to the request. Then
* wait for the next request
* @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
*/
function handleNewCall(event) {
var call = event.call;
var data = event.data;
if (data == null) {
return;
}
server.requestCall(handleNewCall);
var handler = undefined;
var deadline = data.absolute_deadline;
var cancelled = false;
if (handlers.hasOwnProperty(data.method)) {
handler = handlers[data.method];
}
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
}
}, 0);
call.serverEndInitialMetadata(0);
var stream = new GrpcServerStream(call);
Object.defineProperty(stream, 'cancelled', {
get: function() { return cancelled;}
});
try {
handler(stream, data.metadata);
} catch (e) {
stream.emit('error', e);
}
}
server.requestCall(handleNewCall);
};
/** Shuts down the server.
*/
this.shutdown = function() {
server.shutdown();
};
}
/**
* Registers a handler to handle the named method. Fails if there already is
* a handler for the given method. Returns true on success
* @param {string} name The name of the method that the provided function should
* handle/respond to.
* @param {function} handler Function that takes a stream of request values and
* returns a stream of response values
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
*/
Server.prototype.register = function(name, handler) {
if (this.handlers.hasOwnProperty(name)) {
return false;
}
this.handlers[name] = handler;
return true;
};
/**
* Binds the server to the given port, with SSL enabled if secure is specified
* @param {string} port The port that the server should bind on, in the format
* "address:port"
* @param {boolean=} secure Whether the server should open a secure port
*/
Server.prototype.bind = function(port, secure) {
if (secure) {
this._server.addSecureHttp2Port(port);
} else {
this._server.addHttp2Port(port);
}
};
/**
* See documentation for Server
*/
module.exports = Server;