blob: d47e10444aaa4bea7fa28b38b743cd91940366b5 [file] [log] [blame]
package com.google.net.stubby.newtransport.netty;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
/**
* Tests for {@link NettyClientStream}.
*/
@RunWith(JUnit4.class)
public class NettyClientStreamTest extends NettyStreamTestBase {
private NettyClientStream stream;
@Before
public void setup() {
init();
stream = new NettyClientStream(listener, channel);
assertEquals(StreamState.OPEN, stream.state());
}
@Override
protected NettyStream stream() {
return stream;
}
@Test
public void closeShouldSucceed() {
// Force stream creation.
stream.id(1);
stream.halfClose();
assertEquals(StreamState.READ_ONLY, stream.state());
}
@Test
public void cancelShouldSendCommand() {
stream.cancel();
verify(channel).writeAndFlush(any(CancelStreamCommand.class));
}
@Test
public void writeContextShouldSendRequest() throws Exception {
// Force stream creation.
stream.id(1);
stream.writeContext(CONTEXT_KEY, input, input.available(), accepted);
stream.flush();
verify(channel).writeAndFlush(new SendGrpcFrameCommand(1, contextFrame(), false));
verify(accepted).run();
}
@Test
public void writeMessageShouldSendRequest() throws Exception {
// Force stream creation.
stream.id(1);
stream.writeMessage(input, input.available(), accepted);
stream.flush();
verify(channel).writeAndFlush(new SendGrpcFrameCommand(1, messageFrame(), false));
verify(accepted).run();
}
@Test
public void setStatusWithOkShouldCloseStream() {
stream.id(1);
stream.setStatus(Status.OK);
verify(listener).closed(Status.OK);
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithErrorShouldCloseStream() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
stream.setStatus(errorStatus);
verify(listener).closed(eq(errorStatus));
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithOkShouldNotOverrideError() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
stream.setStatus(errorStatus);
stream.setStatus(Status.OK);
verify(listener).closed(any(Status.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithErrorShouldNotOverridePreviousError() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
stream.setStatus(errorStatus);
stream.setStatus(Status.fromThrowable(new RuntimeException("fake")));
verify(listener).closed(any(Status.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@Override
@Test
public void inboundContextShouldCallListener() throws Exception {
// Receive headers first so that it's a valid GRPC response.
stream.inboundHeadersRecieved(grpcResponseHeaders(), false);
super.inboundContextShouldCallListener();
}
@Override
@Test
public void inboundMessageShouldCallListener() throws Exception {
// Receive headers first so that it's a valid GRPC response.
stream.inboundHeadersRecieved(grpcResponseHeaders(), false);
super.inboundMessageShouldCallListener();
}
@Test
public void inboundStatusShouldSetStatus() throws Exception {
stream.id(1);
// Receive headers first so that it's a valid GRPC response.
stream.inboundHeadersRecieved(grpcResponseHeaders(), false);
stream.inboundDataReceived(statusFrame(new Status(Transport.Code.INTERNAL)), false, promise);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture());
assertEquals(Transport.Code.INTERNAL, captor.getValue().getCode());
verify(promise).setSuccess();
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void nonGrpcResponseShouldSetStatus() throws Exception {
stream.inboundDataReceived(Unpooled.copiedBuffer(MESSAGE, UTF_8), true, promise);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture());
assertEquals(MESSAGE, captor.getValue().getDescription());
}
private Http2Headers grpcResponseHeaders() {
return DefaultHttp2Headers.newBuilder().status("200")
.set(HttpUtil.CONTENT_TYPE_HEADER, HttpUtil.CONTENT_TYPE_PROTORPC).build();
}
}