blob: 3c963dff1db2c97a6158189f5a9655e984b9005f [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
Brian Quinlan81c4d362010-09-18 22:35:02 +00007
Berker Peksagce643912015-05-06 06:33:17 +03008from test.support.script_helper import assert_python_ok
Antoine Pitrouaebac0b2011-03-24 15:47:39 +01009
Antoine Pitrou63ff4132017-11-04 11:05:49 +010010import contextlib
Ɓukasz Langa574562c2017-09-29 14:33:34 -070011import itertools
Antoine Pitrou0a2ff232017-11-09 15:33:43 +010012import logging
13from logging.handlers import QueueHandler
Guido van Rossumcfd46612014-09-02 10:39:18 -070014import os
Antoine Pitrou0a2ff232017-11-09 15:33:43 +010015import queue
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010016import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000017import threading
18import time
19import unittest
Andrew Svetlov6b973742012-11-03 15:36:01 +020020import weakref
Thomas Moreau94459fd2018-01-05 11:15:54 +010021from pickle import PicklingError
Brian Quinlan81c4d362010-09-18 22:35:02 +000022
Brian Quinlan81c4d362010-09-18 22:35:02 +000023from concurrent import futures
24from concurrent.futures._base import (
Antoine Pitrou63ff4132017-11-04 11:05:49 +010025 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
26 BrokenExecutor)
Antoine Pitroudd696492011-06-08 17:21:55 +020027from concurrent.futures.process import BrokenProcessPool
Thomas Moreaue8c368d2017-10-03 11:53:17 +020028from multiprocessing import get_context
Brian Quinlan81c4d362010-09-18 22:35:02 +000029
Brian Quinlan1d1df822011-01-03 02:56:39 +000030
def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose private state fields are set directly.

    No executor is involved: ``_state``, ``_exception`` and ``_result`` are
    simply overwritten with the given values.
    """
    future = Future()
    for attr, value in (('_state', state),
                        ('_exception', exception),
                        ('_result', result)):
        setattr(future, attr, value)
    return future
37
Brian Quinlan1d1df822011-01-03 02:56:39 +000038
# Pre-built futures, one per state, shared by the tests in this module.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Module-wide marker written by init() and read back by get_init_status().
INITIALIZER_STATUS = 'uninitialized'
47
Brian Quinlan1d1df822011-01-03 02:56:39 +000048
def mul(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product
51
def capture(*args, **kwargs):
    """Return the received positional and keyword arguments as a pair."""
    received = (args, kwargs)
    return received
54
def sleep_and_raise(t):
    """Sleep for *t* seconds, then always raise a plain Exception."""
    time.sleep(t)
    error = Exception('this is an exception')
    raise error
Brian Quinlan81c4d362010-09-18 22:35:02 +000058
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush standard output."""
    time.sleep(t)
    # Flush right away so the text is not left sitting in a buffer.
    print(msg, flush=True)
63
def init(x):
    """Store *x* in the module-level INITIALIZER_STATUS variable."""
    globals()['INITIALIZER_STATUS'] = x


def get_init_status():
    """Return the current value of the module-level INITIALIZER_STATUS."""
    return INITIALIZER_STATUS
70
def init_fail(log_queue=None):
    """Initializer that always fails with ValueError.

    When *log_queue* is given, the 'concurrent.futures' logger is first
    rewired to send its records to that queue through a QueueHandler.
    """
    if log_queue is not None:
        futures_logger = logging.getLogger('concurrent.futures')
        queue_handler = QueueHandler(log_queue)
        futures_logger.addHandler(queue_handler)
        futures_logger.setLevel('CRITICAL')
        futures_logger.propagate = False
    time.sleep(0.1)     # let some futures be scheduled
    raise ValueError('error in initializer')
79
Brian Quinlan81c4d362010-09-18 22:35:02 +000080
class MyObject:
    """Minimal object exposing a single do-nothing method."""

    def my_method(self):
        return None
84
85
class EventfulGCObj:
    """Object that sets a manager-backed event when it is finalized."""

    def __init__(self, ctx):
        # The event comes from a Manager created in the requested
        # multiprocessing context.
        manager = get_context(ctx).Manager()
        self.event = manager.Event()

    def __del__(self):
        self.event.set()
93
94
def make_dummy_object(_):
    """Return a fresh MyObject; the single argument is ignored."""
    dummy = MyObject()
    return dummy
97
98
class BaseTestCase(unittest.TestCase):
    """Base test case that brackets each test with test.support helpers."""

    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown() hands it back
        # to threading_cleanup().
        self._thread_key = test.support.threading_setup()

    def tearDown(self):
        test.support.reap_children()
        thread_key = self._thread_key
        test.support.threading_cleanup(*thread_key)
106
107
class ExecutorMixin:
    """Mixin that creates ``self.executor`` in setUp and shuts it down after.

    Concrete mixins supply ``executor_type``; those that also define ``ctx``
    get an ``mp_context`` argument built by get_context().
    """

    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        # ``mp_context`` is only passed when the concrete class defines
        # ``ctx``.
        context_kwargs = {}
        if hasattr(self, "ctx"):
            context_kwargs["mp_context"] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count,
            **context_kwargs,
            **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        elapsed = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % elapsed, end=' ')
        self.assertLess(elapsed, 300,
                        "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        start_method = self.ctx
        return get_context(start_method)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        warmups = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for warmup in warmups:
            warmup.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000148
Brian Quinlan81c4d362010-09-18 22:35:02 +0000149
class ThreadPoolMixin(ExecutorMixin):
    """Executor mixin backed by futures.ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +0000152
Brian Quinlan81c4d362010-09-18 22:35:02 +0000153
class ProcessPoolForkMixin(ExecutorMixin):
    """Executor mixin for ProcessPoolExecutor with the "fork" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        # Skipped on Windows before the context is requested.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200162
163
class ProcessPoolSpawnMixin(ExecutorMixin):
    """Executor mixin for ProcessPoolExecutor with the "spawn" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"
167
168
class ProcessPoolForkserverMixin(ExecutorMixin):
    """Executor mixin for ProcessPoolExecutor with the "forkserver" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        # Skipped on Windows before the context is requested.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000177
Brian Quinlan81c4d362010-09-18 22:35:02 +0000178
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one test class per executor mixin and publish it globally.

    Each class is named ``<Executor><Mixin>Test`` (after stripping a trailing
    'Mixin', 'Tests' or 'Test' from both parts), derives from
    ``(mixin, executor_mixin) + bases`` and is stored in this module's
    globals under that name.
    """
    def strip_mixin(name):
        # Drop the first matching suffix, if any.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    module_namespace = globals()
    test_part = strip_mixin(mixin.__name__)
    for exe in executor_mixins:
        name = "%s%sTest" % (strip_mixin(exe.__name__), test_part)
        module_namespace[name] = type(name, (mixin, exe) + bases, {})
197
198
class InitializerMixin(ExecutorMixin):
    """Checks that the executor runs the initializer in its workers."""

    worker_count = 2

    def setUp(self):
        # Reset the marker first, then ask the executor to run
        # init('initialized') in each worker.
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = {'initializer': init,
                                'initargs': ('initialized',)}
        super().setUp()

    def test_initializer(self):
        pending = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for status_future in pending:
            self.assertEqual(status_future.result(), 'initialized')
215
216
class FailingInitializerMixin(ExecutorMixin):
    """Checks that a failing initializer breaks the executor and is logged."""

    worker_count = 2

    def setUp(self):
        if not hasattr(self, "ctx"):
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = {'initializer': init_fail}
        else:
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = {'initializer': init_fail,
                                    'initargs': (self.log_queue,)}
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            started = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - started > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # No priming here: the initializer always fails.
        return None

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        if self.log_queue is None:
            # No queue: capture the records with assertLogs().
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        else:
            yield
            # Drain whatever records init_fail() pushed onto the queue.
            output = []
            try:
                while True:
                    record = self.log_queue.get_nowait()
                    output.append(record.getMessage())
            except queue.Empty:
                pass
        self.assertTrue(any(msg in line for line in output),
                        output)
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100274
275
# Generate the initializer test classes with the default executor mixins.
create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
278
279
class ExecutorShutdownTest:
    """Shutdown-related checks shared by thread and process pool tests."""

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        script = """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", ""))
        _, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # The child registers an atexit hook that calls submit() while the
        # interpreter is exiting; it must get a RuntimeError.
        script = """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", ""))
        _, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures",
                      err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        sleepers = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for sleeper in sleepers:
            sleeper.result()
342
Brian Quinlan81c4d362010-09-18 22:35:02 +0000343
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown checks specific to ThreadPoolExecutor."""

    def _prime_executor(self):
        # No warm-up: test_threads_terminate counts the threads it starts.
        return None

    def test_threads_terminate(self):
        for args in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *args)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for worker in self.executor._threads:
            worker.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as executor:
            self.assertEqual(list(executor.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for worker in executor._threads:
            worker.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        for worker in workers:
            worker.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        for worker in workers:
            self.assertRegex(worker.name, r'^SpecialPool_[0-4]$')
            worker.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        for worker in workers:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(worker.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            worker.join()
397
Brian Quinlan1d1df822011-01-03 02:56:39 +0000398
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown checks specific to ProcessPoolExecutor.

    Meant to be combined with one of the ProcessPool*Mixin classes, which
    provide ``self.executor`` and ``get_context()``.
    """

    def _prime_executor(self):
        # No warm-up submissions for these tests.
        pass

    def test_processes_terminate(self):
        for args in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *args)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        # Use the context of the concrete test class so that every
        # fork/forkserver/spawn variant exercises its own start method.
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        # As above: build the pool with this variant's own context.
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        list(executor.map(abs, range(-5, 5)))
        # Grab the internals before dropping the last executor reference.
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000438
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200439
# Process-pool variants only.
create_executor_tests(
    ProcessPoolShutdownTest,
    executor_mixins=(ProcessPoolForkMixin,
                     ProcessPoolForkserverMixin,
                     ProcessPoolSpawnMixin),
)
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200444
445
class WaitTests:
    """Checks of futures.wait() for each ``return_when`` mode and a timeout."""

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        quick = self.executor.submit(mul, 2, 21)
        failing = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [quick, failing, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({quick, failing}, finished)
        self.assertEqual({slow}, pending)

    def test_first_exception_some_already_complete(self):
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             failing, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          failing}, finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        failing = self.executor.submit(divmod, 2, 0)
        quick = self.executor.submit(mul, 2, 21)
        everything = [SUCCESSFUL_FUTURE,
                      CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      failing,
                      quick]

        finished, pending = futures.wait(
            everything,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set(everything), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the 5 second timeout passed to wait() below.
        slow = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, slow],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          quick}, finished)
        self.assertEqual({slow}, pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000545
546
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """WaitTests on a thread pool, plus a thread-specific race check."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def future_func():
            gate.wait()

        saved_interval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for _ in range(100)}
            gate.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter's original switch interval.
            sys.setswitchinterval(saved_interval)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000563
Brian Quinlan1d1df822011-01-03 02:56:39 +0000564
# Process-pool variants only; the thread-pool class is written out by hand.
create_executor_tests(
    WaitTests,
    executor_mixins=(ProcessPoolForkMixin,
                     ProcessPoolForkserverMixin,
                     ProcessPoolSpawnMixin),
)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000569
Brian Quinlan1d1df822011-01-03 02:56:39 +0000570
class AsCompletedTests:
    """Checks of futures.as_completed()."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        everything = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      first, second]

        completed = set(futures.as_completed(everything))
        self.assertEqual(set(everything), completed)

    def test_zero_timeout(self):
        slow = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     slow],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        slow = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed(itertools.repeat(slow, 3)))
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())
            # Complete the next pending future so the iteration can go on.
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        message = str(cm.exception)
        self.assertEqual(message, '2 (of 4) futures unfinished')
649
Brian Quinlan81c4d362010-09-18 22:35:02 +0000650
# Generate the as_completed() test classes with the default executor mixins.
create_executor_tests(AsCompletedTests)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000652
Brian Quinlan1d1df822011-01-03 02:56:39 +0000653
class ExecutorTest:
    """Checks of submit() and map() shared by all executor types."""

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        outcome = self.executor.submit(pow, 2, 8).result()
        self.assertEqual(256, outcome)

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        # 'self' and 'fn' must be usable as keyword names for the callable.
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertWarns(DeprecationWarning):
            future = self.executor.submit(fn=capture, arg=1)
        self.assertEqual(future.result(), ((), {'arg': 1}))
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        expected = list(map(pow, range(10), range(10)))

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            expected)

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10), chunksize=3)),
            expected)

    def test_map_exception(self):
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        results = []
        try:
            for value in self.executor.map(time.sleep,
                                           [0, 0, 6],
                                           timeout=5):
                results.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref itself must stay alive for its callback to fire.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            self.assertIsNone(wr())
738
Brian Quinlan81c4d362010-09-18 22:35:02 +0000739
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is deliberately never consumed:
        # every call must still have run by the time shutdown() returns.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # Without max_workers the pool size is derived from the CPU count
        # (falling back to 1 when os.cpu_count() returns None).
        executor = self.executor_type()
        self.assertEqual(executor._max_workers,
                         (os.cpu_count() or 1) * 5)
755
Brian Quinlan1d1df822011-01-03 02:56:39 +0000756
class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform == 'win32',
                         'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # NOTE: the list is not called "futures" so that it does not shadow
        # the concurrent.futures module imported by this file.
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        # Chunk sizes smaller than, larger than and equal to the input
        # length must all produce the same results as the builtin map().
        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback() searches the remote traceback for the exact
        # text of the next line (including the single space before '#');
        # do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote traceback must also show up when the exception is
        # reported through the standard excepthook.
        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_ressources_gced_in_workers(self):
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        obj = EventfulGCObj(self.ctx)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))
827
828
# Instantiate ProcessPoolExecutorTest once per multiprocessing start-method
# mixin listed below.
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200833
def hide_process_stderr():
    """Replace sys.stderr of the current process with an in-memory buffer.

    Anything written to sys.stderr afterwards is collected in an
    io.StringIO object instead of being printed.  The previous stream is
    not restored by this function.
    """
    import io
    sys.stderr = io.StringIO()
837
838
def _crash(delay=None):
    """Induces a segfault.

    If *delay* is truthy, sleep that many seconds before crashing.
    """
    if delay:
        time.sleep(delay)
    import faulthandler
    # Disable the fault handler so the crash does not dump a traceback.
    faulthandler.disable()
    faulthandler._sigsegv()
846
847
def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)
851
852
def _raise_error(Err):
    """Function that raises an Exception in process.

    *Err* is called without arguments and the resulting exception is
    raised, after sys.stderr has been redirected by hide_process_stderr().
    """
    hide_process_stderr()
    raise Err()
857
858
def _return_instance(cls):
    """Function that returns an instance of cls.

    sys.stderr is redirected by hide_process_stderr() before *cls* is
    called without arguments.
    """
    hide_process_stderr()
    return cls()
863
864
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        # Pickling calls __reduce__(), which crashes the pickling process.
        _crash()
869
870
class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        # The returned callable is invoked by whoever unpickles the object.
        return _crash, ()
875
876
class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        # Pickling calls __reduce__(), which raises SystemExit via _exit().
        _exit()
881
882
class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        # The returned callable is invoked by whoever unpickles the object.
        return _exit, ()
887
888
class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        # Imported locally so the class is self-contained in whichever
        # process ends up pickling an instance.
        from pickle import PicklingError
        raise PicklingError("Error in pickle")
894
895
class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        # Unpickling calls _raise_error(UnpicklingError), which raises.
        return _raise_error, (UnpicklingError, )
901
902
class ExecutorDeadlockTest:
    # Seconds to wait for a result before the executor is considered
    # deadlocked (see test_crash).
    TIMEOUT = 15

    @classmethod
    def _sleep_id(cls, x, delay):
        # Return x unchanged after sleeping for delay seconds.
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        # faulthandler writes to a real file, so dump the current
        # tracebacks to a temporary file and read them back as text.
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        # sys.__stderr__ is used so the traceback is printed even while
        # sys.stderr is captured by the caller.
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def test_crash(self):
        # extensive testing for deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        # Each case is (function, arguments, expected exception, name).
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with test.support.captured_stderr():
                    # A fresh executor is created for every case.
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock
                            # state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            # The worker crashes 0.1s after being submitted, i.e. once
            # shutdown() below has already been called.
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()
993
994
# Instantiate ExecutorDeadlockTest once per multiprocessing start-method
# mixin listed below.
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
999
Brian Quinlan1d1df822011-01-03 02:56:39 +00001000
class FutureTests(BaseTestCase):
    def test_done_callback_with_result(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A callback that raises must not prevent later callbacks from
        # running; the error is expected to be reported on stderr.
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        # The callback is added after the result is set.
        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        # The callback is added after the exception is set.
        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        # The callback is added after the future is cancelled.
        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        # Each cancel() return value is checked together with the state the
        # future is left in.
        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        # Join the helper thread even if the assertion fails, so that it
        # does not outlive the test.
        try:
            self.assertEqual(f1.result(timeout=5), 42)
        finally:
            t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        # Join the helper thread even if the assertion fails, so that it
        # does not outlive the test.
        try:
            self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        finally:
            t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0),
                              OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Finish the future by hand: update its state under the
            # condition lock and wake up the waiting thread.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        # Join the helper thread even if the assertion fails, so that it
        # does not outlive the test.
        try:
            self.assertIsInstance(f1.exception(timeout=5), OSError)
        finally:
            t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        # A second set_result() must be rejected and must leave the first
        # result in place.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        # A second set_exception() must be rejected and must leave the
        # first exception in place.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
1253
Antoine Pitrou63ff4132017-11-04 11:05:49 +01001254
@test.support.reap_threads
def test_main():
    # Run every test in this module, then reap any child processes the
    # tests left behind, even if the run raised.
    try:
        test.support.run_unittest(__name__)
    finally:
        test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +00001261
# Run the test suite when the file is executed as a script.
if __name__ == "__main__":
    test_main()