Better concurrency handling
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index eee41cc..3bb9e2a 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -34,13 +34,14 @@
 #import "GRXBufferedPipe.h"
 
 @interface GRXBufferedPipe ()
-@property(atomic) NSError *errorOrNil;
+@property(atomic) id<GRXWriteable> writeable;
+@property(atomic) BOOL inputIsFinished;
 @end
 
 @implementation GRXBufferedPipe {
-  id<GRXWriteable> _writeable;
-  BOOL _inputIsFinished;
+  NSError *_errorOrNil;
   dispatch_queue_t _writeQueue;
+  dispatch_once_t _finishQueue;
 }
 
 @synthesize state = _state;
@@ -53,6 +54,7 @@
   if (self = [super init]) {
     _state = GRXWriterStateNotStarted;
     _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+    self.inputIsFinished = NO;
     dispatch_suspend(_writeQueue);
   }
   return self;
@@ -61,7 +63,7 @@
 #pragma mark GRXWriteable implementation
 
 - (void)writeValue:(id)value {
-  if (_inputIsFinished) {
+  if (self.inputIsFinished) {
     return;
   }
   if ([value respondsToSelector:@selector(copy)]) {
@@ -73,28 +75,32 @@
   __weak GRXBufferedPipe *weakSelf = self;
   dispatch_async(_writeQueue, ^(void) {
     GRXBufferedPipe *strongSelf = weakSelf;
-    if (strongSelf && !strongSelf.errorOrNil) {
-      [strongSelf->_writeable writeValue:value];
+    if (strongSelf && strongSelf.writeable) {
+      [strongSelf.writeable writeValue:value];
     }
   });
 }
 
 - (void)writesFinishedWithError:(NSError *)errorOrNil {
-  if (_inputIsFinished) {
+  if (self.inputIsFinished) {
     return;
   }
-  _inputIsFinished = YES;
-  self.errorOrNil = errorOrNil;
+  self.inputIsFinished = YES;
   if (errorOrNil) {
     // No need to write pending values.
-    [self finishWithError:_errorOrNil];
+    dispatch_once(&_finishQueue, ^{
+      _errorOrNil = errorOrNil;
+      [self finishWithError:_errorOrNil];
+    });
   } else {
-    __weak GRXBufferedPipe *weakSelf = self;
-    dispatch_async(_writeQueue, ^{
-      GRXBufferedPipe *strongSelf = weakSelf;
-      if (strongSelf) {
-        [strongSelf finishWithError:nil];
-      }
+    dispatch_once(&_finishQueue, ^{
+      __weak GRXBufferedPipe *weakSelf = self;
+      dispatch_async(_writeQueue, ^{
+        GRXBufferedPipe *strongSelf = weakSelf;
+        if (strongSelf) {
+          [strongSelf finishWithError:nil];
+        }
+      });
     });
   }
 }
@@ -143,9 +149,12 @@
 }
 
 - (void)finishWithError:(NSError *)errorOrNil {
-  id<GRXWriteable> writeable = _writeable;
+  id<GRXWriteable> writeable = self.writeable;
+  self.writeable = nil;
   self.state = GRXWriterStateFinished;
-  [writeable writesFinishedWithError:errorOrNil];
+  dispatch_async(_writeQueue, ^{
+    [writeable writesFinishedWithError:errorOrNil];
+  });
 }
 
 @end
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index 770c034..500b8a6 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -182,9 +182,13 @@
 }
 
 - (void)testBufferedPipePropagatesError {
+  __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
   // Given:
   CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
-  id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+  id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+    handler.block(value, errorOrNil);
+    [expectation fulfill];
+  }];
   NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
 
   // If:
@@ -193,6 +197,7 @@
   [pipe writesFinishedWithError:anyError];
 
   // Then:
+  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
   XCTAssertEqual(handler.timesCalled, 1);
   XCTAssertEqualObjects(handler.value, nil);
   XCTAssertEqualObjects(handler.errorOrNil, anyError);