Adds GRXBufferedPipe
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
new file mode 100644
index 0000000..02dc6b5
--- /dev/null
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "GRXWriteable.h"
+#import "GRXWriter.h"
+
+// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
+// to -startWithWriteable:).
+// Once it is started, whatever values are written into it (via -didReceiveValue:) will be
+// propagated immediately, unless flow control prevents it.
+// If it is throttled and keeps receiving values, as well as if it receives values before being
+// started, it will buffer them and propagate them in order as soon as its state becomes
+// GRXWriterStateStarted.
+// If it receives an error (via -didFinishWithError:), it will drop any buffered values and
+// propagate the error immediately.
+@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter>
+
+// Convenience constructor.
++ (instancetype)pipe;
+
+@end
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
new file mode 100644
index 0000000..e1295ce
--- /dev/null
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRXBufferedPipe.h"
+
+@implementation GRXBufferedPipe {
+  id<GRXWriteable> _writeable;
+  NSMutableArray *_queue;
+  BOOL _inputIsFinished;
+  NSError *_errorOrNil;
+}
+
+@synthesize state = _state;
+
++ (instancetype)pipe {
+  return [[self alloc] init];
+}
+
+- (instancetype)init {
+  if (self = [super init]) {
+    _queue = [NSMutableArray array];
+    _state = GRXWriterStateNotStarted;
+  }
+  return self;
+}
+
+- (id)popValue {
+  id value = _queue[0];
+  [_queue removeObjectAtIndex:0];
+  return value;
+}
+
+- (void)writeBufferUntilPausedOrStopped {
+  while (_state == GRXWriterStateStarted && _queue.count > 0) {
+    [_writeable didReceiveValue:[self popValue]];
+  }
+  if (_inputIsFinished && _queue.count == 0) {
+    // Our writer finished normally while we were paused or not-started-yet.
+    [self finishWithError:_errorOrNil];
+  }
+}
+
+#pragma mark GRXWriteable implementation
+
+// Returns whether events can be simply propagated to the other end of the pipe.
+- (BOOL)shouldFastForward {
+  return _state == GRXWriterStateStarted && _queue.count == 0;
+}
+
+- (void)didReceiveValue:(id)value {
+  if (self.shouldFastForward) {
+    // Skip the queue.
+    [_writeable didReceiveValue:value];
+  } else {
+    // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
+    // So just buffer the new value.
+    [_queue addObject:value];
+  }
+}
+
+- (void)didFinishWithError:(NSError *)errorOrNil {
+  _inputIsFinished = YES;
+  _errorOrNil = errorOrNil;
+  if (errorOrNil || self.shouldFastForward) {
+    // No need to write pending values.
+    [self finishWithError:_errorOrNil];
+  }
+}
+
+#pragma mark GRXWriter implementation
+
+- (void)setState:(GRXWriterState)newState {
+  // Manual transitions are only allowed from the started or paused states.
+  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+    return;
+  }
+
+  switch (newState) {
+    case GRXWriterStateFinished:
+      _state = newState;
+      _queue = nil;
+      // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
+      // writeable to be messaged anymore.
+      _writeable = nil;
+      return;
+    case GRXWriterStatePaused:
+      _state = newState;
+      return;
+    case GRXWriterStateStarted:
+      if (_state == GRXWriterStatePaused) {
+        _state = newState;
+        [self writeBufferUntilPausedOrStopped];
+      }
+      return;
+    case GRXWriterStateNotStarted:
+      return;
+  }
+}
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+  _state = GRXWriterStateStarted;
+  _writeable = writeable;
+  [self writeBufferUntilPausedOrStopped];
+}
+
+- (void)finishWithError:(NSError *)errorOrNil {
+  id<GRXWriteable> writeable = _writeable;
+  self.state = GRXWriterStateFinished;
+  [writeable didFinishWithError:errorOrNil];
+}
+
+@end