Fixes for C# cancellation support
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 8c4b92f..26a1a68 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -166,27 +166,6 @@
}
[Test]
- public void ClientStreamingCall_ServerHandlerThrows()
- {
- Task.Run(async () =>
- {
- var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
- var callResult = Calls.AsyncClientStreamingCall(call, CancellationToken.None);
- // TODO(jtattermusch): if we send "A", "THROW", "C", server hangs.
- await callResult.RequestStream.WriteAll(new string[] { "A", "B", "THROW" });
-
- try
- {
- await callResult.Result;
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
- }).Wait();
- }
-
- [Test]
public void ClientStreamingCall_CancelAfterBegin()
{
Task.Run(async () =>
@@ -195,6 +174,9 @@
var cts = new CancellationTokenSource();
var callResult = Calls.AsyncClientStreamingCall(call, cts.Token);
+
+ // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
+ await Task.Delay(1000);
cts.Cancel();
try
@@ -260,7 +242,9 @@
}
result += request;
});
- return result;
+ // simulate processing takes some time.
+ await Task.Delay(250);
+ return result;
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b911cdc..7cf0f6f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -180,7 +180,8 @@
{
if (!disposed && call != null)
{
- if (halfclosed && readingDone && finished)
+ bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+ if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
return true;
@@ -207,8 +208,9 @@
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured);
+ CheckNotCancelled();
+ Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
@@ -221,7 +223,14 @@
Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone, "Stream has already been closed.");
- Preconditions.CheckState(readCompletionDelegate == null, "Only one write can be pending at a time");
+ Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
+ }
+
+ protected void CheckNotCancelled() {
+ if (cancelRequested)
+ {
+ throw new OperationCanceledException("Remote call has been cancelled.");
+ }
}
protected byte[] UnsafeSerialize(TWrite msg)
@@ -292,6 +301,8 @@
});
}
+
+
/// <summary>
/// Handles send completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 4775f2d..3c66c67 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -123,18 +123,23 @@
/// </summary>
private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
+ bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
+
lock (myLock)
{
finished = true;
- if (readCompletionDelegate == null)
+ if (cancelled)
{
- // allow disposal of native call
- readingDone = true;
+ // Once we cancel, we don't have to care that much
+ // about reads and writes.
+ Cancel();
}
ReleaseResourcesIfPossible();
}
+ // TODO(jtattermusch): check if call was cancelled.
+
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
index 3c54753..b562aba 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
@@ -61,6 +61,9 @@
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
+
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
@@ -94,5 +97,10 @@
{
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
}
+
+ public bool GetReceivedCloseOnServerCancelled()
+ {
+ return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
+ }
}
}
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 0416ead..01b2a11 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -80,7 +80,14 @@
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -121,7 +128,15 @@
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -151,15 +166,30 @@
Status status = Status.DefaultSuccess;
try
{
- var result = await handler(requestStream);
- await responseStream.Write(result);
- }
+ var result = await handler(requestStream);
+ try
+ {
+ await responseStream.Write(result);
+ }
+ catch (OperationCanceledException)
+ {
+ status = Status.DefaultCancelled;
+ }
+ }
catch (Exception e)
{
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -196,7 +226,14 @@
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs
index b588170..754f6cb 100644
--- a/src/csharp/Grpc.Core/Status.cs
+++ b/src/csharp/Grpc.Core/Status.cs
@@ -44,6 +44,11 @@
/// </summary>
public static readonly Status DefaultSuccess = new Status(StatusCode.OK, "");
+ /// <summary>
+ /// Default result of a cancelled RPC. StatusCode=Cancelled, empty details message.
+ /// </summary>
+ public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, "");
+
readonly StatusCode statusCode;
readonly string detail;
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 440702d..a433659 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -366,6 +366,8 @@
var cts = new CancellationTokenSource();
var call = client.StreamingInputCall(cts.Token);
+ // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
+ await Task.Delay(1000);
cts.Cancel();
try
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index fb8b757..a8cc1b2 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -277,6 +277,12 @@
return ctx->server_rpc_new.call_details.method;
}
+GPR_EXPORT gpr_int32 GPR_CALLTYPE
+grpcsharp_batch_context_recv_close_on_server_cancelled(
+ const grpcsharp_batch_context *ctx) {
+ return (gpr_int32) ctx->recv_close_on_server_cancelled;
+}
+
/* Init & shutdown */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }