| /* |
| * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| package com.sun.corba.se.impl.protocol; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SelectionKey; |
| import java.util.EmptyStackException; |
| import java.util.Iterator; |
| |
| import org.omg.CORBA.Any; |
| import org.omg.CORBA.CompletionStatus; |
| import org.omg.CORBA.ExceptionList; |
| import org.omg.CORBA.INTERNAL; |
| import org.omg.CORBA.Principal; |
| import org.omg.CORBA.SystemException; |
| import org.omg.CORBA.TypeCode; |
| import org.omg.CORBA.UnknownUserException; |
| import org.omg.CORBA.UNKNOWN; |
| import org.omg.CORBA.portable.ResponseHandler; |
| import org.omg.CORBA.portable.UnknownException; |
| import org.omg.CORBA_2_3.portable.InputStream; |
| import org.omg.CORBA_2_3.portable.OutputStream; |
| import org.omg.IOP.ExceptionDetailMessage; |
| import org.omg.IOP.TAG_RMI_CUSTOM_MAX_STREAM_FORMAT; |
| |
| import com.sun.corba.se.pept.broker.Broker; |
| import com.sun.corba.se.pept.encoding.InputObject; |
| import com.sun.corba.se.pept.encoding.OutputObject; |
| import com.sun.corba.se.pept.protocol.MessageMediator; |
| import com.sun.corba.se.pept.protocol.ProtocolHandler; |
| import com.sun.corba.se.pept.transport.ByteBufferPool; |
| import com.sun.corba.se.pept.transport.Connection; |
| import com.sun.corba.se.pept.transport.ContactInfo; |
| import com.sun.corba.se.pept.transport.EventHandler; |
| |
| import com.sun.corba.se.spi.ior.IOR; |
| import com.sun.corba.se.spi.ior.ObjectKey; |
| import com.sun.corba.se.spi.ior.ObjectKeyTemplate; |
| import com.sun.corba.se.spi.ior.iiop.GIOPVersion; |
| import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate; |
| import com.sun.corba.se.spi.ior.iiop.IIOPProfile; |
| import com.sun.corba.se.spi.ior.iiop.MaxStreamFormatVersionComponent; |
| import com.sun.corba.se.spi.oa.OAInvocationInfo; |
| import com.sun.corba.se.spi.oa.ObjectAdapter; |
| import com.sun.corba.se.spi.orb.ORB; |
| import com.sun.corba.se.spi.orb.ORBVersionFactory; |
| import com.sun.corba.se.spi.protocol.CorbaMessageMediator; |
| import com.sun.corba.se.spi.protocol.CorbaProtocolHandler; |
| import com.sun.corba.se.spi.protocol.CorbaServerRequestDispatcher; |
| import com.sun.corba.se.spi.protocol.ForwardException; |
| import com.sun.corba.se.spi.transport.CorbaConnection; |
| import com.sun.corba.se.spi.transport.CorbaContactInfo; |
| import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; |
| import com.sun.corba.se.spi.logging.CORBALogDomains; |
| |
| import com.sun.corba.se.spi.servicecontext.ORBVersionServiceContext; |
| import com.sun.corba.se.spi.servicecontext.ServiceContexts; |
| import com.sun.corba.se.spi.servicecontext.UEInfoServiceContext; |
| import com.sun.corba.se.spi.servicecontext.MaxStreamFormatVersionServiceContext; |
| import com.sun.corba.se.spi.servicecontext.SendingContextServiceContext; |
| import com.sun.corba.se.spi.servicecontext.UnknownServiceContext; |
| |
| import com.sun.corba.se.impl.corba.RequestImpl; |
| import com.sun.corba.se.impl.encoding.BufferManagerFactory; |
| import com.sun.corba.se.impl.encoding.BufferManagerReadStream; |
| import com.sun.corba.se.impl.encoding.CDRInputObject; |
| import com.sun.corba.se.impl.encoding.CDROutputObject; |
| import com.sun.corba.se.impl.encoding.EncapsOutputStream; |
| import com.sun.corba.se.impl.logging.ORBUtilSystemException; |
| import com.sun.corba.se.impl.logging.InterceptorsSystemException; |
| import com.sun.corba.se.impl.orbutil.ORBConstants; |
| import com.sun.corba.se.impl.orbutil.ORBUtility; |
| import com.sun.corba.se.impl.ior.iiop.JavaSerializationComponent; |
| import com.sun.corba.se.impl.protocol.AddressingDispositionException; |
| import com.sun.corba.se.impl.protocol.RequestCanceledException; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.AddressingDispositionHelper; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.CancelRequestMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_1; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_2; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_0; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_1; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_2; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_0; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_1; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_2; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageHandler; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_0; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_1; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_2; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_0 ; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_1 ; |
| import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_2 ; |
| |
| // REVISIT: make sure no memory leaks in client/server request/reply maps. |
| // REVISIT: normalize requestHeader, replyHeader, messageHeader. |
| |
| /** |
| * @author Harold Carr |
| */ |
| public class CorbaMessageMediatorImpl |
| implements |
| CorbaMessageMediator, |
| CorbaProtocolHandler, |
| MessageHandler |
| { |
| protected ORB orb; |
| protected ORBUtilSystemException wrapper ; |
| protected InterceptorsSystemException interceptorWrapper ; |
| protected CorbaContactInfo contactInfo; |
| protected CorbaConnection connection; |
| protected short addrDisposition; |
| protected CDROutputObject outputObject; |
| protected CDRInputObject inputObject; |
| protected Message messageHeader; |
| protected RequestMessage requestHeader; |
| protected LocateReplyOrReplyMessage replyHeader; |
| protected String replyExceptionDetailMessage; |
| protected IOR replyIOR; |
| protected Integer requestIdInteger; |
| protected Message dispatchHeader; |
| protected ByteBuffer dispatchByteBuffer; |
| protected byte streamFormatVersion; |
| protected boolean streamFormatVersionSet = false; |
| |
| protected org.omg.CORBA.Request diiRequest; |
| |
| protected boolean cancelRequestAlreadySent = false; |
| |
| protected ProtocolHandler protocolHandler; |
| protected boolean _executeReturnServantInResponseConstructor = false; |
| protected boolean _executeRemoveThreadInfoInResponseConstructor = false; |
| protected boolean _executePIInResponseConstructor = false; |
| |
| // |
| // Client-side constructor. |
| // |
| |
| public CorbaMessageMediatorImpl(ORB orb, |
| ContactInfo contactInfo, |
| Connection connection, |
| GIOPVersion giopVersion, |
| IOR ior, |
| int requestId, |
| short addrDisposition, |
| String operationName, |
| boolean isOneWay) |
| { |
| this( orb, connection ) ; |
| |
| this.contactInfo = (CorbaContactInfo) contactInfo; |
| this.addrDisposition = addrDisposition; |
| |
| streamFormatVersion = |
| getStreamFormatVersionForThisRequest( |
| ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(), |
| giopVersion); |
| streamFormatVersionSet = true; |
| |
| requestHeader = (RequestMessage) MessageBase.createRequest( |
| this.orb, |
| giopVersion, |
| ORBUtility.getEncodingVersion(orb, ior), |
| requestId, |
| !isOneWay, |
| ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(), |
| this.addrDisposition, |
| operationName, |
| new ServiceContexts(orb), |
| null); |
| } |
| |
| // |
| // Acceptor constructor. |
| // |
| |
/**
 * Acceptor constructor: records the ORB and connection and obtains the
 * two system-exception wrappers for the RPC protocol log domain.
 *
 * @param orb        ORB this mediator belongs to
 * @param connection connection; must be a CorbaConnection
 */
public CorbaMessageMediatorImpl(ORB orb, Connection connection) {
    this.orb = orb;
    this.connection = (CorbaConnection) connection;
    this.wrapper =
        ORBUtilSystemException.get(orb, CORBALogDomains.RPC_PROTOCOL);
    this.interceptorWrapper =
        InterceptorsSystemException.get(orb, CORBALogDomains.RPC_PROTOCOL);
}
| |
| // |
| // Dispatcher constructor. |
| // |
| |
| // Note: in some cases (e.g., a reply message) this message |
| // mediator will only be used for dispatch. Then the original |
| // request side mediator will take over. |
/**
 * Dispatcher constructor: like the acceptor constructor, and additionally
 * remembers the header and raw buffer of the message to dispatch.
 *
 * @param orb            ORB this mediator belongs to
 * @param connection     connection the message arrived on
 * @param dispatchHeader header of the message to dispatch
 * @param byteBuffer     buffer that accompanies the header
 */
public CorbaMessageMediatorImpl(ORB orb,
                                CorbaConnection connection,
                                Message dispatchHeader,
                                ByteBuffer byteBuffer) {
    this(orb, connection);
    this.dispatchByteBuffer = byteBuffer;
    this.dispatchHeader = dispatchHeader;
}
| |
| //////////////////////////////////////////////////// |
| // |
| // MessageMediator |
| // |
| |
| public Broker getBroker() |
| { |
| return orb; |
| } |
| |
| public ContactInfo getContactInfo() |
| { |
| return contactInfo; |
| } |
| |
| public Connection getConnection() |
| { |
| return connection; |
| } |
| |
| public void initializeMessage() |
| { |
| getRequestHeader().write(outputObject); |
| } |
| |
| public void finishSendingRequest() |
| { |
| // REVISIT: probably move logic in outputObject to here. |
| outputObject.finishSendingMessage(); |
| } |
| |
| public InputObject waitForResponse() |
| { |
| if (getRequestHeader().isResponseExpected()) { |
| return connection.waitForResponse(this); |
| } |
| return null; |
| } |
| |
| public void setOutputObject(OutputObject outputObject) |
| { |
| this.outputObject = (CDROutputObject) outputObject; |
| } |
| |
| public OutputObject getOutputObject() |
| { |
| return outputObject; |
| } |
| |
| public void setInputObject(InputObject inputObject) |
| { |
| this.inputObject = (CDRInputObject) inputObject; |
| } |
| |
| public InputObject getInputObject() |
| { |
| return inputObject; |
| } |
| |
| //////////////////////////////////////////////////// |
| // |
| // CorbaMessageMediator |
| // |
| |
| public void setReplyHeader(LocateReplyOrReplyMessage header) |
| { |
| this.replyHeader = header; |
| this.replyIOR = header.getIOR(); // REVISIT - need separate field? |
| } |
| |
| public LocateReplyMessage getLocateReplyHeader() |
| { |
| return (LocateReplyMessage) replyHeader; |
| } |
| |
| public ReplyMessage getReplyHeader() |
| { |
| return (ReplyMessage) replyHeader; |
| } |
| |
| public void setReplyExceptionDetailMessage(String message) |
| { |
| replyExceptionDetailMessage = message; |
| } |
| |
| public RequestMessage getRequestHeader() |
| { |
| return requestHeader; |
| } |
| |
| public GIOPVersion getGIOPVersion() |
| { |
| if (messageHeader != null) { |
| return messageHeader.getGIOPVersion(); |
| } |
| return getRequestHeader().getGIOPVersion(); |
| } |
| |
| public byte getEncodingVersion() { |
| if (messageHeader != null) { |
| return messageHeader.getEncodingVersion(); |
| } |
| return getRequestHeader().getEncodingVersion(); |
| } |
| |
| public int getRequestId() |
| { |
| return getRequestHeader().getRequestId(); |
| } |
| |
| public Integer getRequestIdInteger() |
| { |
| if (requestIdInteger == null) { |
| requestIdInteger = new Integer(getRequestHeader().getRequestId()); |
| } |
| return requestIdInteger; |
| } |
| |
| public boolean isOneWay() |
| { |
| return ! getRequestHeader().isResponseExpected(); |
| } |
| |
| public short getAddrDisposition() |
| { |
| return addrDisposition; |
| } |
| |
| public String getOperationName() |
| { |
| return getRequestHeader().getOperation(); |
| } |
| |
| public ServiceContexts getRequestServiceContexts() |
| { |
| return getRequestHeader().getServiceContexts(); |
| } |
| |
| public ServiceContexts getReplyServiceContexts() |
| { |
| return getReplyHeader().getServiceContexts(); |
| } |
| |
| public void sendCancelRequestIfFinalFragmentNotSent() |
| { |
| if ((!sentFullMessage()) && sentFragment() && |
| (!cancelRequestAlreadySent)) |
| { |
| try { |
| if (orb.subcontractDebugFlag) { |
| dprint(".sendCancelRequestIfFinalFragmentNotSent->: " |
| + opAndId(this)); |
| } |
| connection.sendCancelRequestWithLock(getGIOPVersion(), |
| getRequestId()); |
| // Case: first a location forward, then a marshaling |
| // exception (e.g., non-serializable object). Only |
| // send cancel once. |
| cancelRequestAlreadySent = true; |
| } catch (IOException e) { |
| if (orb.subcontractDebugFlag) { |
| dprint(".sendCancelRequestIfFinalFragmentNotSent: !ERROR : " + opAndId(this), |
| e); |
| } |
| |
| // REVISIT: we could attempt to send a final incomplete |
| // fragment in this case. |
| throw interceptorWrapper.ioexceptionDuringCancelRequest( |
| CompletionStatus.COMPLETED_MAYBE, e ); |
| } finally { |
| if (orb.subcontractDebugFlag) { |
| dprint(".sendCancelRequestIfFinalFragmentNotSent<-: " |
| + opAndId(this)); |
| } |
| } |
| } |
| } |
| |
| public boolean sentFullMessage() |
| { |
| return outputObject.getBufferManager().sentFullMessage(); |
| } |
| |
| public boolean sentFragment() |
| { |
| return outputObject.getBufferManager().sentFragment(); |
| } |
| |
| public void setDIIInfo(org.omg.CORBA.Request diiRequest) |
| { |
| this.diiRequest = diiRequest; |
| } |
| |
| public boolean isDIIRequest() |
| { |
| return diiRequest != null; |
| } |
| |
| public Exception unmarshalDIIUserException(String repoId, InputStream is) |
| { |
| if (! isDIIRequest()) { |
| return null; |
| } |
| |
| ExceptionList _exceptions = diiRequest.exceptions(); |
| |
| try { |
| // Find the typecode for the exception |
| for (int i=0; i<_exceptions.count() ; i++) { |
| TypeCode tc = _exceptions.item(i); |
| if ( tc.id().equals(repoId) ) { |
| // Since we dont have the actual user exception |
| // class, the spec says we have to create an |
| // UnknownUserException and put it in the |
| // environment. |
| Any eany = orb.create_any(); |
| eany.read_value(is, (TypeCode)tc); |
| |
| return new UnknownUserException(eany); |
| } |
| } |
| } catch (Exception b) { |
| throw wrapper.unexpectedDiiException(b); |
| } |
| |
| // must be a truly unknown exception |
| return wrapper.unknownCorbaExc( CompletionStatus.COMPLETED_MAYBE); |
| } |
| |
| public void setDIIException(Exception exception) |
| { |
| diiRequest.env().exception(exception); |
| } |
| |
| public void handleDIIReply(InputStream inputStream) |
| { |
| if (! isDIIRequest()) { |
| return; |
| } |
| ((RequestImpl)diiRequest).unmarshalReply(inputStream); |
| } |
| |
| public Message getDispatchHeader() |
| { |
| return dispatchHeader; |
| } |
| |
| public void setDispatchHeader(Message msg) |
| { |
| dispatchHeader = msg; |
| } |
| |
| public ByteBuffer getDispatchBuffer() |
| { |
| return dispatchByteBuffer; |
| } |
| |
| public void setDispatchBuffer(ByteBuffer byteBuffer) |
| { |
| dispatchByteBuffer = byteBuffer; |
| } |
| |
| public int getThreadPoolToUse() { |
| int poolToUse = 0; |
| Message msg = getDispatchHeader(); |
| // A null msg should never happen. But, we'll be |
| // defensive just in case. |
| if (msg != null) { |
| poolToUse = msg.getThreadPoolToUse(); |
| } |
| return poolToUse; |
| } |
| |
| public byte getStreamFormatVersion() |
| { |
| // REVISIT: ContactInfo/Acceptor output object factories |
| // just use this. Maybe need to distinguish: |
| // createOutputObjectForRequest |
| // createOutputObjectForReply |
| // then do getStreamFormatVersionForRequest/ForReply here. |
| if (streamFormatVersionSet) { |
| return streamFormatVersion; |
| } |
| return getStreamFormatVersionForReply(); |
| } |
| |
| /** |
| * If the RMI-IIOP maximum stream format version service context |
| * is present, it indicates the maximum stream format version we |
| * could use for the reply. If it isn't present, the default is |
| * 2 for GIOP 1.3 or greater, 1 for lower. |
| * |
| * This is only sent on requests. Clients can find out the |
| * server's maximum by looking for a tagged component in the IOR. |
| */ |
| public byte getStreamFormatVersionForReply() { |
| |
| // NOTE: The request service contexts may indicate the max. |
| ServiceContexts svc = getRequestServiceContexts(); |
| |
| MaxStreamFormatVersionServiceContext msfvsc |
| = (MaxStreamFormatVersionServiceContext)svc.get( |
| MaxStreamFormatVersionServiceContext.SERVICE_CONTEXT_ID); |
| |
| if (msfvsc != null) { |
| byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion(); |
| byte remoteMaxVersion = msfvsc.getMaximumStreamFormatVersion(); |
| |
| return (byte)Math.min(localMaxVersion, remoteMaxVersion); |
| } else { |
| // Defaults to 1 for GIOP 1.2 or less, 2 for |
| // GIOP 1.3 or higher. |
| if (getGIOPVersion().lessThan(GIOPVersion.V1_3)) |
| return ORBConstants.STREAM_FORMAT_VERSION_1; |
| else |
| return ORBConstants.STREAM_FORMAT_VERSION_2; |
| } |
| } |
| |
| public boolean isSystemExceptionReply() |
| { |
| return replyHeader.getReplyStatus() == ReplyMessage.SYSTEM_EXCEPTION; |
| } |
| |
| public boolean isUserExceptionReply() |
| { |
| return replyHeader.getReplyStatus() == ReplyMessage.USER_EXCEPTION; |
| } |
| |
| public boolean isLocationForwardReply() |
| { |
| return ( (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD) || |
| (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD_PERM) ); |
| //return replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD; |
| } |
| |
| public boolean isDifferentAddrDispositionRequestedReply() |
| { |
| return replyHeader.getReplyStatus() == ReplyMessage.NEEDS_ADDRESSING_MODE; |
| } |
| |
| public short getAddrDispositionReply() |
| { |
| return replyHeader.getAddrDisposition(); |
| } |
| |
| public IOR getForwardedIOR() |
| { |
| return replyHeader.getIOR(); |
| } |
| |
| public SystemException getSystemExceptionReply() |
| { |
| return replyHeader.getSystemException(replyExceptionDetailMessage); |
| } |
| |
| //////////////////////////////////////////////////// |
| // |
| // Used by server side. |
| // |
| |
| public ObjectKey getObjectKey() |
| { |
| return getRequestHeader().getObjectKey(); |
| } |
| |
| public void setProtocolHandler(CorbaProtocolHandler protocolHandler) |
| { |
| throw wrapper.methodShouldNotBeCalled() ; |
| } |
| |
| public CorbaProtocolHandler getProtocolHandler() |
| { |
| // REVISIT: should look up in orb registry. |
| return this; |
| } |
| |
| //////////////////////////////////////////////////// |
| // |
| // ResponseHandler |
| // |
| |
| public org.omg.CORBA.portable.OutputStream createReply() |
| { |
| // Note: relies on side-effect of setting mediator output field. |
| // REVISIT - cast - need interface |
| getProtocolHandler().createResponse(this, (ServiceContexts) null); |
| return (OutputStream) getOutputObject(); |
| } |
| |
| public org.omg.CORBA.portable.OutputStream createExceptionReply() |
| { |
| // Note: relies on side-effect of setting mediator output field. |
| // REVISIT - cast - need interface |
| getProtocolHandler().createUserExceptionResponse(this, (ServiceContexts) null); |
| return (OutputStream) getOutputObject(); |
| } |
| |
| public boolean executeReturnServantInResponseConstructor() |
| { |
| return _executeReturnServantInResponseConstructor; |
| |
| } |
| |
| public void setExecuteReturnServantInResponseConstructor(boolean b) |
| { |
| _executeReturnServantInResponseConstructor = b; |
| } |
| |
| public boolean executeRemoveThreadInfoInResponseConstructor() |
| { |
| return _executeRemoveThreadInfoInResponseConstructor; |
| } |
| |
| public void setExecuteRemoveThreadInfoInResponseConstructor(boolean b) |
| { |
| _executeRemoveThreadInfoInResponseConstructor = b; |
| } |
| |
| public boolean executePIInResponseConstructor() |
| { |
| return _executePIInResponseConstructor; |
| } |
| |
| public void setExecutePIInResponseConstructor( boolean b ) |
| { |
| _executePIInResponseConstructor = b; |
| } |
| |
| private byte getStreamFormatVersionForThisRequest(IOR ior, |
| GIOPVersion giopVersion) |
| { |
| |
| byte localMaxVersion |
| = ORBUtility.getMaxStreamFormatVersion(); |
| |
| IOR effectiveTargetIOR = |
| ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(); |
| IIOPProfileTemplate temp = |
| (IIOPProfileTemplate)effectiveTargetIOR.getProfile().getTaggedProfileTemplate(); |
| Iterator iter = temp.iteratorById(TAG_RMI_CUSTOM_MAX_STREAM_FORMAT.value); |
| if (!iter.hasNext()) { |
| // Didn't have the max stream format version tagged |
| // component. |
| if (giopVersion.lessThan(GIOPVersion.V1_3)) |
| return ORBConstants.STREAM_FORMAT_VERSION_1; |
| else |
| return ORBConstants.STREAM_FORMAT_VERSION_2; |
| } |
| |
| byte remoteMaxVersion |
| = ((MaxStreamFormatVersionComponent)iter.next()).getMaxStreamFormatVersion(); |
| |
| return (byte)Math.min(localMaxVersion, remoteMaxVersion); |
| } |
| |
| //////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////// |
| |
| // REVISIT - This could be a separate implementation object looked |
| // up in a registry. However it needs some state in the message |
| // mediator so combine for now. |
| |
| |
| protected boolean isThreadDone = false; |
| |
| //////////////////////////////////////////////////// |
| // |
| // pept.protocol.ProtocolHandler |
| // |
| |
| public boolean handleRequest(MessageMediator messageMediator) |
| { |
| try { |
| dispatchHeader.callback(this); |
| } catch (IOException e) { |
| // REVISIT - this should be handled internally. |
| ; |
| } |
| return isThreadDone; |
| } |
| |
| //////////////////////////////////////////////////// |
| // |
| // iiop.messages.MessageHandler |
| // |
| |
| private void setWorkThenPoolOrResumeSelect(Message header) |
| { |
| if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { |
| resumeSelect(header); |
| } else { |
| // Leader/Follower when using reader thread. |
| // When this thread is done working it will go back in pool. |
| |
| isThreadDone = true; |
| |
| // First unregister current registration. |
| orb.getTransportManager().getSelector(0) |
| .unregisterForEvent(getConnection().getEventHandler()); |
| // Have another thread become the reader. |
| orb.getTransportManager().getSelector(0) |
| .registerForEvent(getConnection().getEventHandler()); |
| } |
| } |
| |
| private void setWorkThenReadOrResumeSelect(Message header) |
| { |
| if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { |
| resumeSelect(header); |
| } else { |
| // When using reader thread then wen this thread is |
| // done working it will continue reading. |
| isThreadDone = false; |
| } |
| } |
| |
| private void resumeSelect(Message header) |
| { |
| // NOTE: VERY IMPORTANT: |
| // Only participate in select after getting to the point |
| // that proper serialization of fragments is ensured. |
| |
| if (transportDebug()) { |
| dprint(".resumeSelect:->"); |
| // REVISIT: not-OO: |
| String requestId = "?"; |
| if (header instanceof RequestMessage) { |
| requestId = |
| new Integer(((RequestMessage)header) |
| .getRequestId()).toString(); |
| } else if (header instanceof ReplyMessage) { |
| requestId = |
| new Integer(((ReplyMessage)header) |
| .getRequestId()).toString(); |
| } else if (header instanceof FragmentMessage_1_2) { |
| requestId = |
| new Integer(((FragmentMessage_1_2)header) |
| .getRequestId()).toString(); |
| } |
| dprint(".resumeSelect: id/" |
| + requestId |
| + " " + getConnection() |
| ); |
| |
| } |
| |
| // IMPORTANT: To avoid bug (4953599), we force the Thread that does the NIO select |
| // to also do the enable/disable of Ops using SelectionKey.interestOps(Ops of Interest). |
| // Otherwise, the SelectionKey.interestOps(Ops of Interest) may block indefinitely in |
| // this thread. |
| EventHandler eventHandler = getConnection().getEventHandler(); |
| orb.getTransportManager().getSelector(0).registerInterestOps(eventHandler); |
| |
| if (transportDebug()) { |
| dprint(".resumeSelect:<-"); |
| } |
| } |
| |
| private void setInputObject() |
| { |
| // REVISIT: refactor createInputObject (and createMessageMediator) |
| // into base PlugInFactory. Get via connection (either ContactInfo |
| // or Acceptor). |
| if (getConnection().getContactInfo() != null) { |
| inputObject = (CDRInputObject) |
| getConnection().getContactInfo() |
| .createInputObject(orb, this); |
| } else if (getConnection().getAcceptor() != null) { |
| inputObject = (CDRInputObject) |
| getConnection().getAcceptor() |
| .createInputObject(orb, this); |
| } else { |
| throw new RuntimeException("CorbaMessageMediatorImpl.setInputObject"); |
| } |
| inputObject.setMessageMediator(this); |
| setInputObject(inputObject); |
| } |
| |
| private void signalResponseReceived() |
| { |
| // This will end up using the MessageMediator associated with |
| // the original request instead of the current mediator (which |
| // need to be constructed to hold the dispatchBuffer and connection). |
| connection.getResponseWaitingRoom() |
| .responseReceived((InputObject)inputObject); |
| } |
| |
| // This handles message types for which we don't create classes. |
| public void handleInput(Message header) throws IOException |
| { |
| try { |
| messageHeader = header; |
| |
| if (transportDebug()) |
| dprint(".handleInput->: " |
| + MessageBase.typeToString(header.getType())); |
| |
| setWorkThenReadOrResumeSelect(header); |
| |
| switch(header.getType()) |
| { |
| case Message.GIOPCloseConnection: |
| if (transportDebug()) { |
| dprint(".handleInput: CloseConnection: purging"); |
| } |
| connection.purgeCalls(wrapper.connectionRebind(), true, false); |
| break; |
| case Message.GIOPMessageError: |
| if (transportDebug()) { |
| dprint(".handleInput: MessageError: purging"); |
| } |
| connection.purgeCalls(wrapper.recvMsgError(), true, false); |
| break; |
| default: |
| if (transportDebug()) { |
| dprint(".handleInput: ERROR: " |
| + MessageBase.typeToString(header.getType())); |
| } |
| throw wrapper.badGiopRequestType() ; |
| } |
| releaseByteBufferToPool(); |
| } finally { |
| if (transportDebug()) { |
| dprint(".handleInput<-: " |
| + MessageBase.typeToString(header.getType())); |
| } |
| } |
| } |
| |
| public void handleInput(RequestMessage_1_0 header) throws IOException |
| { |
| try { |
| if (transportDebug()) dprint(".REQUEST 1.0->: " + header); |
| try { |
| messageHeader = requestHeader = (RequestMessage) header; |
| setInputObject(); |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".REQUEST 1.0: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".REQUEST 1.0<-: " + header); |
| } |
| } |
| |
| public void handleInput(RequestMessage_1_1 header) throws IOException |
| { |
| try { |
| if (transportDebug()) dprint(".REQUEST 1.1->: " + header); |
| try { |
| messageHeader = requestHeader = (RequestMessage) header; |
| setInputObject(); |
| connection.serverRequest_1_1_Put(this); |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".REQUEST 1.1: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".REQUEST 1.1<-: " + header); |
| } |
| } |
| |
| // REVISIT: this is identical to 1_0 except for fragment part. |
| public void handleInput(RequestMessage_1_2 header) throws IOException |
| { |
| try { |
| try { |
| |
| messageHeader = requestHeader = (RequestMessage) header; |
| |
| header.unmarshalRequestID(dispatchByteBuffer); |
| setInputObject(); |
| |
| if (transportDebug()) dprint(".REQUEST 1.2->: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| |
| // NOTE: in the old code this used to be done conditionally: |
| // if (header.moreFragmentsToFollow()). |
| // Now we always put it in. We take it out when |
| // the response is done. |
| // This must happen now so if a header is fragmented the stream |
| // may be found. |
| connection.serverRequestMapPut(header.getRequestId(), this); |
| } finally { |
| // Leader/Follower. |
| // Note: This *MUST* come after putting stream in above map |
| // since the header may be fragmented and you do not want to |
| // start reading again until the map above is set. |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| //inputObject.unmarshalHeader(); // done in subcontract. |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) dprint(".REQUEST 1.2: id/" |
| + header.getRequestId() |
| + ": !!ERROR!!: " |
| + header, |
| t); |
| // Mask the exception from thread.; |
| } finally { |
| connection.serverRequestMapRemove(header.getRequestId()); |
| |
| if (transportDebug()) dprint(".REQUEST 1.2<-: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| } |
| } |
| |
| public void handleInput(ReplyMessage_1_0 header) throws IOException |
| { |
| try { |
| try { |
| if (transportDebug()) dprint(".REPLY 1.0->: " + header); |
| messageHeader = replyHeader = (ReplyMessage) header; |
| setInputObject(); |
| |
| // REVISIT: this should be done by waiting thread. |
| inputObject.unmarshalHeader(); |
| |
| signalResponseReceived(); |
| } finally{ |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".REPLY 1.0<-: " + header); |
| } |
| } |
| |
| public void handleInput(ReplyMessage_1_1 header) throws IOException |
| { |
| try { |
| if (transportDebug()) dprint(".REPLY 1.1->: " + header); |
| messageHeader = replyHeader = (ReplyMessage) header; |
| setInputObject(); |
| |
| if (header.moreFragmentsToFollow()) { |
| |
| // More fragments are coming to complete this reply, so keep |
| // a reference to the InputStream so we can add the fragments |
| connection.clientReply_1_1_Put(this); |
| |
| // In 1.1, we can't assume that we have the request ID in the |
| // first fragment. Thus, another thread is used |
| // to be the reader while this thread unmarshals |
| // the extended header and wakes up the client thread. |
| setWorkThenPoolOrResumeSelect(header); |
| |
| // REVISIT - error handling. |
| // This must be done now. |
| inputObject.unmarshalHeader(); |
| |
| signalResponseReceived(); |
| |
| } else { |
| |
| // Not fragmented, therefore we know the request |
| // ID is here. Thus, we can unmarshal the extended header |
| // and wake up the client thread without using a third |
| // thread as above. |
| |
| // REVISIT - error handling during unmarshal. |
| // This must be done now to get the request id. |
| inputObject.unmarshalHeader(); |
| |
| signalResponseReceived(); |
| |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".REPLY 1.1<-: " + header); |
| } |
| } |
| |
| /** |
|  * Handles an incoming GIOP 1.2 reply: records the header, reads the |
|  * request id from the raw dispatch buffer, creates the input object and |
|  * calls signalResponseReceived(). Whatever happens, |
|  * setWorkThenReadOrResumeSelect(header) runs afterwards. |
|  * Any Throwable is reported via the transport debug output (when that |
|  * flag is on) and is otherwise suppressed. |
|  * |
|  * @param header the GIOP 1.2 reply message header |
|  * @throws IOException declared by the signature; the body itself catches every Throwable |
|  */ |
| public void handleInput(ReplyMessage_1_2 header) throws IOException |
| { |
|     try { |
|         try { |
|             messageHeader = replyHeader = (ReplyMessage) header; |
| |
|             // We know that the request ID is in the first fragment |
|             header.unmarshalRequestID(dispatchByteBuffer); |
| |
|             if (transportDebug()) { |
|                 // The original concatenation carried a stray unary plus |
|                 // ("+ + header.getRequestId()"); output is unchanged. |
|                 dprint(".REPLY 1.2->: id/" |
|                        + header.getRequestId() |
|                        + ": more?: " + header.moreFragmentsToFollow() |
|                        + ": " + header); |
|             } |
| |
|             setInputObject(); |
| |
|             signalResponseReceived(); |
|         } finally { |
|             setWorkThenReadOrResumeSelect(header); |
|         } |
|     } catch (Throwable t) { |
|         // Mask the exception from the thread. |
|         if (transportDebug()) { |
|             dprint(".REPLY 1.2: id/" |
|                    + header.getRequestId() |
|                    + ": !!ERROR!!: " |
|                    + header, t); |
|         } |
|     } finally { |
|         if (transportDebug()) { |
|             dprint(".REPLY 1.2<-: id/" |
|                    + header.getRequestId() |
|                    + ": " |
|                    + header); |
|         } |
|     } |
| } |
| |
| public void handleInput(LocateRequestMessage_1_0 header) throws IOException |
| { |
| try { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.0->: " + header); |
| try { |
| messageHeader = header; |
| setInputObject(); |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.0<-: " + header); |
| } |
| |
| } |
| |
| public void handleInput(LocateRequestMessage_1_1 header) throws IOException |
| { |
| try { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.1->: " + header); |
| try { |
| messageHeader = header; |
| setInputObject(); |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.1<-:" + header); |
| } |
| } |
| |
| public void handleInput(LocateRequestMessage_1_2 header) throws IOException |
| { |
| try { |
| try { |
| messageHeader = header; |
| |
| header.unmarshalRequestID(dispatchByteBuffer); |
| setInputObject(); |
| |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.2->: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| |
| if (header.moreFragmentsToFollow()) { |
| connection.serverRequestMapPut(header.getRequestId(),this); |
| } |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); |
| } |
| getProtocolHandler().handleRequest(header, this); |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.2: id/" |
| + header.getRequestId() |
| + ": !!ERROR!!: " |
| + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) |
| dprint(".LOCATE_REQUEST 1.2<-: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| } |
| } |
| |
| public void handleInput(LocateReplyMessage_1_0 header) throws IOException |
| { |
| try { |
| if (transportDebug()) |
| dprint(".LOCATE_REPLY 1.0->:" + header); |
| try { |
| messageHeader = header; |
| setInputObject(); |
| inputObject.unmarshalHeader(); // REVISIT Put in subcontract. |
| signalResponseReceived(); |
| } finally { |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) |
| dprint(".LOCATE_REPLY 1.0<-: " + header); |
| } |
| } |
| |
| public void handleInput(LocateReplyMessage_1_1 header) throws IOException |
| { |
| try { |
| if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header); |
| try { |
| messageHeader = header; |
| setInputObject(); |
| // Fragmented LocateReplies are not allowed in 1.1. |
| inputObject.unmarshalHeader(); |
| signalResponseReceived(); |
| } finally { |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header); |
| } |
| } |
| |
| public void handleInput(LocateReplyMessage_1_2 header) throws IOException |
| { |
| try { |
| try { |
| messageHeader = header; |
| |
| // No need to put in client reply map - already there. |
| header.unmarshalRequestID(dispatchByteBuffer); |
| |
| setInputObject(); |
| |
| if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| |
| signalResponseReceived(); |
| } finally { |
| setWorkThenPoolOrResumeSelect(header); // REVISIT |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".LOCATE_REPLY 1.2: id/" |
| + header.getRequestId() |
| + ": !!ERROR!!: " |
| + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| } |
| } |
| |
| public void handleInput(FragmentMessage_1_1 header) throws IOException |
| { |
| try { |
| if (transportDebug()) { |
| dprint(".FRAGMENT 1.1->: " |
| + "more?: " + header.moreFragmentsToFollow() |
| + ": " + header); |
| } |
| try { |
| messageHeader = header; |
| MessageMediator mediator = null; |
| CDRInputObject inputObject = null; |
| |
| if (connection.isServer()) { |
| mediator = connection.serverRequest_1_1_Get(); |
| } else { |
| mediator = connection.clientReply_1_1_Get(); |
| } |
| if (mediator != null) { |
| inputObject = (CDRInputObject) mediator.getInputObject(); |
| } |
| |
| // If no input stream available, then discard the fragment. |
| // This can happen: |
| // 1. if a fragment message is received prior to receiving |
| // the original request/reply message. Very unlikely. |
| // 2. if a fragment message is received after the |
| // reply has been sent (early replies) |
| // Note: In the case of early replies, the fragments received |
| // during the request processing (which are never unmarshaled), |
| // will eventually be discarded by the GC. |
| if (inputObject == null) { |
| if (transportDebug()) |
| dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header); |
| // need to release dispatchByteBuffer to pool if |
| // we are discarding |
| releaseByteBufferToPool(); |
| return; |
| } |
| |
| inputObject.getBufferManager() |
| .processFragment(dispatchByteBuffer, header); |
| |
| if (! header.moreFragmentsToFollow()) { |
| if (connection.isServer()) { |
| connection.serverRequest_1_1_Remove(); |
| } else { |
| connection.clientReply_1_1_Remove(); |
| } |
| } |
| } finally { |
| // NOTE: This *must* come after queing the fragment |
| // when using the selector to ensure fragments stay in order. |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header); |
| } |
| } |
| |
| public void handleInput(FragmentMessage_1_2 header) throws IOException |
| { |
| try { |
| try { |
| messageHeader = header; |
| |
| // Note: We know it's a 1.2 fragment, we have the data, but |
| // we need the IIOPInputStream instance to unmarshal the |
| // request ID... but we need the request ID to get the |
| // IIOPInputStream instance. So we peek at the raw bytes. |
| |
| header.unmarshalRequestID(dispatchByteBuffer); |
| |
| if (transportDebug()) { |
| dprint(".FRAGMENT 1.2->: id/" |
| + header.getRequestId() |
| + ": more?: " + header.moreFragmentsToFollow() |
| + ": " + header); |
| } |
| |
| MessageMediator mediator = null; |
| InputObject inputObject = null; |
| |
| if (connection.isServer()) { |
| mediator = |
| connection.serverRequestMapGet(header.getRequestId()); |
| } else { |
| mediator = |
| connection.clientRequestMapGet(header.getRequestId()); |
| } |
| if (mediator != null) { |
| inputObject = mediator.getInputObject(); |
| } |
| // See 1.1 comments. |
| if (inputObject == null) { |
| if (transportDebug()) { |
| dprint(".FRAGMENT 1.2: id/" |
| + header.getRequestId() |
| + ": ++++DISCARDING++++: " |
| + header); |
| } |
| // need to release dispatchByteBuffer to pool if |
| // we are discarding |
| releaseByteBufferToPool(); |
| return; |
| } |
| ((CDRInputObject)inputObject) |
| .getBufferManager().processFragment( |
| dispatchByteBuffer, header); |
| |
| // REVISIT: but if it is a server don't you have to remove the |
| // stream from the map? |
| if (! connection.isServer()) { |
| /* REVISIT |
| * No need to do anything. |
| * Should we mark that last was received? |
| if (! header.moreFragmentsToFollow()) { |
| // Last fragment. |
| } |
| */ |
| } |
| } finally { |
| // NOTE: This *must* come after queing the fragment |
| // when using the selector to ensure fragments stay in order. |
| setWorkThenReadOrResumeSelect(header); |
| } |
| } catch (Throwable t) { |
| if (transportDebug()) |
| dprint(".FRAGMENT 1.2: id/" |
| + header.getRequestId() |
| + ": !!ERROR!!: " |
| + header, t); |
| // Mask the exception from thread.; |
| } finally { |
| if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/" |
| + header.getRequestId() |
| + ": " |
| + header); |
| } |
| } |
| |
| /** |
|  * Handles an incoming GIOP CancelRequest: records the header, creates the |
|  * input object, unmarshals the header, runs processCancelRequest for the |
|  * request id carried by the message and releases the dispatch buffer. |
|  * setWorkThenReadOrResumeSelect(header) runs regardless of the outcome. |
|  * Any Throwable is reported via the transport debug output (when that |
|  * flag is on) and is otherwise suppressed. |
|  * |
|  * @param header the cancel request header |
|  * @throws IOException declared by the signature; the body itself catches every Throwable |
|  */ |
| public void handleInput(CancelRequestMessage header) throws IOException |
| { |
|     try { |
|         try { |
|             messageHeader = header; |
|             setInputObject(); |
| |
|             // REVISIT: Move these two to subcontract. |
|             inputObject.unmarshalHeader(); |
| |
|             if (transportDebug()) { |
|                 dprint(".CANCEL->: id/" |
|                        + header.getRequestId() + ": " |
|                        + header.getGIOPVersion() + ": " |
|                        + header); |
|             } |
| |
|             processCancelRequest(header.getRequestId()); |
|             releaseByteBufferToPool(); |
|         } finally { |
|             setWorkThenReadOrResumeSelect(header); |
|         } |
|     } catch (Throwable failure) { |
|         // Deliberately masked so the exception does not escape to the thread. |
|         if (transportDebug()) { |
|             dprint(".CANCEL: id/" |
|                    + header.getRequestId() |
|                    + ": !!ERROR!!: " |
|                    + header, failure); |
|         } |
|     } finally { |
|         if (transportDebug()) { |
|             dprint(".CANCEL<-: id/" |
|                    + header.getRequestId() + ": " |
|                    + header.getGIOPVersion() + ": " |
|                    + header); |
|         } |
|     } |
| } |
| |
| private void throwNotImplemented() |
| { |
| isThreadDone = false; |
| throwNotImplemented(""); |
| } |
| |
| /** |
|  * Always throws a RuntimeException stating that a feature is not implemented. |
|  * |
|  * @param msg detail text appended to the exception message |
|  * @throws RuntimeException always |
|  */ |
| private void throwNotImplemented(String msg) |
| { |
|     String description = "CorbaMessageMediatorImpl: not implemented " + msg; |
|     throw new RuntimeException(description); |
| } |
| |
| /** |
|  * Emits a debug message and then prints the stack trace of the given |
|  * Throwable to System.out. |
|  * |
|  * @param msg the debug text |
|  * @param t the Throwable whose stack trace is printed; must not be null |
|  */ |
| private void dprint(String msg, Throwable t) |
| { |
|     this.dprint(msg); |
|     t.printStackTrace(System.out); |
| } |
| |
| private void dprint(String msg) |
| { |
| ORBUtility.dprint("CorbaMessageMediatorImpl", msg); |
| } |
| |
| /** |
|  * Returns the text produced by ORBUtility.operationNameAndRequestId for |
|  * the given mediator; used as a prefix in debug output. |
|  * |
|  * @param mediator the mediator to describe |
|  * @return the string returned by ORBUtility.operationNameAndRequestId |
|  */ |
| protected String opAndId(CorbaMessageMediator mediator) |
| { |
|     String description = ORBUtility.operationNameAndRequestId(mediator); |
|     return description; |
| } |
| |
| /** |
|  * Reports whether transport-level debug output is switched on. |
|  * |
|  * @return the current value of orb.transportDebugFlag |
|  */ |
| private boolean transportDebug() |
| { |
|     boolean enabled = orb.transportDebugFlag; |
|     return enabled; |
| } |
| |
| // REVISIT: move this to subcontract (but both client and server need it). |
| /** |
|  * Processes a CancelRequest for the given request id. |
|  * |
|  * The GIOP version of the CancelRequest itself is irrelevant, because a |
|  * CancelRequest_1_0 may cancel a request of a different GIOP version. |
|  * |
|  * Processing logic: |
|  * - find the request with the matching request id; |
|  * - call cancelProcessing() on its BufferManagerRead [BMR]; |
|  * - the hope is that the worker thread calls BMR.underflow() to wait for |
|  *   more fragments. If a CancelRequest has already arrived at that point |
|  *   the worker throws ThreadDeath; otherwise it waits to be notified of a |
|  *   new fragment or a CancelRequest and, once woken, either throws |
|  *   ThreadDeath (cancel arrived) or continues with the fragment; |
|  * - if every fragment arrived before the CancelRequest the worker never |
|  *   blocks in BMR.underflow(), so the abort flag has no effect and the |
|  *   request completes normally; |
|  * - if the server sent an early reply and the CancelRequest arrives after |
|  *   that reply, it has no effect. |
|  * |
|  * @param cancelReqId the request id named by the CancelRequest |
|  */ |
| private final void processCancelRequest(int cancelReqId) { |
| |
|     // Bi-directional GIOP is not supported yet, so a client ignores this. |
|     if (!connection.isServer()) { |
|         return; |
|     } |
| |
|     // Try to get hold of the request's InputStream buffer. For 1.0 |
|     // requests there is no way to do that, so only the 1.2 and 1.1 |
|     // lookups are attempted. |
| |
|     // Was the request 1.2? |
|     MessageMediator target = connection.serverRequestMapGet(cancelReqId); |
|     int targetRequestId; |
|     if (target != null) { |
|         targetRequestId = ((CorbaMessageMediator) target).getRequestId(); |
|     } else { |
|         // Was the request 1.1? |
|         target = connection.serverRequest_1_1_Get(); |
|         if (target == null) { |
|             // XXX log this! |
|             // Either the request was 1.0, or an early reply has already |
|             // been sent, or request processing is over, or this is a |
|             // spurious CancelRequest. Nothing to do. |
|             return; |
|         } |
| |
|         targetRequestId = ((CorbaMessageMediator) target).getRequestId(); |
| |
|         if (targetRequestId != cancelReqId) { |
|             // XXX log this! |
|             // A spurious 1.1 CancelRequest has been received. |
|             return; |
|         } |
| |
|         if (targetRequestId == 0) { |
|             // XXX log this |
|             // Special case. Either |
|             // 1. the request id of the 1.1 request has not been received |
|             //    yet, i.e. the CancelRequest arrived before the request |
|             //    (which the spec disallows), or |
|             // 2. the 1.1 request really has request id 0. |
|             // Telling these apart is tricky, so be conservative and do |
|             // not cancel. Downside: 1.1 requests with request id 0 can |
|             // never be cancelled. |
|             return; |
|         } |
|     } |
| |
|     CorbaMessageMediator corbaTarget = (CorbaMessageMediator) target; |
|     Message requestMessage = corbaTarget.getRequestHeader(); |
|     if (requestMessage.getType() != Message.GIOPRequest) { |
|         // Any mediator obtained here should only ever be for a GIOP |
|         // request. |
|         wrapper.badMessageTypeForCancel() ; |
|     } |
| |
|     // A request has now been chosen for cancellation, but it is not known |
|     // whether the target object's method has been invoked. An available |
|     // request input stream only means that request processing is not over |
|     // yet. So just set the abort flag in the buffer manager and hope the |
|     // worker thread notices it (possible only while the request stream is |
|     // still being unmarshalled and the target's method has not been |
|     // invoked). This guarantees that requests already dispatched to the |
|     // target's method are never cancelled. |
|     CDRInputObject requestStream = (CDRInputObject) target.getInputObject(); |
|     BufferManagerReadStream readManager = |
|         (BufferManagerReadStream) requestStream.getBufferManager(); |
|     readManager.cancelProcessing(cancelReqId); |
| } |
| |
| //////////////////////////////////////////////////// |
| // |
| // spi.protocol.CorbaProtocolHandler |
| // |
| |
| public void handleRequest(RequestMessage msg, |
| CorbaMessageMediator messageMediator) |
| { |
| try { |
| beginRequest(messageMediator); |
| try { |
| handleRequestRequest(messageMediator); |
| if (messageMediator.isOneWay()) { |
| return; |
| } |
| } catch (Throwable t) { |
| if (messageMediator.isOneWay()) { |
| return; |
| } |
| handleThrowableDuringServerDispatch( |
| messageMediator, t, CompletionStatus.COMPLETED_MAYBE); |
| } |
| sendResponse(messageMediator); |
| } catch (Throwable t) { |
| dispatchError(messageMediator, "RequestMessage", t); |
| } finally { |
| endRequest(messageMediator); |
| } |
| } |
| |
| /** |
|  * Server-side entry point for a GIOP locate request. |
|  * |
|  * Builds the locate reply, converts a failure while doing so into an |
|  * error response, and sends the response. endRequest always runs on the |
|  * way out; a Throwable that escapes the inner handling is passed to |
|  * dispatchError. |
|  * |
|  * @param msg the locate request message (not used by this method's body) |
|  * @param messageMediator the mediator carrying the locate request |
|  */ |
| public void handleRequest(LocateRequestMessage msg, |
|                           CorbaMessageMediator messageMediator) |
| { |
|     try { |
|         beginRequest(messageMediator); |
|         try { |
|             handleLocateRequest(messageMediator); |
|         } catch (Throwable locateFailure) { |
|             handleThrowableDuringServerDispatch( |
|                 messageMediator, |
|                 locateFailure, |
|                 CompletionStatus.COMPLETED_MAYBE); |
|         } |
|         sendResponse(messageMediator); |
|     } catch (Throwable unhandled) { |
|         dispatchError(messageMediator, "LocateRequestMessage", unhandled); |
|     } finally { |
|         endRequest(messageMediator); |
|     } |
| } |
| |
| /** |
|  * Marks the start of server-side request processing: emits a debug line |
|  * when the mediator's ORB has subcontract debugging on, then calls |
|  * serverRequestProcessingBegins() on this mediator's connection. |
|  * |
|  * @param messageMediator the mediator whose broker supplies the debug flag |
|  */ |
| private void beginRequest(CorbaMessageMediator messageMediator) |
| { |
|     ORB requestOrb = (ORB) messageMediator.getBroker(); |
|     boolean debugging = requestOrb.subcontractDebugFlag; |
|     if (debugging) { |
|         dprint(".handleRequest->:"); |
|     } |
|     connection.serverRequestProcessingBegins(); |
| } |
| |
| /** |
|  * Reports a Throwable that escaped request handling. Only produces |
|  * output when subcontract debugging is on; the connection is left open. |
|  * |
|  * @param messageMediator the mediator of the failed request |
|  * @param msg short label of the message kind being handled |
|  * @param t the Throwable that was caught |
|  */ |
| private void dispatchError(CorbaMessageMediator messageMediator, |
|                            String msg, Throwable t) |
| { |
|     if (orb.subcontractDebugFlag) { |
|         String text = ".handleRequest: " + opAndId(messageMediator) |
|             + ": !!ERROR!!: " |
|             + msg; |
|         dprint(text, t); |
|     } |
|     // REVISIT - closing the connection here |
|     // (messageMediator.getConnection().close()) makes hcks |
|     // sendTwoObjects fail, so it is not done. |
| } |
| |
| /** |
|  * Finishes sending the response held by the mediator's output object. |
|  * Does nothing beyond optional debug output when there is no output object. |
|  * |
|  * @param messageMediator the mediator whose output object is flushed |
|  */ |
| private void sendResponse(CorbaMessageMediator messageMediator) |
| { |
|     if (orb.subcontractDebugFlag) { |
|         dprint(".handleRequest: " + opAndId(messageMediator) |
|                + ": sending response"); |
|     } |
|     // REVISIT - type and location |
|     CDROutputObject response = |
|         (CDROutputObject) messageMediator.getOutputObject(); |
|     if (response == null) { |
|         // REVISIT - can be null for TRANSIENT below. |
|         return; |
|     } |
|     response.finishSendingMessage(); |
| } |
| |
| /** |
|  * Marks the end of server-side request processing: closes the mediator's |
|  * output and input objects (which releases their NIO ByteBuffers to the |
|  * ByteBufferPool) and then calls serverRequestProcessingEnds() on the |
|  * mediator's connection. |
|  * |
|  * The input object is closed even when closing the output object fails, |
|  * so a failure on one stream no longer leaves the other one open. An |
|  * IOException from close() is only reported through the debug output. |
|  * |
|  * @param messageMediator the mediator whose request has finished |
|  */ |
| private void endRequest(CorbaMessageMediator messageMediator) |
| { |
|     ORB orb = (ORB) messageMediator.getBroker(); |
|     if (orb.subcontractDebugFlag) { |
|         dprint(".handleRequest<-: " + opAndId(messageMediator)); |
|     } |
| |
|     // release NIO ByteBuffers to ByteBufferPool |
| |
|     try { |
|         try { |
|             OutputObject outputObj = messageMediator.getOutputObject(); |
|             if (outputObj != null) { |
|                 outputObj.close(); |
|             } |
|         } finally { |
|             // Runs whether or not closing the output object succeeded. |
|             InputObject inputObj = messageMediator.getInputObject(); |
|             if (inputObj != null) { |
|                 inputObj.close(); |
|             } |
|         } |
|     } catch (IOException ex) { |
|         // Given what close() does, this catch shouldn't ever happen. |
|         // See CDRInput/OutputObject.close() for more info. |
|         // It also won't result in a Corba error if an IOException happens. |
|         if (orb.subcontractDebugFlag) { |
|             dprint(".endRequest: IOException:" + ex.getMessage(), ex); |
|         } |
|     } finally { |
|         ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds(); |
|     } |
| } |
| |
| /** |
|  * Dispatches a GIOP request to the server request dispatcher selected by |
|  * the request's object key. |
|  * |
|  * The request header is unmarshaled first (a no-op when already done), |
|  * the ORB's shutdown state is checked, and the dispatch itself is |
|  * bracketed by orb.startingDispatch() / orb.finishedDispatch(). |
|  * |
|  * @param messageMediator the mediator carrying the request |
|  */ |
| protected void handleRequestRequest(CorbaMessageMediator messageMediator) |
| { |
|     // Does nothing if already unmarshaled. |
|     CDRInputObject requestStream = |
|         (CDRInputObject) messageMediator.getInputObject(); |
|     requestStream.unmarshalHeader(); |
| |
|     ORB orb = (ORB) messageMediator.getBroker(); |
|     synchronized (orb) { |
|         orb.checkShutdownState(); |
|     } |
| |
|     ObjectKey targetKey = messageMediator.getObjectKey(); |
|     if (orb.subcontractDebugFlag) { |
|         ObjectKeyTemplate keyTemplate = targetKey.getTemplate(); |
|         dprint(".handleRequest: " + opAndId(messageMediator) |
|                + ": dispatching to scid: " + keyTemplate.getSubcontractId()); |
|     } |
| |
|     CorbaServerRequestDispatcher dispatcher = |
|         targetKey.getServerRequestDispatcher(orb); |
| |
|     if (orb.subcontractDebugFlag) { |
|         dprint(".handleRequest: " + opAndId(messageMediator) |
|                + ": dispatching to sc: " + dispatcher); |
|     } |
| |
|     if (dispatcher == null) { |
|         throw wrapper.noServerScInDispatch() ; |
|     } |
| |
|     // NOTE: the mediator has to act as ResponseHandler and pass the |
|     // necessary info to the response constructors located in the |
|     // subcontract. No setProtocolHandler(this) call is needed for |
|     // that at the moment because it is the same class (REVISIT). |
| |
|     try { |
|         orb.startingDispatch(); |
|         dispatcher.dispatch(messageMediator); |
|     } finally { |
|         orb.finishedDispatch(); |
|     } |
| } |
| |
| /** |
|  * Builds the reply to a GIOP locate request and writes it to a freshly |
|  * created output object that is attached to the mediator. |
|  * |
|  * Reply status by outcome: |
|  * - dispatcher's locate() returns null: OBJECT_HERE |
|  * - locate() returns an IOR: OBJECT_FORWARD, followed by that IOR |
|  * - AddressingDispositionException: LOC_NEEDS_ADDRESSING_MODE, followed |
|  *   by the expected addressing disposition (unless it is -1) |
|  * - any other Exception: UNKNOWN_OBJECT |
|  * No reply is built when no server request dispatcher is found for the |
|  * object key or when the request was cancelled. |
|  * |
|  * @param messageMediator the mediator carrying the locate request |
|  */ |
| protected void handleLocateRequest(CorbaMessageMediator messageMediator) |
| { |
|     ORB orb = (ORB) messageMediator.getBroker(); |
|     LocateRequestMessage msg = |
|         (LocateRequestMessage) messageMediator.getDispatchHeader(); |
|     IOR forwardIor = null; |
|     LocateReplyMessage reply = null; |
|     short expectedDisposition = -1; |
| |
|     try { |
|         ((CDRInputObject) messageMediator.getInputObject()).unmarshalHeader(); |
|         CorbaServerRequestDispatcher dispatcher = |
|             msg.getObjectKey().getServerRequestDispatcher(orb); |
|         if (dispatcher == null) { |
|             return; |
|         } |
| |
|         forwardIor = dispatcher.locate(msg.getObjectKey()); |
| |
|         // A null IOR means the object lives here; otherwise forward to it. |
|         reply = MessageBase.createLocateReply( |
|             orb, msg.getGIOPVersion(), |
|             msg.getEncodingVersion(), |
|             msg.getRequestId(), |
|             (forwardIor == null) |
|                 ? LocateReplyMessage.OBJECT_HERE |
|                 : LocateReplyMessage.OBJECT_FORWARD, |
|             forwardIor); |
|         // REVISIT: Should we catch SystemExceptions? |
| |
|     } catch (AddressingDispositionException dispositionMismatch) { |
| |
|         // Answer with the target addressing disposition we expect. |
|         reply = MessageBase.createLocateReply( |
|             orb, msg.getGIOPVersion(), |
|             msg.getEncodingVersion(), |
|             msg.getRequestId(), |
|             LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE, null); |
| |
|         expectedDisposition = dispositionMismatch.expectedAddrDisp(); |
| |
|     } catch (RequestCanceledException cancelled) { |
| |
|         return; // no need to send reply |
| |
|     } catch (Exception other) { |
| |
|         // REVISIT: an exception other than OBJECT_NOT_EXIST should get a |
|         // different reply. |
|         // This handles OBJECT_NOT_EXIST exceptions thrown in the |
|         // subcontract or obj manager. Send back UNKNOWN_OBJECT. |
|         reply = MessageBase.createLocateReply( |
|             orb, msg.getGIOPVersion(), |
|             msg.getEncodingVersion(), |
|             msg.getRequestId(), |
|             LocateReplyMessage.UNKNOWN_OBJECT, null); |
|     } |
| |
|     CDROutputObject replyStream = |
|         createAppropriateOutputObject(messageMediator, msg, reply); |
|     messageMediator.setOutputObject(replyStream); |
|     replyStream.setMessageMediator(messageMediator); |
| |
|     reply.write(replyStream); |
|     // REVISIT - replyStream.setMessage(reply) is not necessary. |
|     if (forwardIor != null) { |
|         forwardIor.write(replyStream); |
|     } |
|     if (expectedDisposition != -1) { |
|         AddressingDispositionHelper.write(replyStream, expectedDisposition); |
|     } |
| } |
| |
| /** |
|  * Creates the output object used to write a locate reply. |
|  * |
|  * For locate messages older than GIOP 1.2 the output object is created |
|  * with GIOP version 1.0 on the mediator's connection (growing buffer); |
|  * for 1.2 it is created from the mediator itself (streaming). |
|  * |
|  * @param messageMediator the mediator of the locate request |
|  * @param msg the message whose GIOP version selects the variant |
|  * @param reply the locate reply header the output object is created for |
|  * @return the newly created output object |
|  */ |
| private CDROutputObject createAppropriateOutputObject( |
|     CorbaMessageMediator messageMediator, |
|     Message msg, LocateReplyMessage reply) |
| { |
|     boolean olderThan12 = msg.getGIOPVersion().lessThan(GIOPVersion.V1_2); |
| |
|     if (!olderThan12) { |
|         // 1.2 :=> stream |
|         return sun.corba.OutputStreamFactory.newCDROutputObject( |
|             (ORB) messageMediator.getBroker(), |
|             messageMediator, |
|             reply, |
|             ORBConstants.STREAM_FORMAT_VERSION_1); |
|     } |
| |
|     // locate msgs 1.0 & 1.1 :=> grow |
|     return sun.corba.OutputStreamFactory.newCDROutputObject( |
|         (ORB) messageMediator.getBroker(), |
|         this, |
|         GIOPVersion.V1_0, |
|         (CorbaConnection) messageMediator.getConnection(), |
|         reply, |
|         ORBConstants.STREAM_FORMAT_VERSION_1); |
| } |
| |
| public void handleThrowableDuringServerDispatch( |
| CorbaMessageMediator messageMediator, |
| Throwable throwable, |
| CompletionStatus completionStatus) |
| { |
| if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) { |
| dprint(".handleThrowableDuringServerDispatch: " |
| + opAndId(messageMediator) + ": " |
| + throwable); |
| } |
| |
| // If we haven't unmarshaled the header, we probably don't |
| // have enough information to even send back a reply. |
| |
| // REVISIT |
| // Cannot do this check. When target addressing disposition does |
| // not match (during header unmarshaling) it throws an exception |
| // to be handled here. |
| /* |
| if (! ((CDRInputObject)messageMediator.getInputObject()) |
| .unmarshaledHeader()) { |
| return; |
| } |
| */ |
| handleThrowableDuringServerDispatch(messageMediator, |
| throwable, |
| completionStatus, |
| 1); |
| } |
| |
| |
| // REVISIT - catch and ignore RequestCanceledException. |
| |
| /** |
|  * Turns a Throwable raised during server-side dispatch into a response, |
|  * retrying when building the response itself throws. |
|  * |
|  * A ForwardException yields a location forward, an |
|  * AddressingDispositionException yields an addressing-mode reply, and |
|  * anything else is converted to a SystemException response. If building |
|  * the response throws, the method calls itself with the new Throwable and |
|  * an incremented iteration count. |
|  * |
|  * @param messageMediator the mediator of the failed request |
|  * @param throwable the Throwable to report |
|  * @param completionStatus completion status to report |
|  * @param iteration attempt counter, starting at 1 |
|  * @throws RuntimeException when iteration exceeds 10; its cause is throwable |
|  */ |
| protected void handleThrowableDuringServerDispatch( |
|     CorbaMessageMediator messageMediator, |
|     Throwable throwable, |
|     CompletionStatus completionStatus, |
|     int iteration) |
| { |
|     if (iteration > 10) { |
|         ORB requestOrb = (ORB) messageMediator.getBroker(); |
|         if (requestOrb.subcontractDebugFlag) { |
|             dprint(".handleThrowableDuringServerDispatch: " |
|                    + opAndId(messageMediator) |
|                    + ": cannot handle: " |
|                    + throwable); |
|         } |
| |
|         // REVISIT - should we close connection? |
|         throw new RuntimeException( |
|             "handleThrowableDuringServerDispatch: " |
|             + "cannot create response.", |
|             throwable); |
|     } |
| |
|     try { |
|         if (throwable instanceof ForwardException) { |
|             IOR forwardTarget = ((ForwardException) throwable).getIOR(); |
|             createLocationForward(messageMediator, forwardTarget, null); |
|         } else if (throwable instanceof AddressingDispositionException) { |
|             handleAddressingDisposition( |
|                 messageMediator, |
|                 (AddressingDispositionException) throwable); |
|         } else { |
|             SystemException converted = |
|                 convertThrowableToSystemException(throwable, |
|                                                   completionStatus); |
|             createSystemExceptionResponse(messageMediator, converted, null); |
|         } |
|     } catch (Throwable replacement) { |
|         // User code (e.g., postinvoke, interceptors) may change the |
|         // exception, which brings us back here. Report the changed |
|         // exception instead. |
|         handleThrowableDuringServerDispatch(messageMediator, |
|                                             replacement, |
|                                             completionStatus, |
|                                             iteration + 1); |
|     } |
| } |
| |
| /** |
|  * Maps an arbitrary Throwable to the SystemException reported to the client. |
|  * |
|  * A SystemException is returned unchanged, a RequestCanceledException is |
|  * mapped through wrapper.requestCanceled, and everything else is mapped |
|  * through wrapper.runtimeexception with COMPLETED_MAYBE. |
|  * |
|  * NOTE(review): completionStatus is not read by this method; the generic |
|  * case always uses CompletionStatus.COMPLETED_MAYBE. Confirm whether that |
|  * is intended. |
|  * |
|  * @param throwable the Throwable to convert |
|  * @param completionStatus completion status supplied by the caller |
|  * @return the SystemException to report |
|  */ |
| protected SystemException convertThrowableToSystemException( |
|     Throwable throwable, |
|     CompletionStatus completionStatus) |
| { |
|     boolean alreadySystemException = throwable instanceof SystemException; |
|     if (alreadySystemException) { |
|         return (SystemException) throwable; |
|     } |
| |
|     boolean cancelled = throwable instanceof RequestCanceledException; |
|     if (cancelled) { |
|         // Reporting an exception response keeps the poa current stack, |
|         // the interceptor stacks, etc. balanced, and it tells the |
|         // interceptors that the request was cancelled. |
|         return wrapper.requestCanceled( throwable ) ; |
|     } |
| |
|     // NOTE: ThreadDeath is not singled out from Throwable. There is no |
|     // reason to stop the thread, it is just a worker thread. The ORB |
|     // never throws ThreadDeath. Client code may (e.g., in |
|     // ServantManagers, interceptors, or servants), but that should not |
|     // affect the ORB threads, so it is handled generically. |
| |
|     // Last resort: a non-SystemException thrown by user code is |
|     // reported generically. |
|     return wrapper.runtimeexception( CompletionStatus.COMPLETED_MAYBE, throwable ) ; |
| } |
| |
| /** |
|  * Answers a request whose target addressing disposition was not accepted |
|  * by replying with the addressing disposition the server expects. |
|  * |
|  * For a GIOP request a NEEDS_ADDRESSING_MODE reply is written, followed |
|  * by the expected disposition. For a GIOP locate request a |
|  * LOC_NEEDS_ADDRESSING_MODE locate reply is written, followed by the |
|  * expected disposition unless that value is -1. Other message types are |
|  * left unanswered. In both handled cases the new output object is |
|  * attached to the mediator. |
|  * |
|  * @param messageMediator the mediator of the rejected request |
|  * @param ex the exception carrying the expected addressing disposition |
|  */ |
| protected void handleAddressingDisposition( |
|     CorbaMessageMediator messageMediator, |
|     AddressingDispositionException ex) |
| { |
|     // from iiop.RequestProcessor. |
| |
|     // Respond with expected target addressing disposition. |
| |
|     switch (messageMediator.getRequestHeader().getType()) { |
|     case Message.GIOPRequest : { |
|         ReplyMessage reply = MessageBase.createReply( |
|             (ORB)messageMediator.getBroker(), |
|             messageMediator.getGIOPVersion(), |
|             messageMediator.getEncodingVersion(), |
|             messageMediator.getRequestId(), |
|             ReplyMessage.NEEDS_ADDRESSING_MODE, |
|             null, null); |
|         // REVISIT: via acceptor factory. |
|         CDROutputObject outputObject = |
|             sun.corba.OutputStreamFactory.newCDROutputObject( |
|                 (ORB)messageMediator.getBroker(), |
|                 this, |
|                 messageMediator.getGIOPVersion(), |
|                 (CorbaConnection)messageMediator.getConnection(), |
|                 reply, |
|                 ORBConstants.STREAM_FORMAT_VERSION_1); |
|         messageMediator.setOutputObject(outputObject); |
|         outputObject.setMessageMediator(messageMediator); |
|         reply.write(outputObject); |
|         AddressingDispositionHelper.write(outputObject, |
|                                           ex.expectedAddrDisp()); |
|         return; |
|     } |
| |
|     case Message.GIOPLocateRequest : { |
|         LocateReplyMessage locateReply = MessageBase.createLocateReply( |
|             (ORB)messageMediator.getBroker(), |
|             messageMediator.getGIOPVersion(), |
|             messageMediator.getEncodingVersion(), |
|             messageMediator.getRequestId(), |
|             LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE, |
|             null); |
| |
|         short addrDisp = ex.expectedAddrDisp(); |
| |
|         // REVISIT: via acceptor factory. |
|         CDROutputObject outputObject = |
|             createAppropriateOutputObject(messageMediator, |
|                                           messageMediator.getRequestHeader(), |
|                                           locateReply); |
|         messageMediator.setOutputObject(outputObject); |
|         outputObject.setMessageMediator(messageMediator); |
|         locateReply.write(outputObject); |
|         // No IOR is written for this reply status (the former |
|         // always-null IOR local and its dead branch were removed). |
|         if (addrDisp != -1) { |
|             AddressingDispositionHelper.write(outputObject, addrDisp); |
|         } |
|         return; |
|     } |
| |
|     default : |
|         // Other message types get no addressing-disposition reply. |
|         return; |
|     } |
| } |
| |
| public CorbaMessageMediator createResponse( |
| CorbaMessageMediator messageMediator, |
| ServiceContexts svc) |
| { |
| // REVISIT: ignore service contexts during framework transition. |
| // They are set in SubcontractResponseHandler to the wrong connection. |
| // Then they would be set again here and a duplicate contexts |
| // exception occurs. |
| return createResponseHelper( |
| messageMediator, |
| getServiceContextsForReply(messageMediator, null)); |
| } |
| |
| public CorbaMessageMediator createUserExceptionResponse( |
| CorbaMessageMediator messageMediator, ServiceContexts svc) |
| { |
| // REVISIT - same as above |
| return createResponseHelper( |
| messageMediator, |
| getServiceContextsForReply(messageMediator, null), |
| true); |
| } |
| |
| /** |
|  * Creates a system-exception response reporting UNKNOWN (minor code 0, |
|  * COMPLETED_MAYBE), with a UEInfoServiceContext for that exception added |
|  * to the service contexts. |
|  * |
|  * @param messageMediator the mediator of the request being answered |
|  * @param ex the unknown exception (not read by this method's body) |
|  * @return the mediator returned by createSystemExceptionResponse |
|  */ |
| public CorbaMessageMediator createUnknownExceptionResponse( |
|     CorbaMessageMediator messageMediator, UnknownException ex) |
| { |
|     SystemException unknown = |
|         new UNKNOWN( 0, CompletionStatus.COMPLETED_MAYBE); |
| |
|     // NOTE: this service context container gets augmented in the |
|     // tail call. |
|     ServiceContexts replyContexts = |
|         new ServiceContexts( (ORB)messageMediator.getBroker() ); |
|     replyContexts.put( new UEInfoServiceContext(unknown) ) ; |
| |
|     return createSystemExceptionResponse(messageMediator, unknown, |
|                                          replyContexts); |
| } |
| |
| public CorbaMessageMediator createSystemExceptionResponse( |
| CorbaMessageMediator messageMediator, |
| SystemException ex, |
| ServiceContexts svc) |
| { |
| if (messageMediator.getConnection() != null) { |
| // It is possible that fragments of response have already been |
| // sent. Then an error may occur (e.g. marshaling error like |
| // non serializable object). In that case it is too late |
| // to send the exception. We just return the existing fragmented |
| // stream here. This will cause an incomplete last fragment |
| // to be sent. Then the other side will get a marshaling error |
| // when attempting to unmarshal. |
| |
| // REVISIT: Impl - make interface method to do the following. |
| CorbaMessageMediatorImpl mediator = (CorbaMessageMediatorImpl) |
| ((CorbaConnection)messageMediator.getConnection()) |
| .serverRequestMapGet(messageMediator.getRequestId()); |
| |
| OutputObject existingOutputObject = null; |
| if (mediator != null) { |
| existingOutputObject = mediator.getOutputObject(); |
| } |
| |
| // REVISIT: need to think about messageMediator containing correct |
| // pointer to output object. |
| if (existingOutputObject != null && |
| mediator.sentFragment() && |
| ! mediator.sentFullMessage()) |
| { |
| return mediator; |
| } |
| } |
| |
| // Only do this if interceptors have been initialized on this request |
| // and have not completed their lifecycle (otherwise the info stack |
| // may be empty or have a different request's entry on top). |
| if (messageMediator.executePIInResponseConstructor()) { |
| // REVISIT: not necessary in framework now? |
| // Inform Portable Interceptors of the SystemException. This is |
| // required to be done here because the ending interception point |
| // is called in the when creating the response below |
| // but we do not currently write the SystemException into the |
| // response until after the ending point is called. |
| ((ORB)messageMediator.getBroker()).getPIHandler().setServerPIInfo( ex ); |
| } |
| |
| if (((ORB)messageMediator.getBroker()).subcontractDebugFlag && |
| ex != null) |
| { |
| dprint(".createSystemExceptionResponse: " |
| + opAndId(messageMediator), |
| ex); |
| } |
| |
| ServiceContexts serviceContexts = |
| getServiceContextsForReply(messageMediator, svc); |
| |
| // NOTE: We MUST add the service context before creating |
| // the response since service contexts are written to the |
| // stream when the response object is created. |
| |
| addExceptionDetailMessage(messageMediator, ex, serviceContexts); |
| |
| CorbaMessageMediator response = |
| createResponseHelper(messageMediator, serviceContexts, false); |
| |
| // NOTE: From here on, it is too late to add more service contexts. |
| // They have already been serialized to the stream (and maybe fragments |
| // sent). |
| |
| ORBUtility.writeSystemException( |
| ex, (OutputStream)response.getOutputObject()); |
| |
| return response; |
| } |
| |
| /** |
|  * Adds an ExceptionDetailMessage service context holding the stack trace |
|  * of {@code ex} to {@code serviceContexts}. |
|  * <p> |
|  * The trace is captured as characters through a {@code StringWriter} |
|  * instead of being encoded to bytes and decoded again with the platform |
|  * default charset, so characters that charset cannot represent are kept |
|  * intact in the message. |
|  * |
|  * @param mediator mediator whose broker is used to create the |
|  *        encapsulation stream |
|  * @param ex exception whose stack trace becomes the detail message; |
|  *        must not be {@code null} |
|  * @param serviceContexts container that receives the new service context |
|  */ |
| private void addExceptionDetailMessage(CorbaMessageMediator mediator, |
|                                        SystemException ex, |
|                                        ServiceContexts serviceContexts) |
| { |
|     // Fully qualified so that no new import is needed in this file. |
|     java.io.StringWriter stackTrace = new java.io.StringWriter(); |
|     PrintWriter pw = new PrintWriter(stackTrace); |
|     ex.printStackTrace(pw); |
|     pw.flush(); // make sure everything has reached the StringWriter |
| |
|     EncapsOutputStream encapsOutputStream = |
|         sun.corba.OutputStreamFactory.newEncapsOutputStream( |
|             (ORB) mediator.getBroker()); |
|     encapsOutputStream.putEndian(); |
|     encapsOutputStream.write_wstring(stackTrace.toString()); |
| |
|     UnknownServiceContext serviceContext = |
|         new UnknownServiceContext(ExceptionDetailMessage.value, |
|                                   encapsOutputStream.toByteArray()); |
|     serviceContexts.put(serviceContext); |
| } |
| |
| /** |
|  * Creates a LOCATION_FORWARD reply that carries {@code ior}. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param ior object reference stored in the reply message |
|  * @param svc service contexts to extend for the reply, or {@code null} |
|  *        to start from an empty container |
|  * @return the mediator returned by {@code createResponseHelper} |
|  */ |
| public CorbaMessageMediator createLocationForward( |
|         CorbaMessageMediator messageMediator, IOR ior, ServiceContexts svc) |
| { |
|     ORB mediatorOrb = (ORB) messageMediator.getBroker(); |
|     ReplyMessage forward = MessageBase.createReply( |
|         mediatorOrb, |
|         messageMediator.getGIOPVersion(), |
|         messageMediator.getEncodingVersion(), |
|         messageMediator.getRequestId(), |
|         ReplyMessage.LOCATION_FORWARD, |
|         getServiceContextsForReply(messageMediator, svc), |
|         ior); |
| |
|     return createResponseHelper(messageMediator, forward, ior); |
| } |
| |
| /** |
|  * Creates a NO_EXCEPTION reply for the request. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param svc service contexts placed in the reply message |
|  * @return the mediator returned by the reply-message overload of |
|  *         {@code createResponseHelper} |
|  */ |
| protected CorbaMessageMediator createResponseHelper( |
|         CorbaMessageMediator messageMediator, ServiceContexts svc) |
| { |
|     ORB mediatorOrb = (ORB) messageMediator.getBroker(); |
|     ReplyMessage normalReply = MessageBase.createReply( |
|         mediatorOrb, |
|         messageMediator.getGIOPVersion(), |
|         messageMediator.getEncodingVersion(), |
|         messageMediator.getRequestId(), |
|         ReplyMessage.NO_EXCEPTION, |
|         svc, |
|         null); |
| |
|     return createResponseHelper(messageMediator, normalReply, null); |
| } |
| |
| /** |
|  * Creates an exception reply for the request. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param svc service contexts placed in the reply message |
|  * @param user {@code true} for a USER_EXCEPTION reply, {@code false} |
|  *        for a SYSTEM_EXCEPTION reply |
|  * @return the mediator returned by the reply-message overload of |
|  *         {@code createResponseHelper} |
|  */ |
| protected CorbaMessageMediator createResponseHelper( |
|         CorbaMessageMediator messageMediator, ServiceContexts svc,boolean user) |
| { |
|     int replyStatus; |
|     if (user) { |
|         replyStatus = ReplyMessage.USER_EXCEPTION; |
|     } else { |
|         replyStatus = ReplyMessage.SYSTEM_EXCEPTION; |
|     } |
| |
|     ReplyMessage exceptionReply = MessageBase.createReply( |
|         (ORB) messageMediator.getBroker(), |
|         messageMediator.getGIOPVersion(), |
|         messageMediator.getEncodingVersion(), |
|         messageMediator.getRequestId(), |
|         replyStatus, |
|         svc, |
|         null); |
| |
|     return createResponseHelper(messageMediator, exceptionReply, null); |
| } |
| |
| // REVISIT - IOR arg is ignored. |
| /** |
|  * Finishes the server-side request processing steps, installs |
|  * {@code reply} as the reply header, creates the reply output object and |
|  * writes the reply header (plus the reply's IOR, if it has one) to it. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param reply reply message to install and write |
|  * @param ior not read by this method; the IOR that gets written is the |
|  *        one returned by {@code reply.getIOR()} |
|  * @return {@code messageMediator}, which now holds the output object |
|  */ |
| protected CorbaMessageMediator createResponseHelper( |
|         CorbaMessageMediator messageMediator, ReplyMessage reply, IOR ior) |
| { |
|     // REVISIT - these should be invoked from subcontract. |
|     runServantPostInvoke(messageMediator); |
|     runInterceptors(messageMediator, reply); |
|     runRemoveThreadInfo(messageMediator); |
| |
|     if (((ORB) messageMediator.getBroker()).subcontractDebugFlag) { |
|         dprint(".createResponseHelper: " |
|                + opAndId(messageMediator) + ": " |
|                + reply); |
|     } |
| |
|     messageMediator.setReplyHeader(reply); |
| |
|     // REVISIT = do not use null. |
|     OutputObject out; |
|     if (messageMediator.getConnection() != null) { |
|         // A connection is present: its acceptor builds the output object. |
|         out = messageMediator.getConnection().getAcceptor() |
|             .createOutputObject(messageMediator.getBroker(), messageMediator); |
|     } else { |
|         // No connection: use a growing CDR output object. |
|         out = sun.corba.OutputStreamFactory.newCDROutputObject(orb, |
|             messageMediator, messageMediator.getReplyHeader(), |
|             messageMediator.getStreamFormatVersion(), |
|             BufferManagerFactory.GROW); |
|     } |
|     messageMediator.setOutputObject(out); |
|     messageMediator.getOutputObject().setMessageMediator(messageMediator); |
| |
|     reply.write((OutputStream) messageMediator.getOutputObject()); |
|     if (reply.getIOR() != null) { |
|         reply.getIOR().write( |
|             (OutputStream) messageMediator.getOutputObject()); |
|     } |
| |
|     // NOTE: The mediator holds onto the output object, so the return |
|     // value is not really necessary. |
|     return messageMediator; |
| } |
| |
| protected void runServantPostInvoke(CorbaMessageMediator messageMediator) |
| { |
| // Run ServantLocator::postinvoke. This may cause a SystemException |
| // which will throw out of the constructor and return later |
| // to construct a reply for that exception. The internal logic |
| // of returnServant makes sure that postinvoke is only called once. |
| // REVISIT: instead of instanceof, put method on all orbs. |
| ORB orb = null; |
| // This flag is to deal with BootstrapServer use of reply streams, |
| // with ServerRequestDispatcher's use of reply streams, etc. |
| if (messageMediator.executeReturnServantInResponseConstructor()) { |
| // It is possible to get marshaling errors in the skeleton after |
| // postinvoke has completed. We must set this to false so that |
| // when the error exception reply is constructed we don't try |
| // to incorrectly access poa current (which will be the wrong |
| // one or an empty stack. |
| messageMediator.setExecuteReturnServantInResponseConstructor(false); |
| messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(true); |
| |
| try { |
| orb = (ORB)messageMediator.getBroker(); |
| OAInvocationInfo info = orb.peekInvocationInfo() ; |
| ObjectAdapter oa = info.oa(); |
| try { |
| oa.returnServant() ; |
| } catch (Throwable thr) { |
| wrapper.unexpectedException( thr ) ; |
| |
| if (thr instanceof Error) |
| throw (Error)thr ; |
| else if (thr instanceof RuntimeException) |
| throw (RuntimeException)thr ; |
| } finally { |
| oa.exit(); |
| } |
| } catch (EmptyStackException ese) { |
| throw wrapper.emptyStackRunServantPostInvoke( ese ) ; |
| } |
| } |
| } |
| |
| /** |
|  * Invokes the server request ending interception point for {@code reply} |
|  * and cleans up the server PI request, if the mediator still asks for |
|  * it. Afterwards the mediator flag is cleared. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param reply reply message handed to the ending interception point |
|  */ |
| protected void runInterceptors(CorbaMessageMediator messageMediator, |
|                                ReplyMessage reply) |
| { |
|     if (!messageMediator.executePIInResponseConstructor()) { |
|         return; |
|     } |
| |
|     // Invoke server request ending interception points (send_*). |
|     // Note: this may end up with a SystemException or an internal |
|     // Runtime ForwardRequest. |
|     ((ORB) messageMediator.getBroker()) |
|         .getPIHandler().invokeServerPIEndingPoint(reply); |
| |
|     // Note this will be executed even if a ForwardRequest or |
|     // SystemException is thrown by a Portable Interceptors ending |
|     // point since we end up in this constructor again anyway. |
|     ((ORB) messageMediator.getBroker()) |
|         .getPIHandler().cleanupServerPIRequest(); |
| |
|     // See createSystemExceptionResponse for why this is necessary. |
|     messageMediator.setExecutePIInResponseConstructor(false); |
| } |
| |
| /** |
|  * Pops the invocation info from the ORB if the mediator asks for it, |
|  * clearing the mediator flag first. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  */ |
| protected void runRemoveThreadInfo(CorbaMessageMediator messageMediator) |
| { |
|     // Once you get here the final reply is available (i.e., postinvoke |
|     // and interceptors have completed). |
|     if (!messageMediator.executeRemoveThreadInfoInResponseConstructor()) { |
|         return; |
|     } |
|     messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(false); |
|     ((ORB) messageMediator.getBroker()).popInvocationInfo(); |
| } |
| |
| /** |
|  * Prepares the service contexts that accompany a reply. |
|  * <p> |
|  * A sending context is added the first time a reply is built for a |
|  * connection (the connection is marked before the duplicate check), and |
|  * an ORB version context is added on every call. |
|  * |
|  * @param messageMediator mediator of the request being answered |
|  * @param contexts container to extend, or {@code null} to allocate a |
|  *        new one |
|  * @return the container that was passed in, or the newly allocated one |
|  */ |
| protected ServiceContexts getServiceContextsForReply( |
|         CorbaMessageMediator messageMediator, ServiceContexts contexts) |
| { |
|     CorbaConnection conn = |
|         (CorbaConnection) messageMediator.getConnection(); |
|     ORB mediatorOrb = (ORB) messageMediator.getBroker(); |
| |
|     if (mediatorOrb.subcontractDebugFlag) { |
|         dprint(".getServiceContextsForReply: " |
|                + opAndId(messageMediator) |
|                + ": " + conn); |
|     } |
| |
|     if (contexts == null) { |
|         contexts = new ServiceContexts(mediatorOrb); |
|     } |
| |
|     // NOTE : We only want to send the runtime context the first time |
|     if (conn != null && !conn.isPostInitialContexts()) { |
|         conn.setPostInitialContexts(); |
|         SendingContextServiceContext sendingContext = |
|             new SendingContextServiceContext( |
|                 mediatorOrb.getFVDCodeBaseIOR()); |
| |
|         if (contexts.get(sendingContext.getId()) != null) { |
|             throw wrapper.duplicateSendingContextServiceContext(); |
|         } |
| |
|         contexts.put(sendingContext); |
| |
|         if (mediatorOrb.subcontractDebugFlag) { |
|             dprint(".getServiceContextsForReply: " |
|                    + opAndId(messageMediator) |
|                    + ": added SendingContextServiceContext" ); |
|         } |
|     } |
| |
|     // send ORBVersion servicecontext as part of the Reply |
|     ORBVersionServiceContext versionContext = |
|         new ORBVersionServiceContext(ORBVersionFactory.getORBVersion()); |
| |
|     if (contexts.get(versionContext.getId()) != null) { |
|         throw wrapper.duplicateOrbVersionServiceContext(); |
|     } |
| |
|     contexts.put(versionContext); |
| |
|     if (mediatorOrb.subcontractDebugFlag) { |
|         dprint(".getServiceContextsForReply: " |
|                + opAndId(messageMediator) |
|                + ": added ORB version service context"); |
|     } |
| |
|     return contexts; |
| } |
| |
| // REVISIT - this method should be migrated to orbutil.ORBUtility |
| // since all locations that release ByteBuffers use |
| // very similar logic and debug information. |
| /** |
|  * Hands {@code dispatchByteBuffer} back to the ORB's ByteBufferPool when |
|  * it is set, and reports the release when transport debugging is on. |
|  */ |
| private void releaseByteBufferToPool() { |
|     if (dispatchByteBuffer == null) { |
|         return; |
|     } |
| |
|     orb.getByteBufferPool().releaseByteBuffer(dispatchByteBuffer); |
| |
|     if (transportDebug()) { |
|         // The identity hash only serves to tell buffers apart in logs. |
|         dprint(".handleInput: releasing ByteBuffer (" |
|                + System.identityHashCode(dispatchByteBuffer) |
|                + ") to ByteBufferPool"); |
|     } |
| } |
| } |
| |
| // End of file. |