blob: ffd8b6e7001c72f63aa519072df18e08f4b23c46 [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Sun designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Sun in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36package java.util.concurrent;
37
38import java.util.concurrent.locks.*;
39import java.util.*;
40
41/**
42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
43 * the same ordering rules as class {@link PriorityQueue} and supplies
44 * blocking retrieval operations. While this queue is logically
45 * unbounded, attempted additions may fail due to resource exhaustion
46 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
47 * <tt>null</tt> elements. A priority queue relying on {@linkplain
48 * Comparable natural ordering} also does not permit insertion of
49 * non-comparable objects (doing so results in
50 * <tt>ClassCastException</tt>).
51 *
52 * <p>This class and its iterator implement all of the
53 * <em>optional</em> methods of the {@link Collection} and {@link
54 * Iterator} interfaces. The Iterator provided in method {@link
55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
56 * the PriorityBlockingQueue in any particular order. If you need
57 * ordered traversal, consider using
58 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
59 * can be used to <em>remove</em> some or all elements in priority
60 * order and place them in another collection.
61 *
62 * <p>Operations on this class make no guarantees about the ordering
63 * of elements with equal priority. If you need to enforce an
64 * ordering, you can define custom classes or comparators that use a
65 * secondary key to break ties in primary priority values. For
66 * example, here is a class that applies first-in-first-out
67 * tie-breaking to comparable elements. To use it, you would insert a
68 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
69 *
70 * <pre>
71 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
72 * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
73 * final static AtomicLong seq = new AtomicLong();
74 * final long seqNum;
75 * final E entry;
76 * public FIFOEntry(E entry) {
77 * seqNum = seq.getAndIncrement();
78 * this.entry = entry;
79 * }
80 * public E getEntry() { return entry; }
81 * public int compareTo(FIFOEntry&lt;E&gt; other) {
82 * int res = entry.compareTo(other.entry);
83 * if (res == 0 &amp;&amp; other.entry != this.entry)
84 * res = (seqNum &lt; other.seqNum ? -1 : 1);
85 * return res;
86 * }
87 * }</pre>
88 *
89 * <p>This class is a member of the
90 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
91 * Java Collections Framework</a>.
92 *
93 * @since 1.5
94 * @author Doug Lea
95 * @param <E> the type of elements held in this collection
96 */
97public class PriorityBlockingQueue<E> extends AbstractQueue<E>
98 implements BlockingQueue<E>, java.io.Serializable {
99 private static final long serialVersionUID = 5595510919245408276L;
100
101 private final PriorityQueue<E> q;
102 private final ReentrantLock lock = new ReentrantLock(true);
103 private final Condition notEmpty = lock.newCondition();
104
105 /**
106 * Creates a <tt>PriorityBlockingQueue</tt> with the default
107 * initial capacity (11) that orders its elements according to
108 * their {@linkplain Comparable natural ordering}.
109 */
110 public PriorityBlockingQueue() {
111 q = new PriorityQueue<E>();
112 }
113
114 /**
115 * Creates a <tt>PriorityBlockingQueue</tt> with the specified
116 * initial capacity that orders its elements according to their
117 * {@linkplain Comparable natural ordering}.
118 *
119 * @param initialCapacity the initial capacity for this priority queue
120 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
121 * than 1
122 */
123 public PriorityBlockingQueue(int initialCapacity) {
124 q = new PriorityQueue<E>(initialCapacity, null);
125 }
126
127 /**
128 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
129 * capacity that orders its elements according to the specified
130 * comparator.
131 *
132 * @param initialCapacity the initial capacity for this priority queue
133 * @param comparator the comparator that will be used to order this
134 * priority queue. If {@code null}, the {@linkplain Comparable
135 * natural ordering} of the elements will be used.
136 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
137 * than 1
138 */
139 public PriorityBlockingQueue(int initialCapacity,
140 Comparator<? super E> comparator) {
141 q = new PriorityQueue<E>(initialCapacity, comparator);
142 }
143
144 /**
145 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
146 * in the specified collection. If the specified collection is a
147 * {@link SortedSet} or a {@link PriorityQueue}, this
148 * priority queue will be ordered according to the same ordering.
149 * Otherwise, this priority queue will be ordered according to the
150 * {@linkplain Comparable natural ordering} of its elements.
151 *
152 * @param c the collection whose elements are to be placed
153 * into this priority queue
154 * @throws ClassCastException if elements of the specified collection
155 * cannot be compared to one another according to the priority
156 * queue's ordering
157 * @throws NullPointerException if the specified collection or any
158 * of its elements are null
159 */
160 public PriorityBlockingQueue(Collection<? extends E> c) {
161 q = new PriorityQueue<E>(c);
162 }
163
164 /**
165 * Inserts the specified element into this priority queue.
166 *
167 * @param e the element to add
168 * @return <tt>true</tt> (as specified by {@link Collection#add})
169 * @throws ClassCastException if the specified element cannot be compared
170 * with elements currently in the priority queue according to the
171 * priority queue's ordering
172 * @throws NullPointerException if the specified element is null
173 */
174 public boolean add(E e) {
175 return offer(e);
176 }
177
178 /**
179 * Inserts the specified element into this priority queue.
180 *
181 * @param e the element to add
182 * @return <tt>true</tt> (as specified by {@link Queue#offer})
183 * @throws ClassCastException if the specified element cannot be compared
184 * with elements currently in the priority queue according to the
185 * priority queue's ordering
186 * @throws NullPointerException if the specified element is null
187 */
188 public boolean offer(E e) {
189 final ReentrantLock lock = this.lock;
190 lock.lock();
191 try {
192 boolean ok = q.offer(e);
193 assert ok;
194 notEmpty.signal();
195 return true;
196 } finally {
197 lock.unlock();
198 }
199 }
200
201 /**
202 * Inserts the specified element into this priority queue. As the queue is
203 * unbounded this method will never block.
204 *
205 * @param e the element to add
206 * @throws ClassCastException if the specified element cannot be compared
207 * with elements currently in the priority queue according to the
208 * priority queue's ordering
209 * @throws NullPointerException if the specified element is null
210 */
211 public void put(E e) {
212 offer(e); // never need to block
213 }
214
215 /**
216 * Inserts the specified element into this priority queue. As the queue is
217 * unbounded this method will never block.
218 *
219 * @param e the element to add
220 * @param timeout This parameter is ignored as the method never blocks
221 * @param unit This parameter is ignored as the method never blocks
222 * @return <tt>true</tt>
223 * @throws ClassCastException if the specified element cannot be compared
224 * with elements currently in the priority queue according to the
225 * priority queue's ordering
226 * @throws NullPointerException if the specified element is null
227 */
228 public boolean offer(E e, long timeout, TimeUnit unit) {
229 return offer(e); // never need to block
230 }
231
232 public E poll() {
233 final ReentrantLock lock = this.lock;
234 lock.lock();
235 try {
236 return q.poll();
237 } finally {
238 lock.unlock();
239 }
240 }
241
242 public E take() throws InterruptedException {
243 final ReentrantLock lock = this.lock;
244 lock.lockInterruptibly();
245 try {
246 try {
247 while (q.size() == 0)
248 notEmpty.await();
249 } catch (InterruptedException ie) {
250 notEmpty.signal(); // propagate to non-interrupted thread
251 throw ie;
252 }
253 E x = q.poll();
254 assert x != null;
255 return x;
256 } finally {
257 lock.unlock();
258 }
259 }
260
261 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
262 long nanos = unit.toNanos(timeout);
263 final ReentrantLock lock = this.lock;
264 lock.lockInterruptibly();
265 try {
266 for (;;) {
267 E x = q.poll();
268 if (x != null)
269 return x;
270 if (nanos <= 0)
271 return null;
272 try {
273 nanos = notEmpty.awaitNanos(nanos);
274 } catch (InterruptedException ie) {
275 notEmpty.signal(); // propagate to non-interrupted thread
276 throw ie;
277 }
278 }
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public E peek() {
285 final ReentrantLock lock = this.lock;
286 lock.lock();
287 try {
288 return q.peek();
289 } finally {
290 lock.unlock();
291 }
292 }
293
294 /**
295 * Returns the comparator used to order the elements in this queue,
296 * or <tt>null</tt> if this queue uses the {@linkplain Comparable
297 * natural ordering} of its elements.
298 *
299 * @return the comparator used to order the elements in this queue,
300 * or <tt>null</tt> if this queue uses the natural
301 * ordering of its elements
302 */
303 public Comparator<? super E> comparator() {
304 return q.comparator();
305 }
306
307 public int size() {
308 final ReentrantLock lock = this.lock;
309 lock.lock();
310 try {
311 return q.size();
312 } finally {
313 lock.unlock();
314 }
315 }
316
317 /**
318 * Always returns <tt>Integer.MAX_VALUE</tt> because
319 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
320 * @return <tt>Integer.MAX_VALUE</tt>
321 */
322 public int remainingCapacity() {
323 return Integer.MAX_VALUE;
324 }
325
326 /**
327 * Removes a single instance of the specified element from this queue,
328 * if it is present. More formally, removes an element {@code e} such
329 * that {@code o.equals(e)}, if this queue contains one or more such
330 * elements. Returns {@code true} if and only if this queue contained
331 * the specified element (or equivalently, if this queue changed as a
332 * result of the call).
333 *
334 * @param o element to be removed from this queue, if present
335 * @return <tt>true</tt> if this queue changed as a result of the call
336 */
337 public boolean remove(Object o) {
338 final ReentrantLock lock = this.lock;
339 lock.lock();
340 try {
341 return q.remove(o);
342 } finally {
343 lock.unlock();
344 }
345 }
346
347 /**
348 * Returns {@code true} if this queue contains the specified element.
349 * More formally, returns {@code true} if and only if this queue contains
350 * at least one element {@code e} such that {@code o.equals(e)}.
351 *
352 * @param o object to be checked for containment in this queue
353 * @return <tt>true</tt> if this queue contains the specified element
354 */
355 public boolean contains(Object o) {
356 final ReentrantLock lock = this.lock;
357 lock.lock();
358 try {
359 return q.contains(o);
360 } finally {
361 lock.unlock();
362 }
363 }
364
365 /**
366 * Returns an array containing all of the elements in this queue.
367 * The returned array elements are in no particular order.
368 *
369 * <p>The returned array will be "safe" in that no references to it are
370 * maintained by this queue. (In other words, this method must allocate
371 * a new array). The caller is thus free to modify the returned array.
372 *
373 * <p>This method acts as bridge between array-based and collection-based
374 * APIs.
375 *
376 * @return an array containing all of the elements in this queue
377 */
378 public Object[] toArray() {
379 final ReentrantLock lock = this.lock;
380 lock.lock();
381 try {
382 return q.toArray();
383 } finally {
384 lock.unlock();
385 }
386 }
387
388
389 public String toString() {
390 final ReentrantLock lock = this.lock;
391 lock.lock();
392 try {
393 return q.toString();
394 } finally {
395 lock.unlock();
396 }
397 }
398
399 /**
400 * @throws UnsupportedOperationException {@inheritDoc}
401 * @throws ClassCastException {@inheritDoc}
402 * @throws NullPointerException {@inheritDoc}
403 * @throws IllegalArgumentException {@inheritDoc}
404 */
405 public int drainTo(Collection<? super E> c) {
406 if (c == null)
407 throw new NullPointerException();
408 if (c == this)
409 throw new IllegalArgumentException();
410 final ReentrantLock lock = this.lock;
411 lock.lock();
412 try {
413 int n = 0;
414 E e;
415 while ( (e = q.poll()) != null) {
416 c.add(e);
417 ++n;
418 }
419 return n;
420 } finally {
421 lock.unlock();
422 }
423 }
424
425 /**
426 * @throws UnsupportedOperationException {@inheritDoc}
427 * @throws ClassCastException {@inheritDoc}
428 * @throws NullPointerException {@inheritDoc}
429 * @throws IllegalArgumentException {@inheritDoc}
430 */
431 public int drainTo(Collection<? super E> c, int maxElements) {
432 if (c == null)
433 throw new NullPointerException();
434 if (c == this)
435 throw new IllegalArgumentException();
436 if (maxElements <= 0)
437 return 0;
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 try {
441 int n = 0;
442 E e;
443 while (n < maxElements && (e = q.poll()) != null) {
444 c.add(e);
445 ++n;
446 }
447 return n;
448 } finally {
449 lock.unlock();
450 }
451 }
452
453 /**
454 * Atomically removes all of the elements from this queue.
455 * The queue will be empty after this call returns.
456 */
457 public void clear() {
458 final ReentrantLock lock = this.lock;
459 lock.lock();
460 try {
461 q.clear();
462 } finally {
463 lock.unlock();
464 }
465 }
466
467 /**
468 * Returns an array containing all of the elements in this queue; the
469 * runtime type of the returned array is that of the specified array.
470 * The returned array elements are in no particular order.
471 * If the queue fits in the specified array, it is returned therein.
472 * Otherwise, a new array is allocated with the runtime type of the
473 * specified array and the size of this queue.
474 *
475 * <p>If this queue fits in the specified array with room to spare
476 * (i.e., the array has more elements than this queue), the element in
477 * the array immediately following the end of the queue is set to
478 * <tt>null</tt>.
479 *
480 * <p>Like the {@link #toArray()} method, this method acts as bridge between
481 * array-based and collection-based APIs. Further, this method allows
482 * precise control over the runtime type of the output array, and may,
483 * under certain circumstances, be used to save allocation costs.
484 *
485 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
486 * The following code can be used to dump the queue into a newly
487 * allocated array of <tt>String</tt>:
488 *
489 * <pre>
490 * String[] y = x.toArray(new String[0]);</pre>
491 *
492 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
493 * <tt>toArray()</tt>.
494 *
495 * @param a the array into which the elements of the queue are to
496 * be stored, if it is big enough; otherwise, a new array of the
497 * same runtime type is allocated for this purpose
498 * @return an array containing all of the elements in this queue
499 * @throws ArrayStoreException if the runtime type of the specified array
500 * is not a supertype of the runtime type of every element in
501 * this queue
502 * @throws NullPointerException if the specified array is null
503 */
504 public <T> T[] toArray(T[] a) {
505 final ReentrantLock lock = this.lock;
506 lock.lock();
507 try {
508 return q.toArray(a);
509 } finally {
510 lock.unlock();
511 }
512 }
513
514 /**
515 * Returns an iterator over the elements in this queue. The
516 * iterator does not return the elements in any particular order.
517 * The returned <tt>Iterator</tt> is a "weakly consistent"
518 * iterator that will never throw {@link
519 * ConcurrentModificationException}, and guarantees to traverse
520 * elements as they existed upon construction of the iterator, and
521 * may (but is not guaranteed to) reflect any modifications
522 * subsequent to construction.
523 *
524 * @return an iterator over the elements in this queue
525 */
526 public Iterator<E> iterator() {
527 return new Itr(toArray());
528 }
529
530 /**
531 * Snapshot iterator that works off copy of underlying q array.
532 */
533 private class Itr implements Iterator<E> {
534 final Object[] array; // Array of all elements
535 int cursor; // index of next element to return;
536 int lastRet; // index of last element, or -1 if no such
537
538 Itr(Object[] array) {
539 lastRet = -1;
540 this.array = array;
541 }
542
543 public boolean hasNext() {
544 return cursor < array.length;
545 }
546
547 public E next() {
548 if (cursor >= array.length)
549 throw new NoSuchElementException();
550 lastRet = cursor;
551 return (E)array[cursor++];
552 }
553
554 public void remove() {
555 if (lastRet < 0)
556 throw new IllegalStateException();
557 Object x = array[lastRet];
558 lastRet = -1;
559 // Traverse underlying queue to find == element,
560 // not just a .equals element.
561 lock.lock();
562 try {
563 for (Iterator it = q.iterator(); it.hasNext(); ) {
564 if (it.next() == x) {
565 it.remove();
566 return;
567 }
568 }
569 } finally {
570 lock.unlock();
571 }
572 }
573 }
574
575 /**
576 * Saves the state to a stream (that is, serializes it). This
577 * merely wraps default serialization within lock. The
578 * serialization strategy for items is left to underlying
579 * Queue. Note that locking is not needed on deserialization, so
580 * readObject is not defined, just relying on default.
581 */
582 private void writeObject(java.io.ObjectOutputStream s)
583 throws java.io.IOException {
584 lock.lock();
585 try {
586 s.defaultWriteObject();
587 } finally {
588 lock.unlock();
589 }
590 }
591
592}