blob: efe5a58280a347d5810409c19f4888f30fed2a97 [file] [log] [blame]
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include John Vint
 */
7
8package jsr166;
9
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010010import static java.util.concurrent.TimeUnit.MILLISECONDS;
11
Calin Juravle8f0d92b2013-08-01 17:26:00 +010012import java.util.ArrayList;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010013import java.util.Arrays;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010014import java.util.Collection;
15import java.util.Iterator;
16import java.util.List;
17import java.util.NoSuchElementException;
18import java.util.Queue;
19import java.util.concurrent.BlockingQueue;
Tobias Thierercff661d2017-02-19 15:01:51 +000020import java.util.concurrent.Callable;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010021import java.util.concurrent.CountDownLatch;
22import java.util.concurrent.Executors;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.LinkedTransferQueue;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010025
26import junit.framework.Test;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010027
28@SuppressWarnings({"unchecked", "rawtypes"})
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +000029public class LinkedTransferQueueTest extends JSR166TestCase {
30 static class Implementation implements CollectionImplementation {
31 public Class<?> klazz() { return LinkedTransferQueue.class; }
32 public Collection emptyCollection() { return new LinkedTransferQueue(); }
33 public Object makeElement(int i) { return i; }
34 public boolean isConcurrent() { return true; }
35 public boolean permitsNulls() { return false; }
36 }
Calin Juravle8f0d92b2013-08-01 17:26:00 +010037
// android-note: These tests have been moved into their own separate
// classes to work around CTS issues:
// LinkedTransferQueueBlockingQueueTest.java
// LinkedTransferQueueCollectionTest.java
//
// public static class Generic extends BlockingQueueTest {
//     protected BlockingQueue emptyCollection() {
//         return new LinkedTransferQueue();
//     }
// }
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +000048
// android-note: Removed because the CTS runner does a bad job of
// retrying tests that have suite() declarations.
//
// public static void main(String[] args) {
//     main(suite(), args);
// }
// public static Test suite() {
//     return newTestSuite(LinkedTransferQueueTest.class,
//                         new Generic().testSuite(),
//                         CollectionTest.testSuite(new Implementation()));
// }
60
Calin Juravle8f0d92b2013-08-01 17:26:00 +010061 /**
62 * Constructor builds new queue with size being zero and empty
63 * being true
64 */
65 public void testConstructor1() {
66 assertEquals(0, new LinkedTransferQueue().size());
67 assertTrue(new LinkedTransferQueue().isEmpty());
68 }
69
70 /**
71 * Initializing constructor with null collection throws
72 * NullPointerException
73 */
74 public void testConstructor2() {
75 try {
76 new LinkedTransferQueue(null);
77 shouldThrow();
78 } catch (NullPointerException success) {}
79 }
80
81 /**
82 * Initializing from Collection of null elements throws
83 * NullPointerException
84 */
85 public void testConstructor3() {
86 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
87 try {
88 new LinkedTransferQueue(elements);
89 shouldThrow();
90 } catch (NullPointerException success) {}
91 }
92
93 /**
94 * Initializing constructor with a collection containing some null elements
95 * throws NullPointerException
96 */
97 public void testConstructor4() {
98 Integer[] ints = new Integer[SIZE];
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +000099 for (int i = 0; i < SIZE - 1; ++i)
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100100 ints[i] = i;
101 Collection<Integer> elements = Arrays.asList(ints);
102 try {
103 new LinkedTransferQueue(elements);
104 shouldThrow();
105 } catch (NullPointerException success) {}
106 }
107
108 /**
109 * Queue contains all elements of the collection it is initialized by
110 */
111 public void testConstructor5() {
112 Integer[] ints = new Integer[SIZE];
113 for (int i = 0; i < SIZE; ++i) {
114 ints[i] = i;
115 }
116 List intList = Arrays.asList(ints);
117 LinkedTransferQueue q
118 = new LinkedTransferQueue(intList);
119 assertEquals(q.size(), intList.size());
120 assertEquals(q.toString(), intList.toString());
121 assertTrue(Arrays.equals(q.toArray(),
122 intList.toArray()));
123 assertTrue(Arrays.equals(q.toArray(new Object[0]),
124 intList.toArray(new Object[0])));
125 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
126 intList.toArray(new Object[SIZE])));
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(ints[i], q.poll());
129 }
130 }
131
132 /**
133 * remainingCapacity() always returns Integer.MAX_VALUE
134 */
135 public void testRemainingCapacity() {
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100136 BlockingQueue q = populatedQueue(SIZE);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100137 for (int i = 0; i < SIZE; ++i) {
138 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
139 assertEquals(SIZE - i, q.size());
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100140 assertEquals(i, q.remove());
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100141 }
142 for (int i = 0; i < SIZE; ++i) {
143 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
144 assertEquals(i, q.size());
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100145 assertTrue(q.add(i));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100146 }
147 }
148
149 /**
150 * addAll(this) throws IllegalArgumentException
151 */
152 public void testAddAllSelf() {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000153 LinkedTransferQueue q = populatedQueue(SIZE);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100154 try {
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100155 q.addAll(q);
156 shouldThrow();
157 } catch (IllegalArgumentException success) {}
158 }
159
160 /**
161 * addAll of a collection with any null elements throws
162 * NullPointerException after possibly adding some elements
163 */
164 public void testAddAll3() {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000165 LinkedTransferQueue q = new LinkedTransferQueue();
166 Integer[] ints = new Integer[SIZE];
167 for (int i = 0; i < SIZE - 1; ++i)
168 ints[i] = i;
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100169 try {
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100170 q.addAll(Arrays.asList(ints));
171 shouldThrow();
172 } catch (NullPointerException success) {}
173 }
174
175 /**
176 * Queue contains all elements, in traversal order, of successful addAll
177 */
178 public void testAddAll5() {
179 Integer[] empty = new Integer[0];
180 Integer[] ints = new Integer[SIZE];
181 for (int i = 0; i < SIZE; ++i) {
182 ints[i] = i;
183 }
184 LinkedTransferQueue q = new LinkedTransferQueue();
185 assertFalse(q.addAll(Arrays.asList(empty)));
186 assertTrue(q.addAll(Arrays.asList(ints)));
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(ints[i], q.poll());
189 }
190 }
191
192 /**
193 * all elements successfully put are contained
194 */
195 public void testPut() {
196 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
197 for (int i = 0; i < SIZE; ++i) {
198 assertEquals(i, q.size());
199 q.put(i);
200 assertTrue(q.contains(i));
201 }
202 }
203
204 /**
205 * take retrieves elements in FIFO order
206 */
207 public void testTake() throws InterruptedException {
208 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
209 for (int i = 0; i < SIZE; ++i) {
210 assertEquals(i, (int) q.take());
211 }
212 }
213
214 /**
215 * take removes existing elements until empty, then blocks interruptibly
216 */
217 public void testBlockingTake() throws InterruptedException {
218 final BlockingQueue q = populatedQueue(SIZE);
219 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
220 Thread t = newStartedThread(new CheckedRunnable() {
221 public void realRun() throws InterruptedException {
222 for (int i = 0; i < SIZE; ++i) {
223 assertEquals(i, q.take());
224 }
225
226 Thread.currentThread().interrupt();
227 try {
228 q.take();
229 shouldThrow();
230 } catch (InterruptedException success) {}
231 assertFalse(Thread.interrupted());
232
233 pleaseInterrupt.countDown();
234 try {
235 q.take();
236 shouldThrow();
237 } catch (InterruptedException success) {}
238 assertFalse(Thread.interrupted());
239 }});
240
241 await(pleaseInterrupt);
242 assertThreadStaysAlive(t);
243 t.interrupt();
244 awaitTermination(t);
245 }
246
247 /**
248 * poll succeeds unless empty
249 */
250 public void testPoll() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll());
254 }
255 assertNull(q.poll());
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with zero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll0() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 for (int i = 0; i < SIZE; ++i) {
265 assertEquals(i, (int) q.poll(0, MILLISECONDS));
266 }
267 assertNull(q.poll(0, MILLISECONDS));
268 checkEmpty(q);
269 }
270
271 /**
272 * timed poll with nonzero timeout succeeds when non-empty, else times out
273 */
274 public void testTimedPoll() throws InterruptedException {
275 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
Przemyslaw Szczepaniaked4f3652016-03-15 09:42:19 +0000276 long startTime = System.nanoTime();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000277 for (int i = 0; i < SIZE; ++i)
278 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
279 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
280
281 startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100282 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
283 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
284 checkEmpty(q);
285 }
286
287 /**
288 * Interrupted timed poll throws InterruptedException instead of
289 * returning timeout status
290 */
291 public void testInterruptedTimedPoll() throws InterruptedException {
292 final BlockingQueue<Integer> q = populatedQueue(SIZE);
293 final CountDownLatch aboutToWait = new CountDownLatch(1);
294 Thread t = newStartedThread(new CheckedRunnable() {
295 public void realRun() throws InterruptedException {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000296 long startTime = System.nanoTime();
297 for (int i = 0; i < SIZE; ++i)
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100298 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100299 aboutToWait.countDown();
300 try {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000301 q.poll(LONG_DELAY_MS, MILLISECONDS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100302 shouldThrow();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000303 } catch (InterruptedException success) {}
304 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100305 }});
306
307 aboutToWait.await();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000308 waitForThreadToEnterWaitState(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100309 t.interrupt();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000310 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100311 checkEmpty(q);
312 }
313
314 /**
315 * timed poll after thread interrupted throws InterruptedException
316 * instead of returning timeout status
317 */
318 public void testTimedPollAfterInterrupt() throws InterruptedException {
319 final BlockingQueue<Integer> q = populatedQueue(SIZE);
320 Thread t = newStartedThread(new CheckedRunnable() {
321 public void realRun() throws InterruptedException {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000322 long startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100323 Thread.currentThread().interrupt();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000324 for (int i = 0; i < SIZE; ++i)
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100325 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100326 try {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000327 q.poll(LONG_DELAY_MS, MILLISECONDS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100328 shouldThrow();
329 } catch (InterruptedException success) {}
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000330 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100331 }});
332
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000333 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100334 checkEmpty(q);
335 }
336
337 /**
338 * peek returns next element, or null if empty
339 */
340 public void testPeek() throws InterruptedException {
341 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
342 for (int i = 0; i < SIZE; ++i) {
343 assertEquals(i, (int) q.peek());
344 assertEquals(i, (int) q.poll());
345 assertTrue(q.peek() == null ||
346 i != (int) q.peek());
347 }
348 assertNull(q.peek());
349 checkEmpty(q);
350 }
351
352 /**
353 * element returns next element, or throws NoSuchElementException if empty
354 */
355 public void testElement() throws InterruptedException {
356 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
357 for (int i = 0; i < SIZE; ++i) {
358 assertEquals(i, (int) q.element());
359 assertEquals(i, (int) q.poll());
360 }
361 try {
362 q.element();
363 shouldThrow();
364 } catch (NoSuchElementException success) {}
365 checkEmpty(q);
366 }
367
368 /**
369 * remove removes next element, or throws NoSuchElementException if empty
370 */
371 public void testRemove() throws InterruptedException {
372 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
373 for (int i = 0; i < SIZE; ++i) {
374 assertEquals(i, (int) q.remove());
375 }
376 try {
377 q.remove();
378 shouldThrow();
379 } catch (NoSuchElementException success) {}
380 checkEmpty(q);
381 }
382
383 /**
384 * An add following remove(x) succeeds
385 */
386 public void testRemoveElementAndAdd() throws InterruptedException {
387 LinkedTransferQueue q = new LinkedTransferQueue();
388 assertTrue(q.add(one));
389 assertTrue(q.add(two));
390 assertTrue(q.remove(one));
391 assertTrue(q.remove(two));
392 assertTrue(q.add(three));
393 assertSame(q.take(), three);
394 }
395
396 /**
397 * contains(x) reports true when elements added but not yet removed
398 */
399 public void testContains() {
400 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
401 for (int i = 0; i < SIZE; ++i) {
402 assertTrue(q.contains(i));
403 assertEquals(i, (int) q.poll());
404 assertFalse(q.contains(i));
405 }
406 }
407
408 /**
409 * clear removes all elements
410 */
411 public void testClear() throws InterruptedException {
412 LinkedTransferQueue q = populatedQueue(SIZE);
413 q.clear();
414 checkEmpty(q);
415 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
416 q.add(one);
417 assertFalse(q.isEmpty());
418 assertEquals(1, q.size());
419 assertTrue(q.contains(one));
420 q.clear();
421 checkEmpty(q);
422 }
423
424 /**
425 * containsAll(c) is true when c contains a subset of elements
426 */
427 public void testContainsAll() {
428 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
429 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
430 for (int i = 0; i < SIZE; ++i) {
431 assertTrue(q.containsAll(p));
432 assertFalse(p.containsAll(q));
433 p.add(i);
434 }
435 assertTrue(p.containsAll(q));
436 }
437
438 /**
439 * retainAll(c) retains only those elements of c and reports true
440 * if changed
441 */
442 public void testRetainAll() {
443 LinkedTransferQueue q = populatedQueue(SIZE);
444 LinkedTransferQueue p = populatedQueue(SIZE);
445 for (int i = 0; i < SIZE; ++i) {
446 boolean changed = q.retainAll(p);
447 if (i == 0) {
448 assertFalse(changed);
449 } else {
450 assertTrue(changed);
451 }
452 assertTrue(q.containsAll(p));
453 assertEquals(SIZE - i, q.size());
454 p.remove();
455 }
456 }
457
458 /**
459 * removeAll(c) removes only those elements of c and reports true
460 * if changed
461 */
462 public void testRemoveAll() {
463 for (int i = 1; i < SIZE; ++i) {
464 LinkedTransferQueue q = populatedQueue(SIZE);
465 LinkedTransferQueue p = populatedQueue(i);
466 assertTrue(q.removeAll(p));
467 assertEquals(SIZE - i, q.size());
468 for (int j = 0; j < i; ++j) {
469 assertFalse(q.contains(p.remove()));
470 }
471 }
472 }
473
474 /**
475 * toArray() contains all elements in FIFO order
476 */
477 public void testToArray() {
478 LinkedTransferQueue q = populatedQueue(SIZE);
479 Object[] o = q.toArray();
480 for (int i = 0; i < o.length; i++) {
481 assertSame(o[i], q.poll());
482 }
483 }
484
485 /**
486 * toArray(a) contains all elements in FIFO order
487 */
488 public void testToArray2() {
489 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
490 Integer[] ints = new Integer[SIZE];
491 Integer[] array = q.toArray(ints);
492 assertSame(ints, array);
493 for (int i = 0; i < ints.length; i++) {
494 assertSame(ints[i], q.poll());
495 }
496 }
497
498 /**
499 * toArray(incompatible array type) throws ArrayStoreException
500 */
501 public void testToArray1_BadArg() {
502 LinkedTransferQueue q = populatedQueue(SIZE);
503 try {
504 q.toArray(new String[10]);
505 shouldThrow();
506 } catch (ArrayStoreException success) {}
507 }
508
509 /**
510 * iterator iterates through all elements
511 */
512 public void testIterator() throws InterruptedException {
513 LinkedTransferQueue q = populatedQueue(SIZE);
514 Iterator it = q.iterator();
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100515 int i;
516 for (i = 0; it.hasNext(); i++)
517 assertTrue(q.contains(it.next()));
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100518 assertEquals(i, SIZE);
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100519 assertIteratorExhausted(it);
520
521 it = q.iterator();
522 for (i = 0; it.hasNext(); i++)
523 assertEquals(it.next(), q.take());
524 assertEquals(i, SIZE);
525 assertIteratorExhausted(it);
526 }
527
528 /**
529 * iterator of empty collection has no elements
530 */
531 public void testEmptyIterator() {
532 assertIteratorExhausted(new LinkedTransferQueue().iterator());
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100533 }
534
535 /**
536 * iterator.remove() removes current element
537 */
538 public void testIteratorRemove() {
539 final LinkedTransferQueue q = new LinkedTransferQueue();
540 q.add(two);
541 q.add(one);
542 q.add(three);
543
544 Iterator it = q.iterator();
545 it.next();
546 it.remove();
547
548 it = q.iterator();
549 assertSame(it.next(), one);
550 assertSame(it.next(), three);
551 assertFalse(it.hasNext());
552 }
553
554 /**
555 * iterator ordering is FIFO
556 */
557 public void testIteratorOrdering() {
558 final LinkedTransferQueue<Integer> q
559 = new LinkedTransferQueue<Integer>();
560 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
561 q.add(one);
562 q.add(two);
563 q.add(three);
564 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
565 int k = 0;
566 for (Integer n : q) {
567 assertEquals(++k, (int) n);
568 }
569 assertEquals(3, k);
570 }
571
572 /**
573 * Modifications do not cause iterators to fail
574 */
575 public void testWeaklyConsistentIteration() {
576 final LinkedTransferQueue q = new LinkedTransferQueue();
577 q.add(one);
578 q.add(two);
579 q.add(three);
580 for (Iterator it = q.iterator(); it.hasNext();) {
581 q.remove();
582 it.next();
583 }
584 assertEquals(0, q.size());
585 }
586
587 /**
588 * toString contains toStrings of elements
589 */
590 public void testToString() {
591 LinkedTransferQueue q = populatedQueue(SIZE);
592 String s = q.toString();
593 for (int i = 0; i < SIZE; ++i) {
594 assertTrue(s.contains(String.valueOf(i)));
595 }
596 }
597
598 /**
599 * offer transfers elements across Executor tasks
600 */
601 public void testOfferInExecutor() {
602 final LinkedTransferQueue q = new LinkedTransferQueue();
603 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000604 final ExecutorService executor = Executors.newFixedThreadPool(2);
605 try (PoolCleaner cleaner = cleaner(executor)) {
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100606
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000607 executor.execute(new CheckedRunnable() {
608 public void realRun() throws InterruptedException {
609 threadsStarted.await();
610 long startTime = System.nanoTime();
611 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
612 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
613 }});
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100614
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000615 executor.execute(new CheckedRunnable() {
616 public void realRun() throws InterruptedException {
617 threadsStarted.await();
618 assertSame(one, q.take());
619 checkEmpty(q);
620 }});
621 }
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100622 }
623
624 /**
625 * timed poll retrieves elements across Executor threads
626 */
627 public void testPollInExecutor() {
628 final LinkedTransferQueue q = new LinkedTransferQueue();
629 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000630 final ExecutorService executor = Executors.newFixedThreadPool(2);
631 try (PoolCleaner cleaner = cleaner(executor)) {
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100632
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000633 executor.execute(new CheckedRunnable() {
634 public void realRun() throws InterruptedException {
635 assertNull(q.poll());
636 threadsStarted.await();
637 long startTime = System.nanoTime();
638 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
639 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
640 checkEmpty(q);
641 }});
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100642
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000643 executor.execute(new CheckedRunnable() {
644 public void realRun() throws InterruptedException {
645 threadsStarted.await();
646 q.put(one);
647 }});
648 }
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100649 }
650
651 /**
652 * A deserialized serialized queue has same elements in same order
653 */
654 public void testSerialization() throws Exception {
655 Queue x = populatedQueue(SIZE);
656 Queue y = serialClone(x);
657
658 assertNotSame(y, x);
659 assertEquals(x.size(), y.size());
660 assertEquals(x.toString(), y.toString());
661 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
662 while (!x.isEmpty()) {
663 assertFalse(y.isEmpty());
664 assertEquals(x.remove(), y.remove());
665 }
666 assertTrue(y.isEmpty());
667 }
668
669 /**
670 * drainTo(c) empties queue into another collection c
671 */
672 public void testDrainTo() {
673 LinkedTransferQueue q = populatedQueue(SIZE);
674 ArrayList l = new ArrayList();
675 q.drainTo(l);
676 assertEquals(0, q.size());
677 assertEquals(SIZE, l.size());
678 for (int i = 0; i < SIZE; ++i) {
679 assertEquals(i, l.get(i));
680 }
681 q.add(zero);
682 q.add(one);
683 assertFalse(q.isEmpty());
684 assertTrue(q.contains(zero));
685 assertTrue(q.contains(one));
686 l.clear();
687 q.drainTo(l);
688 assertEquals(0, q.size());
689 assertEquals(2, l.size());
690 for (int i = 0; i < 2; ++i) {
691 assertEquals(i, l.get(i));
692 }
693 }
694
695 /**
696 * drainTo(c) empties full queue, unblocking a waiting put.
697 */
698 public void testDrainToWithActivePut() throws InterruptedException {
699 final LinkedTransferQueue q = populatedQueue(SIZE);
700 Thread t = newStartedThread(new CheckedRunnable() {
701 public void realRun() {
702 q.put(SIZE + 1);
703 }});
704 ArrayList l = new ArrayList();
705 q.drainTo(l);
706 assertTrue(l.size() >= SIZE);
707 for (int i = 0; i < SIZE; ++i)
708 assertEquals(i, l.get(i));
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000709 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100710 assertTrue(q.size() + l.size() >= SIZE);
711 }
712
713 /**
714 * drainTo(c, n) empties first min(n, size) elements of queue into c
715 */
716 public void testDrainToN() {
717 LinkedTransferQueue q = new LinkedTransferQueue();
718 for (int i = 0; i < SIZE + 2; ++i) {
719 for (int j = 0; j < SIZE; j++) {
720 assertTrue(q.offer(j));
721 }
722 ArrayList l = new ArrayList();
723 q.drainTo(l, i);
724 int k = (i < SIZE) ? i : SIZE;
725 assertEquals(k, l.size());
726 assertEquals(SIZE - k, q.size());
727 for (int j = 0; j < k; ++j)
728 assertEquals(j, l.get(j));
Narayan Kamath8e9a0e92015-04-28 11:40:00 +0100729 do {} while (q.poll() != null);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100730 }
731 }
732
733 /**
734 * timed poll() or take() increments the waiting consumer count;
735 * offer(e) decrements the waiting consumer count
736 */
737 public void testWaitingConsumer() throws InterruptedException {
738 final LinkedTransferQueue q = new LinkedTransferQueue();
739 assertEquals(0, q.getWaitingConsumerCount());
740 assertFalse(q.hasWaitingConsumer());
741 final CountDownLatch threadStarted = new CountDownLatch(1);
742
743 Thread t = newStartedThread(new CheckedRunnable() {
744 public void realRun() throws InterruptedException {
745 threadStarted.countDown();
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000746 long startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100747 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
748 assertEquals(0, q.getWaitingConsumerCount());
749 assertFalse(q.hasWaitingConsumer());
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000750 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100751 }});
752
753 threadStarted.await();
Tobias Thierercff661d2017-02-19 15:01:51 +0000754 Callable<Boolean> oneConsumer
755 = new Callable<Boolean>() { public Boolean call() {
756 return q.hasWaitingConsumer()
757 && q.getWaitingConsumerCount() == 1; }};
758 waitForThreadToEnterWaitState(t, oneConsumer);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100759
760 assertTrue(q.offer(one));
761 assertEquals(0, q.getWaitingConsumerCount());
762 assertFalse(q.hasWaitingConsumer());
763
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000764 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100765 }
766
767 /**
768 * transfer(null) throws NullPointerException
769 */
770 public void testTransfer1() throws InterruptedException {
771 try {
772 LinkedTransferQueue q = new LinkedTransferQueue();
773 q.transfer(null);
774 shouldThrow();
775 } catch (NullPointerException success) {}
776 }
777
778 /**
779 * transfer waits until a poll occurs. The transfered element
780 * is returned by this associated poll.
781 */
782 public void testTransfer2() throws InterruptedException {
783 final LinkedTransferQueue<Integer> q
784 = new LinkedTransferQueue<Integer>();
785 final CountDownLatch threadStarted = new CountDownLatch(1);
786
787 Thread t = newStartedThread(new CheckedRunnable() {
788 public void realRun() throws InterruptedException {
789 threadStarted.countDown();
790 q.transfer(five);
791 checkEmpty(q);
792 }});
793
794 threadStarted.await();
Tobias Thierercff661d2017-02-19 15:01:51 +0000795 Callable<Boolean> oneElement
796 = new Callable<Boolean>() { public Boolean call() {
797 return !q.isEmpty() && q.size() == 1; }};
798 waitForThreadToEnterWaitState(t, oneElement);
799
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100800 assertSame(five, q.poll());
801 checkEmpty(q);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000802 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100803 }
804
805 /**
806 * transfer waits until a poll occurs, and then transfers in fifo order
807 */
808 public void testTransfer3() throws InterruptedException {
809 final LinkedTransferQueue<Integer> q
810 = new LinkedTransferQueue<Integer>();
811
812 Thread first = newStartedThread(new CheckedRunnable() {
813 public void realRun() throws InterruptedException {
814 q.transfer(four);
815 assertTrue(!q.contains(four));
816 assertEquals(1, q.size());
817 }});
818
819 Thread interruptedThread = newStartedThread(
820 new CheckedInterruptedRunnable() {
821 public void realRun() throws InterruptedException {
822 while (q.isEmpty())
823 Thread.yield();
824 q.transfer(five);
825 }});
826
827 while (q.size() < 2)
828 Thread.yield();
829 assertEquals(2, q.size());
830 assertSame(four, q.poll());
831 first.join();
832 assertEquals(1, q.size());
833 interruptedThread.interrupt();
834 interruptedThread.join();
835 checkEmpty(q);
836 }
837
838 /**
839 * transfer waits until a poll occurs, at which point the polling
840 * thread returns the element
841 */
842 public void testTransfer4() throws InterruptedException {
843 final LinkedTransferQueue q = new LinkedTransferQueue();
844
845 Thread t = newStartedThread(new CheckedRunnable() {
846 public void realRun() throws InterruptedException {
847 q.transfer(four);
848 assertFalse(q.contains(four));
849 assertSame(three, q.poll());
850 }});
851
852 while (q.isEmpty())
853 Thread.yield();
854 assertFalse(q.isEmpty());
855 assertEquals(1, q.size());
856 assertTrue(q.offer(three));
857 assertSame(four, q.poll());
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000858 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100859 }
860
861 /**
862 * transfer waits until a take occurs. The transfered element
863 * is returned by this associated take.
864 */
865 public void testTransfer5() throws InterruptedException {
866 final LinkedTransferQueue<Integer> q
867 = new LinkedTransferQueue<Integer>();
868
869 Thread t = newStartedThread(new CheckedRunnable() {
870 public void realRun() throws InterruptedException {
871 q.transfer(four);
872 checkEmpty(q);
873 }});
874
875 while (q.isEmpty())
876 Thread.yield();
877 assertFalse(q.isEmpty());
878 assertEquals(1, q.size());
879 assertSame(four, q.take());
880 checkEmpty(q);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000881 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100882 }
883
884 /**
885 * tryTransfer(null) throws NullPointerException
886 */
887 public void testTryTransfer1() {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000888 final LinkedTransferQueue q = new LinkedTransferQueue();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100889 try {
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100890 q.tryTransfer(null);
891 shouldThrow();
892 } catch (NullPointerException success) {}
893 }
894
895 /**
896 * tryTransfer returns false and does not enqueue if there are no
897 * consumers waiting to poll or take.
898 */
899 public void testTryTransfer2() throws InterruptedException {
900 final LinkedTransferQueue q = new LinkedTransferQueue();
901 assertFalse(q.tryTransfer(new Object()));
902 assertFalse(q.hasWaitingConsumer());
903 checkEmpty(q);
904 }
905
906 /**
907 * If there is a consumer waiting in timed poll, tryTransfer
908 * returns true while successfully transfering object.
909 */
910 public void testTryTransfer3() throws InterruptedException {
911 final Object hotPotato = new Object();
912 final LinkedTransferQueue q = new LinkedTransferQueue();
913
914 Thread t = newStartedThread(new CheckedRunnable() {
915 public void realRun() {
916 while (! q.hasWaitingConsumer())
917 Thread.yield();
918 assertTrue(q.hasWaitingConsumer());
919 checkEmpty(q);
920 assertTrue(q.tryTransfer(hotPotato));
921 }});
922
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000923 long startTime = System.nanoTime();
924 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
925 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100926 checkEmpty(q);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000927 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100928 }
929
930 /**
931 * If there is a consumer waiting in take, tryTransfer returns
932 * true while successfully transfering object.
933 */
934 public void testTryTransfer4() throws InterruptedException {
935 final Object hotPotato = new Object();
936 final LinkedTransferQueue q = new LinkedTransferQueue();
937
938 Thread t = newStartedThread(new CheckedRunnable() {
939 public void realRun() {
940 while (! q.hasWaitingConsumer())
941 Thread.yield();
942 assertTrue(q.hasWaitingConsumer());
943 checkEmpty(q);
944 assertTrue(q.tryTransfer(hotPotato));
945 }});
946
947 assertSame(q.take(), hotPotato);
948 checkEmpty(q);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000949 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100950 }
951
952 /**
953 * tryTransfer blocks interruptibly if no takers
954 */
955 public void testTryTransfer5() throws InterruptedException {
956 final LinkedTransferQueue q = new LinkedTransferQueue();
957 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
958 assertTrue(q.isEmpty());
959
960 Thread t = newStartedThread(new CheckedRunnable() {
961 public void realRun() throws InterruptedException {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000962 long startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100963 Thread.currentThread().interrupt();
964 try {
965 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
966 shouldThrow();
967 } catch (InterruptedException success) {}
968 assertFalse(Thread.interrupted());
969
970 pleaseInterrupt.countDown();
971 try {
972 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
973 shouldThrow();
974 } catch (InterruptedException success) {}
975 assertFalse(Thread.interrupted());
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000976 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100977 }});
978
979 await(pleaseInterrupt);
980 assertThreadStaysAlive(t);
981 t.interrupt();
982 awaitTermination(t);
983 checkEmpty(q);
984 }
985
986 /**
987 * tryTransfer gives up after the timeout and returns false
988 */
989 public void testTryTransfer6() throws InterruptedException {
990 final LinkedTransferQueue q = new LinkedTransferQueue();
991
992 Thread t = newStartedThread(new CheckedRunnable() {
993 public void realRun() throws InterruptedException {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000994 long startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100995 assertFalse(q.tryTransfer(new Object(),
996 timeoutMillis(), MILLISECONDS));
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +0000997 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
Calin Juravle8f0d92b2013-08-01 17:26:00 +0100998 checkEmpty(q);
999 }});
1000
1001 awaitTermination(t);
1002 checkEmpty(q);
1003 }
1004
1005 /**
1006 * tryTransfer waits for any elements previously in to be removed
1007 * before transfering to a poll or take
1008 */
1009 public void testTryTransfer7() throws InterruptedException {
1010 final LinkedTransferQueue q = new LinkedTransferQueue();
1011 assertTrue(q.offer(four));
1012
1013 Thread t = newStartedThread(new CheckedRunnable() {
1014 public void realRun() throws InterruptedException {
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +00001015 long startTime = System.nanoTime();
1016 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1017 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001018 checkEmpty(q);
1019 }});
1020
1021 while (q.size() != 2)
1022 Thread.yield();
1023 assertEquals(2, q.size());
1024 assertSame(four, q.poll());
1025 assertSame(five, q.poll());
1026 checkEmpty(q);
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +00001027 awaitTermination(t);
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001028 }
1029
1030 /**
1031 * tryTransfer attempts to enqueue into the queue and fails
1032 * returning false not enqueueing and the successive poll is null
1033 */
1034 public void testTryTransfer8() throws InterruptedException {
1035 final LinkedTransferQueue q = new LinkedTransferQueue();
1036 assertTrue(q.offer(four));
1037 assertEquals(1, q.size());
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +00001038 long startTime = System.nanoTime();
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001039 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
Przemyslaw Szczepaniakb8b75112016-03-11 15:59:10 +00001040 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001041 assertEquals(1, q.size());
1042 assertSame(four, q.poll());
1043 assertNull(q.poll());
1044 checkEmpty(q);
1045 }
1046
1047 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1048 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1049 checkEmpty(q);
1050 for (int i = 0; i < n; i++) {
1051 assertEquals(i, q.size());
1052 assertTrue(q.offer(i));
1053 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1054 }
1055 assertFalse(q.isEmpty());
1056 return q;
1057 }
Narayan Kamath8e9a0e92015-04-28 11:40:00 +01001058
1059 /**
1060 * remove(null), contains(null) always return false
1061 */
1062 public void testNeverContainsNull() {
1063 Collection<?>[] qs = {
1064 new LinkedTransferQueue<Object>(),
1065 populatedQueue(2),
1066 };
1067
1068 for (Collection<?> q : qs) {
1069 assertFalse(q.contains(null));
1070 assertFalse(q.remove(null));
1071 }
1072 }
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001073}