Merge pull request #2818 from ctiller/y12kdm3

Add a test of non-blocking API behavior
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 640c1fd..7f8f4d4 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -275,12 +275,18 @@
 /* --- Auth Metadata Processing --- */
 
 /* Callback function that is called when the metadata processing is done.
-   success is 1 if processing succeeded, 0 otherwise.
-   Consumed metadata will be removed from the set of metadata available on the
-   call. */
+   - Consumed metadata will be removed from the set of metadata available on the
+     call. consumed_md may be NULL if no metadata has been consumed.
+   - Response metadata will be set on the response. response_md may be NULL.
+   - status is GRPC_STATUS_OK for success or a specific status for an error.
+     Common error status for auth metadata processing is either
+     GRPC_STATUS_UNAUTHENTICATED in case of an authentication failure or
+     GRPC_STATUS_PERMISSION_DENIED in case of an authorization failure.
+   - error_details gives details about the error. May be NULL. */
 typedef void (*grpc_process_auth_metadata_done_cb)(
     void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
-    int success);
+    const grpc_metadata *response_md, size_t num_response_md,
+    grpc_status_code status, const char *error_details);
 
 /* Pluggable server-side metadata processor object. */
 typedef struct {
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 2f42f01..6e83143 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -104,24 +104,34 @@
   return md;
 }
 
-static void on_md_processing_done(void *user_data,
-                                  const grpc_metadata *consumed_md,
-                                  size_t num_consumed_md, int success) {
+static void on_md_processing_done(
+    void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+    const grpc_metadata *response_md, size_t num_response_md,
+    grpc_status_code status, const char *error_details) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
 
-  if (success) {
+  /* TODO(jboeuf): Implement support for response_md. */
+  if (response_md != NULL && num_response_md > 0) {
+    gpr_log(GPR_INFO,
+            "response_md in auth metadata processing not supported for now. "
+            "Ignoring...");
+  }
+
+  if (status == GRPC_STATUS_OK) {
     calld->consumed_md = consumed_md;
     calld->num_consumed_md = num_consumed_md;
     grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
                                elem);
-    calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+    calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
   } else {
-    gpr_slice message = gpr_slice_from_copied_string(
-        "Authentication metadata processing failed.");
+    gpr_slice message;
+    error_details = error_details != NULL
+                    ? error_details
+                    : "Authentication metadata processing failed.";
+    message = gpr_slice_from_copied_string(error_details);
     grpc_sopb_reset(calld->recv_ops);
-    grpc_transport_stream_op_add_close(&calld->transport_op,
-                                       GRPC_STATUS_UNAUTHENTICATED, &message);
+    grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
     grpc_call_next_op(elem, &calld->transport_op);
   }
 }
diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore
index ae48956..48365e3 100644
--- a/src/csharp/.gitignore
+++ b/src/csharp/.gitignore
@@ -5,4 +5,5 @@
 packages
 Grpc.v12.suo
 TestResult.xml
+/TestResults
 *.nupkg
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index ad4e94a..b571fe9 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -65,6 +65,7 @@
     </Compile>
     <Compile Include="ClientBaseTest.cs" />
     <Compile Include="ShutdownTest.cs" />
+    <Compile Include="Internal\AsyncCallTest.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="ClientServerTest.cs" />
     <Compile Include="ServerTest.cs" />
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 4fdfab5..78295cf 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -53,7 +53,7 @@
         {
             var env1 = GrpcEnvironment.AddRef();
             var env2 = GrpcEnvironment.AddRef();
-            Assert.IsTrue(object.ReferenceEquals(env1, env2));
+            Assert.AreSame(env1, env2);
             GrpcEnvironment.Release();
             GrpcEnvironment.Release();
         }
@@ -61,18 +61,21 @@
         [Test]
         public void InitializeAfterShutdown()
         {
+            Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
+
             var env1 = GrpcEnvironment.AddRef();
             GrpcEnvironment.Release();
 
             var env2 = GrpcEnvironment.AddRef();
             GrpcEnvironment.Release();
 
-            Assert.IsFalse(object.ReferenceEquals(env1, env2));
+            Assert.AreNotSame(env1, env2);
         }
 
         [Test]
         public void ReleaseWithoutAddRef()
         {
+            Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
             Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
         }
 
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
new file mode 100644
index 0000000..685c5f7
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -0,0 +1,222 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+    public class AsyncCallTest
+    {
+        Channel channel;
+        FakeNativeCall fakeCall;
+        AsyncCall<string, string> asyncCall;
+
+        [SetUp]
+        public void Init()
+        {
+            channel = new Channel("localhost", Credentials.Insecure);
+
+            fakeCall = new FakeNativeCall();
+
+            var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
+            asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
+        }
+
+        [TearDown]
+        public void Cleanup()
+        {
+            channel.ShutdownAsync().Wait();
+        }
+
+        [Test]
+        public void AsyncUnary_CompletionSuccess()
+        {
+            var resultTask = asyncCall.UnaryCallAsync("abc");
+            fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata());
+            Assert.IsTrue(resultTask.IsCompleted);
+            Assert.IsTrue(fakeCall.IsDisposed);
+            Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
+        }
+
+        [Test]
+        public void AsyncUnary_CompletionFailure()
+        {
+            var resultTask = asyncCall.UnaryCallAsync("abc");
+            fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata());
+
+            Assert.IsTrue(resultTask.IsCompleted);
+            Assert.IsTrue(fakeCall.IsDisposed);
+
+            Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode);
+            Assert.IsNull(asyncCall.GetTrailers());
+            var ex = Assert.Throws<RpcException>(() => resultTask.GetAwaiter().GetResult());
+            Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+        }
+
+        internal class FakeNativeCall : INativeCall
+        {
+            public UnaryResponseClientHandler UnaryResponseClientHandler
+            {
+                get;
+                set;
+            }
+
+            public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
+            {
+                get;
+                set;
+            }
+
+            public ReceivedMessageHandler ReceivedMessageHandler
+            {
+                get;
+                set;
+            }
+
+            public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+            {
+                get;
+                set;
+            }
+
+            public SendCompletionHandler SendCompletionHandler
+            {
+                get;
+                set;
+            }
+
+            public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
+            {
+                get;
+                set;
+            }
+
+            public bool IsCancelled
+            {
+                get;
+                set;
+            }
+
+            public bool IsDisposed
+            {
+                get;
+                set;
+            }
+
+            public void Cancel()
+            {
+                IsCancelled = true;
+            }
+
+            public void CancelWithStatus(Status status)
+            {
+                IsCancelled = true;
+            }
+
+            public string GetPeer()
+            {
+                return "PEER";
+            }
+
+            public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+            {
+                UnaryResponseClientHandler = callback;
+            }
+
+            public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
+            {
+                UnaryResponseClientHandler = callback;
+            }
+
+            public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+            {
+                ReceivedStatusOnClientHandler = callback;
+            }
+
+            public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
+            {
+                ReceivedStatusOnClientHandler = callback;
+            }
+
+            public void StartReceiveMessage(ReceivedMessageHandler callback)
+            {
+                ReceivedMessageHandler = callback;
+            }
+
+            public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+            {
+                ReceivedResponseHeadersHandler = callback;
+            }
+
+            public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+            {
+                SendCompletionHandler = callback;
+            }
+
+            public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+            {
+                SendCompletionHandler = callback;
+            }
+
+            public void StartSendCloseFromClient(SendCompletionHandler callback)
+            {
+                SendCompletionHandler = callback;
+            }
+
+            public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+            {
+                SendCompletionHandler = callback;
+            }
+
+            public void StartServerSide(ReceivedCloseOnServerHandler callback)
+            {
+                ReceivedCloseOnServerHandler = callback;
+            }
+
+            public void Dispose()
+            {
+                IsDisposed = true;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
index 7060067..a1648f3 100644
--- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -32,13 +32,16 @@
 #endregion
 
 using System;
+using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+
 using Grpc.Core;
 using Grpc.Core.Internal;
 using Grpc.Core.Utils;
+
 using NUnit.Framework;
 
 namespace Grpc.Core.Tests
@@ -74,6 +77,80 @@
         }
 
         [Test]
+        public async Task ResponseHeadersAsync_UnaryCall()
+        {
+            helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+            {
+                await context.WriteResponseHeadersAsync(headers);
+                return "PASS";
+            });
+
+            var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "");
+            var responseHeaders = await call.ResponseHeadersAsync;
+
+            Assert.AreEqual(headers.Count, responseHeaders.Count);
+            Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+            Assert.AreEqual("abcdefg", responseHeaders[0].Value);
+
+            Assert.AreEqual("PASS", await call.ResponseAsync);
+        }
+
+        [Test]
+        public async Task ResponseHeadersAsync_ClientStreamingCall()
+        {
+            helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+            {
+                await context.WriteResponseHeadersAsync(headers);
+                return "PASS";
+            });
+
+            var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+            await call.RequestStream.CompleteAsync();
+            var responseHeaders = await call.ResponseHeadersAsync;
+
+            Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+            Assert.AreEqual("PASS", await call.ResponseAsync);
+        }
+
+        [Test]
+        public async Task ResponseHeadersAsync_ServerStreamingCall()
+        {
+            helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+            {
+                await context.WriteResponseHeadersAsync(headers);
+                await responseStream.WriteAsync("PASS");
+            });
+
+            var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+            var responseHeaders = await call.ResponseHeadersAsync;
+
+            Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+            CollectionAssert.AreEqual(new[] { "PASS" }, await call.ResponseStream.ToListAsync());
+        }
+
+        [Test]
+        public async Task ResponseHeadersAsync_DuplexStreamingCall()
+        {
+            helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+            {
+                await context.WriteResponseHeadersAsync(headers);
+                while (await requestStream.MoveNext())
+                {
+                    await responseStream.WriteAsync(requestStream.Current);
+                }
+            });
+
+            var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
+            var responseHeaders = await call.ResponseHeadersAsync;
+
+            var messages = new[] { "PASS" };
+            await call.RequestStream.WriteAllAsync(messages);
+
+            Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+            CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync());
+        }
+
+        [Test]
         public void WriteResponseHeaders_NullNotAllowed()
         {
             helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index fb9b562..dbaa308 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -44,14 +44,16 @@
     {
         readonly IClientStreamWriter<TRequest> requestStream;
         readonly Task<TResponse> responseAsync;
+        readonly Task<Metadata> responseHeadersAsync;
         readonly Func<Status> getStatusFunc;
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
         {
             this.requestStream = requestStream;
             this.responseAsync = responseAsync;
+            this.responseHeadersAsync = responseHeadersAsync;
             this.getStatusFunc = getStatusFunc;
             this.getTrailersFunc = getTrailersFunc;
             this.disposeAction = disposeAction;
@@ -69,6 +71,17 @@
         }
 
         /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync
+        {
+            get
+            {
+                return this.responseHeadersAsync;
+            }
+        }
+
+        /// <summary>
         /// Async stream to send streaming requests.
         /// </summary>
         public IClientStreamWriter<TRequest> RequestStream
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index 183c842..ee7ba29 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -32,6 +32,7 @@
 #endregion
 
 using System;
+using System.Threading.Tasks;
 
 namespace Grpc.Core
 {
@@ -42,14 +43,16 @@
     {
         readonly IClientStreamWriter<TRequest> requestStream;
         readonly IAsyncStreamReader<TResponse> responseStream;
+        readonly Task<Metadata> responseHeadersAsync;
         readonly Func<Status> getStatusFunc;
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
         {
             this.requestStream = requestStream;
             this.responseStream = responseStream;
+            this.responseHeadersAsync = responseHeadersAsync;
             this.getStatusFunc = getStatusFunc;
             this.getTrailersFunc = getTrailersFunc;
             this.disposeAction = disposeAction;
@@ -78,6 +81,17 @@
         }
 
         /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync
+        {
+            get
+            {
+                return this.responseHeadersAsync;
+            }
+        }
+
+        /// <summary>
         /// Gets the call status if the call has already finished.
         /// Throws InvalidOperationException otherwise.
         /// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index ab2049f..2853a79 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -32,6 +32,7 @@
 #endregion
 
 using System;
+using System.Threading.Tasks;
 
 namespace Grpc.Core
 {
@@ -41,13 +42,15 @@
     public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
     {
         readonly IAsyncStreamReader<TResponse> responseStream;
+        readonly Task<Metadata> responseHeadersAsync;
         readonly Func<Status> getStatusFunc;
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
         {
             this.responseStream = responseStream;
+            this.responseHeadersAsync = responseHeadersAsync;
             this.getStatusFunc = getStatusFunc;
             this.getTrailersFunc = getTrailersFunc;
             this.disposeAction = disposeAction;
@@ -65,6 +68,17 @@
         }
 
         /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync
+        {
+            get
+            {
+                return this.responseHeadersAsync;
+            }
+        }
+
+        /// <summary>
         /// Gets the call status if the call has already finished.
         /// Throws InvalidOperationException otherwise.
         /// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 224e343..154a17a 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -43,13 +43,15 @@
     public sealed class AsyncUnaryCall<TResponse> : IDisposable
     {
         readonly Task<TResponse> responseAsync;
+        readonly Task<Metadata> responseHeadersAsync;
         readonly Func<Status> getStatusFunc;
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
         {
             this.responseAsync = responseAsync;
+            this.responseHeadersAsync = responseHeadersAsync;
             this.getStatusFunc = getStatusFunc;
             this.getTrailersFunc = getTrailersFunc;
             this.disposeAction = disposeAction;
@@ -67,6 +69,17 @@
         }
 
         /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync
+        {
+            get
+            {
+                return this.responseHeadersAsync;
+            }
+        }
+
+        /// <summary>
         /// Allows awaiting this object directly.
         /// </summary>
         public TaskAwaiter<TResponse> GetAwaiter()
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 7067456..e57ac89 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -74,7 +74,7 @@
         {
             var asyncCall = new AsyncCall<TRequest, TResponse>(call);
             var asyncResult = asyncCall.UnaryCallAsync(req);
-            return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+            return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
         }
 
         /// <summary>
@@ -93,7 +93,7 @@
             var asyncCall = new AsyncCall<TRequest, TResponse>(call);
             asyncCall.StartServerStreamingCall(req);
             var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
-            return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+            return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
         }
 
         /// <summary>
@@ -110,7 +110,7 @@
             var asyncCall = new AsyncCall<TRequest, TResponse>(call);
             var resultTask = asyncCall.ClientStreamingCallAsync();
             var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
-            return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+            return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
         }
 
         /// <summary>
@@ -130,7 +130,7 @@
             asyncCall.StartDuplexStreamingCall();
             var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
             var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
-            return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+            return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
         }
     }
 }
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 2f8519d..c11b320 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -58,7 +58,6 @@
         readonly List<ChannelOption> options;
 
         bool shutdownRequested;
-        bool disposed;
 
         /// <summary>
         /// Creates a channel that connects to a specific host.
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 055aff1..ad2af17 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -49,6 +49,7 @@
     <Compile Include="AsyncDuplexStreamingCall.cs" />
     <Compile Include="AsyncServerStreamingCall.cs" />
     <Compile Include="IClientStreamWriter.cs" />
+    <Compile Include="Internal\INativeCall.cs" />
     <Compile Include="IServerStreamWriter.cs" />
     <Compile Include="IAsyncStreamWriter.cs" />
     <Compile Include="IAsyncStreamReader.cs" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 0a44eea..e7c0418 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -102,6 +102,14 @@
             }
         }
 
+        internal static int GetRefCount()
+        {
+            lock (staticLock)
+            {
+                return refCount;
+            }
+        }
+
         /// <summary>
         /// Gets application-wide logger used by gRPC.
         /// </summary>
@@ -177,7 +185,6 @@
             return Marshal.PtrToStringAnsi(ptr);
         }
 
-
         internal static void GrpcNativeInit()
         {
             grpcsharp_init();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bb9ba5b..be5d611 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -51,22 +51,35 @@
         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
 
         readonly CallInvocationDetails<TRequest, TResponse> details;
+        readonly INativeCall injectedNativeCall;  // for testing
 
         // Completion of a pending unary response if not null.
         TaskCompletionSource<TResponse> unaryResponseTcs;
 
+        // Indicates that the streaming call has finished.
+        TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+
+        // Response headers set here once received.
+        TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
+
         // Set after status is received. Used for both unary and streaming response calls.
         ClientSideStatus? finishedStatus;
 
-        bool readObserverCompleted;  // True if readObserver has already been completed.
-
         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
-            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
+            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
         {
             this.details = callDetails.WithOptions(callDetails.Options.Normalize());
             this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
         }
 
+        /// <summary>
+        /// This constructor should only be used for testing.
+        /// </summary>
+        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
+        {
+            this.injectedNativeCall = injectedNativeCall;
+        }
+
         // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but 
         // it is reusing fair amount of code in this class, so we are leaving it here.
         /// <summary>
@@ -100,7 +113,7 @@
                         bool success = (ev.success != 0);
                         try
                         {
-                            HandleUnaryResponse(success, ctx);
+                            HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
                         }
                         catch (Exception e)
                         {
@@ -125,7 +138,7 @@
                 Preconditions.CheckState(!started);
                 started = true;
 
-                Initialize(details.Channel.Environment.CompletionQueue);
+                Initialize(environment.CompletionQueue);
 
                 halfcloseRequested = true;
                 readingDone = true;
@@ -152,7 +165,7 @@
                 Preconditions.CheckState(!started);
                 started = true;
 
-                Initialize(details.Channel.Environment.CompletionQueue);
+                Initialize(environment.CompletionQueue);
 
                 readingDone = true;
 
@@ -176,10 +189,9 @@
                 Preconditions.CheckState(!started);
                 started = true;
 
-                Initialize(details.Channel.Environment.CompletionQueue);
+                Initialize(environment.CompletionQueue);
 
                 halfcloseRequested = true;
-                halfclosed = true;  // halfclose not confirmed yet, but it will be once finishedHandler is called.
 
                 byte[] payload = UnsafeSerialize(msg);
 
@@ -187,6 +199,7 @@
                 {
                     call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
                 }
+                call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
             }
         }
 
@@ -201,12 +214,13 @@
                 Preconditions.CheckState(!started);
                 started = true;
 
-                Initialize(details.Channel.Environment.CompletionQueue);
+                Initialize(environment.CompletionQueue);
 
                 using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                 {
                     call.StartDuplexStreaming(HandleFinished, metadataArray);
                 }
+                call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
             }
         }
 
@@ -248,6 +262,28 @@
         }
 
         /// <summary>
+        /// Get the task that completes once the streaming call finishes with OK status, and throws RpcException with the given status otherwise.
+        /// </summary>
+        public Task StreamingCallFinishedTask
+        {
+            get
+            {
+                return streamingCallFinishedTcs.Task;
+            }
+        }
+
+        /// <summary>
+        /// Get the task that completes once response headers are received.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync
+        {
+            get
+            {
+                return responseHeadersTcs.Task;
+            }
+        }
+
+        /// <summary>
         /// Gets the resulting status if the call has already finished.
         /// Throws InvalidOperationException otherwise.
         /// </summary>
@@ -281,36 +317,6 @@
             }
         }
 
-        /// <summary>
-        /// On client-side, we only fire readCompletionDelegate once all messages have been read 
-        /// and status has been received.
-        /// </summary>
-        protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
-        {
-            if (completionDelegate != null && readingDone && finishedStatus.HasValue)
-            {
-                bool shouldComplete;
-                lock (myLock)
-                {
-                    shouldComplete = !readObserverCompleted;
-                    readObserverCompleted = true;
-                }
-
-                if (shouldComplete)
-                {
-                    var status = finishedStatus.Value.Status;
-                    if (status.StatusCode != StatusCode.OK)
-                    {
-                        FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
-                    }
-                    else
-                    {
-                        FireCompletion(completionDelegate, default(TResponse), null);
-                    }
-                }
-            }
-        }
-
         protected override void OnAfterReleaseResources()
         {
             details.Channel.RemoveCallReference(this);
@@ -318,18 +324,26 @@
 
         private void Initialize(CompletionQueueSafeHandle cq)
         {
-            var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
-
-            var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
-                parentCall, ContextPropagationToken.DefaultMask, cq,
-                details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
-
+            var call = CreateNativeCall(cq);
             details.Channel.AddCallReference(this);
-
             InitializeInternal(call);
             RegisterCancellationCallback();
         }
 
+        private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
+        {
+            if (injectedNativeCall != null)
+            {
+                return injectedNativeCall;  // allows injecting a mock INativeCall in tests.
+            }
+
+            var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+            return details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+                parentCall, ContextPropagationToken.DefaultMask, cq,
+                details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
+        }
+
         // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
         private void RegisterCancellationCallback()
         {
@@ -350,31 +364,31 @@
         }
 
         /// <summary>
+        /// Handles receive initial metadata (response headers) completion for calls with streaming response.
+        /// </summary>
+        private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
+        {
+            responseHeadersTcs.SetResult(responseHeaders);
+        }
+
+        /// <summary>
         /// Handler for unary response completion.
         /// </summary>
-        private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
+        private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
         {
-            var fullStatus = ctx.GetReceivedStatusOnClient();
-
             lock (myLock)
             {
                 finished = true;
-                finishedStatus = fullStatus;
-
-                halfclosed = true;
+                finishedStatus = receivedStatus;
 
                 ReleaseResourcesIfPossible();
             }
 
-            if (!success)
-            {
-                unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured.")));
-                return;
-            }
+            responseHeadersTcs.SetResult(responseHeaders);
 
-            var status = fullStatus.Status;
+            var status = receivedStatus.Status;
 
-            if (status.StatusCode != StatusCode.OK)
+            if (!success || status.StatusCode != StatusCode.OK)
             {
                 unaryResponseTcs.SetException(new RpcException(status));
                 return;
@@ -382,7 +396,7 @@
 
             // TODO: handle deserialization error
             TResponse msg;
-            TryDeserialize(ctx.GetReceivedMessage(), out msg);
+            TryDeserialize(receivedMessage, out msg);
 
             unaryResponseTcs.SetResult(msg);
         }
@@ -390,22 +404,25 @@
         /// <summary>
         /// Handles receive status completion for calls with streaming response.
         /// </summary>
-        private void HandleFinished(bool success, BatchContextSafeHandle ctx)
+        private void HandleFinished(bool success, ClientSideStatus receivedStatus)
         {
-            var fullStatus = ctx.GetReceivedStatusOnClient();
-
-            AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
             lock (myLock)
             {
                 finished = true;
-                finishedStatus = fullStatus;
-
-                origReadCompletionDelegate = readCompletionDelegate;
+                finishedStatus = receivedStatus;
 
                 ReleaseResourcesIfPossible();
             }
 
-            ProcessLastRead(origReadCompletionDelegate);
+            var status = receivedStatus.Status;
+
+            if (!success || status.StatusCode != StatusCode.OK)
+            {
+                streamingCallFinishedTcs.SetException(new RpcException(status));
+                return;
+            }
+
+            streamingCallFinishedTcs.SetResult(null);
         }
     }
 }
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 1808294..4d20394 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -54,30 +54,30 @@
         readonly Func<TWrite, byte[]> serializer;
         readonly Func<byte[], TRead> deserializer;
 
+        protected readonly GrpcEnvironment environment;
         protected readonly object myLock = new object();
 
-        protected CallSafeHandle call;
+        protected INativeCall call;
         protected bool disposed;
 
         protected bool started;
-        protected bool errorOccured;
         protected bool cancelRequested;
 
         protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
         protected AsyncCompletionDelegate<TRead> readCompletionDelegate;  // Completion of a pending send or sendclose if not null.
 
-        protected bool readingDone;
-        protected bool halfcloseRequested;
-        protected bool halfclosed;
+        protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
+        protected bool halfcloseRequested;  // True if send close has been initiated.
         protected bool finished;  // True if close has been received from the peer.
 
         protected bool initialMetadataSent;
-        protected long streamingWritesCounter;
+        protected long streamingWritesCounter;  // Number of streaming send operations started so far.
 
-        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
         {
             this.serializer = Preconditions.CheckNotNull(serializer);
             this.deserializer = Preconditions.CheckNotNull(deserializer);
+            this.environment = Preconditions.CheckNotNull(environment);
         }
 
         /// <summary>
@@ -114,7 +114,7 @@
             }
         }
 
-        protected void InitializeInternal(CallSafeHandle call)
+        protected void InitializeInternal(INativeCall call)
         {
             lock (myLock)
             {
@@ -159,16 +159,6 @@
             }
         }
 
-        // TODO(jtattermusch): find more fitting name for this method.
-        /// <summary>
-        /// Default behavior just completes the read observer, but more sofisticated behavior might be required
-        /// by subclasses.
-        /// </summary>
-        protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
-        {
-            FireCompletion(completionDelegate, default(TRead), null);
-        }
-
         /// <summary>
         /// If there are no more pending actions and no new actions can be started, releases
         /// the underlying native resources.
@@ -177,7 +167,7 @@
         {
             if (!disposed && call != null)
             {
-                bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+                bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
                 if (noMoreSendCompletions && readingDone && finished)
                 {
                     ReleaseResources();
@@ -204,11 +194,11 @@
         protected void CheckSendingAllowed()
         {
             Preconditions.CheckState(started);
-            Preconditions.CheckState(!errorOccured);
             CheckNotCancelled();
             Preconditions.CheckState(!disposed);
 
             Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
+            Preconditions.CheckState(!finished, "Already finished.");
             Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
         }
 
@@ -216,7 +206,6 @@
         {
             Preconditions.CheckState(started);
             Preconditions.CheckState(!disposed);
-            Preconditions.CheckState(!errorOccured);
 
             Preconditions.CheckState(!readingDone, "Stream has already been closed.");
             Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
@@ -280,7 +269,7 @@
         /// <summary>
         /// Handles send completion.
         /// </summary>
-        protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
+        protected void HandleSendFinished(bool success)
         {
             AsyncCompletionDelegate<object> origCompletionDelegate = null;
             lock (myLock)
@@ -304,12 +293,11 @@
         /// <summary>
         /// Handles halfclose completion.
         /// </summary>
-        protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
+        protected void HandleHalfclosed(bool success)
         {
             AsyncCompletionDelegate<object> origCompletionDelegate = null;
             lock (myLock)
             {
-                halfclosed = true;
                 origCompletionDelegate = sendCompletionDelegate;
                 sendCompletionDelegate = null;
 
@@ -329,23 +317,17 @@
         /// <summary>
         /// Handles streaming read completion.
         /// </summary>
-        protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
+        protected void HandleReadFinished(bool success, byte[] receivedMessage)
         {
-            var payload = ctx.GetReceivedMessage();
-
             AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
             lock (myLock)
             {
                 origCompletionDelegate = readCompletionDelegate;
-                if (payload != null)
+                readCompletionDelegate = null;
+
+                if (receivedMessage == null)
                 {
-                    readCompletionDelegate = null;
-                }
-                else
-                {
-                    // This was the last read. Keeping the readCompletionDelegate
-                    // to be either fired by this handler or by client-side finished
-                    // handler.
+                    // This was the last read.
                     readingDone = true;
                 }
 
@@ -354,17 +336,17 @@
 
             // TODO: handle the case when error occured...
 
-            if (payload != null)
+            if (receivedMessage != null)
             {
                 // TODO: handle deserialization error
                 TRead msg;
-                TryDeserialize(payload, out msg);
+                TryDeserialize(receivedMessage, out msg);
 
                 FireCompletion(origCompletionDelegate, msg, null);
             }
             else
             {
-                ProcessLastRead(origCompletionDelegate);
+                FireCompletion(origCompletionDelegate, default(TRead), null);
             }
         }
     }
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 6278c01..5c47251 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -49,12 +49,10 @@
     {
         readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
         readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
-        readonly GrpcEnvironment environment;
         readonly Server server;
 
-        public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
+        public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
         {
-            this.environment = Preconditions.CheckNotNull(environment);
             this.server = Preconditions.CheckNotNull(server);
         }
 
@@ -185,10 +183,8 @@
         /// <summary>
         /// Handles the server side close completion.
         /// </summary>
-        private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
+        private void HandleFinishedServerside(bool success, bool cancelled)
         {
-            bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
-
             lock (myLock)
             {
                 finished = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 3cb01e2..0f18752 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -40,7 +40,7 @@
     /// <summary>
     /// grpc_call from <grpc/grpc.h>
     /// </summary>
-    internal class CallSafeHandle : SafeHandleZeroIsInvalid
+    internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall
     {
         public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
 
@@ -87,6 +87,10 @@
             BatchContextSafeHandle ctx);
 
         [DllImport("grpc_csharp_ext.dll")]
+        static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+            BatchContextSafeHandle ctx);
+
+        [DllImport("grpc_csharp_ext.dll")]
         static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
             BatchContextSafeHandle ctx);
 
@@ -109,10 +113,10 @@
             this.completionRegistry = completionRegistry;
         }
 
-        public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+        public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
             grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
                 .CheckOk();
         }
@@ -123,66 +127,73 @@
                 .CheckOk();
         }
 
-        public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+        public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
             grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
         }
 
-        public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+        public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
             grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
         }
 
-        public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+        public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
             grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
         }
 
-        public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+        public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
             grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
         }
 
-        public void StartSendCloseFromClient(BatchCompletionDelegate callback)
+        public void StartSendCloseFromClient(SendCompletionHandler callback)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
             grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
         }
 
-        public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+        public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
             grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
         }
 
-        public void StartReceiveMessage(BatchCompletionDelegate callback)
+        public void StartReceiveMessage(ReceivedMessageHandler callback)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
             grpcsharp_call_recv_message(this, ctx).CheckOk();
         }
 
-        public void StartServerSide(BatchCompletionDelegate callback)
+        public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+            grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+        }
+
+        public void StartServerSide(ReceivedCloseOnServerHandler callback)
+        {
+            var ctx = BatchContextSafeHandle.Create();
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
             grpcsharp_call_start_serverside(this, ctx).CheckOk();
         }
 
-        public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+        public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
             grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
         }
 
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 6c44521..b4a7335 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -72,7 +72,13 @@
             call.StartReadMessage(taskSource.CompletionDelegate);
             var result = await taskSource.Task;
             this.current = result;
-            return result != null;
+
+            if (result == null)
+            {
+                await call.StreamingCallFinishedTask;
+                return false;
+            }
+            return true;
         }
 
         public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
new file mode 100644
index 0000000..cbef599
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -0,0 +1,85 @@
+#region Copyright notice and license
+// Copyright 2015, Google Inc.
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+// 
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+// 
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core.Internal
+{
+    internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+
+    // Received status for streaming response calls.
+    internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus);
+
+    internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
+
+    internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
+
+    internal delegate void SendCompletionHandler(bool success);
+
+    internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
+
+    /// <summary>
+    /// Abstraction of a native call object.
+    /// </summary>
+    internal interface INativeCall : IDisposable
+    {
+        void Cancel();
+
+        void CancelWithStatus(Grpc.Core.Status status);
+
+        string GetPeer();
+
+        void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+        void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+        void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+        void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+        void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+        void StartReceiveMessage(ReceivedMessageHandler callback);
+
+        void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
+
+        void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
+
+        void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+
+        void StartSendCloseFromClient(SendCompletionHandler callback);
+
+        void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+
+        void StartServerSide(ReceivedCloseOnServerHandler callback);
+    }
+}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index fc9470f..489e219 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -595,7 +595,7 @@
     grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
     size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
   /* TODO: don't use magic number */
-  grpc_op ops[5];
+  grpc_op ops[4];
   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
   grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
                                 initial_metadata);
@@ -615,23 +615,18 @@
   ops[2].flags = 0;
   ops[2].reserved = NULL;
 
-  ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
-  ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
-  ops[3].flags = 0;
-  ops[3].reserved = NULL;
-
-  ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
-  ops[4].data.recv_status_on_client.trailing_metadata =
+  ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  ops[3].data.recv_status_on_client.trailing_metadata =
       &(ctx->recv_status_on_client.trailing_metadata);
-  ops[4].data.recv_status_on_client.status =
+  ops[3].data.recv_status_on_client.status =
       &(ctx->recv_status_on_client.status);
   /* not using preallocation for status_details */
-  ops[4].data.recv_status_on_client.status_details =
+  ops[3].data.recv_status_on_client.status_details =
       &(ctx->recv_status_on_client.status_details);
-  ops[4].data.recv_status_on_client.status_details_capacity =
+  ops[3].data.recv_status_on_client.status_details_capacity =
       &(ctx->recv_status_on_client.status_details_capacity);
-  ops[4].flags = 0;
-  ops[4].reserved = NULL;
+  ops[3].flags = 0;
+  ops[3].reserved = NULL;
 
   return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
                                NULL);
@@ -642,7 +637,7 @@
                                       grpcsharp_batch_context *ctx,
                                       grpc_metadata_array *initial_metadata) {
   /* TODO: don't use magic number */
-  grpc_op ops[3];
+  grpc_op ops[2];
   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
   grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
                                 initial_metadata);
@@ -652,28 +647,36 @@
   ops[0].flags = 0;
   ops[0].reserved = NULL;
 
-  ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
-  ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+  ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  ops[1].data.recv_status_on_client.trailing_metadata =
+      &(ctx->recv_status_on_client.trailing_metadata);
+  ops[1].data.recv_status_on_client.status =
+      &(ctx->recv_status_on_client.status);
+  /* not using preallocation for status_details */
+  ops[1].data.recv_status_on_client.status_details =
+      &(ctx->recv_status_on_client.status_details);
+  ops[1].data.recv_status_on_client.status_details_capacity =
+      &(ctx->recv_status_on_client.status_details_capacity);
   ops[1].flags = 0;
   ops[1].reserved = NULL;
 
-  ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
-  ops[2].data.recv_status_on_client.trailing_metadata =
-      &(ctx->recv_status_on_client.trailing_metadata);
-  ops[2].data.recv_status_on_client.status =
-      &(ctx->recv_status_on_client.status);
-  /* not using preallocation for status_details */
-  ops[2].data.recv_status_on_client.status_details =
-      &(ctx->recv_status_on_client.status_details);
-  ops[2].data.recv_status_on_client.status_details_capacity =
-      &(ctx->recv_status_on_client.status_details_capacity);
-  ops[2].flags = 0;
-  ops[2].reserved = NULL;
-
   return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
                                NULL);
 }
 
+GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
+    grpc_call *call, grpcsharp_batch_context *ctx) {
+  /* TODO: don't use magic number */
+  grpc_op ops[1];
+  ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
+  ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+  ops[0].flags = 0;
+  ops[0].reserved = NULL;
+  /* Single-op batch; ctx is handed to grpc_call_start_batch as the tag. */
+  return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+                               NULL);
+}
+
 GPR_EXPORT grpc_call_error GPR_CALLTYPE
 grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
                             const char *send_buffer, size_t send_buffer_len,
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
new file mode 100644
index 0000000..d3be3a4
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -0,0 +1,59 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private constants for the package."""
+
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
+    base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
+    base.Subscription.Kind.TERMINATION_ONLY:
+        links.Ticket.Subscription.TERMINATION,
+    base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
+    }
+
+# Mapping from abortive operation outcome to ticket termination to be
+# sent to the other side of the operation, or None to indicate that no
+# ticket should be sent to the other side in the event of such an
+# outcome.
+ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
+    base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
+    base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
+    base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+    base.Outcome.REMOTE_SHUTDOWN: None,
+    base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
+    base.Outcome.TRANSMISSION_FAILURE: None,
+    base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+    base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+}
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
+TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+    'Exception calling termination callback!')
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
new file mode 100644
index 0000000..24a12b6
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -0,0 +1,92 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation context."""
+
+import time
+
+# _interfaces is referenced from specification in this module.
+from grpc.framework.core import _interfaces  # pylint: disable=unused-import
+from grpc.framework.interfaces.base import base
+
+
+class OperationContext(base.OperationContext):
+  """An implementation of base.OperationContext."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager,
+      expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+
+  def _abort(self, outcome):  # Shared implementation of cancel() and fail().
+    with self._lock:
+      if self._termination_manager.outcome is None:  # skip if outcome is set
+        self._termination_manager.abort(outcome)
+        self._transmission_manager.abort(outcome)
+        self._expiration_manager.terminate()
+
+  def outcome(self):
+    """See base.OperationContext.outcome for specification."""
+    with self._lock:
+      return self._termination_manager.outcome
+
+  def add_termination_callback(self, callback):
+    """See base.OperationContext.add_termination_callback."""
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        self._termination_manager.add_callback(callback)
+        return None
+      else:  # outcome already set: callback is not registered
+        return self._termination_manager.outcome
+
+  def time_remaining(self):
+    """See base.OperationContext.time_remaining for specification."""
+    with self._lock:
+      deadline = self._expiration_manager.deadline()
+    return max(0.0, deadline - time.time())  # never negative
+
+  def cancel(self):
+    """See base.OperationContext.cancel for specification."""
+    self._abort(base.Outcome.CANCELLED)
+
+  def fail(self, exception):
+    """See base.OperationContext.fail for specification."""
+    self._abort(base.Outcome.LOCAL_FAILURE)  # the exception is not used here
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
new file mode 100644
index 0000000..7c702ab
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -0,0 +1,97 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for handling emitted values."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+
+
+class EmissionManager(_interfaces.EmissionManager):
+  """An EmissionManager implementation."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager,
+      expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._ingestion_manager = None
+
+    self._initial_metadata_seen = False
+    self._payload_seen = False
+    self._completion_seen = False
+
+  def set_ingestion_manager(self, ingestion_manager):
+    """Sets the ingestion manager with which this manager will cooperate.
+
+    Args:
+      ingestion_manager: The _interfaces.IngestionManager for the operation.
+    """
+    self._ingestion_manager = ingestion_manager
+
+  def advance(
+      self, initial_metadata=None, payload=None, completion=None,
+      allowance=None):
+    initial_metadata_present = initial_metadata is not None
+    payload_present = payload is not None
+    completion_present = completion is not None
+    allowance_present = allowance is not None
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        if (initial_metadata_present and (
+                self._initial_metadata_seen or self._payload_seen or
+                self._completion_seen) or
+            payload_present and self._completion_seen or
+            completion_present and self._completion_seen or
+            allowance_present and allowance <= 0):
+          self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
+          self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
+          self._expiration_manager.terminate()
+        else:
+          self._initial_metadata_seen |= initial_metadata_present
+          self._payload_seen |= payload_present
+          self._completion_seen |= completion_present
+          if completion_present:
+            self._termination_manager.emission_complete()
+            self._ingestion_manager.local_emissions_done()
+          self._transmission_manager.advance(
+              initial_metadata, payload, completion, allowance)
+          if allowance_present:
+            self._ingestion_manager.add_local_allowance(allowance)
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
new file mode 100644
index 0000000..fb2c532
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -0,0 +1,251 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of base.End."""
+
+import abc
+import enum
+import threading
+import uuid
+
+from grpc.framework.core import _operation
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import later
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
+
+
+class End(base.End, links.Link):
+  """A bridge between base.End and links.Link.
+
+  Implementations of this interface translate arriving tickets into
+  calls on application objects implementing base interfaces and
+  translate calls from application objects implementing base interfaces
+  into tickets sent to a joined link.
+  """
+  __metaclass__ = abc.ABCMeta
+
+
+class _Cycle(object):
+  """State for a single start-stop End lifecycle."""
+
+  def __init__(self, pool):
+    self.pool = pool
+    self.grace = False
+    self.futures = []
+    self.operations = {}
+    self.idle_actions = []
+
+
+def _abort(operations):
+  for operation in operations:
+    operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+
+
+def _cancel_futures(futures):  # Cancels every future in the given iterable.
+  for future in futures:
+    future.cancel()  # per-element: the container itself has no cancel()
+
+
+def _future_shutdown(lock, cycle, event):  # NOTE: event is currently unused.
+  def in_future():  # Aborts all operations, then shuts the cycle's pool down.
+    with lock:
+      _abort(cycle.operations.values())
+      _cancel_futures(cycle.futures)
+      pool = cycle.pool  # read under the lock; used after it is released
+    pool.shutdown(wait=True)  # blocking wait happens outside the lock
+  return in_future
+
+
+def _termination_action(lock, stats, operation_id, cycle):
+  """Constructs the termination action for a single operation.
+
+  Args:
+    lock: A lock to hold during the termination action.
+    stats: A mapping from base.Outcome values to integers to increment with
+      the outcome given to the termination action.
+    operation_id: The operation ID for the termination action.
+    cycle: A _Cycle value to be updated during the termination action.
+
+  Returns:
+    A callable that takes an operation outcome as its sole parameter and that
+      should be used as the termination action for the operation associated
+      with the given operation ID.
+  """
+  def termination_action(outcome):
+    with lock:
+      stats[outcome] += 1
+      cycle.operations.pop(operation_id, None)
+      if not cycle.operations:  # that was the last live operation
+        for action in cycle.idle_actions:
+          cycle.pool.submit(action)
+        cycle.idle_actions = []  # each idle action is submitted only once
+        if cycle.grace:
+          _cancel_futures(cycle.futures)
+  return termination_action
+
+
+class _End(End):
+  """An End implementation."""
+
+  def __init__(self, servicer_package):
+    """Constructor.
+
+    Args:
+      servicer_package: A _ServicerPackage for servicing operations or None if
+        this end will not be used to service operations.
+    """
+    self._lock = threading.Condition()
+    self._servicer_package = servicer_package
+
+    self._stats = {outcome: 0 for outcome in base.Outcome}
+
+    self._mate = None
+
+    self._cycle = None
+
+  def start(self):
+    """See base.End.start for specification."""
+    with self._lock:
+      if self._cycle is not None:
+        raise ValueError('Tried to start a not-stopped End!')
+      else:
+        self._cycle = _Cycle(logging_pool.pool(1))
+
+  def stop(self, grace):
+    """See base.End.stop for specification."""
+    with self._lock:
+      if self._cycle is None:
+        event = threading.Event()
+        event.set()
+        return event
+      elif not self._cycle.operations:
+        event = threading.Event()
+        self._cycle.pool.submit(event.set)
+        self._cycle.pool.shutdown(wait=False)
+        self._cycle = None
+        return event
+      else:
+        self._cycle.grace = True
+        event = threading.Event()
+        self._cycle.idle_actions.append(event.set)
+        if 0 < grace:
+          future = later.later(
+              grace, _future_shutdown(self._lock, self._cycle, event))
+          self._cycle.futures.append(future)
+        else:
+          _abort(self._cycle.operations.values())
+        return event
+
+  def operate(
+      self, group, method, subscription, timeout, initial_metadata=None,
+      payload=None, completion=None):
+    """See base.End.operate for specification."""
+    operation_id = uuid.uuid4()
+    with self._lock:
+      if self._cycle is None or self._cycle.grace:
+        raise ValueError('Can\'t operate on stopped or stopping End!')
+      termination_action = _termination_action(
+          self._lock, self._stats, operation_id, self._cycle)
+      operation = _operation.invocation_operate(
+          operation_id, group, method, subscription, timeout, initial_metadata,
+          payload, completion, self._mate.accept_ticket, termination_action,
+          self._cycle.pool)
+      self._cycle.operations[operation_id] = operation
+      return operation.context, operation.operator
+
+  def operation_stats(self):
+    """See base.End.operation_stats for specification."""
+    with self._lock:
+      return dict(self._stats)
+
+  def add_idle_action(self, action):
+    """See base.End.add_idle_action for specification."""
+    with self._lock:
+      if self._cycle is None:
+        raise ValueError('Can\'t add idle action to stopped End!')
+      action_with_exceptions_logged = callable_util.with_exceptions_logged(
+          action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
+      if self._cycle.operations:
+        self._cycle.idle_actions.append(action_with_exceptions_logged)
+      else:
+        self._cycle.pool.submit(action_with_exceptions_logged)
+
+  def accept_ticket(self, ticket):
+    """See links.Link.accept_ticket for specification."""
+    with self._lock:
+      if self._cycle is not None and not self._cycle.grace:
+        operation = self._cycle.operations.get(ticket.operation_id)
+        if operation is not None:
+          operation.handle_ticket(ticket)
+        elif self._servicer_package is not None:
+          termination_action = _termination_action(
+              self._lock, self._stats, ticket.operation_id, self._cycle)
+          operation = _operation.service_operate(
+              self._servicer_package, ticket, self._mate.accept_ticket,
+              termination_action, self._cycle.pool)
+          if operation is not None:
+            self._cycle.operations[ticket.operation_id] = operation
+
+  def join_link(self, link):
+    """See links.Link.join_link for specification."""
+    with self._lock:
+      self._mate = utilities.NULL_LINK if link is None else link
+
+
+def serviceless_end_link():
+  """Constructs an End usable only for invoking operations.
+
+  Returns:
+    An End usable for translating operations into ticket exchange.
+  """
+  return _End(None)
+
+
+def serviceful_end_link(servicer, default_timeout, maximum_timeout):
+  """Constructs an End capable of servicing operations.
+
+  Args:
+    servicer: An interfaces.Servicer for servicing operations.
+    default_timeout: A length of time in seconds to be used as the default
+      time allotted for a single operation.
+    maximum_timeout: A length of time in seconds to be used as the maximum
+      time allotted for a single operation.
+
+  Returns:
+    An End capable of servicing the operations requested of it through ticket
+      exchange.
+  """
+  return _End(
+      _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
new file mode 100644
index 0000000..d94bdf2
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -0,0 +1,152 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation expiration."""
+
+import time
+
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import later
+from grpc.framework.interfaces.base import base
+
+
+class _ExpirationManager(_interfaces.ExpirationManager):
+  """An implementation of _interfaces.ExpirationManager."""
+
+  def __init__(
+      self, commencement, timeout, maximum_timeout, lock, termination_manager,
+      transmission_manager):
+    """Constructor.
+
+    Args:
+      commencement: The time in seconds since the epoch at which the operation
+        began.
+      timeout: A length of time in seconds to allow for the operation to run.
+      maximum_timeout: The maximum length of time in seconds to allow for the
+        operation to run despite what is requested via this object's
+        change_timeout method.
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._commencement = commencement
+    self._maximum_timeout = maximum_timeout
+
+    self._timeout = timeout
+    self._deadline = commencement + timeout
+    self._index = None  # generation of the currently scheduled expiration
+    self._future = None
+
+  def _expire(self, index):  # Builds the callback for generation `index`.
+    def expire():
+      with self._lock:
+        if self._future is not None and index == self._index:  # not stale
+          self._future = None
+          self._termination_manager.expire()
+          self._transmission_manager.abort(base.Outcome.EXPIRED)
+    return expire
+
+  def start(self):  # Schedules the first expiration as generation 0.
+    self._index = 0
+    self._future = later.later(self._timeout, self._expire(0))
+
+  def change_timeout(self, timeout):  # No-op if not running or unchanged.
+    if self._future is not None and timeout != self._timeout:
+      self._future.cancel()
+      new_timeout = min(timeout, self._maximum_timeout)
+      new_index = self._index + 1
+      self._timeout = new_timeout
+      self._deadline = self._commencement + new_timeout
+      self._index = new_index
+      delay = self._deadline - time.time()
+      self._future = later.later(delay, self._expire(new_index))
+      if new_timeout != timeout:  # request was clamped to the maximum
+        self._transmission_manager.timeout(new_timeout)
+
+  def deadline(self):  # Absolute deadline, in seconds since the epoch.
+    return self._deadline
+
+  def terminate(self):  # Cancels any pending expiration.
+    if self._future:
+      self._future.cancel()
+      self._future = None
+    self._index = None  # no generation matches, so late callbacks are stale
+
+
+def invocation_expiration_manager(
+    timeout, lock, termination_manager, transmission_manager):
+  """Creates an _interfaces.ExpirationManager for invocation-side use.
+
+  Args:
+    timeout: A length of time in seconds to allow for the operation to run.
+    lock: The operation-wide lock.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+
+  Returns:
+    An _interfaces.ExpirationManager appropriate for invocation-side use.
+  """
+  expiration_manager = _ExpirationManager(
+      time.time(), timeout, timeout, lock, termination_manager,
+      transmission_manager)  # timeout also serves as the maximum timeout
+  expiration_manager.start()
+  return expiration_manager
+
+
+def service_expiration_manager(
+    timeout, default_timeout, maximum_timeout, lock, termination_manager,
+    transmission_manager):
+  """Creates an _interfaces.ExpirationManager appropriate for service-side use.
+
+  Args:
+    timeout: A length of time in seconds to allow for the operation to run. May
+      be None in which case default_timeout will be used.
+    default_timeout: The default length of time in seconds to allow for the
+      operation to run if the invocation side has not specified such a value
+      (or if the value they specified is not yet known).
+    maximum_timeout: The maximum length of time in seconds to allow for the
+      operation to run.
+    lock: The operation-wide lock.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+
+  Returns:
+    An _interfaces.ExpirationManager appropriate for service-side use.
+  """
+  expiration_manager = _ExpirationManager(
+      time.time(), default_timeout if timeout is None else timeout,
+      maximum_timeout, lock, termination_manager, transmission_manager)
+  expiration_manager.start()
+  return expiration_manager
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
new file mode 100644
index 0000000..59f7f8a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -0,0 +1,410 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ingestion during an operation."""
+
+import abc
+import collections
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
+_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
+
+
+class _SubscriptionCreation(
+    collections.namedtuple(
+        '_SubscriptionCreation',
+        ('subscription', 'remote_error', 'abandoned',))):
+  """Sum type describing how ingestion initialization turned out.
+
+  Either subscription is non-None, remote_error is True, or abandoned is True.
+
+  Attributes:
+    subscription: The base.Subscription expressing the customer's interest in
+      operation values from the other side.
+    remote_error: True if the remote side caused subscription creation to fail.
+    abandoned: True if subscription creation was abandoned.
+  """
+
+
+class _SubscriptionCreator(object):
+  """Abstract specification of how a subscription comes to be created."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def create(self, group, method):
+    """Obtains the local customer's base.Subscription.
+
+    Exceptions raised by this method should be attributed to, and treated as,
+    defects in the customer code that this method calls.
+
+    Args:
+      group: The group identifier of the operation.
+      method: The method identifier of the operation.
+
+    Returns:
+      A _SubscriptionCreation describing how subscription creation turned out.
+    """
+    raise NotImplementedError()
+
+
+class _ServiceSubscriptionCreator(_SubscriptionCreator):
+  """Creates subscriptions on the service side by consulting a servicer."""
+
+  def __init__(self, servicer, operation_context, output_operator):
+    """Constructor.
+
+    Args:
+      servicer: The base.Servicer that will service the operation.
+      operation_context: The base.OperationContext for the operation, to be
+        handed to the customer.
+      output_operator: The base.Operator for the operation, to be handed to the
+        customer and called by the customer with the data that it emits.
+    """
+    self._servicer = servicer
+    self._operation_context = operation_context
+    self._output_operator = output_operator
+
+  def create(self, group, method):
+    try:
+      created = self._servicer.service(
+          group, method, self._operation_context, self._output_operator)
+    except base.NoSuchMethodError:
+      subscription, remote_error, abandoned = None, True, False
+    except abandonment.Abandoned:
+      subscription, remote_error, abandoned = None, False, True
+    else:
+      subscription, remote_error, abandoned = created, False, False
+    return _SubscriptionCreation(subscription, remote_error, abandoned)
+
+
+def _wrap(behavior):
+  """Adapts behavior to return False if it raises Abandoned, True otherwise."""
+  def abandonment_reporting(*args, **kwargs):
+    try:
+      behavior(*args, **kwargs)
+      return True
+    except abandonment.Abandoned:
+      return False
+  return abandonment_reporting
+
+
+class _IngestionManager(_interfaces.IngestionManager):
+  """An implementation of _interfaces.IngestionManager."""
+
+  def __init__(
+      self, lock, pool, subscription, subscription_creator, termination_manager,
+      transmission_manager, expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      pool: A thread pool in which to execute customer code.
+      subscription: The base.Subscription describing the customer's interest in
+        operation values from the other side, or None if subscription_creator
+        is to be used to obtain it.
+      subscription_creator: A _SubscriptionCreator wrapping the customer code
+        that returns that base.Subscription when called. May be None if
+        subscription is not None.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._pool = pool
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+
+    if subscription is None:
+      self._subscription_creator = subscription_creator
+      self._wrapped_operator = None
+    elif subscription.kind is base.Subscription.Kind.FULL:
+      self._subscription_creator = None
+      self._wrapped_operator = _wrap(subscription.operator.advance)
+    else:
+      # TODO(nathaniel): Support other subscriptions.
+      raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
+    self._pending_initial_metadata = None
+    self._pending_payloads = []
+    self._pending_completion = None
+    self._local_allowance = 1
+    # A nonnegative integer, or None once the local customer is done emitting
+    # (after which it is not told of allowance granted by the remote customer).
+    self._remote_allowance = 0
+    # True while a task in the pool is responsible for calling customer code.
+    self._processing = False
+
+  def _abort_internal_only(self):
+    self._subscription_creator = None
+    self._wrapped_operator = None
+    self._pending_initial_metadata = None
+    self._pending_payloads = None
+    self._pending_completion = None
+
+  def _abort_and_notify(self, outcome):
+    """Aborts ingestion and passes the abortion on to the other managers."""
+    self._abort_internal_only()
+    self._termination_manager.abort(outcome)
+    self._transmission_manager.abort(outcome)
+    self._expiration_manager.terminate()
+
+  def _operator_next(self):
+    """Computes the next step for full-subscription ingestion.
+
+    Returns:
+      An initial_metadata, payload, completion, allowance, continue quintet of
+        the operation values (if any) ready to pass into customer code and
+        whether or not there is anything for customer code to do right now.
+    """
+    if self._wrapped_operator is None:
+      return None, None, None, None, False
+    initial_metadata, payload, completion, allowance, action = [None] * 5
+    if self._pending_initial_metadata is not None:
+      initial_metadata = self._pending_initial_metadata
+      self._pending_initial_metadata = None
+      action = True
+    if self._pending_payloads and 0 < self._local_allowance:
+      payload = self._pending_payloads.pop(0)
+      self._local_allowance -= 1
+      action = True
+    if not self._pending_payloads and self._pending_completion is not None:
+      completion = self._pending_completion
+      self._pending_completion = None
+      action = True
+    if self._remote_allowance is not None and 0 < self._remote_allowance:
+      allowance = self._remote_allowance
+      self._remote_allowance = 0
+      action = True
+    return initial_metadata, payload, completion, allowance, bool(action)
+
+  def _operator_process(
+      self, wrapped_operator, initial_metadata, payload,
+      completion, allowance):
+    """Calls customer code with values until none are immediately available."""
+    while True:
+      advance_outcome = callable_util.call_logging_exceptions(
+          wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
+          initial_metadata=initial_metadata, payload=payload,
+          completion=completion, allowance=allowance)
+      if advance_outcome.exception is None and advance_outcome.return_value:
+        with self._lock:
+          if self._termination_manager.outcome is not None:
+            return
+          if completion is not None:
+            self._termination_manager.ingestion_complete()
+          initial_metadata, payload, completion, allowance, moar = (
+              self._operator_next())
+          if not moar:
+            self._processing = False
+            return
+      else:
+        # The customer's code raised an exception or abandoned the operation.
+        with self._lock:
+          if self._termination_manager.outcome is None:
+            self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+          return
+
+  def _operator_post_create(self, subscription):
+    """Installs the created subscription's operator and ingests waiting values."""
+    wrapped_operator = _wrap(subscription.operator.advance)
+    with self._lock:
+      if self._termination_manager.outcome is not None:
+        return
+      self._wrapped_operator = wrapped_operator
+      self._subscription_creator = None
+      metadata, payload, completion, allowance, moar = self._operator_next()
+      if not moar:
+        self._processing = False
+        return
+    self._operator_process(
+        wrapped_operator, metadata, payload, completion, allowance)
+
+  def _create(self, subscription_creator, group, name):
+    """Has customer code create the subscription, then acts on the result."""
+    outcome = callable_util.call_logging_exceptions(
+        subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
+        group, name)
+    if outcome.return_value is None or outcome.return_value.abandoned:
+      with self._lock:
+        if self._termination_manager.outcome is None:
+          self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+    elif outcome.return_value.remote_error:
+      with self._lock:
+        if self._termination_manager.outcome is None:
+          self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
+    elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
+      self._operator_post_create(outcome.return_value.subscription)
+    else:
+      # TODO(nathaniel): Support other subscriptions.
+      raise ValueError(
+          'Unsupported "%s"!' % outcome.return_value.subscription.kind)
+
+  def _store_advance(self, initial_metadata, payload, completion, allowance):
+    if initial_metadata is not None:
+      self._pending_initial_metadata = initial_metadata
+    if payload is not None:
+      self._pending_payloads.append(payload)
+    if completion is not None:
+      self._pending_completion = completion
+    if allowance is not None and self._remote_allowance is not None:
+      self._remote_allowance += allowance
+
+  def _operator_advance(self, initial_metadata, payload, completion, allowance):
+    """Passes values to customer code, deferring those it may not yet take."""
+    if self._processing:
+      self._store_advance(initial_metadata, payload, completion, allowance)
+    else:
+      action = initial_metadata is not None
+      if payload is not None:
+        if 0 < self._local_allowance:
+          self._local_allowance -= 1
+          action = True
+        else:
+          # Deferred for want of allowance; None means "no payload this call".
+          self._pending_payloads.append(payload)
+          payload = None
+      if completion is not None:
+        if self._pending_payloads:
+          # Completion must not overtake payloads that are still pending.
+          self._pending_completion = completion
+          completion = None
+        else:
+          action = True
+      if allowance is not None and self._remote_allowance is not None:
+        allowance += self._remote_allowance
+        self._remote_allowance = 0
+        action = True
+      else:
+        allowance = None
+      if action:
+        # Marked before submission so that values arriving meanwhile are stored.
+        self._processing = True
+        self._pool.submit(
+            callable_util.with_exceptions_logged(
+                self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+            self._wrapped_operator, initial_metadata, payload, completion,
+            allowance)
+
+  def set_group_and_method(self, group, method):
+    """See _interfaces.IngestionManager.set_group_and_method for spec."""
+    if self._subscription_creator is not None and not self._processing:
+      self._pool.submit(
+          callable_util.with_exceptions_logged(
+              self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+          self._subscription_creator, group, method)
+      self._processing = True
+
+  def add_local_allowance(self, allowance):
+    """See _interfaces.IngestionManager.add_local_allowance for spec."""
+    if any((self._subscription_creator, self._wrapped_operator,)):
+      self._local_allowance += allowance
+      if not self._processing:
+        initial_metadata, payload, completion, allowance, moar = (
+            self._operator_next())
+        if moar:
+          self._processing = True
+          self._pool.submit(
+              callable_util.with_exceptions_logged(
+                  self._operator_process,
+                  _constants.INTERNAL_ERROR_LOG_MESSAGE),
+              self._wrapped_operator, initial_metadata, payload, completion,
+              allowance)
+
+  def local_emissions_done(self):
+    """See _interfaces.IngestionManager.local_emissions_done for spec."""
+    self._remote_allowance = None
+
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """See _interfaces.IngestionManager.advance for specification."""
+    if self._subscription_creator is not None:
+      self._store_advance(initial_metadata, payload, completion, allowance)
+    elif self._wrapped_operator is not None:
+      self._operator_advance(initial_metadata, payload, completion, allowance)
+
+
+def invocation_ingestion_manager(
+    subscription, lock, pool, termination_manager, transmission_manager,
+    expiration_manager):
+  """Creates an IngestionManager for the invocation side of an operation.
+
+  Args:
+    subscription: A base.Subscription expressing the customer's interest in
+      the data and results coming from the service side of the operation.
+    lock: The operation-wide lock.
+    pool: A thread pool in which to execute customer code.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the operation.
+    expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+  Returns:
+    An IngestionManager suitable for use on the invocation side.
+  """
+  return _IngestionManager(
+      lock, pool, subscription, None, termination_manager=termination_manager,
+      transmission_manager=transmission_manager,
+      expiration_manager=expiration_manager)
+
+
+def service_ingestion_manager(
+    servicer, operation_context, output_operator, lock, pool,
+    termination_manager, transmission_manager, expiration_manager):
+  """Creates an IngestionManager for the service side of an operation.
+
+  The set_group_and_method method of the returned IngestionManager must be
+  called before its advance method may be called.
+
+  Args:
+    servicer: A base.Servicer with which to service the operation.
+    operation_context: A base.OperationContext for the operation, to be handed
+      to the customer.
+    output_operator: A base.Operator for the operation, to be handed to the
+      customer and called by the customer with the operation data that the
+      customer outputs.
+    lock: The operation-wide lock.
+    pool: A thread pool in which to execute customer code.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+    expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+  Returns:
+    An IngestionManager suitable for use on the service side.
+  """
+  return _IngestionManager(
+      lock, pool, None,
+      _ServiceSubscriptionCreator(
+          servicer, operation_context, output_operator),
+      termination_manager, transmission_manager, expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
new file mode 100644
index 0000000..a626b9f
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -0,0 +1,308 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal interfaces."""
+
+import abc
+
+from grpc.framework.interfaces.base import base
+
+
+class TerminationManager(object):
+  """The party that oversees the termination of an operation.
+
+  Attributes:
+    outcome: A base.Outcome value if the operation has terminated, or None if
+      the operation is still active.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def add_callback(self, callback):
+    """Registers a callback to be called when the operation terminates.
+
+    A callback passed after the operation has terminated is never called.
+
+    Args:
+      callback: A callable that will be passed a base.Outcome value.
+
+    Returns:
+      None if the operation is still active (in which case the callback will be
+        called on termination), or the base.Outcome value of the operation's
+        termination if it has already terminated (in which case the callback
+        will not be called as a result of this call).
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def emission_complete(self):
+    """Notes that customer code has finished emitting values."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def transmission_complete(self):
+    """Notes that transmission to the remote end has finished.
+
+    Returns:
+      True if the operation has terminated or False if the operation is still
+        ongoing.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def reception_complete(self):
+    """Notes that reception from the other side has finished."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def ingestion_complete(self):
+    """Notes that customer code has finished ingesting received values."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def expire(self):
+    """Notes that the operation has taken too long and so must abort."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Notes that the operation must abort for the given reason.
+
+    Args:
+      outcome: A base.Outcome value indicating operation abortion.
+    """
+    raise NotImplementedError()
+
+
+class TransmissionManager(object):
+  """The party that transmits values to the other end of an operation."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def kick_off(
+      self, group, method, timeout, initial_metadata, payload, completion,
+      allowance):
+    """Transmits the values that accompany the invocation of the operation."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """Takes values to be transmitted to the other end of the operation.
+
+    Args:
+      initial_metadata: An initial metadata value to transmit to the other side
+        of the operation. May be non-None at most once.
+      payload: A payload value.
+      completion: A base.Completion value. May be non-None only in the last
+        transmission made to the other side.
+      allowance: A positive integer giving the number of additional payloads
+        that the other side of the operation is permitted to transmit to this
+        side, or None if no additional allowance is being granted in this
+        call.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def timeout(self, timeout):
+    """Takes a new timeout value to be transmitted to the other side.
+
+    Args:
+      timeout: A positive float to be transmitted to the other side as the new
+        timeout value for the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def allowance(self, allowance):
+    """Tells this manager that the remote customer is allowing payloads.
+
+    Args:
+      allowance: A positive integer giving the number of additional payloads
+        that the remote customer is allowing to be transmitted from this side
+        of the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def remote_complete(self):
+    """Tells this manager that data from the remote side is complete."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Tells this manager that the operation has aborted.
+
+    Args:
+      outcome: A base.Outcome for the operation, or None to indicate that the
+        abortion of the operation should not be communicated to the other side
+        of the operation.
+    """
+    raise NotImplementedError()
+
+
+class ExpirationManager(object):
+  """The party that aborts the operation should it run out of time."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def change_timeout(self, timeout):
+    """Changes the amount of time allotted to the operation.
+
+    The duration of the operation is always measured from its beginning: after
+    this call the operation is allotted timeout seconds in total, not timeout
+    seconds counted from the time of this call.
+
+    Args:
+      timeout: A length of time in seconds to allow for the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def deadline(self):
+    """Reports the time until which the operation is allowed to run.
+
+    Returns:
+      The time (in seconds since the epoch) at which the operation will expire.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def terminate(self):
+    """Tells this manager that the operation has terminated."""
+    raise NotImplementedError()
+
+
+class EmissionManager(base.Operator):
+  """The party that handles the values emitted by customer code."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def advance(
+      self, initial_metadata=None, payload=None, completion=None,
+      allowance=None):
+    """Takes values emitted by customer code.
+
+    Only customer code should call this method.
+
+    Args:
+      initial_metadata: An initial metadata value emitted by the local customer
+        for sending to the other side of the operation.
+      payload: A payload value emitted by the local customer for sending to the
+        other side of the operation.
+      completion: A Completion value emitted by the local customer for sending
+        to the other side of the operation.
+      allowance: A positive integer giving the additional number of payloads
+        that the local customer is willing to accept from the other side of the
+        operation.
+    """
+    raise NotImplementedError()
+
+
+class IngestionManager(object):
+  """The party that executes customer code.
+
+  This manager is named for its responsibility of passing the successive
+  values that arrive from the other side of the operation into the code of the
+  local customer.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def set_group_and_method(self, group, method):
+    """Tells this IngestionManager the group and method of the operation.
+
+    Args:
+      group: The group identifier of the operation.
+      method: The method identifier of the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def add_local_allowance(self, allowance):
+    """Tells this IngestionManager that more payloads may be ingested.
+
+    Args:
+      allowance: A positive integer giving the additional number of payloads
+        that the local customer is willing to ingest.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def local_emissions_done(self):
+    """Tells this manager that the local customer is done emitting."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """Advances the operation by passing values to the local customer."""
+    raise NotImplementedError()
+
+
+class ReceptionManager(object):
+  """The party that receives tickets from the other end of the operation."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def receive_ticket(self, ticket):
+    """Handles a ticket sent by the other side of the operation.
+
+    Args:
+      ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
+        appropriate to this end of the operation and this object.
+    """
+    raise NotImplementedError()
+
+
+class Operation(object):
+  """An operation that is underway.
+
+  Attributes:
+    context: The base.OperationContext object for the operation.
+    operator: The base.Operator object for the operation, for use by the
+      customer of the operation.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def handle_ticket(self, ticket):
+    """Handles a ticket sent by the other side of the operation.
+
+    Args:
+      ticket: A links.Ticket from the other side of the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Aborts the operation.
+
+    Args:
+      outcome: A base.Outcome value indicating operation abortion.
+    """
+    raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
new file mode 100644
index 0000000..d20e40a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -0,0 +1,192 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of operations."""
+
+import threading
+
+# _utilities is referenced from specification in this module.
+from grpc.framework.core import _context
+from grpc.framework.core import _emission
+from grpc.framework.core import _expiration
+from grpc.framework.core import _ingestion
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _reception
+from grpc.framework.core import _termination
+from grpc.framework.core import _transmission
+from grpc.framework.core import _utilities  # pylint: disable=unused-import
+
+
+class _EasyOperation(_interfaces.Operation):
+  """A trivial implementation of interfaces.Operation."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager, expiration_manager,
+      context, operator, reception_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+      context: A base.OperationContext for use by the customer during the
+        operation.
+      operator: A base.Operator for use by the customer during the operation.
+      reception_manager: The _interfaces.ReceptionManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._reception_manager = reception_manager
+
+    self.context = context
+    self.operator = operator
+
+  def handle_ticket(self, ticket):
+    with self._lock:
+      self._reception_manager.receive_ticket(ticket)
+
+  def abort(self, outcome):
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        self._termination_manager.abort(outcome)
+        self._transmission_manager.abort(outcome)
+        self._expiration_manager.terminate()
+
+
+def invocation_operate(
+    operation_id, group, method, subscription, timeout, initial_metadata,
+    payload, completion, ticket_sink, termination_action, pool):
+  """Constructs objects necessary for front-side operation management.
+
+  Args:
+    operation_id: An object identifying the operation.
+    group: The group identifier of the operation.
+    method: The method identifier of the operation.
+    subscription: A base.Subscription describing the customer's interest in the
+      results of the operation.
+    timeout: A length of time in seconds to allow for the operation.
+    initial_metadata: An initial metadata value to be sent to the other side of
+      the operation. May be None if the initial metadata will be passed later or
+      if there will be no initial metadata passed at all.
+    payload: The first payload value to be transmitted to the other side. May be
+      None if there is no such value or if the customer chose not to pass it at
+      operation invocation.
+    completion: A base.Completion value indicating the end of values passed to
+      the other side of the operation.
+    ticket_sink: A callable that accepts links.Tickets and delivers them to the
+      other side of the operation.
+    termination_action: A callable that accepts the outcome of the operation as
+      a base.Outcome value to be called on operation completion.
+    pool: A thread pool with which to do the work of the operation.
+
+  Returns:
+    An _interfaces.Operation for the operation.
+  """
+  lock = threading.Lock()
+  with lock:
+    termination_manager = _termination.invocation_termination_manager(
+        termination_action, pool)
+    transmission_manager = _transmission.TransmissionManager(
+        operation_id, ticket_sink, lock, pool, termination_manager)
+    expiration_manager = _expiration.invocation_expiration_manager(
+        timeout, lock, termination_manager, transmission_manager)
+    operation_context = _context.OperationContext(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    emission_manager = _emission.EmissionManager(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    ingestion_manager = _ingestion.invocation_ingestion_manager(
+        subscription, lock, pool, termination_manager, transmission_manager,
+        expiration_manager)
+    reception_manager = _reception.ReceptionManager(
+        termination_manager, transmission_manager, expiration_manager,
+        ingestion_manager)
+
+    termination_manager.set_expiration_manager(expiration_manager)
+    transmission_manager.set_expiration_manager(expiration_manager)
+    emission_manager.set_ingestion_manager(ingestion_manager)
+
+    transmission_manager.kick_off(
+        group, method, timeout, initial_metadata, payload, completion, None)
+
+  return _EasyOperation(
+      lock, termination_manager, transmission_manager, expiration_manager,
+      operation_context, emission_manager, reception_manager)
+
+
+def service_operate(
+    servicer_package, ticket, ticket_sink, termination_action, pool):
+  """Constructs an Operation for service of an operation.
+
+  Args:
+    servicer_package: A _utilities.ServicerPackage to be used servicing the
+      operation.
+    ticket: The first links.Ticket received for the operation.
+    ticket_sink: A callable that accepts links.Tickets and delivers them to the
+      other side of the operation.
+    termination_action: A callable that accepts the outcome of the operation as
+      a base.Outcome value to be called on operation completion.
+    pool: A thread pool with which to do the work of the operation.
+
+  Returns:
+    An _interfaces.Operation for the operation.
+  """
+  lock = threading.Lock()
+  with lock:
+    termination_manager = _termination.service_termination_manager(
+        termination_action, pool)
+    transmission_manager = _transmission.TransmissionManager(
+        ticket.operation_id, ticket_sink, lock, pool, termination_manager)
+    expiration_manager = _expiration.service_expiration_manager(
+        ticket.timeout, servicer_package.default_timeout,
+        servicer_package.maximum_timeout, lock, termination_manager,
+        transmission_manager)
+    operation_context = _context.OperationContext(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    emission_manager = _emission.EmissionManager(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    ingestion_manager = _ingestion.service_ingestion_manager(
+        servicer_package.servicer, operation_context, emission_manager, lock,
+        pool, termination_manager, transmission_manager, expiration_manager)
+    reception_manager = _reception.ReceptionManager(
+        termination_manager, transmission_manager, expiration_manager,
+        ingestion_manager)
+
+    termination_manager.set_expiration_manager(expiration_manager)
+    transmission_manager.set_expiration_manager(expiration_manager)
+    emission_manager.set_ingestion_manager(ingestion_manager)
+
+    reception_manager.receive_ticket(ticket)
+
+  return _EasyOperation(
+      lock, termination_manager, transmission_manager, expiration_manager,
+      operation_context, emission_manager, reception_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
new file mode 100644
index 0000000..b64faf8
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -0,0 +1,137 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket reception."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.links import links
+
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
+    links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
+    links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
+    links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
+    links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+    links.Ticket.Termination.TRANSMISSION_FAILURE:
+        base.Outcome.TRANSMISSION_FAILURE,
+    links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
+}
+
+
+class ReceptionManager(_interfaces.ReceptionManager):
+  """A ReceptionManager that processes arriving tickets in sequence order."""
+
+  def __init__(
+      self, termination_manager, transmission_manager, expiration_manager,
+      ingestion_manager):
+    """Constructor.
+
+    Args:
+      termination_manager: The operation's _interfaces.TerminationManager.
+      transmission_manager: The operation's _interfaces.TransmissionManager.
+      expiration_manager: The operation's _interfaces.ExpirationManager.
+      ingestion_manager: The operation's _interfaces.IngestionManager.
+    """
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._ingestion_manager = ingestion_manager
+
+    self._lowest_unseen_sequence_number = 0
+    self._out_of_sequence_tickets = {}
+    self._aborted = False
+
+  def _abort(self, outcome):
+    self._aborted = True
+    self._termination_manager.abort(outcome)
+    self._transmission_manager.abort(outcome)
+    self._expiration_manager.terminate()
+
+  def _sequence_failure(self, ticket):
+    """Determines whether a just-arrived ticket is a sequencing failure.
+
+    Args:
+      ticket: A just-arrived ticket.
+
+    Returns:
+      True if the ticket's sequence number was already seen; False otherwise.
+    """
+    if ticket.sequence_number < self._lowest_unseen_sequence_number:
+      return True
+    elif ticket.sequence_number in self._out_of_sequence_tickets:
+      return True
+    else:
+      return False
+
+  def _process_one(self, ticket):
+    if ticket.sequence_number == 0:
+      self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
+    if ticket.timeout is not None:
+      self._expiration_manager.change_timeout(ticket.timeout)
+    if ticket.termination is None:
+      completion = None
+    else:
+      completion = utilities.completion(
+          ticket.terminal_metadata, ticket.code, ticket.message)
+    self._ingestion_manager.advance(
+        ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
+    if ticket.allowance is not None:
+      self._transmission_manager.allowance(ticket.allowance)
+
+  def _process(self, ticket):
+    """Process those tickets ready to be processed.
+
+    Args:
+      ticket: A just-arrived ticket the sequence number of which matches this
+        ReceptionManager's _lowest_unseen_sequence_number field.
+    """
+    while True:
+      self._process_one(ticket)
+      next_ticket = self._out_of_sequence_tickets.pop(
+          ticket.sequence_number + 1, None)
+      if next_ticket is None:
+        self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+        return
+      else:
+        ticket = next_ticket
+
+  def receive_ticket(self, ticket):
+    """See _interfaces.ReceptionManager.receive_ticket for specification."""
+    if self._aborted:
+      return
+    elif self._sequence_failure(ticket):
+      self._abort(base.Outcome.RECEPTION_FAILURE)
+    elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
+      outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
+      self._abort(outcome)
+    elif ticket.sequence_number == self._lowest_unseen_sequence_number:
+      self._process(ticket)
+    else:
+      self._out_of_sequence_tickets[ticket.sequence_number] = ticket
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
new file mode 100644
index 0000000..ad9f612
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -0,0 +1,212 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+import abc
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+
+def _invocation_completion_predicate(
+    unused_emission_complete, unused_transmission_complete,
+    unused_reception_complete, ingestion_complete):
+  return ingestion_complete
+
+
+def _service_completion_predicate(
+    unused_emission_complete, transmission_complete, unused_reception_complete,
+    unused_ingestion_complete):
+  return transmission_complete
+
+
+class TerminationManager(_interfaces.TerminationManager):
+  """A _interfaces.TerminationManager on which another manager may be set."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def set_expiration_manager(self, expiration_manager):
+    """Sets the expiration manager with which this manager will interact.
+
+    Args:
+      expiration_manager: The _interfaces.ExpirationManager associated with the
+        current operation.
+    """
+    raise NotImplementedError()
+
+
+class _TerminationManager(TerminationManager):
+  """An implementation of TerminationManager."""
+
+  def __init__(self, predicate, action, pool):
+    """Constructor.
+
+    Args:
+      predicate: One of _invocation_completion_predicate or
+        _service_completion_predicate to be used to determine when the operation
+        has completed.
+      action: A behavior to pass the operation outcome on operation termination.
+      pool: A thread pool.
+    """
+    self._predicate = predicate
+    self._action = action
+    self._pool = pool
+    self._expiration_manager = None
+
+    self.outcome = None
+    self._callbacks = []
+
+    self._emission_complete = False
+    self._transmission_complete = False
+    self._reception_complete = False
+    self._ingestion_complete = False
+
+  def set_expiration_manager(self, expiration_manager):
+    self._expiration_manager = expiration_manager
+
+  def _terminate_internal_only(self, outcome):
+    """Terminates the operation.
+
+    Args:
+      outcome: A base.Outcome describing the outcome of the operation.
+    """
+    self.outcome = outcome
+    callbacks = list(self._callbacks)
+    self._callbacks = None
+
+    act = callable_util.with_exceptions_logged(
+        self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+    if outcome is base.Outcome.LOCAL_FAILURE:
+      self._pool.submit(act, outcome)
+    else:
+      def call_callbacks_and_act(callbacks, outcome):
+        for callback in callbacks:
+          callback_outcome = callable_util.call_logging_exceptions(
+              callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
+              outcome)
+          if callback_outcome.exception is not None:
+            outcome = base.Outcome.LOCAL_FAILURE
+            break
+        act(outcome)
+
+      self._pool.submit(
+          callable_util.with_exceptions_logged(
+              call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+          callbacks, outcome)
+
+  def _terminate_and_notify(self, outcome):
+    self._terminate_internal_only(outcome)
+    self._expiration_manager.terminate()
+
+  def _perhaps_complete(self):
+    if self._predicate(
+        self._emission_complete, self._transmission_complete,
+        self._reception_complete, self._ingestion_complete):
+      self._terminate_and_notify(base.Outcome.COMPLETED)
+      return True
+    else:
+      return False
+
+  def is_active(self):
+    """See _interfaces.TerminationManager.is_active for specification."""
+    return self.outcome is None
+
+  def add_callback(self, callback):
+    """See _interfaces.TerminationManager.add_callback for specification."""
+    if self.outcome is None:
+      self._callbacks.append(callback)
+      return None
+    else:
+      return self.outcome
+
+  def emission_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._emission_complete = True
+      self._perhaps_complete()
+
+  def transmission_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._transmission_complete = True
+      return self._perhaps_complete()
+    else:
+      return False
+
+  def reception_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._reception_complete = True
+      self._perhaps_complete()
+
+  def ingestion_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._ingestion_complete = True
+      self._perhaps_complete()
+
+  def expire(self):
+    """See _interfaces.TerminationManager.expire for specification."""
+    self._terminate_internal_only(base.Outcome.EXPIRED)
+
+  def abort(self, outcome):
+    """See _interfaces.TerminationManager.abort for specification."""
+    self._terminate_and_notify(outcome)
+
+
+def invocation_termination_manager(action, pool):
+  """Creates a TerminationManager appropriate for invocation-side use.
+
+  Args:
+    action: An action to call on operation termination.
+    pool: A thread pool in which to execute the passed action and any
+      termination callbacks that are registered during the operation.
+
+  Returns:
+    A TerminationManager appropriate for invocation-side use.
+  """
+  return _TerminationManager(_invocation_completion_predicate, action, pool)
+
+
+def service_termination_manager(action, pool):
+  """Creates a TerminationManager appropriate for service-side use.
+
+  Args:
+    action: An action to call on operation termination.
+    pool: A thread pool in which to execute the passed action and any
+      termination callbacks that are registered during the operation.
+
+  Returns:
+    A TerminationManager appropriate for service-side use.
+  """
+  return _TerminationManager(_service_completion_predicate, action, pool)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
new file mode 100644
index 0000000..01894d3
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -0,0 +1,294 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+
+def _explode_completion(completion):
+  if completion is None:
+    return None, None, None, None
+  else:
+    return (
+        completion.terminal_metadata, completion.code, completion.message,
+        links.Ticket.Termination.COMPLETION)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+  """An _interfaces.TransmissionManager that sends links.Tickets."""
+
+  def __init__(
+      self, operation_id, ticket_sink, lock, pool, termination_manager):
+    """Constructor.
+
+    Args:
+      operation_id: The operation's ID.
+      ticket_sink: A callable that accepts tickets and sends them to the other
+        side of the operation.
+      lock: The operation-servicing-wide lock object.
+      pool: A thread pool in which the work of transmitting tickets will be
+        performed.
+      termination_manager: The _interfaces.TerminationManager associated with
+        this operation.
+    """
+    self._lock = lock
+    self._pool = pool
+    self._ticket_sink = ticket_sink
+    self._operation_id = operation_id
+    self._termination_manager = termination_manager
+    self._expiration_manager = None
+
+    self._lowest_unused_sequence_number = 0
+    self._remote_allowance = 1
+    self._remote_complete = False
+    self._timeout = None
+    self._local_allowance = 0
+    self._initial_metadata = None
+    self._payloads = []
+    self._completion = None
+    self._aborted = False
+    self._abortion_outcome = None
+    self._transmitting = False
+
+  def set_expiration_manager(self, expiration_manager):
+    """Sets the ExpirationManager with which this manager will cooperate."""
+    self._expiration_manager = expiration_manager
+
+  def _next_ticket(self):
+    """Creates the next ticket to be transmitted.
+
+    Returns:
+      A links.Ticket to be sent to the other side of the operation or None if
+        there is nothing to be sent at this time.
+    """
+    if self._aborted:
+      if self._abortion_outcome is None:
+        return None
+      else:
+        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+            self._abortion_outcome]
+        if termination is None:
+          return None
+        else:
+          self._abortion_outcome = None
+          return links.Ticket(
+              self._operation_id, self._lowest_unused_sequence_number, None,
+              None, None, None, None, None, None, None, None, None,
+              termination)
+
+    action = False
+    # TODO(nathaniel): Support other subscriptions.
+    local_subscription = links.Ticket.Subscription.FULL
+    timeout = self._timeout
+    if timeout is not None:
+      self._timeout = None
+      action = True
+    if self._local_allowance <= 0:
+      allowance = None
+    else:
+      allowance = self._local_allowance
+      self._local_allowance = 0
+      action = True
+    initial_metadata = self._initial_metadata
+    if initial_metadata is not None:
+      self._initial_metadata = None
+      action = True
+    if not self._payloads or self._remote_allowance <= 0:
+      payload = None
+    else:
+      payload = self._payloads.pop(0)
+      self._remote_allowance -= 1
+      action = True
+    if self._completion is None or self._payloads:
+      terminal_metadata, code, message, termination = None, None, None, None
+    else:
+      terminal_metadata, code, message, termination = _explode_completion(
+          self._completion)
+      self._completion = None
+      action = True
+
+    if action:
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          local_subscription, timeout, allowance, initial_metadata, payload,
+          terminal_metadata, code, message, termination)
+      self._lowest_unused_sequence_number += 1
+      return ticket
+    else:
+      return None
+
+  def _transmit(self, ticket):
+    """Commences the transmission loop sending tickets.
+
+    Args:
+      ticket: A links.Ticket to be sent to the other side of the operation.
+    """
+    def transmit(ticket):
+      while True:
+        transmission_outcome = callable_util.call_logging_exceptions(
+            self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+        if transmission_outcome.exception is None:
+          with self._lock:
+            if ticket.termination is links.Ticket.Termination.COMPLETION:
+              self._termination_manager.transmission_complete()
+            ticket = self._next_ticket()
+            if ticket is None:
+              self._transmitting = False
+              return
+        else:
+          with self._lock:
+            if self._termination_manager.outcome is None:
+              self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+              self._expiration_manager.terminate()
+            return
+
+    self._pool.submit(callable_util.with_exceptions_logged(
+        transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+    self._transmitting = True
+
+  def kick_off(
+      self, group, method, timeout, initial_metadata, payload, completion,
+      allowance):
+    """See _interfaces.TransmissionManager.kick_off for specification."""
+    # TODO(nathaniel): Support other subscriptions.
+    subscription = links.Ticket.Subscription.FULL
+    terminal_metadata, code, message, termination = _explode_completion(
+        completion)
+    self._remote_allowance = 1 if payload is None else 0
+    ticket = links.Ticket(
+        self._operation_id, 0, group, method, subscription, timeout, allowance,
+        initial_metadata, payload, terminal_metadata, code, message,
+        termination)
+    self._lowest_unused_sequence_number = 1
+    self._transmit(ticket)
+
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """See _interfaces.TransmissionManager.advance for specification."""
+    effective_initial_metadata = initial_metadata
+    effective_payload = payload
+    effective_completion = completion
+    effective_allowance = None if self._remote_complete else allowance
+    if self._transmitting:
+      if effective_initial_metadata is not None:
+        self._initial_metadata = effective_initial_metadata
+      if effective_payload is not None:
+        self._payloads.append(effective_payload)
+      if effective_completion is not None:
+        self._completion = effective_completion
+      if effective_allowance is not None:
+        self._local_allowance += effective_allowance
+    else:
+      ticket_payload = None
+      if effective_payload is not None:
+        if 0 < self._remote_allowance:
+          ticket_payload = effective_payload
+          self._remote_allowance -= 1
+        else:
+          self._payloads.append(effective_payload)
+      # Attach the completion only when no payload is still queued; otherwise
+      # hold it, and never overwrite an already-held completion with None.
+      ticket_completion = None
+      if effective_completion is not None:
+        if self._payloads:
+          self._completion = effective_completion
+        else:
+          ticket_completion = effective_completion
+      # Test against None so falsy values (e.g. an empty payload) still go out.
+      ticket_values = (
+          effective_initial_metadata, ticket_payload, ticket_completion,
+          effective_allowance)
+      if any(value is not None for value in ticket_values):
+        terminal_metadata, code, message, termination = _explode_completion(
+            ticket_completion)
+        ticket = links.Ticket(
+            self._operation_id, self._lowest_unused_sequence_number, None, None,
+            None, None, effective_allowance, effective_initial_metadata,
+            ticket_payload, terminal_metadata, code, message, termination)
+        self._lowest_unused_sequence_number += 1
+        self._transmit(ticket)
+
+  def timeout(self, timeout):
+    """See _interfaces.TransmissionManager.timeout for specification."""
+    if self._transmitting:
+      self._timeout = timeout
+    else:
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          None, timeout, None, None, None, None, None, None, None)
+      self._lowest_unused_sequence_number += 1
+      self._transmit(ticket)
+
+  def allowance(self, allowance):
+    """See _interfaces.TransmissionManager.allowance for specification."""
+    if self._transmitting or not self._payloads:
+      self._remote_allowance += allowance
+    else:
+      self._remote_allowance += allowance - 1
+      payload = self._payloads.pop(0)
+      if self._payloads:
+        completion = None
+      else:
+        completion = self._completion
+        self._completion = None
+      terminal_metadata, code, message, termination = _explode_completion(
+          completion)
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          None, None, None, None, payload, terminal_metadata, code, message,
+          termination)
+      self._lowest_unused_sequence_number += 1
+      self._transmit(ticket)
+
+  def remote_complete(self):
+    """See _interfaces.TransmissionManager.remote_complete for specification."""
+    self._remote_complete = True
+    self._local_allowance = 0
+
+  def abort(self, outcome):
+    """See _interfaces.TransmissionManager.abort for specification."""
+    if self._transmitting:
+      self._aborted, self._abortion_outcome = True, outcome
+    else:
+      self._aborted = True
+      if outcome is not None:
+        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+            outcome]
+        if termination is not None:
+          ticket = links.Ticket(
+              self._operation_id, self._lowest_unused_sequence_number, None,
+              None, None, None, None, None, None, None, None, None,
+              termination)
+          self._transmit(ticket)
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
new file mode 100644
index 0000000..5b0d798
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal utilities."""
+
+import collections
+
+
+class ServicerPackage(
+    collections.namedtuple(
+        'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
+  """A trivial bundle class.
+
+  Attributes:
+    servicer: A base.Servicer.
+    default_timeout: A float indicating the length of time in seconds to allow
+      for an operation invoked without a timeout.
+    maximum_timeout: A float indicating the maximum length of time in seconds to
+      allow for an operation.
+  """
diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py
new file mode 100644
index 0000000..364a7fa
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/implementations.py
@@ -0,0 +1,62 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the ticket-exchange-based base layer implementation."""
+
+# base and links are referenced from specification in this module.
+from grpc.framework.core import _end
+from grpc.framework.interfaces.base import base  # pylint: disable=unused-import
+from grpc.framework.interfaces.links import links  # pylint: disable=unused-import
+
+
+def invocation_end_link():
+  """Creates a base.End-links.Link suitable for operation invocation.
+
+  Returns:
+    An object that is both a base.End and a links.Link, that supports operation
+      invocation, and that translates operation invocation into ticket exchange.
+  """
+  return _end.serviceless_end_link()
+
+
+def service_end_link(servicer, default_timeout, maximum_timeout):
+  """Creates a base.End-links.Link suitable for operation service.
+
+  Args:
+    servicer: A base.Servicer for servicing operations.
+    default_timeout: A length of time in seconds to be used as the default
+      time allotted for a single operation.
+    maximum_timeout: A length of time in seconds to be used as the maximum
+      time allotted for a single operation.
+
+  Returns:
+    An object that is both a base.End and a links.Link and that services
+      operations that arrive at it through ticket exchange.
+  """
+  return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index 9d1651d..76e0a5b 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -27,10 +27,20 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-"""The base interface of RPC Framework."""
+"""The base interface of RPC Framework.
 
+Implementations of this interface support the conduct of "operations":
+exchanges between two distinct ends of an arbitrary number of data payloads
+and metadata such as a name for the operation, initial and terminal metadata
+in each direction, and flow control. These operations may be used for transfers
+of data, remote procedure calls, status indication, or anything else
+applications choose.
+"""
+
+# threading is referenced from specification in this module.
 import abc
 import enum
+import threading
 
 # abandonment is referenced from specification in this module.
 from grpc.framework.foundation import abandonment  # pylint: disable=unused-import
@@ -208,19 +218,26 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def stop_gracefully(self):
-    """Gracefully stops this object's service of operations.
+  def stop(self, grace):
+    """Stops this object's service of operations.
 
-    Operations in progress will be allowed to complete, and this method blocks
-    until all of them have.
-    """
-    raise NotImplementedError()
+    This object will refuse service of new operations as soon as this method is
+    called but operations under way at the time of the call may be given a
+    grace period during which they are allowed to finish.
 
-  @abc.abstractmethod
-  def stop_immediately(self):
-    """Immediately stops this object's service of operations.
+    Args:
+      grace: A duration of time in seconds to allow ongoing operations to
+        terminate before being forcefully terminated by the stopping of this
+        End. May be zero to terminate all ongoing operations and immediately
+        stop.
 
-    Operations in progress will not be allowed to complete.
+    Returns:
+      A threading.Event that will be set to indicate all operations having
+        terminated and this End having completely stopped. The returned event
+        may not be set until after the full grace period (if some ongoing
+        operation continues for the full length of the period) or it may be set
+        much sooner (if for example this End had no operations in progress at
+        the time its stop method was called).
     """
     raise NotImplementedError()
 
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/__init__.py b/src/python/grpcio/grpc/framework/interfaces/face/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py
new file mode 100644
index 0000000..948e750
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py
@@ -0,0 +1,933 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces defining the Face layer of RPC Framework."""
+
+import abc
+import collections
+import enum
+
+# cardinality, style, abandonment, future, and stream are
+# referenced from specification in this module.
+from grpc.framework.common import cardinality  # pylint: disable=unused-import
+from grpc.framework.common import style  # pylint: disable=unused-import
+from grpc.framework.foundation import abandonment  # pylint: disable=unused-import
+from grpc.framework.foundation import future  # pylint: disable=unused-import
+from grpc.framework.foundation import stream  # pylint: disable=unused-import
+
+
+class NoSuchMethodError(Exception):
+  """Raised by customer code to indicate an unrecognized method.
+
+  Attributes:
+    group: The group of the unrecognized method.
+    method: The name of the unrecognized method.
+  """
+
+  def __init__(self, group, method):
+    """Constructor.
+
+    Args:
+      group: The group identifier of the unrecognized RPC name.
+      method: The method identifier of the unrecognized RPC name.
+    """
+    super(NoSuchMethodError, self).__init__()
+    self.group = group
+    self.method = method
+
+  def __repr__(self):
+    return 'face.NoSuchMethodError(%s, %s)' % (self.group, self.method,)
+
+
+class Abortion(
+    collections.namedtuple(
+        'Abortion',
+        ('kind', 'initial_metadata', 'terminal_metadata', 'code', 'details',))):
+  """A value describing RPC abortion.
+
+  Attributes:
+    kind: A Kind value identifying how the RPC failed.
+    initial_metadata: The initial metadata from the other side of the RPC or
+      None if no initial metadata value was received.
+    terminal_metadata: The terminal metadata from the other side of the RPC or
+      None if no terminal metadata value was received.
+    code: The code value from the other side of the RPC or None if no code value
+      was received.
+    details: The details value from the other side of the RPC or None if no
+      details value was received.
+  """
+
+  @enum.unique
+  class Kind(enum.Enum):
+    """Types of RPC abortion."""
+
+    CANCELLED = 'cancelled'
+    EXPIRED = 'expired'
+    LOCAL_SHUTDOWN = 'local shutdown'
+    REMOTE_SHUTDOWN = 'remote shutdown'
+    NETWORK_FAILURE = 'network failure'
+    LOCAL_FAILURE = 'local failure'
+    REMOTE_FAILURE = 'remote failure'
+
+
+class AbortionError(Exception):
+  """Common super type for exceptions indicating RPC abortion.
+
+    initial_metadata: The initial metadata from the other side of the RPC or
+      None if no initial metadata value was received.
+    terminal_metadata: The terminal metadata from the other side of the RPC or
+      None if no terminal metadata value was received.
+    code: The code value from the other side of the RPC or None if no code value
+      was received.
+    details: The details value from the other side of the RPC or None if no
+      details value was received.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  def __init__(self, initial_metadata, terminal_metadata, code, details):
+    super(AbortionError, self).__init__()
+    self.initial_metadata = initial_metadata
+    self.terminal_metadata = terminal_metadata
+    self.code = code
+    self.details = details
+
+
+class CancellationError(AbortionError):
+  """Indicates that an RPC has been cancelled."""
+
+
+class ExpirationError(AbortionError):
+  """Indicates that an RPC has expired ("timed out")."""
+
+
+class LocalShutdownError(AbortionError):
+  """Indicates that an RPC has terminated due to local shutdown of RPCs."""
+
+
+class RemoteShutdownError(AbortionError):
+  """Indicates that an RPC has terminated due to remote shutdown of RPCs."""
+
+
+class NetworkError(AbortionError):
+  """Indicates that some error occurred on the network."""
+
+
+class LocalError(AbortionError):
+  """Indicates that an RPC has terminated due to a local defect."""
+
+
+class RemoteError(AbortionError):
+  """Indicates that an RPC has terminated due to a remote defect."""
+
+
+class RpcContext(object):
+  """Provides RPC-related information and action."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def is_active(self):
+    """Describes whether the RPC is active or has terminated."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def time_remaining(self):
+    """Describes the length of allowed time remaining for the RPC.
+
+    Returns:
+      A nonnegative float indicating the length of allowed time in seconds
+      remaining for the RPC to complete before it is considered to have timed
+      out.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def add_abortion_callback(self, abortion_callback):
+    """Registers a callback to be called if the RPC is aborted.
+
+    Args:
+      abortion_callback: A callable to be called and passed an Abortion value
+        in the event of RPC abortion.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def cancel(self):
+    """Cancels the RPC.
+
+    Idempotent and has no effect if the RPC has already terminated.
+    """
+    raise NotImplementedError()
+
+
+class Call(RpcContext):
+  """Invocation-side utility object for an RPC."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def initial_metadata(self):
+    """Accesses the initial metadata from the service-side of the RPC.
+
+    This method blocks until the value is available or is known not to have been
+    emitted from the service-side of the RPC.
+
+    Returns:
+      The initial metadata object emitted by the service-side of the RPC, or
+        None if there was no such value.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def terminal_metadata(self):
+    """Accesses the terminal metadata from the service-side of the RPC.
+
+    This method blocks until the value is available or is known not to have been
+    emitted from the service-side of the RPC.
+
+    Returns:
+      The terminal metadata object emitted by the service-side of the RPC, or
+        None if there was no such value.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def code(self):
+    """Accesses the code emitted by the service-side of the RPC.
+
+    This method blocks until the value is available or is known not to have been
+    emitted from the service-side of the RPC.
+
+    Returns:
+      The code object emitted by the service-side of the RPC, or None if there
+        was no such value.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def details(self):
+    """Accesses the details value emitted by the service-side of the RPC.
+
+    This method blocks until the value is available or is known not to have been
+    emitted from the service-side of the RPC.
+
+    Returns:
+      The details value emitted by the service-side of the RPC, or None if there
+        was no such value.
+    """
+    raise NotImplementedError()
+
+
+class ServicerContext(RpcContext):
+  """A context object passed to method implementations."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def invocation_metadata(self):
+    """Accesses the metadata from the invocation-side of the RPC.
+
+    This method blocks until the value is available or is known not to have been
+    emitted from the invocation-side of the RPC.
+
+    Returns:
+      The metadata object emitted by the invocation-side of the RPC, or None if
+        there was no such value.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def initial_metadata(self, initial_metadata):
+    """Accepts the service-side initial metadata value of the RPC.
+
+    This method need not be called by method implementations if they have no
+    service-side initial metadata to transmit.
+
+    Args:
+      initial_metadata: The service-side initial metadata value of the RPC to
+        be transmitted to the invocation side of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def terminal_metadata(self, terminal_metadata):
+    """Accepts the service-side terminal metadata value of the RPC.
+
+    This method need not be called by method implementations if they have no
+    service-side terminal metadata to transmit.
+
+    Args:
+      terminal_metadata: The service-side terminal metadata value of the RPC to
+        be transmitted to the invocation side of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def code(self, code):
+    """Accepts the service-side code of the RPC.
+
+    This method need not be called by method implementations if they have no
+    code to transmit.
+
+    Args:
+      code: The code of the RPC to be transmitted to the invocation side of the
+        RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def details(self, details):
+    """Accepts the service-side details of the RPC.
+
+    This method need not be called by method implementations if they have no
+    service-side details to transmit.
+
+    Args:
+      details: The service-side details value of the RPC to be transmitted to
+        the invocation side of the RPC.
+    """
+    raise NotImplementedError()
+
+
+class ResponseReceiver(object):
+  """Invocation-side object used to accept the output of an RPC."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def initial_metadata(self, initial_metadata):
+    """Receives the initial metadata from the service-side of the RPC.
+
+    Args:
+      initial_metadata: The initial metadata object emitted from the
+        service-side of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def response(self, response):
+    """Receives a response from the service-side of the RPC.
+
+    Args:
+      response: A response object emitted from the service-side of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def complete(self, terminal_metadata, code, details):
+    """Receives the completion values emitted from the service-side of the RPC.
+
+    Args:
+      terminal_metadata: The terminal metadata object emitted from the
+        service-side of the RPC.
+      code: The code object emitted from the service-side of the RPC.
+      details: The details object emitted from the service-side of the RPC.
+    """
+    raise NotImplementedError()
+
+
+class UnaryUnaryMultiCallable(object):
+  """Affords invoking a unary-unary RPC in any call style."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def __call__(
+      self, request, timeout, metadata=None, with_call=False):
+    """Synchronously invokes the underlying RPC.
+
+    Args:
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition to
+        the response.
+
+    Returns:
+      The response value for the RPC, and a Call for the RPC if with_call was
+        set to True at invocation.
+
+    Raises:
+      AbortionError: Indicating that the RPC was aborted.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def future(self, request, timeout, metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and a future.Future. In the
+        event of RPC completion, the returned Future's result value will be the
+        response value of the RPC. In the event of RPC abortion, the returned
+        Future's exception value will be an AbortionError.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event(
+      self, request, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      request: The request value for the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      A Call for the RPC.
+    """
+    raise NotImplementedError()
+
+
+class UnaryStreamMultiCallable(object):
+  """Affords invoking a unary-stream RPC in any call style."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def __call__(self, request, timeout, metadata=None):
+    """Invokes the underlying RPC.
+
+    Args:
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and an iterator of response
+        values. Drawing response values from the returned iterator may raise
+        AbortionError indicating abortion of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event(
+      self, request, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      request: The request value for the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      A Call object for the RPC.
+    """
+    raise NotImplementedError()
+
+
+class StreamUnaryMultiCallable(object):
+  """Affords invoking a stream-unary RPC in any call style."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def __call__(
+      self, request_iterator, timeout, metadata=None,
+      with_call=False):
+    """Synchronously invokes the underlying RPC.
+
+    Args:
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition to
+        the response.
+
+    Returns:
+      The response value for the RPC, and a Call for the RPC if with_call was
+        set to True at invocation.
+
+    Raises:
+      AbortionError: Indicating that the RPC was aborted.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def future(self, request_iterator, timeout, metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and a future.Future. In the
+        event of RPC completion, the returned Future's result value will be the
+        response value of the RPC. In the event of RPC abortion, the returned
+        Future's exception value will be an AbortionError.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event(
+      self, receiver, abortion_callback, timeout, metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      A single object that is both a Call object for the RPC and a
+        stream.Consumer to which the request values of the RPC should be passed.
+    """
+    raise NotImplementedError()
+
+
+class StreamStreamMultiCallable(object):
+  """Affords invoking a stream-stream RPC in any call style."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def __call__(self, request_iterator, timeout, metadata=None):
+    """Invokes the underlying RPC.
+
+    Args:
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and an iterator of response
+        values. Drawing response values from the returned iterator may raise
+        AbortionError indicating abortion of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event(
+      self, receiver, abortion_callback, timeout, metadata=None):
+    """Asynchronously invokes the underlying RPC.
+
+    Args:
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of
+        the RPC.
+
+    Returns:
+      A single object that is both a Call object for the RPC and a
+        stream.Consumer to which the request values of the RPC should be passed.
+    """
+    raise NotImplementedError()
+
+
+class MethodImplementation(object):
+  """A sum type that describes a method implementation.
+
+  Attributes:
+    cardinality: A cardinality.Cardinality value.
+    style: A style.Service value.
+    unary_unary_inline: The implementation of the method as a callable value
+      that takes a request value and a ServicerContext object and returns a
+      response value. Only non-None if cardinality is
+      cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
+    unary_stream_inline: The implementation of the method as a callable value
+      that takes a request value and a ServicerContext object and returns an
+      iterator of response values. Only non-None if cardinality is
+      cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
+    stream_unary_inline: The implementation of the method as a callable value
+      that takes an iterator of request values and a ServicerContext object and
+      returns a response value. Only non-None if cardinality is
+      cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
+    stream_stream_inline: The implementation of the method as a callable value
+      that takes an iterator of request values and a ServicerContext object and
+      returns an iterator of response values. Only non-None if cardinality is
+      cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
+    unary_unary_event: The implementation of the method as a callable value that
+      takes a request value, a response callback to which to pass the response
+      value of the RPC, and a ServicerContext. Only non-None if cardinality is
+      cardinality.Cardinality.UNARY_UNARY and style is style.Service.EVENT.
+    unary_stream_event: The implementation of the method as a callable value
+      that takes a request value, a stream.Consumer to which to pass the
+      response values of the RPC, and a ServicerContext. Only non-None if
+      cardinality is cardinality.Cardinality.UNARY_STREAM and style is
+      style.Service.EVENT.
+    stream_unary_event: The implementation of the method as a callable value
+      that takes a response callback to which to pass the response value of the
+      RPC and a ServicerContext and returns a stream.Consumer to which the
+      request values of the RPC should be passed. Only non-None if cardinality
+      is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
+    stream_stream_event: The implementation of the method as a callable value
+      that takes a stream.Consumer to which to pass the response values of the
+      RPC and a ServicerContext and returns a stream.Consumer to which the
+      request values of the RPC should be passed. Only non-None if cardinality
+      is cardinality.Cardinality.STREAM_STREAM and style is
+      style.Service.EVENT.
+  """
+  __metaclass__ = abc.ABCMeta
+
+
+class MultiMethodImplementation(object):
+  """A general type able to service many methods."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def service(self, group, method, response_consumer, context):
+    """Services an RPC.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      response_consumer: A stream.Consumer to be called to accept the response
+        values of the RPC.
+      context: a ServicerContext object.
+
+    Returns:
+      A stream.Consumer with which to accept the request values of the RPC. The
+        consumer returned from this method may or may not be invoked to
+        completion: in the case of RPC abortion, RPC Framework will simply stop
+        passing values to this object. Implementations must not assume that this
+        object will be called to completion of the request stream or even called
+        at all.
+
+    Raises:
+      abandonment.Abandoned: May or may not be raised when the RPC has been
+        aborted.
+      NoSuchMethodError: If this MultiMethodImplementation does not recognize
+        the given group and method for the RPC and cannot service the RPC.
+    """
+    raise NotImplementedError()
+
+
+class GenericStub(object):
+  """Affords RPC invocation via generic methods."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def blocking_unary_unary(
+      self, group, method, request, timeout, metadata=None,
+      with_call=False):
+    """Invokes a unary-request-unary-response method.
+
+    This method blocks until either returning the response value of the RPC
+    (in the event of RPC completion) or raising an exception (in the event of
+    RPC abortion).
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition to the
+        response.
+
+    Returns:
+      The response value for the RPC, and a Call for the RPC if with_call was
+        set to True at invocation.
+
+    Raises:
+      AbortionError: Indicating that the RPC was aborted.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def future_unary_unary(
+      self, group, method, request, timeout, metadata=None):
+    """Invokes a unary-request-unary-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and a future.Future. In the
+        event of RPC completion, the returned Future's result value will be the
+        response value of the RPC. In the event of RPC abortion, the returned
+        Future's exception value will be an AbortionError.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def inline_unary_stream(
+      self, group, method, request, timeout, metadata=None):
+    """Invokes a unary-request-stream-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request: The request value for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and an iterator of response
+        values. Drawing response values from the returned iterator may raise
+        AbortionError indicating abortion of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def blocking_stream_unary(
+      self, group, method, request_iterator, timeout, metadata=None,
+      with_call=False):
+    """Invokes a stream-request-unary-response method.
+
+    This method blocks until either returning the response value of the RPC
+    (in the event of RPC completion) or raising an exception (in the event of
+    RPC abortion).
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+      with_call: Whether or not to return a Call for the RPC in addition to the
+        response.
+
+    Returns:
+      The response value for the RPC, and a Call for the RPC if with_call was
+        set to True at invocation.
+
+    Raises:
+      AbortionError: Indicating that the RPC was aborted.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def future_stream_unary(
+      self, group, method, request_iterator, timeout, metadata=None):
+    """Invokes a stream-request-unary-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and a future.Future. In the
+        event of RPC completion, the returned Future's result value will be the
+        response value of the RPC. In the event of RPC abortion, the returned
+        Future's exception value will be an AbortionError.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def inline_stream_stream(
+      self, group, method, request_iterator, timeout, metadata=None):
+    """Invokes a stream-request-stream-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request_iterator: An iterator that yields request values for the RPC.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      An object that is both a Call for the RPC and an iterator of response
+        values. Drawing response values from the returned iterator may raise
+        AbortionError indicating abortion of the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event_unary_unary(
+      self, group, method, request, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Event-driven invocation of a unary-request-unary-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request: The request value for the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      A Call for the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event_unary_stream(
+      self, group, method, request, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Event-driven invocation of a unary-request-stream-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      request: The request value for the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      A Call for the RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event_stream_unary(
+      self, group, method, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Event-driven invocation of a stream-request-unary-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      A pair of a Call object for the RPC and a stream.Consumer to which the
+        request values of the RPC should be passed.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event_stream_stream(
+      self, group, method, receiver, abortion_callback, timeout,
+      metadata=None):
+    """Event-driven invocation of a stream-request-stream-response method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+      receiver: A ResponseReceiver to be passed the response data of the RPC.
+      abortion_callback: A callback to be called and passed an Abortion value
+        in the event of RPC abortion.
+      timeout: A duration of time in seconds to allow for the RPC.
+      metadata: A metadata value to be passed to the service-side of the RPC.
+
+    Returns:
+      A pair of a Call object for the RPC and a stream.Consumer to which the
+        request values of the RPC should be passed.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def unary_unary(self, group, method):
+    """Creates a UnaryUnaryMultiCallable for a unary-unary method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+
+    Returns:
+      A UnaryUnaryMultiCallable value for the named unary-unary method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def unary_stream(self, group, method):
+    """Creates a UnaryStreamMultiCallable for a unary-stream method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+
+    Returns:
+      A UnaryStreamMultiCallable value for the named unary-stream method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def stream_unary(self, group, method):
+    """Creates a StreamUnaryMultiCallable for a stream-unary method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+
+    Returns:
+      A StreamUnaryMultiCallable value for the named stream-unary method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def stream_stream(self, group, method):
+    """Creates a StreamStreamMultiCallable for a stream-stream method.
+
+    Args:
+      group: The group identifier of the RPC.
+      method: The method identifier of the RPC.
+
+    Returns:
+      A StreamStreamMultiCallable value for the named stream-stream method.
+    """
+    raise NotImplementedError()
+
+
+class DynamicStub(object):
+  """Affords RPC invocation via attributes corresponding to afforded methods.
+
+  Instances of this type may be scoped to a single group so that attribute
+  access is unambiguous.
+
+  Instances of this type respond to attribute access as follows: if the
+  requested attribute is the name of a unary-unary method, the value of the
+  attribute will be a UnaryUnaryMultiCallable with which to invoke an RPC; if
+  the requested attribute is the name of a unary-stream method, the value of the
+  attribute will be a UnaryStreamMultiCallable with which to invoke an RPC; if
+  the requested attribute is the name of a stream-unary method, the value of the
+  attribute will be a StreamUnaryMultiCallable with which to invoke an RPC; and
+  if the requested attribute is the name of a stream-stream method, the value of
+  the attribute will be a StreamStreamMultiCallable with which to invoke an RPC.
+  """
+  __metaclass__ = abc.ABCMeta
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/utilities.py b/src/python/grpcio/grpc/framework/interfaces/face/utilities.py
new file mode 100644
index 0000000..db2ec6e
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/face/utilities.py
@@ -0,0 +1,178 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for RPC Framework's Face interface."""
+
+import collections
+
+# stream is referenced from specification in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import stream  # pylint: disable=unused-import
+from grpc.framework.interfaces.face import face
+
+
+class _MethodImplementation(
+    face.MethodImplementation,
+    collections.namedtuple(
+        '_MethodImplementation',
+        ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
+         'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
+         'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
+  pass
+
+
+def unary_unary_inline(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a unary-unary RPC method as a callable value
+      that takes a request value and a face.ServicerContext object and
+      returns a response value.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
+      None, None, None, None, None, None, None)
+
+
+def unary_stream_inline(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a unary-stream RPC method as a callable
+      value that takes a request value and a face.ServicerContext object and
+      returns an iterator of response values.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
+      behavior, None, None, None, None, None, None)
+
+
+def stream_unary_inline(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a stream-unary RPC method as a callable
+      value that takes an iterator of request values and a
+      face.ServicerContext object and returns a response value.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
+      behavior, None, None, None, None, None)
+
+
+def stream_stream_inline(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a stream-stream RPC method as a callable
+      value that takes an iterator of request values and a
+      face.ServicerContext object and returns an iterator of response values.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
+      None, behavior, None, None, None, None)
+
+
+def unary_unary_event(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a unary-unary RPC method as a callable
+      value that takes a request value, a response callback to which to pass
+      the response value of the RPC, and a face.ServicerContext.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
+      None, None, behavior, None, None, None)
+
+
+def unary_stream_event(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a unary-stream RPC method as a callable
+      value that takes a request value, a stream.Consumer to which to pass the
+      response values of the RPC, and a face.ServicerContext.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
+      None, None, None, behavior, None, None)
+
+
+def stream_unary_event(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a stream-unary RPC method as a callable
+      value that takes a response callback to which to pass the response value
+      of the RPC and a face.ServicerContext and returns a stream.Consumer to
+      which the request values of the RPC should be passed.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
+      None, None, None, None, behavior, None)
+
+
+def stream_stream_event(behavior):
+  """Creates a face.MethodImplementation for the given behavior.
+
+  Args:
+    behavior: The implementation of a stream-stream RPC method as a callable
+      value that takes a stream.Consumer to which to pass the response values
+      of the RPC and a face.ServicerContext and returns a stream.Consumer to
+      which the request values of the RPC should be passed.
+
+  Returns:
+    A face.MethodImplementation derived from the given behavior.
+  """
+  return _MethodImplementation(
+      cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
+      None, None, None, None, None, behavior)
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
index 5ebbac8..069ff02 100644
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py
@@ -98,7 +98,7 @@
     COMPLETION = 'completion'
     CANCELLATION = 'cancellation'
     EXPIRATION = 'expiration'
-    LOCAL_SHUTDOWN = 'local shutdown'
+    SHUTDOWN = 'shutdown'
     RECEPTION_FAILURE = 'reception failure'
     TRANSMISSION_FAILURE = 'transmission failure'
     LOCAL_FAILURE = 'local failure'
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
new file mode 100644
index 0000000..72b1ae5
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -0,0 +1,165 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import collections
+import logging
+import random
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test import test_common as grpc_test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
+_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
+_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
+_CODE = _intermediary_low.Code.OK
+_MESSAGE = b'test message'
+
+
+class _SerializationBehaviors(
+    collections.namedtuple(
+        '_SerializationBehaviors',
+        ('request_serializers', 'request_deserializers', 'response_serializers',
+         'response_deserializers',))):
+  pass
+
+
+class _Links(
+    collections.namedtuple(
+        '_Links',
+        ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
+         'service_end_link'))):
+  pass
+
+
+def _serialization_behaviors_from_serializations(serializations):
+  request_serializers = {}
+  request_deserializers = {}
+  response_serializers = {}
+  response_deserializers = {}
+  for (group, method), serialization in serializations.iteritems():
+    request_serializers[group, method] = serialization.serialize_request
+    request_deserializers[group, method] = serialization.deserialize_request
+    response_serializers[group, method] = serialization.serialize_response
+    response_deserializers[group, method] = serialization.deserialize_response
+  return _SerializationBehaviors(
+      request_serializers, request_deserializers, response_serializers,
+      response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+  def instantiate(self, serializations, servicer):
+    serialization_behaviors = _serialization_behaviors_from_serializations(
+        serializations)
+    invocation_end_link = implementations.invocation_end_link()
+    service_end_link = implementations.service_end_link(
+        servicer, test_constants.DEFAULT_TIMEOUT,
+        test_constants.MAXIMUM_TIMEOUT)
+    service_grpc_link = service.service_link(
+        serialization_behaviors.request_deserializers,
+        serialization_behaviors.response_serializers)
+    port = service_grpc_link.add_port(0, None)
+    channel = _intermediary_low.Channel('localhost:%d' % port, None)
+    invocation_grpc_link = invocation.invocation_link(
+        channel, b'localhost',
+        serialization_behaviors.request_serializers,
+        serialization_behaviors.response_deserializers)
+
+    invocation_end_link.join_link(invocation_grpc_link)
+    invocation_grpc_link.join_link(invocation_end_link)
+    service_end_link.join_link(service_grpc_link)
+    service_grpc_link.join_link(service_end_link)
+    invocation_grpc_link.start()
+    service_grpc_link.start()
+    return invocation_end_link, service_end_link, (
+        invocation_grpc_link, service_grpc_link)
+
+  def destantiate(self, memo):
+    invocation_grpc_link, service_grpc_link = memo
+    invocation_grpc_link.stop()
+    service_grpc_link.stop_gracefully()
+
+  def invocation_initial_metadata(self):
+    return _INVOCATION_INITIAL_METADATA
+
+  def service_initial_metadata(self):
+    return _SERVICE_INITIAL_METADATA
+
+  def invocation_completion(self):
+    return utilities.completion(None, None, None)
+
+  def service_completion(self):
+    return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
+
+  def metadata_transmitted(self, original_metadata, transmitted_metadata):
+    return original_metadata is None or grpc_test_common.metadata_transmitted(
+        original_metadata, transmitted_metadata)
+
+  def completion_transmitted(self, original_completion, transmitted_completion):
+    if (original_completion.terminal_metadata is not None and
+        not grpc_test_common.metadata_transmitted(
+            original_completion.terminal_metadata,
+            transmitted_completion.terminal_metadata)):
+        return False
+    elif original_completion.code is not transmitted_completion.code:
+      return False
+    elif original_completion.message != transmitted_completion.message:
+      return False
+    else:
+      return True
+
+
+def setUpModule():
+  logging.warn('setUpModule!')
+
+
+def tearDownModule():
+  logging.warn('tearDownModule!')
+
+
+def load_tests(loader, tests, pattern):
+  return unittest.TestSuite(
+      tests=tuple(
+          loader.loadTestsFromTestCase(test_case_class)
+          for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
new file mode 100644
index 0000000..8d72f13
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
@@ -0,0 +1,96 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import logging
+import random
+import time
+import unittest
+
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+
+class _Implementation(test_interfaces.Implementation):
+
+  def __init__(self):
+    self._invocation_initial_metadata = object()
+    self._service_initial_metadata = object()
+    self._invocation_terminal_metadata = object()
+    self._service_terminal_metadata = object()
+
+  def instantiate(self, serializations, servicer):
+    invocation = implementations.invocation_end_link()
+    service = implementations.service_end_link(
+        servicer, test_constants.DEFAULT_TIMEOUT,
+        test_constants.MAXIMUM_TIMEOUT)
+    invocation.join_link(service)
+    service.join_link(invocation)
+    return invocation, service, None
+
+  def destantiate(self, memo):
+    pass
+
+  def invocation_initial_metadata(self):
+    return self._invocation_initial_metadata
+
+  def service_initial_metadata(self):
+    return self._service_initial_metadata
+
+  def invocation_completion(self):
+    return utilities.completion(self._invocation_terminal_metadata, None, None)
+
+  def service_completion(self):
+    return utilities.completion(self._service_terminal_metadata, None, None)
+
+  def metadata_transmitted(self, original_metadata, transmitted_metadata):
+    return transmitted_metadata is original_metadata
+
+  def completion_transmitted(self, original_completion, transmitted_completion):
+    return (
+        (original_completion.terminal_metadata is
+         transmitted_completion.terminal_metadata) and
+        original_completion.code is transmitted_completion.code and
+        original_completion.message is transmitted_completion.message
+    )
+
+
+def load_tests(loader, tests, pattern):
+  return unittest.TestSuite(
+      tests=tuple(
+          loader.loadTestsFromTestCase(test_case_class)
+          for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index dd332fe..5c8b176 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -211,8 +211,10 @@
       elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
         break
 
-    invocation_end.stop_gracefully()
-    service_end.stop_gracefully()
+    invocation_stop_event = invocation_end.stop(0)
+    service_stop_event = service_end.stop(0)
+    invocation_stop_event.wait()
+    service_stop_event.wait()
     invocation_stats = invocation_end.operation_stats()
     service_stats = service_end.operation_stats()
 
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
index 6c2e334..a2bd710 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
@@ -29,9 +29,42 @@
 
 """State and behavior appropriate for use in tests."""
 
+import logging
 import threading
+import time
 
 from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+  """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+  Args:
+    ticket: Any links.Ticket.
+
+  Returns:
+    A links.Ticket that is as nearly equal to the given ticket as possible but
+      that may carry a placeholder such as "<payload of length 972321>" in
+      place of the given ticket's actual payload.
+  """
+  if isinstance(ticket.payload, (basestring,)):
+    payload_length = len(ticket.payload)
+  else:
+    payload_length = -1
+  if payload_length < _UNCOMFORTABLY_LONG:
+    return ticket
+  else:
+    return links.Ticket(
+        ticket.operation_id, ticket.sequence_number,
+        ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+        ticket.allowance, ticket.initial_metadata,
+        '<payload of length {}>'.format(payload_length),
+        ticket.terminal_metadata, ticket.code, ticket.message,
+        ticket.termination)
 
 
 class RecordingLink(links.Link):
@@ -64,3 +97,71 @@
     """Returns a copy of the list of all tickets received by this Link."""
     with self._condition:
       return tuple(self._tickets)
+
+
+class _Pipe(object):
+  """A conduit that logs all tickets passed through it."""
+
+  def __init__(self, name):
+    self._lock = threading.Lock()
+    self._name = name
+    self._left_mate = utilities.NULL_LINK
+    self._right_mate = utilities.NULL_LINK
+
+  def accept_left_to_right_ticket(self, ticket):
+    with self._lock:
+      logging.warning(
+          '%s: moving left to right through %s: %s', time.time(), self._name,
+          _safe_for_log_ticket(ticket))
+      try:
+        self._right_mate.accept_ticket(ticket)
+      except Exception as e:  # pylint: disable=broad-except
+        logging.exception(e)
+
+  def accept_right_to_left_ticket(self, ticket):
+    with self._lock:
+      logging.warning(
+          '%s: moving right to left through %s: %s', time.time(), self._name,
+          _safe_for_log_ticket(ticket))
+      try:
+        self._left_mate.accept_ticket(ticket)
+      except Exception as e:  # pylint: disable=broad-except
+        logging.exception(e)
+
+  def join_left_mate(self, left_mate):
+    with self._lock:
+      self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+  def join_right_mate(self, right_mate):
+    with self._lock:
+      self._right_mate = (
+          utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+  def __init__(self, accept, join):
+    self._accept = accept
+    self._join = join
+
+  def accept_ticket(self, ticket):
+    self._accept(ticket)
+
+  def join_link(self, link):
+    self._join(link)
+
+
+def logging_links(name):
+  """Creates a conduit that logs all tickets passed through it.
+
+  Args:
+    name: A name to use for the conduit to identify itself in logging output.
+
+  Returns:
+    Two links.Links, the first of which is the "left" side of the conduit
+      and the second of which is the "right" side of the conduit.
+  """
+  pipe = _Pipe(name)
+  left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+  right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+  return left_facade, right_facade
diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c
index 27531ec..a0a6793 100644
--- a/test/core/end2end/fixtures/chttp2_fake_security.c
+++ b/test/core/end2end/fixtures/chttp2_fake_security.c
@@ -70,7 +70,7 @@
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
index 491a293..beae241 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
@@ -73,7 +73,7 @@
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
index f2736cc..c8971be 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c
@@ -73,7 +73,7 @@
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
index cc0b9db..a518a7d 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c
@@ -101,7 +101,7 @@
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
index d82e623..7f11028 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
@@ -79,7 +79,7 @@
                                          client_identity);
   GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
                  ctx, client_identity_property_name) == 1);
-  cb(user_data, oauth2, 1, 1);
+  cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_OK, NULL);
 }
 
 static void process_oauth2_failure(void *state, grpc_auth_context *ctx,
@@ -90,7 +90,7 @@
       find_metadata(md, md_count, "Authorization", oauth2_md);
   GPR_ASSERT(state == NULL);
   GPR_ASSERT(oauth2 != NULL);
-  cb(user_data, oauth2, 1, 0);
+  cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 
 static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 6f80219..6fdca93 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -39,4 +39,12 @@
 export DYLD_LIBRARY_PATH=$ROOT/libs/$CONFIG
 export PATH=$ROOT/bins/$CONFIG:$ROOT/bins/$CONFIG/protobuf:$PATH
 source "python"$PYVER"_virtual_environment"/bin/activate
+
+# TODO(atash): These tests don't currently run under py.test and thus don't
+# appear in the coverage report. Find a way to get these tests to work with
+# py.test (or find another tool, or *something*, that's acceptable to the rest
+# of the team).
+"python"$PYVER -m grpc_test._core_over_links_base_interface_test
+"python"$PYVER -m grpc_test.framework.core._base_interface_test
+
 "python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml"