blob: cdb93088a268e619fa114f16ff8f6eba08e5572e [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more revelant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
Berker Peksagce643912015-05-06 06:33:17 +030012from test.support.script_helper import assert_python_ok
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010013
Guido van Rossumcfd46612014-09-02 10:39:18 -070014import os
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010015import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000016import threading
17import time
18import unittest
Andrew Svetlov6b973742012-11-03 15:36:01 +020019import weakref
Brian Quinlan81c4d362010-09-18 22:35:02 +000020
Brian Quinlan81c4d362010-09-18 22:35:02 +000021from concurrent import futures
22from concurrent.futures._base import (
Brian Quinlan1d1df822011-01-03 02:56:39 +000023 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
Antoine Pitroudd696492011-06-08 17:21:55 +020024from concurrent.futures.process import BrokenProcessPool
Brian Quinlan81c4d362010-09-18 22:35:02 +000025
Brian Quinlan1d1df822011-01-03 02:56:39 +000026
Brian Quinlan81c4d362010-09-18 22:35:02 +000027def create_future(state=PENDING, exception=None, result=None):
28 f = Future()
29 f._state = state
30 f._exception = exception
31 f._result = result
32 return f
33
Brian Quinlan1d1df822011-01-03 02:56:39 +000034
Brian Quinlan81c4d362010-09-18 22:35:02 +000035PENDING_FUTURE = create_future(state=PENDING)
36RUNNING_FUTURE = create_future(state=RUNNING)
37CANCELLED_FUTURE = create_future(state=CANCELLED)
38CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +020039EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +000040SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
41
Brian Quinlan1d1df822011-01-03 02:56:39 +000042
Brian Quinlan81c4d362010-09-18 22:35:02 +000043def mul(x, y):
44 return x * y
45
Brian Quinlan81c4d362010-09-18 22:35:02 +000046
Brian Quinlan1d1df822011-01-03 02:56:39 +000047def sleep_and_raise(t):
48 time.sleep(t)
49 raise Exception('this is an exception')
Brian Quinlan81c4d362010-09-18 22:35:02 +000050
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010051def sleep_and_print(t, msg):
52 time.sleep(t)
53 print(msg)
54 sys.stdout.flush()
55
Brian Quinlan81c4d362010-09-18 22:35:02 +000056
Andrew Svetlov6b973742012-11-03 15:36:01 +020057class MyObject(object):
58 def my_method(self):
59 pass
60
61
Brian Quinlan1d1df822011-01-03 02:56:39 +000062class ExecutorMixin:
63 worker_count = 5
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010064
65 def setUp(self):
66 self.t1 = time.time()
67 try:
68 self.executor = self.executor_type(max_workers=self.worker_count)
69 except NotImplementedError as e:
70 self.skipTest(str(e))
71 self._prime_executor()
72
73 def tearDown(self):
74 self.executor.shutdown(wait=True)
75 dt = time.time() - self.t1
76 if test.support.verbose:
77 print("%.2fs" % dt, end=' ')
78 self.assertLess(dt, 60, "synchronization issue: test lasted too long")
79
Brian Quinlan1d1df822011-01-03 02:56:39 +000080 def _prime_executor(self):
81 # Make sure that the executor is ready to do work before running the
82 # tests. This should reduce the probability of timeouts in the tests.
83 futures = [self.executor.submit(time.sleep, 0.1)
84 for _ in range(self.worker_count)]
Brian Quinlan81c4d362010-09-18 22:35:02 +000085
Brian Quinlan1d1df822011-01-03 02:56:39 +000086 for f in futures:
87 f.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +000088
Brian Quinlan81c4d362010-09-18 22:35:02 +000089
Brian Quinlan1d1df822011-01-03 02:56:39 +000090class ThreadPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010091 executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000092
Brian Quinlan81c4d362010-09-18 22:35:02 +000093
Brian Quinlan1d1df822011-01-03 02:56:39 +000094class ProcessPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010095 executor_type = futures.ProcessPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000096
Brian Quinlan81c4d362010-09-18 22:35:02 +000097
Antoine Pitrou9816a1e2013-10-15 23:23:32 +020098class ExecutorShutdownTest:
Brian Quinlan81c4d362010-09-18 22:35:02 +000099 def test_run_after_shutdown(self):
100 self.executor.shutdown()
101 self.assertRaises(RuntimeError,
102 self.executor.submit,
103 pow, 2, 5)
104
Antoine Pitrouaebac0b2011-03-24 15:47:39 +0100105 def test_interpreter_shutdown(self):
106 # Test the atexit hook for shutdown of worker threads and processes
107 rc, out, err = assert_python_ok('-c', """if 1:
108 from concurrent.futures import {executor_type}
109 from time import sleep
110 from test.test_concurrent_futures import sleep_and_print
111 t = {executor_type}(5)
112 t.submit(sleep_and_print, 1.0, "apple")
113 """.format(executor_type=self.executor_type.__name__))
114 # Errors in atexit hooks don't change the process exit code, check
115 # stderr manually.
116 self.assertFalse(err)
117 self.assertEqual(out.strip(), b"apple")
118
Ross Lagerwall66e2fb62012-01-08 08:29:40 +0200119 def test_hang_issue12364(self):
120 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
121 self.executor.shutdown()
122 for f in fs:
123 f.result()
124
Brian Quinlan81c4d362010-09-18 22:35:02 +0000125
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200126class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000127 def _prime_executor(self):
128 pass
129
Brian Quinlan81c4d362010-09-18 22:35:02 +0000130 def test_threads_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000131 self.executor.submit(mul, 21, 2)
132 self.executor.submit(mul, 6, 7)
133 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000134 self.assertEqual(len(self.executor._threads), 3)
135 self.executor.shutdown()
136 for t in self.executor._threads:
137 t.join()
138
139 def test_context_manager_shutdown(self):
140 with futures.ThreadPoolExecutor(max_workers=5) as e:
141 executor = e
142 self.assertEqual(list(e.map(abs, range(-5, 5))),
143 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
144
145 for t in executor._threads:
146 t.join()
147
148 def test_del_shutdown(self):
149 executor = futures.ThreadPoolExecutor(max_workers=5)
150 executor.map(abs, range(-5, 5))
151 threads = executor._threads
152 del executor
153
154 for t in threads:
155 t.join()
156
Brian Quinlan1d1df822011-01-03 02:56:39 +0000157
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200158class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000159 def _prime_executor(self):
160 pass
161
Brian Quinlan81c4d362010-09-18 22:35:02 +0000162 def test_processes_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000163 self.executor.submit(mul, 21, 2)
164 self.executor.submit(mul, 6, 7)
165 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000166 self.assertEqual(len(self.executor._processes), 5)
167 processes = self.executor._processes
168 self.executor.shutdown()
169
Antoine Pitroudd696492011-06-08 17:21:55 +0200170 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000171 p.join()
172
173 def test_context_manager_shutdown(self):
174 with futures.ProcessPoolExecutor(max_workers=5) as e:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000175 processes = e._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000176 self.assertEqual(list(e.map(abs, range(-5, 5))),
177 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
178
Antoine Pitroudd696492011-06-08 17:21:55 +0200179 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000180 p.join()
181
182 def test_del_shutdown(self):
183 executor = futures.ProcessPoolExecutor(max_workers=5)
184 list(executor.map(abs, range(-5, 5)))
185 queue_management_thread = executor._queue_management_thread
186 processes = executor._processes
187 del executor
188
189 queue_management_thread.join()
Antoine Pitroudd696492011-06-08 17:21:55 +0200190 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000191 p.join()
192
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200193
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200194class WaitTests:
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200195
Brian Quinlan81c4d362010-09-18 22:35:02 +0000196 def test_first_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000197 future1 = self.executor.submit(mul, 21, 2)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100198 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000199
Brian Quinlan1d1df822011-01-03 02:56:39 +0000200 done, not_done = futures.wait(
201 [CANCELLED_FUTURE, future1, future2],
202 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000203
Brian Quinlan1d1df822011-01-03 02:56:39 +0000204 self.assertEqual(set([future1]), done)
205 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000206
Brian Quinlan1d1df822011-01-03 02:56:39 +0000207 def test_first_completed_some_already_completed(self):
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100208 future1 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000209
Brian Quinlan1d1df822011-01-03 02:56:39 +0000210 finished, pending = futures.wait(
211 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
212 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000213
Brian Quinlan1d1df822011-01-03 02:56:39 +0000214 self.assertEqual(
215 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
216 finished)
217 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000218
219 def test_first_exception(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000220 future1 = self.executor.submit(mul, 2, 21)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100221 future2 = self.executor.submit(sleep_and_raise, 1.5)
222 future3 = self.executor.submit(time.sleep, 3)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000223
Brian Quinlan1d1df822011-01-03 02:56:39 +0000224 finished, pending = futures.wait(
225 [future1, future2, future3],
226 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000227
Brian Quinlan1d1df822011-01-03 02:56:39 +0000228 self.assertEqual(set([future1, future2]), finished)
229 self.assertEqual(set([future3]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000230
231 def test_first_exception_some_already_complete(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000232 future1 = self.executor.submit(divmod, 21, 0)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100233 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000234
Brian Quinlan1d1df822011-01-03 02:56:39 +0000235 finished, pending = futures.wait(
236 [SUCCESSFUL_FUTURE,
237 CANCELLED_FUTURE,
238 CANCELLED_AND_NOTIFIED_FUTURE,
239 future1, future2],
240 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000241
Brian Quinlan1d1df822011-01-03 02:56:39 +0000242 self.assertEqual(set([SUCCESSFUL_FUTURE,
243 CANCELLED_AND_NOTIFIED_FUTURE,
244 future1]), finished)
245 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000246
247 def test_first_exception_one_already_failed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000248 future1 = self.executor.submit(time.sleep, 2)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000249
Brian Quinlan1d1df822011-01-03 02:56:39 +0000250 finished, pending = futures.wait(
251 [EXCEPTION_FUTURE, future1],
252 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000253
Brian Quinlan1d1df822011-01-03 02:56:39 +0000254 self.assertEqual(set([EXCEPTION_FUTURE]), finished)
255 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000256
257 def test_all_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000258 future1 = self.executor.submit(divmod, 2, 0)
259 future2 = self.executor.submit(mul, 2, 21)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000260
Brian Quinlan1d1df822011-01-03 02:56:39 +0000261 finished, pending = futures.wait(
262 [SUCCESSFUL_FUTURE,
263 CANCELLED_AND_NOTIFIED_FUTURE,
264 EXCEPTION_FUTURE,
265 future1,
266 future2],
267 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000268
Brian Quinlan1d1df822011-01-03 02:56:39 +0000269 self.assertEqual(set([SUCCESSFUL_FUTURE,
270 CANCELLED_AND_NOTIFIED_FUTURE,
271 EXCEPTION_FUTURE,
272 future1,
273 future2]), finished)
274 self.assertEqual(set(), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000275
276 def test_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000277 future1 = self.executor.submit(mul, 6, 7)
Brian Quinlan1ae29982011-05-30 21:52:24 +1000278 future2 = self.executor.submit(time.sleep, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000279
Brian Quinlan1d1df822011-01-03 02:56:39 +0000280 finished, pending = futures.wait(
281 [CANCELLED_AND_NOTIFIED_FUTURE,
282 EXCEPTION_FUTURE,
283 SUCCESSFUL_FUTURE,
284 future1, future2],
Brian Quinlan1ae29982011-05-30 21:52:24 +1000285 timeout=5,
Brian Quinlan1d1df822011-01-03 02:56:39 +0000286 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000287
Brian Quinlan1d1df822011-01-03 02:56:39 +0000288 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
289 EXCEPTION_FUTURE,
290 SUCCESSFUL_FUTURE,
291 future1]), finished)
292 self.assertEqual(set([future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000293
294
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200295class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200296
297 def test_pending_calls_race(self):
298 # Issue #14406: multi-threaded race condition when waiting on all
299 # futures.
300 event = threading.Event()
301 def future_func():
302 event.wait()
303 oldswitchinterval = sys.getswitchinterval()
304 sys.setswitchinterval(1e-6)
305 try:
306 fs = {self.executor.submit(future_func) for i in range(100)}
307 event.set()
308 futures.wait(fs, return_when=futures.ALL_COMPLETED)
309 finally:
310 sys.setswitchinterval(oldswitchinterval)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000311
Brian Quinlan1d1df822011-01-03 02:56:39 +0000312
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200313class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000314 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000315
Brian Quinlan1d1df822011-01-03 02:56:39 +0000316
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200317class AsCompletedTests:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000318 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
319 def test_no_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000320 future1 = self.executor.submit(mul, 2, 21)
321 future2 = self.executor.submit(mul, 7, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000322
Brian Quinlan1d1df822011-01-03 02:56:39 +0000323 completed = set(futures.as_completed(
324 [CANCELLED_AND_NOTIFIED_FUTURE,
325 EXCEPTION_FUTURE,
326 SUCCESSFUL_FUTURE,
327 future1, future2]))
328 self.assertEqual(set(
329 [CANCELLED_AND_NOTIFIED_FUTURE,
330 EXCEPTION_FUTURE,
331 SUCCESSFUL_FUTURE,
332 future1, future2]),
333 completed)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000334
335 def test_zero_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000336 future1 = self.executor.submit(time.sleep, 2)
337 completed_futures = set()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000338 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000339 for future in futures.as_completed(
340 [CANCELLED_AND_NOTIFIED_FUTURE,
341 EXCEPTION_FUTURE,
342 SUCCESSFUL_FUTURE,
343 future1],
344 timeout=0):
345 completed_futures.add(future)
346 except futures.TimeoutError:
347 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000348
Brian Quinlan1d1df822011-01-03 02:56:39 +0000349 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
350 EXCEPTION_FUTURE,
351 SUCCESSFUL_FUTURE]),
352 completed_futures)
353
Guido van Rossume6994ff2014-01-26 09:57:51 -0800354 def test_duplicate_futures(self):
355 # Issue 20367. Duplicate futures should not raise exceptions or give
356 # duplicate responses.
357 future1 = self.executor.submit(time.sleep, 2)
358 completed = [f for f in futures.as_completed([future1,future1])]
359 self.assertEqual(len(completed), 1)
360
Brian Quinlan81c4d362010-09-18 22:35:02 +0000361
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200362class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000363 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000364
Brian Quinlan1d1df822011-01-03 02:56:39 +0000365
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200366class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000367 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000368
Brian Quinlan1d1df822011-01-03 02:56:39 +0000369
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200370class ExecutorTest:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000371 # Executor.shutdown() and context manager usage is tested by
372 # ExecutorShutdownTest.
373 def test_submit(self):
374 future = self.executor.submit(pow, 2, 8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000375 self.assertEqual(256, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000376
377 def test_submit_keyword(self):
378 future = self.executor.submit(mul, 2, y=8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000379 self.assertEqual(16, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000380
381 def test_map(self):
382 self.assertEqual(
383 list(self.executor.map(pow, range(10), range(10))),
384 list(map(pow, range(10), range(10))))
385
386 def test_map_exception(self):
387 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
388 self.assertEqual(i.__next__(), (0, 1))
389 self.assertEqual(i.__next__(), (0, 1))
390 self.assertRaises(ZeroDivisionError, i.__next__)
391
392 def test_map_timeout(self):
393 results = []
Brian Quinlan81c4d362010-09-18 22:35:02 +0000394 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000395 for i in self.executor.map(time.sleep,
Brian Quinlan1ae29982011-05-30 21:52:24 +1000396 [0, 0, 6],
397 timeout=5):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000398 results.append(i)
399 except futures.TimeoutError:
400 pass
401 else:
402 self.fail('expected TimeoutError')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000403
Brian Quinlan1d1df822011-01-03 02:56:39 +0000404 self.assertEqual([None, None], results)
405
Antoine Pitrou020436b2011-07-02 21:20:25 +0200406 def test_shutdown_race_issue12456(self):
407 # Issue #12456: race condition at shutdown where trying to post a
408 # sentinel in the call queue blocks (the queue is full while processes
409 # have exited).
410 self.executor.map(str, [2] * (self.worker_count + 1))
411 self.executor.shutdown()
412
Andrew Svetlov6b973742012-11-03 15:36:01 +0200413 @test.support.cpython_only
414 def test_no_stale_references(self):
415 # Issue #16284: check that the executors don't unnecessarily hang onto
416 # references.
417 my_object = MyObject()
418 my_object_collected = threading.Event()
419 my_object_callback = weakref.ref(
420 my_object, lambda obj: my_object_collected.set())
421 # Deliberately discarding the future.
422 self.executor.submit(my_object.my_method)
423 del my_object
424
425 collected = my_object_collected.wait(timeout=5.0)
426 self.assertTrue(collected,
427 "Stale reference not collected within timeout.")
428
Brian Quinlan20efceb2014-05-17 13:51:10 -0700429 def test_max_workers_negative(self):
430 for number in (0, -1):
R David Murray475a4762014-06-11 16:25:05 -0400431 with self.assertRaisesRegex(ValueError,
432 "max_workers must be greater "
433 "than 0"):
Brian Quinlan20efceb2014-05-17 13:51:10 -0700434 self.executor_type(max_workers=number)
435
Brian Quinlan81c4d362010-09-18 22:35:02 +0000436
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200437class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
Brian Quinlanf0078762011-04-08 08:19:33 +1000438 def test_map_submits_without_iteration(self):
439 """Tests verifying issue 11777."""
440 finished = []
441 def record_finished(n):
442 finished.append(n)
443
444 self.executor.map(record_finished, range(10))
445 self.executor.shutdown(wait=True)
446 self.assertCountEqual(finished, range(10))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000447
Guido van Rossumcfd46612014-09-02 10:39:18 -0700448 def test_default_workers(self):
449 executor = self.executor_type()
450 self.assertEqual(executor._max_workers,
451 (os.cpu_count() or 1) * 5)
452
Brian Quinlan1d1df822011-01-03 02:56:39 +0000453
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200454class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
Antoine Pitroudd696492011-06-08 17:21:55 +0200455 def test_killed_child(self):
456 # When a child process is abruptly terminated, the whole pool gets
457 # "broken".
458 futures = [self.executor.submit(time.sleep, 3)]
459 # Get one of the processes, and terminate (kill) it
460 p = next(iter(self.executor._processes.values()))
461 p.terminate()
462 for fut in futures:
463 self.assertRaises(BrokenProcessPool, fut.result)
464 # Submitting other jobs fails as well.
465 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000466
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200467 def test_map_chunksize(self):
468 def bad_map():
469 list(self.executor.map(pow, range(40), range(40), chunksize=-1))
470
471 ref = list(map(pow, range(40), range(40)))
472 self.assertEqual(
473 list(self.executor.map(pow, range(40), range(40), chunksize=6)),
474 ref)
475 self.assertEqual(
476 list(self.executor.map(pow, range(40), range(40), chunksize=50)),
477 ref)
478 self.assertEqual(
479 list(self.executor.map(pow, range(40), range(40), chunksize=40)),
480 ref)
481 self.assertRaises(ValueError, bad_map)
482
Antoine Pitrou1285c9b2015-01-17 20:02:14 +0100483 @classmethod
484 def _test_traceback(cls):
485 raise RuntimeError(123) # some comment
486
487 def test_traceback(self):
488 # We want ensure that the traceback from the child process is
489 # contained in the traceback raised in the main process.
490 future = self.executor.submit(self._test_traceback)
491 with self.assertRaises(Exception) as cm:
492 future.result()
493
494 exc = cm.exception
495 self.assertIs(type(exc), RuntimeError)
496 self.assertEqual(exc.args, (123,))
497 cause = exc.__cause__
498 self.assertIs(type(cause), futures.process._RemoteTraceback)
499 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
500
501 with test.support.captured_stderr() as f1:
502 try:
503 raise exc
504 except RuntimeError:
505 sys.excepthook(*sys.exc_info())
506 self.assertIn('raise RuntimeError(123) # some comment',
507 f1.getvalue())
508
Brian Quinlan1d1df822011-01-03 02:56:39 +0000509
Brian Quinlan81c4d362010-09-18 22:35:02 +0000510class FutureTests(unittest.TestCase):
511 def test_done_callback_with_result(self):
512 callback_result = None
513 def fn(callback_future):
514 nonlocal callback_result
515 callback_result = callback_future.result()
516
517 f = Future()
518 f.add_done_callback(fn)
519 f.set_result(5)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000520 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000521
522 def test_done_callback_with_exception(self):
523 callback_exception = None
524 def fn(callback_future):
525 nonlocal callback_exception
526 callback_exception = callback_future.exception()
527
528 f = Future()
529 f.add_done_callback(fn)
530 f.set_exception(Exception('test'))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000531 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000532
533 def test_done_callback_with_cancel(self):
534 was_cancelled = None
535 def fn(callback_future):
536 nonlocal was_cancelled
537 was_cancelled = callback_future.cancelled()
538
539 f = Future()
540 f.add_done_callback(fn)
541 self.assertTrue(f.cancel())
542 self.assertTrue(was_cancelled)
543
544 def test_done_callback_raises(self):
Brian Quinlan251cc842010-12-28 21:14:34 +0000545 with test.support.captured_stderr() as stderr:
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000546 raising_was_called = False
547 fn_was_called = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000548
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000549 def raising_fn(callback_future):
550 nonlocal raising_was_called
551 raising_was_called = True
552 raise Exception('doh!')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000553
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000554 def fn(callback_future):
555 nonlocal fn_was_called
556 fn_was_called = True
Brian Quinlan81c4d362010-09-18 22:35:02 +0000557
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000558 f = Future()
559 f.add_done_callback(raising_fn)
560 f.add_done_callback(fn)
561 f.set_result(5)
562 self.assertTrue(raising_was_called)
563 self.assertTrue(fn_was_called)
Brian Quinlan251cc842010-12-28 21:14:34 +0000564 self.assertIn('Exception: doh!', stderr.getvalue())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000565
566 def test_done_callback_already_successful(self):
567 callback_result = None
568 def fn(callback_future):
569 nonlocal callback_result
570 callback_result = callback_future.result()
571
572 f = Future()
573 f.set_result(5)
574 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000575 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000576
577 def test_done_callback_already_failed(self):
578 callback_exception = None
579 def fn(callback_future):
580 nonlocal callback_exception
581 callback_exception = callback_future.exception()
582
583 f = Future()
584 f.set_exception(Exception('test'))
585 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000586 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000587
588 def test_done_callback_already_cancelled(self):
589 was_cancelled = None
590 def fn(callback_future):
591 nonlocal was_cancelled
592 was_cancelled = callback_future.cancelled()
593
594 f = Future()
595 self.assertTrue(f.cancel())
596 f.add_done_callback(fn)
597 self.assertTrue(was_cancelled)
598
599 def test_repr(self):
Ezio Melottied3a7d22010-12-01 02:32:32 +0000600 self.assertRegex(repr(PENDING_FUTURE),
601 '<Future at 0x[0-9a-f]+ state=pending>')
602 self.assertRegex(repr(RUNNING_FUTURE),
603 '<Future at 0x[0-9a-f]+ state=running>')
604 self.assertRegex(repr(CANCELLED_FUTURE),
605 '<Future at 0x[0-9a-f]+ state=cancelled>')
606 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
607 '<Future at 0x[0-9a-f]+ state=cancelled>')
608 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000609 repr(EXCEPTION_FUTURE),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200610 '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
Ezio Melottied3a7d22010-12-01 02:32:32 +0000611 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000612 repr(SUCCESSFUL_FUTURE),
613 '<Future at 0x[0-9a-f]+ state=finished returned int>')
614
615
616 def test_cancel(self):
617 f1 = create_future(state=PENDING)
618 f2 = create_future(state=RUNNING)
619 f3 = create_future(state=CANCELLED)
620 f4 = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200621 f5 = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000622 f6 = create_future(state=FINISHED, result=5)
623
624 self.assertTrue(f1.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000625 self.assertEqual(f1._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000626
627 self.assertFalse(f2.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000628 self.assertEqual(f2._state, RUNNING)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000629
630 self.assertTrue(f3.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000631 self.assertEqual(f3._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000632
633 self.assertTrue(f4.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000634 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000635
636 self.assertFalse(f5.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000637 self.assertEqual(f5._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000638
639 self.assertFalse(f6.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000640 self.assertEqual(f6._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000641
642 def test_cancelled(self):
643 self.assertFalse(PENDING_FUTURE.cancelled())
644 self.assertFalse(RUNNING_FUTURE.cancelled())
645 self.assertTrue(CANCELLED_FUTURE.cancelled())
646 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
647 self.assertFalse(EXCEPTION_FUTURE.cancelled())
648 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
649
650 def test_done(self):
651 self.assertFalse(PENDING_FUTURE.done())
652 self.assertFalse(RUNNING_FUTURE.done())
653 self.assertTrue(CANCELLED_FUTURE.done())
654 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
655 self.assertTrue(EXCEPTION_FUTURE.done())
656 self.assertTrue(SUCCESSFUL_FUTURE.done())
657
658 def test_running(self):
659 self.assertFalse(PENDING_FUTURE.running())
660 self.assertTrue(RUNNING_FUTURE.running())
661 self.assertFalse(CANCELLED_FUTURE.running())
662 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
663 self.assertFalse(EXCEPTION_FUTURE.running())
664 self.assertFalse(SUCCESSFUL_FUTURE.running())
665
666 def test_result_with_timeout(self):
667 self.assertRaises(futures.TimeoutError,
668 PENDING_FUTURE.result, timeout=0)
669 self.assertRaises(futures.TimeoutError,
670 RUNNING_FUTURE.result, timeout=0)
671 self.assertRaises(futures.CancelledError,
672 CANCELLED_FUTURE.result, timeout=0)
673 self.assertRaises(futures.CancelledError,
674 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200675 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000676 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
677
678 def test_result_with_success(self):
Martin Panter46f50722016-05-26 05:35:26 +0000679 # TODO(brian@sweetapp.com): This test is timing dependent.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000680 def notification():
681 # Wait until the main thread is waiting for the result.
682 time.sleep(1)
683 f1.set_result(42)
684
685 f1 = create_future(state=PENDING)
686 t = threading.Thread(target=notification)
687 t.start()
688
Ezio Melottib3aedd42010-11-20 19:04:17 +0000689 self.assertEqual(f1.result(timeout=5), 42)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000690
691 def test_result_with_cancel(self):
Martin Panter46f50722016-05-26 05:35:26 +0000692 # TODO(brian@sweetapp.com): This test is timing dependent.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000693 def notification():
694 # Wait until the main thread is waiting for the result.
695 time.sleep(1)
696 f1.cancel()
697
698 f1 = create_future(state=PENDING)
699 t = threading.Thread(target=notification)
700 t.start()
701
702 self.assertRaises(futures.CancelledError, f1.result, timeout=5)
703
704 def test_exception_with_timeout(self):
705 self.assertRaises(futures.TimeoutError,
706 PENDING_FUTURE.exception, timeout=0)
707 self.assertRaises(futures.TimeoutError,
708 RUNNING_FUTURE.exception, timeout=0)
709 self.assertRaises(futures.CancelledError,
710 CANCELLED_FUTURE.exception, timeout=0)
711 self.assertRaises(futures.CancelledError,
712 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
713 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200714 OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000715 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
716
717 def test_exception_with_success(self):
718 def notification():
719 # Wait until the main thread is waiting for the exception.
720 time.sleep(1)
721 with f1._condition:
722 f1._state = FINISHED
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200723 f1._exception = OSError()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000724 f1._condition.notify_all()
725
726 f1 = create_future(state=PENDING)
727 t = threading.Thread(target=notification)
728 t.start()
729
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200730 self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000731
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200732@test.support.reap_threads
Brian Quinlan81c4d362010-09-18 22:35:02 +0000733def test_main():
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200734 try:
Antoine Pitrou9816a1e2013-10-15 23:23:32 +0200735 test.support.run_unittest(__name__)
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200736 finally:
737 test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000738
739if __name__ == "__main__":
740 test_main()