| /* |
| * |
| * Copyright 2015, Google Inc. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| * |
| */ |
| |
| 'use strict'; |
| |
| var async = require('async'); |
| var fs = require('fs'); |
| var GoogleAuth = require('google-auth-library'); |
| var parseArgs = require('minimist'); |
| var strftime = require('strftime'); |
| var _ = require('underscore'); |
| var grpc = require('../..'); |
| var PROTO_PATH = __dirname + '/pubsub.proto'; |
| var pubsub = grpc.load(PROTO_PATH).tech.pubsub; |
| |
| function PubsubRunner(pub, sub, args) { |
| this.pub = pub; |
| this.sub = sub; |
| this.args = args; |
| } |
| |
| PubsubRunner.prototype.getTestTopicName = function() { |
| var base_name = '/topics/' + this.args.project_id + '/'; |
| if (this.args.topic_name) { |
| return base_name + this.args.topic_name; |
| } |
| var now_text = strftime('%Y%m%d%H%M%S%L'); |
| return base_name + process.env.USER + '-' + now_text; |
| }; |
| |
| PubsubRunner.prototype.getTestSubName = function() { |
| var base_name = '/subscriptions/' + this.args.project_id + '/'; |
| if (this.args.sub_name) { |
| return base_name + this.args.sub_name; |
| } |
| var now_text = strftime('%Y%m%d%H%M%S%L'); |
| return base_name + process.env.USER + '-' + now_text; |
| }; |
| |
| PubsubRunner.prototype.listProjectTopics = function(callback) { |
| var q = ('cloud.googleapis.com/project in (/projects/' + |
| this.args.project_id + ')'); |
| this.pub.listTopics({query: q}, callback); |
| }; |
| |
| PubsubRunner.prototype.topicExists = function(name, callback) { |
| this.listProjectTopics(function(err, response) { |
| if (err) { |
| callback(err); |
| } else { |
| callback(null, _.some(response.topic, function(t) { |
| return t.name === name; |
| })); |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { |
| var self = this; |
| this.topicExists(name, function(err, exists) { |
| if (err) { |
| callback(err); |
| } else{ |
| if (exists) { |
| callback(null); |
| } else { |
| self.pub.createTopic({name: name}, callback); |
| } |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.removeTopic = function(callback) { |
| var name = this.getTestTopicName(); |
| console.log('... removing Topic', name); |
| this.pub.deleteTopic({topic: name}, function(err, value) { |
| if (err) { |
| console.log('Could not delete a topic: rpc failed with', err); |
| callback(err); |
| } else { |
| console.log('removed Topic', name, 'OK'); |
| callback(null); |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.createTopic = function(callback) { |
| var name = this.getTestTopicName(); |
| console.log('... creating Topic', name); |
| this.pub.createTopic({name: name}, function(err, value) { |
| if (err) { |
| console.log('Could not create a topic: rpc failed with', err); |
| callback(err); |
| } else { |
| console.log('created Topic', name, 'OK'); |
| callback(null); |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.listSomeTopics = function(callback) { |
| console.log('Listing topics'); |
| console.log('-------------_'); |
| this.listProjectTopics(function(err, response) { |
| if (err) { |
| console.log('Could not list topic: rpc failed with', err); |
| callback(err); |
| } else { |
| _.each(response.topic, function(t) { |
| console.log(t.name); |
| }); |
| callback(null); |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.checkExists = function(callback) { |
| var name = this.getTestTopicName(); |
| console.log('... checking for topic', name); |
| this.topicExists(name, function(err, exists) { |
| if (err) { |
| console.log('Could not check for a topics: rpc failed with', err); |
| callback(err); |
| } else { |
| if (exists) { |
| console.log(name, 'is a topic'); |
| } else { |
| console.log(name, 'is not a topic'); |
| } |
| callback(null); |
| } |
| }); |
| }; |
| |
| PubsubRunner.prototype.randomPubSub = function(callback) { |
| var self = this; |
| var topic_name = this.getTestTopicName(); |
| var sub_name = this.getTestSubName(); |
| var subscription = {name: sub_name, topic: topic_name}; |
| async.waterfall([ |
| _.bind(this.createTopicIfNeeded, this, topic_name), |
| _.bind(this.sub.createSubscription, this.sub, subscription), |
| function(resp, cb) { |
| var msg_count = _.random(10, 30); |
| // Set up msg_count messages to publish |
| var message_senders = _.times(msg_count, function(n) { |
| return _.bind(self.pub.publish, self.pub, { |
| topic: topic_name, |
| message: {data: new Buffer('message ' + n)} |
| }); |
| }); |
| async.parallel(message_senders, function(err, result) { |
| cb(err, result, msg_count); |
| }); |
| }, |
| function(result, msg_count, cb) { |
| console.log('Sent', msg_count, 'messages to', topic_name + ',', |
| 'checking for them now.'); |
| var batch_request = { |
| subscription: sub_name, |
| max_events: msg_count |
| }; |
| self.sub.pullBatch(batch_request, cb); |
| }, |
| function(batch, cb) { |
| var ack_id = _.pluck(batch.pull_responses, 'ack_id'); |
| console.log('Got', ack_id.length, 'messages, acknowledging them...'); |
| var ack_request = { |
| subscription: sub_name, |
| ack_id: ack_id |
| }; |
| self.sub.acknowledge(ack_request, cb); |
| }, |
| function(result, cb) { |
| console.log( |
| 'Test messages were acknowledged OK, deleting the subscription'); |
| self.sub.deleteSubscription({subscription: sub_name}, cb); |
| } |
| ], function (err, result) { |
| if (err) { |
| console.log('Could not do random pub sub: rpc failed with', err); |
| } |
| callback(err, result); |
| }); |
| }; |
| |
| function main(callback) { |
| var argv = parseArgs(process.argv, { |
| string: [ |
| 'host', |
| 'oauth_scope', |
| 'port', |
| 'action', |
| 'project_id', |
| 'topic_name', |
| 'sub_name' |
| ], |
| default: { |
| host: 'pubsub-staging.googleapis.com', |
| oauth_scope: 'https://www.googleapis.com/auth/pubsub', |
| port: 443, |
| action: 'listSomeTopics', |
| project_id: 'stoked-keyword-656' |
| } |
| }); |
| var valid_actions = [ |
| 'createTopic', |
| 'removeTopic', |
| 'listSomeTopics', |
| 'checkExists', |
| 'randomPubSub' |
| ]; |
| if (_.some(valid_actions, function(action) { |
| return action === argv.action; |
| })) { |
| callback(new Error('Action was not valid')); |
| } |
| var address = argv.host + ':' + argv.port; |
| (new GoogleAuth()).getApplicationDefault(function(err, credential) { |
| if (err) { |
| callback(err); |
| return; |
| } |
| if (credential.createScopedRequired()) { |
| credential = credential.createScoped(argv.oauth_scope); |
| } |
| var updateMetadata = grpc.getGoogleAuthDelegate(credential); |
| var ca_path = process.env.SSL_CERT_FILE; |
| fs.readFile(ca_path, function(err, ca_data) { |
| if (err) { |
| callback(err); |
| return; |
| } |
| var ssl_creds = grpc.Credentials.createSsl(ca_data); |
| var options = { |
| credentials: ssl_creds, |
| 'grpc.ssl_target_name_override': argv.host |
| }; |
| var pub = new pubsub.PublisherService(address, options, updateMetadata); |
| var sub = new pubsub.SubscriberService(address, options, updateMetadata); |
| var runner = new PubsubRunner(pub, sub, argv); |
| runner[argv.action](callback); |
| }); |
| }); |
| } |
| |
| if (require.main === module) { |
| main(function(err) { |
| if (err) { |
| throw err; |
| } |
| }); |
| } |
| |
| module.exports = PubsubRunner; |