blob: 91316fe5e21d9b28623f1025f7446456d32a1934 [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Sun designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Sun in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36package java.util.concurrent;
37import java.util.concurrent.locks.*;
38
/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 * using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped, or {@code null} if none was supplied */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     *
     * <p>The whole method runs under {@code lock}; the lock is only
     * given up while the thread is parked on the {@code trip} condition.
     *
     * @param timed {@code true} if the wait is bounded by {@code nanos}
     * @param nanos the remaining time to wait, in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @return the arrival index of the current thread: the value of
     *         {@code count} after this arrival was subtracted, so zero
     *         for the thread that trips the barrier
     * @throws InterruptedException if the current thread had its
     *         interrupted status set on entry, or was interrupted while
     *         waiting on a generation that was still active and unbroken
     * @throws BrokenBarrierException if the generation was already broken
     *         on entry or is found broken after waking up
     * @throws TimeoutException if {@code timed} is {@code true} and the
     *         time budget is exhausted before the barrier trips
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Remember the generation this thread arrived in; every later
            // check is made against this snapshot, not the current field.
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            // Thread.interrupted() also clears the interrupted status.
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    // ranAction is still false only when command.run()
                    // threw; break the barrier while that exception
                    // propagates to the caller.
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                    // A timed call with no budget left does not wait at
                    // all and falls through to the checks below.
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // The generation field was replaced, so the generation
                // this thread arrived in is over.
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            // dowait only throws TimeoutException when timed is true
            throw new Error(toe); // cannot happen;
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}