blob: f426012bf7f1524f3796b448922536464d65a9c0 [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * Copyright 1996-2005 Sun Microsystems, Inc. All Rights Reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Sun designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Sun in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
22 * CA 95054 USA or visit www.sun.com if you need additional information or
23 * have any questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.io.DataInputStream;
28import java.io.DataOutputStream;
29import java.io.IOException;
30import java.lang.ref.Reference;
31import java.lang.ref.SoftReference;
32import java.net.Socket;
33import java.rmi.ConnectIOException;
34import java.rmi.RemoteException;
35import java.security.AccessControlContext;
36import java.security.AccessController;
37import java.util.ArrayList;
38import java.util.List;
39import java.util.ListIterator;
40import java.util.WeakHashMap;
41import java.util.concurrent.Future;
42import java.util.concurrent.ScheduledExecutorService;
43import java.util.concurrent.TimeUnit;
44import sun.rmi.runtime.Log;
45import sun.rmi.runtime.NewThreadAction;
46import sun.rmi.runtime.RuntimeUtil;
47import sun.rmi.transport.Channel;
48import sun.rmi.transport.Connection;
49import sun.rmi.transport.Endpoint;
50import sun.rmi.transport.TransportConstants;
51import sun.security.action.GetIntegerAction;
52import sun.security.action.GetLongAction;
53
54/**
55 * TCPChannel is the socket-based implementation of the RMI Channel
56 * abstraction.
57 *
58 * @author Ann Wollrath
59 */
60public class TCPChannel implements Channel {
61 /** endpoint for this channel */
62 private final TCPEndpoint ep;
63 /** transport for this channel */
64 private final TCPTransport tr;
65 /** list of cached connections */
66 private final List<TCPConnection> freeList =
67 new ArrayList<TCPConnection>();
68 /** frees cached connections that have expired (guarded by freeList) */
69 private Future<?> reaper = null;
70
71 /** using multiplexer (for bi-directional applet communication */
72 private boolean usingMultiplexer = false;
73 /** connection multiplexer, if used */
74 private ConnectionMultiplexer multiplexer = null;
75 /** connection acceptor (should be in TCPTransport) */
76 private ConnectionAcceptor acceptor;
77
78 /** most recently authorized AccessControlContext */
79 private AccessControlContext okContext;
80
81 /** cache of authorized AccessControlContexts */
82 private WeakHashMap<AccessControlContext,
83 Reference<AccessControlContext>> authcache;
84
85 /** the SecurityManager which authorized okContext and authcache */
86 private SecurityManager cacheSecurityManager = null;
87
88 /** client-side connection idle usage timeout */
89 private static final long idleTimeout = // default 15 seconds
90 AccessController.doPrivileged(
91 new GetLongAction("sun.rmi.transport.connectionTimeout", 15000));
92
93 /** client-side connection handshake read timeout */
94 private static final int handshakeTimeout = // default 1 minute
95 AccessController.doPrivileged(
96 new GetIntegerAction("sun.rmi.transport.tcp.handshakeTimeout",
97 60000));
98
99 /** client-side connection response read timeout (after handshake) */
100 private static final int responseTimeout = // default infinity
101 AccessController.doPrivileged(
102 new GetIntegerAction("sun.rmi.transport.tcp.responseTimeout", 0));
103
104 /** thread pool for scheduling delayed tasks */
105 private static final ScheduledExecutorService scheduler =
106 AccessController.doPrivileged(
107 new RuntimeUtil.GetInstanceAction()).getScheduler();
108
109 /**
110 * Create channel for endpoint.
111 */
112 TCPChannel(TCPTransport tr, TCPEndpoint ep) {
113 this.tr = tr;
114 this.ep = ep;
115 }
116
117 /**
118 * Return the endpoint for this channel.
119 */
120 public Endpoint getEndpoint() {
121 return ep;
122 }
123
124 /**
125 * Checks if the current caller has sufficient privilege to make
126 * a connection to the remote endpoint.
127 * @exception SecurityException if caller is not allowed to use this
128 * Channel.
129 */
130 private void checkConnectPermission() throws SecurityException {
131 SecurityManager security = System.getSecurityManager();
132 if (security == null)
133 return;
134
135 if (security != cacheSecurityManager) {
136 // The security manager changed: flush the cache
137 okContext = null;
138 authcache = new WeakHashMap<AccessControlContext,
139 Reference<AccessControlContext>>();
140 cacheSecurityManager = security;
141 }
142
143 AccessControlContext ctx = AccessController.getContext();
144
145 // If ctx is the same context as last time, or if it
146 // appears in the cache, bypass the checkConnect.
147 if (okContext == null ||
148 !(okContext.equals(ctx) || authcache.containsKey(ctx)))
149 {
150 security.checkConnect(ep.getHost(), ep.getPort());
151 authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
152 // A WeakHashMap is transformed into a SoftHashSet by making
153 // each value softly refer to its own key (Peter's idea).
154 }
155 okContext = ctx;
156 }
157
158 /**
159 * Supplies a connection to the endpoint of the address space
160 * for which this is a channel. The returned connection may
161 * be one retrieved from a cache of idle connections.
162 */
163 public Connection newConnection() throws RemoteException {
164 TCPConnection conn;
165
166 // loop until we find a free live connection (in which case
167 // we return) or until we run out of freelist (in which case
168 // the loop exits)
169 do {
170 conn = null;
171 // try to get a free connection
172 synchronized (freeList) {
173 int elementPos = freeList.size()-1;
174
175 if (elementPos >= 0) {
176 // If there is a security manager, make sure
177 // the caller is allowed to connect to the
178 // requested endpoint.
179 checkConnectPermission();
180 conn = freeList.get(elementPos);
181 freeList.remove(elementPos);
182 }
183 }
184
185 // at this point, conn is null iff the freelist is empty,
186 // and nonnull if a free connection of uncertain vitality
187 // has been found.
188
189 if (conn != null) {
190 // check to see if the connection has closed since last use
191 if (!conn.isDead()) {
192 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
193 return conn;
194 }
195
196 // conn is dead, and cannot be reused (reuse => false)
197 this.free(conn, false);
198 }
199 } while (conn != null);
200
201 // none free, so create a new connection
202 return (createConnection());
203 }
204
205 /**
206 * Create a new connection to the remote endpoint of this channel.
207 * The returned connection is new. The caller must already have
208 * passed a security checkConnect or equivalent.
209 */
210 private Connection createConnection() throws RemoteException {
211 Connection conn;
212
213 TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
214
215 if (!usingMultiplexer) {
216 Socket sock = ep.newSocket();
217 conn = new TCPConnection(this, sock);
218
219 try {
220 DataOutputStream out =
221 new DataOutputStream(conn.getOutputStream());
222 writeTransportHeader(out);
223
224 // choose protocol (single op if not reusable socket)
225 if (!conn.isReusable()) {
226 out.writeByte(TransportConstants.SingleOpProtocol);
227 } else {
228 out.writeByte(TransportConstants.StreamProtocol);
229 out.flush();
230
231 /*
232 * Set socket read timeout to configured value for JRMP
233 * connection handshake; this also serves to guard against
234 * non-JRMP servers that do not respond (see 4322806).
235 */
236 int originalSoTimeout = 0;
237 try {
238 originalSoTimeout = sock.getSoTimeout();
239 sock.setSoTimeout(handshakeTimeout);
240 } catch (Exception e) {
241 // if we fail to set this, ignore and proceed anyway
242 }
243
244 DataInputStream in =
245 new DataInputStream(conn.getInputStream());
246 byte ack = in.readByte();
247 if (ack != TransportConstants.ProtocolAck) {
248 throw new ConnectIOException(
249 ack == TransportConstants.ProtocolNack ?
250 "JRMP StreamProtocol not supported by server" :
251 "non-JRMP server at remote endpoint");
252 }
253
254 String suggestedHost = in.readUTF();
255 int suggestedPort = in.readInt();
256 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
257 TCPTransport.tcpLog.log(Log.VERBOSE,
258 "server suggested " + suggestedHost + ":" +
259 suggestedPort);
260 }
261
262 // set local host name, if unknown
263 TCPEndpoint.setLocalHost(suggestedHost);
264 // do NOT set the default port, because we don't
265 // know if we can't listen YET...
266
267 // write out default endpoint to match protocol
268 // (but it serves no purpose)
269 TCPEndpoint localEp =
270 TCPEndpoint.getLocalEndpoint(0, null, null);
271 out.writeUTF(localEp.getHost());
272 out.writeInt(localEp.getPort());
273 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
274 TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
275 localEp.getHost() + ":" + localEp.getPort());
276 }
277
278 /*
279 * After JRMP handshake, set socket read timeout to value
280 * configured for the rest of the lifetime of the
281 * connection. NOTE: this timeout, if configured to a
282 * finite duration, places an upper bound on the time
283 * that a remote method call is permitted to execute.
284 */
285 try {
286 /*
287 * If socket factory had set a non-zero timeout on its
288 * own, then restore it instead of using the property-
289 * configured value.
290 */
291 sock.setSoTimeout((originalSoTimeout != 0 ?
292 originalSoTimeout :
293 responseTimeout));
294 } catch (Exception e) {
295 // if we fail to set this, ignore and proceed anyway
296 }
297
298 out.flush();
299 }
300 } catch (IOException e) {
301 if (e instanceof RemoteException)
302 throw (RemoteException) e;
303 else
304 throw new ConnectIOException(
305 "error during JRMP connection establishment", e);
306 }
307 } else {
308 try {
309 conn = multiplexer.openConnection();
310 } catch (IOException e) {
311 synchronized (this) {
312 usingMultiplexer = false;
313 multiplexer = null;
314 }
315 throw new ConnectIOException(
316 "error opening virtual connection " +
317 "over multiplexed connection", e);
318 }
319 }
320 return conn;
321 }
322
323 /**
324 * Free the connection generated by this channel.
325 * @param conn The connection
326 * @param reuse If true, the connection is in a state in which it
327 * can be reused for another method call.
328 */
329 public void free(Connection conn, boolean reuse) {
330 if (conn == null) return;
331
332 if (reuse && conn.isReusable()) {
333 long lastuse = System.currentTimeMillis();
334 TCPConnection tcpConnection = (TCPConnection) conn;
335
336 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
337
338 /*
339 * Cache connection; if reaper task for expired
340 * connections isn't scheduled, then schedule it.
341 */
342 synchronized (freeList) {
343 freeList.add(tcpConnection);
344 if (reaper == null) {
345 TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
346
347 reaper = scheduler.scheduleWithFixedDelay(
348 new Runnable() {
349 public void run() {
350 TCPTransport.tcpLog.log(Log.VERBOSE,
351 "wake up");
352 freeCachedConnections();
353 }
354 }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
355 }
356 }
357
358 tcpConnection.setLastUseTime(lastuse);
359 tcpConnection.setExpiration(lastuse + idleTimeout);
360 } else {
361 TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
362
363 try {
364 conn.close();
365 } catch (IOException ignored) {
366 }
367 }
368 }
369
370 /**
371 * Send transport header over stream.
372 */
373 private void writeTransportHeader(DataOutputStream out)
374 throws RemoteException
375 {
376 try {
377 // write out transport header
378 DataOutputStream dataOut =
379 new DataOutputStream(out);
380 dataOut.writeInt(TransportConstants.Magic);
381 dataOut.writeShort(TransportConstants.Version);
382 } catch (IOException e) {
383 throw new ConnectIOException(
384 "error writing JRMP transport header", e);
385 }
386 }
387
388 /**
389 * Use given connection multiplexer object to obtain new connections
390 * through this channel.
391 */
392 synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
393 // for now, always just use the last one given
394 multiplexer = newMultiplexer;
395
396 usingMultiplexer = true;
397 }
398
399 /**
400 * Accept a connection provided over a multiplexed channel.
401 */
402 void acceptMultiplexConnection(Connection conn) {
403 if (acceptor == null) {
404 acceptor = new ConnectionAcceptor(tr);
405 acceptor.startNewAcceptor();
406 }
407 acceptor.accept(conn);
408 }
409
410 /**
411 * Closes all the connections in the cache, whether timed out or not.
412 */
413 public void shedCache() {
414 // Build a list of connections, to avoid holding the freeList
415 // lock during (potentially long-running) close() calls.
416 Connection[] conn;
417 synchronized (freeList) {
418 conn = freeList.toArray(new Connection[freeList.size()]);
419 freeList.clear();
420 }
421
422 // Close all the connections that were free
423 for (int i = conn.length; --i >= 0; ) {
424 Connection c = conn[i];
425 conn[i] = null; // help gc
426 try {
427 c.close();
428 } catch (java.io.IOException e) {
429 // eat exception
430 }
431 }
432 }
433
434 private void freeCachedConnections() {
435 /*
436 * Remove each connection whose time out has expired.
437 */
438 synchronized (freeList) {
439 int size = freeList.size();
440
441 if (size > 0) {
442 long time = System.currentTimeMillis();
443 ListIterator<TCPConnection> iter = freeList.listIterator(size);
444
445 while (iter.hasPrevious()) {
446 TCPConnection conn = iter.previous();
447 if (conn.expired(time)) {
448 TCPTransport.tcpLog.log(Log.VERBOSE,
449 "connection timeout expired");
450
451 try {
452 conn.close();
453 } catch (java.io.IOException e) {
454 // eat exception
455 }
456 iter.remove();
457 }
458 }
459 }
460
461 if (freeList.isEmpty()) {
462 reaper.cancel(false);
463 reaper = null;
464 }
465 }
466 }
467}
468
469/**
470 * ConnectionAcceptor manages accepting new connections and giving them
471 * to TCPTransport's message handler on new threads.
472 *
473 * Since this object only needs to know which transport to give new
474 * connections to, it doesn't need to be per-channel as currently
475 * implemented.
476 */
477class ConnectionAcceptor implements Runnable {
478
479 /** transport that will handle message on accepted connections */
480 private TCPTransport transport;
481
482 /** queue of connections to be accepted */
483 private List<Connection> queue = new ArrayList<Connection>();
484
485 /** thread ID counter */
486 private static int threadNum = 0;
487
488 /**
489 * Create a new ConnectionAcceptor that will give connections
490 * to the specified transport on a new thread.
491 */
492 public ConnectionAcceptor(TCPTransport transport) {
493 this.transport = transport;
494 }
495
496 /**
497 * Start a new thread to accept connections.
498 */
499 public void startNewAcceptor() {
500 Thread t = AccessController.doPrivileged(
501 new NewThreadAction(ConnectionAcceptor.this,
502 "Multiplex Accept-" + ++ threadNum,
503 true));
504 t.start();
505 }
506
507 /**
508 * Add connection to queue of connections to be accepted.
509 */
510 public void accept(Connection conn) {
511 synchronized (queue) {
512 queue.add(conn);
513 queue.notify();
514 }
515 }
516
517 /**
518 * Give transport next accepted conection, when available.
519 */
520 public void run() {
521 Connection conn;
522
523 synchronized (queue) {
524 while (queue.size() == 0) {
525 try {
526 queue.wait();
527 } catch (InterruptedException e) {
528 }
529 }
530 startNewAcceptor();
531 conn = queue.remove(0);
532 }
533
534 transport.handleMessages(conn, true);
535 }
536}