blob: 6ae450df066e9d37d077ac8c8322029974d2dd26 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more revelant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010012from test.script_helper import assert_python_ok
13
14import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000015import threading
16import time
17import unittest
18
Brian Quinlan81c4d362010-09-18 22:35:02 +000019from concurrent import futures
20from concurrent.futures._base import (
Brian Quinlan1d1df822011-01-03 02:56:39 +000021 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
Antoine Pitroudd696492011-06-08 17:21:55 +020022from concurrent.futures.process import BrokenProcessPool
Brian Quinlan81c4d362010-09-18 22:35:02 +000023
Brian Quinlan1d1df822011-01-03 02:56:39 +000024
Brian Quinlan81c4d362010-09-18 22:35:02 +000025def create_future(state=PENDING, exception=None, result=None):
26 f = Future()
27 f._state = state
28 f._exception = exception
29 f._result = result
30 return f
31
Brian Quinlan1d1df822011-01-03 02:56:39 +000032
Brian Quinlan81c4d362010-09-18 22:35:02 +000033PENDING_FUTURE = create_future(state=PENDING)
34RUNNING_FUTURE = create_future(state=RUNNING)
35CANCELLED_FUTURE = create_future(state=CANCELLED)
36CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +020037EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +000038SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
39
Brian Quinlan1d1df822011-01-03 02:56:39 +000040
Brian Quinlan81c4d362010-09-18 22:35:02 +000041def mul(x, y):
42 return x * y
43
Brian Quinlan81c4d362010-09-18 22:35:02 +000044
Brian Quinlan1d1df822011-01-03 02:56:39 +000045def sleep_and_raise(t):
46 time.sleep(t)
47 raise Exception('this is an exception')
Brian Quinlan81c4d362010-09-18 22:35:02 +000048
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010049def sleep_and_print(t, msg):
50 time.sleep(t)
51 print(msg)
52 sys.stdout.flush()
53
Brian Quinlan81c4d362010-09-18 22:35:02 +000054
Brian Quinlan1d1df822011-01-03 02:56:39 +000055class ExecutorMixin:
56 worker_count = 5
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010057
58 def setUp(self):
59 self.t1 = time.time()
60 try:
61 self.executor = self.executor_type(max_workers=self.worker_count)
62 except NotImplementedError as e:
63 self.skipTest(str(e))
64 self._prime_executor()
65
66 def tearDown(self):
67 self.executor.shutdown(wait=True)
68 dt = time.time() - self.t1
69 if test.support.verbose:
70 print("%.2fs" % dt, end=' ')
71 self.assertLess(dt, 60, "synchronization issue: test lasted too long")
72
Brian Quinlan1d1df822011-01-03 02:56:39 +000073 def _prime_executor(self):
74 # Make sure that the executor is ready to do work before running the
75 # tests. This should reduce the probability of timeouts in the tests.
76 futures = [self.executor.submit(time.sleep, 0.1)
77 for _ in range(self.worker_count)]
Brian Quinlan81c4d362010-09-18 22:35:02 +000078
Brian Quinlan1d1df822011-01-03 02:56:39 +000079 for f in futures:
80 f.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +000081
Brian Quinlan81c4d362010-09-18 22:35:02 +000082
Brian Quinlan1d1df822011-01-03 02:56:39 +000083class ThreadPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010084 executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000085
Brian Quinlan81c4d362010-09-18 22:35:02 +000086
Brian Quinlan1d1df822011-01-03 02:56:39 +000087class ProcessPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010088 executor_type = futures.ProcessPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000089
Brian Quinlan81c4d362010-09-18 22:35:02 +000090
91class ExecutorShutdownTest(unittest.TestCase):
92 def test_run_after_shutdown(self):
93 self.executor.shutdown()
94 self.assertRaises(RuntimeError,
95 self.executor.submit,
96 pow, 2, 5)
97
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010098 def test_interpreter_shutdown(self):
99 # Test the atexit hook for shutdown of worker threads and processes
100 rc, out, err = assert_python_ok('-c', """if 1:
101 from concurrent.futures import {executor_type}
102 from time import sleep
103 from test.test_concurrent_futures import sleep_and_print
104 t = {executor_type}(5)
105 t.submit(sleep_and_print, 1.0, "apple")
106 """.format(executor_type=self.executor_type.__name__))
107 # Errors in atexit hooks don't change the process exit code, check
108 # stderr manually.
109 self.assertFalse(err)
110 self.assertEqual(out.strip(), b"apple")
111
Ross Lagerwall66e2fb62012-01-08 08:29:40 +0200112 def test_hang_issue12364(self):
113 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
114 self.executor.shutdown()
115 for f in fs:
116 f.result()
117
Brian Quinlan81c4d362010-09-18 22:35:02 +0000118
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000119class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000120 def _prime_executor(self):
121 pass
122
Brian Quinlan81c4d362010-09-18 22:35:02 +0000123 def test_threads_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000124 self.executor.submit(mul, 21, 2)
125 self.executor.submit(mul, 6, 7)
126 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000127 self.assertEqual(len(self.executor._threads), 3)
128 self.executor.shutdown()
129 for t in self.executor._threads:
130 t.join()
131
132 def test_context_manager_shutdown(self):
133 with futures.ThreadPoolExecutor(max_workers=5) as e:
134 executor = e
135 self.assertEqual(list(e.map(abs, range(-5, 5))),
136 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
137
138 for t in executor._threads:
139 t.join()
140
141 def test_del_shutdown(self):
142 executor = futures.ThreadPoolExecutor(max_workers=5)
143 executor.map(abs, range(-5, 5))
144 threads = executor._threads
145 del executor
146
147 for t in threads:
148 t.join()
149
Brian Quinlan1d1df822011-01-03 02:56:39 +0000150
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000151class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000152 def _prime_executor(self):
153 pass
154
Brian Quinlan81c4d362010-09-18 22:35:02 +0000155 def test_processes_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000156 self.executor.submit(mul, 21, 2)
157 self.executor.submit(mul, 6, 7)
158 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000159 self.assertEqual(len(self.executor._processes), 5)
160 processes = self.executor._processes
161 self.executor.shutdown()
162
Antoine Pitroudd696492011-06-08 17:21:55 +0200163 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000164 p.join()
165
166 def test_context_manager_shutdown(self):
167 with futures.ProcessPoolExecutor(max_workers=5) as e:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000168 processes = e._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000169 self.assertEqual(list(e.map(abs, range(-5, 5))),
170 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
171
Antoine Pitroudd696492011-06-08 17:21:55 +0200172 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000173 p.join()
174
175 def test_del_shutdown(self):
176 executor = futures.ProcessPoolExecutor(max_workers=5)
177 list(executor.map(abs, range(-5, 5)))
178 queue_management_thread = executor._queue_management_thread
179 processes = executor._processes
180 del executor
181
182 queue_management_thread.join()
Antoine Pitroudd696492011-06-08 17:21:55 +0200183 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000184 p.join()
185
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200186
Brian Quinlan81c4d362010-09-18 22:35:02 +0000187class WaitTests(unittest.TestCase):
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200188
Brian Quinlan81c4d362010-09-18 22:35:02 +0000189 def test_first_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000190 future1 = self.executor.submit(mul, 21, 2)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100191 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000192
Brian Quinlan1d1df822011-01-03 02:56:39 +0000193 done, not_done = futures.wait(
194 [CANCELLED_FUTURE, future1, future2],
195 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000196
Brian Quinlan1d1df822011-01-03 02:56:39 +0000197 self.assertEqual(set([future1]), done)
198 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000199
Brian Quinlan1d1df822011-01-03 02:56:39 +0000200 def test_first_completed_some_already_completed(self):
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100201 future1 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000202
Brian Quinlan1d1df822011-01-03 02:56:39 +0000203 finished, pending = futures.wait(
204 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
205 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000206
Brian Quinlan1d1df822011-01-03 02:56:39 +0000207 self.assertEqual(
208 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
209 finished)
210 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000211
212 def test_first_exception(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000213 future1 = self.executor.submit(mul, 2, 21)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100214 future2 = self.executor.submit(sleep_and_raise, 1.5)
215 future3 = self.executor.submit(time.sleep, 3)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000216
Brian Quinlan1d1df822011-01-03 02:56:39 +0000217 finished, pending = futures.wait(
218 [future1, future2, future3],
219 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000220
Brian Quinlan1d1df822011-01-03 02:56:39 +0000221 self.assertEqual(set([future1, future2]), finished)
222 self.assertEqual(set([future3]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000223
224 def test_first_exception_some_already_complete(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000225 future1 = self.executor.submit(divmod, 21, 0)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100226 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000227
Brian Quinlan1d1df822011-01-03 02:56:39 +0000228 finished, pending = futures.wait(
229 [SUCCESSFUL_FUTURE,
230 CANCELLED_FUTURE,
231 CANCELLED_AND_NOTIFIED_FUTURE,
232 future1, future2],
233 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000234
Brian Quinlan1d1df822011-01-03 02:56:39 +0000235 self.assertEqual(set([SUCCESSFUL_FUTURE,
236 CANCELLED_AND_NOTIFIED_FUTURE,
237 future1]), finished)
238 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000239
240 def test_first_exception_one_already_failed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000241 future1 = self.executor.submit(time.sleep, 2)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000242
Brian Quinlan1d1df822011-01-03 02:56:39 +0000243 finished, pending = futures.wait(
244 [EXCEPTION_FUTURE, future1],
245 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000246
Brian Quinlan1d1df822011-01-03 02:56:39 +0000247 self.assertEqual(set([EXCEPTION_FUTURE]), finished)
248 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000249
250 def test_all_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000251 future1 = self.executor.submit(divmod, 2, 0)
252 future2 = self.executor.submit(mul, 2, 21)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000253
Brian Quinlan1d1df822011-01-03 02:56:39 +0000254 finished, pending = futures.wait(
255 [SUCCESSFUL_FUTURE,
256 CANCELLED_AND_NOTIFIED_FUTURE,
257 EXCEPTION_FUTURE,
258 future1,
259 future2],
260 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000261
Brian Quinlan1d1df822011-01-03 02:56:39 +0000262 self.assertEqual(set([SUCCESSFUL_FUTURE,
263 CANCELLED_AND_NOTIFIED_FUTURE,
264 EXCEPTION_FUTURE,
265 future1,
266 future2]), finished)
267 self.assertEqual(set(), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000268
269 def test_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000270 future1 = self.executor.submit(mul, 6, 7)
Brian Quinlan1ae29982011-05-30 21:52:24 +1000271 future2 = self.executor.submit(time.sleep, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000272
Brian Quinlan1d1df822011-01-03 02:56:39 +0000273 finished, pending = futures.wait(
274 [CANCELLED_AND_NOTIFIED_FUTURE,
275 EXCEPTION_FUTURE,
276 SUCCESSFUL_FUTURE,
277 future1, future2],
Brian Quinlan1ae29982011-05-30 21:52:24 +1000278 timeout=5,
Brian Quinlan1d1df822011-01-03 02:56:39 +0000279 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000280
Brian Quinlan1d1df822011-01-03 02:56:39 +0000281 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
282 EXCEPTION_FUTURE,
283 SUCCESSFUL_FUTURE,
284 future1]), finished)
285 self.assertEqual(set([future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000286
287
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000288class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200289
290 def test_pending_calls_race(self):
291 # Issue #14406: multi-threaded race condition when waiting on all
292 # futures.
293 event = threading.Event()
294 def future_func():
295 event.wait()
296 oldswitchinterval = sys.getswitchinterval()
297 sys.setswitchinterval(1e-6)
298 try:
299 fs = {self.executor.submit(future_func) for i in range(100)}
300 event.set()
301 futures.wait(fs, return_when=futures.ALL_COMPLETED)
302 finally:
303 sys.setswitchinterval(oldswitchinterval)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000304
Brian Quinlan1d1df822011-01-03 02:56:39 +0000305
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000306class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
307 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000308
Brian Quinlan1d1df822011-01-03 02:56:39 +0000309
Brian Quinlan81c4d362010-09-18 22:35:02 +0000310class AsCompletedTests(unittest.TestCase):
311 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
312 def test_no_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000313 future1 = self.executor.submit(mul, 2, 21)
314 future2 = self.executor.submit(mul, 7, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000315
Brian Quinlan1d1df822011-01-03 02:56:39 +0000316 completed = set(futures.as_completed(
317 [CANCELLED_AND_NOTIFIED_FUTURE,
318 EXCEPTION_FUTURE,
319 SUCCESSFUL_FUTURE,
320 future1, future2]))
321 self.assertEqual(set(
322 [CANCELLED_AND_NOTIFIED_FUTURE,
323 EXCEPTION_FUTURE,
324 SUCCESSFUL_FUTURE,
325 future1, future2]),
326 completed)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000327
328 def test_zero_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000329 future1 = self.executor.submit(time.sleep, 2)
330 completed_futures = set()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000331 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000332 for future in futures.as_completed(
333 [CANCELLED_AND_NOTIFIED_FUTURE,
334 EXCEPTION_FUTURE,
335 SUCCESSFUL_FUTURE,
336 future1],
337 timeout=0):
338 completed_futures.add(future)
339 except futures.TimeoutError:
340 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000341
Brian Quinlan1d1df822011-01-03 02:56:39 +0000342 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
343 EXCEPTION_FUTURE,
344 SUCCESSFUL_FUTURE]),
345 completed_futures)
346
Brian Quinlan81c4d362010-09-18 22:35:02 +0000347
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000348class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
349 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000350
Brian Quinlan1d1df822011-01-03 02:56:39 +0000351
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000352class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
353 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000354
Brian Quinlan1d1df822011-01-03 02:56:39 +0000355
Brian Quinlan81c4d362010-09-18 22:35:02 +0000356class ExecutorTest(unittest.TestCase):
357 # Executor.shutdown() and context manager usage is tested by
358 # ExecutorShutdownTest.
359 def test_submit(self):
360 future = self.executor.submit(pow, 2, 8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000361 self.assertEqual(256, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000362
363 def test_submit_keyword(self):
364 future = self.executor.submit(mul, 2, y=8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000365 self.assertEqual(16, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000366
367 def test_map(self):
368 self.assertEqual(
369 list(self.executor.map(pow, range(10), range(10))),
370 list(map(pow, range(10), range(10))))
371
372 def test_map_exception(self):
373 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
374 self.assertEqual(i.__next__(), (0, 1))
375 self.assertEqual(i.__next__(), (0, 1))
376 self.assertRaises(ZeroDivisionError, i.__next__)
377
378 def test_map_timeout(self):
379 results = []
Brian Quinlan81c4d362010-09-18 22:35:02 +0000380 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000381 for i in self.executor.map(time.sleep,
Brian Quinlan1ae29982011-05-30 21:52:24 +1000382 [0, 0, 6],
383 timeout=5):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000384 results.append(i)
385 except futures.TimeoutError:
386 pass
387 else:
388 self.fail('expected TimeoutError')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000389
Brian Quinlan1d1df822011-01-03 02:56:39 +0000390 self.assertEqual([None, None], results)
391
Antoine Pitrou020436b2011-07-02 21:20:25 +0200392 def test_shutdown_race_issue12456(self):
393 # Issue #12456: race condition at shutdown where trying to post a
394 # sentinel in the call queue blocks (the queue is full while processes
395 # have exited).
396 self.executor.map(str, [2] * (self.worker_count + 1))
397 self.executor.shutdown()
398
Brian Quinlan81c4d362010-09-18 22:35:02 +0000399
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000400class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
Brian Quinlanf0078762011-04-08 08:19:33 +1000401 def test_map_submits_without_iteration(self):
402 """Tests verifying issue 11777."""
403 finished = []
404 def record_finished(n):
405 finished.append(n)
406
407 self.executor.map(record_finished, range(10))
408 self.executor.shutdown(wait=True)
409 self.assertCountEqual(finished, range(10))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000410
Brian Quinlan1d1df822011-01-03 02:56:39 +0000411
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000412class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
Antoine Pitroudd696492011-06-08 17:21:55 +0200413 def test_killed_child(self):
414 # When a child process is abruptly terminated, the whole pool gets
415 # "broken".
416 futures = [self.executor.submit(time.sleep, 3)]
417 # Get one of the processes, and terminate (kill) it
418 p = next(iter(self.executor._processes.values()))
419 p.terminate()
420 for fut in futures:
421 self.assertRaises(BrokenProcessPool, fut.result)
422 # Submitting other jobs fails as well.
423 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000424
Brian Quinlan1d1df822011-01-03 02:56:39 +0000425
Brian Quinlan81c4d362010-09-18 22:35:02 +0000426class FutureTests(unittest.TestCase):
427 def test_done_callback_with_result(self):
428 callback_result = None
429 def fn(callback_future):
430 nonlocal callback_result
431 callback_result = callback_future.result()
432
433 f = Future()
434 f.add_done_callback(fn)
435 f.set_result(5)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000436 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000437
438 def test_done_callback_with_exception(self):
439 callback_exception = None
440 def fn(callback_future):
441 nonlocal callback_exception
442 callback_exception = callback_future.exception()
443
444 f = Future()
445 f.add_done_callback(fn)
446 f.set_exception(Exception('test'))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000447 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000448
449 def test_done_callback_with_cancel(self):
450 was_cancelled = None
451 def fn(callback_future):
452 nonlocal was_cancelled
453 was_cancelled = callback_future.cancelled()
454
455 f = Future()
456 f.add_done_callback(fn)
457 self.assertTrue(f.cancel())
458 self.assertTrue(was_cancelled)
459
460 def test_done_callback_raises(self):
Brian Quinlan251cc842010-12-28 21:14:34 +0000461 with test.support.captured_stderr() as stderr:
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000462 raising_was_called = False
463 fn_was_called = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000464
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000465 def raising_fn(callback_future):
466 nonlocal raising_was_called
467 raising_was_called = True
468 raise Exception('doh!')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000469
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000470 def fn(callback_future):
471 nonlocal fn_was_called
472 fn_was_called = True
Brian Quinlan81c4d362010-09-18 22:35:02 +0000473
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000474 f = Future()
475 f.add_done_callback(raising_fn)
476 f.add_done_callback(fn)
477 f.set_result(5)
478 self.assertTrue(raising_was_called)
479 self.assertTrue(fn_was_called)
Brian Quinlan251cc842010-12-28 21:14:34 +0000480 self.assertIn('Exception: doh!', stderr.getvalue())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000481
482 def test_done_callback_already_successful(self):
483 callback_result = None
484 def fn(callback_future):
485 nonlocal callback_result
486 callback_result = callback_future.result()
487
488 f = Future()
489 f.set_result(5)
490 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000491 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000492
493 def test_done_callback_already_failed(self):
494 callback_exception = None
495 def fn(callback_future):
496 nonlocal callback_exception
497 callback_exception = callback_future.exception()
498
499 f = Future()
500 f.set_exception(Exception('test'))
501 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000502 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000503
504 def test_done_callback_already_cancelled(self):
505 was_cancelled = None
506 def fn(callback_future):
507 nonlocal was_cancelled
508 was_cancelled = callback_future.cancelled()
509
510 f = Future()
511 self.assertTrue(f.cancel())
512 f.add_done_callback(fn)
513 self.assertTrue(was_cancelled)
514
515 def test_repr(self):
Ezio Melottied3a7d22010-12-01 02:32:32 +0000516 self.assertRegex(repr(PENDING_FUTURE),
517 '<Future at 0x[0-9a-f]+ state=pending>')
518 self.assertRegex(repr(RUNNING_FUTURE),
519 '<Future at 0x[0-9a-f]+ state=running>')
520 self.assertRegex(repr(CANCELLED_FUTURE),
521 '<Future at 0x[0-9a-f]+ state=cancelled>')
522 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
523 '<Future at 0x[0-9a-f]+ state=cancelled>')
524 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000525 repr(EXCEPTION_FUTURE),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200526 '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
Ezio Melottied3a7d22010-12-01 02:32:32 +0000527 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000528 repr(SUCCESSFUL_FUTURE),
529 '<Future at 0x[0-9a-f]+ state=finished returned int>')
530
531
532 def test_cancel(self):
533 f1 = create_future(state=PENDING)
534 f2 = create_future(state=RUNNING)
535 f3 = create_future(state=CANCELLED)
536 f4 = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200537 f5 = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000538 f6 = create_future(state=FINISHED, result=5)
539
540 self.assertTrue(f1.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000541 self.assertEqual(f1._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000542
543 self.assertFalse(f2.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000544 self.assertEqual(f2._state, RUNNING)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000545
546 self.assertTrue(f3.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000547 self.assertEqual(f3._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000548
549 self.assertTrue(f4.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000550 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000551
552 self.assertFalse(f5.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000553 self.assertEqual(f5._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000554
555 self.assertFalse(f6.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000556 self.assertEqual(f6._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000557
558 def test_cancelled(self):
559 self.assertFalse(PENDING_FUTURE.cancelled())
560 self.assertFalse(RUNNING_FUTURE.cancelled())
561 self.assertTrue(CANCELLED_FUTURE.cancelled())
562 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
563 self.assertFalse(EXCEPTION_FUTURE.cancelled())
564 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
565
566 def test_done(self):
567 self.assertFalse(PENDING_FUTURE.done())
568 self.assertFalse(RUNNING_FUTURE.done())
569 self.assertTrue(CANCELLED_FUTURE.done())
570 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
571 self.assertTrue(EXCEPTION_FUTURE.done())
572 self.assertTrue(SUCCESSFUL_FUTURE.done())
573
574 def test_running(self):
575 self.assertFalse(PENDING_FUTURE.running())
576 self.assertTrue(RUNNING_FUTURE.running())
577 self.assertFalse(CANCELLED_FUTURE.running())
578 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
579 self.assertFalse(EXCEPTION_FUTURE.running())
580 self.assertFalse(SUCCESSFUL_FUTURE.running())
581
582 def test_result_with_timeout(self):
583 self.assertRaises(futures.TimeoutError,
584 PENDING_FUTURE.result, timeout=0)
585 self.assertRaises(futures.TimeoutError,
586 RUNNING_FUTURE.result, timeout=0)
587 self.assertRaises(futures.CancelledError,
588 CANCELLED_FUTURE.result, timeout=0)
589 self.assertRaises(futures.CancelledError,
590 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200591 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000592 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
593
594 def test_result_with_success(self):
595 # TODO(brian@sweetapp.com): This test is timing dependant.
596 def notification():
597 # Wait until the main thread is waiting for the result.
598 time.sleep(1)
599 f1.set_result(42)
600
601 f1 = create_future(state=PENDING)
602 t = threading.Thread(target=notification)
603 t.start()
604
Ezio Melottib3aedd42010-11-20 19:04:17 +0000605 self.assertEqual(f1.result(timeout=5), 42)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000606
607 def test_result_with_cancel(self):
608 # TODO(brian@sweetapp.com): This test is timing dependant.
609 def notification():
610 # Wait until the main thread is waiting for the result.
611 time.sleep(1)
612 f1.cancel()
613
614 f1 = create_future(state=PENDING)
615 t = threading.Thread(target=notification)
616 t.start()
617
618 self.assertRaises(futures.CancelledError, f1.result, timeout=5)
619
620 def test_exception_with_timeout(self):
621 self.assertRaises(futures.TimeoutError,
622 PENDING_FUTURE.exception, timeout=0)
623 self.assertRaises(futures.TimeoutError,
624 RUNNING_FUTURE.exception, timeout=0)
625 self.assertRaises(futures.CancelledError,
626 CANCELLED_FUTURE.exception, timeout=0)
627 self.assertRaises(futures.CancelledError,
628 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
629 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200630 OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000631 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
632
633 def test_exception_with_success(self):
634 def notification():
635 # Wait until the main thread is waiting for the exception.
636 time.sleep(1)
637 with f1._condition:
638 f1._state = FINISHED
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200639 f1._exception = OSError()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000640 f1._condition.notify_all()
641
642 f1 = create_future(state=PENDING)
643 t = threading.Thread(target=notification)
644 t.start()
645
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200646 self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000647
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200648@test.support.reap_threads
Brian Quinlan81c4d362010-09-18 22:35:02 +0000649def test_main():
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200650 try:
651 test.support.run_unittest(ProcessPoolExecutorTest,
652 ThreadPoolExecutorTest,
653 ProcessPoolWaitTests,
654 ThreadPoolWaitTests,
655 ProcessPoolAsCompletedTests,
656 ThreadPoolAsCompletedTests,
657 FutureTests,
658 ProcessPoolShutdownTest,
Antoine Pitroud06a0652011-07-16 01:13:34 +0200659 ThreadPoolShutdownTest,
660 )
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200661 finally:
662 test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000663
664if __name__ == "__main__":
665 test_main()