blob: 57350dd1c3cd788d929b2df5039fdd92c92f953c [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Sun designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Sun in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36
37package java.util.concurrent;
38import java.util.concurrent.locks.*;
39import java.util.*;
40
41/**
42 * An unbounded {@linkplain BlockingQueue blocking queue} of
43 * <tt>Delayed</tt> elements, in which an element can only be taken
44 * when its delay has expired. The <em>head</em> of the queue is that
45 * <tt>Delayed</tt> element whose delay expired furthest in the
46 * past. If no delay has expired there is no head and <tt>poll</tt>
47 * will return <tt>null</tt>. Expiration occurs when an element's
48 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
49 * than or equal to zero. Even though unexpired elements cannot be
50 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
51 * treated as normal elements. For example, the <tt>size</tt> method
52 * returns the count of both expired and unexpired elements.
53 * This queue does not permit null elements.
54 *
55 * <p>This class and its iterator implement all of the
56 * <em>optional</em> methods of the {@link Collection} and {@link
57 * Iterator} interfaces.
58 *
59 * <p>This class is a member of the
60 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
61 * Java Collections Framework</a>.
62 *
63 * @since 1.5
64 * @author Doug Lea
65 * @param <E> the type of elements held in this collection
66 */
67
68public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
69 implements BlockingQueue<E> {
70
71 private transient final ReentrantLock lock = new ReentrantLock();
72 private transient final Condition available = lock.newCondition();
73 private final PriorityQueue<E> q = new PriorityQueue<E>();
74
75 /**
76 * Creates a new <tt>DelayQueue</tt> that is initially empty.
77 */
78 public DelayQueue() {}
79
80 /**
81 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
82 * given collection of {@link Delayed} instances.
83 *
84 * @param c the collection of elements to initially contain
85 * @throws NullPointerException if the specified collection or any
86 * of its elements are null
87 */
88 public DelayQueue(Collection<? extends E> c) {
89 this.addAll(c);
90 }
91
92 /**
93 * Inserts the specified element into this delay queue.
94 *
95 * @param e the element to add
96 * @return <tt>true</tt> (as specified by {@link Collection#add})
97 * @throws NullPointerException if the specified element is null
98 */
99 public boolean add(E e) {
100 return offer(e);
101 }
102
103 /**
104 * Inserts the specified element into this delay queue.
105 *
106 * @param e the element to add
107 * @return <tt>true</tt>
108 * @throws NullPointerException if the specified element is null
109 */
110 public boolean offer(E e) {
111 final ReentrantLock lock = this.lock;
112 lock.lock();
113 try {
114 E first = q.peek();
115 q.offer(e);
116 if (first == null || e.compareTo(first) < 0)
117 available.signalAll();
118 return true;
119 } finally {
120 lock.unlock();
121 }
122 }
123
124 /**
125 * Inserts the specified element into this delay queue. As the queue is
126 * unbounded this method will never block.
127 *
128 * @param e the element to add
129 * @throws NullPointerException {@inheritDoc}
130 */
131 public void put(E e) {
132 offer(e);
133 }
134
135 /**
136 * Inserts the specified element into this delay queue. As the queue is
137 * unbounded this method will never block.
138 *
139 * @param e the element to add
140 * @param timeout This parameter is ignored as the method never blocks
141 * @param unit This parameter is ignored as the method never blocks
142 * @return <tt>true</tt>
143 * @throws NullPointerException {@inheritDoc}
144 */
145 public boolean offer(E e, long timeout, TimeUnit unit) {
146 return offer(e);
147 }
148
149 /**
150 * Retrieves and removes the head of this queue, or returns <tt>null</tt>
151 * if this queue has no elements with an expired delay.
152 *
153 * @return the head of this queue, or <tt>null</tt> if this
154 * queue has no elements with an expired delay
155 */
156 public E poll() {
157 final ReentrantLock lock = this.lock;
158 lock.lock();
159 try {
160 E first = q.peek();
161 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
162 return null;
163 else {
164 E x = q.poll();
165 assert x != null;
166 if (q.size() != 0)
167 available.signalAll();
168 return x;
169 }
170 } finally {
171 lock.unlock();
172 }
173 }
174
175 /**
176 * Retrieves and removes the head of this queue, waiting if necessary
177 * until an element with an expired delay is available on this queue.
178 *
179 * @return the head of this queue
180 * @throws InterruptedException {@inheritDoc}
181 */
182 public E take() throws InterruptedException {
183 final ReentrantLock lock = this.lock;
184 lock.lockInterruptibly();
185 try {
186 for (;;) {
187 E first = q.peek();
188 if (first == null) {
189 available.await();
190 } else {
191 long delay = first.getDelay(TimeUnit.NANOSECONDS);
192 if (delay > 0) {
193 long tl = available.awaitNanos(delay);
194 } else {
195 E x = q.poll();
196 assert x != null;
197 if (q.size() != 0)
198 available.signalAll(); // wake up other takers
199 return x;
200
201 }
202 }
203 }
204 } finally {
205 lock.unlock();
206 }
207 }
208
209 /**
210 * Retrieves and removes the head of this queue, waiting if necessary
211 * until an element with an expired delay is available on this queue,
212 * or the specified wait time expires.
213 *
214 * @return the head of this queue, or <tt>null</tt> if the
215 * specified waiting time elapses before an element with
216 * an expired delay becomes available
217 * @throws InterruptedException {@inheritDoc}
218 */
219 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
220 long nanos = unit.toNanos(timeout);
221 final ReentrantLock lock = this.lock;
222 lock.lockInterruptibly();
223 try {
224 for (;;) {
225 E first = q.peek();
226 if (first == null) {
227 if (nanos <= 0)
228 return null;
229 else
230 nanos = available.awaitNanos(nanos);
231 } else {
232 long delay = first.getDelay(TimeUnit.NANOSECONDS);
233 if (delay > 0) {
234 if (nanos <= 0)
235 return null;
236 if (delay > nanos)
237 delay = nanos;
238 long timeLeft = available.awaitNanos(delay);
239 nanos -= delay - timeLeft;
240 } else {
241 E x = q.poll();
242 assert x != null;
243 if (q.size() != 0)
244 available.signalAll();
245 return x;
246 }
247 }
248 }
249 } finally {
250 lock.unlock();
251 }
252 }
253
254 /**
255 * Retrieves, but does not remove, the head of this queue, or
256 * returns <tt>null</tt> if this queue is empty. Unlike
257 * <tt>poll</tt>, if no expired elements are available in the queue,
258 * this method returns the element that will expire next,
259 * if one exists.
260 *
261 * @return the head of this queue, or <tt>null</tt> if this
262 * queue is empty.
263 */
264 public E peek() {
265 final ReentrantLock lock = this.lock;
266 lock.lock();
267 try {
268 return q.peek();
269 } finally {
270 lock.unlock();
271 }
272 }
273
274 public int size() {
275 final ReentrantLock lock = this.lock;
276 lock.lock();
277 try {
278 return q.size();
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 /**
285 * @throws UnsupportedOperationException {@inheritDoc}
286 * @throws ClassCastException {@inheritDoc}
287 * @throws NullPointerException {@inheritDoc}
288 * @throws IllegalArgumentException {@inheritDoc}
289 */
290 public int drainTo(Collection<? super E> c) {
291 if (c == null)
292 throw new NullPointerException();
293 if (c == this)
294 throw new IllegalArgumentException();
295 final ReentrantLock lock = this.lock;
296 lock.lock();
297 try {
298 int n = 0;
299 for (;;) {
300 E first = q.peek();
301 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
302 break;
303 c.add(q.poll());
304 ++n;
305 }
306 if (n > 0)
307 available.signalAll();
308 return n;
309 } finally {
310 lock.unlock();
311 }
312 }
313
314 /**
315 * @throws UnsupportedOperationException {@inheritDoc}
316 * @throws ClassCastException {@inheritDoc}
317 * @throws NullPointerException {@inheritDoc}
318 * @throws IllegalArgumentException {@inheritDoc}
319 */
320 public int drainTo(Collection<? super E> c, int maxElements) {
321 if (c == null)
322 throw new NullPointerException();
323 if (c == this)
324 throw new IllegalArgumentException();
325 if (maxElements <= 0)
326 return 0;
327 final ReentrantLock lock = this.lock;
328 lock.lock();
329 try {
330 int n = 0;
331 while (n < maxElements) {
332 E first = q.peek();
333 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
334 break;
335 c.add(q.poll());
336 ++n;
337 }
338 if (n > 0)
339 available.signalAll();
340 return n;
341 } finally {
342 lock.unlock();
343 }
344 }
345
346 /**
347 * Atomically removes all of the elements from this delay queue.
348 * The queue will be empty after this call returns.
349 * Elements with an unexpired delay are not waited for; they are
350 * simply discarded from the queue.
351 */
352 public void clear() {
353 final ReentrantLock lock = this.lock;
354 lock.lock();
355 try {
356 q.clear();
357 } finally {
358 lock.unlock();
359 }
360 }
361
362 /**
363 * Always returns <tt>Integer.MAX_VALUE</tt> because
364 * a <tt>DelayQueue</tt> is not capacity constrained.
365 *
366 * @return <tt>Integer.MAX_VALUE</tt>
367 */
368 public int remainingCapacity() {
369 return Integer.MAX_VALUE;
370 }
371
372 /**
373 * Returns an array containing all of the elements in this queue.
374 * The returned array elements are in no particular order.
375 *
376 * <p>The returned array will be "safe" in that no references to it are
377 * maintained by this queue. (In other words, this method must allocate
378 * a new array). The caller is thus free to modify the returned array.
379 *
380 * <p>This method acts as bridge between array-based and collection-based
381 * APIs.
382 *
383 * @return an array containing all of the elements in this queue
384 */
385 public Object[] toArray() {
386 final ReentrantLock lock = this.lock;
387 lock.lock();
388 try {
389 return q.toArray();
390 } finally {
391 lock.unlock();
392 }
393 }
394
395 /**
396 * Returns an array containing all of the elements in this queue; the
397 * runtime type of the returned array is that of the specified array.
398 * The returned array elements are in no particular order.
399 * If the queue fits in the specified array, it is returned therein.
400 * Otherwise, a new array is allocated with the runtime type of the
401 * specified array and the size of this queue.
402 *
403 * <p>If this queue fits in the specified array with room to spare
404 * (i.e., the array has more elements than this queue), the element in
405 * the array immediately following the end of the queue is set to
406 * <tt>null</tt>.
407 *
408 * <p>Like the {@link #toArray()} method, this method acts as bridge between
409 * array-based and collection-based APIs. Further, this method allows
410 * precise control over the runtime type of the output array, and may,
411 * under certain circumstances, be used to save allocation costs.
412 *
413 * <p>The following code can be used to dump a delay queue into a newly
414 * allocated array of <tt>Delayed</tt>:
415 *
416 * <pre>
417 * Delayed[] a = q.toArray(new Delayed[0]);</pre>
418 *
419 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
420 * <tt>toArray()</tt>.
421 *
422 * @param a the array into which the elements of the queue are to
423 * be stored, if it is big enough; otherwise, a new array of the
424 * same runtime type is allocated for this purpose
425 * @return an array containing all of the elements in this queue
426 * @throws ArrayStoreException if the runtime type of the specified array
427 * is not a supertype of the runtime type of every element in
428 * this queue
429 * @throws NullPointerException if the specified array is null
430 */
431 public <T> T[] toArray(T[] a) {
432 final ReentrantLock lock = this.lock;
433 lock.lock();
434 try {
435 return q.toArray(a);
436 } finally {
437 lock.unlock();
438 }
439 }
440
441 /**
442 * Removes a single instance of the specified element from this
443 * queue, if it is present, whether or not it has expired.
444 */
445 public boolean remove(Object o) {
446 final ReentrantLock lock = this.lock;
447 lock.lock();
448 try {
449 return q.remove(o);
450 } finally {
451 lock.unlock();
452 }
453 }
454
455 /**
456 * Returns an iterator over all the elements (both expired and
457 * unexpired) in this queue. The iterator does not return the
458 * elements in any particular order. The returned
459 * <tt>Iterator</tt> is a "weakly consistent" iterator that will
460 * never throw {@link ConcurrentModificationException}, and
461 * guarantees to traverse elements as they existed upon
462 * construction of the iterator, and may (but is not guaranteed
463 * to) reflect any modifications subsequent to construction.
464 *
465 * @return an iterator over the elements in this queue
466 */
467 public Iterator<E> iterator() {
468 return new Itr(toArray());
469 }
470
471 /**
472 * Snapshot iterator that works off copy of underlying q array.
473 */
474 private class Itr implements Iterator<E> {
475 final Object[] array; // Array of all elements
476 int cursor; // index of next element to return;
477 int lastRet; // index of last element, or -1 if no such
478
479 Itr(Object[] array) {
480 lastRet = -1;
481 this.array = array;
482 }
483
484 public boolean hasNext() {
485 return cursor < array.length;
486 }
487
488 public E next() {
489 if (cursor >= array.length)
490 throw new NoSuchElementException();
491 lastRet = cursor;
492 return (E)array[cursor++];
493 }
494
495 public void remove() {
496 if (lastRet < 0)
497 throw new IllegalStateException();
498 Object x = array[lastRet];
499 lastRet = -1;
500 // Traverse underlying queue to find == element,
501 // not just a .equals element.
502 lock.lock();
503 try {
504 for (Iterator it = q.iterator(); it.hasNext(); ) {
505 if (it.next() == x) {
506 it.remove();
507 return;
508 }
509 }
510 } finally {
511 lock.unlock();
512 }
513 }
514 }
515
516}