/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
| 24 | |
/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
| 36 | |
| 37 | package java.util.concurrent; |
| 38 | import java.util.concurrent.atomic.*; |
| 39 | import java.util.concurrent.locks.LockSupport; |
| 40 | |
/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>{@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * }</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * Algorithm Description:
     *
     * The basic idea is to maintain a "slot", which is a reference to
     * a Node containing both an Item to offer and a "hole" waiting to
     * get filled in.  If an incoming "occupying" thread sees that the
     * slot is null, it CAS'es (compareAndSets) a Node there and waits
     * for another to invoke exchange.  That second "fulfilling" thread
     * sees that the slot is non-null, and so CASes it back to null,
     * also exchanging items by CASing the hole, plus waking up the
     * occupying thread if it is blocked.  In each case CAS'es may
     * fail because a slot at first appears non-null but is null upon
     * CAS, or vice-versa.  So threads may need to retry these
     * actions.
     *
     * This simple approach works great when there are only a few
     * threads using an Exchanger, but performance rapidly
     * deteriorates due to CAS contention on the single slot when
     * there are lots of threads using an exchanger.  So instead we use
     * an "arena"; basically a kind of hash table with a dynamically
     * varying number of slots, any one of which can be used by
     * threads performing an exchange.  Incoming threads pick slots
     * based on a hash of their Thread ids.  If an incoming thread
     * fails to CAS in its chosen slot, it picks an alternative slot
     * instead.  And similarly from there.  If a thread successfully
     * CASes into a slot but no other thread arrives, it tries
     * another, heading toward the zero slot, which always exists even
     * if the table shrinks.  The particular mechanics controlling this
     * are as follows:
     *
     * Waiting: Slot zero is special in that it is the only slot that
     * exists when there is no contention.  A thread occupying slot
     * zero will block if no thread fulfills it after a short spin.
     * In other cases, occupying threads eventually give up and try
     * another slot.  Waiting threads spin for a while (a period that
     * should be a little less than a typical context-switch time)
     * before either blocking (if slot zero) or giving up (if other
     * slots) and restarting.  There is no reason for threads to block
     * unless there are unlikely to be any other threads present.
     * Occupants are mainly avoiding memory contention so sit there
     * quietly polling for a shorter period than it would take to
     * block and then unblock them.  Non-slot-zero waits that elapse
     * because of lack of other threads waste around one extra
     * context-switch time per try, which is still on average much
     * faster than alternative approaches.
     *
     * Sizing: Usually, using only a few slots suffices to reduce
     * contention.  Especially with small numbers of threads, using
     * too many slots can lead to just as poor performance as using
     * too few of them, and there's not much room for error.  The
     * variable "max" maintains the number of slots actually in
     * use.  It is increased when a thread sees too many CAS
     * failures.  (This is analogous to resizing a regular hash table
     * based on a target load factor, except here, growth steps are
     * just one-by-one rather than proportional.)  Growth requires
     * contention failures in each of three tried slots.  Requiring
     * multiple failures for expansion copes with the fact that some
     * failed CASes are not due to contention but instead to simple
     * races between two threads or thread pre-emptions occurring
     * between reading and CASing.  Also, very transient peak
     * contention can be much higher than the average sustainable
     * levels.  The max limit is decreased on average 50% of the times
     * that a non-slot-zero wait elapses without being fulfilled.
     * Threads experiencing elapsed waits move closer to zero, so
     * eventually find existing (or future) threads even if the table
     * has been shrunk due to inactivity.  The chosen mechanics and
     * thresholds for growing and shrinking are intrinsically
     * entangled with indexing and hashing inside the exchange code,
     * and can't be nicely abstracted out.
     *
     * Hashing: Each thread picks its initial slot to use in accord
     * with a simple hashcode.  The sequence is the same on each
     * encounter by any given thread, but effectively random across
     * threads.  Using arenas encounters the classic cost vs quality
     * tradeoffs of all hash tables.  Here, we use a one-step FNV-1a
     * hash code based on the current thread's Thread.getId(), along
     * with a cheap approximation to a mod operation to select an
     * index.  The downside of optimizing index selection in this way
     * is that the code is hardwired to use a maximum table size of
     * 32.  But this value more than suffices for known platforms and
     * applications.
     *
     * Probing: On sensed contention of a selected slot, we probe
     * sequentially through the table, analogously to linear probing
     * after collision in a hash table.  (We move circularly, in
     * reverse order, to mesh best with table growth and shrinkage
     * rules.)  Except that to minimize the effects of false-alarms
     * and cache thrashing, we try the first selected slot twice
     * before moving.
     *
     * Padding: Even with contention management, slots are heavily
     * contended, so use cache-padding to avoid poor memory
     * performance.  Because of this, slots are lazily constructed
     * only when used, to avoid wasting this space unnecessarily.
     * While isolation of locations is not much of an issue at first
     * in an application, as time goes on and garbage-collectors
     * perform compaction, slots are very likely to be moved adjacent
     * to each other, which can cause much thrashing of cache lines on
     * MPs unless padding is employed.
     *
     * This is an improvement of the algorithm described in the paper
     * "A Scalable Elimination-based Exchange Channel" by William
     * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
     * workshop.  Available at: http://hdl.handle.net/1802/2104
     */

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The capacity of the arena.  Set to a value that provides more
     * than enough space to handle contention.  On small machines
     * most slots won't be used, but it is still not wasted because
     * the extra space provides some machine-level address padding
     * to minimize interference with heavily CAS'ed Slot locations.
     * And on very large machines, performance eventually becomes
     * bounded by memory bandwidth, not numbers of threads/CPUs.
     * This constant cannot be changed without also modifying
     * indexing and hashing algorithms.
     */
    private static final int CAPACITY = 32;

    /**
     * The value of "max" that will hold all threads without
     * contention.  When this value is less than CAPACITY, some
     * otherwise wasted expansion can be avoided.
     */
    private static final int FULL =
        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);

    /**
     * The number of times to spin (doing nothing except polling a
     * memory location) before blocking or giving up while waiting to
     * be fulfilled.  Should be zero on uniprocessors.  On
     * multiprocessors, this value should be large enough so that two
     * threads exchanging items as fast as possible block only when
     * one of them is stalled (due to GC or preemption), but not much
     * longer, to avoid wasting CPU resources.  Seen differently, this
     * value is a little over half the number of cycles of an average
     * context switch time on most systems.  The value here is
     * approximately the average of those across a range of tested
     * systems.
     */
    private static final int SPINS = (NCPU == 1) ? 0 : 2000;

    /**
     * The number of times to spin before blocking in timed waits.
     * Timed waits spin more slowly because checking the time takes
     * time.  The best value relies mainly on the relative rate of
     * System.nanoTime vs memory accesses.  The value is empirically
     * derived to work well across a variety of systems.
     */
    private static final int TIMED_SPINS = SPINS / 20;

    /**
     * Sentinel item representing cancellation of a wait due to
     * interruption, timeout, or elapsed spin-waits.  This value is
     * placed in holes on cancellation, and used as a return value
     * from waiting methods to indicate failure to set or get hole.
     */
    private static final Object CANCEL = new Object();

    /**
     * Value representing null arguments/returns from public
     * methods.  This disambiguates from internal requirement that
     * holes start out as null to mean they are not yet set.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Nodes hold partially exchanged data.  This class
     * opportunistically subclasses AtomicReference to represent the
     * hole.  So get() returns hole, and compareAndSet CAS'es value
     * into hole.  This class cannot be parameterized as "V" because
     * of the use of non-V CANCEL sentinels.
     */
    private static final class Node extends AtomicReference<Object> {
        /** The element offered by the Thread creating this node. */
        public final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        public volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

    /**
     * A Slot is an AtomicReference with heuristic padding to lessen
     * cache effects of this heavily CAS'ed location.  While the
     * padding adds noticeable space, all slots are created only on
     * demand, and there will be more than one of them only when it
     * would improve throughput more than enough to outweigh using
     * extra space.
     */
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot array.  Elements are lazily initialized when needed.
     * Declared volatile to enable double-checked lazy construction.
     */
    private volatile Slot[] arena = new Slot[CAPACITY];

    /**
     * The maximum slot index being used.  The value sometimes
     * increases when a thread experiences too many CAS contentions,
     * and sometimes decreases when a spin-wait elapses.  Changes
     * are performed only via compareAndSet, to avoid stale values
     * when a thread happens to stall right before setting.
     */
    private final AtomicInteger max = new AtomicInteger();

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of sentinel values.  Callers from public methods decode
     * and cast accordingly.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param nanos if timed, the maximum wait time
     * @return the other thread's item (possibly NULL_ITEM), or CANCEL
     *         if interrupted or timed out
     */
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // Create in case occupying
        int index = hashIndex();                  // Index of current slot
        int fails = 0;                            // Number of CAS failures

        for (;;) {
            Object y;                             // Contents of current slot
            Slot slot = arena[index];
            if (slot == null)                     // Lazily initialize slots
                createSlot(index);                // Continue loop to reread
            else if ((y = slot.get()) != null &&  // Try to fulfill
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;               // Transfer item
                if (you.compareAndSet(null, item)) {
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }                                 // Else cancelled; continue
            }
            else if (y == null &&                 // Try to occupy
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   // Blocking wait for slot 0
                    return timed ? awaitNanos(me, slot, nanos) : await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node
                int m = max.get();
                if (m > (index >>>= 1))           // Decrease index
                    max.compareAndSet(m, m - 1);  // Maybe shrink table
            }
            else if (++fails > 1) {               // Allow 2 fails on 1st slot
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;                // Grow on 3rd failed slot
                else if (--index < 0)
                    index = m;                    // Circularly traverse
            }
        }
    }

    /**
     * Returns a hash index for the current thread.  Uses a one-step
     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
     * based on the current thread's Thread.getId().  These hash codes
     * have more uniform distribution properties with respect to small
     * moduli (here 1-31) than do other simple hashing functions.
     *
     * <p>To return an index between 0 and max, we use a cheap
     * approximation to a mod operation, that also corrects for bias
     * due to non-power-of-2 remaindering (see {@link
     * java.util.Random#nextInt}).  Bits of the hashcode are masked
     * with "nbits", the ceiling power of two of table size (looked up
     * in a table packed into three ints).  If too large, this is
     * retried after rotating the hash by nbits bits, while forcing new
     * top bit to 0, which guarantees eventual termination (although
     * with a non-random-bias).  This requires an average of less than
     * 2 tries for all table sizes, and has a maximum 2% difference
     * from perfectly uniform slot probabilities when applied to all
     * possible hash codes for sizes less than 32.
     *
     * @return a per-thread-random index, {@code 0 <= index <= max}
     *         (the retry loop only rejects candidates greater than max)
     */
    private int hashIndex() {
        long id = Thread.currentThread().getId();
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;

        int m = max.get();
        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                     ((0x000001f8 >>> m) & 2) | // The constants hold
                     ((0xffff00f2 >>> m) & 1)); // a lookup table
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
        return index;
    }

    /**
     * Creates a new slot at given index.  Called only when the slot
     * appears to be null.  Relies on double-check using builtin
     * locks, since they rarely contend.  This in turn relies on the
     * arena array being declared volatile.
     *
     * @param index the index to add slot at
     */
    private void createSlot(int index) {
        // Create slot outside of lock to narrow sync region
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }

    /**
     * Tries to cancel a wait for the given node waiting in the given
     * slot, if so, helping clear the node from its slot to avoid
     * garbage retention.
     *
     * @param node the waiting node
     * @param slot the slot it is waiting in
     * @return true if successfully cancelled
     */
    private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null);
        return true;
    }

    // Three forms of waiting. Each just different enough not to merge
    // code with others.

    /**
     * Spin-waits for hole for a non-0 slot.  Fails if spin elapses
     * before hole filled.  Does not check interrupt, relying on check
     * in public exchange method to abort if interrupted on entry.
     *
     * @param node the waiting node
     * @param slot the slot the node is waiting in
     * @return on success, the hole; on failure, CANCEL
     */
    private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins;
            else
                tryCancel(node, slot);   // Result is picked up by next get()
        }
    }

    /**
     * Waits for (by spinning and/or blocking) and gets the hole
     * filled in by another thread.  Fails if interrupted before
     * hole filled.
     *
     * When a node/thread is about to block, it sets its waiter field
     * and then rechecks state at least one more time before actually
     * parking, thus covering race vs fulfiller noticing that waiter
     * is non-null so should be woken.
     *
     * Thread interruption status is checked only surrounding calls to
     * park.  The caller is assumed to have checked interrupt status
     * on entry.
     *
     * @param node the waiting node
     * @param slot the slot the node is waiting in
     * @return on success, the hole; on failure, CANCEL
     */
    private static Object await(Node node, Slot slot) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)                 // Spin-wait phase
                --spins;
            else if (node.waiter == null)       // Set up to block next
                node.waiter = w;
            else if (w.isInterrupted())         // Abort on interrupt
                tryCancel(node, slot);
            else                                // Block
                LockSupport.park(node);
        }
    }

    /**
     * Waits for (at index 0) and gets the hole filled in by another
     * thread.  Fails if timed out or interrupted before hole filled.
     * Same basic logic as untimed version, but a bit messier.
     *
     * @param node the waiting node
     * @param slot the slot the node is waiting in
     * @param nanos the wait time
     * @return on success, the hole; on failure, CANCEL
     */
    private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            long now = System.nanoTime();
            if (w == null)                      // First pass: no time elapsed yet
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins;
                else if (node.waiter == null)
                    node.waiter = w;
                else if (w.isInterrupted())
                    tryCancel(node, slot);
                else
                    LockSupport.parkNanos(node, nanos);
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                return scanOnTimeout(node);
        }
    }

    /**
     * Sweeps through arena checking for any waiting threads.  Called
     * only upon return from timeout while waiting in slot 0.  When a
     * thread gives up on a timed wait, it is possible that a
     * previously-entered thread is still waiting in some other
     * slot.  So we scan to check for any.  This is almost always
     * overkill, but decreases the likelihood of timeouts when there
     * are other threads present to far less than that in lock-based
     * exchangers in which earlier-arriving threads may still be
     * waiting on entry locks.
     *
     * @param node the waiting node
     * @return another thread's item, or CANCEL
     */
    private Object scanOnTimeout(Node node) {
        Object y;
        for (int j = arena.length - 1; j >= 0; --j) {
            Slot slot = arena[j];
            if (slot != null) {
                while ((y = slot.get()) != null) {
                    if (slot.compareAndSet(y, null)) {
                        Node you = (Node)y;
                        if (you.compareAndSet(null, node.item)) {
                            LockSupport.unpark(you.waiter);
                            return you.item;
                        }
                    }
                }
            }
        }
        return CANCEL;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            Object o = doExchange((x == null) ? NULL_ITEM : x, false, 0);
            if (o == NULL_ITEM)
                return null;
            if (o != CANCEL) {
                // Safe: every non-sentinel item was offered through exchange(V)
                @SuppressWarnings("unchecked") V v = (V)o;
                return v;
            }
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (!Thread.interrupted()) {
            Object o = doExchange((x == null) ? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            if (o == NULL_ITEM)
                return null;
            if (o != CANCEL) {
                // Safe: every non-sentinel item was offered through exchange(V)
                @SuppressWarnings("unchecked") V v = (V)o;
                return v;
            }
            // CANCEL without a pending interrupt means the wait timed out
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }
}