blob: 08e3abe4abf4f248637f6cde753cf7222b6be5ab [file] [log] [blame]
murgatroid99ddaa69f2016-04-22 13:37:26 -07001#!/usr/bin/env ruby
2
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003# Copyright 2016 gRPC authors.
murgatroid99ddaa69f2016-04-22 13:37:26 -07004#
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
murgatroid99ddaa69f2016-04-22 13:37:26 -07008#
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009# http://www.apache.org/licenses/LICENSE-2.0
murgatroid99ddaa69f2016-04-22 13:37:26 -070010#
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
murgatroid99ddaa69f2016-04-22 13:37:26 -070016
17require 'optparse'
18require 'thread'
19require_relative '../pb/test/client'
20require_relative './metrics_server'
21require_relative '../lib/grpc'
22
23class QpsGauge < Gauge
24 @query_count
25 @query_mutex
26 @start_time
27
28 def initialize
29 @query_count = 0
30 @query_mutex = Mutex.new
31 @start_time = Time.now
32 end
33
34 def increment_queries
35 @query_mutex.synchronize { @query_count += 1}
36 end
37
38 def get_name
39 'qps'
40 end
41
42 def get_type
43 'long'
44 end
45
46 def get_value
47 (@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i
48 end
49end
50
51def start_metrics_server(port)
52 host = "0.0.0.0:#{port}"
53 server = GRPC::RpcServer.new
54 server.add_http2_port(host, :this_port_is_insecure)
55 service = MetricsServiceImpl.new
56 server.handle(service)
57 server_thread = Thread.new { server.run_till_terminated }
58 [server, service, server_thread]
59end
60
61StressArgs = Struct.new(:server_addresses, :test_cases, :duration,
62 :channels_per_server, :concurrent_calls, :metrics_port)
63
64def start(stress_args)
65 running = true
66 threads = []
67 qps_gauge = QpsGauge.new
68 metrics_server, metrics_service, metrics_thread =
69 start_metrics_server(stress_args.metrics_port)
70 metrics_service.register_gauge(qps_gauge)
71 stress_args.server_addresses.each do |address|
72 stress_args.channels_per_server.times do
73 client_args = Args.new
74 client_args.host, client_args.port = address.split(':')
75 client_args.secure = false
76 client_args.test_case = ''
77 stub = create_stub(client_args)
78 named_tests = NamedTests.new(stub, client_args)
79 stress_args.concurrent_calls.times do
80 threads << Thread.new do
81 while running
82 named_tests.method(stress_args.test_cases.sample).call
83 qps_gauge.increment_queries
84 end
85 end
86 end
87 end
88 end
89 if stress_args.duration >= 0
90 sleep stress_args.duration
91 running = false
92 metrics_server.stop
93 p "QPS: #{qps_gauge.get_value}"
94 threads.each { |thd| thd.join; }
95 end
96 metrics_thread.join
97end
98
99def parse_stress_args
100 stress_args = StressArgs.new
101 stress_args.server_addresses = ['localhost:8080']
102 stress_args.test_cases = []
103 stress_args.duration = -1
104 stress_args.channels_per_server = 1
105 stress_args.concurrent_calls = 1
106 stress_args.metrics_port = '8081'
107 OptionParser.new do |opts|
108 opts.on('--server_addresses [LIST]', Array) do |addrs|
109 stress_args.server_addresses = addrs
110 end
111 opts.on('--test_cases cases', Array) do |cases|
112 stress_args.test_cases = (cases.map do |item|
113 split = item.split(':')
114 [split[0]] * split[1].to_i
115 end).reduce([], :+)
116 end
117 opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
118 stress_args.duration = time
119 end
120 opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
121 stress_args.channels_per_server = channels
122 end
123 opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
124 stress_args.concurrent_calls = stubs
125 end
126 opts.on('--metrics_port [port]') do |port|
127 stress_args.metrics_port = port
128 end
129 end.parse!
130 stress_args
131end
132
133def main
134 opts = parse_stress_args
135 start(opts)
136end
137
138if __FILE__ == $0
139 main
140end