blob: 1edede08a089b1d9751607268c4f57e0e0be0a12 [file] [log] [blame]
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.Buffers.readAsStringUtf8;
import static com.google.net.stubby.newtransport.TransportFrameUtil.COMPRESSION_HEADER_LENGTH;
import static com.google.net.stubby.newtransport.TransportFrameUtil.FLATE_FLAG;
import static com.google.net.stubby.newtransport.TransportFrameUtil.NO_COMPRESS_FLAG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import com.google.net.stubby.newtransport.Buffer;
import com.google.net.stubby.newtransport.Buffers;
import com.google.net.stubby.newtransport.CompositeBuffer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
/**
 * Tests for {@link NettyDecompressor}.
 *
 * <p>Every test feeds the decompressor a single frame built by {@link #frame(boolean)} from
 * {@link #MESSAGE}, either all at once or split into two chunks.
 */
@RunWith(JUnit4.class)
public class NettyDecompressorTest {
  private static final String MESSAGE = "hello world";

  /**
   * Number of leading frame bytes handed to the decompressor as the first chunk in the split-frame
   * tests: the 4 bytes written by {@link DataOutputStream#writeInt(int)} for the header plus one
   * byte of the (possibly compressed) payload.
   */
  private static final int FIRST_CHUNK_LENGTH = 5;

  private NettyDecompressor decompressor;

  @Before
  public void setup() {
    decompressor = new NettyDecompressor(UnpooledByteBufAllocator.DEFAULT);
  }

  /** An uncompressed frame delivered in one piece can be read back in two reads. */
  @Test
  public void uncompressedDataShouldSucceed() throws Exception {
    fullMessageShouldSucceed(false);
  }

  /** A deflated frame delivered in one piece can be read back in two reads. */
  @Test
  public void compressedDataShouldSucceed() throws Exception {
    fullMessageShouldSucceed(true);
  }

  /**
   * Splits an uncompressed frame in two and reads after each chunk.
   *
   * <p>Despite the test name, this asserts that the single payload byte contained in the first
   * chunk is readable immediately; what is not available until the second chunk arrives is the
   * remainder of the message.
   */
  @Test
  public void uncompressedFrameShouldNotBeReadableUntilComplete() throws Exception {
    byte[] frame = frame(false);
    byte[] chunk1 = Arrays.copyOf(frame, FIRST_CHUNK_LENGTH);
    byte[] chunk2 = Arrays.copyOfRange(frame, FIRST_CHUNK_LENGTH, frame.length);

    // Decompress the first chunk. Two bytes are requested, but only the one payload byte that
    // has arrived so far is returned.
    decompressor.decompress(Buffers.wrap(chunk1));
    CompositeBuffer composite = new CompositeBuffer();
    Buffer buffer = decompressor.readBytes(2);
    assertEquals(1, buffer.readableBytes());
    composite.addBuffer(buffer);

    // Decompress the rest of the frame and verify the remaining bytes complete the message.
    decompressor.decompress(Buffers.wrap(chunk2));
    composite.addBuffer(decompressor.readBytes(MESSAGE.length() - 1));
    assertEquals(MESSAGE, readAsStringUtf8(composite));
  }

  /**
   * Splits a deflated frame in two and verifies that nothing is readable after the first chunk,
   * while the whole message is readable once the second chunk has been supplied.
   */
  @Test
  public void compressedFrameShouldNotBeReadableUntilComplete() throws Exception {
    byte[] frame = frame(true);
    byte[] chunk1 = Arrays.copyOf(frame, FIRST_CHUNK_LENGTH);
    byte[] chunk2 = Arrays.copyOfRange(frame, FIRST_CHUNK_LENGTH, frame.length);

    // Decompress the first chunk and verify it's not readable yet.
    decompressor.decompress(Buffers.wrap(chunk1));
    Buffer buffer = decompressor.readBytes(2);
    assertNull(buffer);

    // Decompress the rest of the frame and verify it's readable. A single read may return fewer
    // bytes than requested, so keep reading until the message is complete or nothing is left.
    decompressor.decompress(Buffers.wrap(chunk2));
    CompositeBuffer composite = new CompositeBuffer();
    for (int remaining = MESSAGE.length(); remaining > 0; ) {
      Buffer buf = decompressor.readBytes(remaining);
      if (buf == null) {
        break;
      }
      composite.addBuffer(buf);
      remaining -= buf.readableBytes();
    }
    assertEquals(MESSAGE, readAsStringUtf8(composite));
  }

  /**
   * Verifies the reference counts observed after reading: the first input buffer is at zero and
   * the buffer returned by the read is at one.
   */
  @Test
  public void nettyBufferShouldBeReleasedAfterRead() throws Exception {
    byte[] frame = frame(false);
    byte[] chunk1 = Arrays.copyOf(frame, FIRST_CHUNK_LENGTH);
    byte[] chunk2 = Arrays.copyOfRange(frame, FIRST_CHUNK_LENGTH, frame.length);
    NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1));
    NettyBuffer buffer2 = new NettyBuffer(Unpooled.wrappedBuffer(chunk2));

    // CompositeByteBuf always keeps at least one buffer internally, so we add a second so
    // that it will release the first after it is read.
    decompressor.decompress(buffer1);
    decompressor.decompress(buffer2);
    NettyBuffer readBuffer = (NettyBuffer) decompressor.readBytes(buffer1.readableBytes());
    assertEquals(0, buffer1.buffer().refCnt());
    assertEquals(1, readBuffer.buffer().refCnt());
  }

  /**
   * Verifies that a buffer handed to the decompressor keeps a reference count of one until
   * {@code close()} is called, after which the count is zero.
   */
  @Test
  public void closeShouldReleasedBuffers() throws Exception {
    byte[] frame = frame(false);
    byte[] chunk1 = Arrays.copyOf(frame, FIRST_CHUNK_LENGTH);
    NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1));
    decompressor.decompress(buffer1);
    assertEquals(1, buffer1.buffer().refCnt());
    decompressor.close();
    assertEquals(0, buffer1.buffer().refCnt());
  }

  /**
   * Feeds one complete frame to the decompressor and checks that {@link #MESSAGE} comes back
   * intact when read as two consecutive pieces.
   *
   * @param compress whether the frame payload is deflated
   */
  private void fullMessageShouldSucceed(boolean compress) throws Exception {
    // Decompress the entire frame all at once.
    byte[] frame = frame(compress);
    decompressor.decompress(Buffers.wrap(frame));

    // Read some bytes and verify.
    int chunkSize = MESSAGE.length() / 2;
    assertEquals(MESSAGE.substring(0, chunkSize),
        readAsStringUtf8(decompressor.readBytes(chunkSize)));

    // Read the rest and verify.
    assertEquals(MESSAGE.substring(chunkSize),
        readAsStringUtf8(decompressor.readBytes(MESSAGE.length() - chunkSize)));
  }

  /**
   * Creates a compression frame from {@link #MESSAGE}, applying compression if requested.
   *
   * @param compress whether to deflate the payload and flag the frame with {@code FLATE_FLAG}
   *     instead of {@code NO_COMPRESS_FLAG}
   * @return the header followed by the payload bytes
   */
  private byte[] frame(boolean compress) throws Exception {
    byte[] msgBytes = bytes(MESSAGE);
    if (compress) {
      msgBytes = compress(msgBytes);
    }
    int frameFlag = compress ? FLATE_FLAG : NO_COMPRESS_FLAG;
    // Header = 1b flag | 3b length of GRPC frame
    int header = (frameFlag << 24) | msgBytes.length;

    ByteArrayOutputStream os =
        new ByteArrayOutputStream(msgBytes.length + COMPRESSION_HEADER_LENGTH);
    // try-with-resources closes the stream even if a write fails.
    try (DataOutputStream dos = new DataOutputStream(os)) {
      dos.writeInt(header);
      dos.write(msgBytes);
    }
    return os.toByteArray();
  }

  /** Encodes {@code str} as UTF-8. */
  private byte[] bytes(String str) {
    return str.getBytes(StandardCharsets.UTF_8);
  }

  /**
   * Deflates {@code data} with a default {@link DeflaterOutputStream}.
   *
   * @param data the bytes to compress
   * @return the compressed bytes
   */
  private byte[] compress(byte[] data) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Closing the deflater stream finishes the compressed output, so the result is only read
    // from the underlying byte array once the try block has exited.
    try (DeflaterOutputStream dos = new DeflaterOutputStream(out)) {
      dos.write(data);
    }
    return out.toByteArray();
  }
}