blob: 4ad33091b0f1c66cd94b7b628ff2864ef9abcaa4 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more revelant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010012from test.script_helper import assert_python_ok
13
14import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000015import threading
16import time
17import unittest
Andrew Svetlov6b973742012-11-03 15:36:01 +020018import weakref
Brian Quinlan81c4d362010-09-18 22:35:02 +000019
Brian Quinlan81c4d362010-09-18 22:35:02 +000020from concurrent import futures
21from concurrent.futures._base import (
Brian Quinlan1d1df822011-01-03 02:56:39 +000022 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
Antoine Pitroudd696492011-06-08 17:21:55 +020023from concurrent.futures.process import BrokenProcessPool
Brian Quinlan81c4d362010-09-18 22:35:02 +000024
Brian Quinlan1d1df822011-01-03 02:56:39 +000025
Brian Quinlan81c4d362010-09-18 22:35:02 +000026def create_future(state=PENDING, exception=None, result=None):
27 f = Future()
28 f._state = state
29 f._exception = exception
30 f._result = result
31 return f
32
Brian Quinlan1d1df822011-01-03 02:56:39 +000033
Brian Quinlan81c4d362010-09-18 22:35:02 +000034PENDING_FUTURE = create_future(state=PENDING)
35RUNNING_FUTURE = create_future(state=RUNNING)
36CANCELLED_FUTURE = create_future(state=CANCELLED)
37CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +020038EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +000039SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
40
Brian Quinlan1d1df822011-01-03 02:56:39 +000041
Brian Quinlan81c4d362010-09-18 22:35:02 +000042def mul(x, y):
43 return x * y
44
Brian Quinlan81c4d362010-09-18 22:35:02 +000045
Brian Quinlan1d1df822011-01-03 02:56:39 +000046def sleep_and_raise(t):
47 time.sleep(t)
48 raise Exception('this is an exception')
Brian Quinlan81c4d362010-09-18 22:35:02 +000049
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010050def sleep_and_print(t, msg):
51 time.sleep(t)
52 print(msg)
53 sys.stdout.flush()
54
Brian Quinlan81c4d362010-09-18 22:35:02 +000055
Andrew Svetlov6b973742012-11-03 15:36:01 +020056class MyObject(object):
57 def my_method(self):
58 pass
59
60
Brian Quinlan1d1df822011-01-03 02:56:39 +000061class ExecutorMixin:
62 worker_count = 5
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010063
64 def setUp(self):
65 self.t1 = time.time()
66 try:
67 self.executor = self.executor_type(max_workers=self.worker_count)
68 except NotImplementedError as e:
69 self.skipTest(str(e))
70 self._prime_executor()
71
72 def tearDown(self):
73 self.executor.shutdown(wait=True)
74 dt = time.time() - self.t1
75 if test.support.verbose:
76 print("%.2fs" % dt, end=' ')
77 self.assertLess(dt, 60, "synchronization issue: test lasted too long")
78
Brian Quinlan1d1df822011-01-03 02:56:39 +000079 def _prime_executor(self):
80 # Make sure that the executor is ready to do work before running the
81 # tests. This should reduce the probability of timeouts in the tests.
82 futures = [self.executor.submit(time.sleep, 0.1)
83 for _ in range(self.worker_count)]
Brian Quinlan81c4d362010-09-18 22:35:02 +000084
Brian Quinlan1d1df822011-01-03 02:56:39 +000085 for f in futures:
86 f.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +000087
Brian Quinlan81c4d362010-09-18 22:35:02 +000088
Brian Quinlan1d1df822011-01-03 02:56:39 +000089class ThreadPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010090 executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000091
Brian Quinlan81c4d362010-09-18 22:35:02 +000092
Brian Quinlan1d1df822011-01-03 02:56:39 +000093class ProcessPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010094 executor_type = futures.ProcessPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000095
Brian Quinlan81c4d362010-09-18 22:35:02 +000096
97class ExecutorShutdownTest(unittest.TestCase):
98 def test_run_after_shutdown(self):
99 self.executor.shutdown()
100 self.assertRaises(RuntimeError,
101 self.executor.submit,
102 pow, 2, 5)
103
Antoine Pitrouaebac0b2011-03-24 15:47:39 +0100104 def test_interpreter_shutdown(self):
105 # Test the atexit hook for shutdown of worker threads and processes
106 rc, out, err = assert_python_ok('-c', """if 1:
107 from concurrent.futures import {executor_type}
108 from time import sleep
109 from test.test_concurrent_futures import sleep_and_print
110 t = {executor_type}(5)
111 t.submit(sleep_and_print, 1.0, "apple")
112 """.format(executor_type=self.executor_type.__name__))
113 # Errors in atexit hooks don't change the process exit code, check
114 # stderr manually.
115 self.assertFalse(err)
116 self.assertEqual(out.strip(), b"apple")
117
Ross Lagerwall66e2fb62012-01-08 08:29:40 +0200118 def test_hang_issue12364(self):
119 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
120 self.executor.shutdown()
121 for f in fs:
122 f.result()
123
Brian Quinlan81c4d362010-09-18 22:35:02 +0000124
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000125class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000126 def _prime_executor(self):
127 pass
128
Brian Quinlan81c4d362010-09-18 22:35:02 +0000129 def test_threads_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000130 self.executor.submit(mul, 21, 2)
131 self.executor.submit(mul, 6, 7)
132 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000133 self.assertEqual(len(self.executor._threads), 3)
134 self.executor.shutdown()
135 for t in self.executor._threads:
136 t.join()
137
138 def test_context_manager_shutdown(self):
139 with futures.ThreadPoolExecutor(max_workers=5) as e:
140 executor = e
141 self.assertEqual(list(e.map(abs, range(-5, 5))),
142 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
143
144 for t in executor._threads:
145 t.join()
146
147 def test_del_shutdown(self):
148 executor = futures.ThreadPoolExecutor(max_workers=5)
149 executor.map(abs, range(-5, 5))
150 threads = executor._threads
151 del executor
152
153 for t in threads:
154 t.join()
155
Brian Quinlan1d1df822011-01-03 02:56:39 +0000156
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000157class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000158 def _prime_executor(self):
159 pass
160
Brian Quinlan81c4d362010-09-18 22:35:02 +0000161 def test_processes_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000162 self.executor.submit(mul, 21, 2)
163 self.executor.submit(mul, 6, 7)
164 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000165 self.assertEqual(len(self.executor._processes), 5)
166 processes = self.executor._processes
167 self.executor.shutdown()
168
Antoine Pitroudd696492011-06-08 17:21:55 +0200169 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000170 p.join()
171
172 def test_context_manager_shutdown(self):
173 with futures.ProcessPoolExecutor(max_workers=5) as e:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000174 processes = e._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000175 self.assertEqual(list(e.map(abs, range(-5, 5))),
176 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
177
Antoine Pitroudd696492011-06-08 17:21:55 +0200178 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000179 p.join()
180
181 def test_del_shutdown(self):
182 executor = futures.ProcessPoolExecutor(max_workers=5)
183 list(executor.map(abs, range(-5, 5)))
184 queue_management_thread = executor._queue_management_thread
185 processes = executor._processes
186 del executor
187
188 queue_management_thread.join()
Antoine Pitroudd696492011-06-08 17:21:55 +0200189 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000190 p.join()
191
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200192
Brian Quinlan81c4d362010-09-18 22:35:02 +0000193class WaitTests(unittest.TestCase):
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200194
Brian Quinlan81c4d362010-09-18 22:35:02 +0000195 def test_first_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000196 future1 = self.executor.submit(mul, 21, 2)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100197 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000198
Brian Quinlan1d1df822011-01-03 02:56:39 +0000199 done, not_done = futures.wait(
200 [CANCELLED_FUTURE, future1, future2],
201 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000202
Brian Quinlan1d1df822011-01-03 02:56:39 +0000203 self.assertEqual(set([future1]), done)
204 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000205
Brian Quinlan1d1df822011-01-03 02:56:39 +0000206 def test_first_completed_some_already_completed(self):
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100207 future1 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000208
Brian Quinlan1d1df822011-01-03 02:56:39 +0000209 finished, pending = futures.wait(
210 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
211 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000212
Brian Quinlan1d1df822011-01-03 02:56:39 +0000213 self.assertEqual(
214 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
215 finished)
216 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000217
218 def test_first_exception(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000219 future1 = self.executor.submit(mul, 2, 21)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100220 future2 = self.executor.submit(sleep_and_raise, 1.5)
221 future3 = self.executor.submit(time.sleep, 3)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000222
Brian Quinlan1d1df822011-01-03 02:56:39 +0000223 finished, pending = futures.wait(
224 [future1, future2, future3],
225 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000226
Brian Quinlan1d1df822011-01-03 02:56:39 +0000227 self.assertEqual(set([future1, future2]), finished)
228 self.assertEqual(set([future3]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000229
230 def test_first_exception_some_already_complete(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000231 future1 = self.executor.submit(divmod, 21, 0)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100232 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000233
Brian Quinlan1d1df822011-01-03 02:56:39 +0000234 finished, pending = futures.wait(
235 [SUCCESSFUL_FUTURE,
236 CANCELLED_FUTURE,
237 CANCELLED_AND_NOTIFIED_FUTURE,
238 future1, future2],
239 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000240
Brian Quinlan1d1df822011-01-03 02:56:39 +0000241 self.assertEqual(set([SUCCESSFUL_FUTURE,
242 CANCELLED_AND_NOTIFIED_FUTURE,
243 future1]), finished)
244 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000245
246 def test_first_exception_one_already_failed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000247 future1 = self.executor.submit(time.sleep, 2)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000248
Brian Quinlan1d1df822011-01-03 02:56:39 +0000249 finished, pending = futures.wait(
250 [EXCEPTION_FUTURE, future1],
251 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000252
Brian Quinlan1d1df822011-01-03 02:56:39 +0000253 self.assertEqual(set([EXCEPTION_FUTURE]), finished)
254 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000255
256 def test_all_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000257 future1 = self.executor.submit(divmod, 2, 0)
258 future2 = self.executor.submit(mul, 2, 21)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000259
Brian Quinlan1d1df822011-01-03 02:56:39 +0000260 finished, pending = futures.wait(
261 [SUCCESSFUL_FUTURE,
262 CANCELLED_AND_NOTIFIED_FUTURE,
263 EXCEPTION_FUTURE,
264 future1,
265 future2],
266 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000267
Brian Quinlan1d1df822011-01-03 02:56:39 +0000268 self.assertEqual(set([SUCCESSFUL_FUTURE,
269 CANCELLED_AND_NOTIFIED_FUTURE,
270 EXCEPTION_FUTURE,
271 future1,
272 future2]), finished)
273 self.assertEqual(set(), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000274
275 def test_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000276 future1 = self.executor.submit(mul, 6, 7)
Brian Quinlan1ae29982011-05-30 21:52:24 +1000277 future2 = self.executor.submit(time.sleep, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000278
Brian Quinlan1d1df822011-01-03 02:56:39 +0000279 finished, pending = futures.wait(
280 [CANCELLED_AND_NOTIFIED_FUTURE,
281 EXCEPTION_FUTURE,
282 SUCCESSFUL_FUTURE,
283 future1, future2],
Brian Quinlan1ae29982011-05-30 21:52:24 +1000284 timeout=5,
Brian Quinlan1d1df822011-01-03 02:56:39 +0000285 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000286
Brian Quinlan1d1df822011-01-03 02:56:39 +0000287 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
288 EXCEPTION_FUTURE,
289 SUCCESSFUL_FUTURE,
290 future1]), finished)
291 self.assertEqual(set([future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000292
293
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000294class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200295
296 def test_pending_calls_race(self):
297 # Issue #14406: multi-threaded race condition when waiting on all
298 # futures.
299 event = threading.Event()
300 def future_func():
301 event.wait()
302 oldswitchinterval = sys.getswitchinterval()
303 sys.setswitchinterval(1e-6)
304 try:
305 fs = {self.executor.submit(future_func) for i in range(100)}
306 event.set()
307 futures.wait(fs, return_when=futures.ALL_COMPLETED)
308 finally:
309 sys.setswitchinterval(oldswitchinterval)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000310
Brian Quinlan1d1df822011-01-03 02:56:39 +0000311
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000312class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
313 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000314
Brian Quinlan1d1df822011-01-03 02:56:39 +0000315
Brian Quinlan81c4d362010-09-18 22:35:02 +0000316class AsCompletedTests(unittest.TestCase):
317 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
318 def test_no_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000319 future1 = self.executor.submit(mul, 2, 21)
320 future2 = self.executor.submit(mul, 7, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000321
Brian Quinlan1d1df822011-01-03 02:56:39 +0000322 completed = set(futures.as_completed(
323 [CANCELLED_AND_NOTIFIED_FUTURE,
324 EXCEPTION_FUTURE,
325 SUCCESSFUL_FUTURE,
326 future1, future2]))
327 self.assertEqual(set(
328 [CANCELLED_AND_NOTIFIED_FUTURE,
329 EXCEPTION_FUTURE,
330 SUCCESSFUL_FUTURE,
331 future1, future2]),
332 completed)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000333
334 def test_zero_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000335 future1 = self.executor.submit(time.sleep, 2)
336 completed_futures = set()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000337 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000338 for future in futures.as_completed(
339 [CANCELLED_AND_NOTIFIED_FUTURE,
340 EXCEPTION_FUTURE,
341 SUCCESSFUL_FUTURE,
342 future1],
343 timeout=0):
344 completed_futures.add(future)
345 except futures.TimeoutError:
346 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000347
Brian Quinlan1d1df822011-01-03 02:56:39 +0000348 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
349 EXCEPTION_FUTURE,
350 SUCCESSFUL_FUTURE]),
351 completed_futures)
352
Brian Quinlan81c4d362010-09-18 22:35:02 +0000353
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000354class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
355 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000356
Brian Quinlan1d1df822011-01-03 02:56:39 +0000357
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000358class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
359 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000360
Brian Quinlan1d1df822011-01-03 02:56:39 +0000361
Brian Quinlan81c4d362010-09-18 22:35:02 +0000362class ExecutorTest(unittest.TestCase):
363 # Executor.shutdown() and context manager usage is tested by
364 # ExecutorShutdownTest.
365 def test_submit(self):
366 future = self.executor.submit(pow, 2, 8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000367 self.assertEqual(256, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000368
369 def test_submit_keyword(self):
370 future = self.executor.submit(mul, 2, y=8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000371 self.assertEqual(16, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000372
373 def test_map(self):
374 self.assertEqual(
375 list(self.executor.map(pow, range(10), range(10))),
376 list(map(pow, range(10), range(10))))
377
378 def test_map_exception(self):
379 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
380 self.assertEqual(i.__next__(), (0, 1))
381 self.assertEqual(i.__next__(), (0, 1))
382 self.assertRaises(ZeroDivisionError, i.__next__)
383
384 def test_map_timeout(self):
385 results = []
Brian Quinlan81c4d362010-09-18 22:35:02 +0000386 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000387 for i in self.executor.map(time.sleep,
Brian Quinlan1ae29982011-05-30 21:52:24 +1000388 [0, 0, 6],
389 timeout=5):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000390 results.append(i)
391 except futures.TimeoutError:
392 pass
393 else:
394 self.fail('expected TimeoutError')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000395
Brian Quinlan1d1df822011-01-03 02:56:39 +0000396 self.assertEqual([None, None], results)
397
Antoine Pitrou020436b2011-07-02 21:20:25 +0200398 def test_shutdown_race_issue12456(self):
399 # Issue #12456: race condition at shutdown where trying to post a
400 # sentinel in the call queue blocks (the queue is full while processes
401 # have exited).
402 self.executor.map(str, [2] * (self.worker_count + 1))
403 self.executor.shutdown()
404
Andrew Svetlov6b973742012-11-03 15:36:01 +0200405 @test.support.cpython_only
406 def test_no_stale_references(self):
407 # Issue #16284: check that the executors don't unnecessarily hang onto
408 # references.
409 my_object = MyObject()
410 my_object_collected = threading.Event()
411 my_object_callback = weakref.ref(
412 my_object, lambda obj: my_object_collected.set())
413 # Deliberately discarding the future.
414 self.executor.submit(my_object.my_method)
415 del my_object
416
417 collected = my_object_collected.wait(timeout=5.0)
418 self.assertTrue(collected,
419 "Stale reference not collected within timeout.")
420
Brian Quinlan81c4d362010-09-18 22:35:02 +0000421
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000422class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
Brian Quinlanf0078762011-04-08 08:19:33 +1000423 def test_map_submits_without_iteration(self):
424 """Tests verifying issue 11777."""
425 finished = []
426 def record_finished(n):
427 finished.append(n)
428
429 self.executor.map(record_finished, range(10))
430 self.executor.shutdown(wait=True)
431 self.assertCountEqual(finished, range(10))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000432
Brian Quinlan1d1df822011-01-03 02:56:39 +0000433
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000434class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
Antoine Pitroudd696492011-06-08 17:21:55 +0200435 def test_killed_child(self):
436 # When a child process is abruptly terminated, the whole pool gets
437 # "broken".
438 futures = [self.executor.submit(time.sleep, 3)]
439 # Get one of the processes, and terminate (kill) it
440 p = next(iter(self.executor._processes.values()))
441 p.terminate()
442 for fut in futures:
443 self.assertRaises(BrokenProcessPool, fut.result)
444 # Submitting other jobs fails as well.
445 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000446
Brian Quinlan1d1df822011-01-03 02:56:39 +0000447
Brian Quinlan81c4d362010-09-18 22:35:02 +0000448class FutureTests(unittest.TestCase):
449 def test_done_callback_with_result(self):
450 callback_result = None
451 def fn(callback_future):
452 nonlocal callback_result
453 callback_result = callback_future.result()
454
455 f = Future()
456 f.add_done_callback(fn)
457 f.set_result(5)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000458 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000459
460 def test_done_callback_with_exception(self):
461 callback_exception = None
462 def fn(callback_future):
463 nonlocal callback_exception
464 callback_exception = callback_future.exception()
465
466 f = Future()
467 f.add_done_callback(fn)
468 f.set_exception(Exception('test'))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000469 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000470
471 def test_done_callback_with_cancel(self):
472 was_cancelled = None
473 def fn(callback_future):
474 nonlocal was_cancelled
475 was_cancelled = callback_future.cancelled()
476
477 f = Future()
478 f.add_done_callback(fn)
479 self.assertTrue(f.cancel())
480 self.assertTrue(was_cancelled)
481
482 def test_done_callback_raises(self):
Brian Quinlan251cc842010-12-28 21:14:34 +0000483 with test.support.captured_stderr() as stderr:
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000484 raising_was_called = False
485 fn_was_called = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000486
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000487 def raising_fn(callback_future):
488 nonlocal raising_was_called
489 raising_was_called = True
490 raise Exception('doh!')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000491
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000492 def fn(callback_future):
493 nonlocal fn_was_called
494 fn_was_called = True
Brian Quinlan81c4d362010-09-18 22:35:02 +0000495
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000496 f = Future()
497 f.add_done_callback(raising_fn)
498 f.add_done_callback(fn)
499 f.set_result(5)
500 self.assertTrue(raising_was_called)
501 self.assertTrue(fn_was_called)
Brian Quinlan251cc842010-12-28 21:14:34 +0000502 self.assertIn('Exception: doh!', stderr.getvalue())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000503
504 def test_done_callback_already_successful(self):
505 callback_result = None
506 def fn(callback_future):
507 nonlocal callback_result
508 callback_result = callback_future.result()
509
510 f = Future()
511 f.set_result(5)
512 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000513 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000514
515 def test_done_callback_already_failed(self):
516 callback_exception = None
517 def fn(callback_future):
518 nonlocal callback_exception
519 callback_exception = callback_future.exception()
520
521 f = Future()
522 f.set_exception(Exception('test'))
523 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000524 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000525
526 def test_done_callback_already_cancelled(self):
527 was_cancelled = None
528 def fn(callback_future):
529 nonlocal was_cancelled
530 was_cancelled = callback_future.cancelled()
531
532 f = Future()
533 self.assertTrue(f.cancel())
534 f.add_done_callback(fn)
535 self.assertTrue(was_cancelled)
536
537 def test_repr(self):
Ezio Melottied3a7d22010-12-01 02:32:32 +0000538 self.assertRegex(repr(PENDING_FUTURE),
539 '<Future at 0x[0-9a-f]+ state=pending>')
540 self.assertRegex(repr(RUNNING_FUTURE),
541 '<Future at 0x[0-9a-f]+ state=running>')
542 self.assertRegex(repr(CANCELLED_FUTURE),
543 '<Future at 0x[0-9a-f]+ state=cancelled>')
544 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
545 '<Future at 0x[0-9a-f]+ state=cancelled>')
546 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000547 repr(EXCEPTION_FUTURE),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200548 '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
Ezio Melottied3a7d22010-12-01 02:32:32 +0000549 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000550 repr(SUCCESSFUL_FUTURE),
551 '<Future at 0x[0-9a-f]+ state=finished returned int>')
552
553
554 def test_cancel(self):
555 f1 = create_future(state=PENDING)
556 f2 = create_future(state=RUNNING)
557 f3 = create_future(state=CANCELLED)
558 f4 = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200559 f5 = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000560 f6 = create_future(state=FINISHED, result=5)
561
562 self.assertTrue(f1.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000563 self.assertEqual(f1._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000564
565 self.assertFalse(f2.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000566 self.assertEqual(f2._state, RUNNING)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000567
568 self.assertTrue(f3.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000569 self.assertEqual(f3._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000570
571 self.assertTrue(f4.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000572 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000573
574 self.assertFalse(f5.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000575 self.assertEqual(f5._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000576
577 self.assertFalse(f6.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000578 self.assertEqual(f6._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000579
580 def test_cancelled(self):
581 self.assertFalse(PENDING_FUTURE.cancelled())
582 self.assertFalse(RUNNING_FUTURE.cancelled())
583 self.assertTrue(CANCELLED_FUTURE.cancelled())
584 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
585 self.assertFalse(EXCEPTION_FUTURE.cancelled())
586 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
587
588 def test_done(self):
589 self.assertFalse(PENDING_FUTURE.done())
590 self.assertFalse(RUNNING_FUTURE.done())
591 self.assertTrue(CANCELLED_FUTURE.done())
592 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
593 self.assertTrue(EXCEPTION_FUTURE.done())
594 self.assertTrue(SUCCESSFUL_FUTURE.done())
595
596 def test_running(self):
597 self.assertFalse(PENDING_FUTURE.running())
598 self.assertTrue(RUNNING_FUTURE.running())
599 self.assertFalse(CANCELLED_FUTURE.running())
600 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
601 self.assertFalse(EXCEPTION_FUTURE.running())
602 self.assertFalse(SUCCESSFUL_FUTURE.running())
603
604 def test_result_with_timeout(self):
605 self.assertRaises(futures.TimeoutError,
606 PENDING_FUTURE.result, timeout=0)
607 self.assertRaises(futures.TimeoutError,
608 RUNNING_FUTURE.result, timeout=0)
609 self.assertRaises(futures.CancelledError,
610 CANCELLED_FUTURE.result, timeout=0)
611 self.assertRaises(futures.CancelledError,
612 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200613 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000614 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
615
616 def test_result_with_success(self):
617 # TODO(brian@sweetapp.com): This test is timing dependant.
618 def notification():
619 # Wait until the main thread is waiting for the result.
620 time.sleep(1)
621 f1.set_result(42)
622
623 f1 = create_future(state=PENDING)
624 t = threading.Thread(target=notification)
625 t.start()
626
Ezio Melottib3aedd42010-11-20 19:04:17 +0000627 self.assertEqual(f1.result(timeout=5), 42)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000628
629 def test_result_with_cancel(self):
630 # TODO(brian@sweetapp.com): This test is timing dependant.
631 def notification():
632 # Wait until the main thread is waiting for the result.
633 time.sleep(1)
634 f1.cancel()
635
636 f1 = create_future(state=PENDING)
637 t = threading.Thread(target=notification)
638 t.start()
639
640 self.assertRaises(futures.CancelledError, f1.result, timeout=5)
641
642 def test_exception_with_timeout(self):
643 self.assertRaises(futures.TimeoutError,
644 PENDING_FUTURE.exception, timeout=0)
645 self.assertRaises(futures.TimeoutError,
646 RUNNING_FUTURE.exception, timeout=0)
647 self.assertRaises(futures.CancelledError,
648 CANCELLED_FUTURE.exception, timeout=0)
649 self.assertRaises(futures.CancelledError,
650 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
651 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200652 OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000653 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
654
655 def test_exception_with_success(self):
656 def notification():
657 # Wait until the main thread is waiting for the exception.
658 time.sleep(1)
659 with f1._condition:
660 f1._state = FINISHED
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200661 f1._exception = OSError()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000662 f1._condition.notify_all()
663
664 f1 = create_future(state=PENDING)
665 t = threading.Thread(target=notification)
666 t.start()
667
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200668 self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000669
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200670@test.support.reap_threads
Brian Quinlan81c4d362010-09-18 22:35:02 +0000671def test_main():
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200672 try:
673 test.support.run_unittest(ProcessPoolExecutorTest,
674 ThreadPoolExecutorTest,
675 ProcessPoolWaitTests,
676 ThreadPoolWaitTests,
677 ProcessPoolAsCompletedTests,
678 ThreadPoolAsCompletedTests,
679 FutureTests,
680 ProcessPoolShutdownTest,
Antoine Pitroud06a0652011-07-16 01:13:34 +0200681 ThreadPoolShutdownTest,
682 )
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200683 finally:
684 test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000685
686if __name__ == "__main__":
687 test_main()