blob: 94427df43e8c7bf33d5547e7e3a5eeab7d3e877c [file] [log] [blame]
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include John Vint
 */
7
8package jsr166;
9
10import junit.framework.*;
11import java.util.Arrays;
12import java.util.ArrayList;
13import java.util.Collection;
14import java.util.Iterator;
15import java.util.List;
16import java.util.NoSuchElementException;
17import java.util.Queue;
18import java.util.concurrent.BlockingQueue;
19import java.util.concurrent.CountDownLatch;
20import java.util.concurrent.Executors;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.LinkedTransferQueue;
23import static java.util.concurrent.TimeUnit.MILLISECONDS;
24import static java.util.concurrent.TimeUnit.NANOSECONDS;
25
26@SuppressWarnings({"unchecked", "rawtypes"})
Calin Juravle008167d2014-05-15 19:29:37 +010027public class LinkedTransferQueueTest extends BlockingQueueTest {
Calin Juravle8f0d92b2013-08-01 17:26:00 +010028
Calin Juravle008167d2014-05-15 19:29:37 +010029 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
Calin Juravle8f0d92b2013-08-01 17:26:00 +010031 }
32
33 /**
34 * Constructor builds new queue with size being zero and empty
35 * being true
36 */
37 public void testConstructor1() {
38 assertEquals(0, new LinkedTransferQueue().size());
39 assertTrue(new LinkedTransferQueue().isEmpty());
40 }
41
42 /**
43 * Initializing constructor with null collection throws
44 * NullPointerException
45 */
46 public void testConstructor2() {
47 try {
48 new LinkedTransferQueue(null);
49 shouldThrow();
50 } catch (NullPointerException success) {}
51 }
52
53 /**
54 * Initializing from Collection of null elements throws
55 * NullPointerException
56 */
57 public void testConstructor3() {
58 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
59 try {
60 new LinkedTransferQueue(elements);
61 shouldThrow();
62 } catch (NullPointerException success) {}
63 }
64
65 /**
66 * Initializing constructor with a collection containing some null elements
67 * throws NullPointerException
68 */
69 public void testConstructor4() {
70 Integer[] ints = new Integer[SIZE];
71 for (int i = 0; i < SIZE-1; ++i)
72 ints[i] = i;
73 Collection<Integer> elements = Arrays.asList(ints);
74 try {
75 new LinkedTransferQueue(elements);
76 shouldThrow();
77 } catch (NullPointerException success) {}
78 }
79
80 /**
81 * Queue contains all elements of the collection it is initialized by
82 */
83 public void testConstructor5() {
84 Integer[] ints = new Integer[SIZE];
85 for (int i = 0; i < SIZE; ++i) {
86 ints[i] = i;
87 }
88 List intList = Arrays.asList(ints);
89 LinkedTransferQueue q
90 = new LinkedTransferQueue(intList);
91 assertEquals(q.size(), intList.size());
92 assertEquals(q.toString(), intList.toString());
93 assertTrue(Arrays.equals(q.toArray(),
94 intList.toArray()));
95 assertTrue(Arrays.equals(q.toArray(new Object[0]),
96 intList.toArray(new Object[0])));
97 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
98 intList.toArray(new Object[SIZE])));
99 for (int i = 0; i < SIZE; ++i) {
100 assertEquals(ints[i], q.poll());
101 }
102 }
103
104 /**
105 * remainingCapacity() always returns Integer.MAX_VALUE
106 */
107 public void testRemainingCapacity() {
108 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
109 for (int i = 0; i < SIZE; ++i) {
110 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
111 assertEquals(SIZE - i, q.size());
112 q.remove();
113 }
114 for (int i = 0; i < SIZE; ++i) {
115 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
116 assertEquals(i, q.size());
117 q.add(i);
118 }
119 }
120
121 /**
122 * addAll(this) throws IllegalArgumentException
123 */
124 public void testAddAllSelf() {
125 try {
126 LinkedTransferQueue q = populatedQueue(SIZE);
127 q.addAll(q);
128 shouldThrow();
129 } catch (IllegalArgumentException success) {}
130 }
131
132 /**
133 * addAll of a collection with any null elements throws
134 * NullPointerException after possibly adding some elements
135 */
136 public void testAddAll3() {
137 try {
138 LinkedTransferQueue q = new LinkedTransferQueue();
139 Integer[] ints = new Integer[SIZE];
140 for (int i = 0; i < SIZE - 1; ++i) {
141 ints[i] = i;
142 }
143 q.addAll(Arrays.asList(ints));
144 shouldThrow();
145 } catch (NullPointerException success) {}
146 }
147
148 /**
149 * Queue contains all elements, in traversal order, of successful addAll
150 */
151 public void testAddAll5() {
152 Integer[] empty = new Integer[0];
153 Integer[] ints = new Integer[SIZE];
154 for (int i = 0; i < SIZE; ++i) {
155 ints[i] = i;
156 }
157 LinkedTransferQueue q = new LinkedTransferQueue();
158 assertFalse(q.addAll(Arrays.asList(empty)));
159 assertTrue(q.addAll(Arrays.asList(ints)));
160 for (int i = 0; i < SIZE; ++i) {
161 assertEquals(ints[i], q.poll());
162 }
163 }
164
165 /**
166 * all elements successfully put are contained
167 */
168 public void testPut() {
169 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
170 for (int i = 0; i < SIZE; ++i) {
171 assertEquals(i, q.size());
172 q.put(i);
173 assertTrue(q.contains(i));
174 }
175 }
176
177 /**
178 * take retrieves elements in FIFO order
179 */
180 public void testTake() throws InterruptedException {
181 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
182 for (int i = 0; i < SIZE; ++i) {
183 assertEquals(i, (int) q.take());
184 }
185 }
186
187 /**
188 * take removes existing elements until empty, then blocks interruptibly
189 */
190 public void testBlockingTake() throws InterruptedException {
191 final BlockingQueue q = populatedQueue(SIZE);
192 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
193 Thread t = newStartedThread(new CheckedRunnable() {
194 public void realRun() throws InterruptedException {
195 for (int i = 0; i < SIZE; ++i) {
196 assertEquals(i, q.take());
197 }
198
199 Thread.currentThread().interrupt();
200 try {
201 q.take();
202 shouldThrow();
203 } catch (InterruptedException success) {}
204 assertFalse(Thread.interrupted());
205
206 pleaseInterrupt.countDown();
207 try {
208 q.take();
209 shouldThrow();
210 } catch (InterruptedException success) {}
211 assertFalse(Thread.interrupted());
212 }});
213
214 await(pleaseInterrupt);
215 assertThreadStaysAlive(t);
216 t.interrupt();
217 awaitTermination(t);
218 }
219
220 /**
221 * poll succeeds unless empty
222 */
223 public void testPoll() throws InterruptedException {
224 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
225 for (int i = 0; i < SIZE; ++i) {
226 assertEquals(i, (int) q.poll());
227 }
228 assertNull(q.poll());
229 checkEmpty(q);
230 }
231
232 /**
233 * timed poll with zero timeout succeeds when non-empty, else times out
234 */
235 public void testTimedPoll0() throws InterruptedException {
236 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
237 for (int i = 0; i < SIZE; ++i) {
238 assertEquals(i, (int) q.poll(0, MILLISECONDS));
239 }
240 assertNull(q.poll(0, MILLISECONDS));
241 checkEmpty(q);
242 }
243
244 /**
245 * timed poll with nonzero timeout succeeds when non-empty, else times out
246 */
247 public void testTimedPoll() throws InterruptedException {
248 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
249 for (int i = 0; i < SIZE; ++i) {
250 long startTime = System.nanoTime();
251 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
252 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
253 }
254 long startTime = System.nanoTime();
255 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
256 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
257 checkEmpty(q);
258 }
259
260 /**
261 * Interrupted timed poll throws InterruptedException instead of
262 * returning timeout status
263 */
264 public void testInterruptedTimedPoll() throws InterruptedException {
265 final BlockingQueue<Integer> q = populatedQueue(SIZE);
266 final CountDownLatch aboutToWait = new CountDownLatch(1);
267 Thread t = newStartedThread(new CheckedRunnable() {
268 public void realRun() throws InterruptedException {
269 for (int i = 0; i < SIZE; ++i) {
270 long t0 = System.nanoTime();
271 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
272 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
273 }
274 long t0 = System.nanoTime();
275 aboutToWait.countDown();
276 try {
277 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
278 shouldThrow();
279 } catch (InterruptedException success) {
280 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
281 }
282 }});
283
284 aboutToWait.await();
285 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
286 t.interrupt();
287 awaitTermination(t, MEDIUM_DELAY_MS);
288 checkEmpty(q);
289 }
290
291 /**
292 * timed poll after thread interrupted throws InterruptedException
293 * instead of returning timeout status
294 */
295 public void testTimedPollAfterInterrupt() throws InterruptedException {
296 final BlockingQueue<Integer> q = populatedQueue(SIZE);
297 Thread t = newStartedThread(new CheckedRunnable() {
298 public void realRun() throws InterruptedException {
299 Thread.currentThread().interrupt();
300 for (int i = 0; i < SIZE; ++i) {
301 long t0 = System.nanoTime();
302 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
303 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
304 }
305 try {
306 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
307 shouldThrow();
308 } catch (InterruptedException success) {}
309 }});
310
311 awaitTermination(t, MEDIUM_DELAY_MS);
312 checkEmpty(q);
313 }
314
315 /**
316 * peek returns next element, or null if empty
317 */
318 public void testPeek() throws InterruptedException {
319 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
320 for (int i = 0; i < SIZE; ++i) {
321 assertEquals(i, (int) q.peek());
322 assertEquals(i, (int) q.poll());
323 assertTrue(q.peek() == null ||
324 i != (int) q.peek());
325 }
326 assertNull(q.peek());
327 checkEmpty(q);
328 }
329
330 /**
331 * element returns next element, or throws NoSuchElementException if empty
332 */
333 public void testElement() throws InterruptedException {
334 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
335 for (int i = 0; i < SIZE; ++i) {
336 assertEquals(i, (int) q.element());
337 assertEquals(i, (int) q.poll());
338 }
339 try {
340 q.element();
341 shouldThrow();
342 } catch (NoSuchElementException success) {}
343 checkEmpty(q);
344 }
345
346 /**
347 * remove removes next element, or throws NoSuchElementException if empty
348 */
349 public void testRemove() throws InterruptedException {
350 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
351 for (int i = 0; i < SIZE; ++i) {
352 assertEquals(i, (int) q.remove());
353 }
354 try {
355 q.remove();
356 shouldThrow();
357 } catch (NoSuchElementException success) {}
358 checkEmpty(q);
359 }
360
361 /**
362 * An add following remove(x) succeeds
363 */
364 public void testRemoveElementAndAdd() throws InterruptedException {
365 LinkedTransferQueue q = new LinkedTransferQueue();
366 assertTrue(q.add(one));
367 assertTrue(q.add(two));
368 assertTrue(q.remove(one));
369 assertTrue(q.remove(two));
370 assertTrue(q.add(three));
371 assertSame(q.take(), three);
372 }
373
374 /**
375 * contains(x) reports true when elements added but not yet removed
376 */
377 public void testContains() {
378 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
379 for (int i = 0; i < SIZE; ++i) {
380 assertTrue(q.contains(i));
381 assertEquals(i, (int) q.poll());
382 assertFalse(q.contains(i));
383 }
384 }
385
386 /**
387 * clear removes all elements
388 */
389 public void testClear() throws InterruptedException {
390 LinkedTransferQueue q = populatedQueue(SIZE);
391 q.clear();
392 checkEmpty(q);
393 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
394 q.add(one);
395 assertFalse(q.isEmpty());
396 assertEquals(1, q.size());
397 assertTrue(q.contains(one));
398 q.clear();
399 checkEmpty(q);
400 }
401
402 /**
403 * containsAll(c) is true when c contains a subset of elements
404 */
405 public void testContainsAll() {
406 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
407 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
408 for (int i = 0; i < SIZE; ++i) {
409 assertTrue(q.containsAll(p));
410 assertFalse(p.containsAll(q));
411 p.add(i);
412 }
413 assertTrue(p.containsAll(q));
414 }
415
416 /**
417 * retainAll(c) retains only those elements of c and reports true
418 * if changed
419 */
420 public void testRetainAll() {
421 LinkedTransferQueue q = populatedQueue(SIZE);
422 LinkedTransferQueue p = populatedQueue(SIZE);
423 for (int i = 0; i < SIZE; ++i) {
424 boolean changed = q.retainAll(p);
425 if (i == 0) {
426 assertFalse(changed);
427 } else {
428 assertTrue(changed);
429 }
430 assertTrue(q.containsAll(p));
431 assertEquals(SIZE - i, q.size());
432 p.remove();
433 }
434 }
435
436 /**
437 * removeAll(c) removes only those elements of c and reports true
438 * if changed
439 */
440 public void testRemoveAll() {
441 for (int i = 1; i < SIZE; ++i) {
442 LinkedTransferQueue q = populatedQueue(SIZE);
443 LinkedTransferQueue p = populatedQueue(i);
444 assertTrue(q.removeAll(p));
445 assertEquals(SIZE - i, q.size());
446 for (int j = 0; j < i; ++j) {
447 assertFalse(q.contains(p.remove()));
448 }
449 }
450 }
451
452 /**
453 * toArray() contains all elements in FIFO order
454 */
455 public void testToArray() {
456 LinkedTransferQueue q = populatedQueue(SIZE);
457 Object[] o = q.toArray();
458 for (int i = 0; i < o.length; i++) {
459 assertSame(o[i], q.poll());
460 }
461 }
462
463 /**
464 * toArray(a) contains all elements in FIFO order
465 */
466 public void testToArray2() {
467 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
468 Integer[] ints = new Integer[SIZE];
469 Integer[] array = q.toArray(ints);
470 assertSame(ints, array);
471 for (int i = 0; i < ints.length; i++) {
472 assertSame(ints[i], q.poll());
473 }
474 }
475
476 /**
477 * toArray(incompatible array type) throws ArrayStoreException
478 */
479 public void testToArray1_BadArg() {
480 LinkedTransferQueue q = populatedQueue(SIZE);
481 try {
482 q.toArray(new String[10]);
483 shouldThrow();
484 } catch (ArrayStoreException success) {}
485 }
486
487 /**
488 * iterator iterates through all elements
489 */
490 public void testIterator() throws InterruptedException {
491 LinkedTransferQueue q = populatedQueue(SIZE);
492 Iterator it = q.iterator();
493 int i = 0;
494 while (it.hasNext()) {
495 assertEquals(it.next(), i++);
496 }
497 assertEquals(i, SIZE);
498 }
499
500 /**
501 * iterator.remove() removes current element
502 */
503 public void testIteratorRemove() {
504 final LinkedTransferQueue q = new LinkedTransferQueue();
505 q.add(two);
506 q.add(one);
507 q.add(three);
508
509 Iterator it = q.iterator();
510 it.next();
511 it.remove();
512
513 it = q.iterator();
514 assertSame(it.next(), one);
515 assertSame(it.next(), three);
516 assertFalse(it.hasNext());
517 }
518
519 /**
520 * iterator ordering is FIFO
521 */
522 public void testIteratorOrdering() {
523 final LinkedTransferQueue<Integer> q
524 = new LinkedTransferQueue<Integer>();
525 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
526 q.add(one);
527 q.add(two);
528 q.add(three);
529 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
530 int k = 0;
531 for (Integer n : q) {
532 assertEquals(++k, (int) n);
533 }
534 assertEquals(3, k);
535 }
536
537 /**
538 * Modifications do not cause iterators to fail
539 */
540 public void testWeaklyConsistentIteration() {
541 final LinkedTransferQueue q = new LinkedTransferQueue();
542 q.add(one);
543 q.add(two);
544 q.add(three);
545 for (Iterator it = q.iterator(); it.hasNext();) {
546 q.remove();
547 it.next();
548 }
549 assertEquals(0, q.size());
550 }
551
552 /**
553 * toString contains toStrings of elements
554 */
555 public void testToString() {
556 LinkedTransferQueue q = populatedQueue(SIZE);
557 String s = q.toString();
558 for (int i = 0; i < SIZE; ++i) {
559 assertTrue(s.contains(String.valueOf(i)));
560 }
561 }
562
563 /**
564 * offer transfers elements across Executor tasks
565 */
566 public void testOfferInExecutor() {
567 final LinkedTransferQueue q = new LinkedTransferQueue();
568 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
569 ExecutorService executor = Executors.newFixedThreadPool(2);
570
571 executor.execute(new CheckedRunnable() {
572 public void realRun() throws InterruptedException {
573 threadsStarted.await();
574 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
575 }});
576
577 executor.execute(new CheckedRunnable() {
578 public void realRun() throws InterruptedException {
579 threadsStarted.await();
580 assertSame(one, q.take());
581 checkEmpty(q);
582 }});
583
584 joinPool(executor);
585 }
586
587 /**
588 * timed poll retrieves elements across Executor threads
589 */
590 public void testPollInExecutor() {
591 final LinkedTransferQueue q = new LinkedTransferQueue();
592 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
593 ExecutorService executor = Executors.newFixedThreadPool(2);
594
595 executor.execute(new CheckedRunnable() {
596 public void realRun() throws InterruptedException {
597 assertNull(q.poll());
598 threadsStarted.await();
599 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
600 checkEmpty(q);
601 }});
602
603 executor.execute(new CheckedRunnable() {
604 public void realRun() throws InterruptedException {
605 threadsStarted.await();
606 q.put(one);
607 }});
608
609 joinPool(executor);
610 }
611
612 /**
613 * A deserialized serialized queue has same elements in same order
614 */
615 public void testSerialization() throws Exception {
616 Queue x = populatedQueue(SIZE);
617 Queue y = serialClone(x);
618
619 assertNotSame(y, x);
620 assertEquals(x.size(), y.size());
621 assertEquals(x.toString(), y.toString());
622 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
623 while (!x.isEmpty()) {
624 assertFalse(y.isEmpty());
625 assertEquals(x.remove(), y.remove());
626 }
627 assertTrue(y.isEmpty());
628 }
629
630 /**
631 * drainTo(c) empties queue into another collection c
632 */
633 public void testDrainTo() {
634 LinkedTransferQueue q = populatedQueue(SIZE);
635 ArrayList l = new ArrayList();
636 q.drainTo(l);
637 assertEquals(0, q.size());
638 assertEquals(SIZE, l.size());
639 for (int i = 0; i < SIZE; ++i) {
640 assertEquals(i, l.get(i));
641 }
642 q.add(zero);
643 q.add(one);
644 assertFalse(q.isEmpty());
645 assertTrue(q.contains(zero));
646 assertTrue(q.contains(one));
647 l.clear();
648 q.drainTo(l);
649 assertEquals(0, q.size());
650 assertEquals(2, l.size());
651 for (int i = 0; i < 2; ++i) {
652 assertEquals(i, l.get(i));
653 }
654 }
655
656 /**
657 * drainTo(c) empties full queue, unblocking a waiting put.
658 */
659 public void testDrainToWithActivePut() throws InterruptedException {
660 final LinkedTransferQueue q = populatedQueue(SIZE);
661 Thread t = newStartedThread(new CheckedRunnable() {
662 public void realRun() {
663 q.put(SIZE + 1);
664 }});
665 ArrayList l = new ArrayList();
666 q.drainTo(l);
667 assertTrue(l.size() >= SIZE);
668 for (int i = 0; i < SIZE; ++i)
669 assertEquals(i, l.get(i));
670 awaitTermination(t, MEDIUM_DELAY_MS);
671 assertTrue(q.size() + l.size() >= SIZE);
672 }
673
674 /**
675 * drainTo(c, n) empties first min(n, size) elements of queue into c
676 */
677 public void testDrainToN() {
678 LinkedTransferQueue q = new LinkedTransferQueue();
679 for (int i = 0; i < SIZE + 2; ++i) {
680 for (int j = 0; j < SIZE; j++) {
681 assertTrue(q.offer(j));
682 }
683 ArrayList l = new ArrayList();
684 q.drainTo(l, i);
685 int k = (i < SIZE) ? i : SIZE;
686 assertEquals(k, l.size());
687 assertEquals(SIZE - k, q.size());
688 for (int j = 0; j < k; ++j)
689 assertEquals(j, l.get(j));
690 while (q.poll() != null)
691 ;
692 }
693 }
694
695 /**
696 * timed poll() or take() increments the waiting consumer count;
697 * offer(e) decrements the waiting consumer count
698 */
699 public void testWaitingConsumer() throws InterruptedException {
700 final LinkedTransferQueue q = new LinkedTransferQueue();
701 assertEquals(0, q.getWaitingConsumerCount());
702 assertFalse(q.hasWaitingConsumer());
703 final CountDownLatch threadStarted = new CountDownLatch(1);
704
705 Thread t = newStartedThread(new CheckedRunnable() {
706 public void realRun() throws InterruptedException {
707 threadStarted.countDown();
708 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
709 assertEquals(0, q.getWaitingConsumerCount());
710 assertFalse(q.hasWaitingConsumer());
711 }});
712
713 threadStarted.await();
714 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
715 assertEquals(1, q.getWaitingConsumerCount());
716 assertTrue(q.hasWaitingConsumer());
717
718 assertTrue(q.offer(one));
719 assertEquals(0, q.getWaitingConsumerCount());
720 assertFalse(q.hasWaitingConsumer());
721
722 awaitTermination(t, MEDIUM_DELAY_MS);
723 }
724
725 /**
726 * transfer(null) throws NullPointerException
727 */
728 public void testTransfer1() throws InterruptedException {
729 try {
730 LinkedTransferQueue q = new LinkedTransferQueue();
731 q.transfer(null);
732 shouldThrow();
733 } catch (NullPointerException success) {}
734 }
735
736 /**
737 * transfer waits until a poll occurs. The transfered element
738 * is returned by this associated poll.
739 */
740 public void testTransfer2() throws InterruptedException {
741 final LinkedTransferQueue<Integer> q
742 = new LinkedTransferQueue<Integer>();
743 final CountDownLatch threadStarted = new CountDownLatch(1);
744
745 Thread t = newStartedThread(new CheckedRunnable() {
746 public void realRun() throws InterruptedException {
747 threadStarted.countDown();
748 q.transfer(five);
749 checkEmpty(q);
750 }});
751
752 threadStarted.await();
753 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
754 assertEquals(1, q.size());
755 assertSame(five, q.poll());
756 checkEmpty(q);
757 awaitTermination(t, MEDIUM_DELAY_MS);
758 }
759
760 /**
761 * transfer waits until a poll occurs, and then transfers in fifo order
762 */
763 public void testTransfer3() throws InterruptedException {
764 final LinkedTransferQueue<Integer> q
765 = new LinkedTransferQueue<Integer>();
766
767 Thread first = newStartedThread(new CheckedRunnable() {
768 public void realRun() throws InterruptedException {
769 q.transfer(four);
770 assertTrue(!q.contains(four));
771 assertEquals(1, q.size());
772 }});
773
774 Thread interruptedThread = newStartedThread(
775 new CheckedInterruptedRunnable() {
776 public void realRun() throws InterruptedException {
777 while (q.isEmpty())
778 Thread.yield();
779 q.transfer(five);
780 }});
781
782 while (q.size() < 2)
783 Thread.yield();
784 assertEquals(2, q.size());
785 assertSame(four, q.poll());
786 first.join();
787 assertEquals(1, q.size());
788 interruptedThread.interrupt();
789 interruptedThread.join();
790 checkEmpty(q);
791 }
792
793 /**
794 * transfer waits until a poll occurs, at which point the polling
795 * thread returns the element
796 */
797 public void testTransfer4() throws InterruptedException {
798 final LinkedTransferQueue q = new LinkedTransferQueue();
799
800 Thread t = newStartedThread(new CheckedRunnable() {
801 public void realRun() throws InterruptedException {
802 q.transfer(four);
803 assertFalse(q.contains(four));
804 assertSame(three, q.poll());
805 }});
806
807 while (q.isEmpty())
808 Thread.yield();
809 assertFalse(q.isEmpty());
810 assertEquals(1, q.size());
811 assertTrue(q.offer(three));
812 assertSame(four, q.poll());
813 awaitTermination(t, MEDIUM_DELAY_MS);
814 }
815
816 /**
817 * transfer waits until a take occurs. The transfered element
818 * is returned by this associated take.
819 */
820 public void testTransfer5() throws InterruptedException {
821 final LinkedTransferQueue<Integer> q
822 = new LinkedTransferQueue<Integer>();
823
824 Thread t = newStartedThread(new CheckedRunnable() {
825 public void realRun() throws InterruptedException {
826 q.transfer(four);
827 checkEmpty(q);
828 }});
829
830 while (q.isEmpty())
831 Thread.yield();
832 assertFalse(q.isEmpty());
833 assertEquals(1, q.size());
834 assertSame(four, q.take());
835 checkEmpty(q);
836 awaitTermination(t, MEDIUM_DELAY_MS);
837 }
838
839 /**
840 * tryTransfer(null) throws NullPointerException
841 */
842 public void testTryTransfer1() {
843 try {
844 final LinkedTransferQueue q = new LinkedTransferQueue();
845 q.tryTransfer(null);
846 shouldThrow();
847 } catch (NullPointerException success) {}
848 }
849
850 /**
851 * tryTransfer returns false and does not enqueue if there are no
852 * consumers waiting to poll or take.
853 */
854 public void testTryTransfer2() throws InterruptedException {
855 final LinkedTransferQueue q = new LinkedTransferQueue();
856 assertFalse(q.tryTransfer(new Object()));
857 assertFalse(q.hasWaitingConsumer());
858 checkEmpty(q);
859 }
860
861 /**
862 * If there is a consumer waiting in timed poll, tryTransfer
863 * returns true while successfully transfering object.
864 */
865 public void testTryTransfer3() throws InterruptedException {
866 final Object hotPotato = new Object();
867 final LinkedTransferQueue q = new LinkedTransferQueue();
868
869 Thread t = newStartedThread(new CheckedRunnable() {
870 public void realRun() {
871 while (! q.hasWaitingConsumer())
872 Thread.yield();
873 assertTrue(q.hasWaitingConsumer());
874 checkEmpty(q);
875 assertTrue(q.tryTransfer(hotPotato));
876 }});
877
878 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
879 checkEmpty(q);
880 awaitTermination(t, MEDIUM_DELAY_MS);
881 }
882
883 /**
884 * If there is a consumer waiting in take, tryTransfer returns
885 * true while successfully transfering object.
886 */
887 public void testTryTransfer4() throws InterruptedException {
888 final Object hotPotato = new Object();
889 final LinkedTransferQueue q = new LinkedTransferQueue();
890
891 Thread t = newStartedThread(new CheckedRunnable() {
892 public void realRun() {
893 while (! q.hasWaitingConsumer())
894 Thread.yield();
895 assertTrue(q.hasWaitingConsumer());
896 checkEmpty(q);
897 assertTrue(q.tryTransfer(hotPotato));
898 }});
899
900 assertSame(q.take(), hotPotato);
901 checkEmpty(q);
902 awaitTermination(t, MEDIUM_DELAY_MS);
903 }
904
905 /**
906 * tryTransfer blocks interruptibly if no takers
907 */
908 public void testTryTransfer5() throws InterruptedException {
909 final LinkedTransferQueue q = new LinkedTransferQueue();
910 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
911 assertTrue(q.isEmpty());
912
913 Thread t = newStartedThread(new CheckedRunnable() {
914 public void realRun() throws InterruptedException {
915 Thread.currentThread().interrupt();
916 try {
917 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
918 shouldThrow();
919 } catch (InterruptedException success) {}
920 assertFalse(Thread.interrupted());
921
922 pleaseInterrupt.countDown();
923 try {
924 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
925 shouldThrow();
926 } catch (InterruptedException success) {}
927 assertFalse(Thread.interrupted());
928 }});
929
930 await(pleaseInterrupt);
931 assertThreadStaysAlive(t);
932 t.interrupt();
933 awaitTermination(t);
934 checkEmpty(q);
935 }
936
937 /**
938 * tryTransfer gives up after the timeout and returns false
939 */
940 public void testTryTransfer6() throws InterruptedException {
941 final LinkedTransferQueue q = new LinkedTransferQueue();
942
943 Thread t = newStartedThread(new CheckedRunnable() {
944 public void realRun() throws InterruptedException {
945 long t0 = System.nanoTime();
946 assertFalse(q.tryTransfer(new Object(),
947 timeoutMillis(), MILLISECONDS));
948 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
949 checkEmpty(q);
950 }});
951
952 awaitTermination(t);
953 checkEmpty(q);
954 }
955
956 /**
957 * tryTransfer waits for any elements previously in to be removed
958 * before transfering to a poll or take
959 */
960 public void testTryTransfer7() throws InterruptedException {
961 final LinkedTransferQueue q = new LinkedTransferQueue();
962 assertTrue(q.offer(four));
963
964 Thread t = newStartedThread(new CheckedRunnable() {
965 public void realRun() throws InterruptedException {
966 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
967 checkEmpty(q);
968 }});
969
970 while (q.size() != 2)
971 Thread.yield();
972 assertEquals(2, q.size());
973 assertSame(four, q.poll());
974 assertSame(five, q.poll());
975 checkEmpty(q);
976 awaitTermination(t, MEDIUM_DELAY_MS);
977 }
978
979 /**
980 * tryTransfer attempts to enqueue into the queue and fails
981 * returning false not enqueueing and the successive poll is null
982 */
983 public void testTryTransfer8() throws InterruptedException {
984 final LinkedTransferQueue q = new LinkedTransferQueue();
985 assertTrue(q.offer(four));
986 assertEquals(1, q.size());
987 long t0 = System.nanoTime();
988 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
989 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
990 assertEquals(1, q.size());
991 assertSame(four, q.poll());
992 assertNull(q.poll());
993 checkEmpty(q);
994 }
995
996 private LinkedTransferQueue<Integer> populatedQueue(int n) {
997 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
998 checkEmpty(q);
999 for (int i = 0; i < n; i++) {
1000 assertEquals(i, q.size());
1001 assertTrue(q.offer(i));
1002 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1003 }
1004 assertFalse(q.isEmpty());
1005 return q;
1006 }
1007}