blob: 8d3f276e01fcaa735f6053131bdd8d31d3354803 [file] [log] [blame]
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8package jsr166;
9
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010010import static java.util.concurrent.TimeUnit.MILLISECONDS;
11
Calin Juravle8f0d92b2013-08-01 17:26:00 +010012import java.util.ArrayList;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010013import java.util.Arrays;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010014import java.util.Collection;
15import java.util.Iterator;
16import java.util.List;
17import java.util.NoSuchElementException;
18import java.util.Queue;
19import java.util.concurrent.BlockingQueue;
20import java.util.concurrent.CountDownLatch;
21import java.util.concurrent.Executors;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.LinkedTransferQueue;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010024
25import junit.framework.Test;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010026
27@SuppressWarnings({"unchecked", "rawtypes"})
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010028// android-changed: Extend BlockingQueueTest directly.
Calin Juravle008167d2014-05-15 19:29:37 +010029public class LinkedTransferQueueTest extends BlockingQueueTest {
Calin Juravle8f0d92b2013-08-01 17:26:00 +010030
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010031 // android-changed: Extend BlockingQueueTest directly.
32 //
33 // public static class Generic extends BlockingQueueTest {
34 // protected BlockingQueue emptyCollection() {
35 // return new LinkedTransferQueue();
36 // }
37 // }
38 //
39 // public static void main(String[] args) {
40 // main(suite(), args);
41 // }
42 //
43 // public static Test suite() {
44 // return newTestSuite(LinkedTransferQueueTest.class,
45 // new Generic().testSuite());
46 // }
47
Calin Juravle008167d2014-05-15 19:29:37 +010048 protected BlockingQueue emptyCollection() {
49 return new LinkedTransferQueue();
Calin Juravle8f0d92b2013-08-01 17:26:00 +010050 }
51
52 /**
53 * Constructor builds new queue with size being zero and empty
54 * being true
55 */
56 public void testConstructor1() {
57 assertEquals(0, new LinkedTransferQueue().size());
58 assertTrue(new LinkedTransferQueue().isEmpty());
59 }
60
61 /**
62 * Initializing constructor with null collection throws
63 * NullPointerException
64 */
65 public void testConstructor2() {
66 try {
67 new LinkedTransferQueue(null);
68 shouldThrow();
69 } catch (NullPointerException success) {}
70 }
71
72 /**
73 * Initializing from Collection of null elements throws
74 * NullPointerException
75 */
76 public void testConstructor3() {
77 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
78 try {
79 new LinkedTransferQueue(elements);
80 shouldThrow();
81 } catch (NullPointerException success) {}
82 }
83
84 /**
85 * Initializing constructor with a collection containing some null elements
86 * throws NullPointerException
87 */
88 public void testConstructor4() {
89 Integer[] ints = new Integer[SIZE];
90 for (int i = 0; i < SIZE-1; ++i)
91 ints[i] = i;
92 Collection<Integer> elements = Arrays.asList(ints);
93 try {
94 new LinkedTransferQueue(elements);
95 shouldThrow();
96 } catch (NullPointerException success) {}
97 }
98
99 /**
100 * Queue contains all elements of the collection it is initialized by
101 */
102 public void testConstructor5() {
103 Integer[] ints = new Integer[SIZE];
104 for (int i = 0; i < SIZE; ++i) {
105 ints[i] = i;
106 }
107 List intList = Arrays.asList(ints);
108 LinkedTransferQueue q
109 = new LinkedTransferQueue(intList);
110 assertEquals(q.size(), intList.size());
111 assertEquals(q.toString(), intList.toString());
112 assertTrue(Arrays.equals(q.toArray(),
113 intList.toArray()));
114 assertTrue(Arrays.equals(q.toArray(new Object[0]),
115 intList.toArray(new Object[0])));
116 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
117 intList.toArray(new Object[SIZE])));
118 for (int i = 0; i < SIZE; ++i) {
119 assertEquals(ints[i], q.poll());
120 }
121 }
122
123 /**
124 * remainingCapacity() always returns Integer.MAX_VALUE
125 */
126 public void testRemainingCapacity() {
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100127 BlockingQueue q = populatedQueue(SIZE);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100128 for (int i = 0; i < SIZE; ++i) {
129 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
130 assertEquals(SIZE - i, q.size());
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100131 assertEquals(i, q.remove());
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100132 }
133 for (int i = 0; i < SIZE; ++i) {
134 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
135 assertEquals(i, q.size());
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100136 assertTrue(q.add(i));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100137 }
138 }
139
140 /**
141 * addAll(this) throws IllegalArgumentException
142 */
143 public void testAddAllSelf() {
144 try {
145 LinkedTransferQueue q = populatedQueue(SIZE);
146 q.addAll(q);
147 shouldThrow();
148 } catch (IllegalArgumentException success) {}
149 }
150
151 /**
152 * addAll of a collection with any null elements throws
153 * NullPointerException after possibly adding some elements
154 */
155 public void testAddAll3() {
156 try {
157 LinkedTransferQueue q = new LinkedTransferQueue();
158 Integer[] ints = new Integer[SIZE];
159 for (int i = 0; i < SIZE - 1; ++i) {
160 ints[i] = i;
161 }
162 q.addAll(Arrays.asList(ints));
163 shouldThrow();
164 } catch (NullPointerException success) {}
165 }
166
167 /**
168 * Queue contains all elements, in traversal order, of successful addAll
169 */
170 public void testAddAll5() {
171 Integer[] empty = new Integer[0];
172 Integer[] ints = new Integer[SIZE];
173 for (int i = 0; i < SIZE; ++i) {
174 ints[i] = i;
175 }
176 LinkedTransferQueue q = new LinkedTransferQueue();
177 assertFalse(q.addAll(Arrays.asList(empty)));
178 assertTrue(q.addAll(Arrays.asList(ints)));
179 for (int i = 0; i < SIZE; ++i) {
180 assertEquals(ints[i], q.poll());
181 }
182 }
183
184 /**
185 * all elements successfully put are contained
186 */
187 public void testPut() {
188 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
189 for (int i = 0; i < SIZE; ++i) {
190 assertEquals(i, q.size());
191 q.put(i);
192 assertTrue(q.contains(i));
193 }
194 }
195
196 /**
197 * take retrieves elements in FIFO order
198 */
199 public void testTake() throws InterruptedException {
200 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
201 for (int i = 0; i < SIZE; ++i) {
202 assertEquals(i, (int) q.take());
203 }
204 }
205
206 /**
207 * take removes existing elements until empty, then blocks interruptibly
208 */
209 public void testBlockingTake() throws InterruptedException {
210 final BlockingQueue q = populatedQueue(SIZE);
211 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
212 Thread t = newStartedThread(new CheckedRunnable() {
213 public void realRun() throws InterruptedException {
214 for (int i = 0; i < SIZE; ++i) {
215 assertEquals(i, q.take());
216 }
217
218 Thread.currentThread().interrupt();
219 try {
220 q.take();
221 shouldThrow();
222 } catch (InterruptedException success) {}
223 assertFalse(Thread.interrupted());
224
225 pleaseInterrupt.countDown();
226 try {
227 q.take();
228 shouldThrow();
229 } catch (InterruptedException success) {}
230 assertFalse(Thread.interrupted());
231 }});
232
233 await(pleaseInterrupt);
234 assertThreadStaysAlive(t);
235 t.interrupt();
236 awaitTermination(t);
237 }
238
239 /**
240 * poll succeeds unless empty
241 */
242 public void testPoll() throws InterruptedException {
243 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
244 for (int i = 0; i < SIZE; ++i) {
245 assertEquals(i, (int) q.poll());
246 }
247 assertNull(q.poll());
248 checkEmpty(q);
249 }
250
251 /**
252 * timed poll with zero timeout succeeds when non-empty, else times out
253 */
254 public void testTimedPoll0() throws InterruptedException {
255 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
256 for (int i = 0; i < SIZE; ++i) {
257 assertEquals(i, (int) q.poll(0, MILLISECONDS));
258 }
259 assertNull(q.poll(0, MILLISECONDS));
260 checkEmpty(q);
261 }
262
263 /**
264 * timed poll with nonzero timeout succeeds when non-empty, else times out
265 */
266 public void testTimedPoll() throws InterruptedException {
267 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
268 for (int i = 0; i < SIZE; ++i) {
269 long startTime = System.nanoTime();
270 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
272 }
273 long startTime = System.nanoTime();
274 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
275 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
276 checkEmpty(q);
277 }
278
279 /**
280 * Interrupted timed poll throws InterruptedException instead of
281 * returning timeout status
282 */
283 public void testInterruptedTimedPoll() throws InterruptedException {
284 final BlockingQueue<Integer> q = populatedQueue(SIZE);
285 final CountDownLatch aboutToWait = new CountDownLatch(1);
286 Thread t = newStartedThread(new CheckedRunnable() {
287 public void realRun() throws InterruptedException {
288 for (int i = 0; i < SIZE; ++i) {
289 long t0 = System.nanoTime();
290 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
291 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
292 }
293 long t0 = System.nanoTime();
294 aboutToWait.countDown();
295 try {
296 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
297 shouldThrow();
298 } catch (InterruptedException success) {
299 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
300 }
301 }});
302
303 aboutToWait.await();
304 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
305 t.interrupt();
306 awaitTermination(t, MEDIUM_DELAY_MS);
307 checkEmpty(q);
308 }
309
310 /**
311 * timed poll after thread interrupted throws InterruptedException
312 * instead of returning timeout status
313 */
314 public void testTimedPollAfterInterrupt() throws InterruptedException {
315 final BlockingQueue<Integer> q = populatedQueue(SIZE);
316 Thread t = newStartedThread(new CheckedRunnable() {
317 public void realRun() throws InterruptedException {
318 Thread.currentThread().interrupt();
319 for (int i = 0; i < SIZE; ++i) {
320 long t0 = System.nanoTime();
321 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
322 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
323 }
324 try {
325 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
326 shouldThrow();
327 } catch (InterruptedException success) {}
328 }});
329
330 awaitTermination(t, MEDIUM_DELAY_MS);
331 checkEmpty(q);
332 }
333
334 /**
335 * peek returns next element, or null if empty
336 */
337 public void testPeek() throws InterruptedException {
338 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
339 for (int i = 0; i < SIZE; ++i) {
340 assertEquals(i, (int) q.peek());
341 assertEquals(i, (int) q.poll());
342 assertTrue(q.peek() == null ||
343 i != (int) q.peek());
344 }
345 assertNull(q.peek());
346 checkEmpty(q);
347 }
348
349 /**
350 * element returns next element, or throws NoSuchElementException if empty
351 */
352 public void testElement() throws InterruptedException {
353 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
354 for (int i = 0; i < SIZE; ++i) {
355 assertEquals(i, (int) q.element());
356 assertEquals(i, (int) q.poll());
357 }
358 try {
359 q.element();
360 shouldThrow();
361 } catch (NoSuchElementException success) {}
362 checkEmpty(q);
363 }
364
365 /**
366 * remove removes next element, or throws NoSuchElementException if empty
367 */
368 public void testRemove() throws InterruptedException {
369 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
370 for (int i = 0; i < SIZE; ++i) {
371 assertEquals(i, (int) q.remove());
372 }
373 try {
374 q.remove();
375 shouldThrow();
376 } catch (NoSuchElementException success) {}
377 checkEmpty(q);
378 }
379
380 /**
381 * An add following remove(x) succeeds
382 */
383 public void testRemoveElementAndAdd() throws InterruptedException {
384 LinkedTransferQueue q = new LinkedTransferQueue();
385 assertTrue(q.add(one));
386 assertTrue(q.add(two));
387 assertTrue(q.remove(one));
388 assertTrue(q.remove(two));
389 assertTrue(q.add(three));
390 assertSame(q.take(), three);
391 }
392
393 /**
394 * contains(x) reports true when elements added but not yet removed
395 */
396 public void testContains() {
397 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
398 for (int i = 0; i < SIZE; ++i) {
399 assertTrue(q.contains(i));
400 assertEquals(i, (int) q.poll());
401 assertFalse(q.contains(i));
402 }
403 }
404
405 /**
406 * clear removes all elements
407 */
408 public void testClear() throws InterruptedException {
409 LinkedTransferQueue q = populatedQueue(SIZE);
410 q.clear();
411 checkEmpty(q);
412 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
413 q.add(one);
414 assertFalse(q.isEmpty());
415 assertEquals(1, q.size());
416 assertTrue(q.contains(one));
417 q.clear();
418 checkEmpty(q);
419 }
420
421 /**
422 * containsAll(c) is true when c contains a subset of elements
423 */
424 public void testContainsAll() {
425 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
426 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
427 for (int i = 0; i < SIZE; ++i) {
428 assertTrue(q.containsAll(p));
429 assertFalse(p.containsAll(q));
430 p.add(i);
431 }
432 assertTrue(p.containsAll(q));
433 }
434
435 /**
436 * retainAll(c) retains only those elements of c and reports true
437 * if changed
438 */
439 public void testRetainAll() {
440 LinkedTransferQueue q = populatedQueue(SIZE);
441 LinkedTransferQueue p = populatedQueue(SIZE);
442 for (int i = 0; i < SIZE; ++i) {
443 boolean changed = q.retainAll(p);
444 if (i == 0) {
445 assertFalse(changed);
446 } else {
447 assertTrue(changed);
448 }
449 assertTrue(q.containsAll(p));
450 assertEquals(SIZE - i, q.size());
451 p.remove();
452 }
453 }
454
455 /**
456 * removeAll(c) removes only those elements of c and reports true
457 * if changed
458 */
459 public void testRemoveAll() {
460 for (int i = 1; i < SIZE; ++i) {
461 LinkedTransferQueue q = populatedQueue(SIZE);
462 LinkedTransferQueue p = populatedQueue(i);
463 assertTrue(q.removeAll(p));
464 assertEquals(SIZE - i, q.size());
465 for (int j = 0; j < i; ++j) {
466 assertFalse(q.contains(p.remove()));
467 }
468 }
469 }
470
471 /**
472 * toArray() contains all elements in FIFO order
473 */
474 public void testToArray() {
475 LinkedTransferQueue q = populatedQueue(SIZE);
476 Object[] o = q.toArray();
477 for (int i = 0; i < o.length; i++) {
478 assertSame(o[i], q.poll());
479 }
480 }
481
482 /**
483 * toArray(a) contains all elements in FIFO order
484 */
485 public void testToArray2() {
486 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
487 Integer[] ints = new Integer[SIZE];
488 Integer[] array = q.toArray(ints);
489 assertSame(ints, array);
490 for (int i = 0; i < ints.length; i++) {
491 assertSame(ints[i], q.poll());
492 }
493 }
494
495 /**
496 * toArray(incompatible array type) throws ArrayStoreException
497 */
498 public void testToArray1_BadArg() {
499 LinkedTransferQueue q = populatedQueue(SIZE);
500 try {
501 q.toArray(new String[10]);
502 shouldThrow();
503 } catch (ArrayStoreException success) {}
504 }
505
506 /**
507 * iterator iterates through all elements
508 */
509 public void testIterator() throws InterruptedException {
510 LinkedTransferQueue q = populatedQueue(SIZE);
511 Iterator it = q.iterator();
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100512 int i;
513 for (i = 0; it.hasNext(); i++)
514 assertTrue(q.contains(it.next()));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100515 assertEquals(i, SIZE);
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100516 assertIteratorExhausted(it);
517
518 it = q.iterator();
519 for (i = 0; it.hasNext(); i++)
520 assertEquals(it.next(), q.take());
521 assertEquals(i, SIZE);
522 assertIteratorExhausted(it);
523 }
524
525 /**
526 * iterator of empty collection has no elements
527 */
528 public void testEmptyIterator() {
529 assertIteratorExhausted(new LinkedTransferQueue().iterator());
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100530 }
531
532 /**
533 * iterator.remove() removes current element
534 */
535 public void testIteratorRemove() {
536 final LinkedTransferQueue q = new LinkedTransferQueue();
537 q.add(two);
538 q.add(one);
539 q.add(three);
540
541 Iterator it = q.iterator();
542 it.next();
543 it.remove();
544
545 it = q.iterator();
546 assertSame(it.next(), one);
547 assertSame(it.next(), three);
548 assertFalse(it.hasNext());
549 }
550
551 /**
552 * iterator ordering is FIFO
553 */
554 public void testIteratorOrdering() {
555 final LinkedTransferQueue<Integer> q
556 = new LinkedTransferQueue<Integer>();
557 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
558 q.add(one);
559 q.add(two);
560 q.add(three);
561 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
562 int k = 0;
563 for (Integer n : q) {
564 assertEquals(++k, (int) n);
565 }
566 assertEquals(3, k);
567 }
568
569 /**
570 * Modifications do not cause iterators to fail
571 */
572 public void testWeaklyConsistentIteration() {
573 final LinkedTransferQueue q = new LinkedTransferQueue();
574 q.add(one);
575 q.add(two);
576 q.add(three);
577 for (Iterator it = q.iterator(); it.hasNext();) {
578 q.remove();
579 it.next();
580 }
581 assertEquals(0, q.size());
582 }
583
584 /**
585 * toString contains toStrings of elements
586 */
587 public void testToString() {
588 LinkedTransferQueue q = populatedQueue(SIZE);
589 String s = q.toString();
590 for (int i = 0; i < SIZE; ++i) {
591 assertTrue(s.contains(String.valueOf(i)));
592 }
593 }
594
595 /**
596 * offer transfers elements across Executor tasks
597 */
598 public void testOfferInExecutor() {
599 final LinkedTransferQueue q = new LinkedTransferQueue();
600 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
601 ExecutorService executor = Executors.newFixedThreadPool(2);
602
603 executor.execute(new CheckedRunnable() {
604 public void realRun() throws InterruptedException {
605 threadsStarted.await();
606 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
607 }});
608
609 executor.execute(new CheckedRunnable() {
610 public void realRun() throws InterruptedException {
611 threadsStarted.await();
612 assertSame(one, q.take());
613 checkEmpty(q);
614 }});
615
616 joinPool(executor);
617 }
618
619 /**
620 * timed poll retrieves elements across Executor threads
621 */
622 public void testPollInExecutor() {
623 final LinkedTransferQueue q = new LinkedTransferQueue();
624 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
625 ExecutorService executor = Executors.newFixedThreadPool(2);
626
627 executor.execute(new CheckedRunnable() {
628 public void realRun() throws InterruptedException {
629 assertNull(q.poll());
630 threadsStarted.await();
631 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
632 checkEmpty(q);
633 }});
634
635 executor.execute(new CheckedRunnable() {
636 public void realRun() throws InterruptedException {
637 threadsStarted.await();
638 q.put(one);
639 }});
640
641 joinPool(executor);
642 }
643
644 /**
645 * A deserialized serialized queue has same elements in same order
646 */
647 public void testSerialization() throws Exception {
648 Queue x = populatedQueue(SIZE);
649 Queue y = serialClone(x);
650
651 assertNotSame(y, x);
652 assertEquals(x.size(), y.size());
653 assertEquals(x.toString(), y.toString());
654 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
655 while (!x.isEmpty()) {
656 assertFalse(y.isEmpty());
657 assertEquals(x.remove(), y.remove());
658 }
659 assertTrue(y.isEmpty());
660 }
661
662 /**
663 * drainTo(c) empties queue into another collection c
664 */
665 public void testDrainTo() {
666 LinkedTransferQueue q = populatedQueue(SIZE);
667 ArrayList l = new ArrayList();
668 q.drainTo(l);
669 assertEquals(0, q.size());
670 assertEquals(SIZE, l.size());
671 for (int i = 0; i < SIZE; ++i) {
672 assertEquals(i, l.get(i));
673 }
674 q.add(zero);
675 q.add(one);
676 assertFalse(q.isEmpty());
677 assertTrue(q.contains(zero));
678 assertTrue(q.contains(one));
679 l.clear();
680 q.drainTo(l);
681 assertEquals(0, q.size());
682 assertEquals(2, l.size());
683 for (int i = 0; i < 2; ++i) {
684 assertEquals(i, l.get(i));
685 }
686 }
687
688 /**
689 * drainTo(c) empties full queue, unblocking a waiting put.
690 */
691 public void testDrainToWithActivePut() throws InterruptedException {
692 final LinkedTransferQueue q = populatedQueue(SIZE);
693 Thread t = newStartedThread(new CheckedRunnable() {
694 public void realRun() {
695 q.put(SIZE + 1);
696 }});
697 ArrayList l = new ArrayList();
698 q.drainTo(l);
699 assertTrue(l.size() >= SIZE);
700 for (int i = 0; i < SIZE; ++i)
701 assertEquals(i, l.get(i));
702 awaitTermination(t, MEDIUM_DELAY_MS);
703 assertTrue(q.size() + l.size() >= SIZE);
704 }
705
706 /**
707 * drainTo(c, n) empties first min(n, size) elements of queue into c
708 */
709 public void testDrainToN() {
710 LinkedTransferQueue q = new LinkedTransferQueue();
711 for (int i = 0; i < SIZE + 2; ++i) {
712 for (int j = 0; j < SIZE; j++) {
713 assertTrue(q.offer(j));
714 }
715 ArrayList l = new ArrayList();
716 q.drainTo(l, i);
717 int k = (i < SIZE) ? i : SIZE;
718 assertEquals(k, l.size());
719 assertEquals(SIZE - k, q.size());
720 for (int j = 0; j < k; ++j)
721 assertEquals(j, l.get(j));
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100722 do {} while (q.poll() != null);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100723 }
724 }
725
726 /**
727 * timed poll() or take() increments the waiting consumer count;
728 * offer(e) decrements the waiting consumer count
729 */
730 public void testWaitingConsumer() throws InterruptedException {
731 final LinkedTransferQueue q = new LinkedTransferQueue();
732 assertEquals(0, q.getWaitingConsumerCount());
733 assertFalse(q.hasWaitingConsumer());
734 final CountDownLatch threadStarted = new CountDownLatch(1);
735
736 Thread t = newStartedThread(new CheckedRunnable() {
737 public void realRun() throws InterruptedException {
738 threadStarted.countDown();
739 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
740 assertEquals(0, q.getWaitingConsumerCount());
741 assertFalse(q.hasWaitingConsumer());
742 }});
743
744 threadStarted.await();
Hiroshi Yamauchi12b0b5b2016-01-05 17:53:01 -0800745 // waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
746 waitForThreadToEnterWaitStateNoTimeout(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100747 assertEquals(1, q.getWaitingConsumerCount());
748 assertTrue(q.hasWaitingConsumer());
749
750 assertTrue(q.offer(one));
751 assertEquals(0, q.getWaitingConsumerCount());
752 assertFalse(q.hasWaitingConsumer());
753
754 awaitTermination(t, MEDIUM_DELAY_MS);
755 }
756
757 /**
758 * transfer(null) throws NullPointerException
759 */
760 public void testTransfer1() throws InterruptedException {
761 try {
762 LinkedTransferQueue q = new LinkedTransferQueue();
763 q.transfer(null);
764 shouldThrow();
765 } catch (NullPointerException success) {}
766 }
767
768 /**
769 * transfer waits until a poll occurs. The transfered element
770 * is returned by this associated poll.
771 */
772 public void testTransfer2() throws InterruptedException {
773 final LinkedTransferQueue<Integer> q
774 = new LinkedTransferQueue<Integer>();
775 final CountDownLatch threadStarted = new CountDownLatch(1);
776
777 Thread t = newStartedThread(new CheckedRunnable() {
778 public void realRun() throws InterruptedException {
779 threadStarted.countDown();
780 q.transfer(five);
781 checkEmpty(q);
782 }});
783
784 threadStarted.await();
Hiroshi Yamauchi12b0b5b2016-01-05 17:53:01 -0800785 // waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
786 waitForThreadToEnterWaitStateNoTimeout(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100787 assertEquals(1, q.size());
788 assertSame(five, q.poll());
789 checkEmpty(q);
790 awaitTermination(t, MEDIUM_DELAY_MS);
791 }
792
793 /**
794 * transfer waits until a poll occurs, and then transfers in fifo order
795 */
796 public void testTransfer3() throws InterruptedException {
797 final LinkedTransferQueue<Integer> q
798 = new LinkedTransferQueue<Integer>();
799
800 Thread first = newStartedThread(new CheckedRunnable() {
801 public void realRun() throws InterruptedException {
802 q.transfer(four);
803 assertTrue(!q.contains(four));
804 assertEquals(1, q.size());
805 }});
806
807 Thread interruptedThread = newStartedThread(
808 new CheckedInterruptedRunnable() {
809 public void realRun() throws InterruptedException {
810 while (q.isEmpty())
811 Thread.yield();
812 q.transfer(five);
813 }});
814
815 while (q.size() < 2)
816 Thread.yield();
817 assertEquals(2, q.size());
818 assertSame(four, q.poll());
819 first.join();
820 assertEquals(1, q.size());
821 interruptedThread.interrupt();
822 interruptedThread.join();
823 checkEmpty(q);
824 }
825
826 /**
827 * transfer waits until a poll occurs, at which point the polling
828 * thread returns the element
829 */
830 public void testTransfer4() throws InterruptedException {
831 final LinkedTransferQueue q = new LinkedTransferQueue();
832
833 Thread t = newStartedThread(new CheckedRunnable() {
834 public void realRun() throws InterruptedException {
835 q.transfer(four);
836 assertFalse(q.contains(four));
837 assertSame(three, q.poll());
838 }});
839
840 while (q.isEmpty())
841 Thread.yield();
842 assertFalse(q.isEmpty());
843 assertEquals(1, q.size());
844 assertTrue(q.offer(three));
845 assertSame(four, q.poll());
846 awaitTermination(t, MEDIUM_DELAY_MS);
847 }
848
849 /**
850 * transfer waits until a take occurs. The transfered element
851 * is returned by this associated take.
852 */
853 public void testTransfer5() throws InterruptedException {
854 final LinkedTransferQueue<Integer> q
855 = new LinkedTransferQueue<Integer>();
856
857 Thread t = newStartedThread(new CheckedRunnable() {
858 public void realRun() throws InterruptedException {
859 q.transfer(four);
860 checkEmpty(q);
861 }});
862
863 while (q.isEmpty())
864 Thread.yield();
865 assertFalse(q.isEmpty());
866 assertEquals(1, q.size());
867 assertSame(four, q.take());
868 checkEmpty(q);
869 awaitTermination(t, MEDIUM_DELAY_MS);
870 }
871
872 /**
873 * tryTransfer(null) throws NullPointerException
874 */
875 public void testTryTransfer1() {
876 try {
877 final LinkedTransferQueue q = new LinkedTransferQueue();
878 q.tryTransfer(null);
879 shouldThrow();
880 } catch (NullPointerException success) {}
881 }
882
883 /**
884 * tryTransfer returns false and does not enqueue if there are no
885 * consumers waiting to poll or take.
886 */
887 public void testTryTransfer2() throws InterruptedException {
888 final LinkedTransferQueue q = new LinkedTransferQueue();
889 assertFalse(q.tryTransfer(new Object()));
890 assertFalse(q.hasWaitingConsumer());
891 checkEmpty(q);
892 }
893
894 /**
895 * If there is a consumer waiting in timed poll, tryTransfer
896 * returns true while successfully transfering object.
897 */
898 public void testTryTransfer3() throws InterruptedException {
899 final Object hotPotato = new Object();
900 final LinkedTransferQueue q = new LinkedTransferQueue();
901
902 Thread t = newStartedThread(new CheckedRunnable() {
903 public void realRun() {
904 while (! q.hasWaitingConsumer())
905 Thread.yield();
906 assertTrue(q.hasWaitingConsumer());
907 checkEmpty(q);
908 assertTrue(q.tryTransfer(hotPotato));
909 }});
910
911 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
912 checkEmpty(q);
913 awaitTermination(t, MEDIUM_DELAY_MS);
914 }
915
916 /**
917 * If there is a consumer waiting in take, tryTransfer returns
918 * true while successfully transfering object.
919 */
920 public void testTryTransfer4() throws InterruptedException {
921 final Object hotPotato = new Object();
922 final LinkedTransferQueue q = new LinkedTransferQueue();
923
924 Thread t = newStartedThread(new CheckedRunnable() {
925 public void realRun() {
926 while (! q.hasWaitingConsumer())
927 Thread.yield();
928 assertTrue(q.hasWaitingConsumer());
929 checkEmpty(q);
930 assertTrue(q.tryTransfer(hotPotato));
931 }});
932
933 assertSame(q.take(), hotPotato);
934 checkEmpty(q);
935 awaitTermination(t, MEDIUM_DELAY_MS);
936 }
937
938 /**
939 * tryTransfer blocks interruptibly if no takers
940 */
941 public void testTryTransfer5() throws InterruptedException {
942 final LinkedTransferQueue q = new LinkedTransferQueue();
943 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
944 assertTrue(q.isEmpty());
945
946 Thread t = newStartedThread(new CheckedRunnable() {
947 public void realRun() throws InterruptedException {
948 Thread.currentThread().interrupt();
949 try {
950 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
951 shouldThrow();
952 } catch (InterruptedException success) {}
953 assertFalse(Thread.interrupted());
954
955 pleaseInterrupt.countDown();
956 try {
957 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
958 shouldThrow();
959 } catch (InterruptedException success) {}
960 assertFalse(Thread.interrupted());
961 }});
962
963 await(pleaseInterrupt);
964 assertThreadStaysAlive(t);
965 t.interrupt();
966 awaitTermination(t);
967 checkEmpty(q);
968 }
969
970 /**
971 * tryTransfer gives up after the timeout and returns false
972 */
973 public void testTryTransfer6() throws InterruptedException {
974 final LinkedTransferQueue q = new LinkedTransferQueue();
975
976 Thread t = newStartedThread(new CheckedRunnable() {
977 public void realRun() throws InterruptedException {
978 long t0 = System.nanoTime();
979 assertFalse(q.tryTransfer(new Object(),
980 timeoutMillis(), MILLISECONDS));
981 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
982 checkEmpty(q);
983 }});
984
985 awaitTermination(t);
986 checkEmpty(q);
987 }
988
989 /**
990 * tryTransfer waits for any elements previously in to be removed
991 * before transfering to a poll or take
992 */
993 public void testTryTransfer7() throws InterruptedException {
994 final LinkedTransferQueue q = new LinkedTransferQueue();
995 assertTrue(q.offer(four));
996
997 Thread t = newStartedThread(new CheckedRunnable() {
998 public void realRun() throws InterruptedException {
999 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
1000 checkEmpty(q);
1001 }});
1002
1003 while (q.size() != 2)
1004 Thread.yield();
1005 assertEquals(2, q.size());
1006 assertSame(four, q.poll());
1007 assertSame(five, q.poll());
1008 checkEmpty(q);
1009 awaitTermination(t, MEDIUM_DELAY_MS);
1010 }
1011
1012 /**
1013 * tryTransfer attempts to enqueue into the queue and fails
1014 * returning false not enqueueing and the successive poll is null
1015 */
1016 public void testTryTransfer8() throws InterruptedException {
1017 final LinkedTransferQueue q = new LinkedTransferQueue();
1018 assertTrue(q.offer(four));
1019 assertEquals(1, q.size());
1020 long t0 = System.nanoTime();
1021 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1022 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
1023 assertEquals(1, q.size());
1024 assertSame(four, q.poll());
1025 assertNull(q.poll());
1026 checkEmpty(q);
1027 }
1028
1029 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1030 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1031 checkEmpty(q);
1032 for (int i = 0; i < n; i++) {
1033 assertEquals(i, q.size());
1034 assertTrue(q.offer(i));
1035 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1036 }
1037 assertFalse(q.isEmpty());
1038 return q;
1039 }
Narayan Kamath8e9a0e92015-04-28 11:40:00 +01001040
1041 /**
1042 * remove(null), contains(null) always return false
1043 */
1044 public void testNeverContainsNull() {
1045 Collection<?>[] qs = {
1046 new LinkedTransferQueue<Object>(),
1047 populatedQueue(2),
1048 };
1049
1050 for (Collection<?> q : qs) {
1051 assertFalse(q.contains(null));
1052 assertFalse(q.remove(null));
1053 }
1054 }
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001055}