blob: c26231349605821864dd5709fd8039978884692a [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
Jorge Canizales35f003b2015-07-17 21:14:36 -070019#import "GRXConcurrentWriteable.h"
Jorge Canizales5e0efd92015-02-17 18:23:58 -080020
Jorge Canizales3936ed72015-06-21 14:43:56 -070021#import <RxLibrary/GRXWriteable.h>
Jorge Canizales5e0efd92015-02-17 18:23:58 -080022
@interface GRXConcurrentWriteable ()

// The wrapped writeable that receives values and the final completion message.
// Declared atomic because the cancellation methods set it to nil from whatever
// thread they are called on, while blocks running on the dispatch queue read it.
@property(atomic, strong) id<GRXWriteable> writeable;

@end
27
// Wraps a GRXWriteable so that every message to it is delivered asynchronously
// on a single dispatch queue, and so that the stream can be cancelled from any
// thread. Once finished (by successful completion or by cancellation), all
// still-enqueued messages become no-ops.
@implementation GRXConcurrentWriteable {
  // Queue on which every message to the wrapped writeable is dispatched.
  dispatch_queue_t _writeableQueue;
  // Ensures that writesFinishedWithError: is sent at most once to the writeable.
  // Only read and written inside @synchronized(self); see -tryMarkFinished.
  BOOL _alreadyFinished;
}

// Creates an instance with no wrapped writeable, delivering on the main queue.
- (instancetype)init {
  return [self initWithWriteable:nil];
}

// Designated initializer.
//
// writeable: the object that will receive the values and the completion.
// queue: the dispatch queue on which messages to writeable are delivered.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
                    dispatchQueue:(dispatch_queue_t)queue {
  if (self = [super init]) {
    _writeableQueue = queue;
    _writeable = writeable;
  }
  return self;
}

// Convenience initializer that delivers messages on the main queue.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
  return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()];
}

// Atomically marks this writeable as finished.
//
// Returns YES if this call performed the transition to "finished", and NO if
// it had already been finished by an earlier completion or cancellation.
// Exactly one caller ever observes YES, which is what guarantees that
// writesFinishedWithError: is sent at most once.
- (BOOL)tryMarkFinished {
  @synchronized(self) {
    if (_alreadyFinished) {
      return NO;
    }
    _alreadyFinished = YES;
    return YES;
  }
}

// Asynchronously writes value to the wrapped writeable and then invokes
// handler. If the writeable has been nillified (cancellation or completion)
// by the time the block runs, neither the write nor the handler happens.
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
  dispatch_async(_writeableQueue, ^{
    // We're racing a possible cancellation performed by another thread. To turn
    // all already-enqueued messages into noops, cancellation nillifies the
    // writeable property. If we get it before it's nil, we won the race.
    id<GRXWriteable> writeable = self.writeable;
    if (!writeable) {
      return;
    }
    [writeable writeValue:value];
    // Invoking a nil block crashes, so tolerate callers that pass no handler.
    if (handler) {
      handler();
    }
  });
}

// Asynchronously sends writesFinishedWithError:nil to the wrapped writeable,
// unless the stream has already been finished or cancelled.
- (void)enqueueSuccessfulCompletion {
  __weak typeof(self) weakSelf = self;
  dispatch_async(_writeableQueue, ^{
    // Only strongSelf is referenced inside this block. Mentioning self here
    // would capture it strongly and make the weak reference above pointless.
    // If the object is already gone, there is nothing left to complete.
    __strong typeof(weakSelf) strongSelf = weakSelf;
    if (!strongSelf || ![strongSelf tryMarkFinished]) {
      return;
    }
    // Cancellation is now impossible: the cancel methods are no-ops once the
    // finished flag is set.
    [strongSelf.writeable writesFinishedWithError:nil];
    // Skip any possible message to the wrapped writeable enqueued after this
    // one.
    strongSelf.writeable = nil;
  });
}

// Cancels the stream with a non-nil error. All still-enqueued messages become
// no-ops, and writesFinishedWithError:error is delivered asynchronously on the
// dispatch queue. Does nothing if the stream was already finished or cancelled.
- (void)cancelWithError:(NSError *)error {
  NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
  if (![self tryMarkFinished]) {
    return;
  }
  // Skip any of the still-enqueued messages to the wrapped writeable. We use
  // the atomic setter to nillify writeable because we might be running
  // concurrently with the blocks in _writeableQueue, and assignment with ARC
  // isn't atomic. A local strong reference is kept so the error can still be
  // delivered.
  id<GRXWriteable> writeable = self.writeable;
  self.writeable = nil;

  dispatch_async(_writeableQueue, ^{
    [writeable writesFinishedWithError:error];
  });
}

// Cancels the stream without notifying the wrapped writeable. All
// still-enqueued messages become no-ops. Does nothing if the stream was
// already finished or cancelled.
- (void)cancelSilently {
  if ([self tryMarkFinished]) {
    // Atomic setter, for the same reason as in -cancelWithError:: blocks on
    // _writeableQueue may be reading the property concurrently.
    self.writeable = nil;
  }
}

@end