blob: 577a5e9a42b824a4e845d877b4ca996a62851b8a [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#import "GRXBufferedPipe.h"
20
// Private interface. `writeable` is the destination that buffered values and the final
// "writes finished" notification are forwarded to; it is nil until the pipe is started and is
// cleared again when the pipe finishes.
@interface GRXBufferedPipe ()
@property(atomic, strong) id<GRXWriteable> writeable;
@end
24
// A pipe that is both a GRXWriteable (values are written into it) and a GRXWriter (it forwards
// those values to the writeable it was started with). Incoming values are buffered as blocks on a
// private serial dispatch queue; suspending and resuming that queue is how the pipe holds values
// back before it is started and while it is paused.
@implementation GRXBufferedPipe {
  // Serial queue holding one block per pending write. Created suspended in -init, resumed by
  // -startWithWriteable:, suspended while the state is paused.
  dispatch_queue_t _writeQueue;
}

@synthesize state = _state;

// Convenience factory: returns a new pipe in the not-started state.
+ (instancetype)pipe {
  return [[self alloc] init];
}

// Creates the pipe with its write queue suspended, so that anything written before
// -startWithWriteable: stays enqueued.
- (instancetype)init {
  if (self = [super init]) {
    _state = GRXWriterStateNotStarted;
    _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    dispatch_suspend(_writeQueue);
  }
  return self;
}

#pragma mark GRXWriteable implementation

// Enqueues `value` to be forwarded to the current writeable once the write queue runs.
- (void)writeValue:(id)value {
  // Every NSObject responds to -copy, but -copy raises for objects that do not implement
  // -copyWithZone:. Test for the method that actually performs the copy, so that values which
  // cannot be copied are forwarded as they are instead of crashing.
  if ([value respondsToSelector:@selector(copyWithZone:)]) {
    // Even if we're paused and with enqueued values, we can't exert back-pressure to our writer.
    // So just buffer the new value.
    // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
    value = [value copy];
  }
  // The block holds the pipe weakly: if the pipe is gone by the time the block runs, the message
  // goes to nil and the value is dropped.
  __weak GRXBufferedPipe *weakSelf = self;
  dispatch_async(_writeQueue, ^(void) {
    [weakSelf.writeable writeValue:value];
  });
}

// Enqueues the end-of-stream notification behind all values written so far, so the writeable is
// told about the end (and `errorOrNil`) only after the earlier writes were forwarded.
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  __weak GRXBufferedPipe *weakSelf = self;
  dispatch_async(_writeQueue, ^{
    [weakSelf finishWithError:errorOrNil];
  });
}

#pragma mark GRXWriter implementation

// Manual state transition. Ignored unless the pipe is currently started or paused; every
// dispatch_suspend / dispatch_resume issued here is paired with the state change that caused it.
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
      return;
    }

    switch (newState) {
      case GRXWriterStateFinished:
        // Drop the writeable first: blocks still enqueued will then message nil and do nothing.
        self.writeable = nil;
        if (_state == GRXWriterStatePaused) {
          // Undo the suspension made when pausing, so the queue is left balanced.
          dispatch_resume(_writeQueue);
        }
        _state = newState;
        return;
      case GRXWriterStatePaused:
        if (_state == GRXWriterStateStarted) {
          _state = newState;
          dispatch_suspend(_writeQueue);
        }
        return;
      case GRXWriterStateStarted:
        if (_state == GRXWriterStatePaused) {
          _state = newState;
          dispatch_resume(_writeQueue);
        }
        return;
      case GRXWriterStateNotStarted:
        return;
    }
  }
}

// Attaches the destination writeable and releases the queue that -init suspended, which flushes
// everything buffered so far. The resume is unconditional, so this relies on the queue still
// being suspended when it is called.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  // -setState: reads and writes _state under @synchronized(self); take the same lock here so the
  // writeable and the state change together and do not race with a concurrent transition.
  @synchronized(self) {
    self.writeable = writeable;
    _state = GRXWriterStateStarted;
  }
  dispatch_resume(_writeQueue);
}

// Notifies the current writeable that the stream ended (with `errorOrNil`) and then requests the
// finished state, which also clears the writeable.
- (void)finishWithError:(NSError *)errorOrNil {
  [self.writeable writesFinishedWithError:errorOrNil];
  self.state = GRXWriterStateFinished;
}

// The queue is suspended while the pipe is not started or paused. Balance that suspension before
// the queue is released; libdispatch does not allow releasing a suspended object.
- (void)dealloc {
  // Read the ivar directly: no other thread can hold a reference during dealloc, and accessors
  // should not be messaged on an object that is being torn down.
  GRXWriterState state = _state;
  if (state == GRXWriterStateNotStarted || state == GRXWriterStatePaused) {
    dispatch_resume(_writeQueue);
  }
}

@end
Jorge Canizales142acc92015-05-15 18:43:34 -0700121@end