blob: 698f9f1b871d50ec3ec72fc506106b30d07a74ef [file] [log] [blame]
#!/usr/bin/env ruby
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'optparse'
require 'thread'
require_relative '../pb/test/client'
require_relative './metrics_server'
require_relative '../lib/grpc'
# Gauge reporting the average number of queries per second completed since
# the gauge was created. Worker threads bump the counter through
# #increment_queries; #get_value turns the counter into a rate.
class QpsGauge < Gauge
  def initialize
    @query_count = 0
    @query_mutex = Mutex.new
    # Durations are measured on the monotonic clock so that wall-clock
    # adjustments cannot produce a zero or negative elapsed time.
    @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Records one completed query. Safe to call from several threads because
  # the counter is only touched while holding the mutex.
  def increment_queries
    @query_mutex.synchronize { @query_count += 1 }
  end

  # @return [String] the name of this gauge
  def get_name
    'qps'
  end

  # @return [String] the label describing the type of #get_value
  def get_type
    'long'
  end

  # @return [Integer] queries per second averaged over the gauge's lifetime,
  #   truncated to an integer; 0 when no measurable time has elapsed yet
  def get_value
    @query_mutex.synchronize do
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time
      # Dividing by a zero interval would yield Infinity, and Infinity#to_i
      # raises FloatDomainError.
      elapsed > 0 ? (@query_count / elapsed).to_i : 0
    end
  end
end
# Starts a gRPC server that hosts a MetricsServiceImpl on an insecure port
# bound to all interfaces, running it on a background thread.
#
# @param port [String, Integer] port number interpolated into the bind address
# @return [Array] the RPC server, the metrics service instance and the thread
#   running the server, in that order
def start_metrics_server(port)
  rpc_server = GRPC::RpcServer.new
  rpc_server.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure)

  metrics_service = MetricsServiceImpl.new
  rpc_server.handle(metrics_service)

  runner = Thread.new do
    rpc_server.run_till_terminated
  end

  [rpc_server, metrics_service, runner]
end
# Settings for one stress-test run; populated by parse_stress_args and
# consumed by start.
StressArgs = Struct.new(
  :server_addresses,    # array of 'host:port' strings to connect to
  :test_cases,          # array of test names to sample from
  :duration,            # seconds to run; negative means no time limit
  :channels_per_server, # number of stubs created per server address
  :concurrent_calls,    # number of worker threads per stub
  :metrics_port         # port the metrics server listens on
)
# Runs the stress test described by +stress_args+.
#
# A metrics server exposing a QPS gauge is started first. Then, for every
# server address, +channels_per_server+ stubs are created and each stub is
# driven by +concurrent_calls+ worker threads that repeatedly invoke a
# randomly sampled test case and bump the gauge.
#
# When +duration+ is non-negative the workers are stopped after that many
# seconds, the metrics server is stopped and the final QPS is printed. When
# it is negative the workers are never told to stop and this method blocks on
# the metrics server thread.
#
# @param stress_args [StressArgs] run configuration
# @raise [ArgumentError] if no test cases were supplied
def start(stress_args)
  # Fail fast: with no test cases, sampling yields nil and every worker
  # thread would die on its first iteration while the run carries on and
  # reports 0 QPS. Checked before any server or thread is created.
  test_cases = stress_args.test_cases
  if test_cases.nil? || test_cases.empty?
    raise ArgumentError, 'no test cases given; pass --test_cases name:weight'
  end

  running = true
  threads = []
  qps_gauge = QpsGauge.new
  metrics_server, metrics_service, metrics_thread =
    start_metrics_server(stress_args.metrics_port)
  metrics_service.register_gauge(qps_gauge)

  stress_args.server_addresses.each do |address|
    stress_args.channels_per_server.times do
      client_args = Args.new
      client_args.host, client_args.port = address.split(':')
      client_args.secure = false
      client_args.test_case = ''
      stub = create_stub(client_args)
      named_tests = NamedTests.new(stub, client_args)
      stress_args.concurrent_calls.times do
        threads << Thread.new do
          # `running` is flipped by the main thread once the duration elapses.
          while running
            named_tests.method(test_cases.sample).call
            qps_gauge.increment_queries
          end
        end
      end
    end
  end

  if stress_args.duration >= 0
    sleep stress_args.duration
    running = false
    metrics_server.stop
    p "QPS: #{qps_gauge.get_value}"
    threads.each(&:join)
  end
  metrics_thread.join
end
# Builds a StressArgs from the command line, starting from built-in defaults.
#
# Recognised flags:
#   --server_addresses LIST        comma-separated 'host:port' entries
#   --test_cases LIST              comma-separated 'name:weight' entries; each
#                                  name is repeated +weight+ times so uniform
#                                  sampling follows the weights
#   --test_duration_secs INT       run time in seconds (default -1)
#   --num_channels_per_server INT  stubs per server address (default 1)
#   --num_stubs_per_channel INT    worker threads per stub (default 1)
#   --metrics_port PORT            metrics server port (default '8081')
#
# Flags declared with an optional argument hand +nil+ to their handler when
# given without a value; in that case the default is kept rather than being
# overwritten with nil.
#
# Parsed options are removed from ARGV (OptionParser#parse!).
#
# @return [StressArgs] the populated settings
def parse_stress_args
  stress_args = StressArgs.new
  stress_args.server_addresses = ['localhost:8080']
  stress_args.test_cases = []
  stress_args.duration = -1
  stress_args.channels_per_server = 1
  stress_args.concurrent_calls = 1
  stress_args.metrics_port = '8081'

  OptionParser.new do |opts|
    opts.on('--server_addresses [LIST]', Array) do |addrs|
      stress_args.server_addresses = addrs unless addrs.nil?
    end
    opts.on('--test_cases cases', Array) do |cases|
      # Expand 'name:weight' into +weight+ copies of name. A missing weight
      # converts to 0 and therefore contributes no entries.
      stress_args.test_cases = cases.flat_map do |item|
        name, weight = item.split(':')
        [name] * weight.to_i
      end
    end
    opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
      stress_args.duration = time unless time.nil?
    end
    opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
      stress_args.channels_per_server = channels unless channels.nil?
    end
    opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
      stress_args.concurrent_calls = stubs unless stubs.nil?
    end
    opts.on('--metrics_port [port]') do |port|
      stress_args.metrics_port = port unless port.nil?
    end
  end.parse!

  stress_args
end
# Script entry point: parses the command-line flags into a StressArgs and
# hands it to start.
def main
  start(parse_stress_args)
end
# Run the stress client only when this file is executed directly, not when it
# is loaded by another script.
main if __FILE__ == $PROGRAM_NAME