J. Duke | 319a3b9 | 2007-12-01 00:00:00 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * Copyright 1996-2005 Sun Microsystems, Inc. All Rights Reserved. |
| 3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| 4 | * |
| 5 | * This code is free software; you can redistribute it and/or modify it |
| 6 | * under the terms of the GNU General Public License version 2 only, as |
| 7 | * published by the Free Software Foundation. Sun designates this |
| 8 | * particular file as subject to the "Classpath" exception as provided |
| 9 | * by Sun in the LICENSE file that accompanied this code. |
| 10 | * |
| 11 | * This code is distributed in the hope that it will be useful, but WITHOUT |
| 12 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 13 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| 14 | * version 2 for more details (a copy is included in the LICENSE file that |
| 15 | * accompanied this code). |
| 16 | * |
| 17 | * You should have received a copy of the GNU General Public License version |
| 18 | * 2 along with this work; if not, write to the Free Software Foundation, |
| 19 | * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| 20 | * |
| 21 | * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
| 22 | * CA 95054 USA or visit www.sun.com if you need additional information or |
| 23 | * have any questions. |
| 24 | */ |
| 25 | package sun.rmi.transport.tcp; |
| 26 | |
| 27 | import java.io.DataInputStream; |
| 28 | import java.io.DataOutputStream; |
| 29 | import java.io.IOException; |
| 30 | import java.lang.ref.Reference; |
| 31 | import java.lang.ref.SoftReference; |
| 32 | import java.net.Socket; |
| 33 | import java.rmi.ConnectIOException; |
| 34 | import java.rmi.RemoteException; |
| 35 | import java.security.AccessControlContext; |
| 36 | import java.security.AccessController; |
| 37 | import java.util.ArrayList; |
| 38 | import java.util.List; |
| 39 | import java.util.ListIterator; |
| 40 | import java.util.WeakHashMap; |
| 41 | import java.util.concurrent.Future; |
| 42 | import java.util.concurrent.ScheduledExecutorService; |
| 43 | import java.util.concurrent.TimeUnit; |
| 44 | import sun.rmi.runtime.Log; |
| 45 | import sun.rmi.runtime.NewThreadAction; |
| 46 | import sun.rmi.runtime.RuntimeUtil; |
| 47 | import sun.rmi.transport.Channel; |
| 48 | import sun.rmi.transport.Connection; |
| 49 | import sun.rmi.transport.Endpoint; |
| 50 | import sun.rmi.transport.TransportConstants; |
| 51 | import sun.security.action.GetIntegerAction; |
| 52 | import sun.security.action.GetLongAction; |
| 53 | |
| 54 | /** |
| 55 | * TCPChannel is the socket-based implementation of the RMI Channel |
| 56 | * abstraction. |
| 57 | * |
| 58 | * @author Ann Wollrath |
| 59 | */ |
| 60 | public class TCPChannel implements Channel { |
| 61 | /** endpoint for this channel */ |
| 62 | private final TCPEndpoint ep; |
| 63 | /** transport for this channel */ |
| 64 | private final TCPTransport tr; |
| 65 | /** list of cached connections */ |
| 66 | private final List<TCPConnection> freeList = |
| 67 | new ArrayList<TCPConnection>(); |
| 68 | /** frees cached connections that have expired (guarded by freeList) */ |
| 69 | private Future<?> reaper = null; |
| 70 | |
| 71 | /** using multiplexer (for bi-directional applet communication */ |
| 72 | private boolean usingMultiplexer = false; |
| 73 | /** connection multiplexer, if used */ |
| 74 | private ConnectionMultiplexer multiplexer = null; |
| 75 | /** connection acceptor (should be in TCPTransport) */ |
| 76 | private ConnectionAcceptor acceptor; |
| 77 | |
| 78 | /** most recently authorized AccessControlContext */ |
| 79 | private AccessControlContext okContext; |
| 80 | |
| 81 | /** cache of authorized AccessControlContexts */ |
| 82 | private WeakHashMap<AccessControlContext, |
| 83 | Reference<AccessControlContext>> authcache; |
| 84 | |
| 85 | /** the SecurityManager which authorized okContext and authcache */ |
| 86 | private SecurityManager cacheSecurityManager = null; |
| 87 | |
| 88 | /** client-side connection idle usage timeout */ |
| 89 | private static final long idleTimeout = // default 15 seconds |
| 90 | AccessController.doPrivileged( |
| 91 | new GetLongAction("sun.rmi.transport.connectionTimeout", 15000)); |
| 92 | |
| 93 | /** client-side connection handshake read timeout */ |
| 94 | private static final int handshakeTimeout = // default 1 minute |
| 95 | AccessController.doPrivileged( |
| 96 | new GetIntegerAction("sun.rmi.transport.tcp.handshakeTimeout", |
| 97 | 60000)); |
| 98 | |
| 99 | /** client-side connection response read timeout (after handshake) */ |
| 100 | private static final int responseTimeout = // default infinity |
| 101 | AccessController.doPrivileged( |
| 102 | new GetIntegerAction("sun.rmi.transport.tcp.responseTimeout", 0)); |
| 103 | |
| 104 | /** thread pool for scheduling delayed tasks */ |
| 105 | private static final ScheduledExecutorService scheduler = |
| 106 | AccessController.doPrivileged( |
| 107 | new RuntimeUtil.GetInstanceAction()).getScheduler(); |
| 108 | |
| 109 | /** |
| 110 | * Create channel for endpoint. |
| 111 | */ |
| 112 | TCPChannel(TCPTransport tr, TCPEndpoint ep) { |
| 113 | this.tr = tr; |
| 114 | this.ep = ep; |
| 115 | } |
| 116 | |
| 117 | /** |
| 118 | * Return the endpoint for this channel. |
| 119 | */ |
| 120 | public Endpoint getEndpoint() { |
| 121 | return ep; |
| 122 | } |
| 123 | |
| 124 | /** |
| 125 | * Checks if the current caller has sufficient privilege to make |
| 126 | * a connection to the remote endpoint. |
| 127 | * @exception SecurityException if caller is not allowed to use this |
| 128 | * Channel. |
| 129 | */ |
| 130 | private void checkConnectPermission() throws SecurityException { |
| 131 | SecurityManager security = System.getSecurityManager(); |
| 132 | if (security == null) |
| 133 | return; |
| 134 | |
| 135 | if (security != cacheSecurityManager) { |
| 136 | // The security manager changed: flush the cache |
| 137 | okContext = null; |
| 138 | authcache = new WeakHashMap<AccessControlContext, |
| 139 | Reference<AccessControlContext>>(); |
| 140 | cacheSecurityManager = security; |
| 141 | } |
| 142 | |
| 143 | AccessControlContext ctx = AccessController.getContext(); |
| 144 | |
| 145 | // If ctx is the same context as last time, or if it |
| 146 | // appears in the cache, bypass the checkConnect. |
| 147 | if (okContext == null || |
| 148 | !(okContext.equals(ctx) || authcache.containsKey(ctx))) |
| 149 | { |
| 150 | security.checkConnect(ep.getHost(), ep.getPort()); |
| 151 | authcache.put(ctx, new SoftReference<AccessControlContext>(ctx)); |
| 152 | // A WeakHashMap is transformed into a SoftHashSet by making |
| 153 | // each value softly refer to its own key (Peter's idea). |
| 154 | } |
| 155 | okContext = ctx; |
| 156 | } |
| 157 | |
| 158 | /** |
| 159 | * Supplies a connection to the endpoint of the address space |
| 160 | * for which this is a channel. The returned connection may |
| 161 | * be one retrieved from a cache of idle connections. |
| 162 | */ |
| 163 | public Connection newConnection() throws RemoteException { |
| 164 | TCPConnection conn; |
| 165 | |
| 166 | // loop until we find a free live connection (in which case |
| 167 | // we return) or until we run out of freelist (in which case |
| 168 | // the loop exits) |
| 169 | do { |
| 170 | conn = null; |
| 171 | // try to get a free connection |
| 172 | synchronized (freeList) { |
| 173 | int elementPos = freeList.size()-1; |
| 174 | |
| 175 | if (elementPos >= 0) { |
| 176 | // If there is a security manager, make sure |
| 177 | // the caller is allowed to connect to the |
| 178 | // requested endpoint. |
| 179 | checkConnectPermission(); |
| 180 | conn = freeList.get(elementPos); |
| 181 | freeList.remove(elementPos); |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | // at this point, conn is null iff the freelist is empty, |
| 186 | // and nonnull if a free connection of uncertain vitality |
| 187 | // has been found. |
| 188 | |
| 189 | if (conn != null) { |
| 190 | // check to see if the connection has closed since last use |
| 191 | if (!conn.isDead()) { |
| 192 | TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); |
| 193 | return conn; |
| 194 | } |
| 195 | |
| 196 | // conn is dead, and cannot be reused (reuse => false) |
| 197 | this.free(conn, false); |
| 198 | } |
| 199 | } while (conn != null); |
| 200 | |
| 201 | // none free, so create a new connection |
| 202 | return (createConnection()); |
| 203 | } |
| 204 | |
| 205 | /** |
| 206 | * Create a new connection to the remote endpoint of this channel. |
| 207 | * The returned connection is new. The caller must already have |
| 208 | * passed a security checkConnect or equivalent. |
| 209 | */ |
| 210 | private Connection createConnection() throws RemoteException { |
| 211 | Connection conn; |
| 212 | |
| 213 | TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); |
| 214 | |
| 215 | if (!usingMultiplexer) { |
| 216 | Socket sock = ep.newSocket(); |
| 217 | conn = new TCPConnection(this, sock); |
| 218 | |
| 219 | try { |
| 220 | DataOutputStream out = |
| 221 | new DataOutputStream(conn.getOutputStream()); |
| 222 | writeTransportHeader(out); |
| 223 | |
| 224 | // choose protocol (single op if not reusable socket) |
| 225 | if (!conn.isReusable()) { |
| 226 | out.writeByte(TransportConstants.SingleOpProtocol); |
| 227 | } else { |
| 228 | out.writeByte(TransportConstants.StreamProtocol); |
| 229 | out.flush(); |
| 230 | |
| 231 | /* |
| 232 | * Set socket read timeout to configured value for JRMP |
| 233 | * connection handshake; this also serves to guard against |
| 234 | * non-JRMP servers that do not respond (see 4322806). |
| 235 | */ |
| 236 | int originalSoTimeout = 0; |
| 237 | try { |
| 238 | originalSoTimeout = sock.getSoTimeout(); |
| 239 | sock.setSoTimeout(handshakeTimeout); |
| 240 | } catch (Exception e) { |
| 241 | // if we fail to set this, ignore and proceed anyway |
| 242 | } |
| 243 | |
| 244 | DataInputStream in = |
| 245 | new DataInputStream(conn.getInputStream()); |
| 246 | byte ack = in.readByte(); |
| 247 | if (ack != TransportConstants.ProtocolAck) { |
| 248 | throw new ConnectIOException( |
| 249 | ack == TransportConstants.ProtocolNack ? |
| 250 | "JRMP StreamProtocol not supported by server" : |
| 251 | "non-JRMP server at remote endpoint"); |
| 252 | } |
| 253 | |
| 254 | String suggestedHost = in.readUTF(); |
| 255 | int suggestedPort = in.readInt(); |
| 256 | if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { |
| 257 | TCPTransport.tcpLog.log(Log.VERBOSE, |
| 258 | "server suggested " + suggestedHost + ":" + |
| 259 | suggestedPort); |
| 260 | } |
| 261 | |
| 262 | // set local host name, if unknown |
| 263 | TCPEndpoint.setLocalHost(suggestedHost); |
| 264 | // do NOT set the default port, because we don't |
| 265 | // know if we can't listen YET... |
| 266 | |
| 267 | // write out default endpoint to match protocol |
| 268 | // (but it serves no purpose) |
| 269 | TCPEndpoint localEp = |
| 270 | TCPEndpoint.getLocalEndpoint(0, null, null); |
| 271 | out.writeUTF(localEp.getHost()); |
| 272 | out.writeInt(localEp.getPort()); |
| 273 | if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { |
| 274 | TCPTransport.tcpLog.log(Log.VERBOSE, "using " + |
| 275 | localEp.getHost() + ":" + localEp.getPort()); |
| 276 | } |
| 277 | |
| 278 | /* |
| 279 | * After JRMP handshake, set socket read timeout to value |
| 280 | * configured for the rest of the lifetime of the |
| 281 | * connection. NOTE: this timeout, if configured to a |
| 282 | * finite duration, places an upper bound on the time |
| 283 | * that a remote method call is permitted to execute. |
| 284 | */ |
| 285 | try { |
| 286 | /* |
| 287 | * If socket factory had set a non-zero timeout on its |
| 288 | * own, then restore it instead of using the property- |
| 289 | * configured value. |
| 290 | */ |
| 291 | sock.setSoTimeout((originalSoTimeout != 0 ? |
| 292 | originalSoTimeout : |
| 293 | responseTimeout)); |
| 294 | } catch (Exception e) { |
| 295 | // if we fail to set this, ignore and proceed anyway |
| 296 | } |
| 297 | |
| 298 | out.flush(); |
| 299 | } |
| 300 | } catch (IOException e) { |
| 301 | if (e instanceof RemoteException) |
| 302 | throw (RemoteException) e; |
| 303 | else |
| 304 | throw new ConnectIOException( |
| 305 | "error during JRMP connection establishment", e); |
| 306 | } |
| 307 | } else { |
| 308 | try { |
| 309 | conn = multiplexer.openConnection(); |
| 310 | } catch (IOException e) { |
| 311 | synchronized (this) { |
| 312 | usingMultiplexer = false; |
| 313 | multiplexer = null; |
| 314 | } |
| 315 | throw new ConnectIOException( |
| 316 | "error opening virtual connection " + |
| 317 | "over multiplexed connection", e); |
| 318 | } |
| 319 | } |
| 320 | return conn; |
| 321 | } |
| 322 | |
| 323 | /** |
| 324 | * Free the connection generated by this channel. |
| 325 | * @param conn The connection |
| 326 | * @param reuse If true, the connection is in a state in which it |
| 327 | * can be reused for another method call. |
| 328 | */ |
| 329 | public void free(Connection conn, boolean reuse) { |
| 330 | if (conn == null) return; |
| 331 | |
| 332 | if (reuse && conn.isReusable()) { |
| 333 | long lastuse = System.currentTimeMillis(); |
| 334 | TCPConnection tcpConnection = (TCPConnection) conn; |
| 335 | |
| 336 | TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); |
| 337 | |
| 338 | /* |
| 339 | * Cache connection; if reaper task for expired |
| 340 | * connections isn't scheduled, then schedule it. |
| 341 | */ |
| 342 | synchronized (freeList) { |
| 343 | freeList.add(tcpConnection); |
| 344 | if (reaper == null) { |
| 345 | TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); |
| 346 | |
| 347 | reaper = scheduler.scheduleWithFixedDelay( |
| 348 | new Runnable() { |
| 349 | public void run() { |
| 350 | TCPTransport.tcpLog.log(Log.VERBOSE, |
| 351 | "wake up"); |
| 352 | freeCachedConnections(); |
| 353 | } |
| 354 | }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | tcpConnection.setLastUseTime(lastuse); |
| 359 | tcpConnection.setExpiration(lastuse + idleTimeout); |
| 360 | } else { |
| 361 | TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); |
| 362 | |
| 363 | try { |
| 364 | conn.close(); |
| 365 | } catch (IOException ignored) { |
| 366 | } |
| 367 | } |
| 368 | } |
| 369 | |
| 370 | /** |
| 371 | * Send transport header over stream. |
| 372 | */ |
| 373 | private void writeTransportHeader(DataOutputStream out) |
| 374 | throws RemoteException |
| 375 | { |
| 376 | try { |
| 377 | // write out transport header |
| 378 | DataOutputStream dataOut = |
| 379 | new DataOutputStream(out); |
| 380 | dataOut.writeInt(TransportConstants.Magic); |
| 381 | dataOut.writeShort(TransportConstants.Version); |
| 382 | } catch (IOException e) { |
| 383 | throw new ConnectIOException( |
| 384 | "error writing JRMP transport header", e); |
| 385 | } |
| 386 | } |
| 387 | |
| 388 | /** |
| 389 | * Use given connection multiplexer object to obtain new connections |
| 390 | * through this channel. |
| 391 | */ |
| 392 | synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { |
| 393 | // for now, always just use the last one given |
| 394 | multiplexer = newMultiplexer; |
| 395 | |
| 396 | usingMultiplexer = true; |
| 397 | } |
| 398 | |
| 399 | /** |
| 400 | * Accept a connection provided over a multiplexed channel. |
| 401 | */ |
| 402 | void acceptMultiplexConnection(Connection conn) { |
| 403 | if (acceptor == null) { |
| 404 | acceptor = new ConnectionAcceptor(tr); |
| 405 | acceptor.startNewAcceptor(); |
| 406 | } |
| 407 | acceptor.accept(conn); |
| 408 | } |
| 409 | |
| 410 | /** |
| 411 | * Closes all the connections in the cache, whether timed out or not. |
| 412 | */ |
| 413 | public void shedCache() { |
| 414 | // Build a list of connections, to avoid holding the freeList |
| 415 | // lock during (potentially long-running) close() calls. |
| 416 | Connection[] conn; |
| 417 | synchronized (freeList) { |
| 418 | conn = freeList.toArray(new Connection[freeList.size()]); |
| 419 | freeList.clear(); |
| 420 | } |
| 421 | |
| 422 | // Close all the connections that were free |
| 423 | for (int i = conn.length; --i >= 0; ) { |
| 424 | Connection c = conn[i]; |
| 425 | conn[i] = null; // help gc |
| 426 | try { |
| 427 | c.close(); |
| 428 | } catch (java.io.IOException e) { |
| 429 | // eat exception |
| 430 | } |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | private void freeCachedConnections() { |
| 435 | /* |
| 436 | * Remove each connection whose time out has expired. |
| 437 | */ |
| 438 | synchronized (freeList) { |
| 439 | int size = freeList.size(); |
| 440 | |
| 441 | if (size > 0) { |
| 442 | long time = System.currentTimeMillis(); |
| 443 | ListIterator<TCPConnection> iter = freeList.listIterator(size); |
| 444 | |
| 445 | while (iter.hasPrevious()) { |
| 446 | TCPConnection conn = iter.previous(); |
| 447 | if (conn.expired(time)) { |
| 448 | TCPTransport.tcpLog.log(Log.VERBOSE, |
| 449 | "connection timeout expired"); |
| 450 | |
| 451 | try { |
| 452 | conn.close(); |
| 453 | } catch (java.io.IOException e) { |
| 454 | // eat exception |
| 455 | } |
| 456 | iter.remove(); |
| 457 | } |
| 458 | } |
| 459 | } |
| 460 | |
| 461 | if (freeList.isEmpty()) { |
| 462 | reaper.cancel(false); |
| 463 | reaper = null; |
| 464 | } |
| 465 | } |
| 466 | } |
| 467 | } |
| 468 | |
| 469 | /** |
| 470 | * ConnectionAcceptor manages accepting new connections and giving them |
| 471 | * to TCPTransport's message handler on new threads. |
| 472 | * |
| 473 | * Since this object only needs to know which transport to give new |
| 474 | * connections to, it doesn't need to be per-channel as currently |
| 475 | * implemented. |
| 476 | */ |
| 477 | class ConnectionAcceptor implements Runnable { |
| 478 | |
| 479 | /** transport that will handle message on accepted connections */ |
| 480 | private TCPTransport transport; |
| 481 | |
| 482 | /** queue of connections to be accepted */ |
| 483 | private List<Connection> queue = new ArrayList<Connection>(); |
| 484 | |
| 485 | /** thread ID counter */ |
| 486 | private static int threadNum = 0; |
| 487 | |
| 488 | /** |
| 489 | * Create a new ConnectionAcceptor that will give connections |
| 490 | * to the specified transport on a new thread. |
| 491 | */ |
| 492 | public ConnectionAcceptor(TCPTransport transport) { |
| 493 | this.transport = transport; |
| 494 | } |
| 495 | |
| 496 | /** |
| 497 | * Start a new thread to accept connections. |
| 498 | */ |
| 499 | public void startNewAcceptor() { |
| 500 | Thread t = AccessController.doPrivileged( |
| 501 | new NewThreadAction(ConnectionAcceptor.this, |
| 502 | "Multiplex Accept-" + ++ threadNum, |
| 503 | true)); |
| 504 | t.start(); |
| 505 | } |
| 506 | |
| 507 | /** |
| 508 | * Add connection to queue of connections to be accepted. |
| 509 | */ |
| 510 | public void accept(Connection conn) { |
| 511 | synchronized (queue) { |
| 512 | queue.add(conn); |
| 513 | queue.notify(); |
| 514 | } |
| 515 | } |
| 516 | |
| 517 | /** |
| 518 | * Give transport next accepted conection, when available. |
| 519 | */ |
| 520 | public void run() { |
| 521 | Connection conn; |
| 522 | |
| 523 | synchronized (queue) { |
| 524 | while (queue.size() == 0) { |
| 525 | try { |
| 526 | queue.wait(); |
| 527 | } catch (InterruptedException e) { |
| 528 | } |
| 529 | } |
| 530 | startNewAcceptor(); |
| 531 | conn = queue.remove(0); |
| 532 | } |
| 533 | |
| 534 | transport.handleMessages(conn, true); |
| 535 | } |
| 536 | } |