# blob: 23e95b212447c866a0ec7d282b58cd416d5e60a9 [file] [log] [blame]
import test.support

# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
test.support.import_module('threading')

from test.support.script_helper import assert_python_ok

import os
import sys
import threading
import time
import unittest
import weakref

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
from concurrent.futures.process import BrokenProcessPool


def create_future(state=PENDING, exception=None, result=None):
    """Return a new Future whose internal state has been set directly.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned as given; none of the Future's public setters are called.
    """
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


# Shared fixtures: one pre-built Future per state, used by the tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)


def mul(x, y):
    """Return ``x * y``."""
    return x * y


def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise ``Exception``."""
    time.sleep(t)
    raise Exception('this is an exception')

def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush stdout."""
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class MyObject:
    """Trivial object used by the stale-reference test."""

    def my_method(self):
        pass


class ExecutorMixin:
    """Create ``self.executor`` for each test and shut it down afterwards.

    Subclasses must define ``executor_type``, which is called with
    ``max_workers=self.worker_count``.
    """

    worker_count = 5

    def setUp(self):
        # A monotonic clock keeps the duration check in tearDown() independent
        # of system clock adjustments made while the test runs.
        self.t1 = time.monotonic()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as e:
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is not called "futures" so that it does not shadow the
        # imported module of that name.)
        warmups = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]

        for f in warmups:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin configured to build a ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor


class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin configured to build a ProcessPoolExecutor."""

    executor_type = futures.ProcessPoolExecutor


class ExecutorShutdownTest:
    """Shutdown tests shared by both executor types.

    Uses ``self.executor`` and ``self.executor_type`` from the test class it
    is mixed into.
    """

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            t = {executor_type}(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown behaviour specific to ThreadPoolExecutor."""

    def _prime_executor(self):
        # Skip the warm-up: test_threads_terminate counts the threads that
        # its own submissions create.
        pass

    def test_threads_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # We don't particularly care what the default name is, just that
            # it has a default name implying that it is a ThreadPoolExecutor
            # followed by what looks like a thread number.
            self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
            t.join()


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown behaviour specific to ProcessPoolExecutor."""

    def _prime_executor(self):
        # Skip the warm-up submissions for these tests.
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        # Keep a reference to the mapping before shutting down so the worker
        # processes can still be joined afterwards.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor

        queue_management_thread.join()
        for p in processes.values():
            p.join()


class WaitTests:
    """Tests for futures.wait(), shared by both executor types.

    Uses ``self.executor`` from the test class it is mixed into, together
    with the module-level pre-built futures.
    """

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({future1}, done)
        self.assertEqual({CANCELLED_FUTURE, future2}, not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({future1}, pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({future1, future2}, finished)
        self.assertEqual({future3}, pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          future1}, finished)
        self.assertEqual({CANCELLED_FUTURE, future2}, pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, future1],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({future1}, pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          future1,
                          future2}, finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        # future2 sleeps for longer than the timeout, so it is expected to
        # still be pending when wait() returns.
        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1}, finished)
        self.assertEqual({future2}, pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against a ThreadPoolExecutor, plus a thread-only test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()

        def future_func():
            event.wait()

        # Use a very small switch interval for the duration of the test and
        # restore the previous value afterwards.
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for _ in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against a ProcessPoolExecutor."""


class AsCompletedTests:
    """Tests for futures.as_completed(), shared by both executor types.

    Uses ``self.executor`` from the test class it is mixed into, together
    with the module-level pre-built futures.
    """

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]))
        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2},
            completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([future1, future1]))
        self.assertEqual(len(completed), 1)


class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against a ThreadPoolExecutor."""


class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against a ProcessPoolExecutor."""


class ExecutorTest:
    """Tests of submit() and map(), shared by both executor types.

    Uses ``self.executor``, ``self.executor_type`` and ``self.worker_count``
    from the test class it is mixed into.
    """

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(i), (0, 1))
        self.assertEqual(next(i), (0, 1))
        self.assertRaises(ZeroDivisionError, next, i)

    def test_map_timeout(self):
        results = []
        # The third call sleeps for longer than the timeout, so iterating
        # must raise after the first two results have been produced.
        with self.assertRaises(futures.TimeoutError):
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref must stay bound to a name: its callback only runs while
        # the weakref object itself is still alive.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            # subTest reports which value failed and still checks the other.
            with self.subTest(max_workers=number):
                with self.assertRaisesRegex(ValueError,
                                            "max_workers must be greater "
                                            "than 0"):
                    self.executor_type(max_workers=number)


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against a ThreadPoolExecutor, plus thread-only tests."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is deliberately never consumed.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        self.assertEqual(executor._max_workers,
                         (os.cpu_count() or 1) * 5)


class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against a ProcessPoolExecutor, plus process-only tests."""

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # (The local is not called "futures" so that it does not shadow the
        # imported module of that name.)
        submitted = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in submitted:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        # The asserted text is the source line of _test_traceback() above,
        # including its trailing comment, so that line must stay as written.
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())


class FutureTests(unittest.TestCase):
    """Tests of Future objects that do not involve an executor."""

    def test_done_callback_with_result(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            # The second callback must still run after the first one raised,
            # and the exception must have been reported on stderr.
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even when the assertion below fails, so it
        # does not outlive the test.
        self.addCleanup(t.join)

        self.assertEqual(f1.result(timeout=5), 42)

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even when the assertion below fails, so it
        # does not outlive the test.
        self.addCleanup(t.join)

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even when the assertion below fails, so it
        # does not outlive the test.
        self.addCleanup(t.join)

        self.assertIsInstance(f1.exception(timeout=5), OSError)

@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    try:
        test.support.run_unittest(__name__)
    finally:
        test.support.reap_children()

if __name__ == "__main__":
    test_main()