blob: fafbcc01704c682b8abd5c2bd7e993c77a0eb95b [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Sun designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Sun in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36package java.util.concurrent;
37import java.util.*;
38import java.util.concurrent.atomic.*;
39
40
/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 * Operations never block on a lock, but an individual operation may
 * have to retry its compare-and-set when it loses a race with
 * another thread.
 *
 * <p>Beware that, unlike in most collections, the <tt>size</tt> method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a straight adaptation of Michael & Scott algorithm.
     * For explanation, read the paper.  The only (minor) algorithmic
     * difference is that this version supports lazy deletion of
     * internal nodes (method remove(Object)) -- remove CAS'es item
     * fields to null. The normal queue operations unlink but then
     * pass over nodes with null item fields. Similarly, iteration
     * methods ignore those with nulls.
     *
     * Because remove(Object) claims an element by CAS'ing its item
     * field to null, poll() must claim elements the same way: the
     * thread whose CAS succeeds is the only one allowed to report
     * the element as removed.
     *
     * Also note that like most non-blocking algorithms in this
     * package, this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     */

    /**
     * Singly linked list node. Both fields are volatile and are also
     * updated through field updaters so that they can be CAS'ed.
     * A node whose item is null is either the dummy header or a
     * lazily deleted element.
     */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        // Raw types are unavoidable here: class literals cannot be
        // parameterized.
        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");

        Node(E x) { item = x; }

        Node(E x, Node<E> n) { item = x; next = n; }

        /** Returns the current item, or null if absent or deleted. */
        E getItem() {
            return item;
        }

        /** Atomically replaces the item with val if it is still cmp. */
        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the item. */
        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        /** Returns the successor node, or null if this is the last node. */
        Node<E> getNext() {
            return next;
        }

        /** Atomically replaces the successor with val if it is still cmp. */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        /** Unconditionally sets the successor. */
        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }

    }

    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        headUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "head");

    /** Atomically advances tail from cmp to val. */
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    /** Atomically advances head from cmp to val. */
    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }


    /**
     * Pointer to header node, initialized to a dummy node.  The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = head;


    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
     */
    public ConcurrentLinkedQueue() {}

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e, null);
        for (;;) {
            Node<E> t = tail;
            Node<E> s = t.getNext();
            if (t == tail) {
                if (s == null) {
                    // t is really the last node: try to link n after it.
                    if (t.casNext(s, n)) {
                        // Best effort; if this fails another thread
                        // has already advanced tail for us.
                        casTail(t, n);
                        return true;
                    }
                } else {
                    // tail is lagging; help advance it, then retry.
                    casTail(t, s);
                }
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue,
     * or returns <tt>null</tt> if this queue is empty.
     *
     * <p>The element is claimed by CAS'ing the node's item to
     * <tt>null</tt>, the same way {@link #remove(Object)} claims it,
     * so a given element is handed out by at most one of a
     * concurrent <tt>poll</tt> and <tt>remove(Object)</tt>.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first); // tail is lagging; help out
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    // A plain read followed by an unconditional
                    // setItem(null) would let a concurrent
                    // remove(Object) succeed on the same element;
                    // only the winner of the CAS may return it.
                    if (item != null && first.casItem(item, null))
                        return item;
                    // else skip over deleted item, continue loop
                }
            }
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns <tt>null</tt> if this queue is empty.
     * Lazily deleted nodes found at the front are unlinked on the way.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() { // same as poll except don't remove item
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    E item = first.getItem();
                    if (item != null)
                        return item;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }

    /**
     * Returns the first actual (non-header) node on list.  This is yet
     * another variant of poll/peek; here returning out the first
     * node, not element (so we cannot collapse with peek() without
     * introducing race.)
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    if (first.getItem() != null)
                        return first;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }


    /**
     * Returns <tt>true</tt> if this queue contains no elements.
     *
     * @return <tt>true</tt> if this queue contains no elements
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>The matching node is not unlinked here; its item is CAS'ed
     * to <tt>null</tt> and later traversals pass over it.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null))
                return true;
        }
        return false;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    // The (T) cast is unchecked at compile time, but the array store
    // itself is checked at run time and raises the documented
    // ArrayStoreException on a type mismatch.
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            // Reached the end of the list: everything fit.
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = q.getNext()) {
            E item = q.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> p = (nextNode == null) ? first() : nextNode.getNext();
            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else // skip over nulls
                    p = p.getNext();
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.setItem(null);
            lastRet = null;
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an <tt>E</tt>) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            Object item = p.getItem();
            if (item != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in any hidden stuff
        s.defaultReadObject();
        // head and tail are transient, so rebuild the empty list
        // (dummy header only) before re-inserting the elements.
        head = new Node<E>(null, null);
        tail = head;
        // Read in all elements and place in queue
        for (;;) {
            // The stream carries no generic type information, so the
            // element type cannot be verified here.
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

}
594}