/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| 18 | |
Jorge Canizales | 35f003b | 2015-07-17 21:14:36 -0700 | [diff] [blame] | 19 | #import "GRXConcurrentWriteable.h" |
Jorge Canizales | 5e0efd9 | 2015-02-17 18:23:58 -0800 | [diff] [blame] | 20 | |
Jorge Canizales | 3936ed7 | 2015-06-21 14:43:56 -0700 | [diff] [blame] | 21 | #import <RxLibrary/GRXWriteable.h> |
Jorge Canizales | 5e0efd9 | 2015-02-17 18:23:58 -0800 | [diff] [blame] | 22 | |
@interface GRXConcurrentWriteable ()
// The wrapped writeable. Declared atomic because cancellation may set it to nil from a thread
// other than the one running the blocks that read it.
@property(strong, atomic) id<GRXWriteable> writeable;
@end
| 27 | |
// Wraps a GRXWriteable so that values and the final completion can be enqueued from any thread.
// Every message sent to the wrapped writeable is sent from a block dispatched asynchronously onto
// _writeableQueue. Cancellation turns already-enqueued value blocks into no-ops by setting the
// writeable property to nil.
@implementation GRXConcurrentWriteable {
  // Queue onto which every block that messages the wrapped writeable is dispatched.
  dispatch_queue_t _writeableQueue;
  // Set once the writeable has been finished or cancelled. Only read and written inside
  // @synchronized(self). Ensures writesFinishedWithError: is sent at most once to the writeable.
  BOOL _alreadyFinished;
}

// Creates an instance with no wrapped writeable; blocks are dispatched to the main queue.
- (instancetype)init {
  return [self initWithWriteable:nil];
}

// Designated initializer.
//
// @param writeable The writeable to wrap.
// @param queue The dispatch queue on which all messages to |writeable| are delivered.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
                    dispatchQueue:(dispatch_queue_t)queue {
  if (self = [super init]) {
    _writeableQueue = queue;
    _writeable = writeable;
  }
  return self;
}

// Convenience initializer that delivers all messages to |writeable| on the main queue.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
  return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()];
}

// Atomically marks this object as finished.
//
// @return YES if this call performed the transition from "not finished" to "finished"; NO if a
//     previous completion or cancellation had already done so.
- (BOOL)markFinishedIfNeeded {
  @synchronized(self) {
    if (_alreadyFinished) {
      return NO;
    }
    _alreadyFinished = YES;
    return YES;
  }
}

// Asynchronously writes |value| to the wrapped writeable and then invokes |handler|. Both steps
// are skipped if the writeable has been set to nil by the time the block runs.
//
// @param value The value passed to the wrapped writeable's writeValue:.
// @param handler Invoked on the writeable queue right after the value is written. May be nil.
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
  dispatch_async(_writeableQueue, ^{
    // We're racing a possible cancellation performed by another thread. To turn all already-
    // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
    // before it's nil, we won the race.
    id<GRXWriteable> writeable = self.writeable;
    if (!writeable) {
      return;
    }
    [writeable writeValue:value];
    // Calling a nil block crashes, so a missing handler is simply skipped.
    if (handler) {
      handler();
    }
  });
}

// Asynchronously sends writesFinishedWithError:nil to the wrapped writeable, unless this object
// was already finished or cancelled, or has been deallocated before the block runs.
- (void)enqueueSuccessfulCompletion {
  __weak typeof(self) weakSelf = self;
  dispatch_async(_writeableQueue, ^{
    // Only strongSelf is referenced inside this block. Mentioning self here would make the block
    // capture it strongly and defeat the weak reference taken above.
    typeof(self) strongSelf = weakSelf;
    if (!strongSelf) {
      return;
    }
    if (![strongSelf markFinishedIfNeeded]) {
      return;
    }
    // Cancellation is now impossible: _alreadyFinished is set, so cancelWithError: and
    // cancelSilently will both find the object finished and do nothing.
    [strongSelf.writeable writesFinishedWithError:nil];
    // Skip any possible message to the wrapped writeable enqueued after this one.
    strongSelf.writeable = nil;
  });
}

// Cancels delivery of any still-enqueued values and asynchronously sends
// writesFinishedWithError:error to the wrapped writeable. Does nothing if this object was already
// finished or cancelled.
//
// @param error The error to report. Must not be nil (asserted).
- (void)cancelWithError:(NSError *)error {
  NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
  if (![self markFinishedIfNeeded]) {
    return;
  }
  // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
  // nillify writeable because we might be running concurrently with the blocks in
  // _writeableQueue, and assignment with ARC isn't atomic.
  id<GRXWriteable> writeable = self.writeable;
  self.writeable = nil;

  // The local strong reference keeps the writeable alive until the error has been delivered.
  dispatch_async(_writeableQueue, ^{
    [writeable writesFinishedWithError:error];
  });
}

// Cancels delivery of any still-enqueued values without notifying the wrapped writeable. Does
// nothing if this object was already finished or cancelled.
- (void)cancelSilently {
  if (![self markFinishedIfNeeded]) {
    return;
  }
  // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
  // nillify writeable because we might be running concurrently with the blocks in
  // _writeableQueue, and assignment with ARC isn't atomic.
  self.writeable = nil;
}
@end