simplify stream reads on client side
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 132b426..d687bb6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -56,14 +56,15 @@
         // Completion of a pending unary response if not null.
         TaskCompletionSource<TResponse> unaryResponseTcs;
 
+        // Indicates that steaming call has finished.
+        TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+
         // Response headers set here once received.
         TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
 
         // Set after status is received. Used for both unary and streaming response calls.
         ClientSideStatus? finishedStatus;
 
-        bool readObserverCompleted;  // True if readObserver has already been completed.
-
         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
             : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
         {
@@ -74,8 +75,7 @@
         /// <summary>
         /// This constructor should only be used for testing.
         /// </summary>
-        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall)
-            : this(callDetails)
+        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
         {
             this.injectedNativeCall = injectedNativeCall;
         }
@@ -192,7 +192,6 @@
                 Initialize(environment.CompletionQueue);
 
                 halfcloseRequested = true;
-                halfclosed = true;  // halfclose not confirmed yet, but it will be once finishedHandler is called.
 
                 byte[] payload = UnsafeSerialize(msg);
 
@@ -261,6 +260,17 @@
         }
 
         /// <summary>
+        /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
+        /// </summary>
+        public Task StreamingCallFinishedTask
+        {
+            get
+            {
+                return streamingCallFinishedTcs.Task;
+            }
+        }
+
+        /// <summary>
         /// Get the task that completes once response headers are received.
         /// </summary>
         public Task<Metadata> ResponseHeadersAsync
@@ -305,36 +315,6 @@
             }
         }
 
-        /// <summary>
-        /// On client-side, we only fire readCompletionDelegate once all messages have been read 
-        /// and status has been received.
-        /// </summary>
-        protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
-        {
-            if (completionDelegate != null && readingDone && finishedStatus.HasValue)
-            {
-                bool shouldComplete;
-                lock (myLock)
-                {
-                    shouldComplete = !readObserverCompleted;
-                    readObserverCompleted = true;
-                }
-
-                if (shouldComplete)
-                {
-                    var status = finishedStatus.Value.Status;
-                    if (status.StatusCode != StatusCode.OK)
-                    {
-                        FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
-                    }
-                    else
-                    {
-                        FireCompletion(completionDelegate, default(TResponse), null);
-                    }
-                }
-            }
-        }
-
         protected override void OnAfterReleaseResources()
         {
             details.Channel.RemoveCallReference(this);
@@ -392,8 +372,6 @@
                 finished = true;
                 finishedStatus = receivedStatus;
 
-                halfclosed = true;
-
                 ReleaseResourcesIfPossible();
             }
 
@@ -403,7 +381,6 @@
 
             if (!success || status.StatusCode != StatusCode.OK)
             {
-
                 unaryResponseTcs.SetException(new RpcException(status));
                 return;
             }
@@ -420,18 +397,23 @@
         /// </summary>
         private void HandleFinished(bool success, ClientSideStatus receivedStatus)
         {
-            AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
             lock (myLock)
             {
                 finished = true;
                 finishedStatus = receivedStatus;
 
-                origReadCompletionDelegate = readCompletionDelegate;
-
                 ReleaseResourcesIfPossible();
             }
 
-            ProcessLastRead(origReadCompletionDelegate);
+            var status = receivedStatus.Status;
+
+            if (!success || status.StatusCode != StatusCode.OK)
+            {
+                streamingCallFinishedTcs.SetException(new RpcException(status));
+                return;
+            }
+
+            streamingCallFinishedTcs.SetResult(null);
         }
     }
 }
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 7744dbe..4d20394 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -61,19 +61,17 @@
         protected bool disposed;
 
         protected bool started;
-        protected bool errorOccured;
         protected bool cancelRequested;
 
         protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
         protected AsyncCompletionDelegate<TRead> readCompletionDelegate;  // Completion of a pending send or sendclose if not null.
 
-        protected bool readingDone;
-        protected bool halfcloseRequested;
-        protected bool halfclosed;
+        protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
+        protected bool halfcloseRequested;  // True if send close have been initiated.
         protected bool finished;  // True if close has been received from the peer.
 
         protected bool initialMetadataSent;
-        protected long streamingWritesCounter;
+        protected long streamingWritesCounter;  // Number of streaming send operations started so far.
 
         public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
         {
@@ -161,16 +159,6 @@
             }
         }
 
-        // TODO(jtattermusch): find more fitting name for this method.
-        /// <summary>
-        /// Default behavior just completes the read observer, but more sofisticated behavior might be required
-        /// by subclasses.
-        /// </summary>
-        protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
-        {
-            FireCompletion(completionDelegate, default(TRead), null);
-        }
-
         /// <summary>
         /// If there are no more pending actions and no new actions can be started, releases
         /// the underlying native resources.
@@ -179,7 +167,7 @@
         {
             if (!disposed && call != null)
             {
-                bool noMoreSendCompletions = halfclosed || ((cancelRequested || finished) && sendCompletionDelegate == null);
+                bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
                 if (noMoreSendCompletions && readingDone && finished)
                 {
                     ReleaseResources();
@@ -206,7 +194,6 @@
         protected void CheckSendingAllowed()
         {
             Preconditions.CheckState(started);
-            Preconditions.CheckState(!errorOccured);
             CheckNotCancelled();
             Preconditions.CheckState(!disposed);
 
@@ -218,7 +205,6 @@
         protected virtual void CheckReadingAllowed()
         {
             Preconditions.CheckState(started);
-            Preconditions.CheckState(!errorOccured);
             Preconditions.CheckState(!disposed);
 
             Preconditions.CheckState(!readingDone, "Stream has already been closed.");
@@ -312,7 +298,6 @@
             AsyncCompletionDelegate<object> origCompletionDelegate = null;
             lock (myLock)
             {
-                halfclosed = true;
                 origCompletionDelegate = sendCompletionDelegate;
                 sendCompletionDelegate = null;
 
@@ -338,15 +323,11 @@
             lock (myLock)
             {
                 origCompletionDelegate = readCompletionDelegate;
-                if (receivedMessage != null)
+                readCompletionDelegate = null;
+
+                if (receivedMessage == null)
                 {
-                    readCompletionDelegate = null;
-                }
-                else
-                {
-                    // This was the last read. Keeping the readCompletionDelegate
-                    // to be either fired by this handler or by client-side finished
-                    // handler.
+                    // This was the last read.
                     readingDone = true;
                 }
 
@@ -365,7 +346,7 @@
             }
             else
             {
-                ProcessLastRead(origCompletionDelegate);
+                FireCompletion(origCompletionDelegate, default(TRead), null);
             }
         }
     }
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 6c44521..b4a7335 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -72,7 +72,13 @@
             call.StartReadMessage(taskSource.CompletionDelegate);
             var result = await taskSource.Task;
             this.current = result;
-            return result != null;
+
+            if (result == null)
+            {
+                await call.StreamingCallFinishedTask;
+                return false;
+            }
+            return true;
         }
 
         public void Dispose()