blob: dbf8844265d643c02608a99f46792fcdd0356ce0 [file] [log] [blame]
Jan Tattermusch445a82b2016-04-21 13:18:26 -07001#region Copyright notice and license
2
3// Copyright 2015-2016, Google Inc.
4// All rights reserved.
5//
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are
8// met:
9//
10// * Redistributions of source code must retain the above copyright
11// notice, this list of conditions and the following disclaimer.
12// * Redistributions in binary form must reproduce the above
13// copyright notice, this list of conditions and the following disclaimer
14// in the documentation and/or other materials provided with the
15// distribution.
16// * Neither the name of Google Inc. nor the names of its
17// contributors may be used to endorse or promote products derived from
18// this software without specific prior written permission.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32#endregion
33
34using System;
35using System.Collections.Generic;
36using System.Diagnostics;
37using System.Linq;
38using System.Threading;
39using System.Threading.Tasks;
40
41using CommandLine;
42using CommandLine.Text;
43using Grpc.Core;
44using Grpc.Core.Logging;
45using Grpc.Core.Utils;
46using Grpc.Testing;
47
48namespace Grpc.IntegrationTesting
49{
50 public class StressTestClient
51 {
// Logger for this type, obtained from the gRPC environment's logger.
private static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>();

// Multiplier used to convert elapsed seconds into nanoseconds.
private const double SecondsToNanos = 1e9;
54
/// <summary>
/// Command line options accepted by the stress test client.
/// The properties are populated by the CommandLine parser.
/// </summary>
private class ClientOptions
{
    /// <summary>Comma-separated list of server addresses to open channels to.</summary>
    [Option("server_addresses", Default = "localhost:8080")]
    public string ServerAddresses { get; set; }

    /// <summary>Comma-separated list of <c>name:weight</c> pairs selecting the test cases to run.</summary>
    [Option("test_cases", Default = "large_unary:100")]
    public string TestCases { get; set; }

    /// <summary>Test duration in seconds; a negative value means no time limit is scheduled.</summary>
    [Option("test_duration_secs", Default = -1)]
    public int TestDurationSecs { get; set; }

    /// <summary>Number of channels created for each server address (must be positive).</summary>
    [Option("num_channels_per_server", Default = 1)]
    public int NumChannelsPerServer { get; set; }

    /// <summary>Number of client stubs created on each channel (must be positive).</summary>
    [Option("num_stubs_per_channel", Default = 1)]
    public int NumStubsPerChannel { get; set; }

    /// <summary>Port on which the metrics server listens.</summary>
    [Option("metrics_port", Default = 8081)]
    public int MetricsPort { get; set; }
}
75
// Parsed command line options for this run.
readonly ClientOptions options;
// Addresses of the servers that channels are opened to.
readonly List<string> serverAddresses;
// Maps each test case name to its weight.
readonly Dictionary<string, int> weightedTestCases;
// Chooses the next test case to execute based on the configured weights.
readonly WeightedRandomGenerator testCaseGenerator;

// Cancellation is requested once test_duration_secs has elapsed.
readonly CancellationTokenSource finishedTokenSource = new CancellationTokenSource();
// Receives one observation (in nanoseconds) per executed test case.
// NOTE(review): the constructor arguments are presumably resolution and maximum value - confirm against Histogram.
readonly Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos);

/// <summary>
/// Creates a stress test client for the given options, servers and weighted test cases.
/// </summary>
/// <param name="options">Parsed command line options.</param>
/// <param name="serverAddresses">Addresses of the servers to connect to.</param>
/// <param name="weightedTestCases">Test case names mapped to their weights.</param>
private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases)
{
    this.weightedTestCases = weightedTestCases;
    this.testCaseGenerator = new WeightedRandomGenerator(weightedTestCases);
    this.serverAddresses = serverAddresses;
    this.options = options;
}
92
/// <summary>
/// Parses the command line, validates the options and runs the stress test
/// until it completes. Exits the process with code 1 when the arguments
/// cannot be parsed.
/// </summary>
/// <param name="args">Command line arguments of the process.</param>
public static void Run(string[] args)
{
    GrpcEnvironment.SetLogger(new ConsoleLogger());
    Parser.Default.ParseArguments<ClientOptions>(args)
        .WithNotParsed(errors => Environment.Exit(1))
        .WithParsed(options =>
        {
            GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
            GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);

            // string.Split always returns at least one element (an empty string for
            // empty input), so blank entries are dropped before checking that at
            // least one usable address was supplied.
            var serverAddresses = options.ServerAddresses
                .Split(',')
                .Select(address => address.Trim())
                .Where(address => address.Length > 0)
                .ToList();
            GrpcPreconditions.CheckArgument(serverAddresses.Count > 0, "You need to provide at least one server address");

            var testCases = ParseWeightedTestCases(options.TestCases);
            GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");

            var interopClient = new StressTestClient(options, serverAddresses, testCases);
            // Synchronous entry point: block until the whole stress run has finished.
            interopClient.Run().Wait();
        });
}
112
/// <summary>
/// Starts the metrics server, launches one long-running worker per client stub
/// (for every server address and channel), waits for all workers to finish and
/// then shuts down the channels and the metrics server.
/// </summary>
async Task Run()
{
    var metricsServer = new Server
    {
        Services = { MetricsService.BindService(new MetricsServiceImpl(histogram)) },
        Ports = { { "[::]", options.MetricsPort, ServerCredentials.Insecure } }
    };
    metricsServer.Start();

    // A negative duration means no cancellation is scheduled.
    if (!(options.TestDurationSecs < 0))
    {
        var testDuration = TimeSpan.FromSeconds(options.TestDurationSecs);
        finishedTokenSource.CancelAfter(testDuration);
    }

    var openChannels = new List<Channel>();
    var workers = new List<Task>();
    foreach (var address in serverAddresses)
    {
        for (int channelIndex = 0; channelIndex < options.NumChannelsPerServer; channelIndex++)
        {
            var channel = new Channel(address, ChannelCredentials.Insecure);
            openChannels.Add(channel);

            for (int stubIndex = 0; stubIndex < options.NumStubsPerChannel; stubIndex++)
            {
                var stub = new TestService.TestServiceClient(channel);
                // Each stub gets its own long-running task that blocks until the worker loop ends.
                Action workerBody = () => RunBodyAsync(stub).GetAwaiter().GetResult();
                workers.Add(Task.Factory.StartNew(workerBody, TaskCreationOptions.LongRunning));
            }
        }
    }
    await Task.WhenAll(workers);

    // Channels are shut down one at a time, in creation order.
    for (int k = 0; k < openChannels.Count; k++)
    {
        await openChannels[k].ShutdownAsync();
    }

    await metricsServer.ShutdownAsync();
}
153
/// <summary>
/// Worker loop: repeatedly picks a test case, runs it against the given client
/// and records its duration in the histogram, until cancellation is requested.
/// </summary>
/// <param name="client">Client stub used to execute the test cases.</param>
async Task RunBodyAsync(TestService.TestServiceClient client)
{
    Logger.Info("Starting stress test client thread.");

    var finished = finishedTokenSource.Token;
    while (true)
    {
        if (finished.IsCancellationRequested)
        {
            break;
        }

        var nextCase = testCaseGenerator.GetNext();

        // Only the execution of the test case itself is timed.
        var timer = new Stopwatch();
        timer.Start();
        await RunTestCaseAsync(client, nextCase);
        timer.Stop();

        // The histogram stores durations in nanoseconds.
        double elapsedNanos = timer.Elapsed.TotalSeconds * SecondsToNanos;
        histogram.AddObservation(elapsedNanos);
    }

    Logger.Info("Stress test client thread finished.");
}
170
/// <summary>
/// Executes the interop test identified by <paramref name="testCase"/> using the given client.
/// </summary>
/// <param name="client">Client stub the test is run against.</param>
/// <param name="testCase">Name of the test case to run.</param>
/// <exception cref="ArgumentException">The test case name is not supported.</exception>
async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase)
{
    // The two unary tests are synchronous calls; all other tests are awaited.
    if (testCase == "empty_unary")
    {
        InteropClient.RunEmptyUnary(client);
    }
    else if (testCase == "large_unary")
    {
        InteropClient.RunLargeUnary(client);
    }
    else if (testCase == "client_streaming")
    {
        await InteropClient.RunClientStreamingAsync(client);
    }
    else if (testCase == "server_streaming")
    {
        await InteropClient.RunServerStreamingAsync(client);
    }
    else if (testCase == "ping_pong")
    {
        await InteropClient.RunPingPongAsync(client);
    }
    else if (testCase == "empty_stream")
    {
        await InteropClient.RunEmptyStreamAsync(client);
    }
    else if (testCase == "cancel_after_begin")
    {
        await InteropClient.RunCancelAfterBeginAsync(client);
    }
    else if (testCase == "cancel_after_first_response")
    {
        await InteropClient.RunCancelAfterFirstResponseAsync(client);
    }
    else if (testCase == "timeout_on_sleeping_server")
    {
        await InteropClient.RunTimeoutOnSleepingServerAsync(client);
    }
    else if (testCase == "custom_metadata")
    {
        await InteropClient.RunCustomMetadataAsync(client);
    }
    else if (testCase == "status_code_and_message")
    {
        await InteropClient.RunStatusCodeAndMessageAsync(client);
    }
    else
    {
        throw new ArgumentException("Unsupported test case " + testCase);
    }
}
212
/// <summary>
/// Parses a comma-separated list of <c>name:weight</c> pairs into a dictionary
/// mapping each test case name to its weight.
/// </summary>
/// <param name="weightedTestCases">Value of the test_cases option, e.g. <c>large_unary:100</c>.</param>
/// <returns>Test case names mapped to their non-negative weights; the weights sum to a positive value.</returns>
/// <exception cref="ArgumentException">
/// An entry is not of the form <c>name:weight</c>, a name is empty or repeated,
/// a weight is negative, or all weights are zero.
/// </exception>
/// <exception cref="FormatException">A weight is not a valid integer.</exception>
/// <exception cref="OverflowException">A weight does not fit into an <see cref="int"/>.</exception>
static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases)
{
    var result = new Dictionary<string, int>();
    long totalWeight = 0;
    foreach (var weightedTestCase in weightedTestCases.Split(','))
    {
        var parts = weightedTestCase.Split(new char[] {':'}, 2);
        GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option.");

        // Tolerate whitespace around names (e.g. "a:1, b:2") so the name matches a known test case.
        var name = parts[0].Trim();
        GrpcPreconditions.CheckArgument(name.Length > 0, "Malformed test_cases option.");

        // Weights are machine-readable input, so parse them independently of the current culture.
        int weight = int.Parse(
            parts[1],
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture);
        // A negative weight would break the cumulative sums used for weighted selection.
        GrpcPreconditions.CheckArgument(weight >= 0, "Test case weights must not be negative.");

        result.Add(name, weight);
        totalWeight += weight;
    }
    // With a zero total no test case could ever be selected.
    GrpcPreconditions.CheckArgument(totalWeight > 0, "At least one test case must have a positive weight.");
    return result;
}
224
/// <summary>
/// Picks items at random with a probability proportional to their weight.
/// <see cref="GetNext"/> may be called from several threads concurrently.
/// </summary>
class WeightedRandomGenerator
{
    readonly Random random = new Random();
    // System.Random instance members are not thread-safe, and one generator is
    // shared by all worker threads, so every access to 'random' is serialized.
    readonly object randomLock = new object();
    // Each entry pairs the running total of weights (up to and including the item) with the item.
    readonly List<Tuple<int, string>> cumulativeSums;
    readonly int weightSum;

    /// <summary>
    /// Creates a generator over the given items.
    /// </summary>
    /// <param name="weightedItems">Items mapped to their weights.</param>
    public WeightedRandomGenerator(Dictionary<string, int> weightedItems)
    {
        cumulativeSums = new List<Tuple<int, string>>();
        weightSum = 0;
        foreach (var entry in weightedItems)
        {
            weightSum += entry.Value;
            cumulativeSums.Add(Tuple.Create(weightSum, entry.Key));
        }
    }

    /// <summary>
    /// Returns the next randomly selected item.
    /// </summary>
    /// <exception cref="InvalidOperationException">No item matched the drawn value.</exception>
    public string GetNext()
    {
        int rand;
        lock (randomLock)
        {
            rand = random.Next(weightSum);
        }

        // The first item whose cumulative weight exceeds the drawn value is selected.
        foreach (var entry in cumulativeSums)
        {
            if (rand < entry.Item1)
            {
                return entry.Item2;
            }
        }
        throw new InvalidOperationException("GetNext() failed.");
    }
}
255
/// <summary>
/// Metrics service exposing a single gauge, <c>csharp_overall_qps</c>, computed
/// from the number of histogram observations and the wall clock time elapsed
/// since the previous query.
/// </summary>
class MetricsServiceImpl : MetricsService.MetricsServiceBase
{
    const string GaugeName = "csharp_overall_qps";

    readonly Histogram histogram;
    readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();

    /// <summary>
    /// Creates the service on top of the histogram that receives the test observations.
    /// </summary>
    /// <param name="histogram">Histogram whose observation count is used to compute QPS.</param>
    public MetricsServiceImpl(Histogram histogram)
    {
        this.histogram = histogram;
    }

    /// <summary>
    /// Returns the QPS gauge when it is requested by name.
    /// </summary>
    /// <exception cref="RpcException">
    /// The requested gauge is not <c>csharp_overall_qps</c> (status <c>InvalidArgument</c>).
    /// </exception>
    public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context)
    {
        if (request.Name != GaugeName)
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist"));
        }
        return Task.FromResult(CreateQpsGauge());
    }

    /// <summary>
    /// Writes all known gauges (only the QPS gauge) to the response stream.
    /// </summary>
    public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context)
    {
        await responseStream.WriteAsync(CreateQpsGauge());
    }

    // Builds the gauge response carrying the current QPS value.
    GaugeResponse CreateQpsGauge()
    {
        long qps = GetQpsAndReset();

        return new GaugeResponse
        {
            Name = GaugeName,
            LongValue = qps
        };
    }

    // Computes queries per second since the previous call.
    // NOTE(review): the 'true' arguments presumably reset the histogram and the stopwatch - confirm.
    long GetQpsAndReset()
    {
        var snapshot = histogram.GetSnapshot(true);
        var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true);

        var elapsedSeconds = elapsedSnapshot.TotalSeconds;
        // Dividing by a zero interval yields NaN or Infinity, and converting those
        // to long is unspecified; report 0 for an empty interval instead.
        if (!(elapsedSeconds > 0))
        {
            return 0;
        }
        return (long) (snapshot.Count / elapsedSeconds);
    }
}
303 }
304}