Merge github.com:grpc/grpc into error
diff --git a/Makefile b/Makefile
index 1df3976..70d3955 100644
--- a/Makefile
+++ b/Makefile
@@ -1845,7 +1845,7 @@
 $(LIBDIR)/$(CONFIG)/pkgconfig/grpc_zookeeper.pc:
 	$(E) "[MAKE]    Generating $@"
 	$(Q) mkdir -p $(@D)
-	$(Q) echo -e "$(GRPC_ZOOKEEPER_PC_FILE)" >$@
+	$(Q) echo "$(GRPC_ZOOKEEPER_PC_FILE)" | tr , '\n' >$@
 
 $(LIBDIR)/$(CONFIG)/pkgconfig/grpc++.pc:
 	$(E) "[MAKE]    Generating $@"
diff --git a/doc/connectivity-semantics-and-api.md b/doc/connectivity-semantics-and-api.md
index 5427900..cc007ea 100644
--- a/doc/connectivity-semantics-and-api.md
+++ b/doc/connectivity-semantics-and-api.md
@@ -101,7 +101,7 @@
     <td>Shutdown triggered by application.</td>
   </tr>
   <tr>
-    <th>FATAL_FAILURE</th>
+    <th>SHUTDOWN</th>
     <td></td>
     <td></td>
     <td></td>
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index c5318f0..0c04c77 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -218,8 +218,11 @@
 static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
   gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
   uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
-  memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
-  gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+  if (s->total_read_bytes > 0) {
+    // Only copy if there is a non-zero number of bytes
+    memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+    gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+  }
   grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
   *s->recv_message = (grpc_byte_buffer *)&s->sbs;
 }
@@ -347,8 +350,17 @@
           if (grpc_cronet_trace) {
             gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
           }
-          cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
-                                           s->remaining_read_bytes);
+          if (s->remaining_read_bytes > 0) {
+            cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+                                             s->remaining_read_bytes);
+          } else {
+            // Calling the closing callback directly since this is a 0 byte read
+            // for an empty message.
+            process_recv_message(s, NULL);
+            enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+            invoke_closing_callback(s);
+            set_recv_state(s, CRONET_RECV_CLOSED);
+          }
         }
       }
       break;
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 6b10d3e..2ba7408 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -130,6 +130,9 @@
   grpc_pollset **pollsets;
   /* number of pollsets in the pollsets array */
   size_t pollset_count;
+
+  /* next pollset to assign a channel to */
+  size_t next_pollset_to_assign;
 };
 
 grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
@@ -148,6 +151,7 @@
   s->head = NULL;
   s->tail = NULL;
   s->nports = 0;
+  s->next_pollset_to_assign = 0;
   *server = s;
   return GRPC_ERROR_NONE;
 }
@@ -328,7 +332,9 @@
     goto error;
   }
 
-  read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
+  read_notifier_pollset =
+      sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
+                           sp->server->pollset_count];
 
   /* loop until accept4 returns EAGAIN, and then re-arm notification */
   for (;;) {
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index d92addb..dcdddc7 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -235,8 +235,16 @@
             await barrier.Task;  // make sure the handler has started.
             cts.Cancel();
 
-            var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
-            Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+            try
+            {
+                // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+                await call.ResponseAsync;
+                Assert.Fail();
+            }
+            catch (RpcException ex)
+            {
+                Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+            }
         }
 
         [Test]
@@ -265,9 +273,15 @@
             await handlerStartedBarrier.Task;
             cts.Cancel();
 
-            var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
-            Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
-
+            try
+            {
+                await call.ResponseAsync;
+                Assert.Fail();
+            }
+            catch (RpcException ex)
+            {
+                Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+            }
             Assert.AreEqual("SUCCESS", await successTcs.Task);
         }
 
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
index cec8c7c..6a15629 100644
--- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -105,7 +105,15 @@
             var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
             await readyToCancelTcs.Task;
             cts.Cancel();
-            Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall);
+            try
+            {
+                // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+                await parentCall;
+                Assert.Fail();
+            }
+            catch (RpcException)
+            {
+            }
             Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task);
         }
 
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index ab12c12..6fe3827 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -32,7 +32,7 @@
 #endregion
 
 using System;
-using System.Threading;
+using System.Linq;
 using Grpc.Core;
 using NUnit.Framework;
 
@@ -44,7 +44,11 @@
         public void InitializeAndShutdownGrpcEnvironment()
         {
             var env = GrpcEnvironment.AddRef();
-            Assert.IsNotNull(env.CompletionQueue);
+            Assert.IsTrue(env.CompletionQueues.Count > 0);
+            for (int i = 0; i < env.CompletionQueues.Count; i++)
+            {
+                Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
+            }
             GrpcEnvironment.Release();
         }
 
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 0e20476..c35aaf6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -53,8 +53,6 @@
         [SetUp]
         public void Init()
         {
-            var environment = GrpcEnvironment.AddRef();
-
             // Create a fake server just so we have an instance to refer to.
             // The server won't actually be used at all.
             server = new Server()
@@ -66,7 +64,6 @@
             fakeCall = new FakeNativeCall();
             asyncCallServer = new AsyncCallServer<string, string>(
                 Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
-                environment,
                 server);
             asyncCallServer.InitializeForTesting(fakeCall);
         }
@@ -75,7 +72,6 @@
         public void Cleanup()
         {
             server.ShutdownAsync().Wait();
-            GrpcEnvironment.Release();
         }
 
         [Test]
@@ -136,7 +132,6 @@
         public void WriteAfterCancelNotificationFails()
         {
             var finishedTask = asyncCallServer.ServerSideCallAsync();
-            var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
             var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
 
             fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
@@ -181,6 +176,21 @@
             AssertFinished(asyncCallServer, fakeCall, finishedTask);
         }
 
+        [Test]
+        public void WriteAfterWriteStatusThrowsInvalidOperationException()
+        {
+            var finishedTask = asyncCallServer.ServerSideCallAsync();
+            var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+            asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
+            Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
+
+            fakeCall.SendStatusFromServerHandler(true);
+            fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+            AssertFinished(asyncCallServer, fakeCall, finishedTask);
+        }
+
         static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
         {
             Assert.IsTrue(fakeCall.IsDisposed);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 777a1c8..81897f8 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -33,7 +33,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Runtime.InteropServices;
 using System.Threading.Tasks;
 
 using Grpc.Core.Internal;
@@ -82,7 +81,7 @@
             Assert.ThrowsAsync(typeof(InvalidOperationException),
                 async () => await asyncCall.ReadMessageAsync());
             Assert.Throws(typeof(InvalidOperationException),
-                () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+                () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
         }
 
         [Test]
@@ -103,7 +102,7 @@
             var resultTask = asyncCall.UnaryCallAsync("request1");
             fakeCall.UnaryResponseClientHandler(true,
                 CreateClientSideStatus(StatusCode.InvalidArgument),
-                CreateResponsePayload(),
+                null,
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -148,7 +147,7 @@
             var resultTask = asyncCall.ClientStreamingCallAsync();
             fakeCall.UnaryResponseClientHandler(true,
                 CreateClientSideStatus(StatusCode.InvalidArgument),
-                CreateResponsePayload(),
+                null,
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -193,7 +192,7 @@
 
             fakeCall.UnaryResponseClientHandler(true,
                 CreateClientSideStatus(StatusCode.Internal),
-                CreateResponsePayload(),
+                null,
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
@@ -211,7 +210,9 @@
                 new Metadata());
 
             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
-            var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+            var writeTask = requestStream.WriteAsync("request1");
+            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
         }
 
@@ -223,11 +224,13 @@
 
             fakeCall.UnaryResponseClientHandler(true,
                 new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
-                CreateResponsePayload(),
+                null,
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
-            var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+            var writeTask = requestStream.WriteAsync("request1");
+            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
             Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
         }
 
@@ -267,7 +270,7 @@
         }
 
         [Test]
-        public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+        public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
         {
             var resultTask = asyncCall.ClientStreamingCallAsync();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -275,11 +278,12 @@
             asyncCall.Cancel();
             Assert.IsTrue(fakeCall.IsCancelled);
 
-            Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+            var writeTask = requestStream.WriteAsync("request1");
+            Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
 
             fakeCall.UnaryResponseClientHandler(true,
                 CreateClientSideStatus(StatusCode.Cancelled),
-                CreateResponsePayload(),
+                null,
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@@ -290,7 +294,7 @@
         {
             asyncCall.StartServerStreamingCall("request1");
             Assert.Throws(typeof(InvalidOperationException),
-                () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+                () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
         }
 
         [Test]
@@ -390,12 +394,13 @@
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
 
-            var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+            var writeTask = requestStream.WriteAsync("request1");
+            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
         }
 
         [Test]
-        public void DuplexStreaming_CompleteAfterReceivingStatusFails()
+        public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
         {
             asyncCall.StartDuplexStreamingCall();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -411,7 +416,7 @@
         }
 
         [Test]
-        public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+        public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
         {
             asyncCall.StartDuplexStreamingCall();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -419,7 +424,9 @@
 
             asyncCall.Cancel();
             Assert.IsTrue(fakeCall.IsCancelled);
-            Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+            var writeTask = requestStream.WriteAsync("request1");
+            Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
 
             var readTask = responseStream.MoveNext();
             fakeCall.ReceivedMessageHandler(true, null);
diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
index 0663e77..d770f82 100644
--- a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
@@ -134,7 +134,15 @@
         {
             helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
             {
-                Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext());
+                try
+                {
+                    // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+                    await requestStream.MoveNext();
+                    Assert.Fail();
+                }
+                catch (IOException)
+                {
+                }
                 return "RESPONSE";
             });
 
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 93a6e6a..886adfe 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -31,7 +31,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -56,6 +55,7 @@
 
         readonly string target;
         readonly GrpcEnvironment environment;
+        readonly CompletionQueueSafeHandle completionQueue;
         readonly ChannelSafeHandle handle;
         readonly Dictionary<string, ChannelOption> options;
 
@@ -75,6 +75,7 @@
             EnsureUserAgentChannelOption(this.options);
             this.environment = GrpcEnvironment.AddRef();
 
+            this.completionQueue = this.environment.PickCompletionQueue();
             using (var nativeCredentials = credentials.ToNativeCredentials())
             using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
             {
@@ -135,7 +136,7 @@
                     tcs.SetCanceled();
                 }
             });
-            handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
+            handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
             return tcs.Task;
         }
 
@@ -231,6 +232,14 @@
             }
         }
 
+        internal CompletionQueueSafeHandle CompletionQueue
+        {
+            get
+            {
+                return this.completionQueue;
+            }
+        }
+
         internal void AddCallReference(object call)
         {
             activeCallCounter.Increment();
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 4bf30e8..a8b7b5f 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -86,7 +86,6 @@
     <Compile Include="Utils\BenchmarkUtil.cs" />
     <Compile Include="ChannelCredentials.cs" />
     <Compile Include="Internal\ChannelArgsSafeHandle.cs" />
-    <Compile Include="Internal\AsyncCompletion.cs" />
     <Compile Include="Internal\AsyncCallBase.cs" />
     <Compile Include="Internal\AsyncCallServer.cs" />
     <Compile Include="Internal\AsyncCall.cs" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index bee0ef1..18af109 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -32,8 +32,9 @@
 #endregion
 
 using System;
+using System.Collections.Generic;
+using System.Linq;
 using System.Runtime.InteropServices;
-using System.Threading.Tasks;
 using Grpc.Core.Internal;
 using Grpc.Core.Logging;
 using Grpc.Core.Utils;
@@ -51,12 +52,13 @@
         static GrpcEnvironment instance;
         static int refCount;
         static int? customThreadPoolSize;
+        static int? customCompletionQueueCount;
 
         static ILogger logger = new ConsoleLogger();
 
         readonly GrpcThreadPool threadPool;
-        readonly CompletionRegistry completionRegistry;
         readonly DebugStats debugStats = new DebugStats();
+        readonly AtomicCounter cqPickerCounter = new AtomicCounter();
         bool isClosed;
 
         /// <summary>
@@ -141,36 +143,51 @@
         }
 
         /// <summary>
+        /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
+        /// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
+        /// Setting the number of completion queues is an advanced setting and you should only use it if you know what you are doing.
+        /// Most users should rely on the default value provided by gRPC library.
+        /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+        /// </summary>
+        public static void SetCompletionQueueCount(int completionQueueCount)
+        {
+            lock (staticLock)
+            {
+                GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+                GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
+                customCompletionQueueCount = completionQueueCount;
+            }
+        }
+
+        /// <summary>
         /// Creates gRPC environment.
         /// </summary>
         private GrpcEnvironment()
         {
             GrpcNativeInit();
-            completionRegistry = new CompletionRegistry(this);
-            threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
+            threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
             threadPool.Start();
         }
 
         /// <summary>
-        /// Gets the completion registry used by this gRPC environment.
+        /// Gets the completion queues used by this gRPC environment.
         /// </summary>
-        internal CompletionRegistry CompletionRegistry
+        internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
         {
             get
             {
-                return this.completionRegistry;
+                return this.threadPool.CompletionQueues;
             }
         }
 
         /// <summary>
-        /// Gets the completion queue used by this gRPC environment.
+        /// Picks a completion queue in a round-robin fashion.
+        /// Shouldn't be invoked on a per-call basis (intended to be used on a per-channel basis).
         /// </summary>
-        internal CompletionQueueSafeHandle CompletionQueue
+        internal CompletionQueueSafeHandle PickCompletionQueue()
         {
-            get
-            {
-                return this.threadPool.CompletionQueue;
-            }
+            var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
+            return this.threadPool.CompletionQueues.ElementAt(cqIndex);
         }
 
         /// <summary>
@@ -230,5 +247,15 @@
             // more work, but seems to work reasonably well for a start.
             return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
         }
+
+        private int GetCompletionQueueCountOrDefault()
+        {
+            if (customCompletionQueueCount.HasValue)
+            {
+                return customCompletionQueueCount.Value;
+            }
+            // by default, create a completion queue for each thread
+            return GetThreadPoolSizeOrDefault();
+        }
     }
 }
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 5535186..895be69 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
 #endregion
 
 using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
 using System.Threading.Tasks;
-using Grpc.Core.Internal;
 using Grpc.Core.Logging;
 using Grpc.Core.Profiling;
 using Grpc.Core.Utils;
@@ -57,9 +52,11 @@
         // Completion of a pending unary response if not null.
         TaskCompletionSource<TResponse> unaryResponseTcs;
 
+        // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
         // Indicates that response streaming call has finished.
         TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
 
+        // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
         // Response headers set here once received.
         TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
 
@@ -67,7 +64,7 @@
         ClientSideStatus? finishedStatus;
 
         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
-            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
+            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
         {
             this.details = callDetails.WithOptions(callDetails.Options.Normalize());
             this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
@@ -144,7 +141,7 @@
                 GrpcPreconditions.CheckState(!started);
                 started = true;
 
-                Initialize(environment.CompletionQueue);
+                Initialize(details.Channel.CompletionQueue);
 
                 halfcloseRequested = true;
                 readingDone = true;
@@ -171,7 +168,7 @@
                 GrpcPreconditions.CheckState(!started);
                 started = true;
 
-                Initialize(environment.CompletionQueue);
+                Initialize(details.Channel.CompletionQueue);
 
                 readingDone = true;
 
@@ -195,7 +192,7 @@
                 GrpcPreconditions.CheckState(!started);
                 started = true;
 
-                Initialize(environment.CompletionQueue);
+                Initialize(details.Channel.CompletionQueue);
 
                 halfcloseRequested = true;
 
@@ -220,7 +217,7 @@
                 GrpcPreconditions.CheckState(!started);
                 started = true;
 
-                Initialize(environment.CompletionQueue);
+                Initialize(details.Channel.CompletionQueue);
 
                 using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                 {
@@ -232,11 +229,10 @@
 
         /// <summary>
         /// Sends a streaming request. Only one pending send action is allowed at any given time.
-        /// completionDelegate is called when the operation finishes.
         /// </summary>
-        public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+        public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
         {
-            StartSendMessageInternal(msg, writeFlags, completionDelegate);
+            return SendMessageInternalAsync(msg, writeFlags);
         }
 
         /// <summary>
@@ -250,29 +246,32 @@
         /// <summary>
         /// Sends halfclose, indicating client is done with streaming requests.
         /// Only one pending send action is allowed at any given time.
-        /// completionDelegate is called when the operation finishes.
         /// </summary>
-        public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+        public Task SendCloseFromClientAsync()
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
-                CheckSendingAllowed(allowFinished: true);
+                GrpcPreconditions.CheckState(started);
 
-                if (!disposed && !finished)
+                var earlyResult = CheckSendPreconditionsClientSide();
+                if (earlyResult != null)
                 {
-                    call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+                    return earlyResult;
                 }
-                else
+
+                if (disposed || finished)
                 {
                     // In case the call has already been finished by the serverside,
-                    // the halfclose has already been done implicitly, so we only
-                    // emit the notification for the completion delegate.
-                    Task.Run(() => HandleSendCloseFromClientFinished(true));
+                    // the halfclose has already been done implicitly, so just return
+                    // completed task here.
+                    halfcloseRequested = true;
+                    return Task.FromResult<object>(null);
                 }
+                call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
 
                 halfcloseRequested = true;
-                sendCompletionDelegate = completionDelegate;
+                streamingWriteTcs = new TaskCompletionSource<object>();
+                return streamingWriteTcs.Task;
             }
         }
 
@@ -342,6 +341,45 @@
             get { return true; }
         }
 
+        protected override Task CheckSendAllowedOrEarlyResult()
+        {
+            var earlyResult = CheckSendPreconditionsClientSide();
+            if (earlyResult != null)
+            {
+                return earlyResult;
+            }
+
+            if (finishedStatus.HasValue)
+            {
+                // throwing RpcException if we already received status on client
+                // side makes the most sense.
+                // Note that this throws even for StatusCode.OK.
+                // Writing after the call has finished is not a programming error because server can close
+                // the call anytime, so don't throw directly, but let the write task finish with an error.
+                var tcs = new TaskCompletionSource<object>();
+                tcs.SetException(new RpcException(finishedStatus.Value.Status));
+                return tcs.Task;
+            }
+
+            return null;
+        }
+
+        private Task CheckSendPreconditionsClientSide()
+        {
+            GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+            if (cancelRequested)
+            {
+                // Return a cancelled task.
+                var tcs = new TaskCompletionSource<object>();
+                tcs.SetCanceled();
+                return tcs.Task;
+            }
+
+            return null;
+        }
+
         private void Initialize(CompletionQueueSafeHandle cq)
         {
             using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -368,7 +406,7 @@
                 var credentials = details.Options.Credentials;
                 using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
                 {
-                    var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+                    var result = details.Channel.Handle.CreateCall(
                                  parentCall, ContextPropagationToken.DefaultMask, cq,
                                  details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
                     return result;
@@ -400,6 +438,7 @@
         /// </summary>
         private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
         {
+            // TODO(jtattermusch): handle success==false
             responseHeadersTcs.SetResult(responseHeaders);
         }
 
@@ -443,19 +482,6 @@
             }
         }
 
-        protected override void CheckSendingAllowed(bool allowFinished)
-        {
-            base.CheckSendingAllowed(true);
-
-            // throwing RpcException if we already received status on client
-            // side makes the most sense.
-            // Note that this throws even for StatusCode.OK.
-            if (!allowFinished && finishedStatus.HasValue)
-            {
-                throw new RpcException(finishedStatus.Value.Status);
-            }
-        }
-
         /// <summary>
         /// Handles receive status completion for calls with streaming response.
         /// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4de2370..cb8366c 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -58,7 +58,6 @@
         readonly Func<TWrite, byte[]> serializer;
         readonly Func<byte[], TRead> deserializer;
 
-        protected readonly GrpcEnvironment environment;
         protected readonly object myLock = new object();
 
         protected INativeCall call;
@@ -67,8 +66,8 @@
         protected bool started;
         protected bool cancelRequested;
 
-        protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
         protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
+        protected TaskCompletionSource<object> streamingWriteTcs;  // Completion of a pending streaming write or send close from client if not null.
         protected TaskCompletionSource<object> sendStatusFromServerTcs;
 
         protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
@@ -78,11 +77,10 @@
         protected bool initialMetadataSent;
         protected long streamingWritesCounter;  // Number of streaming send operations started so far.
 
-        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
+        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
         {
             this.serializer = GrpcPreconditions.CheckNotNull(serializer);
             this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
-            this.environment = GrpcPreconditions.CheckNotNull(environment);
         }
 
         /// <summary>
@@ -128,28 +126,31 @@
 
         /// <summary>
         /// Initiates sending a message. Only one send operation can be active at a time.
-        /// completionDelegate is invoked upon completion.
         /// </summary>
-        protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+        protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
         {
             byte[] payload = UnsafeSerialize(msg);
 
             lock (myLock)
             {
-                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
-                CheckSendingAllowed(allowFinished: false);
+                GrpcPreconditions.CheckState(started);
+                var earlyResult = CheckSendAllowedOrEarlyResult();
+                if (earlyResult != null)
+                {
+                    return earlyResult;
+                }
 
                 call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
 
-                sendCompletionDelegate = completionDelegate;
                 initialMetadataSent = true;
                 streamingWritesCounter++;
+                streamingWriteTcs = new TaskCompletionSource<object>();
+                return streamingWriteTcs.Task;
             }
         }
 
         /// <summary>
         /// Initiates reading a message. Only one read operation can be active at a time.
-        /// completionDelegate is invoked upon completion.
         /// </summary>
         protected Task<TRead> ReadMessageInternalAsync()
         {
@@ -159,7 +160,7 @@
                 if (readingDone)
                 {
                     // the last read that returns null or throws an exception is idempotent
-                    // and maintain its state.
+                    // and maintains its state.
                     GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
                     return streamingReadTcs.Task;
                 }
@@ -183,7 +184,7 @@
             {
                 if (!disposed && call != null)
                 {
-                    bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+                    bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
                     if (noMoreSendCompletions && readingDone && finished)
                     {
                         ReleaseResources();
@@ -213,24 +214,11 @@
         {
         }
 
-        protected virtual void CheckSendingAllowed(bool allowFinished)
-        {
-            GrpcPreconditions.CheckState(started);
-            CheckNotCancelled();
-            GrpcPreconditions.CheckState(!disposed || allowFinished);
-
-            GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
-            GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
-            GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
-        }
-
-        protected void CheckNotCancelled()
-        {
-            if (cancelRequested)
-            {
-                throw new OperationCanceledException("Remote call has been cancelled.");
-            }
-        }
+        /// <summary>
+        /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
+        /// logic by directly returning the write operation result task. Normally, null is returned.
+        /// </summary>
+        protected abstract Task CheckSendAllowedOrEarlyResult();
 
         protected byte[] UnsafeSerialize(TWrite msg)
         {
@@ -259,39 +247,27 @@
             }
         }
 
-        protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
-        {
-            try
-            {
-                completionDelegate(value, error);
-            }
-            catch (Exception e)
-            {
-                Logger.Error(e, "Exception occured while invoking completion delegate.");
-            }
-        }
-
         /// <summary>
         /// Handles send completion.
         /// </summary>
         protected void HandleSendFinished(bool success)
         {
-            AsyncCompletionDelegate<object> origCompletionDelegate = null;
+            TaskCompletionSource<object> origTcs = null;
             lock (myLock)
             {
-                origCompletionDelegate = sendCompletionDelegate;
-                sendCompletionDelegate = null;
+                origTcs = streamingWriteTcs;
+                streamingWriteTcs = null;
 
                 ReleaseResourcesIfPossible();
             }
 
             if (!success)
             {
-                FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
+                origTcs.SetException(new InvalidOperationException("Send failed"));
             }
             else
             {
-                FireCompletion(origCompletionDelegate, null, null);
+                origTcs.SetResult(null);
             }
         }
 
@@ -300,22 +276,23 @@
         /// </summary>
         protected void HandleSendCloseFromClientFinished(bool success)
         {
-            AsyncCompletionDelegate<object> origCompletionDelegate = null;
+            TaskCompletionSource<object> origTcs = null;
             lock (myLock)
             {
-                origCompletionDelegate = sendCompletionDelegate;
-                sendCompletionDelegate = null;
+                origTcs = streamingWriteTcs;
+                streamingWriteTcs = null;
 
                 ReleaseResourcesIfPossible();
             }
 
             if (!success)
             {
-                FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
+                // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
+                origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
             }
             else
             {
-                FireCompletion(origCompletionDelegate, null, null);
+                origTcs.SetResult(null);
             }
         }
 
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b1566b4..56c23ba 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -51,14 +51,14 @@
         readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
         readonly Server server;
 
-        public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
+        public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
         {
             this.server = GrpcPreconditions.CheckNotNull(server);
         }
 
-        public void Initialize(CallSafeHandle call)
+        public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
         {
-            call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
+            call.Initialize(completionQueue);
 
             server.AddCallReference(this);
             InitializeInternal(call);
@@ -91,11 +91,10 @@
 
         /// <summary>
         /// Sends a streaming response. Only one pending send action is allowed at any given time.
-        /// completionDelegate is called when the operation finishes.
         /// </summary>
-        public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+        public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
         {
-            StartSendMessageInternal(msg, writeFlags, completionDelegate);
+            return SendMessageInternalAsync(msg, writeFlags);
         }
 
         /// <summary>
@@ -110,20 +109,22 @@
         /// Initiates sending a initial metadata. 
         /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
         /// to make things simpler.
-        /// completionDelegate is invoked upon completion.
         /// </summary>
-        public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+        public Task SendInitialMetadataAsync(Metadata headers)
         {
             lock (myLock)
             {
                 GrpcPreconditions.CheckNotNull(headers, "metadata");
-                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
 
+                GrpcPreconditions.CheckState(started);
                 GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
                 GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
-                CheckSendingAllowed(allowFinished: false);
 
-                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+                var earlyResult = CheckSendAllowedOrEarlyResult();
+                if (earlyResult != null)
+                {
+                    return earlyResult;
+                }
 
                 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
                 {
@@ -131,7 +132,8 @@
                 }
 
                 this.initialMetadataSent = true;
-                sendCompletionDelegate = completionDelegate;
+                streamingWriteTcs = new TaskCompletionSource<object>();
+                return streamingWriteTcs.Task;
             }
         }
 
@@ -196,6 +198,16 @@
             server.RemoveCallReference(this);
         }
 
+        protected override Task CheckSendAllowedOrEarlyResult()
+        {
+            GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+            GrpcPreconditions.CheckState(!finished, "Already finished.");
+            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
+            GrpcPreconditions.CheckState(!disposed);
+
+            return null;
+        }
+
         /// <summary>
         /// Handles the server side close completion.
         /// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
deleted file mode 100644
index 7e86fdd..0000000
--- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-// 
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-// 
-//     * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-//     * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-//     * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-// 
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
-
-namespace Grpc.Core.Internal
-{
-    /// <summary>
-    /// If error != null, there's been an error or operation has been cancelled.
-    /// </summary>
-    internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
-
-    /// <summary>
-    /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
-    /// </summary>
-    internal class AsyncCompletionTaskSource<T>
-    {
-        readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
-        readonly AsyncCompletionDelegate<T> completionDelegate;
-
-        public AsyncCompletionTaskSource()
-        {
-            completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
-        }
-
-        public Task<T> Task
-        {
-            get
-            {
-                return tcs.Task;
-            }
-        }
-
-        public AsyncCompletionDelegate<T> CompletionDelegate
-        {
-            get
-            {
-                return completionDelegate;
-            }
-        }
-
-        private void HandleCompletion(T value, Exception error)
-        {
-            if (error == null)
-            {
-                tcs.SetResult(value);
-                return;
-            }
-            if (error is OperationCanceledException)
-            {
-                tcs.SetCanceled();
-                return;
-            }
-            tcs.SetException(error);
-        }
-    }
-}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 244b97d..82361f5 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,16 +47,14 @@
         static readonly NativeMethods Native = NativeMethods.Get();
 
         const uint GRPC_WRITE_BUFFER_HINT = 1;
-        CompletionRegistry completionRegistry;
         CompletionQueueSafeHandle completionQueue;
 
         private CallSafeHandle()
         {
         }
 
-        public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
+        public void Initialize(CompletionQueueSafeHandle completionQueue)
         {
-            this.completionRegistry = completionRegistry;
             this.completionQueue = completionQueue;
         }
 
@@ -70,7 +68,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
                 Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
                     .CheckOk();
             }
@@ -90,7 +88,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
                 Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
             }
         }
@@ -100,7 +98,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
                 Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
             }
         }
@@ -110,7 +108,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
                 Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
             }
         }
@@ -120,7 +118,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
                 Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
             }
         }
@@ -130,7 +128,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
                 Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
             }
         }
@@ -142,7 +140,7 @@
             {
                 var ctx = BatchContextSafeHandle.Create();
                 var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
                 Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
                     optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
             }
@@ -153,7 +151,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
                 Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
             }
         }
@@ -163,7 +161,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
                 Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
             }
         }
@@ -173,7 +171,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
                 Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
             }
         }
@@ -183,7 +181,7 @@
             using (completionQueue.NewScope())
             {
                 var ctx = BatchContextSafeHandle.Create();
-                completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+                completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
                 Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
             }
         }
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 1dbd1f4..62864df 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -63,7 +63,7 @@
             return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
         }
 
-        public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
+        public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
         {
             using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
             {
@@ -72,7 +72,7 @@
                 {
                     result.SetCredentials(credentials);
                 }
-                result.Initialize(registry, cq);
+                result.Initialize(cq);
                 return result;
             }
         }
@@ -82,11 +82,10 @@
             return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
         }
 
-        public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
-            CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+        public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
         {
             var ctx = BatchContextSafeHandle.Create();
-            completionRegistry.RegisterBatchCompletion(ctx, callback);
+            cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
             Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
         }
 
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 013f00f..924de02 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -50,16 +50,12 @@
 
         public Task WriteAsync(TRequest message)
         {
-            var taskSource = new AsyncCompletionTaskSource<object>();
-            call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
-            return taskSource.Task;
+            return call.SendMessageAsync(message, GetWriteFlags());
         }
 
         public Task CompleteAsync()
         {
-            var taskSource = new AsyncCompletionTaskSource<object>();
-            call.StartSendCloseFromClient(taskSource.CompletionDelegate);
-            return taskSource.Task;
+            return call.SendCloseFromClientAsync();
         }
 
         public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 91364cd..46f5624 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -45,6 +45,7 @@
         static readonly NativeMethods Native = NativeMethods.Get();
 
         AtomicCounter shutdownRefcount = new AtomicCounter(1);
+        CompletionRegistry completionRegistry;
 
         private CompletionQueueSafeHandle()
         {
@@ -53,7 +54,13 @@
         public static CompletionQueueSafeHandle Create()
         {
             return Native.grpcsharp_completion_queue_create();
+        }
 
+        public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
+        {
+            var cq = Native.grpcsharp_completion_queue_create();
+            cq.completionRegistry = completionRegistry;
+            return cq;
         }
 
         public CompletionQueueEvent Next()
@@ -83,6 +90,15 @@
             DecrementShutdownRefcount();
         }
 
+        /// <summary>
+        /// Completion registry associated with this completion queue.
+        /// Doesn't need to be set if only using Pluck() operations.
+        /// </summary>
+        public CompletionRegistry CompletionRegistry
+        {
+            get { return completionRegistry; }
+        }
+
         protected override bool ReleaseHandle()
         {
             Native.grpcsharp_completion_queue_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index b538726..4de543b 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,15 +33,15 @@
 
 using System;
 using System.Collections.Generic;
-using System.Runtime.InteropServices;
+using System.Linq;
 using System.Threading;
-using System.Threading.Tasks;
 using Grpc.Core.Logging;
+using Grpc.Core.Utils;
 
 namespace Grpc.Core.Internal
 {
     /// <summary>
-    /// Pool of threads polling on the same completion queue.
+    /// Pool of threads polling on a set of completion queues.
     /// </summary>
     internal class GrpcThreadPool
     {
@@ -51,25 +51,31 @@
         readonly object myLock = new object();
         readonly List<Thread> threads = new List<Thread>();
         readonly int poolSize;
+        readonly int completionQueueCount;
 
-        CompletionQueueSafeHandle cq;
+        IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
 
-        public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
+        /// <summary>
+        /// Creates a thread pool whose threads poll a set of completion queues.
+        /// </summary>
+        /// <param name="environment">Environment.</param>
+        /// <param name="poolSize">Pool size.</param>
+        /// <param name="completionQueueCount">Completion queue count.</param>
+        public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
         {
             this.environment = environment;
             this.poolSize = poolSize;
+            this.completionQueueCount = completionQueueCount;
+            GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
+                "Thread pool size cannot be smaller than the number of completion queues used.");
         }
 
         public void Start()
         {
             lock (myLock)
             {
-                if (cq != null)
-                {
-                    throw new InvalidOperationException("Already started.");
-                }
-
-                cq = CompletionQueueSafeHandle.Create();
+                GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
+                completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
 
                 for (int i = 0; i < poolSize; i++)
                 {
@@ -82,37 +88,48 @@
         {
             lock (myLock)
             {
-                cq.Shutdown();
+                foreach (var cq in completionQueues)
+                {
+                    cq.Shutdown();
+                }
+
                 foreach (var thread in threads)
                 {
                     thread.Join();
                 }
 
-                cq.Dispose();
+                foreach (var cq in completionQueues)
+                {
+                    cq.Dispose();
+                }
             }
         }
 
-        internal CompletionQueueSafeHandle CompletionQueue
+        internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
         {
             get
             {
-                return cq;
+                return completionQueues;
             }
         }
 
-        private Thread CreateAndStartThread(int i)
+        private Thread CreateAndStartThread(int threadIndex)
         {
-            var thread = new Thread(new ThreadStart(RunHandlerLoop));
+            var cqIndex = threadIndex % completionQueues.Count;
+            var cq = completionQueues.ElementAt(cqIndex);
+
+            var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
             thread.IsBackground = false;
+            thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
             thread.Start();
-            thread.Name = "grpc " + i;
+
             return thread;
         }
 
         /// <summary>
         /// Body of the polling thread.
         /// </summary>
-        private void RunHandlerLoop()
+        private void RunHandlerLoop(CompletionQueueSafeHandle cq)
         {
             CompletionQueueEvent ev;
             do
@@ -124,7 +141,7 @@
                     IntPtr tag = ev.tag;
                     try
                     {
-                        var callback = environment.CompletionRegistry.Extract(tag);
+                        var callback = cq.CompletionRegistry.Extract(tag);
                         callback(success);
                     }
                     catch (Exception e)
@@ -135,5 +152,16 @@
             }
             while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
         }
+
+        private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
+        {
+            var list = new List<CompletionQueueSafeHandle>();
+            for (int i = 0; i < completionQueueCount; i++)
+            {
+                var completionRegistry = new CompletionRegistry(environment);
+                list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+            }
+            return list.AsReadOnly();
+        }
     }
 }
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 42fd4d4..65607ed 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -137,6 +137,7 @@
         public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
 
         public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
+        public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
         public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
         public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
         public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@@ -244,6 +245,7 @@
                 this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
 
                 this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
+                this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
                 this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
                 this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
                 this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@@ -348,6 +350,7 @@
                 this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
 
                 this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
+                this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
                 this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
                 this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
                 this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@@ -493,7 +496,8 @@
             public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
             public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
 
-            public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+            public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
+            public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
             public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
             public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
             public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
@@ -773,7 +777,10 @@
             // ServerSafeHandle
 
             [DllImport("grpc_csharp_ext.dll")]
-            public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+            public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
+
+            [DllImport("grpc_csharp_ext.dll")]
+            public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
 
             [DllImport("grpc_csharp_ext.dll")]
             public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index febebba..6a2f520 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -44,7 +44,7 @@
 {
     internal interface IServerCallHandler
     {
-        Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
+        Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
     }
 
     internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -62,14 +62,14 @@
             this.handler = handler;
         }
 
-        public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                 method.ResponseMarshaller.Serializer,
                 method.RequestMarshaller.Deserializer,
-                environment, newRpc.Server);
+                newRpc.Server);
 
-            asyncCall.Initialize(newRpc.Call);
+            asyncCall.Initialize(newRpc.Call, cq);
             var finishedTask = asyncCall.ServerSideCallAsync();
             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -121,14 +121,14 @@
             this.handler = handler;
         }
 
-        public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                 method.ResponseMarshaller.Serializer,
                 method.RequestMarshaller.Deserializer,
-                environment, newRpc.Server);
+                newRpc.Server);
 
-            asyncCall.Initialize(newRpc.Call);
+            asyncCall.Initialize(newRpc.Call, cq);
             var finishedTask = asyncCall.ServerSideCallAsync();
             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -179,14 +179,14 @@
             this.handler = handler;
         }
 
-        public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                 method.ResponseMarshaller.Serializer,
                 method.RequestMarshaller.Deserializer,
-                environment, newRpc.Server);
+                newRpc.Server);
 
-            asyncCall.Initialize(newRpc.Call);
+            asyncCall.Initialize(newRpc.Call, cq);
             var finishedTask = asyncCall.ServerSideCallAsync();
             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -237,14 +237,14 @@
             this.handler = handler;
         }
 
-        public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                 method.ResponseMarshaller.Serializer,
                 method.RequestMarshaller.Deserializer,
-                environment, newRpc.Server);
+                newRpc.Server);
 
-            asyncCall.Initialize(newRpc.Call);
+            asyncCall.Initialize(newRpc.Call, cq);
             var finishedTask = asyncCall.ServerSideCallAsync();
             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -281,13 +281,13 @@
     {
         public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
 
-        public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             // We don't care about the payload type here.
             var asyncCall = new AsyncCallServer<byte[], byte[]>(
-                (payload) => payload, (payload) => payload, environment, newRpc.Server);
+                (payload) => payload, (payload) => payload, newRpc.Server);
             
-            asyncCall.Initialize(newRpc.Call);
+            asyncCall.Initialize(newRpc.Call, cq);
             var finishedTask = asyncCall.ServerSideCallAsync();
             await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
             await finishedTask.ConfigureAwait(false);
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index ecfee0b..25b79b4 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -52,16 +52,12 @@
 
         public Task WriteAsync(TResponse message)
         {
-            var taskSource = new AsyncCompletionTaskSource<object>();
-            call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
-            return taskSource.Task;
+            return call.SendMessageAsync(message, GetWriteFlags());
         }
 
         public Task WriteResponseHeadersAsync(Metadata responseHeaders)
         {
-            var taskSource = new AsyncCompletionTaskSource<object>();
-            call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
-            return taskSource.Task;
+            return call.SendInitialMetadataAsync(responseHeaders);
         }
 
         public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 6b5f70e..8581302 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -31,12 +31,6 @@
 
 #endregion
 
-using System;
-using System.Collections.Concurrent;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using Grpc.Core.Utils;
-
 namespace Grpc.Core.Internal
 {
     /// <summary>
@@ -50,12 +44,17 @@
         {
         }
 
-        public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
+        public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
         {
             // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
             // Doing so would make object finalizer crash if we end up abandoning the handle.
             GrpcEnvironment.GrpcNativeInit();
-            return Native.grpcsharp_server_create(cq, args);
+            return Native.grpcsharp_server_create(args);
+        }
+
+        public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
+        {
+            Native.grpcsharp_server_register_completion_queue(this, cq);
         }
 
         public int AddInsecurePort(string addr)
@@ -73,18 +72,18 @@
             Native.grpcsharp_server_start(this);
         }
     
-        public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
+        public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
         {
             var ctx = BatchContextSafeHandle.Create();
-            environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
-            Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
+            completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+            Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
         }
 
-        public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
+        public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
         {
             var ctx = BatchContextSafeHandle.Create();
-            environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
-            Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
+            completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+            Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
         }
 
         protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index d538a46..069185e 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -34,8 +34,7 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
+using System.Linq;
 using System.Threading.Tasks;
 using Grpc.Core.Internal;
 using Grpc.Core.Logging;
@@ -48,7 +47,7 @@
     /// </summary>
     public class Server
     {
-        const int InitialAllowRpcTokenCount = 10;
+        const int InitialAllowRpcTokenCountPerCq = 10;
         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
 
         readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -80,7 +79,12 @@
             this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
             using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
             {
-                this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
+                this.handle = ServerSafeHandle.NewServer(channelArgs);
+            }
+
+            foreach (var cq in environment.CompletionQueues)
+            {
+                this.handle.RegisterCompletionQueue(cq);
             }
         }
 
@@ -133,9 +137,12 @@
 
                 // Starting with more than one AllowOneRpc tokens can significantly increase
                 // unary RPC throughput.
-                for (int i = 0; i < InitialAllowRpcTokenCount; i++)
+                for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
                 {
-                    AllowOneRpc();
+                    foreach (var cq in environment.CompletionQueues)
+                    {
+                        AllowOneRpc(cq);
+                    }
                 }
             }
         }
@@ -154,7 +161,8 @@
                 shutdownRequested = true;
             }
 
-            handle.ShutdownAndNotify(HandleServerShutdown, environment);
+            var cq = environment.CompletionQueues.First();  // any cq will do
+            handle.ShutdownAndNotify(HandleServerShutdown, cq);
             await shutdownTcs.Task.ConfigureAwait(false);
             DisposeHandle();
 
@@ -174,7 +182,8 @@
                 shutdownRequested = true;
             }
 
-            handle.ShutdownAndNotify(HandleServerShutdown, environment);
+            var cq = environment.CompletionQueues.First();  // any cq will do
+            handle.ShutdownAndNotify(HandleServerShutdown, cq);
             handle.CancelAllCalls();
             await shutdownTcs.Task.ConfigureAwait(false);
             DisposeHandle();
@@ -244,11 +253,11 @@
         /// <summary>
         /// Allows one new RPC call to be received by server.
         /// </summary>
-        private void AllowOneRpc()
+        private void AllowOneRpc(CompletionQueueSafeHandle cq)
         {
             if (!shutdownRequested)
             {
-                handle.RequestCall(HandleNewServerRpc, environment);
+                handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
             }
         }
 
@@ -265,7 +274,7 @@
         /// <summary>
         /// Selects corresponding handler for given call and handles the call.
         /// </summary>
-        private async Task HandleCallAsync(ServerRpcNew newRpc)
+        private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
         {
             try
             {
@@ -274,7 +283,7 @@
                 {
                     callHandler = NoSuchMethodCallHandler.Instance;
                 }
-                await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
+                await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
             }
             catch (Exception e)
             {
@@ -285,9 +294,9 @@
         /// <summary>
         /// Handles the native callback.
         /// </summary>
-        private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
+        private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
         {
-			Task.Run(() => AllowOneRpc());
+			Task.Run(() => AllowOneRpc(cq));
 
             if (success)
             {
@@ -296,7 +305,7 @@
                 // after server shutdown, the callback returns with null call
                 if (!newRpc.Call.IsInvalid)
                 {
-                    HandleCallAsync(newRpc);  // we don't need to await.
+                    HandleCallAsync(newRpc, cq);  // we don't need to await.
                 }
             }
         }
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 1541cfd..aea40af 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -471,8 +471,16 @@
 
                 cts.Cancel();
 
-                var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
-                Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+                try
+                {
+                    // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+                    await call.ResponseStream.MoveNext();
+                    Assert.Fail();
+                }
+                catch (RpcException ex)
+                {
+                    Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+                }
             }
             Console.WriteLine("Passed!");
         }
@@ -497,9 +505,16 @@
                     // Deadline was reached before write has started. Eat the exception and continue.
                 }
 
-                var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
-                // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
-                Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
+                try
+                {
+                    await call.ResponseStream.MoveNext();
+                    Assert.Fail();
+                }
+                catch (RpcException ex)
+                {
+                    // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+                    Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
+                }
             }
             Console.WriteLine("Passed!");
         }
@@ -577,9 +592,17 @@
                 await call.RequestStream.WriteAsync(request);
                 await call.RequestStream.CompleteAsync();
 
-                var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync());
-                Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
-                Assert.AreEqual(echoStatus.Message, e.Status.Detail);
+                try
+                {
+                    // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+                    await call.ResponseStream.ToListAsync();
+                    Assert.Fail();
+                }
+                catch (RpcException e)
+                {
+                    Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
+                    Assert.AreEqual(echoStatus.Message, e.Status.Detail);
+                }
             }
 
             Console.WriteLine("Passed!");
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 5b8ff9b..4beef9d 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -806,11 +806,14 @@
 /* Server */
 
 GPR_EXPORT grpc_server *GPR_CALLTYPE
-grpcsharp_server_create(grpc_completion_queue *cq,
-                        const grpc_channel_args *args) {
-  grpc_server *server = grpc_server_create(args, NULL);
+grpcsharp_server_create(const grpc_channel_args *args) {
+  return grpc_server_create(args, NULL);
+}
+
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_server_register_completion_queue(grpc_server *server,
+                                           grpc_completion_queue *cq) {
   grpc_server_register_completion_queue(server, cq, NULL);
-  return server;
 }
 
 GPR_EXPORT int32_t GPR_CALLTYPE
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index b9e1fe9..dd1b777 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -35,15 +35,15 @@
 
 #include "server.h"
 
-#include <node.h>
 #include <nan.h>
+#include <node.h>
 
 #include <vector>
+#include "call.h"
+#include "completion_queue_async_worker.h"
 #include "grpc/grpc.h"
 #include "grpc/grpc_security.h"
 #include "grpc/support/log.h"
-#include "call.h"
-#include "completion_queue_async_worker.h"
 #include "server_credentials.h"
 #include "timeval.h"
 
@@ -100,8 +100,8 @@
     Nan::Set(obj, Nan::New("host").ToLocalChecked(),
              Nan::New(details.host).ToLocalChecked());
     Nan::Set(obj, Nan::New("deadline").ToLocalChecked(),
-             Nan::New<Date>(
-                 TimespecToMilliseconds(details.deadline)).ToLocalChecked());
+             Nan::New<Date>(TimespecToMilliseconds(details.deadline))
+                 .ToLocalChecked());
     Nan::Set(obj, Nan::New("metadata").ToLocalChecked(),
              ParseMetadata(&request_metadata));
     return scope.Escape(obj);
@@ -117,14 +117,13 @@
   grpc_metadata_array request_metadata;
 
  protected:
-  std::string GetTypeString() const {
-    return "new_call";
-  }
+  std::string GetTypeString() const { return "new_call"; }
 };
 
 Server::Server(grpc_server *server) : wrapped_server(server) {
   shutdown_queue = grpc_completion_queue_create(NULL);
-  grpc_server_register_completion_queue(server, shutdown_queue, NULL);
+  grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
+                                                      NULL);
 }
 
 Server::~Server() {
@@ -156,8 +155,7 @@
 }
 
 void Server::ShutdownServer() {
-  grpc_server_shutdown_and_notify(this->wrapped_server,
-                                  this->shutdown_queue,
+  grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
                                   NULL);
   grpc_server_cancel_all_calls(this->wrapped_server);
   grpc_completion_queue_pluck(this->shutdown_queue, NULL,
@@ -170,8 +168,8 @@
   if (!info.IsConstructCall()) {
     const int argc = 1;
     Local<Value> argv[argc] = {info[0]};
-    MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
-        argc, argv);
+    MaybeLocal<Object> maybe_instance =
+        constructor->GetFunction()->NewInstance(argc, argv);
     if (maybe_instance.IsEmpty()) {
       // There's probably a pending exception
       return;
@@ -185,8 +183,9 @@
   grpc_channel_args *channel_args;
   if (!ParseChannelArgs(info[0], &channel_args)) {
     DeallocateChannelArgs(channel_args);
-    return Nan::ThrowTypeError("Server options must be an object with "
-                               "string keys and integer or string values");
+    return Nan::ThrowTypeError(
+        "Server options must be an object with "
+        "string keys and integer or string values");
   }
   wrapped_server = grpc_server_create(channel_args, NULL);
   DeallocateChannelArgs(channel_args);
@@ -218,8 +217,7 @@
 
 NAN_METHOD(Server::AddHttp2Port) {
   if (!HasInstance(info.This())) {
-    return Nan::ThrowTypeError(
-        "addHttp2Port can only be called on a Server");
+    return Nan::ThrowTypeError("addHttp2Port can only be called on a Server");
   }
   if (!info[0]->IsString()) {
     return Nan::ThrowTypeError(
@@ -239,8 +237,7 @@
                                                *Utf8String(info[0]));
   } else {
     port = grpc_server_add_secure_http2_port(server->wrapped_server,
-                                             *Utf8String(info[0]),
-                                             creds);
+                                             *Utf8String(info[0]), creds);
   }
   info.GetReturnValue().Set(Nan::New<Number>(port));
 }
@@ -262,8 +259,7 @@
   Server *server = ObjectWrap::Unwrap<Server>(info.This());
   unique_ptr<OpVec> ops(new OpVec());
   grpc_server_shutdown_and_notify(
-      server->wrapped_server,
-      CompletionQueueAsyncWorker::GetQueue(),
+      server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
       new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
                      shared_ptr<Resources>(nullptr)));
   CompletionQueueAsyncWorker::Next();
diff --git a/src/objective-c/CronetFramework.podspec b/src/objective-c/CronetFramework.podspec
new file mode 100644
index 0000000..20af764
--- /dev/null
+++ b/src/objective-c/CronetFramework.podspec
@@ -0,0 +1,43 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+Pod::Spec.new do |s|
+  s.name         = "CronetFramework"
+  s.version      = "0.0.2"
+  s.summary      = "Cronet, precompiled and used as a framework."
+  s.homepage     = "http://chromium.org"
+  s.license      = { :type => 'BSD' }
+  s.vendored_framework = "Cronet.framework"
+  s.author             = "The Chromium Authors"
+  s.ios.deployment_target = "8.0"
+  s.source       = { :http => 'https://storage.googleapis.com/grpc-precompiled-binaries/cronet/Cronet.framework.zip' }
+  s.preserve_paths = "Cronet.framework"
+  s.public_header_files = "Cronet.framework/Headers/**/*{.h}"
+end
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.h b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
new file mode 100644
index 0000000..7f7fc6c
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+#import <Cronet/Cronet.h>
+
+#import "GRPCCall.h"
+
+/**
+ * Methods for selecting Cronet as the transport used by gRPC calls.
+ */
+@interface GRPCCall (Cronet)
+
+/**
+ * Call once, before issuing the first RPC, passing in @c engine a Cronet engine
+ * that your app created elsewhere. Once set, all subsequent RPCs will use the
+ * Cronet transport. This method is not thread safe.
+ */
++(void)useCronetWithEngine:(cronet_engine *)engine;
+
+/** Returns the engine last passed to useCronetWithEngine:, or NULL if none was. */
++(cronet_engine *)cronetEngine;
+
+/** Returns YES once useCronetWithEngine: has been called, NO before that. */
++(BOOL)isUsingCronet;
+
+@end
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.m b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
new file mode 100644
index 0000000..69a410e
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRPCCall+Cronet.h"
+
+static BOOL useCronet = NO;  // Becomes YES on the first call to +useCronetWithEngine:.
+static cronet_engine *globalCronetEngine;  // Engine from +useCronetWithEngine:; NULL until set.
+
+@implementation GRPCCall (Cronet)
+
++ (void)useCronetWithEngine:(cronet_engine *)engine {
+  useCronet = YES;  // Set unconditionally; engine is not checked for NULL here.
+  globalCronetEngine = engine;  // Plain unsynchronized store.
+}
+
++ (cronet_engine *)cronetEngine {
+  return globalCronetEngine;
+}
+
++ (BOOL)isUsingCronet {
+  return useCronet;
+}
+
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 70d1a9b..3219835 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -56,6 +56,12 @@
 + (nullable GRPCChannel *)secureChannelWithHost:(nonnull NSString *)host;
 
 /**
+ * Creates a secure channel to the specified @c host using Cronet as a transport mechanism.
+ */
++ (nullable GRPCChannel *)secureCronetChannelWithHost:(nonnull NSString *)host
+                                          channelArgs:(nonnull NSDictionary *)channelArgs;
+
+/**
  * Creates a secure channel to the specified @c host using the specified @c credentials and
  * @c channelArgs. Only in tests should @c GRPC_SSL_TARGET_NAME_OVERRIDE_ARG channel arg be set.
  */
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index 203ef58..e4e0dbe 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -34,10 +34,13 @@
 #import "GRPCChannel.h"
 
 #include <grpc/grpc_security.h>
+#include <grpc/grpc_cronet.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
+#import <Cronet/Cronet.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
 #import "GRPCCompletionQueue.h"
 
 void freeChannelArgs(grpc_channel_args *channel_args) {
@@ -99,6 +102,22 @@
   grpc_channel_args *_channelArgs;
 }
 
+- (instancetype)initWithHost:(NSString *)host
+                cronetEngine:(cronet_engine *)cronetEngine
+                 channelArgs:(NSDictionary *)channelArgs {
+  if (!host) {  // A nil host raises instead of yielding a nil channel.
+    [NSException raise:NSInvalidArgumentException format:@"host argument missing"];
+  }
+
+  if (self = [super init]) {
+    _channelArgs = buildChannelArgs(channelArgs);  // Freed later via freeChannelArgs().
+    _host = [host copy];  // Private copy; its UTF8String is what the C core receives below.
+    _unmanagedChannel = grpc_cronet_secure_channel_create(cronetEngine, _host.UTF8String, _channelArgs,
+                                                     NULL);  // NOTE(review): presumably the reserved argument; confirm.
+  }
+
+  return self;
+}
 
 - (instancetype)initWithHost:(NSString *)host
                       secure:(BOOL)secure
@@ -133,6 +152,17 @@
   freeChannelArgs(_channelArgs);
 }
 
++ (GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
+                                 channelArgs:(NSDictionary *)channelArgs {
+  cronet_engine *engine = [GRPCCall cronetEngine];  // Global engine set via +useCronetWithEngine:.
+  if (!engine) {
+    [NSException raise:NSInvalidArgumentException
+                format:@"cronet_engine is NULL. Set it first."];
+    return nil;  // Not reached when the raise above throws; kept as a defensive fallback.
+  }
+  return [[GRPCChannel alloc] initWithHost:host cronetEngine:engine channelArgs:channelArgs];
+}
+
 + (GRPCChannel *)secureChannelWithHost:(NSString *)host {
   return [[GRPCChannel alloc] initWithHost:host secure:YES credentials:NULL channelArgs:NULL];
 }
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index 43166cb..7da5088 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -37,6 +37,7 @@
 #include <grpc/grpc_security.h>
 #import <GRPCClient/GRPCCall.h>
 #import <GRPCClient/GRPCCall+ChannelArg.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
 
 #import "GRPCChannel.h"
 #import "GRPCCompletionQueue.h"
@@ -200,15 +201,21 @@
 
 - (GRPCChannel *)newChannel {
   NSDictionary *args = [self channelArgs];
+  BOOL useCronet = [GRPCCall isUsingCronet];
   if (_secure) {
       GRPCChannel *channel;
       @synchronized(self) {
         if (_channelCreds == nil) {
           [self setTLSPEMRootCerts:nil withPrivateKey:nil withCertChain:nil error:nil];
         }
-        channel = [GRPCChannel secureChannelWithHost:_address
-                                          credentials:_channelCreds
-                                          channelArgs:args];
+        if (useCronet) {
+          channel = [GRPCChannel secureCronetChannelWithHost:_address
+                                                 channelArgs:args];
+        } else {
+          channel = [GRPCChannel secureChannelWithHost:_address
+                                            credentials:_channelCreds
+                                            channelArgs:args];
+        }
       }
       return channel;
   } else {
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index 4f096b9..781c500 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -35,7 +35,9 @@
 
 #include <grpc/status.h>
 
+#import <Cronet/Cronet.h>
 #import <GRPCClient/GRPCCall+Tests.h>
+#import <GRPCClient/GRPCCall+Cronet.h>
 #import <ProtoRPC/ProtoRPC.h>
 #import <RemoteTest/Empty.pbobjc.h>
 #import <RemoteTest/Messages.pbobjc.h>
@@ -78,6 +80,8 @@
 
 #pragma mark Tests
 
+static cronet_engine *cronetEngine = NULL;
+
 @implementation InteropTests {
   RMTTestService *_service;
 }
@@ -88,6 +92,15 @@
 
 - (void)setUp {
   _service = self.class.host ? [RMTTestService serviceWithHost:self.class.host] : nil;
+#ifdef GRPC_COMPILE_WITH_CRONET
+  if (cronetEngine == NULL) {
+    // Cronet setup, guarded by the static: once cronetEngine is non-NULL, later setUp calls skip it.
+    [Cronet setHttp2Enabled:YES];
+    [Cronet start];
+    cronetEngine = [Cronet getGlobalEngine];
+    [GRPCCall useCronetWithEngine:cronetEngine];  // Secure channels created afterwards use Cronet.
+  }
+#endif
 }
 
 - (void)testEmptyUnaryRPC {
@@ -245,6 +258,8 @@
   [self waitForExpectationsWithTimeout:4 handler:nil];
 }
 
+#ifndef GRPC_COMPILE_WITH_CRONET
+// TODO(makdharma@): Fix this test
 - (void)testEmptyStreamRPC {
   XCTAssertNotNil(self.class.host);
   __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
@@ -258,6 +273,7 @@
   }];
   [self waitForExpectationsWithTimeout:2 handler:nil];
 }
+#endif
 
 - (void)testCancelAfterBeginRPC {
   XCTAssertNotNil(self.class.host);
diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile
index 7ec7a25..508641d 100644
--- a/src/objective-c/tests/Podfile
+++ b/src/objective-c/tests/Podfile
@@ -3,6 +3,7 @@
 
 pod 'Protobuf', :path => "../../../third_party/protobuf"
 pod 'BoringSSL', :podspec => ".."
+pod 'CronetFramework', :podspec => ".."
 pod 'gRPC', :path => "../../.."
 pod 'RemoteTest', :path => "RemoteTestClient"
 
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 66e6e6b..d42c580 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -336,6 +336,8 @@
   void grpc_server_register_completion_queue(grpc_server *server,
                                              grpc_completion_queue *cq,
                                              void *reserved) nogil
+  void grpc_server_register_non_listening_completion_queue(
+      grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil
   int grpc_server_add_insecure_http2_port(
       grpc_server *server, const char *addr) nogil
   void grpc_server_start(grpc_server *server) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 8419a59..5594875 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -81,11 +81,20 @@
           self.c_server, queue.c_completion_queue, NULL)
     self.registered_completion_queues.append(queue)
 
+  def register_non_listening_completion_queue(
+      self, CompletionQueue queue not None):
+    if self.is_started:  # registration is only accepted before start()
+      raise ValueError("cannot register completion queues after start")
+    with nogil:  # release the GIL for the duration of the C-core call
+      grpc_server_register_non_listening_completion_queue(
+          self.c_server, queue.c_completion_queue, NULL)
+    self.registered_completion_queues.append(queue)  # tracked in the same list as listening queues
+
   def start(self):
     if self.is_started:
       raise ValueError("the server has already started")
     self.backup_shutdown_queue = CompletionQueue()
-    self.register_completion_queue(self.backup_shutdown_queue)
+    self.register_non_listening_completion_queue(self.backup_shutdown_queue)
     self.is_started = True
     with nogil:
       grpc_server_start(self.c_server)
@@ -169,4 +178,3 @@
           time.sleep(0)
       with nogil:
         grpc_server_destroy(self.c_server)
-
diff --git a/templates/Makefile.template b/templates/Makefile.template
index 0d5f0ec..f84b5f6 100644
--- a/templates/Makefile.template
+++ b/templates/Makefile.template
@@ -1207,7 +1207,7 @@
   $(LIBDIR)/$(CONFIG)/pkgconfig/grpc_zookeeper.pc:
   	$(E) "[MAKE]    Generating $@"
   	$(Q) mkdir -p $(@D)
-  	$(Q) echo -e "$(GRPC_ZOOKEEPER_PC_FILE)" >$@
+  	$(Q) echo "$(GRPC_ZOOKEEPER_PC_FILE)" | tr , '\n' >$@
 
   $(LIBDIR)/$(CONFIG)/pkgconfig/grpc++.pc:
   	$(E) "[MAKE]    Generating $@"
diff --git a/tools/jenkins/run_full_performance.sh b/tools/jenkins/run_full_performance.sh
index bb1f79f..3feda86 100755
--- a/tools/jenkins/run_full_performance.sh
+++ b/tools/jenkins/run_full_performance.sh
@@ -40,7 +40,7 @@
     --netperf \
     --category all \
     --bq_result_table performance_test.performance_experiment \
-    --remote_worker_host grpc-performance-server-8core grpc-performance-client-8core \
+    --remote_worker_host grpc-performance-server-8core grpc-performance-client-8core grpc-performance-client2-8core \
     || EXIT_CODE=1
 
 # scalability with 32cores (and upload to a different BQ table)
@@ -49,7 +49,7 @@
     --netperf \
     --category scalable \
     --bq_result_table performance_test.performance_experiment_32core \
-    --remote_worker_host grpc-performance-server-32core grpc-performance-client-32core \
+    --remote_worker_host grpc-performance-server-32core grpc-performance-client-32core grpc-performance-client2-32core \
     || EXIT_CODE=1
 
 exit $EXIT_CODE