Merge pull request #2818 from ctiller/y12kdm3
Add a test of non-blocking API behavior
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 640c1fd..7f8f4d4 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -275,12 +275,18 @@
/* --- Auth Metadata Processing --- */
/* Callback function that is called when the metadata processing is done.
- success is 1 if processing succeeded, 0 otherwise.
- Consumed metadata will be removed from the set of metadata available on the
- call. */
+ - Consumed metadata will be removed from the set of metadata available on the
+ call. consumed_md may be NULL if no metadata has been consumed.
+ - Response metadata will be set on the response. response_md may be NULL.
+ - status is GRPC_STATUS_OK for success or a specific status for an error.
+ Common error status for auth metadata processing is either
+ GRPC_STATUS_UNAUTHENTICATED in case of an authentication failure or
+ GRPC_STATUS PERMISSION_DENIED in case of an authorization failure.
+ - error_details gives details about the error. May be NULL. */
typedef void (*grpc_process_auth_metadata_done_cb)(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- int success);
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details);
/* Pluggable server-side metadata processor object. */
typedef struct {
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 2f42f01..6e83143 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -104,24 +104,34 @@
return md;
}
-static void on_md_processing_done(void *user_data,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md, int success) {
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success) {
+ /* TODO(jboeuf): Implement support for response_md. */
+ if (response_md != NULL && num_response_md > 0) {
+ gpr_log(GPR_INFO,
+ "response_md in auth metadata processing not supported for now. "
+ "Ignoring...");
+ }
+
+ if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
elem);
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
} else {
- gpr_slice message = gpr_slice_from_copied_string(
- "Authentication metadata processing failed.");
+ gpr_slice message;
+ error_details = error_details != NULL
+ ? error_details
+ : "Authentication metadata processing failed.";
+ message = gpr_slice_from_copied_string(error_details);
grpc_sopb_reset(calld->recv_ops);
- grpc_transport_stream_op_add_close(&calld->transport_op,
- GRPC_STATUS_UNAUTHENTICATED, &message);
+ grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
grpc_call_next_op(elem, &calld->transport_op);
}
}
diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore
index ae48956..48365e3 100644
--- a/src/csharp/.gitignore
+++ b/src/csharp/.gitignore
@@ -5,4 +5,5 @@
packages
Grpc.v12.suo
TestResult.xml
+/TestResults
*.nupkg
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index ad4e94a..b571fe9 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -65,6 +65,7 @@
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="ShutdownTest.cs" />
+ <Compile Include="Internal\AsyncCallTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 4fdfab5..78295cf 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -53,7 +53,7 @@
{
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
- Assert.IsTrue(object.ReferenceEquals(env1, env2));
+ Assert.AreSame(env1, env2);
GrpcEnvironment.Release();
GrpcEnvironment.Release();
}
@@ -61,18 +61,21 @@
[Test]
public void InitializeAfterShutdown()
{
+ Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
+
var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
- Assert.IsFalse(object.ReferenceEquals(env1, env2));
+ Assert.AreNotSame(env1, env2);
}
[Test]
public void ReleaseWithoutAddRef()
{
+ Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
new file mode 100644
index 0000000..685c5f7
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -0,0 +1,222 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class AsyncCallTest
+ {
+ Channel channel;
+ FakeNativeCall fakeCall;
+ AsyncCall<string, string> asyncCall;
+
+ [SetUp]
+ public void Init()
+ {
+ channel = new Channel("localhost", Credentials.Insecure);
+
+ fakeCall = new FakeNativeCall();
+
+ var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
+ asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ }
+
+ [Test]
+ public void AsyncUnary_CompletionSuccess()
+ {
+ var resultTask = asyncCall.UnaryCallAsync("abc");
+ fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata());
+ Assert.IsTrue(resultTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+ Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
+ }
+
+ [Test]
+ public void AsyncUnary_CompletionFailure()
+ {
+ var resultTask = asyncCall.UnaryCallAsync("abc");
+ fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata());
+
+ Assert.IsTrue(resultTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+
+ Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode);
+ Assert.IsNull(asyncCall.GetTrailers());
+ var ex = Assert.Throws<RpcException>(() => resultTask.GetAwaiter().GetResult());
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+ }
+
+ internal class FakeNativeCall : INativeCall
+ {
+ public UnaryResponseClientHandler UnaryResponseClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedMessageHandler ReceivedMessageHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+ {
+ get;
+ set;
+ }
+
+ public SendCompletionHandler SendCompletionHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
+ {
+ get;
+ set;
+ }
+
+ public bool IsCancelled
+ {
+ get;
+ set;
+ }
+
+ public bool IsDisposed
+ {
+ get;
+ set;
+ }
+
+ public void Cancel()
+ {
+ IsCancelled = true;
+ }
+
+ public void CancelWithStatus(Status status)
+ {
+ IsCancelled = true;
+ }
+
+ public string GetPeer()
+ {
+ return "PEER";
+ }
+
+ public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartReceiveMessage(ReceivedMessageHandler callback)
+ {
+ ReceivedMessageHandler = callback;
+ }
+
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ {
+ ReceivedResponseHeadersHandler = callback;
+ }
+
+ public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendCloseFromClient(SendCompletionHandler callback)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ {
+ ReceivedCloseOnServerHandler = callback;
+ }
+
+ public void Dispose()
+ {
+ IsDisposed = true;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
index 7060067..a1648f3 100644
--- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -32,13 +32,16 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
+
using NUnit.Framework;
namespace Grpc.Core.Tests
@@ -74,6 +77,80 @@
}
[Test]
+ public async Task ResponseHeadersAsync_UnaryCall()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ return "PASS";
+ });
+
+ var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "");
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ Assert.AreEqual(headers.Count, responseHeaders.Count);
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ Assert.AreEqual("abcdefg", responseHeaders[0].Value);
+
+ Assert.AreEqual("PASS", await call.ResponseAsync);
+ }
+
+ [Test]
+ public async Task ResponseHeadersAsync_ClientStreamingCall()
+ {
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ return "PASS";
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+ await call.RequestStream.CompleteAsync();
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ Assert.AreEqual("PASS", await call.ResponseAsync);
+ }
+
+ [Test]
+ public async Task ResponseHeadersAsync_ServerStreamingCall()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ await responseStream.WriteAsync("PASS");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ CollectionAssert.AreEqual(new[] { "PASS" }, await call.ResponseStream.ToListAsync());
+ }
+
+ [Test]
+ public async Task ResponseHeadersAsync_DuplexStreamingCall()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ while (await requestStream.MoveNext())
+ {
+ await responseStream.WriteAsync(requestStream.Current);
+ }
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ var messages = new[] { "PASS" };
+ await call.RequestStream.WriteAllAsync(messages);
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync());
+ }
+
+ [Test]
public void WriteResponseHeaders_NullNotAllowed()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index fb9b562..dbaa308 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -44,14 +44,16 @@
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> responseAsync;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -69,6 +71,17 @@
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Async stream to send streaming requests.
/// </summary>
public IClientStreamWriter<TRequest> RequestStream
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index 183c842..ee7ba29 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
namespace Grpc.Core
{
@@ -42,14 +43,16 @@
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -78,6 +81,17 @@
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index ab2049f..2853a79 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
namespace Grpc.Core
{
@@ -41,13 +42,15 @@
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -65,6 +68,17 @@
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 224e343..154a17a 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -43,13 +43,15 @@
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> responseAsync;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseAsync = responseAsync;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -67,6 +69,17 @@
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Allows awaiting this object directly.
/// </summary>
public TaskAwaiter<TResponse> GetAwaiter()
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 7067456..e57ac89 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -74,7 +74,7 @@
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var asyncResult = asyncCall.UnaryCallAsync(req);
- return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
@@ -93,7 +93,7 @@
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
@@ -110,7 +110,7 @@
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
- return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
@@ -130,7 +130,7 @@
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 2f8519d..c11b320 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -58,7 +58,6 @@
readonly List<ChannelOption> options;
bool shutdownRequested;
- bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 055aff1..ad2af17 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -49,6 +49,7 @@
<Compile Include="AsyncDuplexStreamingCall.cs" />
<Compile Include="AsyncServerStreamingCall.cs" />
<Compile Include="IClientStreamWriter.cs" />
+ <Compile Include="Internal\INativeCall.cs" />
<Compile Include="IServerStreamWriter.cs" />
<Compile Include="IAsyncStreamWriter.cs" />
<Compile Include="IAsyncStreamReader.cs" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 0a44eea..e7c0418 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -102,6 +102,14 @@
}
}
+ internal static int GetRefCount()
+ {
+ lock (staticLock)
+ {
+ return refCount;
+ }
+ }
+
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@@ -177,7 +185,6 @@
return Marshal.PtrToStringAnsi(ptr);
}
-
internal static void GrpcNativeInit()
{
grpcsharp_init();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bb9ba5b..be5d611 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -51,22 +51,35 @@
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
readonly CallInvocationDetails<TRequest, TResponse> details;
+ readonly INativeCall injectedNativeCall; // for testing
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // Indicates that steaming call has finished.
+ TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+
+ // Response headers set here once received.
+ TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
+
// Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus;
- bool readObserverCompleted; // True if readObserver has already been completed.
-
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
+ /// <summary>
+ /// This constructor should only be used for testing.
+ /// </summary>
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
+ {
+ this.injectedNativeCall = injectedNativeCall;
+ }
+
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
/// <summary>
@@ -100,7 +113,7 @@
bool success = (ev.success != 0);
try
{
- HandleUnaryResponse(success, ctx);
+ HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
}
catch (Exception e)
{
@@ -125,7 +138,7 @@
Preconditions.CheckState(!started);
started = true;
- Initialize(details.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -152,7 +165,7 @@
Preconditions.CheckState(!started);
started = true;
- Initialize(details.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
readingDone = true;
@@ -176,10 +189,9 @@
Preconditions.CheckState(!started);
started = true;
- Initialize(details.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
halfcloseRequested = true;
- halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
@@ -187,6 +199,7 @@
{
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -201,12 +214,13 @@
Preconditions.CheckState(!started);
started = true;
- Initialize(details.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -248,6 +262,28 @@
}
/// <summary>
+ /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
+ /// </summary>
+ public Task StreamingCallFinishedTask
+ {
+ get
+ {
+ return streamingCallFinishedTcs.Task;
+ }
+ }
+
+ /// <summary>
+ /// Get the task that completes once response headers are received.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return responseHeadersTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Gets the resulting status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
@@ -281,36 +317,6 @@
}
}
- /// <summary>
- /// On client-side, we only fire readCompletionDelegate once all messages have been read
- /// and status has been received.
- /// </summary>
- protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
- {
- if (completionDelegate != null && readingDone && finishedStatus.HasValue)
- {
- bool shouldComplete;
- lock (myLock)
- {
- shouldComplete = !readObserverCompleted;
- readObserverCompleted = true;
- }
-
- if (shouldComplete)
- {
- var status = finishedStatus.Value.Status;
- if (status.StatusCode != StatusCode.OK)
- {
- FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
- }
- else
- {
- FireCompletion(completionDelegate, default(TResponse), null);
- }
- }
- }
- }
-
protected override void OnAfterReleaseResources()
{
details.Channel.RemoveCallReference(this);
@@ -318,18 +324,26 @@
private void Initialize(CompletionQueueSafeHandle cq)
{
- var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
-
- var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
- parentCall, ContextPropagationToken.DefaultMask, cq,
- details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
-
+ var call = CreateNativeCall(cq);
details.Channel.AddCallReference(this);
-
InitializeInternal(call);
RegisterCancellationCallback();
}
+ private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
+ {
+ if (injectedNativeCall != null)
+ {
+ return injectedNativeCall; // allows injecting a mock INativeCall in tests.
+ }
+
+ var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ return details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
+ }
+
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
@@ -350,31 +364,31 @@
}
/// <summary>
+ /// Handles receive status completion for calls with streaming response.
+ /// </summary>
+ private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
+ {
+ responseHeadersTcs.SetResult(responseHeaders);
+ }
+
+ /// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
+ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
- var fullStatus = ctx.GetReceivedStatusOnClient();
-
lock (myLock)
{
finished = true;
- finishedStatus = fullStatus;
-
- halfclosed = true;
+ finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
}
- if (!success)
- {
- unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured.")));
- return;
- }
+ responseHeadersTcs.SetResult(responseHeaders);
- var status = fullStatus.Status;
+ var status = receivedStatus.Status;
- if (status.StatusCode != StatusCode.OK)
+ if (!success || status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
@@ -382,7 +396,7 @@
// TODO: handle deserialization error
TResponse msg;
- TryDeserialize(ctx.GetReceivedMessage(), out msg);
+ TryDeserialize(receivedMessage, out msg);
unaryResponseTcs.SetResult(msg);
}
@@ -390,22 +404,25 @@
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleFinished(bool success, BatchContextSafeHandle ctx)
+ private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{
- var fullStatus = ctx.GetReceivedStatusOnClient();
-
- AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
- finishedStatus = fullStatus;
-
- origReadCompletionDelegate = readCompletionDelegate;
+ finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
}
- ProcessLastRead(origReadCompletionDelegate);
+ var status = receivedStatus.Status;
+
+ if (!success || status.StatusCode != StatusCode.OK)
+ {
+ streamingCallFinishedTcs.SetException(new RpcException(status));
+ return;
+ }
+
+ streamingCallFinishedTcs.SetResult(null);
}
}
}
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 1808294..4d20394 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -54,30 +54,30 @@
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
+ protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
- protected CallSafeHandle call;
+ protected INativeCall call;
protected bool disposed;
protected bool started;
- protected bool errorOccured;
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected bool readingDone;
- protected bool halfcloseRequested;
- protected bool halfclosed;
+ protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
+ protected bool halfcloseRequested; // True if send close have been initiated.
protected bool finished; // True if close has been received from the peer.
protected bool initialMetadataSent;
- protected long streamingWritesCounter;
+ protected long streamingWritesCounter; // Number of streaming send operations started so far.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
+ this.environment = Preconditions.CheckNotNull(environment);
}
/// <summary>
@@ -114,7 +114,7 @@
}
}
- protected void InitializeInternal(CallSafeHandle call)
+ protected void InitializeInternal(INativeCall call)
{
lock (myLock)
{
@@ -159,16 +159,6 @@
}
}
- // TODO(jtattermusch): find more fitting name for this method.
- /// <summary>
- /// Default behavior just completes the read observer, but more sofisticated behavior might be required
- /// by subclasses.
- /// </summary>
- protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
- {
- FireCompletion(completionDelegate, default(TRead), null);
- }
-
/// <summary>
/// If there are no more pending actions and no new actions can be started, releases
/// the underlying native resources.
@@ -177,7 +167,7 @@
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+ bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -204,11 +194,11 @@
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!errorOccured);
CheckNotCancelled();
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
+ Preconditions.CheckState(!finished, "Already finished.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
@@ -216,7 +206,6 @@
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
- Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone, "Stream has already been closed.");
Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
@@ -280,7 +269,7 @@
/// <summary>
/// Handles send completion.
/// </summary>
- protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
+ protected void HandleSendFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -304,12 +293,11 @@
/// <summary>
/// Handles halfclose completion.
/// </summary>
- protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
+ protected void HandleHalfclosed(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- halfclosed = true;
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
@@ -329,23 +317,17 @@
/// <summary>
/// Handles streaming read completion.
/// </summary>
- protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
+ protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
- var payload = ctx.GetReceivedMessage();
-
AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = readCompletionDelegate;
- if (payload != null)
+ readCompletionDelegate = null;
+
+ if (receivedMessage == null)
{
- readCompletionDelegate = null;
- }
- else
- {
- // This was the last read. Keeping the readCompletionDelegate
- // to be either fired by this handler or by client-side finished
- // handler.
+ // This was the last read.
readingDone = true;
}
@@ -354,17 +336,17 @@
// TODO: handle the case when error occured...
- if (payload != null)
+ if (receivedMessage != null)
{
// TODO: handle deserialization error
TRead msg;
- TryDeserialize(payload, out msg);
+ TryDeserialize(receivedMessage, out msg);
FireCompletion(origCompletionDelegate, msg, null);
}
else
{
- ProcessLastRead(origCompletionDelegate);
+ FireCompletion(origCompletionDelegate, default(TRead), null);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 6278c01..5c47251 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -49,12 +49,10 @@
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
- readonly GrpcEnvironment environment;
readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
{
- this.environment = Preconditions.CheckNotNull(environment);
this.server = Preconditions.CheckNotNull(server);
}
@@ -185,10 +183,8 @@
/// <summary>
/// Handles the server side close completion.
/// </summary>
- private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
+ private void HandleFinishedServerside(bool success, bool cancelled)
{
- bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
-
lock (myLock)
{
finished = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 3cb01e2..0f18752 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -40,7 +40,7 @@
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
- internal class CallSafeHandle : SafeHandleZeroIsInvalid
+ internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall
{
public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
@@ -87,6 +87,10 @@
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
@@ -109,10 +113,10 @@
this.completionRegistry = completionRegistry;
}
- public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -123,66 +127,73 @@
.CheckOk();
}
- public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
- public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
- public void StartSendCloseFromClient(BatchCompletionDelegate callback)
+ public void StartSendCloseFromClient(SendCompletionHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
- public void StartReceiveMessage(BatchCompletionDelegate callback)
+ public void StartReceiveMessage(ReceivedMessageHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
- public void StartServerSide(BatchCompletionDelegate callback)
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ }
+
+ public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
- public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 6c44521..b4a7335 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -72,7 +72,13 @@
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task;
this.current = result;
- return result != null;
+
+ if (result == null)
+ {
+ await call.StreamingCallFinishedTask;
+ return false;
+ }
+ return true;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
new file mode 100644
index 0000000..cbef599
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -0,0 +1,85 @@
+#region Copyright notice and license
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core.Internal
+{
+ internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+
+ // Received status for streaming response calls.
+ internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus);
+
+ internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
+
+ internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
+
+ internal delegate void SendCompletionHandler(bool success);
+
+ internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
+
+ /// <summary>
+ /// Abstraction of a native call object.
+ /// </summary>
+ internal interface INativeCall : IDisposable
+ {
+ void Cancel();
+
+ void CancelWithStatus(Grpc.Core.Status status);
+
+ string GetPeer();
+
+ void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartReceiveMessage(ReceivedMessageHandler callback);
+
+ void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
+
+ void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+
+ void StartSendCloseFromClient(SendCompletionHandler callback);
+
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+
+ void StartServerSide(ReceivedCloseOnServerHandler callback);
+ }
+}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index fc9470f..489e219 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -595,7 +595,7 @@
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[5];
+ grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -615,23 +615,18 @@
ops[2].flags = 0;
ops[2].reserved = NULL;
- ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
- ops[3].flags = 0;
- ops[3].reserved = NULL;
-
- ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[4].data.recv_status_on_client.trailing_metadata =
+ ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[3].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
- ops[4].data.recv_status_on_client.status =
+ ops[3].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
- ops[4].data.recv_status_on_client.status_details =
+ ops[3].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
- ops[4].data.recv_status_on_client.status_details_capacity =
+ ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
- ops[4].flags = 0;
- ops[4].reserved = NULL;
+ ops[3].flags = 0;
+ ops[3].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@@ -642,7 +637,7 @@
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[3];
+ grpc_op ops[2];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -652,28 +647,36 @@
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[1].data.recv_status_on_client.trailing_metadata =
+ &(ctx->recv_status_on_client.trailing_metadata);
+ ops[1].data.recv_status_on_client.status =
+ &(ctx->recv_status_on_client.status);
+ /* not using preallocation for status_details */
+ ops[1].data.recv_status_on_client.status_details =
+ &(ctx->recv_status_on_client.status_details);
+ ops[1].data.recv_status_on_client.status_details_capacity =
+ &(ctx->recv_status_on_client.status_details_capacity);
ops[1].flags = 0;
ops[1].reserved = NULL;
- ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[2].data.recv_status_on_client.trailing_metadata =
- &(ctx->recv_status_on_client.trailing_metadata);
- ops[2].data.recv_status_on_client.status =
- &(ctx->recv_status_on_client.status);
- /* not using preallocation for status_details */
- ops[2].data.recv_status_on_client.status_details =
- &(ctx->recv_status_on_client.status_details);
- ops[2].data.recv_status_on_client.status_details_capacity =
- &(ctx->recv_status_on_client.status_details_capacity);
- ops[2].flags = 0;
- ops[2].reserved = NULL;
-
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
}
+GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
+ grpc_call *call, grpcsharp_batch_context *ctx) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[0].flags = 0;
+ ops[0].reserved = NULL;
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
+}
+
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
new file mode 100644
index 0000000..d3be3a4
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -0,0 +1,59 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private constants for the package."""
+
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
+ base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
+ base.Subscription.Kind.TERMINATION_ONLY:
+ links.Ticket.Subscription.TERMINATION,
+ base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
+ }
+
+# Mapping from abortive operation outcome to ticket termination to be
+# sent to the other side of the operation, or None to indicate that no
+# ticket should be sent to the other side in the event of such an
+# outcome.
+ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
+ base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
+ base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
+ base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+ base.Outcome.REMOTE_SHUTDOWN: None,
+ base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
+ base.Outcome.TRANSMISSION_FAILURE: None,
+ base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+ base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+}
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
+TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+ 'Exception calling termination callback!')
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
new file mode 100644
index 0000000..24a12b6
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -0,0 +1,92 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation context."""
+
+import time
+
+# _interfaces is referenced from specification in this module.
+from grpc.framework.core import _interfaces # pylint: disable=unused-import
+from grpc.framework.interfaces.base import base
+
+
+class OperationContext(base.OperationContext):
+ """An implementation of interfaces.OperationContext."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+
+ def _abort(self, outcome):
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def outcome(self):
+ """See base.OperationContext.outcome for specification."""
+ with self._lock:
+ return self._termination_manager.outcome
+
+ def add_termination_callback(self, callback):
+ """See base.OperationContext.add_termination_callback."""
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.add_callback(callback)
+ return None
+ else:
+ return self._termination_manager.outcome
+
+ def time_remaining(self):
+ """See base.OperationContext.time_remaining for specification."""
+ with self._lock:
+ deadline = self._expiration_manager.deadline()
+ return max(0.0, deadline - time.time())
+
+ def cancel(self):
+ """See base.OperationContext.cancel for specification."""
+ self._abort(base.Outcome.CANCELLED)
+
+ def fail(self, exception):
+ """See base.OperationContext.fail for specification."""
+ self._abort(base.Outcome.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
new file mode 100644
index 0000000..7c702ab
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -0,0 +1,97 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for handling emitted values."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+
+
+class EmissionManager(_interfaces.EmissionManager):
+ """An EmissionManager implementation."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._ingestion_manager = None
+
+ self._initial_metadata_seen = False
+ self._payload_seen = False
+ self._completion_seen = False
+
+ def set_ingestion_manager(self, ingestion_manager):
+ """Sets the ingestion manager with which this manager will cooperate.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ """
+ self._ingestion_manager = ingestion_manager
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ initial_metadata_present = initial_metadata is not None
+ payload_present = payload is not None
+ completion_present = completion is not None
+ allowance_present = allowance is not None
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ if (initial_metadata_present and (
+ self._initial_metadata_seen or self._payload_seen or
+ self._completion_seen) or
+ payload_present and self._completion_seen or
+ completion_present and self._completion_seen or
+ allowance_present and allowance <= 0):
+ self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
+ self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
+ self._expiration_manager.terminate()
+ else:
+ self._initial_metadata_seen |= initial_metadata_present
+ self._payload_seen |= payload_present
+ self._completion_seen |= completion_present
+ if completion_present:
+ self._termination_manager.emission_complete()
+ self._ingestion_manager.local_emissions_done()
+ self._transmission_manager.advance(
+ initial_metadata, payload, completion, allowance)
+ if allowance_present:
+ self._ingestion_manager.add_local_allowance(allowance)
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
new file mode 100644
index 0000000..fb2c532
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -0,0 +1,251 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of base.End."""
+
+import abc
+import enum
+import threading
+import uuid
+
+from grpc.framework.core import _operation
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import later
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
+
+
+class End(base.End, links.Link):
+ """A bridge between base.End and links.Link.
+
+ Implementations of this interface translate arriving tickets into
+ calls on application objects implementing base interfaces and
+ translate calls from application objects implementing base interfaces
+ into tickets sent to a joined link.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _Cycle(object):
+ """State for a single start-stop End lifecycle."""
+
+ def __init__(self, pool):
+ self.pool = pool
+ self.grace = False
+ self.futures = []
+ self.operations = {}
+ self.idle_actions = []
+
+
+def _abort(operations):
+ for operation in operations:
+ operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+
+
+def _cancel_futures(futures):
+ for future in futures:
+ futures.cancel()
+
+
+def _future_shutdown(lock, cycle, event):
+ def in_future():
+ with lock:
+ _abort(cycle.operations.values())
+ _cancel_futures(cycle.futures)
+ pool = cycle.pool
+ cycle.pool.shutdown(wait=True)
+ return in_future
+
+
+def _termination_action(lock, stats, operation_id, cycle):
+ """Constructs the termination action for a single operation.
+
+ Args:
+ lock: A lock to hold during the termination action.
+ states: A mapping from base.Outcome values to integers to increment with
+ the outcome given to the termination action.
+ operation_id: The operation ID for the termination action.
+ cycle: A _Cycle value to be updated during the termination action.
+
+ Returns:
+ A callable that takes an operation outcome as its sole parameter and that
+ should be used as the termination action for the operation associated
+ with the given operation ID.
+ """
+ def termination_action(outcome):
+ with lock:
+ stats[outcome] += 1
+ cycle.operations.pop(operation_id, None)
+ if not cycle.operations:
+ for action in cycle.idle_actions:
+ cycle.pool.submit(action)
+ cycle.idle_actions = []
+ if cycle.grace:
+ _cancel_futures(cycle.futures)
+ return termination_action
+
+
+class _End(End):
+ """An End implementation."""
+
+ def __init__(self, servicer_package):
+ """Constructor.
+
+ Args:
+ servicer_package: A _ServicerPackage for servicing operations or None if
+ this end will not be used to service operations.
+ """
+ self._lock = threading.Condition()
+ self._servicer_package = servicer_package
+
+ self._stats = {outcome: 0 for outcome in base.Outcome}
+
+ self._mate = None
+
+ self._cycle = None
+
+ def start(self):
+ """See base.End.start for specification."""
+ with self._lock:
+ if self._cycle is not None:
+ raise ValueError('Tried to start a not-stopped End!')
+ else:
+ self._cycle = _Cycle(logging_pool.pool(1))
+
+ def stop(self, grace):
+ """See base.End.stop for specification."""
+ with self._lock:
+ if self._cycle is None:
+ event = threading.Event()
+ event.set()
+ return event
+ elif not self._cycle.operations:
+ event = threading.Event()
+ self._cycle.pool.submit(event.set)
+ self._cycle.pool.shutdown(wait=False)
+ self._cycle = None
+ return event
+ else:
+ self._cycle.grace = True
+ event = threading.Event()
+ self._cycle.idle_actions.append(event.set)
+ if 0 < grace:
+ future = later.later(
+ grace, _future_shutdown(self._lock, self._cycle, event))
+ self._cycle.futures.append(future)
+ else:
+ _abort(self._cycle.operations.values())
+ return event
+
+ def operate(
+ self, group, method, subscription, timeout, initial_metadata=None,
+ payload=None, completion=None):
+ """See base.End.operate for specification."""
+ operation_id = uuid.uuid4()
+ with self._lock:
+ if self._cycle is None or self._cycle.grace:
+ raise ValueError('Can\'t operate on stopped or stopping End!')
+ termination_action = _termination_action(
+ self._lock, self._stats, operation_id, self._cycle)
+ operation = _operation.invocation_operate(
+ operation_id, group, method, subscription, timeout, initial_metadata,
+ payload, completion, self._mate.accept_ticket, termination_action,
+ self._cycle.pool)
+ self._cycle.operations[operation_id] = operation
+ return operation.context, operation.operator
+
+ def operation_stats(self):
+ """See base.End.operation_stats for specification."""
+ with self._lock:
+ return dict(self._stats)
+
+ def add_idle_action(self, action):
+ """See base.End.add_idle_action for specification."""
+ with self._lock:
+ if self._cycle is None:
+ raise ValueError('Can\'t add idle action to stopped End!')
+ action_with_exceptions_logged = callable_util.with_exceptions_logged(
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
+ if self._cycle.operations:
+ self._cycle.idle_actions.append(action_with_exceptions_logged)
+ else:
+ self._cycle.pool.submit(action_with_exceptions_logged)
+
+ def accept_ticket(self, ticket):
+ """See links.Link.accept_ticket for specification."""
+ with self._lock:
+ if self._cycle is not None and not self._cycle.grace:
+ operation = self._cycle.operations.get(ticket.operation_id)
+ if operation is not None:
+ operation.handle_ticket(ticket)
+ elif self._servicer_package is not None:
+ termination_action = _termination_action(
+ self._lock, self._stats, ticket.operation_id, self._cycle)
+ operation = _operation.service_operate(
+ self._servicer_package, ticket, self._mate.accept_ticket,
+ termination_action, self._cycle.pool)
+ if operation is not None:
+ self._cycle.operations[ticket.operation_id] = operation
+
+ def join_link(self, link):
+ """See links.Link.join_link for specification."""
+ with self._lock:
+ self._mate = utilities.NULL_LINK if link is None else link
+
+
+def serviceless_end_link():
+ """Constructs an End usable only for invoking operations.
+
+ Returns:
+ An End usable for translating operations into ticket exchange.
+ """
+ return _End(None)
+
+
+def serviceful_end_link(servicer, default_timeout, maximum_timeout):
+ """Constructs an End capable of servicing operations.
+
+ Args:
+ servicer: An interfaces.Servicer for servicing operations.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ An End capable of servicing the operations requested of it through ticket
+ exchange.
+ """
+ return _End(
+ _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
new file mode 100644
index 0000000..d94bdf2
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -0,0 +1,152 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation expiration."""
+
+import time
+
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import later
+from grpc.framework.interfaces.base import base
+
+
+class _ExpirationManager(_interfaces.ExpirationManager):
+ """An implementation of _interfaces.ExpirationManager."""
+
+ def __init__(
+ self, commencement, timeout, maximum_timeout, lock, termination_manager,
+ transmission_manager):
+ """Constructor.
+
+ Args:
+ commencement: The time in seconds since the epoch at which the operation
+ began.
+ timeout: A length of time in seconds to allow for the operation to run.
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run despite what is requested via this object's
+ change_timout method.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._commencement = commencement
+ self._maximum_timeout = maximum_timeout
+
+ self._timeout = timeout
+ self._deadline = commencement + timeout
+ self._index = None
+ self._future = None
+
+ def _expire(self, index):
+ def expire():
+ with self._lock:
+ if self._future is not None and index == self._index:
+ self._future = None
+ self._termination_manager.expire()
+ self._transmission_manager.abort(base.Outcome.EXPIRED)
+ return expire
+
+ def start(self):
+ self._index = 0
+ self._future = later.later(self._timeout, self._expire(0))
+
+ def change_timeout(self, timeout):
+ if self._future is not None and timeout != self._timeout:
+ self._future.cancel()
+ new_timeout = min(timeout, self._maximum_timeout)
+ new_index = self._index + 1
+ self._timeout = new_timeout
+ self._deadline = self._commencement + new_timeout
+ self._index = new_index
+ delay = self._deadline - time.time()
+ self._future = later.later(delay, self._expire(new_index))
+ if new_timeout != timeout:
+ self._transmission_manager.timeout(new_timeout)
+
+ def deadline(self):
+ return self._deadline
+
+ def terminate(self):
+ if self._future:
+ self._future.cancel()
+ self._future = None
+ self._deadline_index = None
+
+
+def invocation_expiration_manager(
+ timeout, lock, termination_manager, transmission_manager):
+ """Creates an _interfaces.ExpirationManager appropriate for front-side use.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation to run.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for invocation-side use.
+ """
+ expiration_manager = _ExpirationManager(
+ time.time(), timeout, timeout, lock, termination_manager,
+ transmission_manager)
+ expiration_manager.start()
+ return expiration_manager
+
+
+def service_expiration_manager(
+ timeout, default_timeout, maximum_timeout, lock, termination_manager,
+ transmission_manager):
+ """Creates an _interfaces.ExpirationManager appropriate for back-side use.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation to run. May
+ be None in which case default_timeout will be used.
+ default_timeout: The default length of time in seconds to allow for the
+ operation to run if the front-side customer has not specified such a value
+ (or if the value they specified is not yet known).
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for service-side use.
+ """
+ expiration_manager = _ExpirationManager(
+ time.time(), default_timeout if timeout is None else timeout,
+ maximum_timeout, lock, termination_manager, transmission_manager)
+ expiration_manager.start()
+ return expiration_manager
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
new file mode 100644
index 0000000..59f7f8a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -0,0 +1,410 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ingestion during an operation."""
+
+import abc
+import collections
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
+_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
+
+
+class _SubscriptionCreation(collections.namedtuple(
+ '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
+ """A sum type for the outcome of ingestion initialization.
+
+ Either subscription will be non-None, remote_error will be True, or abandoned
+ will be True.
+
+ Attributes:
+ subscription: A base.Subscription describing the customer's interest in
+ operation values from the other side.
+ remote_error: A boolean indicating that the subscription could not be
+ created due to an error on the remote side of the operation.
+ abandoned: A boolean indicating that subscription creation was abandoned.
+ """
+
+
+class _SubscriptionCreator(object):
+ """Common specification of subscription-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def create(self, group, method):
+ """Creates the base.Subscription of the local customer.
+
+ Any exceptions raised by this method should be attributed to and treated as
+ defects in the customer code called by this method.
+
+ Args:
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+
+ Returns:
+ A _SubscriptionCreation describing the result of subscription creation.
+ """
+ raise NotImplementedError()
+
+
+class _ServiceSubscriptionCreator(_SubscriptionCreator):
+ """A _SubscriptionCreator appropriate for service-side use."""
+
+ def __init__(self, servicer, operation_context, output_operator):
+ """Constructor.
+
+ Args:
+ servicer: The base.Servicer that will service the operation.
+ operation_context: A base.OperationContext for the operation to be passed
+ to the customer.
+ output_operator: A base.Operator for the operation to be passed to the
+ customer and to be called by the customer to accept operation data
+ emitted by the customer.
+ """
+ self._servicer = servicer
+ self._operation_context = operation_context
+ self._output_operator = output_operator
+
+ def create(self, group, method):
+ try:
+ subscription = self._servicer.service(
+ group, method, self._operation_context, self._output_operator)
+ except base.NoSuchMethodError:
+ return _SubscriptionCreation(None, True, False)
+ except abandonment.Abandoned:
+ return _SubscriptionCreation(None, False, True)
+ else:
+ return _SubscriptionCreation(subscription, False, False)
+
+
+def _wrap(behavior):
+ def wrapped(*args, **kwargs):
+ try:
+ behavior(*args, **kwargs)
+ except abandonment.Abandoned:
+ return False
+ else:
+ return True
+ return wrapped
+
+
+class _IngestionManager(_interfaces.IngestionManager):
+ """An implementation of _interfaces.IngestionManager."""
+
+ def __init__(
+ self, lock, pool, subscription, subscription_creator, termination_manager,
+ transmission_manager, expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ subscription: A base.Subscription describing the customer's interest in
+ operation values from the other side. May be None if
+ subscription_creator is not None.
+ subscription_creator: A _SubscriptionCreator wrapping the portion of
+ customer code that when called returns the base.Subscription describing
+ the customer's interest in operation values from the other side. May be
+ None if subscription is not None.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+
+ if subscription is None:
+ self._subscription_creator = subscription_creator
+ self._wrapped_operator = None
+ elif subscription.kind is base.Subscription.Kind.FULL:
+ self._subscription_creator = None
+ self._wrapped_operator = _wrap(subscription.operator.advance)
+ else:
+ # TODO(nathaniel): Support other subscriptions.
+ raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
+ self._pending_initial_metadata = None
+ self._pending_payloads = []
+ self._pending_completion = None
+ self._local_allowance = 1
+ # A nonnegative integer or None, with None indicating that the local
+ # customer is done emitting anyway so there's no need to bother it by
+ # informing it that the remote customer has granted it further permission to
+ # emit.
+ self._remote_allowance = 0
+ self._processing = False
+
+ def _abort_internal_only(self):
+ self._subscription_creator = None
+ self._wrapped_operator = None
+ self._pending_initial_metadata = None
+ self._pending_payloads = None
+ self._pending_completion = None
+
+ def _abort_and_notify(self, outcome):
+ self._abort_internal_only()
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def _operator_next(self):
+ """Computes the next step for full-subscription ingestion.
+
+ Returns:
+ An initial_metadata, payload, completion, allowance, continue quintet
+ indicating what operation values (if any) are available to pass into
+ customer code and whether or not there is anything immediately
+ actionable to call customer code to do.
+ """
+ if self._wrapped_operator is None:
+ return None, None, None, None, False
+ else:
+ initial_metadata, payload, completion, allowance, action = [None] * 5
+ if self._pending_initial_metadata is not None:
+ initial_metadata = self._pending_initial_metadata
+ self._pending_initial_metadata = None
+ action = True
+ if self._pending_payloads and 0 < self._local_allowance:
+ payload = self._pending_payloads.pop(0)
+ self._local_allowance -= 1
+ action = True
+ if not self._pending_payloads and self._pending_completion is not None:
+ completion = self._pending_completion
+ self._pending_completion = None
+ action = True
+ if self._remote_allowance is not None and 0 < self._remote_allowance:
+ allowance = self._remote_allowance
+ self._remote_allowance = 0
+ action = True
+ return initial_metadata, payload, completion, allowance, bool(action)
+
+ def _operator_process(
+ self, wrapped_operator, initial_metadata, payload,
+ completion, allowance):
+ while True:
+ advance_outcome = callable_util.call_logging_exceptions(
+ wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
+ initial_metadata=initial_metadata, payload=payload,
+ completion=completion, allowance=allowance)
+ if advance_outcome.exception is None:
+ if advance_outcome.return_value:
+ with self._lock:
+ if self._termination_manager.outcome is not None:
+ return
+ if completion is not None:
+ self._termination_manager.ingestion_complete()
+ initial_metadata, payload, completion, allowance, moar = (
+ self._operator_next())
+ if not moar:
+ self._processing = False
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ return
+
+ def _operator_post_create(self, subscription):
+ wrapped_operator = _wrap(subscription.operator.advance)
+ with self._lock:
+ if self._termination_manager.outcome is not None:
+ return
+ self._wrapped_operator = wrapped_operator
+ self._subscription_creator = None
+ metadata, payload, completion, allowance, moar = self._operator_next()
+ if not moar:
+ self._processing = False
+ return
+ self._operator_process(
+ wrapped_operator, metadata, payload, completion, allowance)
+
+ def _create(self, subscription_creator, group, name):
+ outcome = callable_util.call_logging_exceptions(
+ subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
+ group, name)
+ if outcome.return_value is None:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ elif outcome.return_value.abandoned:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ elif outcome.return_value.remote_error:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
+ elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
+ self._operator_post_create(outcome.return_value.subscription)
+ else:
+ # TODO(nathaniel): Support other subscriptions.
+ raise ValueError(
+ 'Unsupported "%s"!' % outcome.return_value.subscription.kind)
+
+ def _store_advance(self, initial_metadata, payload, completion, allowance):
+ if initial_metadata is not None:
+ self._pending_initial_metadata = initial_metadata
+ if payload is not None:
+ self._pending_payloads.append(payload)
+ if completion is not None:
+ self._pending_completion = completion
+ if allowance is not None and self._remote_allowance is not None:
+ self._remote_allowance += allowance
+
+ def _operator_advance(self, initial_metadata, payload, completion, allowance):
+ if self._processing:
+ self._store_advance(initial_metadata, payload, completion, allowance)
+ else:
+ action = False
+ if initial_metadata is not None:
+ action = True
+ if payload is not None:
+ if 0 < self._local_allowance:
+ self._local_allowance -= 1
+ action = True
+ else:
+ self._pending_payloads.append(payload)
+ payload = False
+ if completion is not None:
+ if self._pending_payloads:
+ self._pending_completion = completion
+ else:
+ action = True
+ if allowance is not None and self._remote_allowance is not None:
+ allowance += self._remote_allowance
+ self._remote_allowance = 0
+ action = True
+ if action:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._wrapped_operator, initial_metadata, payload, completion,
+ allowance)
+
+ def set_group_and_method(self, group, method):
+ """See _interfaces.IngestionManager.set_group_and_method for spec."""
+ if self._subscription_creator is not None and not self._processing:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._subscription_creator, group, method)
+ self._processing = True
+
+ def add_local_allowance(self, allowance):
+ """See _interfaces.IngestionManager.add_local_allowance for spec."""
+ if any((self._subscription_creator, self._wrapped_operator,)):
+ self._local_allowance += allowance
+ if not self._processing:
+ initial_metadata, payload, completion, allowance, moar = (
+ self._operator_next())
+ if moar:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._operator_process,
+ _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ initial_metadata, payload, completion, allowance)
+
+ def local_emissions_done(self):
+ self._remote_allowance = None
+
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """See _interfaces.IngestionManager.advance for specification."""
+ if self._subscription_creator is not None:
+ self._store_advance(initial_metadata, payload, completion, allowance)
+ elif self._wrapped_operator is not None:
+ self._operator_advance(initial_metadata, payload, completion, allowance)
+
+
+def invocation_ingestion_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager):
+ """Creates an IngestionManager appropriate for invocation-side use.
+
+ Args:
+ subscription: A base.Subscription indicating the customer's interest in the
+ data and results from the service-side of the operation.
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+ Returns:
+ An IngestionManager appropriate for invocation-side use.
+ """
+ return _IngestionManager(
+ lock, pool, subscription, None, termination_manager, transmission_manager,
+ expiration_manager)
+
+
+def service_ingestion_manager(
+ servicer, operation_context, output_operator, lock, pool,
+ termination_manager, transmission_manager, expiration_manager):
+ """Creates an IngestionManager appropriate for service-side use.
+
+ The returned IngestionManager will require its set_group_and_name method to be
+ called before its advance method may be called.
+
+ Args:
+ servicer: A base.Servicer for servicing the operation.
+ operation_context: A base.OperationContext for the operation to be passed to
+ the customer.
+ output_operator: A base.Operator for the operation to be passed to the
+ customer and to be called by the customer to accept operation data output
+ by the customer.
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+ Returns:
+ An IngestionManager appropriate for service-side use.
+ """
+ subscription_creator = _ServiceSubscriptionCreator(
+ servicer, operation_context, output_operator)
+ return _IngestionManager(
+ lock, pool, None, subscription_creator, termination_manager,
+ transmission_manager, expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
new file mode 100644
index 0000000..a626b9f
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -0,0 +1,308 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal interfaces."""
+
+import abc
+
+from grpc.framework.interfaces.base import base
+
+
+class TerminationManager(object):
+ """An object responsible for handling the termination of an operation.
+
+ Attributes:
+ outcome: None if the operation is active or a base.Outcome value if it has
+ terminated.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def add_callback(self, callback):
+ """Registers a callback to be called on operation termination.
+
+ If the operation has already terminated the callback will not be called.
+
+ Args:
+ callback: A callable that will be passed an interfaces.Outcome value.
+
+ Returns:
+ None if the operation has not yet terminated and the passed callback will
+ be called when it does, or a base.Outcome value describing the operation
+ termination if the operation has terminated and the callback will not be
+ called as a result of this method call.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def emission_complete(self):
+ """Indicates that emissions from customer code have completed."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def transmission_complete(self):
+ """Indicates that transmissions to the remote end are complete.
+
+ Returns:
+ True if the operation has terminated or False if the operation remains
+ ongoing.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def reception_complete(self):
+ """Indicates that reception from the other side is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def ingestion_complete(self):
+ """Indicates that customer code ingestion of received values is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def expire(self):
+ """Indicates that the operation must abort because it has taken too long."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Indicates that the operation must abort for the indicated reason.
+
+ Args:
+ outcome: An interfaces.Outcome indicating operation abortion.
+ """
+ raise NotImplementedError()
+
+
+class TransmissionManager(object):
+ """A manager responsible for transmitting to the other end of an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def kick_off(
+ self, group, method, timeout, initial_metadata, payload, completion,
+ allowance):
+ """Transmits the values associated with operation invocation."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """Accepts values for transmission to the other end of the operation.
+
+ Args:
+ initial_metadata: An initial metadata value to be transmitted to the other
+ side of the operation. May only ever be non-None once.
+ payload: A payload value.
+ completion: A base.Completion value. May only ever be non-None in the last
+ transmission to be made to the other side.
+ allowance: A positive integer communicating the number of additional
+ payloads allowed to be transmitted from the other side to this side of
+ the operation, or None if no additional allowance is being granted in
+ this call.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def timeout(self, timeout):
+ """Accepts for transmission to the other side a new timeout value.
+
+ Args:
+ timeout: A positive float used as the new timeout value for the operation
+ to be transmitted to the other side.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def allowance(self, allowance):
+ """Indicates to this manager that the remote customer is allowing payloads.
+
+ Args:
+ allowance: A positive integer indicating the number of additional payloads
+ the remote customer is allowing to be transmitted from this side of the
+ operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def remote_complete(self):
+ """Indicates to this manager that data from the remote side is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Indicates that the operation has aborted.
+
+ Args:
+ outcome: An interfaces.Outcome for the operation. If None, indicates that
+ the operation abortion should not be communicated to the other side of
+ the operation.
+ """
+ raise NotImplementedError()
+
+
+class ExpirationManager(object):
+ """A manager responsible for aborting the operation if it runs out of time."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def change_timeout(self, timeout):
+ """Changes the timeout allotted for the operation.
+
+ Operation duration is always measure from the beginning of the operation;
+ calling this method changes the operation's allotted time to timeout total
+ seconds, not timeout seconds from the time of this method call.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deadline(self):
+ """Returns the time until which the operation is allowed to run.
+
+ Returns:
+ The time (seconds since the epoch) at which the operation will expire.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """Indicates to this manager that the operation has terminated."""
+ raise NotImplementedError()
+
+
+class EmissionManager(base.Operator):
+ """A manager of values emitted by customer code."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ """Accepts a value emitted by customer code.
+
+ This method should only be called by customer code.
+
+ Args:
+ initial_metadata: An initial metadata value emitted by the local customer
+ to be sent to the other side of the operation.
+ payload: A payload value emitted by the local customer to be sent to the
+ other side of the operation.
+ completion: A Completion value emitted by the local customer to be sent to
+ the other side of the operation.
+ allowance: A positive integer indicating an additional number of payloads
+ that the local customer is willing to accept from the other side of the
+ operation.
+ """
+ raise NotImplementedError()
+
+
+class IngestionManager(object):
+ """A manager responsible for executing customer code.
+
+ This name of this manager comes from its responsibility to pass successive
+ values from the other side of the operation into the code of the local
+ customer.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_group_and_method(self, group, method):
+ """Communicates to this IngestionManager the operation group and method.
+
+ Args:
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_local_allowance(self, allowance):
+ """Communicates to this IngestionManager that more payloads may be ingested.
+
+ Args:
+ allowance: A positive integer indicating an additional number of payloads
+ that the local customer is willing to ingest.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def local_emissions_done(self):
+ """Indicates to this manager that local emissions are done."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """Advances the operation by passing values to the local customer."""
+ raise NotImplementedError()
+
+
+class ReceptionManager(object):
+ """A manager responsible for receiving tickets from the other end."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def receive_ticket(self, ticket):
+ """Handle a ticket from the other side of the operation.
+
+ Args:
+ ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
+ appropriate to this end of the operation and this object.
+ """
+ raise NotImplementedError()
+
+
+class Operation(object):
+ """An ongoing operation.
+
+ Attributes:
+ context: A base.OperationContext object for the operation.
+ operator: A base.Operator object for the operation for use by the customer
+ of the operation.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def handle_ticket(self, ticket):
+ """Handle a ticket from the other side of the operation.
+
+ Args:
+ ticket: A links.Ticket from the other side of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Aborts the operation.
+
+ Args:
+ outcome: A base.Outcome value indicating operation abortion.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
new file mode 100644
index 0000000..d20e40a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -0,0 +1,192 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of operations."""
+
+import threading
+
+# _utilities is referenced from specification in this module.
+from grpc.framework.core import _context
+from grpc.framework.core import _emission
+from grpc.framework.core import _expiration
+from grpc.framework.core import _ingestion
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _reception
+from grpc.framework.core import _termination
+from grpc.framework.core import _transmission
+from grpc.framework.core import _utilities # pylint: disable=unused-import
+
+
+class _EasyOperation(_interfaces.Operation):
+ """A trivial implementation of interfaces.Operation."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager, expiration_manager,
+ context, operator, reception_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ context: A base.OperationContext for use by the customer during the
+ operation.
+ operator: A base.Operator for use by the customer during the operation.
+ reception_manager: The _interfaces.ReceptionManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._reception_manager = reception_manager
+
+ self.context = context
+ self.operator = operator
+
+ def handle_ticket(self, ticket):
+ with self._lock:
+ self._reception_manager.receive_ticket(ticket)
+
+ def abort(self, outcome):
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+
+def invocation_operate(
+ operation_id, group, method, subscription, timeout, initial_metadata,
+ payload, completion, ticket_sink, termination_action, pool):
+ """Constructs objects necessary for front-side operation management.
+
+ Args:
+ operation_id: An object identifying the operation.
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+ subscription: A base.Subscription describing the customer's interest in the
+ results of the operation.
+ timeout: A length of time in seconds to allow for the operation.
+ initial_metadata: An initial metadata value to be sent to the other side of
+ the operation. May be None if the initial metadata will be passed later or
+ if there will be no initial metadata passed at all.
+ payload: The first payload value to be transmitted to the other side. May be
+ None if there is no such value or if the customer chose not to pass it at
+ operation invocation.
+ completion: A base.Completion value indicating the end of values passed to
+ the other side of the operation.
+ ticket_sink: A callable that accepts links.Tickets and delivers them to the
+ other side of the operation.
+ termination_action: A callable that accepts the outcome of the operation as
+ a base.Outcome value to be called on operation completion.
+ pool: A thread pool with which to do the work of the operation.
+
+ Returns:
+ An _interfaces.Operation for the operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.invocation_termination_manager(
+ termination_action, pool)
+ transmission_manager = _transmission.TransmissionManager(
+ operation_id, ticket_sink, lock, pool, termination_manager)
+ expiration_manager = _expiration.invocation_expiration_manager(
+ timeout, lock, termination_manager, transmission_manager)
+ operation_context = _context.OperationContext(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ emission_manager = _emission.EmissionManager(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ ingestion_manager = _ingestion.invocation_ingestion_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager)
+ reception_manager = _reception.ReceptionManager(
+ termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager)
+
+ termination_manager.set_expiration_manager(expiration_manager)
+ transmission_manager.set_expiration_manager(expiration_manager)
+ emission_manager.set_ingestion_manager(ingestion_manager)
+
+ transmission_manager.kick_off(
+ group, method, timeout, initial_metadata, payload, completion, None)
+
+ return _EasyOperation(
+ lock, termination_manager, transmission_manager, expiration_manager,
+ operation_context, emission_manager, reception_manager)
+
+
+def service_operate(
+ servicer_package, ticket, ticket_sink, termination_action, pool):
+ """Constructs an Operation for service of an operation.
+
+ Args:
+ servicer_package: A _utilities.ServicerPackage to be used servicing the
+ operation.
+ ticket: The first links.Ticket received for the operation.
+ ticket_sink: A callable that accepts links.Tickets and delivers them to the
+ other side of the operation.
+ termination_action: A callable that accepts the outcome of the operation as
+ a base.Outcome value to be called on operation completion.
+ pool: A thread pool with which to do the work of the operation.
+
+ Returns:
+ An _interfaces.Operation for the operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.service_termination_manager(
+ termination_action, pool)
+ transmission_manager = _transmission.TransmissionManager(
+ ticket.operation_id, ticket_sink, lock, pool, termination_manager)
+ expiration_manager = _expiration.service_expiration_manager(
+ ticket.timeout, servicer_package.default_timeout,
+ servicer_package.maximum_timeout, lock, termination_manager,
+ transmission_manager)
+ operation_context = _context.OperationContext(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ emission_manager = _emission.EmissionManager(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ ingestion_manager = _ingestion.service_ingestion_manager(
+ servicer_package.servicer, operation_context, emission_manager, lock,
+ pool, termination_manager, transmission_manager, expiration_manager)
+ reception_manager = _reception.ReceptionManager(
+ termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager)
+
+ termination_manager.set_expiration_manager(expiration_manager)
+ transmission_manager.set_expiration_manager(expiration_manager)
+ emission_manager.set_ingestion_manager(ingestion_manager)
+
+ reception_manager.receive_ticket(ticket)
+
+ return _EasyOperation(
+ lock, termination_manager, transmission_manager, expiration_manager,
+ operation_context, emission_manager, reception_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
new file mode 100644
index 0000000..b64faf8
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -0,0 +1,137 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket reception."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.links import links
+
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
+ links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
+ links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
+ links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
+ links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+ links.Ticket.Termination.TRANSMISSION_FAILURE:
+ base.Outcome.TRANSMISSION_FAILURE,
+ links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
+}
+
+
+class ReceptionManager(_interfaces.ReceptionManager):
+ """A ReceptionManager based around a _Receiver passed to it."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._ingestion_manager = ingestion_manager
+
+ self._lowest_unseen_sequence_number = 0
+ self._out_of_sequence_tickets = {}
+ self._aborted = False
+
+ def _abort(self, outcome):
+ self._aborted = True
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def _sequence_failure(self, ticket):
+ """Determines a just-arrived ticket's sequential legitimacy.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ True if the ticket is sequentially legitimate; False otherwise.
+ """
+ if ticket.sequence_number < self._lowest_unseen_sequence_number:
+ return True
+ elif ticket.sequence_number in self._out_of_sequence_tickets:
+ return True
+ else:
+ return False
+
+ def _process_one(self, ticket):
+ if ticket.sequence_number == 0:
+ self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
+ if ticket.timeout is not None:
+ self._expiration_manager.change_timeout(ticket.timeout)
+ if ticket.termination is None:
+ completion = None
+ else:
+ completion = utilities.completion(
+ ticket.terminal_metadata, ticket.code, ticket.message)
+ self._ingestion_manager.advance(
+ ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
+ if ticket.allowance is not None:
+ self._transmission_manager.allowance(ticket.allowance)
+
+ def _process(self, ticket):
+ """Process those tickets ready to be processed.
+
+ Args:
+ ticket: A just-arrived ticket the sequence number of which matches this
+ _ReceptionManager's _lowest_unseen_sequence_number field.
+ """
+ while True:
+ self._process_one(ticket)
+ next_ticket = self._out_of_sequence_tickets.pop(
+ ticket.sequence_number + 1, None)
+ if next_ticket is None:
+ self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+ return
+ else:
+ ticket = next_ticket
+
+ def receive_ticket(self, ticket):
+ """See _interfaces.ReceptionManager.receive_ticket for specification."""
+ if self._aborted:
+ return
+ elif self._sequence_failure(ticket):
+ self._abort(base.Outcome.RECEPTION_FAILURE)
+ elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
+ outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
+ self._abort(outcome)
+ elif ticket.sequence_number == self._lowest_unseen_sequence_number:
+ self._process(ticket)
+ else:
+ self._out_of_sequence_tickets[ticket.sequence_number] = ticket
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
new file mode 100644
index 0000000..ad9f612
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -0,0 +1,212 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+import abc
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+
+def _invocation_completion_predicate(
+ unused_emission_complete, unused_transmission_complete,
+ unused_reception_complete, ingestion_complete):
+ return ingestion_complete
+
+
+def _service_completion_predicate(
+ unused_emission_complete, transmission_complete, unused_reception_complete,
+ unused_ingestion_complete):
+ return transmission_complete
+
+
+class TerminationManager(_interfaces.TerminationManager):
+ """A _interfaces.TransmissionManager on which another manager may be set."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the expiration manager with which this manager will interact.
+
+ Args:
+ expiration_manager: The _interfaces.ExpirationManager associated with the
+ current operation.
+ """
+ raise NotImplementedError()
+
+
+class _TerminationManager(TerminationManager):
+ """An implementation of TerminationManager."""
+
+ def __init__(self, predicate, action, pool):
+ """Constructor.
+
+ Args:
+ predicate: One of _invocation_completion_predicate or
+ _service_completion_predicate to be used to determine when the operation
+ has completed.
+ action: A behavior to pass the operation outcome on operation termination.
+ pool: A thread pool.
+ """
+ self._predicate = predicate
+ self._action = action
+ self._pool = pool
+ self._expiration_manager = None
+
+ self.outcome = None
+ self._callbacks = []
+
+ self._emission_complete = False
+ self._transmission_complete = False
+ self._reception_complete = False
+ self._ingestion_complete = False
+
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
+ def _terminate_internal_only(self, outcome):
+ """Terminates the operation.
+
+ Args:
+ outcome: A base.Outcome describing the outcome of the operation.
+ """
+ self.outcome = outcome
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+
+ act = callable_util.with_exceptions_logged(
+ self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+ if outcome is base.Outcome.LOCAL_FAILURE:
+ self._pool.submit(act, outcome)
+ else:
+ def call_callbacks_and_act(callbacks, outcome):
+ for callback in callbacks:
+ callback_outcome = callable_util.call_logging_exceptions(
+ callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
+ outcome)
+ if callback_outcome.exception is not None:
+ outcome = base.Outcome.LOCAL_FAILURE
+ break
+ act(outcome)
+
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ callbacks, outcome)
+
+ def _terminate_and_notify(self, outcome):
+ self._terminate_internal_only(outcome)
+ self._expiration_manager.terminate()
+
+ def _perhaps_complete(self):
+ if self._predicate(
+ self._emission_complete, self._transmission_complete,
+ self._reception_complete, self._ingestion_complete):
+ self._terminate_and_notify(base.Outcome.COMPLETED)
+ return True
+ else:
+ return False
+
+ def is_active(self):
+ """See _interfaces.TerminationManager.is_active for specification."""
+ return self.outcome is None
+
+ def add_callback(self, callback):
+ """See _interfaces.TerminationManager.add_callback for specification."""
+ if self.outcome is None:
+ self._callbacks.append(callback)
+ return None
+ else:
+ return self.outcome
+
+ def emission_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._emission_complete = True
+ self._perhaps_complete()
+
+ def transmission_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._transmission_complete = True
+ return self._perhaps_complete()
+ else:
+ return False
+
+ def reception_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._reception_complete = True
+ self._perhaps_complete()
+
+ def ingestion_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._ingestion_complete = True
+ self._perhaps_complete()
+
+ def expire(self):
+ """See _interfaces.TerminationManager.expire for specification."""
+ self._terminate_internal_only(base.Outcome.EXPIRED)
+
+ def abort(self, outcome):
+ """See _interfaces.TerminationManager.abort for specification."""
+ self._terminate_and_notify(outcome)
+
+
+def invocation_termination_manager(action, pool):
+ """Creates a TerminationManager appropriate for invocation-side use.
+
+ Args:
+ action: An action to call on operation termination.
+ pool: A thread pool in which to execute the passed action and any
+ termination callbacks that are registered during the operation.
+
+ Returns:
+ A TerminationManager appropriate for invocation-side use.
+ """
+ return _TerminationManager(_invocation_completion_predicate, action, pool)
+
+
+def service_termination_manager(action, pool):
+ """Creates a TerminationManager appropriate for service-side use.
+
+ Args:
+ action: An action to call on operation termination.
+ pool: A thread pool in which to execute the passed action and any
+ termination callbacks that are registered during the operation.
+
+ Returns:
+ A TerminationManager appropriate for service-side use.
+ """
+ return _TerminationManager(_service_completion_predicate, action, pool)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
new file mode 100644
index 0000000..01894d3
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -0,0 +1,294 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+
+def _explode_completion(completion):
+ if completion is None:
+ return None, None, None, None
+ else:
+ return (
+ completion.terminal_metadata, completion.code, completion.message,
+ links.Ticket.Termination.COMPLETION)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """An _interfaces.TransmissionManager that sends links.Tickets."""
+
+ def __init__(
+ self, operation_id, ticket_sink, lock, pool, termination_manager):
+ """Constructor.
+
+ Args:
+ operation_id: The operation's ID.
+ ticket_sink: A callable that accepts tickets and sends them to the other
+ side of the operation.
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._ticket_sink = ticket_sink
+ self._operation_id = operation_id
+ self._termination_manager = termination_manager
+ self._expiration_manager = None
+
+ self._lowest_unused_sequence_number = 0
+ self._remote_allowance = 1
+ self._remote_complete = False
+ self._timeout = None
+ self._local_allowance = 0
+ self._initial_metadata = None
+ self._payloads = []
+ self._completion = None
+ self._aborted = False
+ self._abortion_outcome = None
+ self._transmitting = False
+
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this manager will cooperate."""
+ self._expiration_manager = expiration_manager
+
+ def _next_ticket(self):
+ """Creates the next ticket to be transmitted.
+
+ Returns:
+ A links.Ticket to be sent to the other side of the operation or None if
+ there is nothing to be sent at this time.
+ """
+ if self._aborted:
+ if self._abortion_outcome is None:
+ return None
+ else:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+ self._abortion_outcome]
+ if termination is None:
+ return None
+ else:
+ self._abortion_outcome = None
+ return links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None,
+ None, None, None, None, None, None, None, None, None,
+ termination)
+
+ action = False
+ # TODO(nathaniel): Support other subscriptions.
+ local_subscription = links.Ticket.Subscription.FULL
+ timeout = self._timeout
+ if timeout is not None:
+ self._timeout = None
+ action = True
+ if self._local_allowance <= 0:
+ allowance = None
+ else:
+ allowance = self._local_allowance
+ self._local_allowance = 0
+ action = True
+ initial_metadata = self._initial_metadata
+ if initial_metadata is not None:
+ self._initial_metadata = None
+ action = True
+ if not self._payloads or self._remote_allowance <= 0:
+ payload = None
+ else:
+ payload = self._payloads.pop(0)
+ self._remote_allowance -= 1
+ action = True
+ if self._completion is None or self._payloads:
+ terminal_metadata, code, message, termination = None, None, None, None
+ else:
+ terminal_metadata, code, message, termination = _explode_completion(
+ self._completion)
+ self._completion = None
+ action = True
+
+ if action:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ local_subscription, timeout, allowance, initial_metadata, payload,
+ terminal_metadata, code, message, termination)
+ self._lowest_unused_sequence_number += 1
+ return ticket
+ else:
+ return None
+
+ def _transmit(self, ticket):
+ """Commences the transmission loop sending tickets.
+
+ Args:
+ ticket: A links.Ticket to be sent to the other side of the operation.
+ """
+ def transmit(ticket):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ if ticket.termination is links.Ticket.Termination.COMPLETION:
+ self._termination_manager.transmission_complete()
+ ticket = self._next_ticket()
+ if ticket is None:
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+ self._expiration_manager.terminate()
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+ self._transmitting = True
+
+ def kick_off(
+ self, group, method, timeout, initial_metadata, payload, completion,
+ allowance):
+ """See _interfaces.TransmissionManager.kickoff for specification."""
+ # TODO(nathaniel): Support other subscriptions.
+ subscription = links.Ticket.Subscription.FULL
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ self._remote_allowance = 1 if payload is None else 0
+ ticket = links.Ticket(
+ self._operation_id, 0, group, method, subscription, timeout, allowance,
+ initial_metadata, payload, terminal_metadata, code, message,
+ termination)
+ self._lowest_unused_sequence_number = 1
+ self._transmit(ticket)
+
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """See _interfaces.TransmissionManager.advance for specification."""
+ effective_initial_metadata = initial_metadata
+ effective_payload = payload
+ effective_completion = completion
+ if allowance is not None and not self._remote_complete:
+ effective_allowance = allowance
+ else:
+ effective_allowance = None
+ if self._transmitting:
+ if effective_initial_metadata is not None:
+ self._initial_metadata = effective_initial_metadata
+ if effective_payload is not None:
+ self._payloads.append(effective_payload)
+ if effective_completion is not None:
+ self._completion = effective_completion
+ if effective_allowance is not None:
+ self._local_allowance += effective_allowance
+ else:
+ if effective_payload is not None:
+ if 0 < self._remote_allowance:
+ ticket_payload = effective_payload
+ self._remote_allowance -= 1
+ else:
+ self._payloads.append(effective_payload)
+ ticket_payload = None
+ else:
+ ticket_payload = None
+ if effective_completion is not None and not self._payloads:
+ ticket_completion = effective_completion
+ else:
+ self._completion = effective_completion
+ ticket_completion = None
+ if any(
+ (effective_initial_metadata, ticket_payload, ticket_completion,
+ effective_allowance)):
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, allowance, effective_initial_metadata, ticket_payload,
+ terminal_metadata, code, message, termination)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def timeout(self, timeout):
+ """See _interfaces.TransmissionManager.timeout for specification."""
+ if self._transmitting:
+ self._timeout = timeout
+ else:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, timeout, None, None, None, None, None, None, None)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def allowance(self, allowance):
+ """See _interfaces.TransmissionManager.allowance for specification."""
+ if self._transmitting or not self._payloads:
+ self._remote_allowance += allowance
+ else:
+ self._remote_allowance += allowance - 1
+ payload = self._payloads.pop(0)
+ if self._payloads:
+ completion = None
+ else:
+ completion = self._completion
+ self._completion = None
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, None, None, payload, terminal_metadata, code, message,
+ termination)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def remote_complete(self):
+ """See _interfaces.TransmissionManager.remote_complete for specification."""
+ self._remote_complete = True
+ self._local_allowance = 0
+
+ def abort(self, outcome):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._transmitting:
+ self._aborted, self._abortion_outcome = True, outcome
+ else:
+ self._aborted = True
+ if outcome is not None:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+ outcome]
+ if termination is not None:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None,
+ None, None, None, None, None, None, None, None, None,
+ termination)
+ self._transmit(ticket)
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
new file mode 100644
index 0000000..5b0d798
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal utilities."""
+
+import collections
+
+
+class ServicerPackage(
+ collections.namedtuple(
+ 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
+ """A trivial bundle class.
+
+ Attributes:
+ servicer: A base.Servicer.
+ default_timeout: A float indicating the length of time in seconds to allow
+ for an operation invoked without a timeout.
+ maximum_timeout: A float indicating the maximum length of time in seconds to
+ allow for an operation.
+ """
diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py
new file mode 100644
index 0000000..364a7fa
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/implementations.py
@@ -0,0 +1,62 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the ticket-exchange-based base layer implementation."""
+
+# base and links are referenced from specification in this module.
+from grpc.framework.core import _end
+from grpc.framework.interfaces.base import base # pylint: disable=unused-import
+from grpc.framework.interfaces.links import links # pylint: disable=unused-import
+
+
+def invocation_end_link():
+ """Creates a base.End-links.Link suitable for operation invocation.
+
+ Returns:
+ An object that is both a base.End and a links.Link, that supports operation
+ invocation, and that translates operation invocation into ticket exchange.
+ """
+ return _end.serviceless_end_link()
+
+
+def service_end_link(servicer, default_timeout, maximum_timeout):
+ """Creates a base.End-links.Link suitable for operation service.
+
+ Args:
+ servicer: A base.Servicer for servicing operations.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ An object that is both a base.End and a links.Link and that services
+ operations that arrive at it through ticket exchange.
+ """
+ return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index 9d1651d..76e0a5b 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -27,10 +27,20 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""The base interface of RPC Framework."""
+"""The base interface of RPC Framework.
+Implementations of this interface support the conduct of "operations":
+exchanges between two distinct ends of an arbitrary number of data payloads
+and metadata such as a name for the operation, initial and terminal metadata
+in each direction, and flow control. These operations may be used for transfers
+of data, remote procedure calls, status indication, or anything else
+applications choose.
+"""
+
+# threading is referenced from specification in this module.
import abc
import enum
+import threading
# abandonment is referenced from specification in this module.
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
@@ -208,19 +218,26 @@
raise NotImplementedError()
@abc.abstractmethod
- def stop_gracefully(self):
- """Gracefully stops this object's service of operations.
+ def stop(self, grace):
+ """Stops this object's service of operations.
- Operations in progress will be allowed to complete, and this method blocks
- until all of them have.
- """
- raise NotImplementedError()
+ This object will refuse service of new operations as soon as this method is
+ called but operations under way at the time of the call may be given a
+ grace period during which they are allowed to finish.
- @abc.abstractmethod
- def stop_immediately(self):
- """Immediately stops this object's service of operations.
+ Args:
+ grace: A duration of time in seconds to allow ongoing operations to
+ terminate before being forcefully terminated by the stopping of this
+ End. May be zero to terminate all ongoing operations and immediately
+ stop.
- Operations in progress will not be allowed to complete.
+ Returns:
+ A threading.Event that will be set to indicate all operations having
+ terminated and this End having completely stopped. The returned event
+ may not be set until after the full grace period (if some ongoing
+ operation continues for the full length of the period) or it may be set
+ much sooner (if for example this End had no operations in progress at
+ the time its stop method was called).
"""
raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/__init__.py b/src/python/grpcio/grpc/framework/interfaces/face/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py
new file mode 100644
index 0000000..948e750
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py
@@ -0,0 +1,933 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces defining the Face layer of RPC Framework."""
+
+import abc
+import collections
+import enum
+
+# cardinality, style, abandonment, future, and stream are
+# referenced from specification in this module.
+from grpc.framework.common import cardinality # pylint: disable=unused-import
+from grpc.framework.common import style # pylint: disable=unused-import
+from grpc.framework.foundation import abandonment # pylint: disable=unused-import
+from grpc.framework.foundation import future # pylint: disable=unused-import
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+
+
+class NoSuchMethodError(Exception):
+ """Raised by customer code to indicate an unrecognized method.
+
+ Attributes:
+ group: The group of the unrecognized method.
+ name: The name of the unrecognized method.
+ """
+
+ def __init__(self, group, method):
+ """Constructor.
+
+ Args:
+ group: The group identifier of the unrecognized RPC name.
+ method: The method identifier of the unrecognized RPC name.
+ """
+ super(NoSuchMethodError, self).__init__()
+ self.group = group
+ self.method = method
+
+ def __repr__(self):
+ return 'face.NoSuchMethodError(%s, %s)' % (self.group, self.method,)
+
+
+class Abortion(
+ collections.namedtuple(
+ 'Abortion',
+ ('kind', 'initial_metadata', 'terminal_metadata', 'code', 'details',))):
+ """A value describing RPC abortion.
+
+ Attributes:
+ kind: A Kind value identifying how the RPC failed.
+ initial_metadata: The initial metadata from the other side of the RPC or
+ None if no initial metadata value was received.
+ terminal_metadata: The terminal metadata from the other side of the RPC or
+ None if no terminal metadata value was received.
+ code: The code value from the other side of the RPC or None if no code value
+ was received.
+ details: The details value from the other side of the RPC or None if no
+ details value was received.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Types of RPC abortion."""
+
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ LOCAL_SHUTDOWN = 'local shutdown'
+ REMOTE_SHUTDOWN = 'remote shutdown'
+ NETWORK_FAILURE = 'network failure'
+ LOCAL_FAILURE = 'local failure'
+ REMOTE_FAILURE = 'remote failure'
+
+
+class AbortionError(Exception):
+ """Common super type for exceptions indicating RPC abortion.
+
+ initial_metadata: The initial metadata from the other side of the RPC or
+ None if no initial metadata value was received.
+ terminal_metadata: The terminal metadata from the other side of the RPC or
+ None if no terminal metadata value was received.
+ code: The code value from the other side of the RPC or None if no code value
+ was received.
+ details: The details value from the other side of the RPC or None if no
+ details value was received.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, initial_metadata, terminal_metadata, code, details):
+ super(AbortionError, self).__init__()
+ self.initial_metadata = initial_metadata
+ self.terminal_metadata = terminal_metadata
+ self.code = code
+ self.details = details
+
+
+class CancellationError(AbortionError):
+ """Indicates that an RPC has been cancelled."""
+
+
+class ExpirationError(AbortionError):
+ """Indicates that an RPC has expired ("timed out")."""
+
+
+class LocalShutdownError(AbortionError):
+ """Indicates that an RPC has terminated due to local shutdown of RPCs."""
+
+
+class RemoteShutdownError(AbortionError):
+ """Indicates that an RPC has terminated due to remote shutdown of RPCs."""
+
+
+class NetworkError(AbortionError):
+ """Indicates that some error occurred on the network."""
+
+
+class LocalError(AbortionError):
+ """Indicates that an RPC has terminated due to a local defect."""
+
+
+class RemoteError(AbortionError):
+ """Indicates that an RPC has terminated due to a remote defect."""
+
+
+class RpcContext(object):
+ """Provides RPC-related information and action."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def is_active(self):
+ """Describes whether the RPC is active or has terminated."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ """Describes the length of allowed time remaining for the RPC.
+
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the RPC to complete before it is considered to have timed
+ out.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_abortion_callback(self, abortion_callback):
+ """Registers a callback to be called if the RPC is aborted.
+
+ Args:
+ abortion_callback: A callable to be called and passed an Abortion value
+ in the event of RPC abortion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC.
+
+ Idempotent and has no effect if the RPC has already terminated.
+ """
+ raise NotImplementedError()
+
+
+class Call(RpcContext):
+ """Invocation-side utility object for an RPC."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata from the service-side of the RPC.
+
+ This method blocks until the value is available or is known not to have been
+ emitted from the service-side of the RPC.
+
+ Returns:
+ The initial metadata object emitted by the service-side of the RPC, or
+ None if there was no such value.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminal_metadata(self):
+ """Accesses the terminal metadata from the service-side of the RPC.
+
+ This method blocks until the value is available or is known not to have been
+ emitted from the service-side of the RPC.
+
+ Returns:
+ The terminal metadata object emitted by the service-side of the RPC, or
+ None if there was no such value.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def code(self):
+ """Accesses the code emitted by the service-side of the RPC.
+
+ This method blocks until the value is available or is known not to have been
+ emitted from the service-side of the RPC.
+
+ Returns:
+ The code object emitted by the service-side of the RPC, or None if there
+ was no such value.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def details(self):
+ """Accesses the details value emitted by the service-side of the RPC.
+
+ This method blocks until the value is available or is known not to have been
+ emitted from the service-side of the RPC.
+
+ Returns:
+ The details value emitted by the service-side of the RPC, or None if there
+ was no such value.
+ """
+ raise NotImplementedError()
+
+
+class ServicerContext(RpcContext):
+ """A context object passed to method implementations."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def invocation_metadata(self):
+ """Accesses the metadata from the invocation-side of the RPC.
+
+ This method blocks until the value is available or is known not to have been
+ emitted from the invocation-side of the RPC.
+
+ Returns:
+ The metadata object emitted by the invocation-side of the RPC, or None if
+ there was no such value.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def initial_metadata(self, initial_metadata):
+ """Accepts the service-side initial metadata value of the RPC.
+
+ This method need not be called by method implementations if they have no
+ service-side initial metadata to transmit.
+
+ Args:
+ initial_metadata: The service-side initial metadata value of the RPC to
+ be transmitted to the invocation side of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminal_metadata(self, terminal_metadata):
+ """Accepts the service-side terminal metadata value of the RPC.
+
+ This method need not be called by method implementations if they have no
+ service-side terminal metadata to transmit.
+
+ Args:
+ terminal_metadata: The service-side terminal metadata value of the RPC to
+ be transmitted to the invocation side of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def code(self, code):
+ """Accepts the service-side code of the RPC.
+
+ This method need not be called by method implementations if they have no
+ code to transmit.
+
+ Args:
+ code: The code of the RPC to be transmitted to the invocation side of the
+ RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def details(self, details):
+ """Accepts the service-side details of the RPC.
+
+ This method need not be called by method implementations if they have no
+ service-side details to transmit.
+
+ Args:
+ details: The service-side details value of the RPC to be transmitted to
+ the invocation side of the RPC.
+ """
+ raise NotImplementedError()
+
+
+class ResponseReceiver(object):
+ """Invocation-side object used to accept the output of an RPC."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def initial_metadata(self, initial_metadata):
+ """Receives the initial metadata from the service-side of the RPC.
+
+ Args:
+ initial_metadata: The initial metadata object emitted from the
+ service-side of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response(self, response):
+ """Receives a response from the service-side of the RPC.
+
+ Args:
+ response: A response object emitted from the service-side of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def complete(self, terminal_metadata, code, details):
+ """Receives the completion values emitted from the service-side of the RPC.
+
+ Args:
+ terminal_metadata: The terminal metadata object emitted from the
+ service-side of the RPC.
+ code: The code object emitted from the service-side of the RPC.
+ details: The details object emitted from the service-side of the RPC.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnaryMultiCallable(object):
+ """Affords invoking a unary-unary RPC in any call style."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(
+ self, request, timeout, metadata=None, with_call=False):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+ with_call: Whether or not to include return a Call for the RPC in addition
+ to the reponse.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ AbortionError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, request, timeout, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a future.Future. In the
+ event of RPC completion, the return Future's result value will be the
+ response value of the RPC. In the event of RPC abortion, the returned
+ Future's exception value will be an AbortionError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ A Call for the RPC.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamMultiCallable(object):
+ """Affords invoking a unary-stream RPC in any call style."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request, timeout, metadata=None):
+ """Invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ AbortionError indicating abortion of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ A Call object for the RPC.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryMultiCallable(object):
+ """Affords invoking a stream-unary RPC in any call style."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(
+ self, request_iterator, timeout, metadata=None,
+ with_call=False):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+ with_call: Whether or not to include return a Call for the RPC in addition
+ to the reponse.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ AbortionError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, request_iterator, timeout, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a future.Future. In the
+ event of RPC completion, the return Future's result value will be the
+ response value of the RPC. In the event of RPC abortion, the returned
+ Future's exception value will be an AbortionError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ A single object that is both a Call object for the RPC and a
+ stream.Consumer to which the request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamMultiCallable(object):
+ """Affords invoking a stream-stream RPC in any call style."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request_iterator, timeout, metadata=None):
+ """Invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ AbortionError indicating abortion of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of
+ the RPC.
+
+ Returns:
+ A single object that is both a Call object for the RPC and a
+ stream.Consumer to which the request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
+
+
+class MethodImplementation(object):
+ """A sum type that describes a method implementation.
+
+ Attributes:
+ cardinality: A cardinality.Cardinality value.
+ style: A style.Service value.
+ unary_unary_inline: The implementation of the method as a callable value
+ that takes a request value and a ServicerContext object and returns a
+ response value. Only non-None if cardinality is
+ cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
+ unary_stream_inline: The implementation of the method as a callable value
+ that takes a request value and a ServicerContext object and returns an
+ iterator of response values. Only non-None if cardinality is
+ cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
+ stream_unary_inline: The implementation of the method as a callable value
+ that takes an iterator of request values and a ServicerContext object and
+ returns a response value. Only non-None if cardinality is
+ cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
+ stream_stream_inline: The implementation of the method as a callable value
+ that takes an iterator of request values and a ServicerContext object and
+ returns an iterator of response values. Only non-None if cardinality is
+ cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
+ unary_unary_event: The implementation of the method as a callable value that
+ takes a request value, a response callback to which to pass the response
+ value of the RPC, and a ServicerContext. Only non-None if cardinality is
+ cardinality.Cardinality.UNARY_UNARY and style is style.Service.EVENT.
+ unary_stream_event: The implementation of the method as a callable value
+ that takes a request value, a stream.Consumer to which to pass the
+ response values of the RPC, and a ServicerContext. Only non-None if
+ cardinality is cardinality.Cardinality.UNARY_STREAM and style is
+ style.Service.EVENT.
+ stream_unary_event: The implementation of the method as a callable value
+ that takes a response callback to which to pass the response value of the
+ RPC and a ServicerContext and returns a stream.Consumer to which the
+ request values of the RPC should be passed. Only non-None if cardinality
+ is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
+ stream_stream_event: The implementation of the method as a callable value
+ that takes a stream.Consumer to which to pass the response values of the
+ RPC and a ServicerContext and returns a stream.Consumer to which the
+ request values of the RPC should be passed. Only non-None if cardinality
+ is cardinality.Cardinality.STREAM_STREAM and style is
+ style.Service.EVENT.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class MultiMethodImplementation(object):
+ """A general type able to service many methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, group, method, response_consumer, context):
+ """Services an RPC.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ context: a ServicerContext object.
+
+ Returns:
+ A stream.Consumer with which to accept the request values of the RPC. The
+ consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing values to this object. Implementations must not assume that this
+ object will be called to completion of the request stream or even called
+ at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ NoSuchMethodError: If this MultiMethod does not recognize the given group
+ and name for the RPC and is not able to service the RPC.
+ """
+ raise NotImplementedError()
+
+
+class GenericStub(object):
+ """Affords RPC invocation via generic methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def blocking_unary_unary(
+ self, group, method, request, timeout, metadata=None,
+ with_call=False):
+ """Invokes a unary-request-unary-response method.
+
+ This method blocks until either returning the response value of the RPC
+ (in the event of RPC completion) or raising an exception (in the event of
+ RPC abortion).
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+ with_call: Whether or not to include return a Call for the RPC in addition
+ to the reponse.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ AbortionError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future_unary_unary(
+ self, group, method, request, timeout, metadata=None):
+ """Invokes a unary-request-unary-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a future.Future. In the
+ event of RPC completion, the return Future's result value will be the
+ response value of the RPC. In the event of RPC abortion, the returned
+ Future's exception value will be an AbortionError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def inline_unary_stream(
+ self, group, method, request, timeout, metadata=None):
+ """Invokes a unary-request-stream-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ AbortionError indicating abortion of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def blocking_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None,
+ with_call=False):
+ """Invokes a stream-request-unary-response method.
+
+ This method blocks until either returning the response value of the RPC
+ (in the event of RPC completion) or raising an exception (in the event of
+ RPC abortion).
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+ with_call: Whether or not to include return a Call for the RPC in addition
+ to the reponse.
+
+ Returns:
+ The response value for the RPC, and a Call for the RPC if with_call was
+ set to True at invocation.
+
+ Raises:
+ AbortionError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None):
+ """Invokes a stream-request-unary-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and a future.Future. In the
+ event of RPC completion, the return Future's result value will be the
+ response value of the RPC. In the event of RPC abortion, the returned
+ Future's exception value will be an AbortionError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def inline_stream_stream(
+ self, group, method, request_iterator, timeout, metadata=None):
+ """Invokes a stream-request-stream-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ An object that is both a Call for the RPC and an iterator of response
+ values. Drawing response values from the returned iterator may raise
+ AbortionError indicating abortion of the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_unary_unary(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Event-driven invocation of a unary-request-unary-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request: The request value for the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ A Call for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_unary_stream(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Event-driven invocation of a unary-request-stream-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ request: The request value for the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ A Call for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_stream_unary(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Event-driven invocation of a unary-request-unary-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ A pair of a Call object for the RPC and a stream.Consumer to which the
+ request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_stream_stream(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None):
+ """Event-driven invocation of a unary-request-stream-response method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+ receiver: A ResponseReceiver to be passed the response data of the RPC.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+ metadata: A metadata value to be passed to the service-side of the RPC.
+
+ Returns:
+ A pair of a Call object for the RPC and a stream.Consumer to which the
+ request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_unary(self, group, method):
+ """Creates a UnaryUnaryMultiCallable for a unary-unary method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+
+ Returns:
+ A UnaryUnaryMultiCallable value for the named unary-unary method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream(self, group, method):
+ """Creates a UnaryStreamMultiCallable for a unary-stream method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+
+ Returns:
+ A UnaryStreamMultiCallable value for the name unary-stream method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary(self, group, method):
+ """Creates a StreamUnaryMultiCallable for a stream-unary method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+
+ Returns:
+ A StreamUnaryMultiCallable value for the named stream-unary method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream(self, group, method):
+ """Creates a StreamStreamMultiCallable for a stream-stream method.
+
+ Args:
+ group: The group identifier of the RPC.
+ method: The method identifier of the RPC.
+
+ Returns:
+ A StreamStreamMultiCallable value for the named stream-stream method.
+ """
+ raise NotImplementedError()
+
+
+class DynamicStub(object):
+ """Affords RPC invocation via attributes corresponding to afforded methods.
+
+ Instances of this type may be scoped to a single group so that attribute
+ access is unambiguous.
+
+ Instances of this type respond to attribute access as follows: if the
+ requested attribute is the name of a unary-unary method, the value of the
+ attribute will be a UnaryUnaryMultiCallable with which to invoke an RPC; if
+ the requested attribute is the name of a unary-stream method, the value of the
+ attribute will be a UnaryStreamMultiCallable with which to invoke an RPC; if
+ the requested attribute is the name of a stream-unary method, the value of the
+ attribute will be a StreamUnaryMultiCallable with which to invoke an RPC; and
+ if the requested attribute is the name of a stream-stream method, the value of
+ the attribute will be a StreamStreamMultiCallable with which to invoke an RPC.
+ """
+ __metaclass__ = abc.ABCMeta
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/utilities.py b/src/python/grpcio/grpc/framework/interfaces/face/utilities.py
new file mode 100644
index 0000000..db2ec6e
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/utilities.py
@@ -0,0 +1,178 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for RPC Framework's Face interface."""
+
+import collections
+
+# stream is referenced from specification in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+from grpc.framework.interfaces.face import face
+
+
+class _MethodImplementation(
+ face.MethodImplementation,
+ collections.namedtuple(
+ '_MethodImplementation',
+ ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
+ 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
+ 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
+ pass
+
+
+def unary_unary_inline(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable value
+ that takes a request value and an face.ServicerContext object and
+ returns a response value.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
+ None, None, None, None, None, None, None)
+
+
+def unary_stream_inline(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value and an face.ServicerContext object and
+ returns an iterator of response values.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
+ behavior, None, None, None, None, None, None)
+
+
+def stream_unary_inline(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes an iterator of request values and an
+ face.ServicerContext object and returns a response value.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
+ behavior, None, None, None, None, None)
+
+
+def stream_stream_inline(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes an iterator of request values and an
+ face.ServicerContext object and returns an iterator of response values.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
+ None, behavior, None, None, None, None)
+
+
+def unary_unary_event(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value, a response callback to which to pass
+ the response value of the RPC, and an face.ServicerContext.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
+ None, None, behavior, None, None, None)
+
+
+def unary_stream_event(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ the response values of the RPC, and an face.ServicerContext.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
+ None, None, None, behavior, None, None)
+
+
+def stream_unary_event(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and an face.ServicerContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
+ None, None, None, None, behavior, None)
+
+
+def stream_stream_event(behavior):
+ """Creates an face.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and an face.ServicerContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An face.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
+ None, None, None, None, None, behavior)
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
index 5ebbac8..069ff02 100644
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py
@@ -98,7 +98,7 @@
COMPLETION = 'completion'
CANCELLATION = 'cancellation'
EXPIRATION = 'expiration'
- LOCAL_SHUTDOWN = 'local shutdown'
+ SHUTDOWN = 'shutdown'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
LOCAL_FAILURE = 'local failure'
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
new file mode 100644
index 0000000..72b1ae5
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -0,0 +1,165 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import collections
+import logging
+import random
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test import test_common as grpc_test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
+_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
+_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
+_CODE = _intermediary_low.Code.OK
+_MESSAGE = b'test message'
+
+
+class _SerializationBehaviors(
+ collections.namedtuple(
+ '_SerializationBehaviors',
+ ('request_serializers', 'request_deserializers', 'response_serializers',
+ 'response_deserializers',))):
+ pass
+
+
+class _Links(
+ collections.namedtuple(
+ '_Links',
+ ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
+ 'service_end_link'))):
+ pass
+
+
+def _serialization_behaviors_from_serializations(serializations):
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for (group, method), serialization in serializations.iteritems():
+ request_serializers[group, method] = serialization.serialize_request
+ request_deserializers[group, method] = serialization.deserialize_request
+ response_serializers[group, method] = serialization.serialize_response
+ response_deserializers[group, method] = serialization.deserialize_response
+ return _SerializationBehaviors(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def instantiate(self, serializations, servicer):
+ serialization_behaviors = _serialization_behaviors_from_serializations(
+ serializations)
+ invocation_end_link = implementations.invocation_end_link()
+ service_end_link = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ service_grpc_link = service.service_link(
+ serialization_behaviors.request_deserializers,
+ serialization_behaviors.response_serializers)
+ port = service_grpc_link.add_port(0, None)
+ channel = _intermediary_low.Channel('localhost:%d' % port, None)
+ invocation_grpc_link = invocation.invocation_link(
+ channel, b'localhost',
+ serialization_behaviors.request_serializers,
+ serialization_behaviors.response_deserializers)
+
+ invocation_end_link.join_link(invocation_grpc_link)
+ invocation_grpc_link.join_link(invocation_end_link)
+ service_end_link.join_link(service_grpc_link)
+ service_grpc_link.join_link(service_end_link)
+ invocation_grpc_link.start()
+ service_grpc_link.start()
+ return invocation_end_link, service_end_link, (
+ invocation_grpc_link, service_grpc_link)
+
+ def destantiate(self, memo):
+ invocation_grpc_link, service_grpc_link = memo
+ invocation_grpc_link.stop()
+ service_grpc_link.stop_gracefully()
+
+ def invocation_initial_metadata(self):
+ return _INVOCATION_INITIAL_METADATA
+
+ def service_initial_metadata(self):
+ return _SERVICE_INITIAL_METADATA
+
+ def invocation_completion(self):
+ return utilities.completion(None, None, None)
+
+ def service_completion(self):
+ return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is None or grpc_test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata)
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ if (original_completion.terminal_metadata is not None and
+ not grpc_test_common.metadata_transmitted(
+ original_completion.terminal_metadata,
+ transmitted_completion.terminal_metadata)):
+ return False
+ elif original_completion.code is not transmitted_completion.code:
+ return False
+ elif original_completion.message != transmitted_completion.message:
+ return False
+ else:
+ return True
+
+
+def setUpModule():
+ logging.warn('setUpModule!')
+
+
+def tearDownModule():
+ logging.warn('tearDownModule!')
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
new file mode 100644
index 0000000..8d72f13
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
@@ -0,0 +1,96 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import logging
+import random
+import time
+import unittest
+
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def __init__(self):
+ self._invocation_initial_metadata = object()
+ self._service_initial_metadata = object()
+ self._invocation_terminal_metadata = object()
+ self._service_terminal_metadata = object()
+
+ def instantiate(self, serializations, servicer):
+ invocation = implementations.invocation_end_link()
+ service = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ invocation.join_link(service)
+ service.join_link(invocation)
+ return invocation, service, None
+
+ def destantiate(self, memo):
+ pass
+
+ def invocation_initial_metadata(self):
+ return self._invocation_initial_metadata
+
+ def service_initial_metadata(self):
+ return self._service_initial_metadata
+
+ def invocation_completion(self):
+ return utilities.completion(self._invocation_terminal_metadata, None, None)
+
+ def service_completion(self):
+ return utilities.completion(self._service_terminal_metadata, None, None)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return transmitted_metadata is original_metadata
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ return (
+ (original_completion.terminal_metadata is
+ transmitted_completion.terminal_metadata) and
+ original_completion.code is transmitted_completion.code and
+ original_completion.message is transmitted_completion.message
+ )
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index dd332fe..5c8b176 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -211,8 +211,10 @@
elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
break
- invocation_end.stop_gracefully()
- service_end.stop_gracefully()
+ invocation_stop_event = invocation_end.stop(0)
+ service_stop_event = service_end.stop(0)
+ invocation_stop_event.wait()
+ service_stop_event.wait()
invocation_stats = invocation_end.operation_stats()
service_stats = service_end.operation_stats()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
index 6c2e334..a2bd710 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
@@ -29,9 +29,42 @@
"""State and behavior appropriate for use in tests."""
+import logging
import threading
+import time
from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+ """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+ Args:
+ ticket: Any links.Ticket.
+
+ Returns:
+ A links.Ticket that is as much as can be equal to the given ticket but
+ possibly features values like the string "<payload of length 972321>" in
+ place of the actual values of the given ticket.
+ """
+ if isinstance(ticket.payload, (basestring,)):
+ payload_length = len(ticket.payload)
+ else:
+ payload_length = -1
+ if payload_length < _UNCOMFORTABLY_LONG:
+ return ticket
+ else:
+ return links.Ticket(
+ ticket.operation_id, ticket.sequence_number,
+ ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+ ticket.allowance, ticket.initial_metadata,
+ '<payload of length {}>'.format(payload_length),
+ ticket.terminal_metadata, ticket.code, ticket.message,
+ ticket.termination)
class RecordingLink(links.Link):
@@ -64,3 +97,71 @@
"""Returns a copy of the list of all tickets received by this Link."""
with self._condition:
return tuple(self._tickets)
+
+
+class _Pipe(object):
+ """A conduit that logs all tickets passed through it."""
+
+ def __init__(self, name):
+ self._lock = threading.Lock()
+ self._name = name
+ self._left_mate = utilities.NULL_LINK
+ self._right_mate = utilities.NULL_LINK
+
+ def accept_left_to_right_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving left to right through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._right_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def accept_right_to_left_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving right to left through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._left_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def join_left_mate(self, left_mate):
+ with self._lock:
+ self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+ def join_right_mate(self, right_mate):
+ with self._lock:
+ self._right_mate = (
+ utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+ def __init__(self, accept, join):
+ self._accept = accept
+ self._join = join
+
+ def accept_ticket(self, ticket):
+ self._accept(ticket)
+
+ def join_link(self, link):
+ self._join(link)
+
+
+def logging_links(name):
+ """Creates a conduit that logs all tickets passed through it.
+
+ Args:
+ name: A name to use for the conduit to identify itself in logging output.
+
+ Returns:
+ Two links.Links, the first of which is the "left" side of the conduit
+ and the second of which is the "right" side of the conduit.
+ """
+ pipe = _Pipe(name)
+ left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+ right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+ return left_facade, right_facade
diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c
index 27531ec..a0a6793 100644
--- a/test/core/end2end/fixtures/chttp2_fake_security.c
+++ b/test/core/end2end/fixtures/chttp2_fake_security.c
@@ -70,7 +70,7 @@
grpc_process_auth_metadata_done_cb cb,
void *user_data) {
GPR_ASSERT(state == NULL);
- cb(user_data, NULL, 0, 0);
+ cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
}
static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
index 491a293..beae241 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
@@ -73,7 +73,7 @@
grpc_process_auth_metadata_done_cb cb,
void *user_data) {
GPR_ASSERT(state == NULL);
- cb(user_data, NULL, 0, 0);
+ cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
}
static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
index f2736cc..c8971be 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
@@ -73,7 +73,7 @@
grpc_process_auth_metadata_done_cb cb,
void *user_data) {
GPR_ASSERT(state == NULL);
- cb(user_data, NULL, 0, 0);
+ cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
}
static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
index cc0b9db..a518a7d 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
@@ -101,7 +101,7 @@
grpc_process_auth_metadata_done_cb cb,
void *user_data) {
GPR_ASSERT(state == NULL);
- cb(user_data, NULL, 0, 0);
+ cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
}
static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
index d82e623..7f11028 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
@@ -79,7 +79,7 @@
client_identity);
GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
ctx, client_identity_property_name) == 1);
- cb(user_data, oauth2, 1, 1);
+ cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_OK, NULL);
}
static void process_oauth2_failure(void *state, grpc_auth_context *ctx,
@@ -90,7 +90,7 @@
find_metadata(md, md_count, "Authorization", oauth2_md);
GPR_ASSERT(state == NULL);
GPR_ASSERT(oauth2 != NULL);
- cb(user_data, oauth2, 1, 0);
+ cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
}
static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 6f80219..6fdca93 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -39,4 +39,12 @@
export DYLD_LIBRARY_PATH=$ROOT/libs/$CONFIG
export PATH=$ROOT/bins/$CONFIG:$ROOT/bins/$CONFIG/protobuf:$PATH
source "python"$PYVER"_virtual_environment"/bin/activate
+
+# TODO(atash): These tests don't currently run under py.test and thus don't
+# appear under the coverage report. Find a way to get these tests to work with
+# py.test (or find another tool or *something*) that's acceptable to the rest of
+# the team...
+"python"$PYVER -m grpc_test._core_over_links_base_interface_test
+"python"$PYVER -m grpc_test.framework.core._base_interface_test
+
"python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml"