blob: 48905a271540bf9ed3244a9ca135ef62667afb7b [file] [log] [blame]
#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion
18
19using System;
Jan Tattermusch09c5e052016-05-18 22:24:27 -070020using System.Collections.Concurrent;
Jan Tattermuschd0c1bfa2015-10-22 19:14:57 -070021using System.Collections.Generic;
22using System.Diagnostics;
23using System.IO;
24using System.Linq;
25using System.Text.RegularExpressions;
26using System.Threading;
27using System.Threading.Tasks;
28using Google.Protobuf;
29using Grpc.Core;
Jan Tattermusch09c5e052016-05-18 22:24:27 -070030using Grpc.Core.Internal;
Jan Tattermusch3d6644a2016-03-21 09:46:15 -070031using Grpc.Core.Logging;
Jan Tattermusch09c5e052016-05-18 22:24:27 -070032using Grpc.Core.Profiling;
Jan Tattermuschd0c1bfa2015-10-22 19:14:57 -070033using Grpc.Core.Utils;
34using NUnit.Framework;
35using Grpc.Testing;
36
37namespace Grpc.IntegrationTesting
38{
/// <summary>
/// Helper methods to start client runners for performance testing.
/// </summary>
public class ClientRunners
{
    static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();

    // Profilers to use for clients.
    static readonly BlockingCollection<BasicProfiler> profilers = new BlockingCollection<BasicProfiler>();

    /// <summary>
    /// Registers a profiler that can later be handed out to a client created by <see cref="CreateStarted"/>.
    /// </summary>
    /// <param name="profiler">The profiler to register; must not be null.</param>
    internal static void AddProfiler(BasicProfiler profiler)
    {
        GrpcPreconditions.CheckNotNull(profiler);
        profilers.Add(profiler);
    }

    /// <summary>
    /// Creates a started client runner.
    /// </summary>
    /// <param name="config">Client configuration; must not be null.</param>
    /// <returns>A runner built from the channels and load settings described by <paramref name="config"/>.</returns>
    public static IClientRunner CreateStarted(ClientConfig config)
    {
        // Fail with a clear argument error instead of a NullReferenceException further down.
        GrpcPreconditions.CheckNotNull(config, "config");

        Logger.Debug("ClientConfig: {0}", config);

        if (config.AsyncClientThreads != 0)
        {
            Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
        }
        if (config.CoreLimit != 0)
        {
            Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
        }
        if (config.CoreList.Count > 0)
        {
            Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
        }

        var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams, config.ChannelArgs);

        return new ClientRunnerImpl(channels,
            config.ClientType,
            config.RpcType,
            config.OutstandingRpcsPerChannel,
            config.LoadParams,
            config.PayloadConfig,
            config.HistogramParams,
            () => GetNextProfiler());
    }

    /// <summary>
    /// Creates <paramref name="clientChannels"/> channels, assigning the server targets to them in round-robin order.
    /// </summary>
    /// <param name="clientChannels">Number of channels to create; must be at least 1.</param>
    /// <param name="serverTargets">Targets to connect to; at least one is required.</param>
    /// <param name="securityParams">When non-null, SSL test credentials are used; otherwise channels are insecure.</param>
    /// <param name="channelArguments">Additional channel arguments converted to channel options.</param>
    /// <returns>The list of created channels.</returns>
    private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams, IEnumerable<ChannelArg> channelArguments)
    {
        GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
        GrpcPreconditions.CheckNotNull(serverTargets, "serverTargets");

        // Materialize once: the sequence is needed for the emptiness check and for indexed
        // access on every iteration below, so it must not be re-enumerated each time.
        var targets = serverTargets.ToList();
        GrpcPreconditions.CheckArgument(targets.Count > 0, "at least one serverTarget needs to be specified.");

        var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
        var channelOptions = new List<ChannelOption>();
        if (securityParams != null && !string.IsNullOrEmpty(securityParams.ServerHostOverride))
        {
            channelOptions.Add(new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride));
        }
        foreach (var channelArgument in channelArguments)
        {
            channelOptions.Add(channelArgument.ToChannelOption());
        }

        var result = new List<Channel>(clientChannels);
        for (int i = 0; i < clientChannels; i++)
        {
            // Wrap around when there are more channels than targets.
            var target = targets[i % targets.Count];
            result.Add(new Channel(target, credentials, channelOptions));
        }
        return result;
    }

    /// <summary>
    /// Takes the next registered profiler without blocking.
    /// </summary>
    /// <returns>A registered profiler, or null if none could be taken.</returns>
    private static BasicProfiler GetNextProfiler()
    {
        BasicProfiler result = null;
        profilers.TryTake(out result);
        return result;
    }
}
120
/// <summary>
/// Client runner that issues benchmark RPCs on a set of channels until it is stopped,
/// recording the observed latencies into per-thread histograms.
/// </summary>
internal class ClientRunnerImpl : IClientRunner
{
    const double SecondsToNanos = 1e9;

    readonly List<Channel> channels;
    readonly ClientType clientType;
    readonly RpcType rpcType;
    readonly PayloadConfig payloadConfig;
    readonly Lazy<byte[]> cachedByteBufferRequest;
    readonly ThreadLocal<Histogram> threadLocalHistogram;

    readonly List<Task> runnerTasks;
    readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
    readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
    readonly AtomicCounter statsResetCount = new AtomicCounter();

    /// <summary>
    /// Creates the runner and immediately starts <paramref name="outstandingRpcsPerChannel"/> runner tasks
    /// for each of the given channels.
    /// </summary>
    /// <param name="channels">Channels to run RPCs on (the list is copied).</param>
    /// <param name="clientType">Whether the sync or async client API is used.</param>
    /// <param name="rpcType">Kind of RPC to issue.</param>
    /// <param name="outstandingRpcsPerChannel">Number of runner tasks per channel; must be positive.</param>
    /// <param name="loadParams">Load description used to create an interarrival timer for each runner task.</param>
    /// <param name="payloadConfig">Payload configuration for the requests.</param>
    /// <param name="histogramParams">Resolution and maximum value of the latency histograms; must not be null.</param>
    /// <param name="profilerFactory">Invoked once per runner task; its result (possibly null) is passed to that task.</param>
    public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory)
    {
        GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
        GrpcPreconditions.CheckNotNull(histogramParams, "histogramParams");
        this.channels = new List<Channel>(channels);
        this.clientType = clientType;
        this.rpcType = rpcType;
        this.payloadConfig = payloadConfig;
        this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
        this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);

        // Every runner task gets an equal share of the load; the share is the same for all of them.
        var loadMultiplier = 1.0 / this.channels.Count / outstandingRpcsPerChannel;

        this.runnerTasks = new List<Task>();
        foreach (var channel in this.channels)
        {
            for (int i = 0; i < outstandingRpcsPerChannel; i++)
            {
                var timer = CreateTimer(loadParams, loadMultiplier);
                var optionalProfiler = profilerFactory();
                this.runnerTasks.Add(RunClientAsync(channel, timer, optionalProfiler));
            }
        }
    }

    /// <summary>
    /// Collects the latency histogram data and the elapsed wall clock time.
    /// </summary>
    /// <param name="reset">Passed on to the histogram and stopwatch snapshots; when true, the reset counter is also incremented.</param>
    /// <returns>Stats with latencies and elapsed seconds; user and system time are reported as 0.</returns>
    public ClientStats GetStats(bool reset)
    {
        // Snapshots of all per-thread histograms are taken into the same HistogramData instance.
        var histogramData = new HistogramData();
        foreach (var hist in threadLocalHistogram.Values)
        {
            hist.GetSnapshot(histogramData, reset);
        }

        var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;

        if (reset)
        {
            statsResetCount.Increment();
        }

        GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (histogram reset count:{3}, seconds since reset: {4})",
            GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, secondsElapsed);

        // TODO: populate user time and system time
        return new ClientStats
        {
            Latencies = histogramData,
            TimeElapsed = secondsElapsed,
            TimeUser = 0,
            TimeSystem = 0
        };
    }

    /// <summary>
    /// Signals all runner tasks to stop, waits for them to finish and shuts down the channels.
    /// </summary>
    /// <returns>
    /// A task that completes once all channels are shut down. If a runner task failed, its exception
    /// is re-thrown after the channels have been shut down.
    /// </returns>
    public async Task StopAsync()
    {
        stoppedCts.Cancel();

        // Wait for every runner, not just up to the first failed one, so that no RPC loop is still
        // running when the channels are shut down.
        var allRunners = Task.WhenAll(runnerTasks);
        try
        {
            await allRunners;
        }
        catch (Exception)
        {
            // Deliberately deferred: the failure is re-thrown below, after the channels are shut down.
        }

        foreach (var channel in channels)
        {
            await channel.ShutdownAsync();
        }

        // Propagates the runner failure (if any) to the caller; a no-op when all runners succeeded.
        await allRunners;
    }

    /// <summary>
    /// Issues blocking unary calls in a loop until stop is requested, recording the latency of each call.
    /// </summary>
    /// <param name="channel">Channel to issue the calls on.</param>
    /// <param name="timer">Timer that is waited on between two consecutive calls.</param>
    /// <param name="optionalProfiler">Profiler to set for the current thread; may be null.</param>
    private void RunUnary(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
    {
        if (optionalProfiler != null)
        {
            Profilers.SetForCurrentThread(optionalProfiler);
        }

        bool profilerReset = false;

        var client = new BenchmarkService.BenchmarkServiceClient(channel);
        var request = CreateSimpleRequest();
        var stopwatch = new Stopwatch();

        while (!stoppedCts.Token.IsCancellationRequested)
        {
            // after the first stats reset, also reset the profiler.
            if (optionalProfiler != null && !profilerReset && statsResetCount.Count > 0)
            {
                optionalProfiler.Reset();
                profilerReset = true;
            }

            stopwatch.Restart();
            client.UnaryCall(request);
            stopwatch.Stop();

            // spec requires data point in nanoseconds.
            threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);

            timer.WaitForNext();
        }
    }

    /// <summary>
    /// Issues asynchronous unary calls in a loop until stop is requested, recording the latency of each call.
    /// </summary>
    /// <param name="channel">Channel to issue the calls on.</param>
    /// <param name="timer">Timer that is awaited between two consecutive calls.</param>
    private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
    {
        var client = new BenchmarkService.BenchmarkServiceClient(channel);
        var request = CreateSimpleRequest();
        var stopwatch = new Stopwatch();

        while (!stoppedCts.Token.IsCancellationRequested)
        {
            stopwatch.Restart();
            await client.UnaryCallAsync(request);
            stopwatch.Stop();

            // spec requires data point in nanoseconds.
            threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);

            await timer.WaitForNextAsync();
        }
    }

    /// <summary>
    /// Runs request/response round trips on a single streaming call until stop is requested,
    /// recording the latency of each round trip, and then completes the call.
    /// </summary>
    /// <param name="channel">Channel to open the streaming call on.</param>
    /// <param name="timer">Timer that is awaited between two consecutive round trips.</param>
    private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
    {
        var client = new BenchmarkService.BenchmarkServiceClient(channel);
        var request = CreateSimpleRequest();
        var stopwatch = new Stopwatch();

        using (var call = client.StreamingCall())
        {
            while (!stoppedCts.Token.IsCancellationRequested)
            {
                stopwatch.Restart();
                await call.RequestStream.WriteAsync(request);
                await call.ResponseStream.MoveNext();
                stopwatch.Stop();

                // spec requires data point in nanoseconds.
                threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);

                await timer.WaitForNextAsync();
            }

            // finish the streaming call
            await call.RequestStream.CompleteAsync();
            Assert.IsFalse(await call.ResponseStream.MoveNext());
        }
    }

    /// <summary>
    /// Same round-trip loop as the streaming ping-pong client, but using raw byte array messages
    /// on the generic streaming method.
    /// </summary>
    /// <param name="channel">Channel to open the streaming call on.</param>
    /// <param name="timer">Timer that is awaited between two consecutive round trips.</param>
    private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
    {
        // The request buffer is created lazily once and the same instance is used by every generic runner.
        var request = cachedByteBufferRequest.Value;
        var stopwatch = new Stopwatch();

        var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());

        using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
        {
            while (!stoppedCts.Token.IsCancellationRequested)
            {
                stopwatch.Restart();
                await call.RequestStream.WriteAsync(request);
                await call.ResponseStream.MoveNext();
                stopwatch.Stop();

                // spec requires data point in nanoseconds.
                threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);

                await timer.WaitForNextAsync();
            }

            // finish the streaming call
            await call.RequestStream.CompleteAsync();
            Assert.IsFalse(await call.ResponseStream.MoveNext());
        }
    }

    /// <summary>
    /// Starts the runner that matches the configured payload, client type and RPC type.
    /// </summary>
    /// <param name="channel">Channel the runner issues its calls on.</param>
    /// <param name="timer">Interarrival timer for the runner.</param>
    /// <param name="optionalProfiler">Profiler for the runner; only used by the sync client and may be null.</param>
    /// <returns>A task representing the running client.</returns>
    /// <exception cref="ArgumentException">The combination of client type and RPC type is not supported.</exception>
    private Task RunClientAsync(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
    {
        if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
        {
            GrpcPreconditions.CheckArgument(clientType == ClientType.AsyncClient, "Generic client only supports async API");
            GrpcPreconditions.CheckArgument(rpcType == RpcType.Streaming, "Generic client only supports streaming calls");
            return RunGenericStreamingAsync(channel, timer);
        }

        GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
        if (clientType == ClientType.SyncClient)
        {
            GrpcPreconditions.CheckArgument(rpcType == RpcType.Unary, "Sync client can only be used for Unary calls in C#");
            // create a dedicated thread for the synchronous client
            return Task.Factory.StartNew(() => RunUnary(channel, timer, optionalProfiler), TaskCreationOptions.LongRunning);
        }
        else if (clientType == ClientType.AsyncClient)
        {
            switch (rpcType)
            {
                case RpcType.Unary:
                    return RunUnaryAsync(channel, timer);
                case RpcType.Streaming:
                    return RunStreamingPingPongAsync(channel, timer);
            }
        }
        throw new ArgumentException("Unsupported configuration.");
    }

    /// <summary>
    /// Builds a request with a zero-filled payload of the configured request size that asks for
    /// a response of the configured response size.
    /// </summary>
    private SimpleRequest CreateSimpleRequest()
    {
        GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
        return new SimpleRequest
        {
            Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
            ResponseSize = payloadConfig.SimpleParams.RespSize
        };
    }

    /// <summary>
    /// Creates a payload whose body consists of <paramref name="size"/> zero bytes.
    /// </summary>
    private static Payload CreateZerosPayload(int size)
    {
        return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
    }

    /// <summary>
    /// Creates the interarrival timer for one runner task.
    /// </summary>
    /// <param name="loadParams">Load description selecting closed-loop or Poisson load.</param>
    /// <param name="loadMultiplier">Factor applied to the offered load of a Poisson load.</param>
    /// <exception cref="ArgumentException">The load type is neither closed-loop nor Poisson.</exception>
    private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
    {
        switch (loadParams.LoadCase)
        {
            case LoadParams.LoadOneofCase.ClosedLoop:
                return new ClosedLoopInterarrivalTimer();
            case LoadParams.LoadOneofCase.Poisson:
                return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
            default:
                throw new ArgumentException("Unknown load type");
        }
    }
}
365}