J. Duke | 319a3b9 | 2007-12-01 00:00:00 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * Copyright 2003-2006 Sun Microsystems, Inc. All Rights Reserved. |
| 3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| 4 | * |
| 5 | * This code is free software; you can redistribute it and/or modify it |
| 6 | * under the terms of the GNU General Public License version 2 only, as |
| 7 | * published by the Free Software Foundation. Sun designates this |
| 8 | * particular file as subject to the "Classpath" exception as provided |
| 9 | * by Sun in the LICENSE file that accompanied this code. |
| 10 | * |
| 11 | * This code is distributed in the hope that it will be useful, but WITHOUT |
| 12 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 13 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| 14 | * version 2 for more details (a copy is included in the LICENSE file that |
| 15 | * accompanied this code). |
| 16 | * |
| 17 | * You should have received a copy of the GNU General Public License version |
| 18 | * 2 along with this work; if not, write to the Free Software Foundation, |
| 19 | * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| 20 | * |
| 21 | * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
| 22 | * CA 95054 USA or visit www.sun.com if you need additional information or |
| 23 | * have any questions. |
| 24 | */ |
| 25 | |
| 26 | package com.sun.jmx.remote.internal; |
| 27 | |
| 28 | import java.security.AccessController; |
| 29 | import java.security.PrivilegedAction; |
| 30 | import java.security.PrivilegedActionException; |
| 31 | import java.security.PrivilegedExceptionAction; |
| 32 | import java.util.ArrayList; |
| 33 | import java.util.Collection; |
| 34 | import java.util.Collections; |
| 35 | import java.util.HashSet; |
| 36 | import java.util.List; |
| 37 | import java.util.Set; |
| 38 | import java.util.HashMap; |
| 39 | import java.util.Map; |
| 40 | |
| 41 | import javax.management.InstanceNotFoundException; |
| 42 | import javax.management.MBeanServer; |
| 43 | import javax.management.MBeanServerDelegate; |
| 44 | import javax.management.MBeanServerNotification; |
| 45 | import javax.management.Notification; |
| 46 | import javax.management.NotificationBroadcaster; |
| 47 | import javax.management.NotificationFilter; |
| 48 | import javax.management.NotificationFilterSupport; |
| 49 | import javax.management.NotificationListener; |
| 50 | import javax.management.ObjectName; |
| 51 | import javax.management.QueryEval; |
| 52 | import javax.management.QueryExp; |
| 53 | |
| 54 | import javax.management.remote.NotificationResult; |
| 55 | import javax.management.remote.TargetedNotification; |
| 56 | |
| 57 | import com.sun.jmx.remote.util.EnvHelp; |
| 58 | import com.sun.jmx.remote.util.ClassLogger; |
| 59 | |
| 60 | /** A circular buffer of notifications received from an MBean server. */ |
| 61 | /* |
| 62 | There is one instance of ArrayNotificationBuffer for every |
| 63 | MBeanServer object that has an attached ConnectorServer. Then, for |
| 64 | every ConnectorServer attached to a given MBeanServer, there is an |
| 65 | instance of the inner class ShareBuffer. So for example with two |
| 66 | ConnectorServers it looks like this: |
| 67 | |
| 68 | ConnectorServer1 -> ShareBuffer1 -\ |
| 69 | }-> ArrayNotificationBuffer |
| 70 | ConnectorServer2 -> ShareBuffer2 -/ | |
| 71 | | |
| 72 | v |
| 73 | MBeanServer |
| 74 | |
| 75 | The ArrayNotificationBuffer has a circular buffer of |
| 76 | NamedNotification objects. Each ConnectorServer defines a |
| 77 | notification buffer size, and this size is recorded by the |
| 78 | corresponding ShareBuffer. The buffer size of the |
| 79 | ArrayNotificationBuffer is the maximum of all of its ShareBuffers. |
| 80 | When a ShareBuffer is added or removed, the ArrayNotificationBuffer |
| 81 | size is adjusted accordingly. |
| 82 | |
| 83 | An ArrayNotificationBuffer also has a BufferListener (which is a |
| 84 | NotificationListener) registered on every NotificationBroadcaster |
| 85 | MBean in the MBeanServer to which it is attached. The cost of this |
| 86 | potentially large set of listeners is the principal motivation for |
| 87 | sharing the ArrayNotificationBuffer between ConnectorServers, and |
| 88 | also the reason that we are careful to discard the |
| 89 | ArrayNotificationBuffer (and its BufferListeners) when there are no |
| 90 | longer any ConnectorServers using it. |
| 91 | |
| 92 | The synchronization of this class is inherently complex. In an attempt |
| 93 | to limit the complexity, we use just two locks: |
| 94 | |
| 95 | - globalLock controls access to the mapping between an MBeanServer |
| 96 | and its ArrayNotificationBuffer and to the set of ShareBuffers for |
| 97 | each ArrayNotificationBuffer. |
| 98 | |
| 99 | - the instance lock of each ArrayNotificationBuffer controls access |
| 100 | to the array of notifications, including its size, and to the |
| 101 | dispose flag of the ArrayNotificationBuffer. The wait/notify |
| 102 | mechanism is used to indicate changes to the array. |
| 103 | |
| 104 | If both locks are held at the same time, the globalLock must be |
| 105 | taken first. |
| 106 | |
| 107 | Since adding or removing a BufferListener to an MBean can involve |
| 108 | calling user code, we are careful not to hold any locks while it is |
| 109 | done. |
| 110 | */ |
| 111 | public class ArrayNotificationBuffer implements NotificationBuffer { |
| 112 | private boolean disposed = false; |
| 113 | |
| 114 | // FACTORY STUFF, INCLUDING SHARING |
| 115 | |
| 116 | private static final Object globalLock = new Object(); |
| 117 | private static final |
| 118 | HashMap<MBeanServer,ArrayNotificationBuffer> mbsToBuffer = |
| 119 | new HashMap<MBeanServer,ArrayNotificationBuffer>(1); |
| 120 | private final Collection<ShareBuffer> sharers = new HashSet<ShareBuffer>(1); |
| 121 | |
| 122 | public static NotificationBuffer getNotificationBuffer( |
| 123 | MBeanServer mbs, Map env) { |
| 124 | |
| 125 | if (env == null) |
| 126 | env = Collections.emptyMap(); |
| 127 | |
| 128 | //Find out queue size |
| 129 | int queueSize = EnvHelp.getNotifBufferSize(env); |
| 130 | |
| 131 | ArrayNotificationBuffer buf; |
| 132 | boolean create; |
| 133 | NotificationBuffer sharer; |
| 134 | synchronized (globalLock) { |
| 135 | buf = mbsToBuffer.get(mbs); |
| 136 | create = (buf == null); |
| 137 | if (create) { |
| 138 | buf = new ArrayNotificationBuffer(mbs, queueSize); |
| 139 | mbsToBuffer.put(mbs, buf); |
| 140 | } |
| 141 | sharer = buf.new ShareBuffer(queueSize); |
| 142 | } |
| 143 | /* We avoid holding any locks while calling createListeners. |
| 144 | * This prevents possible deadlocks involving user code, but |
| 145 | * does mean that a second ConnectorServer created and started |
| 146 | * in this window will return before all the listeners are ready, |
| 147 | * which could lead to surprising behaviour. The alternative |
| 148 | * would be to block the second ConnectorServer until the first |
| 149 | * one has finished adding all the listeners, but that would then |
| 150 | * be subject to deadlock. |
| 151 | */ |
| 152 | if (create) |
| 153 | buf.createListeners(); |
| 154 | return sharer; |
| 155 | } |
| 156 | |
| 157 | /* Ensure that this buffer is no longer the one that will be returned by |
| 158 | * getNotificationBuffer. This method is idempotent - calling it more |
| 159 | * than once has no effect beyond that of calling it once. |
| 160 | */ |
| 161 | static void removeNotificationBuffer(MBeanServer mbs) { |
| 162 | synchronized (globalLock) { |
| 163 | mbsToBuffer.remove(mbs); |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | void addSharer(ShareBuffer sharer) { |
| 168 | synchronized (globalLock) { |
| 169 | synchronized (this) { |
| 170 | if (sharer.getSize() > queueSize) |
| 171 | resize(sharer.getSize()); |
| 172 | } |
| 173 | sharers.add(sharer); |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | private void removeSharer(ShareBuffer sharer) { |
| 178 | boolean empty; |
| 179 | synchronized (globalLock) { |
| 180 | sharers.remove(sharer); |
| 181 | empty = sharers.isEmpty(); |
| 182 | if (empty) |
| 183 | removeNotificationBuffer(mBeanServer); |
| 184 | else { |
| 185 | int max = 0; |
| 186 | for (ShareBuffer buf : sharers) { |
| 187 | int bufsize = buf.getSize(); |
| 188 | if (bufsize > max) |
| 189 | max = bufsize; |
| 190 | } |
| 191 | if (max < queueSize) |
| 192 | resize(max); |
| 193 | } |
| 194 | } |
| 195 | if (empty) { |
| 196 | synchronized (this) { |
| 197 | disposed = true; |
| 198 | // Notify potential waiting fetchNotification call |
| 199 | notifyAll(); |
| 200 | } |
| 201 | destroyListeners(); |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | private synchronized void resize(int newSize) { |
| 206 | if (newSize == queueSize) |
| 207 | return; |
| 208 | while (queue.size() > newSize) |
| 209 | dropNotification(); |
| 210 | queue.resize(newSize); |
| 211 | queueSize = newSize; |
| 212 | } |
| 213 | |
| 214 | private class ShareBuffer implements NotificationBuffer { |
| 215 | ShareBuffer(int size) { |
| 216 | this.size = size; |
| 217 | addSharer(this); |
| 218 | } |
| 219 | |
| 220 | public NotificationResult |
| 221 | fetchNotifications(NotificationBufferFilter filter, |
| 222 | long startSequenceNumber, |
| 223 | long timeout, |
| 224 | int maxNotifications) |
| 225 | throws InterruptedException { |
| 226 | NotificationBuffer buf = ArrayNotificationBuffer.this; |
| 227 | return buf.fetchNotifications(filter, startSequenceNumber, |
| 228 | timeout, maxNotifications); |
| 229 | } |
| 230 | |
| 231 | public void dispose() { |
| 232 | ArrayNotificationBuffer.this.removeSharer(this); |
| 233 | } |
| 234 | |
| 235 | int getSize() { |
| 236 | return size; |
| 237 | } |
| 238 | |
| 239 | private final int size; |
| 240 | } |
| 241 | |
| 242 | |
| 243 | // ARRAYNOTIFICATIONBUFFER IMPLEMENTATION |
| 244 | |
| 245 | private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) { |
| 246 | if (logger.traceOn()) |
| 247 | logger.trace("Constructor", "queueSize=" + queueSize); |
| 248 | |
| 249 | if (mbs == null || queueSize < 1) |
| 250 | throw new IllegalArgumentException("Bad args"); |
| 251 | |
| 252 | this.mBeanServer = mbs; |
| 253 | this.queueSize = queueSize; |
| 254 | this.queue = new ArrayQueue<NamedNotification>(queueSize); |
| 255 | this.earliestSequenceNumber = System.currentTimeMillis(); |
| 256 | this.nextSequenceNumber = this.earliestSequenceNumber; |
| 257 | |
| 258 | logger.trace("Constructor", "ends"); |
| 259 | } |
| 260 | |
| 261 | private synchronized boolean isDisposed() { |
| 262 | return disposed; |
| 263 | } |
| 264 | |
| 265 | // We no longer support calling this method from outside. |
| 266 | // The JDK doesn't contain any such calls and users are not |
| 267 | // supposed to be accessing this class. |
| 268 | public void dispose() { |
| 269 | throw new UnsupportedOperationException(); |
| 270 | } |
| 271 | |
| 272 | /** |
| 273 | * <p>Fetch notifications that match the given listeners.</p> |
| 274 | * |
| 275 | * <p>The operation only considers notifications with a sequence |
| 276 | * number at least <code>startSequenceNumber</code>. It will take |
| 277 | * no longer than <code>timeout</code>, and will return no more |
| 278 | * than <code>maxNotifications</code> different notifications.</p> |
| 279 | * |
| 280 | * <p>If there are no notifications matching the criteria, the |
| 281 | * operation will block until one arrives, subject to the |
| 282 | * timeout.</p> |
| 283 | * |
| 284 | * @param filter an object that will add notifications to a |
| 285 | * {@code List<TargetedNotification>} if they match the current |
| 286 | * listeners with their filters. |
| 287 | * @param startSequenceNumber the first sequence number to |
| 288 | * consider. |
| 289 | * @param timeout the maximum time to wait. May be 0 to indicate |
| 290 | * not to wait if there are no notifications. |
| 291 | * @param maxNotifications the maximum number of notifications to |
| 292 | * return. May be 0 to indicate a wait for eligible notifications |
| 293 | * that will return a usable <code>nextSequenceNumber</code>. The |
| 294 | * {@link TargetedNotification} array in the returned {@link |
| 295 | * NotificationResult} may contain more than this number of |
| 296 | * elements but will not contain more than this number of |
| 297 | * different notifications. |
| 298 | */ |
| 299 | public NotificationResult |
| 300 | fetchNotifications(NotificationBufferFilter filter, |
| 301 | long startSequenceNumber, |
| 302 | long timeout, |
| 303 | int maxNotifications) |
| 304 | throws InterruptedException { |
| 305 | |
| 306 | logger.trace("fetchNotifications", "starts"); |
| 307 | |
| 308 | if (startSequenceNumber < 0 || isDisposed()) { |
| 309 | synchronized(this) { |
| 310 | return new NotificationResult(earliestSequenceNumber(), |
| 311 | nextSequenceNumber(), |
| 312 | new TargetedNotification[0]); |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | // Check arg validity |
| 317 | if (filter == null |
| 318 | || startSequenceNumber < 0 || timeout < 0 |
| 319 | || maxNotifications < 0) { |
| 320 | logger.trace("fetchNotifications", "Bad args"); |
| 321 | throw new IllegalArgumentException("Bad args to fetch"); |
| 322 | } |
| 323 | |
| 324 | if (logger.debugOn()) { |
| 325 | logger.trace("fetchNotifications", |
| 326 | "filter=" + filter + "; startSeq=" + |
| 327 | startSequenceNumber + "; timeout=" + timeout + |
| 328 | "; max=" + maxNotifications); |
| 329 | } |
| 330 | |
| 331 | if (startSequenceNumber > nextSequenceNumber()) { |
| 332 | final String msg = "Start sequence number too big: " + |
| 333 | startSequenceNumber + " > " + nextSequenceNumber(); |
| 334 | logger.trace("fetchNotifications", msg); |
| 335 | throw new IllegalArgumentException(msg); |
| 336 | } |
| 337 | |
| 338 | /* Determine the end time corresponding to the timeout value. |
| 339 | Caller may legitimately supply Long.MAX_VALUE to indicate no |
| 340 | timeout. In that case the addition will overflow and produce |
| 341 | a negative end time. Set end time to Long.MAX_VALUE in that |
| 342 | case. We assume System.currentTimeMillis() is positive. */ |
| 343 | long endTime = System.currentTimeMillis() + timeout; |
| 344 | if (endTime < 0) // overflow |
| 345 | endTime = Long.MAX_VALUE; |
| 346 | |
| 347 | if (logger.debugOn()) |
| 348 | logger.debug("fetchNotifications", "endTime=" + endTime); |
| 349 | |
| 350 | /* We set earliestSeq the first time through the loop. If we |
| 351 | set it here, notifications could be dropped before we |
| 352 | started examining them, so earliestSeq might not correspond |
| 353 | to the earliest notification we examined. */ |
| 354 | long earliestSeq = -1; |
| 355 | long nextSeq = startSequenceNumber; |
| 356 | List<TargetedNotification> notifs = |
| 357 | new ArrayList<TargetedNotification>(); |
| 358 | |
| 359 | /* On exit from this loop, notifs, earliestSeq, and nextSeq must |
| 360 | all be correct values for the returned NotificationResult. */ |
| 361 | while (true) { |
| 362 | logger.debug("fetchNotifications", "main loop starts"); |
| 363 | |
| 364 | NamedNotification candidate; |
| 365 | |
| 366 | /* Get the next available notification regardless of filters, |
| 367 | or wait for one to arrive if there is none. */ |
| 368 | synchronized (this) { |
| 369 | |
| 370 | /* First time through. The current earliestSequenceNumber |
| 371 | is the first one we could have examined. */ |
| 372 | if (earliestSeq < 0) { |
| 373 | earliestSeq = earliestSequenceNumber(); |
| 374 | if (logger.debugOn()) { |
| 375 | logger.debug("fetchNotifications", |
| 376 | "earliestSeq=" + earliestSeq); |
| 377 | } |
| 378 | if (nextSeq < earliestSeq) { |
| 379 | nextSeq = earliestSeq; |
| 380 | logger.debug("fetchNotifications", |
| 381 | "nextSeq=earliestSeq"); |
| 382 | } |
| 383 | } else |
| 384 | earliestSeq = earliestSequenceNumber(); |
| 385 | |
| 386 | /* If many notifications have been dropped since the |
| 387 | last time through, nextSeq could now be earlier |
| 388 | than the current earliest. If so, notifications |
| 389 | may have been lost and we return now so the caller |
| 390 | can see this next time it calls. */ |
| 391 | if (nextSeq < earliestSeq) { |
| 392 | logger.trace("fetchNotifications", |
| 393 | "nextSeq=" + nextSeq + " < " + "earliestSeq=" + |
| 394 | earliestSeq + " so may have lost notifs"); |
| 395 | break; |
| 396 | } |
| 397 | |
| 398 | if (nextSeq < nextSequenceNumber()) { |
| 399 | candidate = notificationAt(nextSeq); |
| 400 | if (logger.debugOn()) { |
| 401 | logger.debug("fetchNotifications", "candidate: " + |
| 402 | candidate); |
| 403 | logger.debug("fetchNotifications", "nextSeq now " + |
| 404 | nextSeq); |
| 405 | } |
| 406 | } else { |
| 407 | /* nextSeq is the largest sequence number. If we |
| 408 | already got notifications, return them now. |
| 409 | Otherwise wait for some to arrive, with |
| 410 | timeout. */ |
| 411 | if (notifs.size() > 0) { |
| 412 | logger.debug("fetchNotifications", |
| 413 | "no more notifs but have some so don't wait"); |
| 414 | break; |
| 415 | } |
| 416 | long toWait = endTime - System.currentTimeMillis(); |
| 417 | if (toWait <= 0) { |
| 418 | logger.debug("fetchNotifications", "timeout"); |
| 419 | break; |
| 420 | } |
| 421 | |
| 422 | /* dispose called */ |
| 423 | if (isDisposed()) { |
| 424 | if (logger.debugOn()) |
| 425 | logger.debug("fetchNotifications", |
| 426 | "dispose callled, no wait"); |
| 427 | return new NotificationResult(earliestSequenceNumber(), |
| 428 | nextSequenceNumber(), |
| 429 | new TargetedNotification[0]); |
| 430 | } |
| 431 | |
| 432 | if (logger.debugOn()) |
| 433 | logger.debug("fetchNotifications", |
| 434 | "wait(" + toWait + ")"); |
| 435 | wait(toWait); |
| 436 | |
| 437 | continue; |
| 438 | } |
| 439 | } |
| 440 | |
| 441 | /* We have a candidate notification. See if it matches |
| 442 | our filters. We do this outside the synchronized block |
| 443 | so we don't hold up everyone accessing the buffer |
| 444 | (including notification senders) while we evaluate |
| 445 | potentially slow filters. */ |
| 446 | ObjectName name = candidate.getObjectName(); |
| 447 | Notification notif = candidate.getNotification(); |
| 448 | List<TargetedNotification> matchedNotifs = |
| 449 | new ArrayList<TargetedNotification>(); |
| 450 | logger.debug("fetchNotifications", |
| 451 | "applying filter to candidate"); |
| 452 | filter.apply(matchedNotifs, name, notif); |
| 453 | |
| 454 | if (matchedNotifs.size() > 0) { |
| 455 | /* We only check the max size now, so that our |
| 456 | returned nextSeq is as large as possible. This |
| 457 | prevents the caller from thinking it missed |
| 458 | interesting notifications when in fact we knew they |
| 459 | weren't. */ |
| 460 | if (maxNotifications <= 0) { |
| 461 | logger.debug("fetchNotifications", |
| 462 | "reached maxNotifications"); |
| 463 | break; |
| 464 | } |
| 465 | --maxNotifications; |
| 466 | if (logger.debugOn()) |
| 467 | logger.debug("fetchNotifications", "add: " + |
| 468 | matchedNotifs); |
| 469 | notifs.addAll(matchedNotifs); |
| 470 | } |
| 471 | |
| 472 | ++nextSeq; |
| 473 | } // end while |
| 474 | |
| 475 | /* Construct and return the result. */ |
| 476 | int nnotifs = notifs.size(); |
| 477 | TargetedNotification[] resultNotifs = |
| 478 | new TargetedNotification[nnotifs]; |
| 479 | notifs.toArray(resultNotifs); |
| 480 | NotificationResult nr = |
| 481 | new NotificationResult(earliestSeq, nextSeq, resultNotifs); |
| 482 | if (logger.debugOn()) |
| 483 | logger.debug("fetchNotifications", nr.toString()); |
| 484 | logger.trace("fetchNotifications", "ends"); |
| 485 | |
| 486 | return nr; |
| 487 | } |
| 488 | |
| 489 | synchronized long earliestSequenceNumber() { |
| 490 | return earliestSequenceNumber; |
| 491 | } |
| 492 | |
| 493 | synchronized long nextSequenceNumber() { |
| 494 | return nextSequenceNumber; |
| 495 | } |
| 496 | |
| 497 | synchronized void addNotification(NamedNotification notif) { |
| 498 | if (logger.traceOn()) |
| 499 | logger.trace("addNotification", notif.toString()); |
| 500 | |
| 501 | while (queue.size() >= queueSize) { |
| 502 | dropNotification(); |
| 503 | if (logger.debugOn()) { |
| 504 | logger.debug("addNotification", |
| 505 | "dropped oldest notif, earliestSeq=" + |
| 506 | earliestSequenceNumber); |
| 507 | } |
| 508 | } |
| 509 | queue.add(notif); |
| 510 | nextSequenceNumber++; |
| 511 | if (logger.debugOn()) |
| 512 | logger.debug("addNotification", "nextSeq=" + nextSequenceNumber); |
| 513 | notifyAll(); |
| 514 | } |
| 515 | |
| 516 | private void dropNotification() { |
| 517 | queue.remove(0); |
| 518 | earliestSequenceNumber++; |
| 519 | } |
| 520 | |
| 521 | synchronized NamedNotification notificationAt(long seqNo) { |
| 522 | long index = seqNo - earliestSequenceNumber; |
| 523 | if (index < 0 || index > Integer.MAX_VALUE) { |
| 524 | final String msg = "Bad sequence number: " + seqNo + " (earliest " |
| 525 | + earliestSequenceNumber + ")"; |
| 526 | logger.trace("notificationAt", msg); |
| 527 | throw new IllegalArgumentException(msg); |
| 528 | } |
| 529 | return queue.get((int) index); |
| 530 | } |
| 531 | |
| 532 | private static class NamedNotification { |
| 533 | NamedNotification(ObjectName sender, Notification notif) { |
| 534 | this.sender = sender; |
| 535 | this.notification = notif; |
| 536 | } |
| 537 | |
| 538 | ObjectName getObjectName() { |
| 539 | return sender; |
| 540 | } |
| 541 | |
| 542 | Notification getNotification() { |
| 543 | return notification; |
| 544 | } |
| 545 | |
| 546 | public String toString() { |
| 547 | return "NamedNotification(" + sender + ", " + notification + ")"; |
| 548 | } |
| 549 | |
| 550 | private final ObjectName sender; |
| 551 | private final Notification notification; |
| 552 | } |
| 553 | |
| 554 | /* |
| 555 | * Add our listener to every NotificationBroadcaster MBean |
| 556 | * currently in the MBean server and to every |
| 557 | * NotificationBroadcaster later created. |
| 558 | * |
| 559 | * It would be really nice if we could just do |
| 560 | * mbs.addNotificationListener(new ObjectName("*:*"), ...); |
| 561 | * Definitely something for the next version of JMX. |
| 562 | * |
| 563 | * There is a nasty race condition that we must handle. We |
| 564 | * first register for MBean-creation notifications so we can add |
| 565 | * listeners to new MBeans, then we query the existing MBeans to |
| 566 | * add listeners to them. The problem is that a new MBean could |
| 567 | * arrive after we register for creations but before the query has |
| 568 | * completed. Then we could see the MBean both in the query and |
| 569 | * in an MBean-creation notification, and we would end up |
| 570 | * registering our listener twice. |
| 571 | * |
| 572 | * To solve this problem, we arrange for new MBeans that arrive |
| 573 | * while the query is being done to be added to the Set createdDuringQuery |
| 574 | * and we do not add a listener immediately. When the query is done, |
| 575 | * we atomically turn off the addition of new names to createdDuringQuery |
| 576 | * and add all the names that were there to the result of the query. |
| 577 | * Since we are dealing with Sets, the result is the same whether or not |
| 578 | * the newly-created MBean was included in the query result. |
| 579 | * |
| 580 | * It is important not to hold any locks during the operation of adding |
| 581 | * listeners to MBeans. An MBean's addNotificationListener can be |
| 582 | * arbitrary user code, and this could deadlock with any locks we hold |
| 583 | * (see bug 6239400). The corollary is that we must not do any operations |
| 584 | * in this method or the methods it calls that require locks. |
| 585 | */ |
| 586 | private void createListeners() { |
| 587 | logger.debug("createListeners", "starts"); |
| 588 | |
| 589 | synchronized (this) { |
| 590 | createdDuringQuery = new HashSet<ObjectName>(); |
| 591 | } |
| 592 | |
| 593 | try { |
| 594 | addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, |
| 595 | creationListener, creationFilter, null); |
| 596 | logger.debug("createListeners", "added creationListener"); |
| 597 | } catch (Exception e) { |
| 598 | final String msg = "Can't add listener to MBean server delegate: "; |
| 599 | RuntimeException re = new IllegalArgumentException(msg + e); |
| 600 | EnvHelp.initCause(re, e); |
| 601 | logger.fine("createListeners", msg + e); |
| 602 | logger.debug("createListeners", e); |
| 603 | throw re; |
| 604 | } |
| 605 | |
| 606 | /* Spec doesn't say whether Set returned by QueryNames can be modified |
| 607 | so we clone it. */ |
| 608 | Set<ObjectName> names = queryNames(null, broadcasterQuery); |
| 609 | names = new HashSet<ObjectName>(names); |
| 610 | |
| 611 | synchronized (this) { |
| 612 | names.addAll(createdDuringQuery); |
| 613 | createdDuringQuery = null; |
| 614 | } |
| 615 | |
| 616 | for (ObjectName name : names) |
| 617 | addBufferListener(name); |
| 618 | logger.debug("createListeners", "ends"); |
| 619 | } |
| 620 | |
| 621 | private void addBufferListener(ObjectName name) { |
| 622 | checkNoLocks(); |
| 623 | if (logger.debugOn()) |
| 624 | logger.debug("addBufferListener", name.toString()); |
| 625 | try { |
| 626 | addNotificationListener(name, bufferListener, null, name); |
| 627 | } catch (Exception e) { |
| 628 | logger.trace("addBufferListener", e); |
| 629 | /* This can happen if the MBean was unregistered just |
| 630 | after the query. Or user NotificationBroadcaster might |
| 631 | throw unexpected exception. */ |
| 632 | } |
| 633 | } |
| 634 | |
| 635 | private void removeBufferListener(ObjectName name) { |
| 636 | checkNoLocks(); |
| 637 | if (logger.debugOn()) |
| 638 | logger.debug("removeBufferListener", name.toString()); |
| 639 | try { |
| 640 | removeNotificationListener(name, bufferListener); |
| 641 | } catch (Exception e) { |
| 642 | logger.trace("removeBufferListener", e); |
| 643 | } |
| 644 | } |
| 645 | |
| 646 | private void addNotificationListener(final ObjectName name, |
| 647 | final NotificationListener listener, |
| 648 | final NotificationFilter filter, |
| 649 | final Object handback) |
| 650 | throws Exception { |
| 651 | try { |
| 652 | AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { |
| 653 | public Void run() throws InstanceNotFoundException { |
| 654 | mBeanServer.addNotificationListener(name, |
| 655 | listener, |
| 656 | filter, |
| 657 | handback); |
| 658 | return null; |
| 659 | } |
| 660 | }); |
| 661 | } catch (Exception e) { |
| 662 | throw extractException(e); |
| 663 | } |
| 664 | } |
| 665 | |
| 666 | private void removeNotificationListener(final ObjectName name, |
| 667 | final NotificationListener listener) |
| 668 | throws Exception { |
| 669 | try { |
| 670 | AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { |
| 671 | public Void run() throws Exception { |
| 672 | mBeanServer.removeNotificationListener(name, listener); |
| 673 | return null; |
| 674 | } |
| 675 | }); |
| 676 | } catch (Exception e) { |
| 677 | throw extractException(e); |
| 678 | } |
| 679 | } |
| 680 | |
| 681 | private Set<ObjectName> queryNames(final ObjectName name, |
| 682 | final QueryExp query) { |
| 683 | PrivilegedAction<Set<ObjectName>> act = |
| 684 | new PrivilegedAction<Set<ObjectName>>() { |
| 685 | public Set<ObjectName> run() { |
| 686 | return mBeanServer.queryNames(name, query); |
| 687 | } |
| 688 | }; |
| 689 | try { |
| 690 | return AccessController.doPrivileged(act); |
| 691 | } catch (RuntimeException e) { |
| 692 | logger.fine("queryNames", "Failed to query names: " + e); |
| 693 | logger.debug("queryNames", e); |
| 694 | throw e; |
| 695 | } |
| 696 | } |
| 697 | |
| 698 | private static boolean isInstanceOf(final MBeanServer mbs, |
| 699 | final ObjectName name, |
| 700 | final String className) { |
| 701 | PrivilegedExceptionAction<Boolean> act = |
| 702 | new PrivilegedExceptionAction<Boolean>() { |
| 703 | public Boolean run() throws InstanceNotFoundException { |
| 704 | return mbs.isInstanceOf(name, className); |
| 705 | } |
| 706 | }; |
| 707 | try { |
| 708 | return AccessController.doPrivileged(act); |
| 709 | } catch (Exception e) { |
| 710 | logger.fine("isInstanceOf", "failed: " + e); |
| 711 | logger.debug("isInstanceOf", e); |
| 712 | return false; |
| 713 | } |
| 714 | } |
| 715 | |
| 716 | /* This method must not be synchronized. See the comment on the |
| 717 | * createListeners method. |
| 718 | * |
| 719 | * The notification could arrive after our buffer has been destroyed |
| 720 | * or even during its destruction. So we always add our listener |
| 721 | * (without synchronization), then we check if the buffer has been |
| 722 | * destroyed and if so remove the listener we just added. |
| 723 | */ |
| 724 | private void createdNotification(MBeanServerNotification n) { |
| 725 | final String shouldEqual = |
| 726 | MBeanServerNotification.REGISTRATION_NOTIFICATION; |
| 727 | if (!n.getType().equals(shouldEqual)) { |
| 728 | logger.warning("createNotification", "bad type: " + n.getType()); |
| 729 | return; |
| 730 | } |
| 731 | |
| 732 | ObjectName name = n.getMBeanName(); |
| 733 | if (logger.debugOn()) |
| 734 | logger.debug("createdNotification", "for: " + name); |
| 735 | |
| 736 | synchronized (this) { |
| 737 | if (createdDuringQuery != null) { |
| 738 | createdDuringQuery.add(name); |
| 739 | return; |
| 740 | } |
| 741 | } |
| 742 | |
| 743 | if (isInstanceOf(mBeanServer, name, broadcasterClass)) { |
| 744 | addBufferListener(name); |
| 745 | if (isDisposed()) |
| 746 | removeBufferListener(name); |
| 747 | } |
| 748 | } |
| 749 | |
| 750 | private class BufferListener implements NotificationListener { |
| 751 | public void handleNotification(Notification notif, Object handback) { |
| 752 | if (logger.debugOn()) { |
| 753 | logger.debug("BufferListener.handleNotification", |
| 754 | "notif=" + notif + "; handback=" + handback); |
| 755 | } |
| 756 | ObjectName name = (ObjectName) handback; |
| 757 | addNotification(new NamedNotification(name, notif)); |
| 758 | } |
| 759 | } |
| 760 | |
| 761 | private final NotificationListener bufferListener = new BufferListener(); |
| 762 | |
| 763 | private static class BroadcasterQuery |
| 764 | extends QueryEval implements QueryExp { |
| 765 | private static final long serialVersionUID = 7378487660587592048L; |
| 766 | |
| 767 | public boolean apply(final ObjectName name) { |
| 768 | final MBeanServer mbs = QueryEval.getMBeanServer(); |
| 769 | return isInstanceOf(mbs, name, broadcasterClass); |
| 770 | } |
| 771 | } |
| 772 | private static final QueryExp broadcasterQuery = new BroadcasterQuery(); |
| 773 | |
| 774 | private static final NotificationFilter creationFilter; |
| 775 | static { |
| 776 | NotificationFilterSupport nfs = new NotificationFilterSupport(); |
| 777 | nfs.enableType(MBeanServerNotification.REGISTRATION_NOTIFICATION); |
| 778 | creationFilter = nfs; |
| 779 | } |
| 780 | |
| 781 | private final NotificationListener creationListener = |
| 782 | new NotificationListener() { |
| 783 | public void handleNotification(Notification notif, |
| 784 | Object handback) { |
| 785 | logger.debug("creationListener", "handleNotification called"); |
| 786 | createdNotification((MBeanServerNotification) notif); |
| 787 | } |
| 788 | }; |
| 789 | |
| 790 | private void destroyListeners() { |
| 791 | checkNoLocks(); |
| 792 | logger.debug("destroyListeners", "starts"); |
| 793 | try { |
| 794 | removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, |
| 795 | creationListener); |
| 796 | } catch (Exception e) { |
| 797 | logger.warning("remove listener from MBeanServer delegate", e); |
| 798 | } |
| 799 | Set<ObjectName> names = queryNames(null, broadcasterQuery); |
| 800 | for (final ObjectName name : names) { |
| 801 | if (logger.debugOn()) |
| 802 | logger.debug("destroyListeners", |
| 803 | "remove listener from " + name); |
| 804 | removeBufferListener(name); |
| 805 | } |
| 806 | logger.debug("destroyListeners", "ends"); |
| 807 | } |
| 808 | |
| 809 | private void checkNoLocks() { |
| 810 | if (Thread.holdsLock(this) || Thread.holdsLock(globalLock)) |
| 811 | logger.warning("checkNoLocks", "lock protocol violation"); |
| 812 | } |
| 813 | |
| 814 | /** |
| 815 | * Iterate until we extract the real exception |
| 816 | * from a stack of PrivilegedActionExceptions. |
| 817 | */ |
| 818 | private static Exception extractException(Exception e) { |
| 819 | while (e instanceof PrivilegedActionException) { |
| 820 | e = ((PrivilegedActionException)e).getException(); |
| 821 | } |
| 822 | return e; |
| 823 | } |
| 824 | |
| 825 | private static final ClassLogger logger = |
| 826 | new ClassLogger("javax.management.remote.misc", |
| 827 | "ArrayNotificationBuffer"); |
| 828 | |
| 829 | private final MBeanServer mBeanServer; |
| 830 | private final ArrayQueue<NamedNotification> queue; |
| 831 | private int queueSize; |
| 832 | private long earliestSequenceNumber; |
| 833 | private long nextSequenceNumber; |
| 834 | private Set<ObjectName> createdDuringQuery; |
| 835 | |
| 836 | static final String broadcasterClass = |
| 837 | NotificationBroadcaster.class.getName(); |
| 838 | } |