blob: 3fbf5219232766d84f6c205812cef48a4fa9fbc3 [file] [log] [blame]
package com.google.net.stubby.newtransport;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that
* allows multiple buffers to be treated as one.
*
* <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
* the composite has read past the end of a given buffer, that buffer is automatically closed and
* removed from the composite.
*/
public class CompositeBuffer extends AbstractBuffer {

  /** Total number of unread bytes across all buffers currently held in {@link #buffers}. */
  private int readableBytes;

  /** Component buffers in read order; the head is the buffer currently being consumed. */
  private final Queue<Buffer> buffers = new ArrayDeque<Buffer>();

  /**
   * Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is
   * expected that this {@link CompositeBuffer} has complete ownership. Any attempt to modify the
   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
   * this {@link CompositeBuffer}.
   *
   * <p>If {@code buffer} is itself a {@link CompositeBuffer}, its component buffers are moved
   * into this composite (so composites are never nested) and the drained source is closed.
   *
   * @param buffer the buffer to append
   * @throws IllegalArgumentException if {@code buffer} is this composite
   */
  public void addBuffer(Buffer buffer) {
    if (buffer == this) {
      // Draining a composite into itself would remove and re-add the same elements forever.
      throw new IllegalArgumentException("Cannot add a CompositeBuffer to itself");
    }

    if (!(buffer instanceof CompositeBuffer)) {
      buffers.add(buffer);
      readableBytes += buffer.readableBytes();
      return;
    }

    // Flatten: take over the other composite's components rather than nesting it.
    CompositeBuffer compositeBuffer = (CompositeBuffer) buffer;
    while (!compositeBuffer.buffers.isEmpty()) {
      buffers.add(compositeBuffer.buffers.remove());
    }
    readableBytes += compositeBuffer.readableBytes;
    compositeBuffer.readableBytes = 0;
    compositeBuffer.close();
  }

  /** Returns the number of unread bytes remaining across all component buffers. */
  @Override
  public int readableBytes() {
    return readableBytes;
  }

  /** Reads a single unsigned byte from the first component buffer that has data. */
  @Override
  public int readUnsignedByte() {
    ReadOperation op = new ReadOperation() {
      @Override
      int readInternal(Buffer buffer, int length) {
        return buffer.readUnsignedByte();
      }
    };
    execute(op, 1);
    return op.value;
  }

  /** Skips {@code length} bytes, crossing component buffer boundaries as needed. */
  @Override
  public void skipBytes(int length) {
    execute(new ReadOperation() {
      @Override
      int readInternal(Buffer buffer, int length) {
        buffer.skipBytes(length);
        return 0;
      }
    }, length);
  }

  /**
   * Reads {@code length} bytes into {@code dest}, starting at {@code destOffset}.
   *
   * @param dest the destination array
   * @param destOffset the index in {@code dest} at which the first byte is written
   * @param length the number of bytes to read
   */
  @Override
  public void readBytes(final byte[] dest, final int destOffset, int length) {
    execute(new ReadOperation() {
      // Next write position in dest; advances as each component buffer contributes its share.
      int currentOffset = destOffset;

      @Override
      int readInternal(Buffer buffer, int length) {
        buffer.readBytes(dest, currentOffset, length);
        currentOffset += length;
        return 0;
      }
    }, length);
  }

  /** Reads {@code dest.remaining()} bytes into {@code dest}. */
  @Override
  public void readBytes(final ByteBuffer dest) {
    execute(new ReadOperation() {
      @Override
      int readInternal(Buffer buffer, int length) {
        // Change the limit so that only length bytes are available to the component buffer.
        int prevLimit = dest.limit();
        dest.limit(dest.position() + length);

        // Write the bytes and restore the original limit.
        buffer.readBytes(dest);
        dest.limit(prevLimit);
        return 0;
      }
    }, dest.remaining());
  }

  /**
   * Reads {@code length} bytes and writes them to {@code dest}.
   *
   * @param dest the stream that receives the bytes
   * @param length the number of bytes to read
   * @throws IOException if a component buffer fails while writing to {@code dest}; reading stops
   *     at the failing component buffer
   */
  @Override
  public void readBytes(final OutputStream dest, int length) throws IOException {
    ReadOperation op = new ReadOperation() {
      @Override
      int readInternal(Buffer buffer, int length) throws IOException {
        buffer.readBytes(dest, length);
        return 0;
      }
    };
    execute(op, length);

    // If an exception occurred, throw it.
    if (op.isError()) {
      throw op.ex;
    }
  }

  /**
   * Reads {@code length} bytes into a new {@link CompositeBuffer}. Component buffers that are
   * consumed entirely are moved to the returned composite; a partially consumed buffer
   * contributes the result of its own {@code readBytes(int)} and stays in this composite.
   *
   * @param length the number of bytes to read
   * @return a new composite holding the requested bytes
   */
  @Override
  public CompositeBuffer readBytes(int length) {
    checkReadable(length);
    readableBytes -= length;

    CompositeBuffer newBuffer = new CompositeBuffer();
    while (length > 0) {
      Buffer buffer = buffers.peek();
      // Capture the size before ownership of the buffer is possibly handed to newBuffer.
      int available = buffer.readableBytes();
      if (available > length) {
        newBuffer.addBuffer(buffer.readBytes(length));
        length = 0;
      } else {
        newBuffer.addBuffer(buffers.poll());
        length -= available;
      }
    }
    return newBuffer;
  }

  /** Closes and removes every component buffer; afterwards no bytes are readable. */
  @Override
  public void close() {
    while (!buffers.isEmpty()) {
      buffers.remove().close();
    }
    // Keep the counter in step with the now-empty queue.
    readableBytes = 0;
  }

  /**
   * Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the
   * requested {@code length}. The operation is only ever invoked with a positive length: empty
   * component buffers at the head of the queue are closed and discarded first.
   *
   * <p>If the operation records an error, execution stops at the failing buffer and
   * {@link #readableBytes} is reduced only by what that buffer reports as consumed.
   */
  private void execute(ReadOperation op, int length) {
    checkReadable(length);

    while (length > 0 && !buffers.isEmpty()) {
      Buffer buffer = buffers.peek();
      int available = buffer.readableBytes();
      if (available == 0) {
        // Nothing to read here (e.g. an empty buffer was added); drop it and move on.
        buffers.remove().close();
        continue;
      }
      int lengthToCopy = Math.min(length, available);

      // Perform the read operation for this buffer.
      op.read(buffer, lengthToCopy);
      if (op.isError()) {
        // Account for any bytes the buffer consumed before failing so the total stays accurate.
        readableBytes -= available - buffer.readableBytes();
        return;
      }

      length -= lengthToCopy;
      readableBytes -= lengthToCopy;
      advanceBufferIfNecessary();
    }

    if (length > 0) {
      // Should never get here.
      throw new AssertionError("Failed executing read operation");
    }
  }

  /**
   * If the current buffer is exhausted, removes and closes it. Must only be called while the
   * queue is non-empty.
   */
  private void advanceBufferIfNecessary() {
    Buffer buffer = buffers.peek();
    if (buffer.readableBytes() == 0) {
      buffers.remove().close();
    }
  }

  /**
   * A simple read operation to perform on a single {@link Buffer}. All state management for the
   * buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}.
   */
  private abstract static class ReadOperation {
    /**
     * Only used by {@link CompositeBuffer#readUnsignedByte()}.
     */
    int value;

    /**
     * Only used by {@link CompositeBuffer#readBytes(OutputStream, int)}.
     */
    IOException ex;

    /** Runs {@link #readInternal}, storing its result in {@link #value} or its failure in {@link #ex}. */
    final void read(Buffer buffer, int length) {
      try {
        value = readInternal(buffer, length);
      } catch (IOException e) {
        ex = e;
      }
    }

    /** Returns {@code true} if a previous {@link #read} call recorded an {@link IOException}. */
    final boolean isError() {
      return ex != null;
    }

    /**
     * Performs the operation on {@code length} bytes of {@code buffer}.
     *
     * @return an operation-specific value, stored in {@link #value}
     */
    abstract int readInternal(Buffer buffer, int length) throws IOException;
  }
}