blob: 0c7473363f95db7d25494d57ebdf08b940438d5d [file] [log] [blame]
/*
* Copyright 2015, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.Attributes.Key;
import io.grpc.Codec;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link DelayedStream}. Most of the state checking is enforced by
* {@link ClientCallImpl} so we don't check it here.
*/
@RunWith(JUnit4.class)
public class DelayedStreamTest {
@Mock private ClientStreamListener listener;
@Mock private ClientStream realStream;
@Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
private DelayedStream stream = new DelayedStream();
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void setStream_setAuthority() {
final String authority = "becauseIsaidSo";
stream.setAuthority(authority);
stream.start(listener);
stream.setStream(realStream);
InOrder inOrder = inOrder(realStream);
inOrder.verify(realStream).setAuthority(authority);
inOrder.verify(realStream).start(any(ClientStreamListener.class));
}
@Test(expected = IllegalStateException.class)
public void setAuthority_afterStart() {
stream.start(listener);
stream.setAuthority("notgonnawork");
}
@Test(expected = IllegalStateException.class)
public void start_afterStart() {
stream.start(listener);
stream.start(mock(ClientStreamListener.class));
}
@Test
public void setStream_sendsAllMessages() {
stream.start(listener);
stream.setCompressor(Codec.Identity.NONE);
stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
stream.setMessageCompression(true);
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
stream.writeMessage(message);
stream.setMessageCompression(false);
stream.writeMessage(message);
stream.setStream(realStream);
verify(realStream).setCompressor(Codec.Identity.NONE);
verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
verify(realStream).setMessageCompression(true);
verify(realStream).setMessageCompression(false);
verify(realStream, times(2)).writeMessage(message);
verify(realStream).start(listenerCaptor.capture());
stream.writeMessage(message);
verify(realStream, times(3)).writeMessage(message);
verifyNoMoreInteractions(listener);
listenerCaptor.getValue().onReady();
verify(listener).onReady();
}
@Test
public void setStream_halfClose() {
stream.start(listener);
stream.halfClose();
stream.setStream(realStream);
verify(realStream).halfClose();
}
@Test
public void setStream_flush() {
stream.start(listener);
stream.flush();
stream.setStream(realStream);
verify(realStream).flush();
stream.flush();
verify(realStream, times(2)).flush();
}
@Test
public void setStream_flowControl() {
stream.start(listener);
stream.request(1);
stream.request(2);
stream.setStream(realStream);
verify(realStream).request(1);
verify(realStream).request(2);
stream.request(3);
verify(realStream).request(3);
}
@Test
public void setStream_setMessageCompression() {
stream.start(listener);
stream.setMessageCompression(false);
stream.setStream(realStream);
verify(realStream).setMessageCompression(false);
stream.setMessageCompression(true);
verify(realStream).setMessageCompression(true);
}
@Test
public void setStream_isReady() {
stream.start(listener);
assertFalse(stream.isReady());
stream.setStream(realStream);
verify(realStream, never()).isReady();
assertFalse(stream.isReady());
verify(realStream).isReady();
when(realStream.isReady()).thenReturn(true);
assertTrue(stream.isReady());
verify(realStream, times(2)).isReady();
}
@Test
public void setStream_getAttributes() {
Attributes attributes =
Attributes.newBuilder().set(Key.<String>of("fakeKey"), "fakeValue").build();
when(realStream.getAttributes()).thenReturn(attributes);
stream.start(listener);
try {
stream.getAttributes(); // expect to throw IllegalStateException, otherwise fail()
fail();
} catch (IllegalStateException expected) {
// ignore
}
stream.setStream(realStream);
assertEquals(attributes, stream.getAttributes());
}
@Test
public void startThenCancelled() {
stream.start(listener);
stream.cancel(Status.CANCELLED);
verify(listener).closed(eq(Status.CANCELLED), any(Metadata.class));
}
@Test
public void startThenSetStreamThenCancelled() {
stream.start(listener);
stream.setStream(realStream);
stream.cancel(Status.CANCELLED);
verify(realStream).start(any(ClientStreamListener.class));
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void setStreamThenStartThenCancelled() {
stream.setStream(realStream);
stream.start(listener);
stream.cancel(Status.CANCELLED);
verify(realStream).start(same(listener));
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void setStreamThenCancelled() {
stream.setStream(realStream);
stream.cancel(Status.CANCELLED);
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void setStreamTwice() {
stream.start(listener);
stream.setStream(realStream);
verify(realStream).start(any(ClientStreamListener.class));
stream.setStream(mock(ClientStream.class));
stream.flush();
verify(realStream).flush();
}
@Test
public void cancelThenSetStream() {
stream.cancel(Status.CANCELLED);
stream.setStream(realStream);
stream.start(listener);
stream.isReady();
verifyNoMoreInteractions(realStream);
}
@Test
public void cancel_beforeStart() {
Status status = Status.CANCELLED.withDescription("that was quick");
stream.cancel(status);
stream.start(listener);
verify(listener).closed(same(status), any(Metadata.class));
}
@Test
public void cancelledThenStart() {
stream.cancel(Status.CANCELLED);
stream.start(listener);
verify(listener).closed(eq(Status.CANCELLED), any(Metadata.class));
}
@Test
public void listener_onReadyDelayedUntilPassthrough() {
class IsReadyListener extends NoopClientStreamListener {
boolean onReadyCalled;
@Override
public void onReady() {
// If onReady was not delayed, then passthrough==false and isReady will return false.
assertTrue(stream.isReady());
onReadyCalled = true;
}
}
IsReadyListener isReadyListener = new IsReadyListener();
stream.start(isReadyListener);
stream.setStream(new NoopClientStream() {
@Override
public void start(ClientStreamListener listener) {
// This call to the listener should end up being delayed.
listener.onReady();
}
@Override
public boolean isReady() {
return true;
}
});
assertTrue(isReadyListener.onReadyCalled);
}
@Test
public void listener_allQueued() {
final Metadata headers = new Metadata();
final InputStream message1 = mock(InputStream.class);
final InputStream message2 = mock(InputStream.class);
final Metadata trailers = new Metadata();
final Status status = Status.UNKNOWN.withDescription("unique status");
final InOrder inOrder = inOrder(listener);
stream.start(listener);
stream.setStream(new NoopClientStream() {
@Override
public void start(ClientStreamListener passedListener) {
passedListener.onReady();
passedListener.headersRead(headers);
passedListener.messageRead(message1);
passedListener.onReady();
passedListener.messageRead(message2);
passedListener.closed(status, trailers);
verifyNoMoreInteractions(listener);
}
});
inOrder.verify(listener).onReady();
inOrder.verify(listener).headersRead(headers);
inOrder.verify(listener).messageRead(message1);
inOrder.verify(listener).onReady();
inOrder.verify(listener).messageRead(message2);
inOrder.verify(listener).closed(status, trailers);
}
@Test
public void listener_noQueued() {
final Metadata headers = new Metadata();
final InputStream message = mock(InputStream.class);
final Metadata trailers = new Metadata();
final Status status = Status.UNKNOWN.withDescription("unique status");
stream.start(listener);
stream.setStream(realStream);
verify(realStream).start(listenerCaptor.capture());
ClientStreamListener delayedListener = listenerCaptor.getValue();
delayedListener.onReady();
verify(listener).onReady();
delayedListener.headersRead(headers);
verify(listener).headersRead(headers);
delayedListener.messageRead(message);
verify(listener).messageRead(message);
delayedListener.closed(status, trailers);
verify(listener).closed(status, trailers);
}
}