blob: 46a1e232e35f215d6d93e09c218bde2948af7f49 [file] [log] [blame]
Jorge Canizales9409ad82015-02-18 16:19:56 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Jorge Canizales5e0efd92015-02-17 18:23:58 -080034#import "GRPCCall.h"
35
36#include <grpc.h>
37#include <support/time.h>
38
39#import "GRPCMethodName.h"
40#import "private/GRPCChannel.h"
41#import "private/GRPCCompletionQueue.h"
42#import "private/GRPCDelegateWrapper.h"
43#import "private/GRPCMethodName+HTTP2Encoding.h"
44#import "private/NSData+GRPC.h"
45#import "private/NSDictionary+GRPC.h"
46#import "private/NSError+GRPC.h"
47
// A grpc_call_error other than GRPC_CALL_OK means a precondition of one of the
// grpc_call_* functions was violated. If that ever happens it's a bug in this
// library, so it is surfaced as an NSInternalInconsistencyException rather
// than being reported to the application as a recoverable error.
//
// TODO(jcanizales): Can an application shut down gracefully when a thread other
// than the main one throws an exception?
static void AssertNoErrorInCall(grpc_call_error error) {
  if (error == GRPC_CALL_OK) {
    return;
  }
  NSException *preconditionFailure =
      [NSException exceptionWithName:NSInternalInconsistencyException
                              reason:@"Precondition of grpc_call_* not met."
                            userInfo:nil];
  @throw preconditionFailure;
}
60
// Private class extension. It adopts GRXWriteable (the implementation receives
// the request messages through didReceiveValue: / didFinishWithError:) and
// redeclares responseMetadata as readwrite so the implementation can assign it.
@interface GRPCCall () <GRXWriteable>
@property(atomic, strong, readwrite) NSDictionary *responseMetadata;
@end
65
// The following methods of a C gRPC call object aren't reentrant, and thus
// calls to them must be serialized:
// - add_metadata
// - invoke
// - start_write
// - writes_done
// - start_read
// - destroy
// The first four are called as part of responding to client commands, but
// start_read we want to call as soon as we're notified that the RPC was
// successfully established (which happens concurrently in the network queue).
// Serialization is achieved by using a private serial queue to operate the
// call object.
// Because add_metadata and invoke are called and return successfully before
// any of the other methods is called, they don't need to use the queue.
//
// Furthermore, start_write and writes_done can only be called after the
// WRITE_ACCEPTED event for any previous write is received. This is achieved by
// pausing the requests writer immediately every time it writes a value, and
// resuming it again when WRITE_ACCEPTED is received.
//
// Similarly, start_read can only be called after the READ event for any
// previous read is received. This is easier to enforce, as we're writing the
// received messages into the writeable: start_read is enqueued once upon receiving
// the CLIENT_METADATA_READ event, and then once after receiving each READ
// event.
@implementation GRPCCall {
  // Private serial queue on which the non-reentrant grpc_call functions are invoked.
  dispatch_queue_t _callQueue;

  grpc_call *_gRPCCall;
  dispatch_once_t _callAlreadyInvoked;

  GRPCChannel *_channel;
  GRPCCompletionQueue *_completionQueue;

  // The C gRPC library has fewer guarantees on the ordering of events than we
  // do. Particularly, in the face of errors, there's no ordering guarantee at
  // all. This wrapper over our actual writeable ensures thread-safety and
  // correct ordering.
  GRPCDelegateWrapper *_responseWriteable;
  id<GRXWriter> _requestWriter;
}

@synthesize state = _state;

// Always raises NSInvalidArgumentException: it forwards nil host and method to
// the designated initializer, which rejects them.
- (instancetype)init {
  return [self initWithHost:nil method:nil requestsWriter:nil];
}

// Designated initializer.
// Creates the underlying C call object for |method| on |host|, with no deadline
// (gpr_inf_future), and keeps |requestWriter| to be started once the call is invoked.
// Raises NSInvalidArgumentException if |host| or |method| is nil.
- (instancetype)initWithHost:(NSString *)host
                      method:(GRPCMethodName *)method
              requestsWriter:(id<GRXWriter>)requestWriter {
  if (!host || !method) {
    [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
  }
  // TODO(jcanizales): Throw if the requestWriter was already started.
  if ((self = [super init])) {
    // The C library is initialized once per process, the first time a call is created.
    static dispatch_once_t initialization;
    dispatch_once(&initialization, ^{
      grpc_init();
    });

    _completionQueue = [GRPCCompletionQueue completionQueue];

    _channel = [GRPCChannel channelToHost:host];
    _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
                                             method.HTTP2Path.UTF8String,
                                             host.UTF8String,
                                             gpr_inf_future);

    // Serial queue to invoke the non-reentrant methods of the grpc_call object.
    _callQueue = dispatch_queue_create("org.grpc.call", DISPATCH_QUEUE_SERIAL);

    _requestWriter = requestWriter;
  }
  return self;
}

#pragma mark Finish

// Finishes the request writer and notifies the response writeable, with
// |errorOrNil| if present or with a successful completion otherwise.
- (void)finishWithError:(NSError *)errorOrNil {
  // Record that the call is over, so that later manual state changes are
  // ignored by setState: instead of trying to read from a finished call.
  _state = GRXWriterStateFinished;

  _requestWriter.state = GRXWriterStateFinished;
  _requestWriter = nil;
  if (errorOrNil) {
    [_responseWriteable cancelWithError:errorOrNil];
  } else {
    [_responseWriteable enqueueSuccessfulCompletion];
  }
}

// Cancels the underlying C call.
- (void)cancelCall {
  // Can be called from any thread, any number of times.
  AssertNoErrorInCall(grpc_call_cancel(_gRPCCall));
}

// Finishes with a GRPCErrorCodeCancelled error and then cancels the C call.
- (void)cancel {
  [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                            code:GRPCErrorCodeCancelled
                                        userInfo:nil]];
  [self cancelCall];
}

- (void)dealloc {
  // destroy is one of the non-reentrant functions, so it is serialized on the
  // call queue. The pointer is copied to a local so the block doesn't touch
  // self, which is being deallocated.
  grpc_call *gRPCCall = _gRPCCall;
  dispatch_async(_callQueue, ^{
    grpc_call_destroy(gRPCCall);
  });
}

#pragma mark Read messages

// Only called from the call queue.
// The handler will be called from the network queue.
// The handler block is handed to the C library with a +1 retain count
// (__bridge_retained); the consumer of the event is expected to balance it.
- (void)startReadWithHandler:(GRPCEventHandler)handler {
  AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler));
}

// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
  if (self.state == GRXWriterStatePaused) {
    return;
  }
  // Only weak references are captured, so none of these blocks extends the
  // life of the call or of the response writeable.
  __weak GRPCCall *weakSelf = self;
  __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;

  dispatch_async(_callQueue, ^{
    [weakSelf startReadWithHandler:^(grpc_event *event) {
      if (!event->data.read) {
        // No more responses from the server.
        return;
      }
      NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read];
      if (!data) {
        // The app doesn't have enough memory to hold the server response. We
        // don't want to throw, because the app shouldn't crash for a behavior
        // that's on the hands of any server to have. Instead we finish and ask
        // the server to cancel.
        //
        // TODO(jcanizales): No canonical code is appropriate for this situation
        // (because it's just a client problem). Use another domain and an
        // appropriately-documented code.
        [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                      code:GRPCErrorCodeInternal
                                                  userInfo:nil]];
        [weakSelf cancelCall];
        return;
      }
      // The next read is only requested once this message has been delivered.
      [weakWriteable enqueueMessage:data completionHandler:^{
        [weakSelf startNextRead];
      }];
    }];
  });
}

#pragma mark Send headers

// Adds a request header whose value is the raw bytes of |value|.
- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
  grpc_metadata metadata;
  // Safe to discard const qualifiers; we're not going to modify the contents.
  metadata.key = (char *)name.UTF8String;
  metadata.value = (char *)value.bytes;
  metadata.value_length = value.length;
  grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}

// Adds a request header whose value is the UTF-8 encoding of |value|.
- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
  grpc_metadata metadata;
  // Safe to discard const qualifiers; we're not going to modify the contents.
  metadata.key = (char *)name.UTF8String;
  metadata.value = (char *)value.UTF8String;
  // The trailing \0 isn't encoded in HTTP2.
  // The length must be the number of bytes in the UTF-8 buffer, not the number
  // of UTF-16 code units (-length); the two only coincide for pure ASCII.
  metadata.value_length = [value lengthOfBytesUsingEncoding:NSUTF8StringEncoding];
  grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}

// Adds every entry of |metadata| as a request header: NSData values as binary
// headers and NSString values as text headers. Values of any other class are
// skipped.
// TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata {
  for (NSString *name in metadata) {
    id value = metadata[name];
    if ([value isKindOfClass:[NSData class]]) {
      [self addHeaderWithName:name binaryValue:value];
    } else if ([value isKindOfClass:[NSString class]]) {
      [self addHeaderWithName:name ASCIIValue:value];
    }
  }
}

#pragma mark GRXWriteable implementation

// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {

  __weak GRPCCall *weakSelf = self;
  GRPCEventHandler resumingHandler = ^(grpc_event *event) {
    if (event->data.write_accepted != GRPC_OP_OK) {
      errorHandler();
    }
    // Resume the request writer (even in the case of error).
    // TODO(jcanizales): No need to do it in the case of errors anymore?
    GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      strongSelf->_requestWriter.state = GRXWriterStateStarted;
    }
  };

  grpc_byte_buffer *buffer = message.grpc_byteBuffer;
  AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall,
                                                buffer,
                                                (__bridge_retained void *)resumingHandler,
                                                0));
  grpc_byte_buffer_destroy(buffer);
}

// Receives one request message from the request writer and schedules it to be
// written on the call queue.
- (void)didReceiveValue:(id)value {
  // TODO(jcanizales): Throw/assert if value isn't NSData.

  // Pause the input and only resume it when the C layer notifies us that writes
  // can proceed.
  _requestWriter.state = GRXWriterStatePaused;

  __weak GRPCCall *weakSelf = self;
  dispatch_async(_callQueue, ^{
    [weakSelf writeMessage:value withErrorHandler:^{
      [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                    code:GRPCErrorCodeInternal
                                                userInfo:nil]];
    }];
  });
}

// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
  GRPCEventHandler handler = ^(grpc_event *event) {
    if (event->data.finish_accepted != GRPC_OP_OK) {
      errorHandler();
    }
  };
  AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
}

// Called when the request writer finishes. An error cancels the whole call;
// a clean finish closes the requests stream on the call queue.
- (void)didFinishWithError:(NSError *)errorOrNil {
  if (errorOrNil) {
    [self cancel];
  } else {
    __weak GRPCCall *weakSelf = self;
    dispatch_async(_callQueue, ^{
      [weakSelf finishRequestWithErrorHandler:^{
        [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                      code:GRPCErrorCodeInternal
                                                  userInfo:nil]];
      }];
    });
  }
}

#pragma mark Invoke

// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (metadataHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
                    completionHandler:(GRPCEventHandler)completionHandler {
  AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall,
                                           _completionQueue.unmanagedQueue,
                                           (__bridge_retained void *)metadataHandler,
                                           (__bridge_retained void *)completionHandler,
                                           0));
}

// Invokes the RPC: stores the response headers and starts reading once they
// arrive, finishes the call when the RPC completes, and starts the request
// writer so that requests begin to flow.
- (void)invokeCall {
  __weak GRPCCall *weakSelf = self;
  [self invokeCallWithMetadataHandler:^(grpc_event *event) {
    // Response metadata received.
    // TODO(jcanizales): Name the type of event->data.client_metadata_read
    // in the C library so one can actually pass the object to a method.
    grpc_metadata *entries = event->data.client_metadata_read.elements;
    size_t count = event->data.client_metadata_read.count;
    GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries
                                                                        count:count];
      [strongSelf startNextRead];
    }
  } completionHandler:^(grpc_event *event) {
    // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
    [weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]];
  }];
  // Now that the RPC has been initiated, request writes can start.
  [_requestWriter startWithWriteable:self];
}

#pragma mark GRXWriter implementation

// Starts the call: responses will be delivered to |writeable|.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  // The following produces a retain cycle self:_responseWriteable:self, which is only
  // broken when didFinishWithError: is sent to the wrapped writeable.
  // Care is taken not to retain self strongly in any of the blocks used in
  // the implementation of GRPCCall, so that the life of the instance is
  // determined by this retain cycle.
  _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];

  // Leave the not-started state before the call is invoked. setState: ignores
  // every transition requested while the writer hasn't been started, so without
  // this the call could never be paused, resumed or manually finished.
  _state = GRXWriterStateStarted;

  [self sendHeaders:_requestMetadata];
  [self invokeCall];
}

// Manual state transitions requested by the user of the call.
// - Finished: stops messaging the response writeable and drops it.
// - Paused: no further reads are requested until the call is started again.
// - Started: only meaningful when paused; resumes reading.
// Requests made before the call has started or after it has finished are ignored.
- (void)setState:(GRXWriterState)newState {
  // Manual transitions are only allowed from the started or paused states.
  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
    return;
  }

  switch (newState) {
    case GRXWriterStateFinished:
      _state = newState;
      // Per GRXWriter's contract, setting the state to Finished manually
      // means one doesn't wish the writeable to be messaged anymore.
      [_responseWriteable cancelSilently];
      _responseWriteable = nil;
      return;
    case GRXWriterStatePaused:
      _state = newState;
      return;
    case GRXWriterStateStarted:
      if (_state == GRXWriterStatePaused) {
        _state = newState;
        [self startNextRead];
      }
      return;
    case GRXWriterStateNotStarted:
      return;
  }
}
@end
406@end