blob: 0c6238076810c7729c61434be157d9f3ffe39a67 [file] [log] [blame]
Jan Tattermusch445a82b2016-04-21 13:18:26 -07001#region Copyright notice and license
2
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003// Copyright 2015-2016 gRPC authors.
Jan Tattermusch445a82b2016-04-21 13:18:26 -07004//
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
Jan Tattermusch445a82b2016-04-21 13:18:26 -07008//
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009// http://www.apache.org/licenses/LICENSE-2.0
Jan Tattermusch445a82b2016-04-21 13:18:26 -070010//
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
Jan Tattermusch445a82b2016-04-21 13:18:26 -070016
17#endregion
18
19using System;
20using System.Collections.Generic;
21using System.Diagnostics;
22using System.Linq;
23using System.Threading;
24using System.Threading.Tasks;
25
26using CommandLine;
27using CommandLine.Text;
28using Grpc.Core;
29using Grpc.Core.Logging;
30using Grpc.Core.Utils;
31using Grpc.Testing;
32
33namespace Grpc.IntegrationTesting
34{
35 public class StressTestClient
36 {
// Logger used by the stress test client and its worker threads.
private static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>();

// Factor that converts a duration expressed in seconds into nanoseconds.
private const double SecondsToNanos = 1e9;
39
/// <summary>
/// Command-line options understood by the stress test client.
/// Each option carries help text so that the generated usage screen explains it.
/// </summary>
private class ClientOptions
{
    [Option("server_addresses", Default = "localhost:8080",
        HelpText = "Comma-separated list of server addresses to connect to.")]
    public string ServerAddresses { get; set; }

    [Option("test_cases", Default = "large_unary:100",
        HelpText = "Comma-separated list of test_case:weight pairs, e.g. large_unary:100.")]
    public string TestCases { get; set; }

    [Option("test_duration_secs", Default = -1,
        HelpText = "Test duration in seconds. With a negative value the client never requests a stop on its own.")]
    public int TestDurationSecs { get; set; }

    [Option("num_channels_per_server", Default = 1,
        HelpText = "Number of channels to open for each server address (must be positive).")]
    public int NumChannelsPerServer { get; set; }

    [Option("num_stubs_per_channel", Default = 1,
        HelpText = "Number of client stubs (one worker each) to create per channel (must be positive).")]
    public int NumStubsPerChannel { get; set; }

    [Option("metrics_port", Default = 8081,
        HelpText = "Port on which the metrics service listens.")]
    public int MetricsPort { get; set; }
}
60
// All of the state below is assigned exactly once, so it is declared readonly
// to turn any accidental reassignment into a compile-time error.
private readonly ClientOptions options;
private readonly List<string> serverAddresses;
private readonly Dictionary<string, int> weightedTestCases;
private readonly WeightedRandomGenerator testCaseGenerator;

// Cancellation is requested once test_duration_secs has elapsed; workers poll it to know when to stop.
private readonly CancellationTokenSource finishedTokenSource = new CancellationTokenSource();

// Collects call latencies; observations are added in nanoseconds (see RunBodyAsync).
// NOTE(review): 0.01 and 60s-in-nanos are presumably the resolution and the largest
// trackable value - confirm against the Histogram class.
private readonly Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos);

/// <summary>
/// Creates a client for the given, already validated, configuration.
/// </summary>
/// <param name="options">Parsed command-line options.</param>
/// <param name="serverAddresses">Addresses of the servers to open channels to.</param>
/// <param name="weightedTestCases">Test case names mapped to their selection weights.</param>
private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases)
{
    this.options = options;
    this.serverAddresses = serverAddresses;
    this.weightedTestCases = weightedTestCases;
    this.testCaseGenerator = new WeightedRandomGenerator(weightedTestCases);
}
77
/// <summary>
/// Entry point of the stress test client: parses and validates the command line,
/// then runs the stress test and blocks until it has finished.
/// The process exits with code 1 when the arguments cannot be parsed.
/// </summary>
/// <param name="args">Raw command-line arguments.</param>
public static void Run(string[] args)
{
    GrpcEnvironment.SetLogger(new ConsoleLogger());
    Parser.Default.ParseArguments<ClientOptions>(args)
        .WithNotParsed((x) => Environment.Exit(1))
        .WithParsed(options => {
            GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
            GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);

            // String.Split always returns at least one element (an empty string for empty
            // input), so blank entries must be dropped before the count check means anything.
            // Otherwise "" or a trailing comma would silently become a channel target.
            var serverAddresses = options.ServerAddresses
                .Split(',')
                .Select(address => address.Trim())
                .Where(address => address.Length > 0)
                .ToList();
            GrpcPreconditions.CheckArgument(serverAddresses.Count > 0, "You need to provide at least one server address");

            var testCases = ParseWeightedTestCases(options.TestCases);
            GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");

            var interopClient = new StressTestClient(options, serverAddresses, testCases);
            interopClient.Run().Wait();
        });
}
97
/// <summary>
/// Starts the metrics server, spawns one long-running worker per client stub and,
/// once every worker has finished, shuts down the channels and the metrics server.
/// </summary>
async Task Run()
{
    // Expose the latency histogram through the metrics service.
    var metricsServer = new Server();
    metricsServer.Services.Add(MetricsService.BindService(new MetricsServiceImpl(histogram)));
    metricsServer.Ports.Add("[::]", options.MetricsPort, ServerCredentials.Insecure);
    metricsServer.Start();

    // A negative duration means no stop is ever scheduled.
    if (options.TestDurationSecs >= 0)
    {
        finishedTokenSource.CancelAfter(TimeSpan.FromSeconds(options.TestDurationSecs));
    }

    var workers = new List<Task>();
    var openChannels = new List<Channel>();
    foreach (var address in serverAddresses)
    {
        for (int channelIndex = 0; channelIndex < options.NumChannelsPerServer; channelIndex++)
        {
            var channel = new Channel(address, ChannelCredentials.Insecure);
            openChannels.Add(channel);

            for (int stubIndex = 0; stubIndex < options.NumStubsPerChannel; stubIndex++)
            {
                var stub = new TestService.TestServiceClient(channel);

                // Each stub gets its own dedicated worker that blocks until its loop ends.
                workers.Add(Task.Factory.StartNew(
                    () => RunBodyAsync(stub).GetAwaiter().GetResult(),
                    TaskCreationOptions.LongRunning));
            }
        }
    }
    await Task.WhenAll(workers);

    // Channels are shut down one after another, then the metrics server.
    foreach (var openChannel in openChannels)
    {
        await openChannel.ShutdownAsync();
    }

    await metricsServer.ShutdownAsync();
}
138
/// <summary>
/// Worker loop: keeps picking a weighted-random test case, runs it against the given
/// client and records how long it took, until cancellation has been requested.
/// </summary>
/// <param name="client">Client stub used for every call made by this worker.</param>
async Task RunBodyAsync(TestService.TestServiceClient client)
{
    Logger.Info("Starting stress test client thread.");

    var finished = finishedTokenSource.Token;
    while (true)
    {
        if (finished.IsCancellationRequested)
        {
            break;
        }

        var nextTestCase = testCaseGenerator.GetNext();

        var timer = new Stopwatch();
        timer.Start();

        await RunTestCaseAsync(client, nextTestCase);

        timer.Stop();

        // The histogram is fed with the elapsed time in nanoseconds.
        var elapsedNanos = timer.Elapsed.TotalSeconds * SecondsToNanos;
        histogram.AddObservation(elapsedNanos);
    }

    Logger.Info("Stress test client thread finished.");
}
155
/// <summary>
/// Runs the interop test identified by <paramref name="testCase"/> once.
/// empty_unary and large_unary are invoked synchronously; all other cases are awaited.
/// </summary>
/// <param name="client">Client stub to run the test case with.</param>
/// <param name="testCase">Name of the test case to run.</param>
/// <exception cref="ArgumentException">The test case name is not supported.</exception>
async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase)
{
    switch (testCase)
    {
        case "empty_unary":
            InteropClient.RunEmptyUnary(client);
            return;

        case "large_unary":
            InteropClient.RunLargeUnary(client);
            return;

        case "client_streaming":
            await InteropClient.RunClientStreamingAsync(client);
            return;

        case "server_streaming":
            await InteropClient.RunServerStreamingAsync(client);
            return;

        case "ping_pong":
            await InteropClient.RunPingPongAsync(client);
            return;

        case "empty_stream":
            await InteropClient.RunEmptyStreamAsync(client);
            return;

        case "cancel_after_begin":
            await InteropClient.RunCancelAfterBeginAsync(client);
            return;

        case "cancel_after_first_response":
            await InteropClient.RunCancelAfterFirstResponseAsync(client);
            return;

        case "timeout_on_sleeping_server":
            await InteropClient.RunTimeoutOnSleepingServerAsync(client);
            return;

        case "custom_metadata":
            await InteropClient.RunCustomMetadataAsync(client);
            return;

        case "status_code_and_message":
            await InteropClient.RunStatusCodeAndMessageAsync(client);
            return;
    }

    // Reached only when no known test case name matched.
    throw new ArgumentException("Unsupported test case " + testCase);
}
197
/// <summary>
/// Parses the test_cases option, a comma-separated list of <c>name:weight</c> pairs
/// (for example <c>large_unary:100,empty_unary:20</c>).
/// </summary>
/// <param name="weightedTestCases">Raw value of the test_cases option.</param>
/// <returns>Test case names mapped to their weights.</returns>
/// <exception cref="ArgumentException">
/// An entry is not of the form name:weight, the name is empty, the weight is not a
/// non-negative integer, or a test case is listed more than once.
/// </exception>
static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases)
{
    var result = new Dictionary<string, int>();
    foreach (var weightedTestCase in weightedTestCases.Split(','))
    {
        var parts = weightedTestCase.Split(new char[] {':'}, 2);
        GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option.");

        // Whitespace around the name (e.g. after a comma) would otherwise become part of
        // the key and only fail much later as an "unsupported test case".
        var name = parts[0].Trim();
        GrpcPreconditions.CheckArgument(name.Length > 0, "Malformed test_cases option: empty test case name.");

        // Report a bad weight as an argument problem instead of a bare FormatException,
        // and reject negative weights, which would corrupt the weighted selection.
        int weight;
        bool weightParsed = int.TryParse(parts[1], out weight);
        GrpcPreconditions.CheckArgument(weightParsed && weight >= 0,
            "Malformed test_cases option: invalid weight for test case " + name);

        GrpcPreconditions.CheckArgument(!result.ContainsKey(name),
            "Malformed test_cases option: duplicate test case " + name);
        result.Add(name, weight);
    }
    return result;
}
209
/// <summary>
/// Picks item names at random, with probability proportional to each item's weight.
/// GetNext() may be called from several threads at the same time.
/// </summary>
class WeightedRandomGenerator
{
    // System.Random is not thread-safe and every worker thread draws from this one
    // instance, so all access to it goes through this lock.
    readonly object randomLock = new object();
    readonly Random random = new Random();

    // Running totals of the weights, each paired with the item that ends that range.
    readonly List<Tuple<int, string>> cumulativeSums;
    readonly int weightSum;

    /// <summary>
    /// Builds the cumulative weight table for the given items.
    /// </summary>
    /// <param name="weightedItems">Item names mapped to their weights.</param>
    /// <exception cref="ArgumentException">An item has a negative weight.</exception>
    /// <exception cref="OverflowException">The sum of all weights does not fit into an int.</exception>
    public WeightedRandomGenerator(Dictionary<string, int> weightedItems)
    {
        cumulativeSums = new List<Tuple<int, string>>();
        weightSum = 0;
        foreach (var entry in weightedItems)
        {
            // A negative weight would make the running totals non-monotonic.
            if (entry.Value < 0)
            {
                throw new ArgumentException("Weight of " + entry.Key + " must not be negative.");
            }
            weightSum = checked(weightSum + entry.Value);
            cumulativeSums.Add(Tuple.Create(weightSum, entry.Key));
        }
    }

    /// <summary>
    /// Returns the name of a randomly selected item.
    /// </summary>
    /// <exception cref="InvalidOperationException">No item could be selected (the total weight is zero).</exception>
    public string GetNext()
    {
        int rand;
        lock (randomLock)
        {
            rand = random.Next(weightSum);
        }

        // The first running total that exceeds the drawn value identifies the item.
        foreach (var entry in cumulativeSums)
        {
            if (rand < entry.Item1)
            {
                return entry.Item2;
            }
        }
        throw new InvalidOperationException("GetNext() failed.");
    }
}
240
/// <summary>
/// Metrics service that reports the overall queries-per-second gauge of this client,
/// computed from the shared latency histogram.
/// </summary>
class MetricsServiceImpl : MetricsService.MetricsServiceBase
{
    const string GaugeName = "csharp_overall_qps";

    readonly Histogram histogram;
    readonly TimeStats timeStats = new TimeStats();

    // Keeps the histogram snapshot and the time snapshot of one request paired together
    // when several gauge requests are served concurrently.
    readonly object snapshotLock = new object();

    /// <summary>
    /// Creates the service on top of the histogram the worker threads record into.
    /// </summary>
    /// <param name="histogram">Histogram whose observation count is the number of completed calls.</param>
    public MetricsServiceImpl(Histogram histogram)
    {
        this.histogram = histogram;
    }

    /// <summary>
    /// Returns the QPS gauge when asked for it by name.
    /// </summary>
    /// <exception cref="RpcException">Status InvalidArgument when the requested gauge is unknown.</exception>
    public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context)
    {
        if (request.Name == GaugeName)
        {
            long qps = GetQpsAndReset();

            return Task.FromResult(new GaugeResponse
            {
                Name = GaugeName,
                LongValue = qps
            });
        }
        throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist"));
    }

    /// <summary>
    /// Streams every gauge this service knows about, which is only the QPS gauge.
    /// </summary>
    public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context)
    {
        long qps = GetQpsAndReset();

        var response = new GaugeResponse
        {
            Name = GaugeName,
            LongValue = qps
        };
        await responseStream.WriteAsync(response);
    }

    /// <summary>
    /// Computes calls per second since the previous snapshot.
    /// NOTE(review): the <c>true</c> argument of GetSnapshot is presumably the "reset" flag - confirm
    /// against Histogram and TimeStats.
    /// </summary>
    long GetQpsAndReset()
    {
        lock (snapshotLock)
        {
            var snapshot = histogram.GetSnapshot(true);
            var timeSnapshot = timeStats.GetSnapshot(true);

            // Two requests in quick succession can observe no elapsed time; dividing by zero
            // would yield Infinity/NaN, whose conversion to long is not a meaningful value.
            double elapsedSeconds = timeSnapshot.WallClockTime.TotalSeconds;
            if (elapsedSeconds <= 0)
            {
                return 0;
            }

            return (long) (snapshot.Count / elapsedSeconds);
        }
    }
}
288 }
289}