| /* |
| * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package com.sun.corba.se.impl.encoding; |
| |
| import java.nio.ByteBuffer; |
| import com.sun.corba.se.pept.transport.ByteBufferPool; |
| import com.sun.corba.se.spi.logging.CORBALogDomains; |
| import com.sun.corba.se.spi.orb.ORB; |
| import com.sun.corba.se.impl.logging.ORBUtilSystemException; |
| import com.sun.corba.se.impl.orbutil.ORBUtility; |
| import com.sun.corba.se.impl.protocol.RequestCanceledException; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; |
| import java.util.*; |
| |
| public class BufferManagerReadStream |
| implements BufferManagerRead, MarkAndResetHandler |
| { |
| private boolean receivedCancel = false; |
| private int cancelReqId = 0; |
| |
| // We should convert endOfStream to a final static dummy end node |
| private boolean endOfStream = true; |
| private BufferQueue fragmentQueue = new BufferQueue(); |
| private long FRAGMENT_TIMEOUT = 60000; |
| |
| // REVISIT - This should go in BufferManagerRead. But, since |
| // BufferManagerRead is an interface. BufferManagerRead |
| // might ought to be an abstract class instead of an |
| // interface. |
| private ORB orb ; |
| private ORBUtilSystemException wrapper ; |
| private boolean debug = false; |
| |
| BufferManagerReadStream( ORB orb ) |
| { |
| this.orb = orb ; |
| this.wrapper = ORBUtilSystemException.get( orb, |
| CORBALogDomains.RPC_ENCODING ) ; |
| debug = orb.transportDebugFlag; |
| } |
| |
| public void cancelProcessing(int requestId) { |
| synchronized(fragmentQueue) { |
| receivedCancel = true; |
| cancelReqId = requestId; |
| fragmentQueue.notify(); |
| } |
| } |
| |
| public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg) |
| { |
| ByteBufferWithInfo bbwi = |
| new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength()); |
| |
| synchronized (fragmentQueue) { |
| if (debug) |
| { |
| // print address of ByteBuffer being queued |
| int bbAddress = System.identityHashCode(byteBuffer); |
| StringBuffer sb = new StringBuffer(80); |
| sb.append("processFragment() - queueing ByteBuffer id ("); |
| sb.append(bbAddress).append(") to fragment queue."); |
| String strMsg = sb.toString(); |
| dprint(strMsg); |
| } |
| fragmentQueue.enqueue(bbwi); |
| endOfStream = !msg.moreFragmentsToFollow(); |
| fragmentQueue.notify(); |
| } |
| } |
| |
| public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi) |
| { |
| |
| ByteBufferWithInfo result = null; |
| |
| try { |
| //System.out.println("ENTER underflow"); |
| |
| synchronized (fragmentQueue) { |
| |
| if (receivedCancel) { |
| throw new RequestCanceledException(cancelReqId); |
| } |
| |
| while (fragmentQueue.size() == 0) { |
| |
| if (endOfStream) { |
| throw wrapper.endOfStream() ; |
| } |
| |
| boolean interrupted = false; |
| try { |
| fragmentQueue.wait(FRAGMENT_TIMEOUT); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } |
| |
| if (!interrupted && fragmentQueue.size() == 0) { |
| throw wrapper.bufferReadManagerTimeout(); |
| } |
| |
| if (receivedCancel) { |
| throw new RequestCanceledException(cancelReqId); |
| } |
| } |
| |
| result = fragmentQueue.dequeue(); |
| result.fragmented = true; |
| |
| if (debug) |
| { |
| // print address of ByteBuffer being dequeued |
| int bbAddr = System.identityHashCode(result.byteBuffer); |
| StringBuffer sb1 = new StringBuffer(80); |
| sb1.append("underflow() - dequeued ByteBuffer id ("); |
| sb1.append(bbAddr).append(") from fragment queue."); |
| String msg1 = sb1.toString(); |
| dprint(msg1); |
| } |
| |
| // VERY IMPORTANT |
| // Release bbwi.byteBuffer to the ByteBufferPool only if |
| // this BufferManagerStream is not marked for potential restore. |
| if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null) |
| { |
| ByteBufferPool byteBufferPool = getByteBufferPool(); |
| |
| if (debug) |
| { |
| // print address of ByteBuffer being released |
| int bbAddress = System.identityHashCode(bbwi.byteBuffer); |
| StringBuffer sb = new StringBuffer(80); |
| sb.append("underflow() - releasing ByteBuffer id ("); |
| sb.append(bbAddress).append(") to ByteBufferPool."); |
| String msg = sb.toString(); |
| dprint(msg); |
| } |
| |
| byteBufferPool.releaseByteBuffer(bbwi.byteBuffer); |
| bbwi.byteBuffer = null; |
| bbwi = null; |
| } |
| } |
| return result; |
| } finally { |
| //System.out.println("EXIT underflow"); |
| } |
| } |
| |
| public void init(Message msg) { |
| if (msg != null) |
| endOfStream = !msg.moreFragmentsToFollow(); |
| } |
| |
| // Release any queued ByteBufferWithInfo's byteBuffers to the |
| // ByteBufferPoool |
| public void close(ByteBufferWithInfo bbwi) |
| { |
| int inputBbAddress = 0; |
| |
| // release ByteBuffers on fragmentQueue |
| if (fragmentQueue != null) |
| { |
| synchronized (fragmentQueue) |
| { |
| // IMPORTANT: The fragment queue may have one ByteBuffer |
| // on it that's also on the CDRInputStream if |
| // this method is called when the stream is 'marked'. |
| // Thus, we'll compare the ByteBuffer passed |
| // in (from a CDRInputStream) with all ByteBuffers |
| // on the stack. If one is found to equal, it will |
| // not be released to the ByteBufferPool. |
| if (bbwi != null) |
| { |
| inputBbAddress = System.identityHashCode(bbwi.byteBuffer); |
| } |
| |
| ByteBufferWithInfo abbwi = null; |
| ByteBufferPool byteBufferPool = getByteBufferPool(); |
| while (fragmentQueue.size() != 0) |
| { |
| abbwi = fragmentQueue.dequeue(); |
| if (abbwi != null && abbwi.byteBuffer != null) |
| { |
| int bbAddress = System.identityHashCode(abbwi.byteBuffer); |
| if (inputBbAddress != bbAddress) |
| { |
| if (debug) |
| { |
| // print address of ByteBuffer released |
| StringBuffer sb = new StringBuffer(80); |
| sb.append("close() - fragmentQueue is ") |
| .append("releasing ByteBuffer id (") |
| .append(bbAddress).append(") to ") |
| .append("ByteBufferPool."); |
| String msg = sb.toString(); |
| dprint(msg); |
| } |
| } |
| byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); |
| } |
| } |
| } |
| fragmentQueue = null; |
| } |
| |
| // release ByteBuffers on fragmentStack |
| if (fragmentStack != null && fragmentStack.size() != 0) |
| { |
| // IMPORTANT: The fragment stack may have one ByteBuffer |
| // on it that's also on the CDRInputStream if |
| // this method is called when the stream is 'marked'. |
| // Thus, we'll compare the ByteBuffer passed |
| // in (from a CDRInputStream) with all ByteBuffers |
| // on the stack. If one is found to equal, it will |
| // not be released to the ByteBufferPool. |
| if (bbwi != null) |
| { |
| inputBbAddress = System.identityHashCode(bbwi.byteBuffer); |
| } |
| |
| ByteBufferWithInfo abbwi = null; |
| ByteBufferPool byteBufferPool = getByteBufferPool(); |
| ListIterator itr = fragmentStack.listIterator(); |
| while (itr.hasNext()) |
| { |
| abbwi = (ByteBufferWithInfo)itr.next(); |
| |
| if (abbwi != null && abbwi.byteBuffer != null) |
| { |
| int bbAddress = System.identityHashCode(abbwi.byteBuffer); |
| if (inputBbAddress != bbAddress) |
| { |
| if (debug) |
| { |
| // print address of ByteBuffer being released |
| StringBuffer sb = new StringBuffer(80); |
| sb.append("close() - fragmentStack - releasing ") |
| .append("ByteBuffer id (" + bbAddress + ") to ") |
| .append("ByteBufferPool."); |
| String msg = sb.toString(); |
| dprint(msg); |
| } |
| byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); |
| } |
| } |
| } |
| fragmentStack = null; |
| } |
| |
| } |
| |
| protected ByteBufferPool getByteBufferPool() |
| { |
| return orb.getByteBufferPool(); |
| } |
| |
| private void dprint(String msg) |
| { |
| ORBUtility.dprint("BufferManagerReadStream", msg); |
| } |
| |
| // Mark and reset handler ---------------------------------------- |
| |
| private boolean markEngaged = false; |
| |
| // List of fragment ByteBufferWithInfos received since |
| // the mark was engaged. |
| private LinkedList fragmentStack = null; |
| private RestorableInputStream inputStream = null; |
| |
| // Original state of the stream |
| private Object streamMemento = null; |
| |
| public void mark(RestorableInputStream inputStream) |
| { |
| this.inputStream = inputStream; |
| markEngaged = true; |
| |
| // Get the magic Object that the stream will use to |
| // reconstruct it's state when reset is called |
| streamMemento = inputStream.createStreamMemento(); |
| |
| if (fragmentStack != null) { |
| fragmentStack.clear(); |
| } |
| } |
| |
| // Collects fragments received since the mark was engaged. |
| public void fragmentationOccured(ByteBufferWithInfo newFragment) |
| { |
| if (!markEngaged) |
| return; |
| |
| if (fragmentStack == null) |
| fragmentStack = new LinkedList(); |
| |
| fragmentStack.addFirst(new ByteBufferWithInfo(newFragment)); |
| } |
| |
| public void reset() |
| { |
| if (!markEngaged) { |
| // REVISIT - call to reset without call to mark |
| return; |
| } |
| |
| markEngaged = false; |
| |
| // If we actually did peek across fragments, we need |
| // to push those fragments onto the front of the |
| // buffer queue. |
| if (fragmentStack != null && fragmentStack.size() != 0) { |
| ListIterator iter = fragmentStack.listIterator(); |
| |
| synchronized(fragmentQueue) { |
| while (iter.hasNext()) { |
| fragmentQueue.push((ByteBufferWithInfo)iter.next()); |
| } |
| } |
| |
| fragmentStack.clear(); |
| } |
| |
| // Give the stream the magic Object to restore |
| // it's state. |
| inputStream.restoreInternalState(streamMemento); |
| } |
| |
| public MarkAndResetHandler getMarkAndResetHandler() { |
| return this; |
| } |
| } |