blob: eb02c21a7aafe54cc6d01d866007cc5672e925d6 [file] [log] [blame]
Jorge Canizales9409ad82015-02-18 16:19:56 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
Jorge Canizales9409ad82015-02-18 16:19:56 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Jorge Canizales9409ad82015-02-18 16:19:56 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Jorge Canizales9409ad82015-02-18 16:19:56 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Jorge Canizales9409ad82015-02-18 16:19:56 -080016 *
17 */
18
Jorge Canizales5e0efd92015-02-17 18:23:58 -080019#import "GRPCCall.h"
20
Muxi Yanb82f82b2017-08-14 12:01:14 -070021#import "GRPCCall+OAuth2.h"
22
Jorge Canizalesc2d7ecb2015-02-27 01:22:41 -080023#include <grpc/grpc.h>
Jorge Canizales59bb9d72015-06-22 19:04:15 -070024#include <grpc/support/time.h>
Muxi Yan61274ca2016-10-28 12:09:59 -070025#import <RxLibrary/GRXConcurrentWriteable.h>
Muxi Yana40ccd82016-11-05 21:39:44 -070026#import <RxLibrary/GRXImmediateSingleWriter.h>
Jorge Canizales5e0efd92015-02-17 18:23:58 -080027
Jorge Canizalesf1d084a2015-10-30 11:14:09 -070028#import "private/GRPCConnectivityMonitor.h"
29#import "private/GRPCHost.h"
Jorge Canizales2f101272015-09-02 21:55:37 -070030#import "private/GRPCRequestHeaders.h"
murgatroid9969927d62015-04-24 13:32:48 -070031#import "private/GRPCWrappedCall.h"
Jorge Canizales5e0efd92015-02-17 18:23:58 -080032#import "private/NSData+GRPC.h"
33#import "private/NSDictionary+GRPC.h"
34#import "private/NSError+GRPC.h"
35
Muxi Yan016d1082017-03-07 18:26:42 -080036// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
37// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
38// and RECV_STATUS_ON_CLIENT.
39NSInteger kMaxClientBatch = 6;
40
Muxi Yan61274ca2016-10-28 12:09:59 -070041NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
42NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
Muxi Yan22f79732016-09-30 14:24:56 -070043static NSMutableDictionary *callFlags;
Jorge Canizalesf3a4f2c2015-06-12 22:12:50 -070044
Muxi Yanb82f82b2017-08-14 12:01:14 -070045static NSString * const kAuthorizationHeader = @"authorization";
46static NSString * const kBearerPrefix = @"Bearer ";
47
Muxi Yan61274ca2016-10-28 12:09:59 -070048@interface GRPCCall () <GRXWriteable>
Jorge Canizales0b34c892015-08-12 20:19:20 -070049// Make them read-write.
50@property(atomic, strong) NSDictionary *responseHeaders;
51@property(atomic, strong) NSDictionary *responseTrailers;
Muxi Yanb82f82b2017-08-14 12:01:14 -070052@property(atomic) BOOL isWaitingForToken;
Jorge Canizales5e0efd92015-02-17 18:23:58 -080053@end
54
55// The following methods of a C gRPC call object aren't reentrant, and thus
56// calls to them must be serialized:
murgatroid99b5c076f2015-04-27 17:25:36 -070057// - start_batch
Jorge Canizales5e0efd92015-02-17 18:23:58 -080058// - destroy
Jorge Canizales5e0efd92015-02-17 18:23:58 -080059//
murgatroid99b5c076f2015-04-27 17:25:36 -070060// start_batch with a SEND_MESSAGE argument can only be called after the
61// OP_COMPLETE event for any previous write is received. This is achieved by
Jorge Canizales5e0efd92015-02-17 18:23:58 -080062// pausing the requests writer immediately every time it writes a value, and
murgatroid99b5c076f2015-04-27 17:25:36 -070063// resuming it again when OP_COMPLETE is received.
Jorge Canizales5e0efd92015-02-17 18:23:58 -080064//
murgatroid99b5c076f2015-04-27 17:25:36 -070065// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
66// the OP_COMPLETE event for any previous read is received.This is easier to
67// enforce, as we're writing the received messages into the writeable:
68// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
69// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
70// each RECV_MESSAGE batch.
Jorge Canizales5e0efd92015-02-17 18:23:58 -080071@implementation GRPCCall {
72 dispatch_queue_t _callQueue;
73
Jorge Canizalesf1d084a2015-10-30 11:14:09 -070074 NSString *_host;
75 NSString *_path;
murgatroid9930b7d4e2015-04-24 10:36:43 -070076 GRPCWrappedCall *_wrappedCall;
Jorge Canizalesf1d084a2015-10-30 11:14:09 -070077 GRPCConnectivityMonitor *_connectivityMonitor;
Jorge Canizales5e0efd92015-02-17 18:23:58 -080078
Jorge Canizales5e0efd92015-02-17 18:23:58 -080079 // The C gRPC library has less guarantees on the ordering of events than we
80 // do. Particularly, in the face of errors, there's no ordering guarantee at
81 // all. This wrapper over our actual writeable ensures thread-safety and
82 // correct ordering.
Jorge Canizales35f003b2015-07-17 21:14:36 -070083 GRXConcurrentWriteable *_responseWriteable;
Jorge Canizales238ad782015-08-07 23:11:29 -070084
Muxi Yan61274ca2016-10-28 12:09:59 -070085 // The network thread wants the requestWriter to resume (when the server is ready for more input),
86 // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
87 // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
88 // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
89 // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
90 // pause the writer immediately on writeValue:, so we need our locking to be recursive.
Jorge Canizales56047122015-07-17 12:18:08 -070091 GRXWriter *_requestWriter;
Jorge Canizales544963e2015-06-12 19:46:27 -070092
Jorge Canizales6531b2b2015-07-18 00:19:14 -070093 // To create a retain cycle when a call is started, up until it finishes. See
Muxi Yan61274ca2016-10-28 12:09:59 -070094 // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
95 // reference to the call object if all they're interested in is the handler being executed when
Jorge Canizaleseb87b462015-08-08 16:16:43 -070096 // the response arrives.
97 GRPCCall *_retainSelf;
Jorge Canizales6531b2b2015-07-18 00:19:14 -070098
murgatroid9984fa5312015-08-28 10:55:55 -070099 GRPCRequestHeaders *_requestHeaders;
Muxi Yana40ccd82016-11-05 21:39:44 -0700100
Muxi Yanc2e53b52017-03-22 14:30:16 -0700101 // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
102 // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
103 // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
104 // the SendClose op is added.
Muxi Yana40ccd82016-11-05 21:39:44 -0700105 BOOL _unaryCall;
Muxi Yana40ccd82016-11-05 21:39:44 -0700106 NSMutableArray *_unaryOpBatch;
Muxi Yan895f3d82017-04-05 13:12:30 -0700107
108 // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
109 // queue
110 dispatch_queue_t _responseQueue;
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800111}
112
113@synthesize state = _state;
114
Muxi Yan61274ca2016-10-28 12:09:59 -0700115// TODO(jcanizales): If grpc_init is idempotent, this should be changed from load to initialize.
Jorge Canizales7603d642016-08-24 18:23:24 -0700116+ (void)load {
117 grpc_init();
Muxi Yan22f79732016-09-30 14:24:56 -0700118 callFlags = [NSMutableDictionary dictionary];
119}
120
Muxi Yan61274ca2016-10-28 12:09:59 -0700121+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
Muxi Yan6c0b9602016-10-02 14:32:06 -0700122 NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
123 switch (callSafety) {
124 case GRPCCallSafetyDefault:
Muxi Yan22f79732016-09-30 14:24:56 -0700125 callFlags[hostAndPath] = @0;
126 break;
Muxi Yan6c0b9602016-10-02 14:32:06 -0700127 case GRPCCallSafetyIdempotentRequest:
Muxi Yan22f79732016-09-30 14:24:56 -0700128 callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
129 break;
Muxi Yan6c0b9602016-10-02 14:32:06 -0700130 case GRPCCallSafetyCacheableRequest:
Muxi Yan22f79732016-09-30 14:24:56 -0700131 callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
132 break;
133 default:
134 break;
135 }
136}
137
Muxi Yan6c0b9602016-10-02 14:32:06 -0700138+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
139 NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
140 return [callFlags[hostAndPath] intValue];
Jorge Canizales7603d642016-08-24 18:23:24 -0700141}
142
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800143- (instancetype)init {
Muxi Yan22f79732016-09-30 14:24:56 -0700144 return [self initWithHost:nil path:nil requestsWriter:nil];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800145}
146
147// Designated initializer
148- (instancetype)initWithHost:(NSString *)host
Jorge Canizalesbe808e32015-07-04 14:37:58 -0700149 path:(NSString *)path
Muxi Yan22f79732016-09-30 14:24:56 -0700150 requestsWriter:(GRXWriter *)requestWriter {
Jorge Canizalesbe808e32015-07-04 14:37:58 -0700151 if (!host || !path) {
Muxi Yan61274ca2016-10-28 12:09:59 -0700152 [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800153 }
Jorge Canizales6bbfcc32015-06-17 14:10:52 -0700154 if (requestWriter.state != GRXWriterStateNotStarted) {
Jorge Canizales597ef982015-07-31 23:31:56 -0700155 [NSException raise:NSInvalidArgumentException
156 format:@"The requests writer can't be already started."];
Jorge Canizales6bbfcc32015-06-17 14:10:52 -0700157 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800158 if ((self = [super init])) {
Jorge Canizalesf1d084a2015-10-30 11:14:09 -0700159 _host = [host copy];
160 _path = [path copy];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800161
162 // Serial queue to invoke the non-reentrant methods of the grpc_call object.
Jorge Canizalesf1d084a2015-10-30 11:14:09 -0700163 _callQueue = dispatch_queue_create("io.grpc.call", NULL);
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800164
165 _requestWriter = requestWriter;
Jorge Canizales544963e2015-06-12 19:46:27 -0700166
murgatroid9984fa5312015-08-28 10:55:55 -0700167 _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
Muxi Yana40ccd82016-11-05 21:39:44 -0700168
169 if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
Muxi Yanbf803b92017-02-08 11:31:26 -0800170 _unaryCall = YES;
Muxi Yan016d1082017-03-07 18:26:42 -0800171 _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
Muxi Yana40ccd82016-11-05 21:39:44 -0700172 }
Muxi Yan895f3d82017-04-05 13:12:30 -0700173
174 _responseQueue = dispatch_get_main_queue();
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800175 }
176 return self;
177}
178
Muxi Yan895f3d82017-04-05 13:12:30 -0700179- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
180 if (_state != GRXWriterStateNotStarted) {
181 return;
182 }
183 _responseQueue = queue;
184}
185
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800186#pragma mark Finish
187
188- (void)finishWithError:(NSError *)errorOrNil {
Jorge Canizales0803bb02016-04-30 10:40:18 -0700189 @synchronized(self) {
190 _state = GRXWriterStateFinished;
191 }
192
Jorge Canizales6531b2b2015-07-18 00:19:14 -0700193 // If the call isn't retained anywhere else, it can be deallocated now.
Jorge Canizaleseb87b462015-08-08 16:16:43 -0700194 _retainSelf = nil;
Jorge Canizales6531b2b2015-07-18 00:19:14 -0700195
196 // If there were still request messages coming, stop them.
Jorge Canizales238ad782015-08-07 23:11:29 -0700197 @synchronized(_requestWriter) {
198 _requestWriter.state = GRXWriterStateFinished;
Jorge Canizales238ad782015-08-07 23:11:29 -0700199 }
Jorge Canizales6531b2b2015-07-18 00:19:14 -0700200
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800201 if (errorOrNil) {
202 [_responseWriteable cancelWithError:errorOrNil];
203 } else {
204 [_responseWriteable enqueueSuccessfulCompletion];
205 }
206}
207
208- (void)cancelCall {
209 // Can be called from any thread, any number of times.
murgatroid99b56609c2015-04-28 16:41:11 -0700210 [_wrappedCall cancel];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800211}
212
213- (void)cancel {
214 [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
215 code:GRPCErrorCodeCancelled
Muxi Yan61274ca2016-10-28 12:09:59 -0700216 userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
Muxi Yanb82f82b2017-08-14 12:01:14 -0700217 if (!self.isWaitingForToken) {
218 [self cancelCall];
219 } else {
220 self.isWaitingForToken = NO;
221 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800222}
223
224- (void)dealloc {
murgatroid996cc46802015-04-28 09:35:48 -0700225 __block GRPCWrappedCall *wrappedCall = _wrappedCall;
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800226 dispatch_async(_callQueue, ^{
murgatroid996cc46802015-04-28 09:35:48 -0700227 wrappedCall = nil;
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800228 });
229}
230
231#pragma mark Read messages
232
233// Only called from the call queue.
234// The handler will be called from the network queue.
Muxi Yan61274ca2016-10-28 12:09:59 -0700235- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler {
murgatroid99ca38ddb2015-04-29 13:16:42 -0700236 // TODO(jcanizales): Add error handlers for async failures
Muxi Yan61274ca2016-10-28 12:09:59 -0700237 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800238}
239
240// Called initially from the network queue once response headers are received,
Muxi Yan61274ca2016-10-28 12:09:59 -0700241// then "recursively" from the responseWriteable queue after each response from the
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800242// server has been written.
Muxi Yan61274ca2016-10-28 12:09:59 -0700243// If the call is currently paused, this is a noop. Restarting the call will invoke this
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800244// method.
245// TODO(jcanizales): Rename to readResponseIfNotPaused.
246- (void)startNextRead {
247 if (self.state == GRXWriterStatePaused) {
248 return;
249 }
250 __weak GRPCCall *weakSelf = self;
Jorge Canizales35f003b2015-07-17 21:14:36 -0700251 __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
murgatroid9969927d62015-04-24 13:32:48 -0700252
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800253 dispatch_async(_callQueue, ^{
murgatroid996cc46802015-04-28 09:35:48 -0700254 [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
255 if (message == NULL) {
murgatroid99b56609c2015-04-28 16:41:11 -0700256 // No more messages from the server
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800257 return;
258 }
murgatroid996cc46802015-04-28 09:35:48 -0700259 NSData *data = [NSData grpc_dataWithByteBuffer:message];
260 grpc_byte_buffer_destroy(message);
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800261 if (!data) {
262 // The app doesn't have enough memory to hold the server response. We
263 // don't want to throw, because the app shouldn't crash for a behavior
264 // that's on the hands of any server to have. Instead we finish and ask
265 // the server to cancel.
Muxi Yan61274ca2016-10-28 12:09:59 -0700266 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
267 code:GRPCErrorCodeResourceExhausted
268 userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800269 [weakSelf cancelCall];
270 return;
271 }
Muxi Yan61274ca2016-10-28 12:09:59 -0700272 [weakWriteable enqueueValue:data completionHandler:^{
273 [weakSelf startNextRead];
274 }];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800275 }];
276 });
277}
278
279#pragma mark Send headers
280
Jorge Canizalesf4f150f2015-11-01 22:31:12 -0800281- (void)sendHeaders:(NSDictionary *)headers {
murgatroid99ca38ddb2015-04-29 13:16:42 -0700282 // TODO(jcanizales): Add error handlers for async failures
Muxi Yanbf803b92017-02-08 11:31:26 -0800283 GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers
284 flags:[GRPCCall callFlagsForHost:_host path:_path]
Muxi Yand5bac0d2017-03-07 18:44:19 -0800285 handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
Muxi Yana40ccd82016-11-05 21:39:44 -0700286 if (!_unaryCall) {
Muxi Yanbf803b92017-02-08 11:31:26 -0800287 [_wrappedCall startBatchWithOperations:@[op]];
Muxi Yana40ccd82016-11-05 21:39:44 -0700288 } else {
Muxi Yanbf803b92017-02-08 11:31:26 -0800289 [_unaryOpBatch addObject:op];
Muxi Yana40ccd82016-11-05 21:39:44 -0700290 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800291}
292
293#pragma mark GRXWriteable implementation
294
295// Only called from the call queue. The error handler will be called from the
296// network queue if the write didn't succeed.
Muxi Yan35653012017-03-07 18:54:24 -0800297// If the call is a unary call, parameter \a errorHandler will be ignored and
298// the error handler of GRPCOpSendClose will be executed in case of error.
Muxi Yan0c0ebc52017-10-19 18:41:01 -0700299- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
Muxi Yan61274ca2016-10-28 12:09:59 -0700300
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800301 __weak GRPCCall *weakSelf = self;
Muxi Yan61274ca2016-10-28 12:09:59 -0700302 void(^resumingHandler)(void) = ^{
murgatroid99ca38ddb2015-04-29 13:16:42 -0700303 // Resume the request writer.
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800304 GRPCCall *strongSelf = weakSelf;
305 if (strongSelf) {
Jorge Canizales578ab162015-08-08 17:11:43 -0700306 @synchronized(strongSelf->_requestWriter) {
Jorge Canizales238ad782015-08-07 23:11:29 -0700307 strongSelf->_requestWriter.state = GRXWriterStateStarted;
308 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800309 }
310 };
Muxi Yanbf803b92017-02-08 11:31:26 -0800311
312 GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message
313 handler:resumingHandler];
Muxi Yana40ccd82016-11-05 21:39:44 -0700314 if (!_unaryCall) {
Muxi Yanbf803b92017-02-08 11:31:26 -0800315 [_wrappedCall startBatchWithOperations:@[op]
Muxi Yana40ccd82016-11-05 21:39:44 -0700316 errorHandler:errorHandler];
317 } else {
Muxi Yan44e18372017-03-10 14:52:51 -0800318 // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
319 // TODO (mxyan): unify the error handlers of all Ops into a single closure.
Muxi Yanbf803b92017-02-08 11:31:26 -0800320 [_unaryOpBatch addObject:op];
Muxi Yana40ccd82016-11-05 21:39:44 -0700321 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800322}
323
Jorge Canizalesa90a9c32015-05-18 17:12:41 -0700324- (void)writeValue:(id)value {
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800325 // TODO(jcanizales): Throw/assert if value isn't NSData.
326
327 // Pause the input and only resume it when the C layer notifies us that writes
328 // can proceed.
Jorge Canizales238ad782015-08-07 23:11:29 -0700329 @synchronized(_requestWriter) {
330 _requestWriter.state = GRXWriterStatePaused;
331 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800332
333 __weak GRPCCall *weakSelf = self;
334 dispatch_async(_callQueue, ^{
Muxi Yan61274ca2016-10-28 12:09:59 -0700335 [weakSelf writeMessage:value withErrorHandler:^{
336 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800337 code:GRPCErrorCodeInternal
338 userInfo:nil]];
Muxi Yan61274ca2016-10-28 12:09:59 -0700339 }];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800340 });
341}
342
343// Only called from the call queue. The error handler will be called from the
344// network queue if the requests stream couldn't be closed successfully.
Muxi Yan0c0ebc52017-10-19 18:41:01 -0700345- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
Muxi Yand5bac0d2017-03-07 18:44:19 -0800346 if (!_unaryCall) {
Muxi Yana40ccd82016-11-05 21:39:44 -0700347 [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
348 errorHandler:errorHandler];
349 } else {
350 [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
351 [_wrappedCall startBatchWithOperations:_unaryOpBatch
352 errorHandler:errorHandler];
353 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800354}
355
Jorge Canizalesb2c300c2015-05-18 17:19:16 -0700356- (void)writesFinishedWithError:(NSError *)errorOrNil {
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800357 if (errorOrNil) {
358 [self cancel];
359 } else {
360 __weak GRPCCall *weakSelf = self;
361 dispatch_async(_callQueue, ^{
362 [weakSelf finishRequestWithErrorHandler:^{
363 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
364 code:GRPCErrorCodeInternal
365 userInfo:nil]];
366 }];
367 });
368 }
369}
370
371#pragma mark Invoke
372
Muxi Yan61274ca2016-10-28 12:09:59 -0700373// Both handlers will eventually be called, from the network queue. Writes can start immediately
374// after this.
Jorge Canizales0b34c892015-08-12 20:19:20 -0700375// The first one (headersHandler), when the response headers are received.
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800376// The second one (completionHandler), whenever the RPC finishes for any reason.
Muxi Yan61274ca2016-10-28 12:09:59 -0700377- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
378 completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler {
murgatroid99ca38ddb2015-04-29 13:16:42 -0700379 // TODO(jcanizales): Add error handlers for async failures
Muxi Yan61274ca2016-10-28 12:09:59 -0700380 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
381 initWithHandler:headersHandler]]];
382 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
383 initWithHandler:completionHandler]]];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800384}
385
386- (void)invokeCall {
Jorge Canizales0b34c892015-08-12 20:19:20 -0700387 [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
Jorge Canizalesf3a4f2c2015-06-12 22:12:50 -0700388 // Response headers received.
Nicolas "Pixel" Noble70224362016-03-21 22:19:43 +0100389 self.responseHeaders = headers;
390 [self startNextRead];
Muxi Yan61274ca2016-10-28 12:09:59 -0700391 } completionHandler:^(NSError *error, NSDictionary *trailers) {
392 self.responseTrailers = trailers;
Jorge Canizalesf3a4f2c2015-06-12 22:12:50 -0700393
Muxi Yan61274ca2016-10-28 12:09:59 -0700394 if (error) {
395 NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
396 if (error.userInfo) {
397 [userInfo addEntriesFromDictionary:error.userInfo];
398 }
399 userInfo[kGRPCTrailersKey] = self.responseTrailers;
400 // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
401 // called before this one, so an error might end up with trailers but no headers. We
402 // shouldn't call finishWithError until ater both blocks are called. It is also when this is
403 // done that we can provide a merged view of response headers and trailers in a thread-safe
404 // way.
405 if (self.responseHeaders) {
406 userInfo[kGRPCHeadersKey] = self.responseHeaders;
407 }
408 error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
409 }
410 [self finishWithError:error];
411 }];
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800412 // Now that the RPC has been initiated, request writes can start.
Jorge Canizales238ad782015-08-07 23:11:29 -0700413 @synchronized(_requestWriter) {
414 [_requestWriter startWithWriteable:self];
415 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800416}
417
418#pragma mark GRXWriter implementation
419
Muxi Yanb82f82b2017-08-14 12:01:14 -0700420- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
421 _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
422 dispatchQueue:_responseQueue];
423
424 _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host
425 serverName:_serverName
Muxi Yanf282c8f2017-08-30 14:40:15 -0700426 path:_path
Muxi Yanc7c8e3c2017-08-30 15:36:47 -0700427 timeout:_timeout];
Muxi Yanb82f82b2017-08-14 12:01:14 -0700428 NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
429
430 [self sendHeaders:_requestHeaders];
431 [self invokeCall];
432
433 // TODO(jcanizales): Extract this logic somewhere common.
434 NSString *host = [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host;
435 if (!host) {
436 // TODO(jcanizales): Check this on init.
437 [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host];
438 }
439 _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host];
440 __weak typeof(self) weakSelf = self;
Muxi Yan0c0ebc52017-10-19 18:41:01 -0700441 void (^handler)(void) = ^{
Muxi Yanb82f82b2017-08-14 12:01:14 -0700442 typeof(self) strongSelf = weakSelf;
443 [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
444 code:GRPCErrorCodeUnavailable
445 userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
446 };
447 [_connectivityMonitor handleLossWithHandler:handler
448 wifiStatusChangeHandler:nil];
449}
450
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800451- (void)startWithWriteable:(id<GRXWriteable>)writeable {
Jorge Canizales0803bb02016-04-30 10:40:18 -0700452 @synchronized(self) {
453 _state = GRXWriterStateStarted;
454 }
455
Muxi Yan61274ca2016-10-28 12:09:59 -0700456 // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
457 // This makes RPCs in which the call isn't externally retained possible (as long as it is started
458 // before being autoreleased).
459 // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
460 // that the life of the instance is determined by this retain cycle.
Muxi Yane1443b12016-10-20 11:53:13 -0700461 _retainSelf = self;
462
Muxi Yanb82f82b2017-08-14 12:01:14 -0700463 if (self.tokenProvider != nil) {
464 self.isWaitingForToken = YES;
465 __weak typeof(self) weakSelf = self;
466 [self.tokenProvider getTokenWithHandler:^(NSString *token){
467 typeof(self) strongSelf = weakSelf;
468 if (strongSelf && strongSelf.isWaitingForToken) {
469 if (token) {
470 NSString *t = [kBearerPrefix stringByAppendingString:token];
471 strongSelf.requestHeaders[kAuthorizationHeader] = t;
472 }
473 [strongSelf startCallWithWriteable:writeable];
474 strongSelf.isWaitingForToken = NO;
475 }
476 }];
477 } else {
478 [self startCallWithWriteable:writeable];
Muxi Yan34949db2017-07-14 16:37:25 -0700479 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800480}
481
482- (void)setState:(GRXWriterState)newState {
Jorge Canizales0803bb02016-04-30 10:40:18 -0700483 @synchronized(self) {
484 // Manual transitions are only allowed from the started or paused states.
Muxi Yan61274ca2016-10-28 12:09:59 -0700485 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
Jorge Canizales0803bb02016-04-30 10:40:18 -0700486 return;
487 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800488
Jorge Canizales0803bb02016-04-30 10:40:18 -0700489 switch (newState) {
490 case GRXWriterStateFinished:
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800491 _state = newState;
Jorge Canizales0803bb02016-04-30 10:40:18 -0700492 // Per GRXWriter's contract, setting the state to Finished manually
493 // means one doesn't wish the writeable to be messaged anymore.
494 [_responseWriteable cancelSilently];
495 _responseWriteable = nil;
496 return;
497 case GRXWriterStatePaused:
498 _state = newState;
499 return;
500 case GRXWriterStateStarted:
501 if (_state == GRXWriterStatePaused) {
502 _state = newState;
503 [self startNextRead];
504 }
505 return;
506 case GRXWriterStateNotStarted:
507 return;
508 }
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800509 }
510}
Jorge Canizalesf1d084a2015-10-30 11:14:09 -0700511
Jorge Canizales5e0efd92015-02-17 18:23:58 -0800512@end