Jorge Canizales | 9409ad8 | 2015-02-18 16:19:56 -0800 | [diff] [blame] | 1 | /* |
| 2 | * |
Yang Gao | 5fc9029 | 2015-02-20 09:46:22 -0800 | [diff] [blame] | 3 | * Copyright 2015, Google Inc. |
Jorge Canizales | 9409ad8 | 2015-02-18 16:19:56 -0800 | [diff] [blame] | 4 | * All rights reserved. |
| 5 | * |
| 6 | * Redistribution and use in source and binary forms, with or without |
| 7 | * modification, are permitted provided that the following conditions are |
| 8 | * met: |
| 9 | * |
| 10 | * * Redistributions of source code must retain the above copyright |
| 11 | * notice, this list of conditions and the following disclaimer. |
| 12 | * * Redistributions in binary form must reproduce the above |
| 13 | * copyright notice, this list of conditions and the following disclaimer |
| 14 | * in the documentation and/or other materials provided with the |
| 15 | * distribution. |
| 16 | * * Neither the name of Google Inc. nor the names of its |
| 17 | * contributors may be used to endorse or promote products derived from |
| 18 | * this software without specific prior written permission. |
| 19 | * |
| 20 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 | * |
| 32 | */ |
| 33 | |
Jorge Canizales | 5e0efd9 | 2015-02-17 18:23:58 -0800 | [diff] [blame] | 34 | #import "GRPCCall.h" |
| 35 | |
Jorge Canizales | c2d7ecb | 2015-02-27 01:22:41 -0800 | [diff] [blame^] | 36 | #include <grpc/grpc.h> |
| 37 | #include <grpc/support/grpc_time.h> |
Jorge Canizales | 5e0efd9 | 2015-02-17 18:23:58 -0800 | [diff] [blame] | 38 | |
| 39 | #import "GRPCMethodName.h" |
| 40 | #import "private/GRPCChannel.h" |
| 41 | #import "private/GRPCCompletionQueue.h" |
| 42 | #import "private/GRPCDelegateWrapper.h" |
| 43 | #import "private/GRPCMethodName+HTTP2Encoding.h" |
| 44 | #import "private/NSData+GRPC.h" |
| 45 | #import "private/NSDictionary+GRPC.h" |
| 46 | #import "private/NSError+GRPC.h" |
| 47 | |
// Guards the grpc_call_* invocations made in this file. Any result other than
// GRPC_CALL_OK represents a precondition failure of the C API; if one ever
// happens it's a bug in this library, so it is surfaced as an
// NSInternalInconsistencyException.
//
// TODO(jcanizales): Can an application shut down gracefully when a thread other
// than the main one throws an exception?
static void AssertNoErrorInCall(grpc_call_error error) {
  if (error == GRPC_CALL_OK) {
    return;
  }
  [NSException raise:NSInternalInconsistencyException
              format:@"Precondition of grpc_call_* not met."];
}
| 60 | |
// Private class extension. It adopts GRXWriteable (the call passes itself as
// the writeable when it starts its requests writer) and redeclares
// responseMetadata as readwrite so the implementation can assign it.
@interface GRPCCall () <GRXWriteable>
@property(atomic, strong, readwrite) NSDictionary *responseMetadata;
@end
| 65 | |
| 66 | // The following methods of a C gRPC call object aren't reentrant, and thus |
| 67 | // calls to them must be serialized: |
| 68 | // - add_metadata |
| 69 | // - invoke |
| 70 | // - start_write |
| 71 | // - writes_done |
| 72 | // - start_read |
| 73 | // - destroy |
| 74 | // The first four are called as part of responding to client commands, but |
| 75 | // start_read we want to call as soon as we're notified that the RPC was |
| 76 | // successfully established (which happens concurrently in the network queue). |
| 77 | // Serialization is achieved by using a private serial queue to operate the |
| 78 | // call object. |
| 79 | // Because add_metadata and invoke are called and return successfully before |
| 80 | // any of the other methods is called, they don't need to use the queue. |
| 81 | // |
| 82 | // Furthermore, start_write and writes_done can only be called after the |
| 83 | // WRITE_ACCEPTED event for any previous write is received. This is achieved by |
| 84 | // pausing the requests writer immediately every time it writes a value, and |
| 85 | // resuming it again when WRITE_ACCEPTED is received. |
| 86 | // |
| 87 | // Similarly, start_read can only be called after the READ event for any |
| 88 | // previous read is received. This is easier to enforce, as we're writing the |
| 89 | // received messages into the writeable: start_read is enqueued once upon receiving |
| 90 | // the CLIENT_METADATA_READ event, and then once after receiving each READ |
| 91 | // event. |
| 92 | @implementation GRPCCall { |
| 93 | dispatch_queue_t _callQueue; |
| 94 | |
| 95 | grpc_call *_gRPCCall; |
| 96 | dispatch_once_t _callAlreadyInvoked; |
| 97 | |
| 98 | GRPCChannel *_channel; |
| 99 | GRPCCompletionQueue *_completionQueue; |
| 100 | |
| 101 | // The C gRPC library has fewer guarantees on the ordering of events than we |
| 102 | // do. Particularly, in the face of errors, there's no ordering guarantee at |
| 103 | // all. This wrapper over our actual writeable ensures thread-safety and |
| 104 | // correct ordering. |
| 105 | GRPCDelegateWrapper *_responseWriteable; |
| 106 | id<GRXWriter> _requestWriter; |
| 107 | } |
| 108 | |
| 109 | @synthesize state = _state; |
| 110 | |
// Forwards nil for every argument to the designated initializer. Because that
// initializer rejects a nil host or method, calling plain -init raises
// NSInvalidArgumentException instead of producing an unusable call.
- (instancetype)init {
  self = [self initWithHost:nil method:nil requestsWriter:nil];
  return self;
}
| 114 | |
// Designated initializer.
//
// Raises NSInvalidArgumentException when either host or method is nil.
// Otherwise it obtains the completion queue and the channel for the host,
// creates the C call object for the method's HTTP2 path, and creates the
// private serial queue on which the call object is operated.
- (instancetype)initWithHost:(NSString *)host
                      method:(GRPCMethodName *)method
              requestsWriter:(id<GRXWriter>)requestWriter {
  if (!host || !method) {
    [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
  }
  // TODO(jcanizales): Throw if the requestWriter was already started.
  self = [super init];
  if (!self) {
    return self;
  }

  // grpc_init() runs once per process, no matter how many calls are created.
  static dispatch_once_t coreInitialization;
  dispatch_once(&coreInitialization, ^{
    grpc_init();
  });

  _completionQueue = [GRPCCompletionQueue completionQueue];
  _channel = [GRPCChannel channelToHost:host];

  // NOTE(review): gpr_inf_future is presumably the call deadline (i.e. no
  // timeout) -- confirm against the C API.
  const char *path = method.HTTP2Path.UTF8String;
  _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
                                           path,
                                           host.UTF8String,
                                           gpr_inf_future);

  // Serial queue to invoke the non-reentrant methods of the grpc_call object.
  _callQueue = dispatch_queue_create("org.grpc.call", DISPATCH_QUEUE_SERIAL);

  _requestWriter = requestWriter;
  return self;
}
| 144 | |
| 145 | #pragma mark Finish |
| 146 | |
// Ends the call from this object's side: marks the requests writer as finished
// and drops the reference to it, then notifies the response writeable. With an
// error, the writeable is cancelled with it; with nil, a successful completion
// is enqueued.
- (void)finishWithError:(NSError *)errorOrNil {
  [_requestWriter setState:GRXWriterStateFinished];
  _requestWriter = nil;

  if (!errorOrNil) {
    [_responseWriteable enqueueSuccessfulCompletion];
    return;
  }
  [_responseWriteable cancelWithError:errorOrNil];
}
| 156 | |
// Asks the C library to cancel the underlying call.
// Can be called from any thread, any number of times.
- (void)cancelCall {
  grpc_call_error cancelResult = grpc_call_cancel(_gRPCCall);
  AssertNoErrorInCall(cancelResult);
}
| 161 | |
// Cancels the RPC: first finishes this object with a GRPCErrorCodeCancelled
// error (which reaches the response writeable), then cancels the C call.
- (void)cancel {
  NSError *cancellation = [NSError errorWithDomain:kGRPCErrorDomain
                                              code:GRPCErrorCodeCancelled
                                          userInfo:nil];
  [self finishWithError:cancellation];
  [self cancelCall];
}
| 168 | |
// Destroys the C call object. The destruction is enqueued on the serial call
// queue so it is serialized with the other non-reentrant operations on the
// call. The block captures only the raw call pointer, never self, because self
// is going away.
- (void)dealloc {
  grpc_call *callToDestroy = _gRPCCall;
  dispatch_queue_t serialQueue = _callQueue;
  dispatch_async(serialQueue, ^{
    grpc_call_destroy(callToDestroy);
  });
}
| 175 | |
| 176 | #pragma mark Read messages |
| 177 | |
// Starts one read on the C call object.
// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(GRPCEventHandler)handler {
  // The handler travels through the C layer as an opaque tag; the retained
  // bridge keeps it alive meanwhile.
  // NOTE(review): presumably whoever dispatches the event releases it --
  // confirm in GRPCCompletionQueue.
  void *tag = (__bridge_retained void *)handler;
  AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, tag));
}
| 183 | |
// Requests the next response message from the server, unless the call is
// paused (in which case this is a noop; restarting the call invokes this
// method again).
//
// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from
// the server has been written.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
  if (self.state == GRXWriterStatePaused) {
    return;
  }
  // Only weak references are captured, so the pending read keeps neither the
  // call nor the response wrapper alive.
  __weak GRPCCall *weakCall = self;
  __weak GRPCDelegateWrapper *weakResponses = _responseWriteable;

  GRPCEventHandler readHandler = ^(grpc_event *event) {
    if (event->data.read == NULL) {
      // No more responses from the server.
      return;
    }
    NSData *message = [NSData grpc_dataWithByteBuffer:event->data.read];
    if (message) {
      // Deliver the message; the following read is requested only once this
      // one has been handed over.
      [weakResponses enqueueMessage:message completionHandler:^{
        [weakCall startNextRead];
      }];
      return;
    }
    // The app doesn't have enough memory to hold the server response. We
    // don't want to throw, because the app shouldn't crash for a behavior
    // that's on the hands of any server to have. Instead we finish and ask
    // the server to cancel.
    //
    // TODO(jcanizales): No canonical code is appropriate for this situation
    // (because it's just a client problem). Use another domain and an
    // appropriately-documented code.
    NSError *outOfMemory = [NSError errorWithDomain:kGRPCErrorDomain
                                               code:GRPCErrorCodeInternal
                                           userInfo:nil];
    [weakCall finishWithError:outOfMemory];
    [weakCall cancelCall];
  };

  // start_read isn't reentrant, so it's issued from the serial call queue.
  dispatch_async(_callQueue, ^{
    [weakCall startReadWithHandler:readHandler];
  });
}
| 225 | |
| 226 | #pragma mark Send headers |
| 227 | |
// Adds one header with a binary value to the call's metadata. The bytes of
// `value` are handed to the C library together with their exact length.
- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
  grpc_metadata header;
  // The struct fields aren't const-qualified; dropping const is safe because
  // the contents aren't modified.
  header.key = (char *)[name UTF8String];
  header.value = (char *)[value bytes];
  header.value_length = [value length];
  grpc_call_add_metadata_old(_gRPCCall, &header, 0);
}
| 236 | |
// Adds one header with a textual value to the call's metadata.
//
// The value is sent as the UTF-8 bytes of `value`, without the terminating
// \0. For pure-ASCII strings that is one byte per character.
- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
  grpc_metadata metadata;
  // Safe to discard const qualifiers; we're not going to modify the contents.
  metadata.key = (char *)name.UTF8String;
  metadata.value = (char *)value.UTF8String;
  // The length must describe the UTF-8 buffer above. -[NSString length] counts
  // UTF-16 code units, which is smaller than the UTF-8 byte count as soon as
  // the string contains a non-ASCII character, and would truncate the value.
  // The trailing \0 isn't encoded in HTTP2, and this byte count excludes it.
  metadata.value_length = [value lengthOfBytesUsingEncoding:NSUTF8StringEncoding];
  grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}
| 246 | |
// Adds every entry of `metadata` to the call as a request header. NSData
// values are added as binary headers and NSString values as ASCII headers;
// values of any other class are skipped.
// TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata {
  [metadata enumerateKeysAndObjectsUsingBlock:^(NSString *name, id value, BOOL *stop) {
    if ([value isKindOfClass:[NSData class]]) {
      [self addHeaderWithName:name binaryValue:value];
      return;
    }
    if ([value isKindOfClass:[NSString class]]) {
      [self addHeaderWithName:name ASCIIValue:value];
    }
  }];
}
| 258 | |
| 259 | #pragma mark GRXWriteable implementation |
| 260 | |
// Starts writing one request message on the C call object.
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
  __weak GRPCCall *weakCall = self;
  GRPCEventHandler writeAcceptedHandler = ^(grpc_event *event) {
    BOOL accepted = (event->data.write_accepted == GRPC_OP_OK);
    if (!accepted) {
      errorHandler();
    }
    // Resume the request writer (even in the case of error).
    // TODO(jcanizales): No need to do it in the case of errors anymore?
    GRPCCall *call = weakCall;
    if (!call) {
      return;
    }
    call->_requestWriter.state = GRXWriterStateStarted;
  };

  grpc_byte_buffer *payload = [message grpc_byteBuffer];
  // The handler is passed through the C layer as a retained opaque tag.
  void *tag = (__bridge_retained void *)writeAcceptedHandler;
  AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall, payload, tag, 0));
  // The buffer is destroyed right after the write has been started.
  grpc_byte_buffer_destroy(payload);
}
| 285 | |
// GRXWriteable: receives the next request message from the requests writer and
// schedules its write on the call queue. If the write fails, the call is
// finished with a GRPCErrorCodeInternal error.
- (void)didReceiveValue:(id)value {
  // TODO(jcanizales): Throw/assert if value isn't NSData.

  // Pause the input and only resume it when the C layer notifies us that writes
  // can proceed.
  _requestWriter.state = GRXWriterStatePaused;

  __weak GRPCCall *weakCall = self;
  void (^onWriteFailure)() = ^{
    NSError *writeError = [NSError errorWithDomain:kGRPCErrorDomain
                                              code:GRPCErrorCodeInternal
                                          userInfo:nil];
    [weakCall finishWithError:writeError];
  };
  dispatch_async(_callQueue, ^{
    [weakCall writeMessage:value withErrorHandler:onWriteFailure];
  });
}
| 302 | |
// Tells the C call object that no more request messages will be written.
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
  GRPCEventHandler finishAcceptedHandler = ^(grpc_event *event) {
    if (event->data.finish_accepted == GRPC_OP_OK) {
      return;
    }
    errorHandler();
  };
  // The handler is passed through the C layer as a retained opaque tag.
  void *tag = (__bridge_retained void *)finishAcceptedHandler;
  AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, tag));
}
| 313 | |
// GRXWriteable: the requests writer has finished. If it finished with an
// error, the whole call is cancelled. Otherwise the request stream is closed
// from the call queue, and a failure to close it finishes the call with a
// GRPCErrorCodeInternal error.
- (void)didFinishWithError:(NSError *)errorOrNil {
  if (errorOrNil) {
    [self cancel];
    return;
  }

  __weak GRPCCall *weakCall = self;
  void (^onCloseFailure)() = ^{
    NSError *closeError = [NSError errorWithDomain:kGRPCErrorDomain
                                              code:GRPCErrorCodeInternal
                                          userInfo:nil];
    [weakCall finishWithError:closeError];
  };
  dispatch_async(_callQueue, ^{
    [weakCall finishRequestWithErrorHandler:onCloseFailure];
  });
}
| 328 | |
| 329 | #pragma mark Invoke |
| 330 | |
// Invokes the C call on this object's completion queue. Writes can start
// immediately after this.
// Both handlers will eventually be called, from the network queue:
// - metadataHandler, when the response headers are received.
// - completionHandler, whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
                    completionHandler:(GRPCEventHandler)completionHandler {
  // Both handlers are passed through the C layer as retained opaque tags.
  void *metadataTag = (__bridge_retained void *)metadataHandler;
  void *completionTag = (__bridge_retained void *)completionHandler;
  grpc_call_error invokeResult = grpc_call_invoke_old(_gRPCCall,
                                                      _completionQueue.unmanagedQueue,
                                                      metadataTag,
                                                      completionTag,
                                                      0);
  AssertNoErrorInCall(invokeResult);
}
| 343 | |
// Initiates the RPC. When the response headers arrive they are stored in
// responseMetadata and the first read is requested; when the RPC finishes, the
// call is finished with the error derived from the final status. Once the RPC
// has been initiated, the requests writer is started with self as writeable.
- (void)invokeCall {
  __weak GRPCCall *weakCall = self;

  GRPCEventHandler onResponseHeaders = ^(grpc_event *event) {
    // Response metadata received.
    // TODO(jcanizales): Name the type of event->data.client_metadata_read
    // in the C library so one can actually pass the object to a method.
    GRPCCall *call = weakCall;
    if (!call) {
      return;
    }
    call.responseMetadata =
        [NSDictionary grpc_dictionaryFromMetadata:event->data.client_metadata_read.elements
                                            count:event->data.client_metadata_read.count];
    [call startNextRead];
  };

  GRPCEventHandler onCompletion = ^(grpc_event *event) {
    // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
    NSError *statusError = [NSError grpc_errorFromStatus:&event->data.finished];
    [weakCall finishWithError:statusError];
  };

  [self invokeCallWithMetadataHandler:onResponseHeaders completionHandler:onCompletion];

  // Now that the RPC has been initiated, request writes can start.
  [_requestWriter startWithWriteable:self];
}
| 365 | |
| 366 | #pragma mark GRXWriter implementation |
| 367 | |
// GRXWriter: starts the RPC, delivering the responses to `writeable`.
//
// Wraps the writeable, marks the call as started, sends the request headers
// and invokes the call.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  // The following produces a retain cycle self:_responseWriteable:self, which is only
  // broken when didFinishWithError: is sent to the wrapped writeable.
  // Care is taken not to retain self strongly in any of the blocks used in
  // the implementation of GRPCCall, so that the life of the instance is
  // determined by this retain cycle.
  _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];

  // setState: rejects every transition while the state is NotStarted, so the
  // state has to leave NotStarted here; otherwise pausing, resuming or
  // finishing the call through its state would silently do nothing. It is set
  // before the call is invoked because the metadata handler (run from the
  // network queue) reads the state when it requests the first read.
  // NOTE(review): assumes a fresh call reports GRXWriterStateNotStarted --
  // confirm against the GRXWriterState declaration.
  _state = GRXWriterStateStarted;

  [self sendHeaders:_requestMetadata];
  [self invokeCall];
}
| 378 | |
// GRXWriter: manual state transitions.
//
// Ignored unless the current state is started or paused.
// - Finished: stops messaging the response writeable and releases it.
// - Paused: recorded; startNextRead becomes a noop while paused.
// - Started: only honored when currently paused; resumes reading.
// - NotStarted: ignored.
- (void)setState:(GRXWriterState)newState {
  // Manual transitions are only allowed from the started or paused states.
  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
    return;
  }

  if (newState == GRXWriterStateFinished) {
    _state = newState;
    // Per GRXWriter's contract, setting the state to Finished manually
    // means one doesn't wish the writeable to be messaged anymore.
    [_responseWriteable cancelSilently];
    _responseWriteable = nil;
  } else if (newState == GRXWriterStatePaused) {
    _state = newState;
  } else if (newState == GRXWriterStateStarted && _state == GRXWriterStatePaused) {
    _state = newState;
    [self startNextRead];
  }
  // Any other request (including GRXWriterStateNotStarted) is a noop.
}
| 406 | @end |