blob: b9248be5dbc1a6d4985f96d1389f72fb3952c8ac [file] [log] [blame]
Jorge Canizales5e0efd92015-02-17 18:23:58 -08001#import "GRPCCall.h"
2
3#include <grpc.h>
4#include <support/time.h>
5
6#import "GRPCMethodName.h"
7#import "private/GRPCChannel.h"
8#import "private/GRPCCompletionQueue.h"
9#import "private/GRPCDelegateWrapper.h"
10#import "private/GRPCMethodName+HTTP2Encoding.h"
11#import "private/NSData+GRPC.h"
12#import "private/NSDictionary+GRPC.h"
13#import "private/NSError+GRPC.h"
14
15// A grpc_call_error represents a precondition failure when invoking the
16// grpc_call_* functions. If one ever happens, it's a bug in this library.
17//
18// TODO(jcanizales): Can an application shut down gracefully when a thread other
19// than the main one throws an exception?
20static void AssertNoErrorInCall(grpc_call_error error) {
21 if (error != GRPC_CALL_OK) {
22 @throw [NSException exceptionWithName:NSInternalInconsistencyException
23 reason:@"Precondition of grpc_call_* not met."
24 userInfo:nil];
25 }
26}
27
28@interface GRPCCall () <GRXWriteable>
29// Makes it readwrite.
30@property(atomic, strong) NSDictionary *responseMetadata;
31@end
32
33// The following methods of a C gRPC call object aren't reentrant, and thus
34// calls to them must be serialized:
35// - add_metadata
36// - invoke
37// - start_write
38// - writes_done
39// - start_read
40// - destroy
41// The first four are called as part of responding to client commands, but
42// start_read we want to call as soon as we're notified that the RPC was
43// successfully established (which happens concurrently in the network queue).
44// Serialization is achieved by using a private serial queue to operate the
45// call object.
46// Because add_metadata and invoke are called and return successfully before
47// any of the other methods is called, they don't need to use the queue.
48//
49// Furthermore, start_write and writes_done can only be called after the
50// WRITE_ACCEPTED event for any previous write is received. This is achieved by
51// pausing the requests writer immediately every time it writes a value, and
52// resuming it again when WRITE_ACCEPTED is received.
53//
54// Similarly, start_read can only be called after the READ event for any
55// previous read is received. This is easier to enforce, as we're writing the
56// received messages into the writeable: start_read is enqueued once upon receiving
57// the CLIENT_METADATA_READ event, and then once after receiving each READ
58// event.
59@implementation GRPCCall {
60 dispatch_queue_t _callQueue;
61
62 grpc_call *_gRPCCall;
63 dispatch_once_t _callAlreadyInvoked;
64
65 GRPCChannel *_channel;
66 GRPCCompletionQueue *_completionQueue;
67
68 // The C gRPC library has less guarantees on the ordering of events than we
69 // do. Particularly, in the face of errors, there's no ordering guarantee at
70 // all. This wrapper over our actual writeable ensures thread-safety and
71 // correct ordering.
72 GRPCDelegateWrapper *_responseWriteable;
73 id<GRXWriter> _requestWriter;
74}
75
76@synthesize state = _state;
77
78- (instancetype)init {
79 return [self initWithHost:nil method:nil requestsWriter:nil];
80}
81
82// Designated initializer
83- (instancetype)initWithHost:(NSString *)host
84 method:(GRPCMethodName *)method
85 requestsWriter:(id<GRXWriter>)requestWriter {
86 if (!host || !method) {
87 [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
88 }
89 // TODO(jcanizales): Throw if the requestWriter was already started.
90 if ((self = [super init])) {
91 static dispatch_once_t initialization;
92 dispatch_once(&initialization, ^{
93 grpc_init();
94 });
95
96 _completionQueue = [GRPCCompletionQueue completionQueue];
97
98 _channel = [GRPCChannel channelToHost:host];
99 _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
100 method.HTTP2Path.UTF8String,
101 host.UTF8String,
102 gpr_inf_future);
103
104 // Serial queue to invoke the non-reentrant methods of the grpc_call object.
105 _callQueue = dispatch_queue_create("org.grpc.call", NULL);
106
107 _requestWriter = requestWriter;
108 }
109 return self;
110}
111
112#pragma mark Finish
113
114- (void)finishWithError:(NSError *)errorOrNil {
115 _requestWriter.state = GRXWriterStateFinished;
116 _requestWriter = nil;
117 if (errorOrNil) {
118 [_responseWriteable cancelWithError:errorOrNil];
119 } else {
120 [_responseWriteable enqueueSuccessfulCompletion];
121 }
122}
123
124- (void)cancelCall {
125 // Can be called from any thread, any number of times.
126 AssertNoErrorInCall(grpc_call_cancel(_gRPCCall));
127}
128
129- (void)cancel {
130 [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
131 code:GRPCErrorCodeCancelled
132 userInfo:nil]];
133 [self cancelCall];
134}
135
136- (void)dealloc {
137 grpc_call *gRPCCall = _gRPCCall;
138 dispatch_async(_callQueue, ^{
139 grpc_call_destroy(gRPCCall);
140 });
141}
142
143#pragma mark Read messages
144
145// Only called from the call queue.
146// The handler will be called from the network queue.
147- (void)startReadWithHandler:(GRPCEventHandler)handler {
148 AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler));
149}
150
151// Called initially from the network queue once response headers are received,
152// then "recursively" from the responseWriteable queue after each response from the
153// server has been written.
154// If the call is currently paused, this is a noop. Restarting the call will invoke this
155// method.
156// TODO(jcanizales): Rename to readResponseIfNotPaused.
157- (void)startNextRead {
158 if (self.state == GRXWriterStatePaused) {
159 return;
160 }
161 __weak GRPCCall *weakSelf = self;
162 __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
163
164 dispatch_async(_callQueue, ^{
165 [weakSelf startReadWithHandler:^(grpc_event *event) {
166 if (!event->data.read) {
167 // No more responses from the server.
168 return;
169 }
170 NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read];
171 if (!data) {
172 // The app doesn't have enough memory to hold the server response. We
173 // don't want to throw, because the app shouldn't crash for a behavior
174 // that's on the hands of any server to have. Instead we finish and ask
175 // the server to cancel.
176 //
177 // TODO(jcanizales): No canonical code is appropriate for this situation
178 // (because it's just a client problem). Use another domain and an
179 // appropriately-documented code.
180 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
181 code:GRPCErrorCodeInternal
182 userInfo:nil]];
183 [weakSelf cancelCall];
184 return;
185 }
186 [weakWriteable enqueueMessage:data completionHandler:^{
187 [weakSelf startNextRead];
188 }];
189 }];
190 });
191}
192
193#pragma mark Send headers
194
195- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
196 grpc_metadata metadata;
197 // Safe to discard const qualifiers; we're not going to modify the contents.
198 metadata.key = (char *)name.UTF8String;
199 metadata.value = (char *)value.bytes;
200 metadata.value_length = value.length;
201 grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
202}
203
204- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
205 grpc_metadata metadata;
206 // Safe to discard const qualifiers; we're not going to modify the contents.
207 metadata.key = (char *)name.UTF8String;
208 metadata.value = (char *)value.UTF8String;
209 // The trailing \0 isn't encoded in HTTP2.
210 metadata.value_length = value.length;
211 grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
212}
213
214// TODO(jcanizales): Rename to commitHeaders.
215- (void)sendHeaders:(NSDictionary *)metadata {
216 for (NSString *name in metadata) {
217 id value = metadata[name];
218 if ([value isKindOfClass:[NSData class]]) {
219 [self addHeaderWithName:name binaryValue:value];
220 } else if ([value isKindOfClass:[NSString class]]) {
221 [self addHeaderWithName:name ASCIIValue:value];
222 }
223 }
224}
225
226#pragma mark GRXWriteable implementation
227
228// Only called from the call queue. The error handler will be called from the
229// network queue if the write didn't succeed.
230- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
231
232 __weak GRPCCall *weakSelf = self;
233 GRPCEventHandler resumingHandler = ^(grpc_event *event) {
234 if (event->data.write_accepted != GRPC_OP_OK) {
235 errorHandler();
236 }
237 // Resume the request writer (even in the case of error).
238 // TODO(jcanizales): No need to do it in the case of errors anymore?
239 GRPCCall *strongSelf = weakSelf;
240 if (strongSelf) {
241 strongSelf->_requestWriter.state = GRXWriterStateStarted;
242 }
243 };
244
245 grpc_byte_buffer *buffer = message.grpc_byteBuffer;
246 AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall,
247 buffer,
248 (__bridge_retained void *)resumingHandler,
249 0));
250 grpc_byte_buffer_destroy(buffer);
251}
252
253- (void)didReceiveValue:(id)value {
254 // TODO(jcanizales): Throw/assert if value isn't NSData.
255
256 // Pause the input and only resume it when the C layer notifies us that writes
257 // can proceed.
258 _requestWriter.state = GRXWriterStatePaused;
259
260 __weak GRPCCall *weakSelf = self;
261 dispatch_async(_callQueue, ^{
262 [weakSelf writeMessage:value withErrorHandler:^{
263 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
264 code:GRPCErrorCodeInternal
265 userInfo:nil]];
266 }];
267 });
268}
269
270// Only called from the call queue. The error handler will be called from the
271// network queue if the requests stream couldn't be closed successfully.
272- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
273 GRPCEventHandler handler = ^(grpc_event *event) {
274 if (event->data.finish_accepted != GRPC_OP_OK) {
275 errorHandler();
276 }
277 };
278 AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
279}
280
281- (void)didFinishWithError:(NSError *)errorOrNil {
282 if (errorOrNil) {
283 [self cancel];
284 } else {
285 __weak GRPCCall *weakSelf = self;
286 dispatch_async(_callQueue, ^{
287 [weakSelf finishRequestWithErrorHandler:^{
288 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
289 code:GRPCErrorCodeInternal
290 userInfo:nil]];
291 }];
292 });
293 }
294}
295
296#pragma mark Invoke
297
298// Both handlers will eventually be called, from the network queue. Writes can start immediately
299// after this.
300// The first one (metadataHandler), when the response headers are received.
301// The second one (completionHandler), whenever the RPC finishes for any reason.
302- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
303 completionHandler:(GRPCEventHandler)completionHandler {
304 AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall,
305 _completionQueue.unmanagedQueue,
306 (__bridge_retained void *)metadataHandler,
307 (__bridge_retained void *)completionHandler,
308 0));
309}
310
311- (void)invokeCall {
312 __weak GRPCCall *weakSelf = self;
313 [self invokeCallWithMetadataHandler:^(grpc_event *event) {
314 // Response metadata received.
315 // TODO(jcanizales): Name the type of event->data.client_metadata_read
316 // in the C library so one can actually pass the object to a method.
317 grpc_metadata *entries = event->data.client_metadata_read.elements;
318 size_t count = event->data.client_metadata_read.count;
319 GRPCCall *strongSelf = weakSelf;
320 if (strongSelf) {
321 strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries
322 count:count];
323 [strongSelf startNextRead];
324 }
325 } completionHandler:^(grpc_event *event) {
326 // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
327 [weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]];
328 }];
329 // Now that the RPC has been initiated, request writes can start.
330 [_requestWriter startWithWriteable:self];
331}
332
333#pragma mark GRXWriter implementation
334
335- (void)startWithWriteable:(id<GRXWriteable>)writeable {
336 // The following produces a retain cycle self:_responseWriteable:self, which is only
337 // broken when didFinishWithError: is sent to the wrapped writeable.
338 // Care is taken not to retain self strongly in any of the blocks used in
339 // the implementation of GRPCCall, so that the life of the instance is
340 // determined by this retain cycle.
341 _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];
342 [self sendHeaders:_requestMetadata];
343 [self invokeCall];
344}
345
346- (void)setState:(GRXWriterState)newState {
347 // Manual transitions are only allowed from the started or paused states.
348 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
349 return;
350 }
351
352 switch (newState) {
353 case GRXWriterStateFinished:
354 _state = newState;
355 // Per GRXWriter's contract, setting the state to Finished manually
356 // means one doesn't wish the writeable to be messaged anymore.
357 [_responseWriteable cancelSilently];
358 _responseWriteable = nil;
359 return;
360 case GRXWriterStatePaused:
361 _state = newState;
362 return;
363 case GRXWriterStateStarted:
364 if (_state == GRXWriterStatePaused) {
365 _state = newState;
366 [self startNextRead];
367 }
368 return;
369 case GRXWriterStateNotStarted:
370 return;
371 }
372}
373@end