Merge pull request #6628 from stanley-cheung/php-update-package-xml-0-14-2
PHP: update package.xml after fixes
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
index 6330f50..850d70c 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -89,5 +90,43 @@
channel.ShutdownAsync().Wait();
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync());
}
+
+ [Test]
+ public async Task ShutdownTokenCancelledAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested);
+ var shutdownTask = channel.ShutdownAsync();
+ Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested);
+ await shutdownTask;
+ }
+
+ [Test]
+ public async Task StateIsFatalFailureAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ await channel.ShutdownAsync();
+ Assert.AreEqual(ChannelState.FatalFailure, channel.State);
+ }
+
+ [Test]
+ public async Task ShutdownFinishesWaitForStateChangedAsync()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle);
+ var shutdownTask = channel.ShutdownAsync();
+ await stateChangedTask;
+ await shutdownTask;
+ }
+
+ [Test]
+ public async Task OperationsThrowAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ await channel.ShutdownAsync();
+ Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle));
+ Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; });
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync());
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index abe9d4a..777a1c8 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -181,13 +181,14 @@
}
[Test]
- public void ClientStreaming_WriteFailure()
+ public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
+ // TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@@ -199,7 +200,7 @@
}
[Test]
- public void ClientStreaming_WriteAfterReceivingStatusFails()
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -210,7 +211,44 @@
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ requestStream.CompleteAsync();
+
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+
+ fakeCall.SendCompletionHandler(true);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@@ -229,7 +267,7 @@
}
[Test]
- public void ClientStreaming_WriteAfterCancellationRequestFails()
+ public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -340,7 +378,7 @@
}
[Test]
- public void DuplexStreaming_WriteAfterReceivingStatusFails()
+ public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -352,7 +390,8 @@
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
- Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@@ -372,7 +411,7 @@
}
[Test]
- public void DuplexStreaming_WriteAfterCancellationRequestFails()
+ public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 89981b1..93a6e6a 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -51,6 +52,7 @@
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
+ readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
readonly string target;
readonly GrpcEnvironment environment;
@@ -101,12 +103,13 @@
/// <summary>
/// Gets current connectivity state of this channel.
+ /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
/// </summary>
public ChannelState State
{
get
{
- return handle.CheckConnectivityState(false);
+ return GetConnectivityState(false);
}
}
@@ -155,6 +158,17 @@
}
/// <summary>
+ /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
+ /// </summary>
+ public CancellationToken ShutdownToken
+ {
+ get
+ {
+ return this.shutdownTokenSource.Token;
+ }
+ }
+
+ /// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
/// or channel enters the FatalFailure state, the task is cancelled.
@@ -164,7 +178,7 @@
/// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
public async Task ConnectAsync(DateTime? deadline = null)
{
- var currentState = handle.CheckConnectivityState(true);
+ var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
if (currentState == ChannelState.FatalFailure)
@@ -172,7 +186,7 @@
throw new OperationCanceledException("Channel has reached FatalFailure state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
- currentState = handle.CheckConnectivityState(false);
+ currentState = GetConnectivityState(false);
}
}
@@ -188,6 +202,8 @@
shutdownRequested = true;
}
+ shutdownTokenSource.Cancel();
+
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
@@ -231,6 +247,18 @@
activeCallCounter.Decrement();
}
+ private ChannelState GetConnectivityState(bool tryToConnect)
+ {
+ try
+ {
+ return handle.CheckConnectivityState(tryToConnect);
+ }
+ catch (ObjectDisposedException)
+ {
+ return ChannelState.FatalFailure;
+ }
+ }
+
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f522174..5535186 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -57,7 +57,7 @@
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
- // Indicates that steaming call has finished.
+ // Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@@ -443,6 +443,19 @@
}
}
+ protected override void CheckSendingAllowed(bool allowFinished)
+ {
+ base.CheckSendingAllowed(true);
+
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ if (!allowFinished && finishedStatus.HasValue)
+ {
+ throw new RpcException(finishedStatus.Value.Status);
+ }
+ }
+
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 42234dc..4de2370 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -213,7 +213,7 @@
{
}
- protected void CheckSendingAllowed(bool allowFinished)
+ protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 70355b3..1541cfd 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -492,6 +492,10 @@
{
// Deadline was reached before write has started. Eat the exception and continue.
}
+ catch (RpcException)
+ {
+ // Deadline was reached before write has started. Eat the exception and continue.
+ }
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.