Fix pipeline finishing bug
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index bffabc5..90d5116 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -70,48 +70,66 @@
   __weak GRXBufferedPipe *weakSelf = self;
   dispatch_async(_writeQueue, ^(void) {
     GRXBufferedPipe *strongSelf = weakSelf;
-    if (strongSelf) {
+    if (strongSelf && !strongSelf->_errorOrNil) {
       [strongSelf->_writeable writeValue:value];
     }
   });
 }
 
 - (void)writesFinishedWithError:(NSError *)errorOrNil {
+  if (_inputIsFinished) {
+    return;
+  }
   _inputIsFinished = YES;
   _errorOrNil = errorOrNil;
   if (errorOrNil) {
     // No need to write pending values.
     [self finishWithError:_errorOrNil];
+  } else {
+    __weak GRXBufferedPipe *weakSelf = self;
+    dispatch_async(_writeQueue, ^{
+      GRXBufferedPipe *strongSelf = weakSelf;
+      if (strongSelf) {
+        [strongSelf finishWithError:_errorOrNil];
+      }
+    });
   }
 }
 
 #pragma mark GRXWriter implementation
 
 - (void)setState:(GRXWriterState)newState {
-  // Manual transitions are only allowed from the started or paused states.
-  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
-    return;
-  }
+  @synchronized (self) {
+    // Manual transitions are only allowed from the started or paused states.
+    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+      return;
+    }
 
-  switch (newState) {
-    case GRXWriterStateFinished:
-      _state = newState;
-      // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
-      // writeable to be messaged anymore.
-      _writeable = nil;
-      return;
-    case GRXWriterStatePaused:
-      _state = newState;
-      dispatch_suspend(_writeQueue);
-      return;
-    case GRXWriterStateStarted:
-      if (_state == GRXWriterStatePaused) {
+    switch (newState) {
+      case GRXWriterStateFinished:
+        if (_state == GRXWriterStatePaused) {
+          dispatch_resume(_writeQueue);
+        }
         _state = newState;
-        dispatch_resume(_writeQueue);
-      }
-      return;
-    case GRXWriterStateNotStarted:
-      return;
+        // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
+        // writeable to be messaged anymore.
+        _writeable = nil;
+        return;
+      case GRXWriterStatePaused:
+        if (_state == GRXWriterStateStarted) {
+          _state = newState;
+          dispatch_suspend(_writeQueue);
+        }
+        return;
+      case GRXWriterStateStarted:
+        if (_state == GRXWriterStatePaused) {
+        _state = newState;
+          dispatch_resume(_writeQueue);
+        }
+        return;
+      case GRXWriterStateNotStarted:
+        return;
+    }
   }
 }