blob: cadb10a67b163e4a205568c9d4eebb2b12a6b071 [file] [log] [blame]
lryan56e307f2014-12-05 13:25:08 -08001/*
2 * Copyright 2014, Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 *
15 * * Neither the name of Google Inc. nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
ejona9a5a8de2014-10-30 11:47:51 -070032package com.google.net.stubby.testing.integration;
33
34import static com.google.net.stubby.testing.integration.Messages.PayloadType.COMPRESSABLE;
ejona3d4ea0d2014-11-03 11:03:23 -080035import static com.google.net.stubby.testing.integration.Util.assertEquals;
ejona9a5a8de2014-10-30 11:47:51 -070036import static org.junit.Assert.assertEquals;
ejona42fcc502014-12-23 11:17:10 -080037import static org.mockito.Mockito.mock;
38import static org.mockito.Mockito.timeout;
39import static org.mockito.Mockito.verify;
40import static org.mockito.Mockito.verifyNoMoreInteractions;
ejona9a5a8de2014-10-30 11:47:51 -070041
ejona9a5a8de2014-10-30 11:47:51 -070042import com.google.common.util.concurrent.SettableFuture;
43import com.google.common.util.concurrent.Uninterruptibles;
ejona2a93c472014-11-07 10:02:20 -080044import com.google.net.stubby.AbstractServerBuilder;
ejona9a5a8de2014-10-30 11:47:51 -070045import com.google.net.stubby.Call;
46import com.google.net.stubby.ChannelImpl;
47import com.google.net.stubby.Metadata;
ejona2a93c472014-11-07 10:02:20 -080048import com.google.net.stubby.ServerImpl;
49import com.google.net.stubby.ServerInterceptors;
ejona9a5a8de2014-10-30 11:47:51 -070050import com.google.net.stubby.Status;
51import com.google.net.stubby.proto.ProtoUtils;
52import com.google.net.stubby.stub.MetadataUtils;
53import com.google.net.stubby.stub.StreamObserver;
54import com.google.net.stubby.stub.StreamRecorder;
ejona2a93c472014-11-07 10:02:20 -080055import com.google.net.stubby.testing.TestUtils;
ejona3d4ea0d2014-11-03 11:03:23 -080056import com.google.net.stubby.testing.integration.Messages.Payload;
ejona9a5a8de2014-10-30 11:47:51 -070057import com.google.net.stubby.testing.integration.Messages.PayloadType;
ejona3d4ea0d2014-11-03 11:03:23 -080058import com.google.net.stubby.testing.integration.Messages.ResponseParameters;
ejona9a5a8de2014-10-30 11:47:51 -070059import com.google.net.stubby.testing.integration.Messages.SimpleRequest;
60import com.google.net.stubby.testing.integration.Messages.SimpleResponse;
61import com.google.net.stubby.testing.integration.Messages.StreamingInputCallRequest;
62import com.google.net.stubby.testing.integration.Messages.StreamingInputCallResponse;
63import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallRequest;
64import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallResponse;
ejona9a5a8de2014-10-30 11:47:51 -070065import com.google.protobuf.ByteString;
66import com.google.protobuf.EmptyProtos.Empty;
67
68import org.junit.After;
69import org.junit.Assert;
ejona9a5a8de2014-10-30 11:47:51 -070070import org.junit.Before;
71import org.junit.Test;
ejona1c20eb62014-12-23 11:30:37 -080072import org.mockito.ArgumentCaptor;
ejona9a5a8de2014-10-30 11:47:51 -070073
ejona9a5a8de2014-10-30 11:47:51 -070074import java.util.ArrayList;
nathanmittler0d8477c2014-11-20 09:35:16 -080075import java.util.Arrays;
ejona9a5a8de2014-10-30 11:47:51 -070076import java.util.Collections;
ejona9a5a8de2014-10-30 11:47:51 -070077import java.util.List;
78import java.util.Random;
ejona2a93c472014-11-07 10:02:20 -080079import java.util.concurrent.Executors;
80import java.util.concurrent.ScheduledExecutorService;
ejona9a5a8de2014-10-30 11:47:51 -070081import java.util.concurrent.TimeUnit;
nmittlerde3a1312015-01-16 11:54:24 -080082import java.util.concurrent.atomic.AtomicInteger;
ejona9a5a8de2014-10-30 11:47:51 -070083import java.util.concurrent.atomic.AtomicReference;
84
85/**
86 * Abstract base class for all GRPC transport tests.
87 */
88public abstract class AbstractTransportTest {
89
90 public static final Metadata.Key<Messages.SimpleContext> METADATA_KEY =
91 ProtoUtils.keyForProto(Messages.SimpleContext.getDefaultInstance());
ejona2a93c472014-11-07 10:02:20 -080092 private static ScheduledExecutorService testServiceExecutor;
93 private static ServerImpl server;
94
95 protected static void startStaticServer(AbstractServerBuilder<?> builder) {
96 testServiceExecutor = Executors.newScheduledThreadPool(2);
97
98 server = builder
99 .addService(ServerInterceptors.intercept(
100 TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)),
101 TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)))
102 .buildAndWaitForRunning();
103 }
104
105 protected static void stopStaticServer() {
106 server.stopAsync();
107 testServiceExecutor.shutdown();
108 }
ejona9a5a8de2014-10-30 11:47:51 -0700109
110 protected ChannelImpl channel;
111 protected TestServiceGrpc.TestServiceBlockingStub blockingStub;
112 protected TestServiceGrpc.TestService asyncStub;
113
114 /**
115 * Must be called by the subclass setup method.
116 */
117 @Before
ejona7235a392015-01-13 13:38:54 -0800118 public void setup() {
ejona9a5a8de2014-10-30 11:47:51 -0700119 channel = createChannel();
ejona9a5a8de2014-10-30 11:47:51 -0700120 blockingStub = TestServiceGrpc.newBlockingStub(channel);
121 asyncStub = TestServiceGrpc.newStub(channel);
122 }
123
124 @After
125 public void teardown() throws Exception {
126 if (channel != null) {
ejona7235a392015-01-13 13:38:54 -0800127 channel.shutdown();
ejona9a5a8de2014-10-30 11:47:51 -0700128 }
129 }
130
131 protected abstract ChannelImpl createChannel();
132
133 @Test
ejona3d4ea0d2014-11-03 11:03:23 -0800134 public void emptyUnary() throws Exception {
135 assertEquals(Empty.getDefaultInstance(), blockingStub.emptyCall(Empty.getDefaultInstance()));
ejona9a5a8de2014-10-30 11:47:51 -0700136 }
137
138 @Test
ejona3d4ea0d2014-11-03 11:03:23 -0800139 public void largeUnary() throws Exception {
140 final SimpleRequest request = SimpleRequest.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800141 .setResponseSize(314159)
ejona3d4ea0d2014-11-03 11:03:23 -0800142 .setResponseType(PayloadType.COMPRESSABLE)
143 .setPayload(Payload.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800144 .setBody(ByteString.copyFrom(new byte[271828])))
ejona3d4ea0d2014-11-03 11:03:23 -0800145 .build();
146 final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
147 .setPayload(Payload.newBuilder()
148 .setType(PayloadType.COMPRESSABLE)
ejona28a2aba2014-11-19 08:54:05 -0800149 .setBody(ByteString.copyFrom(new byte[314159])))
ejona3d4ea0d2014-11-03 11:03:23 -0800150 .build();
151
152 assertEquals(goldenResponse, blockingStub.unaryCall(request));
ejona9a5a8de2014-10-30 11:47:51 -0700153 }
154
155 @Test
ejona3d4ea0d2014-11-03 11:03:23 -0800156 public void serverStreaming() throws Exception {
157 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
158 .setResponseType(PayloadType.COMPRESSABLE)
ejona3d4ea0d2014-11-03 11:03:23 -0800159 .addResponseParameters(ResponseParameters.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800160 .setSize(31415))
ejona3d4ea0d2014-11-03 11:03:23 -0800161 .addResponseParameters(ResponseParameters.newBuilder()
162 .setSize(9))
163 .addResponseParameters(ResponseParameters.newBuilder()
164 .setSize(2653))
165 .addResponseParameters(ResponseParameters.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800166 .setSize(58979))
ejona3d4ea0d2014-11-03 11:03:23 -0800167 .build();
168 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
169 StreamingOutputCallResponse.newBuilder()
170 .setPayload(Payload.newBuilder()
171 .setType(PayloadType.COMPRESSABLE)
ejona28a2aba2014-11-19 08:54:05 -0800172 .setBody(ByteString.copyFrom(new byte[31415])))
ejona3d4ea0d2014-11-03 11:03:23 -0800173 .build(),
174 StreamingOutputCallResponse.newBuilder()
175 .setPayload(Payload.newBuilder()
176 .setType(PayloadType.COMPRESSABLE)
177 .setBody(ByteString.copyFrom(new byte[9])))
178 .build(),
179 StreamingOutputCallResponse.newBuilder()
180 .setPayload(Payload.newBuilder()
181 .setType(PayloadType.COMPRESSABLE)
182 .setBody(ByteString.copyFrom(new byte[2653])))
183 .build(),
184 StreamingOutputCallResponse.newBuilder()
185 .setPayload(Payload.newBuilder()
186 .setType(PayloadType.COMPRESSABLE)
ejona28a2aba2014-11-19 08:54:05 -0800187 .setBody(ByteString.copyFrom(new byte[58979])))
ejona3d4ea0d2014-11-03 11:03:23 -0800188 .build());
ejona9a5a8de2014-10-30 11:47:51 -0700189
190 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
191 asyncStub.streamingOutputCall(request, recorder);
192 recorder.awaitCompletion();
193 assertSuccess(recorder);
ejona3d4ea0d2014-11-03 11:03:23 -0800194 assertEquals(goldenResponses, recorder.getValues());
ejona9a5a8de2014-10-30 11:47:51 -0700195 }
196
197 @Test
ejona3d4ea0d2014-11-03 11:03:23 -0800198 public void clientStreaming() throws Exception {
199 final List<StreamingInputCallRequest> requests = Arrays.asList(
200 StreamingInputCallRequest.newBuilder()
ejona3d4ea0d2014-11-03 11:03:23 -0800201 .setPayload(Payload.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800202 .setBody(ByteString.copyFrom(new byte[27182])))
ejona3d4ea0d2014-11-03 11:03:23 -0800203 .build(),
204 StreamingInputCallRequest.newBuilder()
205 .setPayload(Payload.newBuilder()
206 .setBody(ByteString.copyFrom(new byte[8])))
207 .build(),
208 StreamingInputCallRequest.newBuilder()
209 .setPayload(Payload.newBuilder()
210 .setBody(ByteString.copyFrom(new byte[1828])))
211 .build(),
212 StreamingInputCallRequest.newBuilder()
213 .setPayload(Payload.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800214 .setBody(ByteString.copyFrom(new byte[45904])))
ejona3d4ea0d2014-11-03 11:03:23 -0800215 .build());
216 final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800217 .setAggregatedPayloadSize(74922)
ejona3d4ea0d2014-11-03 11:03:23 -0800218 .build();
ejona9a5a8de2014-10-30 11:47:51 -0700219
zhangkunb186b372014-11-20 11:45:33 -0800220 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
221 StreamObserver<StreamingInputCallRequest> requestObserver =
222 asyncStub.streamingInputCall(responseObserver);
223 for (StreamingInputCallRequest request : requests) {
224 requestObserver.onValue(request);
225 }
226 requestObserver.onCompleted();
227 assertEquals(goldenResponse, responseObserver.firstValue().get());
ejona3d4ea0d2014-11-03 11:03:23 -0800228 }
229
230 @Test(timeout=5000)
231 public void pingPong() throws Exception {
232 final List<StreamingOutputCallRequest> requests = Arrays.asList(
233 StreamingOutputCallRequest.newBuilder()
ejona3d4ea0d2014-11-03 11:03:23 -0800234 .addResponseParameters(ResponseParameters.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800235 .setSize(31415))
ejona3d4ea0d2014-11-03 11:03:23 -0800236 .setPayload(Payload.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800237 .setBody(ByteString.copyFrom(new byte[27182])))
ejona3d4ea0d2014-11-03 11:03:23 -0800238 .build(),
239 StreamingOutputCallRequest.newBuilder()
240 .addResponseParameters(ResponseParameters.newBuilder()
241 .setSize(9))
242 .setPayload(Payload.newBuilder()
243 .setBody(ByteString.copyFrom(new byte[8])))
244 .build(),
245 StreamingOutputCallRequest.newBuilder()
246 .addResponseParameters(ResponseParameters.newBuilder()
247 .setSize(2653))
248 .setPayload(Payload.newBuilder()
249 .setBody(ByteString.copyFrom(new byte[1828])))
250 .build(),
251 StreamingOutputCallRequest.newBuilder()
252 .addResponseParameters(ResponseParameters.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800253 .setSize(58979))
ejona3d4ea0d2014-11-03 11:03:23 -0800254 .setPayload(Payload.newBuilder()
ejona28a2aba2014-11-19 08:54:05 -0800255 .setBody(ByteString.copyFrom(new byte[45904])))
ejona3d4ea0d2014-11-03 11:03:23 -0800256 .build());
257 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
258 StreamingOutputCallResponse.newBuilder()
259 .setPayload(Payload.newBuilder()
260 .setType(PayloadType.COMPRESSABLE)
ejona28a2aba2014-11-19 08:54:05 -0800261 .setBody(ByteString.copyFrom(new byte[31415])))
ejona3d4ea0d2014-11-03 11:03:23 -0800262 .build(),
263 StreamingOutputCallResponse.newBuilder()
264 .setPayload(Payload.newBuilder()
265 .setType(PayloadType.COMPRESSABLE)
266 .setBody(ByteString.copyFrom(new byte[9])))
267 .build(),
268 StreamingOutputCallResponse.newBuilder()
269 .setPayload(Payload.newBuilder()
270 .setType(PayloadType.COMPRESSABLE)
271 .setBody(ByteString.copyFrom(new byte[2653])))
272 .build(),
273 StreamingOutputCallResponse.newBuilder()
274 .setPayload(Payload.newBuilder()
275 .setType(PayloadType.COMPRESSABLE)
ejona28a2aba2014-11-19 08:54:05 -0800276 .setBody(ByteString.copyFrom(new byte[58979])))
ejona3d4ea0d2014-11-03 11:03:23 -0800277 .build());
278
ejona42fcc502014-12-23 11:17:10 -0800279 @SuppressWarnings("unchecked")
280 StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
281 StreamObserver<StreamingOutputCallRequest> requestObserver
282 = asyncStub.fullDuplexCall(responseObserver);
ejona3d4ea0d2014-11-03 11:03:23 -0800283 for (int i = 0; i < requests.size(); i++) {
284 requestObserver.onValue(requests.get(i));
ejona42fcc502014-12-23 11:17:10 -0800285 verify(responseObserver, timeout(1000)).onValue(goldenResponses.get(i));
286 verifyNoMoreInteractions(responseObserver);
ejona9a5a8de2014-10-30 11:47:51 -0700287 }
ejona3d4ea0d2014-11-03 11:03:23 -0800288 requestObserver.onCompleted();
ejona42fcc502014-12-23 11:17:10 -0800289 verify(responseObserver, timeout(1000)).onCompleted();
290 verifyNoMoreInteractions(responseObserver);
291 }
292
293 @Test
294 public void emptyStream() throws Exception {
295 @SuppressWarnings("unchecked")
296 StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
297 StreamObserver<StreamingOutputCallRequest> requestObserver
298 = asyncStub.fullDuplexCall(responseObserver);
299 requestObserver.onCompleted();
300 verify(responseObserver, timeout(1000)).onCompleted();
301 verifyNoMoreInteractions(responseObserver);
ejona9a5a8de2014-10-30 11:47:51 -0700302 }
303
304 @Test
ejona1c20eb62014-12-23 11:30:37 -0800305 public void cancelAfterBegin() throws Exception {
306 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
307 StreamObserver<StreamingInputCallRequest> requestObserver =
308 asyncStub.streamingInputCall(responseObserver);
309 requestObserver.onError(new RuntimeException());
310 responseObserver.awaitCompletion();
311 assertEquals(Arrays.<StreamingInputCallResponse>asList(), responseObserver.getValues());
312 assertEquals(Status.CANCELLED, Status.fromThrowable(responseObserver.getError()));
313 }
314
315 @Test
316 public void cancelAfterFirstResponse() throws Exception {
317 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
318 .addResponseParameters(ResponseParameters.newBuilder()
319 .setSize(31415))
320 .setPayload(Payload.newBuilder()
321 .setBody(ByteString.copyFrom(new byte[27182])))
322 .build();
323 final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
324 .setPayload(Payload.newBuilder()
325 .setType(PayloadType.COMPRESSABLE)
326 .setBody(ByteString.copyFrom(new byte[31415])))
327 .build();
328
329 @SuppressWarnings("unchecked")
330 StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
331 StreamObserver<StreamingOutputCallRequest> requestObserver
332 = asyncStub.fullDuplexCall(responseObserver);
333 requestObserver.onValue(request);
334 verify(responseObserver, timeout(1000)).onValue(goldenResponse);
335 verifyNoMoreInteractions(responseObserver);
336
337 requestObserver.onError(new RuntimeException());
338 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
339 verify(responseObserver, timeout(1000)).onError(captor.capture());
340 assertEquals(Status.CANCELLED, Status.fromThrowable(captor.getValue()));
341 verifyNoMoreInteractions(responseObserver);
342 }
343
344 @Test
ejona9a5a8de2014-10-30 11:47:51 -0700345 public void fullDuplexCallShouldSucceed() throws Exception {
346 // Build the request.
347 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
348 StreamingOutputCallRequest.Builder streamingOutputBuilder =
349 StreamingOutputCallRequest.newBuilder();
350 streamingOutputBuilder.setResponseType(COMPRESSABLE);
351 for (Integer size : responseSizes) {
352 streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
353 }
354 final StreamingOutputCallRequest request = streamingOutputBuilder.build();
355
356 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
357 StreamObserver<StreamingOutputCallRequest> requestStream =
zhangkunb186b372014-11-20 11:45:33 -0800358 asyncStub.fullDuplexCall(recorder);
ejona9a5a8de2014-10-30 11:47:51 -0700359
360 final int numRequests = 10;
361 for (int ix = numRequests; ix > 0; --ix) {
362 requestStream.onValue(request);
363 }
364 requestStream.onCompleted();
365 recorder.awaitCompletion();
366 assertSuccess(recorder);
367 assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
368 for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
369 StreamingOutputCallResponse response = recorder.getValues().get(ix);
370 assertEquals(COMPRESSABLE, response.getPayload().getType());
371 int length = response.getPayload().getBody().size();
372 int expectedSize = responseSizes.get(ix % responseSizes.size());
373 assertEquals("comparison failed at index " + ix, expectedSize, length);
374 }
375 }
376
377 @Test
378 public void halfDuplexCallShouldSucceed() throws Exception {
379 // Build the request.
380 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
381 StreamingOutputCallRequest.Builder streamingOutputBuilder =
382 StreamingOutputCallRequest.newBuilder();
383 streamingOutputBuilder.setResponseType(COMPRESSABLE);
384 for (Integer size : responseSizes) {
385 streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
386 }
387 final StreamingOutputCallRequest request = streamingOutputBuilder.build();
388
389 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
390 StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);
391
392 final int numRequests = 10;
393 for (int ix = numRequests; ix > 0; --ix) {
394 requestStream.onValue(request);
395 }
396 requestStream.onCompleted();
397 recorder.awaitCompletion();
398 assertSuccess(recorder);
399 assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
400 for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
401 StreamingOutputCallResponse response = recorder.getValues().get(ix);
402 assertEquals(COMPRESSABLE, response.getPayload().getType());
403 int length = response.getPayload().getBody().size();
404 int expectedSize = responseSizes.get(ix % responseSizes.size());
405 assertEquals("comparison failed at index " + ix, expectedSize, length);
406 }
407 }
408
409 @Test
410 public void streamingOutputShouldBeFlowControlled() throws Exception {
411 // Create the call object.
412 Call<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
413 channel.newCall(TestServiceGrpc.CONFIG.streamingOutputCall);
414
415 // Build the request.
416 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
417 StreamingOutputCallRequest.Builder streamingOutputBuilder =
418 StreamingOutputCallRequest.newBuilder();
419 streamingOutputBuilder.setResponseType(COMPRESSABLE);
420 for (Integer size : responseSizes) {
421 streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
422 }
423 StreamingOutputCallRequest request = streamingOutputBuilder.build();
424
425 // Start the call and prepare capture of results.
426 final List<StreamingOutputCallResponse> results =
427 Collections.synchronizedList(new ArrayList<StreamingOutputCallResponse>());
ejona9a5a8de2014-10-30 11:47:51 -0700428 final SettableFuture<Void> completionFuture = SettableFuture.create();
nmittlerde3a1312015-01-16 11:54:24 -0800429 final AtomicInteger count = new AtomicInteger();
ejona9a5a8de2014-10-30 11:47:51 -0700430 call.start(new Call.Listener<StreamingOutputCallResponse>() {
431
432 @Override
nmittlerde3a1312015-01-16 11:54:24 -0800433 public void onHeaders(Metadata.Headers headers) {
ejona9a5a8de2014-10-30 11:47:51 -0700434 }
435
436 @Override
nmittlerde3a1312015-01-16 11:54:24 -0800437 public void onPayload(final StreamingOutputCallResponse payload) {
ejona9a5a8de2014-10-30 11:47:51 -0700438 results.add(payload);
nmittlerde3a1312015-01-16 11:54:24 -0800439 count.incrementAndGet();
ejona9a5a8de2014-10-30 11:47:51 -0700440 }
441
442 @Override
443 public void onClose(Status status, Metadata.Trailers trailers) {
444 if (status.isOk()) {
445 completionFuture.set(null);
446 } else {
447 completionFuture.setException(status.asException());
448 }
449 }
450 }, new Metadata.Headers());
451
452 // Send the request.
453 call.sendPayload(request);
454 call.halfClose();
455
456 // Slowly set completion on all of the futures.
457 int expectedResults = responseSizes.size();
nmittlerde3a1312015-01-16 11:54:24 -0800458 while (count.get() < expectedResults) {
459 // Allow one more inbound message to be delivered to the application.
460 call.request(1);
ejona9a5a8de2014-10-30 11:47:51 -0700461
462 // Sleep a bit.
463 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
464 }
465
466 // Wait for successful completion of the response.
467 completionFuture.get();
468
469 assertEquals(responseSizes.size(), results.size());
470 for (int ix = 0; ix < results.size(); ++ix) {
471 StreamingOutputCallResponse response = results.get(ix);
472 assertEquals(COMPRESSABLE, response.getPayload().getType());
473 int length = response.getPayload().getBody().size();
474 assertEquals("comparison failed at index " + ix, responseSizes.get(ix).intValue(), length);
475 }
476 }
477
478 @org.junit.Test
479 public void exchangeContextUnaryCall() throws Exception {
ejona9a5a8de2014-10-30 11:47:51 -0700480 TestServiceGrpc.TestServiceBlockingStub stub =
481 TestServiceGrpc.newBlockingStub(channel);
482
483 // Capture the context exchange
484 Metadata.Headers fixedHeaders = new Metadata.Headers();
485 // Send a context proto (as it's in the default extension registry)
486 Messages.SimpleContext contextValue =
487 Messages.SimpleContext.newBuilder().setValue("dog").build();
488 fixedHeaders.put(METADATA_KEY, contextValue);
489 stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
490 // .. and expect it to be echoed back in trailers
nathanmittler70341df2014-12-05 15:12:53 -0800491 AtomicReference<Metadata.Trailers> trailersCapture = new AtomicReference<Metadata.Trailers>();
492 AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<Metadata.Headers>();
ejona9a5a8de2014-10-30 11:47:51 -0700493 stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
494
495 Assert.assertNotNull(stub.unaryCall(unaryRequest()));
496
497 // Assert that our side channel object is echoed back in both headers and trailers
498 Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
ejonac0f41922014-12-23 12:16:17 -0800499 Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
ejona9a5a8de2014-10-30 11:47:51 -0700500 }
501
502 @org.junit.Test
503 public void exchangeContextStreamingCall() throws Exception {
zhangkunb186b372014-11-20 11:45:33 -0800504 TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
ejona9a5a8de2014-10-30 11:47:51 -0700505
506 // Capture the context exchange
507 Metadata.Headers fixedHeaders = new Metadata.Headers();
508 // Send a context proto (as it's in the default extension registry)
509 Messages.SimpleContext contextValue =
510 Messages.SimpleContext.newBuilder().setValue("dog").build();
511 fixedHeaders.put(METADATA_KEY, contextValue);
512 stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
513 // .. and expect it to be echoed back in trailers
nathanmittler70341df2014-12-05 15:12:53 -0800514 AtomicReference<Metadata.Trailers> trailersCapture = new AtomicReference<Metadata.Trailers>();
515 AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<Metadata.Headers>();
ejona9a5a8de2014-10-30 11:47:51 -0700516 stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
517
518 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
519 Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
520 Messages.StreamingOutputCallRequest.newBuilder();
521 streamingOutputBuilder.setResponseType(COMPRESSABLE);
522 for (Integer size : responseSizes) {
523 streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
524 }
525 final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build();
526
527 StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
528 StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
529 stub.fullDuplexCall(recorder);
530
531 final int numRequests = 10;
532 for (int ix = numRequests; ix > 0; --ix) {
533 requestStream.onValue(request);
534 }
535 requestStream.onCompleted();
536 recorder.awaitCompletion();
537 assertSuccess(recorder);
538 org.junit.Assert.assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
539
540 // Assert that our side channel object is echoed back in both headers and trailers
541 Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
ejonac0f41922014-12-23 12:16:17 -0800542 Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
ejona9a5a8de2014-10-30 11:47:51 -0700543 }
544
545
546 protected int unaryPayloadLength() {
547 // 10MiB.
548 return 10485760;
549 }
550
551 protected SimpleRequest unaryRequest() {
552 SimpleRequest.Builder unaryBuilder = SimpleRequest.newBuilder();
553 unaryBuilder.getPayloadBuilder().setType(PayloadType.COMPRESSABLE);
554 byte[] data = new byte[unaryPayloadLength()];
555 new Random().nextBytes(data);
556 unaryBuilder.getPayloadBuilder().setBody(ByteString.copyFrom(data));
557 unaryBuilder.setResponseSize(10).setResponseType(PayloadType.COMPRESSABLE);
558 return unaryBuilder.build();
559 }
560
561 protected static void assertSuccess(StreamRecorder<?> recorder) {
562 if (recorder.getError() != null) {
nathanmittler70341df2014-12-05 15:12:53 -0800563 throw new AssertionError(recorder.getError());
ejona9a5a8de2014-10-30 11:47:51 -0700564 }
565 }
566}