/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.*;
import java.util.concurrent.locks.*;

/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * increased.  Attempts to <tt>put</tt> an element into a full queue
 * will result in the operation blocking; attempts to <tt>take</tt> an
 * element from an empty queue will similarly block.
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increments i: returns <tt>i + 1</tt>, or 0 when that
     * would reach the length of the items array.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null; // drop the reference so the slot does not pin the element
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    // i is now the last occupied slot: clear it and make
                    // it the next insertion point
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    @SuppressWarnings("unchecked") // generic array creation; items is private and only ever holds E
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * <p>The elements are stored directly into the backing array; the
     * overridable {@link #add} method is not invoked during construction.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     *         <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        final ReentrantLock lock = this.lock;
        // No other thread can see this queue yet; the lock is taken so
        // that the writes below are published the same way as every
        // other update to the guarded fields.
        lock.lock();
        try {
            final E[] items = this.items;
            int n = 0;
            for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
                E e = it.next();
                if (e == null)
                    throw new NullPointerException();
                // c produced more elements than size() reported
                if (n == capacity)
                    throw new IllegalArgumentException();
                items[n++] = e;
            }
            count = n;
            putIndex = (n == capacity) ? 0 : n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and throwing an
     * <tt>IllegalStateException</tt> if this queue is full.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @return <tt>true</tt> if the element was inserted, or <tt>false</tt>
     *         if the wait time elapsed while the queue was still full
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting for an
     * element to become available if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time for an element to become available if the
     * queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {
                    E x = extract();
                    return x;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting <tt>remainingCapacity</tt>
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // element type mismatches surface as ArrayStoreException, as documented
    public <T> T[] toArray(T[] a) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T)items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string form produced by the superclass, computed while
     * holding this queue's lock.
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        // The bounded variant performs the argument checks and caps the
        // transfer at the current element count.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * If <tt>c.add</tt> throws part-way through, the elements already
     * handed to <tt>c</tt> are removed from this queue and the remaining
     * elements stay queued in their original order.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int max = (maxElements < count) ? maxElements : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < max) {
                    c.add(items[i]);
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Commit whatever was transferred even if c.add threw, so
                // count and takeIndex never describe slots that have
                // already been cleared.
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Positions the iterator at the head of the queue. Reads queue
         * state directly; iterator() constructs it while holding the lock.
         */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Checks whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }
}
807}