blob: 47cf2fa872cbec437adaf3029f97591b12a6cfe5 [file] [log] [blame]
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.io.*;
import java.nio.ByteBuffer;
import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.Queue;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.frame.DataFrame;
import jdk.incubator.http.internal.frame.Http2Frame;
import jdk.incubator.http.internal.frame.ResetFrame;
/**
* InputStream reads frames off stream q and supplies read demand from any
* DataFrames it finds. Window updates are sent back on the connections send
* q.
*/
class BodyInputStream extends InputStream {
final Queue<Http2Frame> q;
final int streamid;
boolean closed;
boolean eof;
final Http2TestServerConnection conn;
@SuppressWarnings({"rawtypes","unchecked"})
BodyInputStream(Queue q, int streamid, Http2TestServerConnection conn) {
this.q = q;
this.streamid = streamid;
this.conn = conn;
}
DataFrame df;
ByteBuffer[] buffers;
ByteBuffer buffer;
int nextIndex = -1;
private DataFrame getData() throws IOException {
if (eof) {
return null;
}
Http2Frame frame;
do {
frame = q.take();
if (frame.type() == ResetFrame.TYPE) {
conn.handleStreamReset((ResetFrame) frame); // throws IOException
}
// ignoring others for now Wupdates handled elsewhere
if (frame.type() != DataFrame.TYPE) {
System.out.println("Ignoring " + frame.toString() + " CHECK THIS");
}
} while (frame.type() != DataFrame.TYPE);
df = (DataFrame) frame;
int len = df.payloadLength();
eof = frame.getFlag(DataFrame.END_STREAM);
// acknowledge
conn.sendWindowUpdates(len, streamid);
return (DataFrame) frame;
}
// null return means EOF
private ByteBuffer getBuffer() throws IOException {
if (buffer == null || !buffer.hasRemaining()) {
if (nextIndex == -1 || nextIndex == buffers.length) {
DataFrame df = getData();
if (df == null) {
return null;
}
ByteBufferReference[] data = df.getData();
int len = Utils.remaining(data);
if ((len == 0) && eof) {
return null;
}
buffers = ByteBufferReference.toBuffers(data);
nextIndex = 0;
}
buffer = buffers[nextIndex++];
}
return buffer;
}
@Override
public int read(byte[] buf, int offset, int length) throws IOException {
if (closed) {
throw new IOException("closed");
}
ByteBuffer b = getBuffer();
if (b == null) {
return -1;
}
int remaining = b.remaining();
if (remaining < length) {
length = remaining;
}
b.get(buf, offset, length);
return length;
}
byte[] one = new byte[1];
@Override
public int read() throws IOException {
int c = read(one, 0, 1);
if (c == -1) {
return -1;
}
return one[0];
}
@Override
public void close() {
// TODO reset this stream
closed = true;
}
}