A new version of the C# API based on async/await
diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs
index 032372b..dba5a77 100644
--- a/src/csharp/Grpc.Examples/MathExamples.cs
+++ b/src/csharp/Grpc.Examples/MathExamples.cs
@@ -61,9 +61,8 @@
public static async Task FibExample(MathGrpc.IMathServiceClient stub)
{
- var recorder = new RecordingObserver<Num>();
- stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder);
- List<Num> result = await recorder.ToList();
+ var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build());
+ List<Num> result = await call.ResponseStream.ToList();
Console.WriteLine("Fib Result: " + string.Join("|", result));
}
@@ -76,9 +75,9 @@
new Num.Builder { Num_ = 3 }.Build()
};
- var clientStreamingResult = stub.Sum();
- numbers.Subscribe(clientStreamingResult.Inputs);
- Console.WriteLine("Sum Result: " + await clientStreamingResult.Task);
+ var call = stub.Sum();
+ await call.RequestStream.WriteAll(numbers);
+ Console.WriteLine("Sum Result: " + await call.Result);
}
public static async Task DivManyExample(MathGrpc.IMathServiceClient stub)
@@ -89,12 +88,9 @@
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
-
- var recorder = new RecordingObserver<DivReply>();
- var inputs = stub.DivMany(recorder);
- divArgsList.Subscribe(inputs);
- var result = await recorder.ToList();
- Console.WriteLine("DivMany Result: " + string.Join("|", result));
+ var call = stub.DivMany();
+ await call.RequestStream.WriteAll(divArgsList);
+ Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
}
public static async Task DependendRequestsExample(MathGrpc.IMathServiceClient stub)
@@ -106,9 +102,9 @@
new Num.Builder { Num_ = 3 }.Build()
};
- var clientStreamingResult = stub.Sum();
- numbers.Subscribe(clientStreamingResult.Inputs);
- Num sum = await clientStreamingResult.Task;
+ var sumCall = stub.Sum();
+ await sumCall.RequestStream.WriteAll(numbers);
+ Num sum = await sumCall.Result;
DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());
Console.WriteLine("Avg Result: " + result);
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index 24e6a1d..60408b9 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -82,11 +82,11 @@
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
- void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken));
- ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken));
- IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken));
}
public class MathServiceClientStub : AbstractStub<MathServiceClientStub, StubConfiguration>, IMathServiceClient
@@ -111,35 +111,35 @@
return Calls.AsyncUnaryCall(call, request, token);
}
- public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, FibMethod);
- Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
+ return Calls.AsyncServerStreamingCall(call, request, token);
}
- public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, SumMethod);
return Calls.AsyncClientStreamingCall(call, token);
}
- public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, DivManyMethod);
- return Calls.DuplexStreamingCall(call, responseObserver, token);
+ return Calls.AsyncDuplexStreamingCall(call, token);
}
}
// server-side interface
public interface IMathService
{
- void Div(DivArgs request, IObserver<DivReply> responseObserver);
+ Task<DivReply> Div(DivArgs request);
- void Fib(FibArgs request, IObserver<Num> responseObserver);
+ Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream);
- IObserver<Num> Sum(IObserver<Num> responseObserver);
+ Task<Num> Sum(IAsyncStreamReader<Num> requestStream);
- IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver);
+ Task DivMany(IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream);
}
public static ServerServiceDefinition BindService(IMathService serviceImpl)
diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs
index 0b2357e..83ec2a8 100644
--- a/src/csharp/Grpc.Examples/MathServiceImpl.cs
+++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs
@@ -36,6 +36,7 @@
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core;
using Grpc.Core.Utils;
namespace math
@@ -45,18 +46,16 @@
/// </summary>
public class MathServiceImpl : MathGrpc.IMathService
{
- public void Div(DivArgs request, IObserver<DivReply> responseObserver)
+ public Task<DivReply> Div(DivArgs request)
{
- var response = DivInternal(request);
- responseObserver.OnNext(response);
- responseObserver.OnCompleted();
+ return Task.FromResult(DivInternal(request));
}
- public void Fib(FibArgs request, IObserver<Num> responseObserver)
+ public async Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream)
{
if (request.Limit <= 0)
{
- // TODO: support cancellation....
+ // TODO(jtattermusch): support cancellation
throw new NotImplementedException("Not implemented yet");
}
@@ -64,34 +63,27 @@
{
foreach (var num in FibInternal(request.Limit))
{
- responseObserver.OnNext(num);
+ await responseStream.Write(num);
}
- responseObserver.OnCompleted();
}
}
- public IObserver<Num> Sum(IObserver<Num> responseObserver)
+ public async Task<Num> Sum(IAsyncStreamReader<Num> requestStream)
{
- var recorder = new RecordingObserver<Num>();
- Task.Factory.StartNew(() =>
+ long sum = 0;
+ await requestStream.ForEach(async num =>
{
- List<Num> inputs = recorder.ToList().Result;
-
- long sum = 0;
- foreach (Num num in inputs)
- {
- sum += num.Num_;
- }
-
- responseObserver.OnNext(Num.CreateBuilder().SetNum_(sum).Build());
- responseObserver.OnCompleted();
+ sum += num.Num_;
});
- return recorder;
+ return Num.CreateBuilder().SetNum_(sum).Build();
}
- public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver)
+ public async Task DivMany(IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream)
{
- return new DivObserver(responseObserver);
+ await requestStream.ForEach(async divArgs =>
+ {
+ await responseStream.Write(DivInternal(divArgs));
+ });
}
static DivReply DivInternal(DivArgs args)
@@ -114,31 +106,6 @@
b = temp + b;
yield return new Num.Builder { Num_ = a }.Build();
}
- }
-
- private class DivObserver : IObserver<DivArgs>
- {
- readonly IObserver<DivReply> responseObserver;
-
- public DivObserver(IObserver<DivReply> responseObserver)
- {
- this.responseObserver = responseObserver;
- }
-
- public void OnCompleted()
- {
- responseObserver.OnCompleted();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnNext(DivArgs value)
- {
- responseObserver.OnNext(DivInternal(value));
- }
- }
+ }
}
}