blob: 18d0265f3f61a63ffdb39f30237972362648e0a3 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
Brian Quinlan81c4d362010-09-18 22:35:02 +00007
Berker Peksagce643912015-05-06 06:33:17 +03008from test.support.script_helper import assert_python_ok
Antoine Pitrouaebac0b2011-03-24 15:47:39 +01009
Antoine Pitrou63ff4132017-11-04 11:05:49 +010010import contextlib
Ɓukasz Langa574562c2017-09-29 14:33:34 -070011import itertools
Antoine Pitrou0a2ff232017-11-09 15:33:43 +010012import logging
13from logging.handlers import QueueHandler
Guido van Rossumcfd46612014-09-02 10:39:18 -070014import os
Antoine Pitrou0a2ff232017-11-09 15:33:43 +010015import queue
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010016import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000017import threading
18import time
19import unittest
Andrew Svetlov6b973742012-11-03 15:36:01 +020020import weakref
Thomas Moreau94459fd2018-01-05 11:15:54 +010021from pickle import PicklingError
Brian Quinlan81c4d362010-09-18 22:35:02 +000022
Brian Quinlan81c4d362010-09-18 22:35:02 +000023from concurrent import futures
24from concurrent.futures._base import (
Antoine Pitrou63ff4132017-11-04 11:05:49 +010025 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
26 BrokenExecutor)
Antoine Pitroudd696492011-06-08 17:21:55 +020027from concurrent.futures.process import BrokenProcessPool
Thomas Moreaue8c368d2017-10-03 11:53:17 +020028from multiprocessing import get_context
Brian Quinlan81c4d362010-09-18 22:35:02 +000029
Brian Quinlan1d1df822011-01-03 02:56:39 +000030
def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose private state fields are set directly.

    ``_state``, ``_exception`` and ``_result`` are assigned by hand so a
    future in any given state can be built without running an executor.
    """
    future = Future()
    future._result = result
    future._exception = exception
    future._state = state
    return future
37
Brian Quinlan1d1df822011-01-03 02:56:39 +000038
# Pre-built futures, one per state, shared by the wait()/as_completed() tests.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Overwritten by init(); read back through get_init_status().
INITIALIZER_STATUS = 'uninitialized'
47
Brian Quinlan1d1df822011-01-03 02:56:39 +000048
def mul(x, y):
    """Return ``x * y``."""
    product = x * y
    return product
51
def sleep_and_raise(t):
    """Sleep for *t* seconds and then raise a plain ``Exception``."""
    error = Exception('this is an exception')
    time.sleep(t)
    raise error
Brian Quinlan81c4d362010-09-18 22:35:02 +000055
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush standard output."""
    time.sleep(t)
    # flush=True flushes sys.stdout, the stream print() writes to by default.
    print(msg, flush=True)
60
def init(x):
    """Store *x* in the module-level ``INITIALIZER_STATUS`` variable."""
    globals()['INITIALIZER_STATUS'] = x
64
def get_init_status():
    """Return the current value of the module-level ``INITIALIZER_STATUS``."""
    status = INITIALIZER_STATUS
    return status
67
def init_fail(log_queue=None):
    """Initializer that always raises ``ValueError`` after a short delay.

    When *log_queue* is given, the 'concurrent.futures' logger is first
    redirected to it through a ``QueueHandler`` and stops propagating.
    """
    if log_queue is not None:
        cf_logger = logging.getLogger('concurrent.futures')
        cf_logger.propagate = False
        cf_logger.setLevel('CRITICAL')
        cf_logger.addHandler(QueueHandler(log_queue))
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')
76
Brian Quinlan81c4d362010-09-18 22:35:02 +000077
class MyObject:
    """Trivial object with a no-op method, used by the reference tests."""

    def my_method(self):
        return None
81
82
class EventfulGCObj:
    """Object that sets a manager-backed event when it is finalized."""

    def __init__(self, ctx):
        # *ctx* is a start-method name accepted by multiprocessing.get_context().
        manager = get_context(ctx).Manager()
        self.event = manager.Event()

    def __del__(self):
        # Signal whoever waits on the event that this object was collected.
        self.event.set()
90
91
def make_dummy_object(_):
    """Return a fresh ``MyObject``; the argument is ignored."""
    dummy = MyObject()
    return dummy
94
95
class BaseTestCase(unittest.TestCase):
    """TestCase base pairing threading_setup() with threading_cleanup()."""

    def setUp(self):
        # Keep what threading_setup() returns; tearDown hands it back.
        self._thread_key = test.support.threading_setup()

    def tearDown(self):
        test.support.reap_children()
        thread_key = self._thread_key
        test.support.threading_cleanup(*thread_key)
103
104
class ExecutorMixin:
    """Mixin that builds ``self.executor`` in setUp and shuts it down after.

    Concrete classes supply ``executor_type``.  If a ``ctx`` attribute is
    present it names the multiprocessing start method, and the matching
    context is passed to the executor as ``mp_context``.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        # Use the monotonic clock: the elapsed time checked in tearDown()
        # must not be affected by wall-clock adjustments.
        self.t1 = time.monotonic()
        kwargs = dict(self.executor_kwargs)
        if hasattr(self, "ctx"):
            kwargs["mp_context"] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count, **kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        # Translate the start-method name in self.ctx into a context object.
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        futures = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for f in futures:
            f.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000145
Brian Quinlan81c4d362010-09-18 22:35:02 +0000146
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin variant that runs the tests on a ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +0000149
Brian Quinlan81c4d362010-09-18 22:35:02 +0000150
class ProcessPoolForkMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "fork"."""

    ctx = "fork"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        # The test is skipped on Windows before the context is requested.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200159
160
class ProcessPoolSpawnMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "spawn"."""

    ctx = "spawn"
    executor_type = futures.ProcessPoolExecutor
164
165
class ProcessPoolForkserverMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "forkserver"."""

    ctx = "forkserver"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        # The test is skipped on Windows before the context is requested.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000174
Brian Quinlan81c4d362010-09-18 22:35:02 +0000175
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Create one test class per executor mixin and publish it in globals().

    Each class is named ``<Executor><Mixin>Test``, where both parts have a
    trailing 'Mixin', 'Tests' or 'Test' removed, and it inherits from
    *mixin*, the executor mixin and then *bases*, in that order.
    """
    def strip_mixin(name):
        # A name can end with at most one of these suffixes.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    module_namespace = globals()
    mixin_part = strip_mixin(mixin.__name__)
    for exe in executor_mixins:
        name = "%s%sTest" % (strip_mixin(exe.__name__), mixin_part)
        module_namespace[name] = type(name, (mixin, exe) + bases, {})
194
195
class InitializerMixin(ExecutorMixin):
    """Checks that the executor runs its initializer in every worker."""

    worker_count = 2

    def setUp(self):
        # Reset the flag first, so a stale value cannot satisfy the test.
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = {
            'initializer': init,
            'initargs': ('initialized',),
        }
        super().setUp()

    def test_initializer(self):
        pending = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for future in pending:
            self.assertEqual(future.result(), 'initialized')
212
213
class FailingInitializerMixin(ExecutorMixin):
    """Checks that a raising initializer breaks the executor and is logged."""

    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break.  Poll against a
            # monotonic deadline so that wall-clock adjustments can neither
            # shorten nor extend the 5 s budget.
            deadline = time.monotonic() + 5
            while not self.executor._broken:
                if time.monotonic() > deadline:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # Priming would submit work to an executor that is expected to break.
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        # Run the with-body, then require *msg* in the captured log output.
        if self.log_queue is not None:
            yield
            # Drain whatever records the workers pushed onto the queue.
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100271
272
# Generate the concrete test classes for both initializer mixins, using the
# default set of executor mixins.
create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
275
276
class ExecutorShutdownTest:
    """Shutdown tests that apply to every executor type."""

    def test_run_after_shutdown(self):
        # submit() must be refused once shutdown() has been called.
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        script = """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", ""))
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        # Results must still be retrievable after shutdown() with work queued.
        pending = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for future in pending:
            future.result()
311
Brian Quinlan81c4d362010-09-18 22:35:02 +0000312
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests that inspect ThreadPoolExecutor's worker threads."""

    def _prime_executor(self):
        # No priming: test_threads_terminate counts the threads it starts.
        pass

    def test_threads_terminate(self):
        for args in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *args)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for worker in self.executor._threads:
            worker.join()

    def test_context_manager_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        with executor as e:
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for worker in executor._threads:
            worker.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        # Dropping the last reference must let every worker thread exit.
        for worker in workers:
            worker.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        for worker in workers:
            self.assertRegex(worker.name, r'^SpecialPool_[0-4]$')
            worker.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor

        for worker in workers:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(worker.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            worker.join()
366
Brian Quinlan1d1df822011-01-03 02:56:39 +0000367
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests that inspect ProcessPoolExecutor's worker processes.

    Meant to be combined with one of the ProcessPool*Mixin classes, which
    provide ``self.executor`` and ``get_context()``.
    """

    def _prime_executor(self):
        # No priming; the executor is left untouched until the test runs.
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        # Use the start method of the variant under test rather than the
        # platform default, so each generated class tests its own context.
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        # Same as above: honour the start method of the variant under test.
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        list(executor.map(abs, range(-5, 5)))
        # Keep handles on the internals so they can be joined after the
        # executor object itself is gone.
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000407
Antoine Pitrouf70401e2012-03-31 20:23:30 +0200408
# ProcessPoolShutdownTest is generated for the process-pool mixins only.
create_executor_tests(
    ProcessPoolShutdownTest,
    executor_mixins=(
        ProcessPoolForkMixin,
        ProcessPoolForkserverMixin,
        ProcessPoolSpawnMixin,
    ),
)
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200413
414
class WaitTests:
    """Tests for futures.wait() under each ``return_when`` policy."""

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        quick = self.executor.submit(mul, 2, 21)
        failing = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [quick, failing, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({quick, failing}, finished)
        self.assertEqual({slow}, pending)

    def test_first_exception_some_already_complete(self):
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             failing, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(
            {SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, failing},
            finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        failing = self.executor.submit(divmod, 2, 0)
        quick = self.executor.submit(mul, 2, 21)
        everything = [SUCCESSFUL_FUTURE,
                      CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      failing,
                      quick]

        finished, pending = futures.wait(
            everything,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set(everything), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the 5 s timeout, so it must still be pending.
        slow = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, slow],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick},
            finished)
        self.assertEqual({slow}, pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000514
515
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """WaitTests on a thread pool, plus a thread-specific race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def block_on_gate():
            gate.wait()

        saved_interval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(block_on_gate) for _ in range(100)}
            gate.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter-wide switch interval.
            sys.setswitchinterval(saved_interval)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000532
Brian Quinlan1d1df822011-01-03 02:56:39 +0000533
# The thread-pool variant is the hand-written ThreadPoolWaitTests class, so
# only the process-pool variants are generated here.
create_executor_tests(
    WaitTests,
    executor_mixins=(
        ProcessPoolForkMixin,
        ProcessPoolForkserverMixin,
        ProcessPoolSpawnMixin,
    ),
)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000538
Brian Quinlan1d1df822011-01-03 02:56:39 +0000539
class AsCompletedTests:
    """Tests for futures.as_completed()."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)
        everything = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      future1, future2]

        completed = set(futures.as_completed(everything))
        self.assertEqual(set(everything), completed)

    def test_zero_timeout(self):
        slow = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     slow],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # Tolerated: only the futures yielded before it are checked.
            pass

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE},
            completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed(itertools.repeat(future1, 3)))
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                ref = weakref.ref(future)
                del future
                self.assertIsNone(ref())

        # Complete the remaining futures one at a time while iterating.
        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            ref = weakref.ref(future)
            del future
            self.assertIsNone(ref())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        message = str(cm.exception)
        self.assertEqual(message, '2 (of 4) futures unfinished')
618
Brian Quinlan81c4d362010-09-18 22:35:02 +0000619
# Generate AsCompletedTests for the default set of executor mixins.
create_executor_tests(AsCompletedTests)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000621
Brian Quinlan1d1df822011-01-03 02:56:39 +0000622
class ExecutorTest:
    """Tests of submit() and map() that apply to every executor type."""

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10), chunksize=3)),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        # The third pair divides by zero.
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        results = []
        try:
            for value in self.executor.map(time.sleep,
                                           [0, 0, 6],
                                           timeout=5):
                results.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        # Only the two zero-second sleeps produce a result before the timeout.
        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weak reference is bound to a name so that it stays alive and
        # its callback can fire.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            ref = weakref.ref(obj)
            del obj
            self.assertIsNone(ref())
700
Brian Quinlan81c4d362010-09-18 22:35:02 +0000701
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        # The iterator returned by map() is never consumed, yet every call
        # must still have run by the time shutdown() returns.
        self.executor.map(finished.append, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected_workers = (os.cpu_count() or 1) * 5
        self.assertEqual(executor._max_workers, expected_workers)
717
Brian Quinlan1d1df822011-01-03 02:56:39 +0000718
class ProcessPoolExecutorTest(ExecutorTest):
    """ExecutorTest plus checks specific to ProcessPoolExecutor."""

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        victim = next(iter(self.executor._processes.values()))
        victim.terminate()
        for fut in pending:
            with self.assertRaises(BrokenProcessPool):
                fut.result()
        # Submitting other jobs fails as well.
        with self.assertRaises(BrokenProcessPool):
            self.executor.submit(pow, 2, 8)

    def test_map_chunksize(self):
        ref = list(map(pow, range(40), range(40)))
        # Chunk sizes below, above and equal to the input length.
        for chunksize in (6, 50, 40):
            self.assertEqual(
                list(self.executor.map(pow, range(40), range(40),
                                       chunksize=chunksize)),
                ref)
        # A negative chunk size is rejected.
        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        # test_traceback() looks for the exact text of the next line
        # (comment included) in the remote traceback; keep it unchanged.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote traceback must also show up when the exception is
        # printed through sys.excepthook.
        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_ressources_gced_in_workers(self):
        # Ensure that the arguments of a job are correctly gc-ed after the
        # job is finished
        obj = EventfulGCObj(self.ctx)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))
782
783
# Register ProcessPoolExecutorTest against the fork, forkserver and spawn
# start-method mixins (create_executor_tests is defined earlier in this file).
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200788
def hide_process_stderr():
    """Replace ``sys.stderr`` with a fresh in-memory ``io.StringIO``.

    Anything written to ``sys.stderr`` afterwards in the current process
    goes to that buffer; the previous stream is not restored.
    """
    import io
    sys.stderr = io.StringIO()
792
793
def _crash(delay=None):
    """Induces a segfault.

    If *delay* is truthy, sleep that many seconds before crashing.
    """
    if delay:
        time.sleep(delay)
    import faulthandler
    # Presumably disabled so the fault handler does not dump a traceback
    # when the segfault is triggered on purpose.
    faulthandler.disable()
    faulthandler._sigsegv()
801
802
def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)
806
807
def _raise_error(Err):
    """Function that raises an Exception in process.

    *Err* is called without arguments and the result is raised, after the
    process's stderr has been swapped out by hide_process_stderr().
    """
    hide_process_stderr()
    raise Err()
812
813
def _return_instance(cls):
    """Function that returns an instance of cls.

    *cls* is called without arguments, after the process's stderr has been
    swapped out by hide_process_stderr().
    """
    hide_process_stderr()
    return cls()
818
819
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        # Crash immediately, while the object is being pickled.
        _crash()
824
825
class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the crash happens when the unpickler calls
        # _crash() to rebuild the object.
        return _crash, ()
830
831
class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        # Exit immediately, while the object is being pickled.
        _exit()
836
837
class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the exit happens when the unpickler calls
        # _exit() to rebuild the object.
        return _exit, ()
842
843
class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        # Raised while the object is being pickled.
        from pickle import PicklingError
        raise PicklingError("Error in pickle")
849
850
class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the unpickler then calls
        # _raise_error(UnpicklingError), which raises.
        from pickle import UnpicklingError
        return _raise_error, (UnpicklingError, )
856
857
class ExecutorDeadlockTest:
    """Tests that a pool fails cleanly instead of deadlocking.

    Each scenario makes a worker crash, exit or hit a pickling error and
    checks that the pending future raises within TIMEOUT seconds.
    """

    # Seconds to wait for a result before declaring the executor deadlocked.
    TIMEOUT = 15

    @classmethod
    def _sleep_id(cls, x, delay):
        # Return x unchanged after sleeping for delay seconds.
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        # Capture the traceback through a temporary file so that it can be
        # included in the failure message.
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def test_crash(self):
        # extensive testing for deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        # Each case is (callable, args, expected exception, subtest name).
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with test.support.captured_stderr():
                    # A fresh executor is created for every case.
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            # The worker crashes shortly after shutdown() has been called.
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()
948
949
# Register ExecutorDeadlockTest against the fork, forkserver and spawn
# start-method mixins (create_executor_tests is defined earlier in this file).
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
954
Brian Quinlan1d1df822011-01-03 02:56:39 +0000955
class FutureTests(BaseTestCase):
    """Tests for Future: done callbacks, repr, cancellation and results."""

    def test_done_callback_with_result(self):
        # A callback registered before completion sees the result.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        # A callback registered before completion sees the exception.
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        # A callback registered before cancellation sees the cancelled state.
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A raising callback must not prevent later callbacks from running;
        # its exception is expected to be reported on stderr.
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # A callback added after completion is invoked with the result.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        # A callback added after failure is invoked with the exception.
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        # A callback added after cancellation is invoked as well.
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # One future per state; cancel() must only succeed for futures that
        # are pending or already cancelled, and must leave the others alone.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        # timeout=0 means "do not wait": unfinished futures time out at once.
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        t.join()

    def test_exception_with_timeout(self):
        # timeout=0 means "do not wait": unfinished futures time out at once.
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0),
                              OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Finish the future by hand, under its condition lock, and wake
            # up the waiter.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertIsInstance(f1.exception(timeout=5), OSError)
        t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +00001180
Antoine Pitrou63ff4132017-11-04 11:05:49 +01001181
@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    try:
        test.support.run_unittest(__name__)
    finally:
        # Reap children even when the test run raises.
        test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +00001188
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()