Packet coalescing Objc layer and interop tests
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 44393f6..0a10322 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -36,6 +36,7 @@
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
+#import <RxLibrary/GRXImmediateSingleWriter.h>
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
@@ -100,6 +101,10 @@
GRPCCall *_retainSelf;
GRPCRequestHeaders *_requestHeaders;
+
+ BOOL _unaryCall;
+
+ NSMutableArray *_unaryOpBatch;
}
@synthesize state = _state;
@@ -157,6 +162,11 @@
_requestWriter = requestWriter;
_requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
+
+ if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
+ _unaryCall = true;
+ _unaryOpBatch = [[NSMutableArray alloc] init];
+ }
}
return self;
}
@@ -165,6 +175,9 @@
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
}
@@ -254,9 +267,15 @@
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
- flags:[GRPCCall callFlagsForHost:_host path:_path]
- handler:nil]]];
+ if (!_unaryCall) {
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:[GRPCCall callFlagsForHost:_host path:_path]
+ handler:nil]]];
+ } else {
+ [_unaryOpBatch addObject:[[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:[GRPCCall callFlagsForHost:_host path:_path]
+ handler:nil]];
+ }
}
#pragma mark GRXWriteable implementation
@@ -275,9 +294,14 @@
}
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
- handler:resumingHandler]]
- errorHandler:errorHandler];
+ if (!_unaryCall) {
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
+ } else {
+ [_unaryOpBatch addObject:[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]];
+ }
}
- (void)writeValue:(id)value {
@@ -302,8 +326,14 @@
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
- errorHandler:errorHandler];
+ if (!_unaryOpBatch) {
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
+ errorHandler:errorHandler];
+ } else {
+ [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
+ [_wrappedCall startBatchWithOperations:_unaryOpBatch
+ errorHandler:errorHandler];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {