blob: e988cc6c7232fd8c4990853675352f215773bd68 [file] [log] [blame]
Calin Juravle8f0d92b2013-08-01 17:26:00 +01001/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9package jsr166;
10
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010011import static java.util.concurrent.TimeUnit.MILLISECONDS;
12
Calin Juravle8f0d92b2013-08-01 17:26:00 +010013import java.util.concurrent.ArrayBlockingQueue;
14import java.util.concurrent.Callable;
15import java.util.concurrent.ExecutorCompletionService;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010016import java.util.concurrent.Executors;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010017import java.util.concurrent.ExecutorService;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010018import java.util.concurrent.Future;
19import java.util.concurrent.FutureTask;
20import java.util.concurrent.RunnableFuture;
21import java.util.concurrent.ThreadPoolExecutor;
22import java.util.concurrent.TimeUnit;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010023import java.util.concurrent.atomic.AtomicBoolean;
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010024
25import junit.framework.Test;
26import junit.framework.TestSuite;
Calin Juravle8f0d92b2013-08-01 17:26:00 +010027
28public class ExecutorCompletionServiceTest extends JSR166TestCase {
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010029 // android-note: Removed because the CTS runner does a bad job of
30 // retrying tests that have suite() declarations.
31 //
32 // public static void main(String[] args) {
33 // main(suite(), args);
34 // }
35 // public static Test suite() {
36 // return new TestSuite(...);
37 // }
Calin Juravle8f0d92b2013-08-01 17:26:00 +010038
39 /**
40 * Creating a new ECS with null Executor throw NPE
41 */
42 public void testConstructorNPE() {
43 try {
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010044 new ExecutorCompletionService(null);
Calin Juravle8f0d92b2013-08-01 17:26:00 +010045 shouldThrow();
46 } catch (NullPointerException success) {}
47 }
48
49 /**
50 * Creating a new ECS with null queue throw NPE
51 */
52 public void testConstructorNPE2() {
53 try {
54 ExecutorService e = Executors.newCachedThreadPool();
Narayan Kamath8e9a0e92015-04-28 11:40:00 +010055 new ExecutorCompletionService(e, null);
Calin Juravle8f0d92b2013-08-01 17:26:00 +010056 shouldThrow();
57 } catch (NullPointerException success) {}
58 }
59
60 /**
61 * Submitting a null callable throws NPE
62 */
63 public void testSubmitNPE() {
64 ExecutorService e = Executors.newCachedThreadPool();
65 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
66 try {
67 Callable c = null;
68 ecs.submit(c);
69 shouldThrow();
70 } catch (NullPointerException success) {
71 } finally {
72 joinPool(e);
73 }
74 }
75
76 /**
77 * Submitting a null runnable throws NPE
78 */
79 public void testSubmitNPE2() {
80 ExecutorService e = Executors.newCachedThreadPool();
81 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
82 try {
83 Runnable r = null;
84 ecs.submit(r, Boolean.TRUE);
85 shouldThrow();
86 } catch (NullPointerException success) {
87 } finally {
88 joinPool(e);
89 }
90 }
91
92 /**
93 * A taken submitted task is completed
94 */
95 public void testTake() throws InterruptedException {
96 ExecutorService e = Executors.newCachedThreadPool();
97 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
98 try {
99 Callable c = new StringTask();
100 ecs.submit(c);
101 Future f = ecs.take();
102 assertTrue(f.isDone());
103 } finally {
104 joinPool(e);
105 }
106 }
107
108 /**
109 * Take returns the same future object returned by submit
110 */
111 public void testTake2() throws InterruptedException {
112 ExecutorService e = Executors.newCachedThreadPool();
113 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
114 try {
115 Callable c = new StringTask();
116 Future f1 = ecs.submit(c);
117 Future f2 = ecs.take();
118 assertSame(f1, f2);
119 } finally {
120 joinPool(e);
121 }
122 }
123
124 /**
125 * If poll returns non-null, the returned task is completed
126 */
127 public void testPoll1() throws Exception {
128 ExecutorService e = Executors.newCachedThreadPool();
129 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
130 try {
131 assertNull(ecs.poll());
132 Callable c = new StringTask();
133 ecs.submit(c);
134
135 long startTime = System.nanoTime();
136 Future f;
137 while ((f = ecs.poll()) == null) {
138 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
139 fail("timed out");
140 Thread.yield();
141 }
142 assertTrue(f.isDone());
143 assertSame(TEST_STRING, f.get());
144 } finally {
145 joinPool(e);
146 }
147 }
148
149 /**
150 * If timed poll returns non-null, the returned task is completed
151 */
152 public void testPoll2() throws InterruptedException {
153 ExecutorService e = Executors.newCachedThreadPool();
154 ExecutorCompletionService ecs = new ExecutorCompletionService(e);
155 try {
156 assertNull(ecs.poll());
157 Callable c = new StringTask();
158 ecs.submit(c);
159 Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS);
160 if (f != null)
161 assertTrue(f.isDone());
162 } finally {
163 joinPool(e);
164 }
165 }
166
167 /**
168 * Submitting to underlying AES that overrides newTaskFor(Callable)
169 * returns and eventually runs Future returned by newTaskFor.
170 */
171 public void testNewTaskForCallable() throws InterruptedException {
172 final AtomicBoolean done = new AtomicBoolean(false);
173 class MyCallableFuture<V> extends FutureTask<V> {
174 MyCallableFuture(Callable<V> c) { super(c); }
175 protected void done() { done.set(true); }
176 }
177 ExecutorService e = new ThreadPoolExecutor(
178 1, 1, 30L, TimeUnit.SECONDS,
179 new ArrayBlockingQueue<Runnable>(1)) {
180 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
181 return new MyCallableFuture<T>(c);
182 }};
183 ExecutorCompletionService<String> ecs =
184 new ExecutorCompletionService<String>(e);
185 try {
186 assertNull(ecs.poll());
187 Callable<String> c = new StringTask();
188 Future f1 = ecs.submit(c);
189 assertTrue("submit must return MyCallableFuture",
190 f1 instanceof MyCallableFuture);
191 Future f2 = ecs.take();
192 assertSame("submit and take must return same objects", f1, f2);
193 assertTrue("completed task must have set done", done.get());
194 } finally {
195 joinPool(e);
196 }
197 }
198
199 /**
200 * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
201 * returns and eventually runs Future returned by newTaskFor.
202 */
203 public void testNewTaskForRunnable() throws InterruptedException {
204 final AtomicBoolean done = new AtomicBoolean(false);
205 class MyRunnableFuture<V> extends FutureTask<V> {
206 MyRunnableFuture(Runnable t, V r) { super(t, r); }
207 protected void done() { done.set(true); }
208 }
209 ExecutorService e = new ThreadPoolExecutor(
210 1, 1, 30L, TimeUnit.SECONDS,
211 new ArrayBlockingQueue<Runnable>(1)) {
212 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
213 return new MyRunnableFuture<T>(t, r);
214 }};
215 ExecutorCompletionService<String> ecs =
216 new ExecutorCompletionService<String>(e);
217 try {
218 assertNull(ecs.poll());
219 Runnable r = new NoOpRunnable();
220 Future f1 = ecs.submit(r, null);
221 assertTrue("submit must return MyRunnableFuture",
222 f1 instanceof MyRunnableFuture);
223 Future f2 = ecs.take();
224 assertSame("submit and take must return same objects", f1, f2);
225 assertTrue("completed task must have set done", done.get());
226 } finally {
227 joinPool(e);
228 }
229 }
230
231}