blob: 7522a54acbbe9877658fcf778e1b0ac3dd11535f [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more revelant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010012from test.script_helper import assert_python_ok
13
14import sys
Brian Quinlan81c4d362010-09-18 22:35:02 +000015import threading
16import time
17import unittest
18
Brian Quinlan81c4d362010-09-18 22:35:02 +000019from concurrent import futures
20from concurrent.futures._base import (
Brian Quinlan1d1df822011-01-03 02:56:39 +000021 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
Antoine Pitroudd696492011-06-08 17:21:55 +020022from concurrent.futures.process import BrokenProcessPool
Brian Quinlan81c4d362010-09-18 22:35:02 +000023
Brian Quinlan1d1df822011-01-03 02:56:39 +000024
Brian Quinlan81c4d362010-09-18 22:35:02 +000025def create_future(state=PENDING, exception=None, result=None):
26 f = Future()
27 f._state = state
28 f._exception = exception
29 f._result = result
30 return f
31
Brian Quinlan1d1df822011-01-03 02:56:39 +000032
Brian Quinlan81c4d362010-09-18 22:35:02 +000033PENDING_FUTURE = create_future(state=PENDING)
34RUNNING_FUTURE = create_future(state=RUNNING)
35CANCELLED_FUTURE = create_future(state=CANCELLED)
36CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +020037EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +000038SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
39
Brian Quinlan1d1df822011-01-03 02:56:39 +000040
Brian Quinlan81c4d362010-09-18 22:35:02 +000041def mul(x, y):
42 return x * y
43
Brian Quinlan81c4d362010-09-18 22:35:02 +000044
Brian Quinlan1d1df822011-01-03 02:56:39 +000045def sleep_and_raise(t):
46 time.sleep(t)
47 raise Exception('this is an exception')
Brian Quinlan81c4d362010-09-18 22:35:02 +000048
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010049def sleep_and_print(t, msg):
50 time.sleep(t)
51 print(msg)
52 sys.stdout.flush()
53
Brian Quinlan81c4d362010-09-18 22:35:02 +000054
Brian Quinlan1d1df822011-01-03 02:56:39 +000055class ExecutorMixin:
56 worker_count = 5
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010057
58 def setUp(self):
59 self.t1 = time.time()
60 try:
61 self.executor = self.executor_type(max_workers=self.worker_count)
62 except NotImplementedError as e:
63 self.skipTest(str(e))
64 self._prime_executor()
65
66 def tearDown(self):
67 self.executor.shutdown(wait=True)
68 dt = time.time() - self.t1
69 if test.support.verbose:
70 print("%.2fs" % dt, end=' ')
71 self.assertLess(dt, 60, "synchronization issue: test lasted too long")
72
Brian Quinlan1d1df822011-01-03 02:56:39 +000073 def _prime_executor(self):
74 # Make sure that the executor is ready to do work before running the
75 # tests. This should reduce the probability of timeouts in the tests.
76 futures = [self.executor.submit(time.sleep, 0.1)
77 for _ in range(self.worker_count)]
Brian Quinlan81c4d362010-09-18 22:35:02 +000078
Brian Quinlan1d1df822011-01-03 02:56:39 +000079 for f in futures:
80 f.result()
Brian Quinlan81c4d362010-09-18 22:35:02 +000081
Brian Quinlan81c4d362010-09-18 22:35:02 +000082
Brian Quinlan1d1df822011-01-03 02:56:39 +000083class ThreadPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010084 executor_type = futures.ThreadPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000085
Brian Quinlan81c4d362010-09-18 22:35:02 +000086
Brian Quinlan1d1df822011-01-03 02:56:39 +000087class ProcessPoolMixin(ExecutorMixin):
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010088 executor_type = futures.ProcessPoolExecutor
Brian Quinlan81c4d362010-09-18 22:35:02 +000089
Brian Quinlan81c4d362010-09-18 22:35:02 +000090
91class ExecutorShutdownTest(unittest.TestCase):
92 def test_run_after_shutdown(self):
93 self.executor.shutdown()
94 self.assertRaises(RuntimeError,
95 self.executor.submit,
96 pow, 2, 5)
97
Antoine Pitrouaebac0b2011-03-24 15:47:39 +010098 def test_interpreter_shutdown(self):
99 # Test the atexit hook for shutdown of worker threads and processes
100 rc, out, err = assert_python_ok('-c', """if 1:
101 from concurrent.futures import {executor_type}
102 from time import sleep
103 from test.test_concurrent_futures import sleep_and_print
104 t = {executor_type}(5)
105 t.submit(sleep_and_print, 1.0, "apple")
106 """.format(executor_type=self.executor_type.__name__))
107 # Errors in atexit hooks don't change the process exit code, check
108 # stderr manually.
109 self.assertFalse(err)
110 self.assertEqual(out.strip(), b"apple")
111
Brian Quinlan81c4d362010-09-18 22:35:02 +0000112
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000113class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000114 def _prime_executor(self):
115 pass
116
Brian Quinlan81c4d362010-09-18 22:35:02 +0000117 def test_threads_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000118 self.executor.submit(mul, 21, 2)
119 self.executor.submit(mul, 6, 7)
120 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000121 self.assertEqual(len(self.executor._threads), 3)
122 self.executor.shutdown()
123 for t in self.executor._threads:
124 t.join()
125
126 def test_context_manager_shutdown(self):
127 with futures.ThreadPoolExecutor(max_workers=5) as e:
128 executor = e
129 self.assertEqual(list(e.map(abs, range(-5, 5))),
130 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
131
132 for t in executor._threads:
133 t.join()
134
135 def test_del_shutdown(self):
136 executor = futures.ThreadPoolExecutor(max_workers=5)
137 executor.map(abs, range(-5, 5))
138 threads = executor._threads
139 del executor
140
141 for t in threads:
142 t.join()
143
Brian Quinlan1d1df822011-01-03 02:56:39 +0000144
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000145class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000146 def _prime_executor(self):
147 pass
148
Brian Quinlan81c4d362010-09-18 22:35:02 +0000149 def test_processes_terminate(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000150 self.executor.submit(mul, 21, 2)
151 self.executor.submit(mul, 6, 7)
152 self.executor.submit(mul, 3, 14)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000153 self.assertEqual(len(self.executor._processes), 5)
154 processes = self.executor._processes
155 self.executor.shutdown()
156
Antoine Pitroudd696492011-06-08 17:21:55 +0200157 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000158 p.join()
159
160 def test_context_manager_shutdown(self):
161 with futures.ProcessPoolExecutor(max_workers=5) as e:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000162 processes = e._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000163 self.assertEqual(list(e.map(abs, range(-5, 5))),
164 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
165
Antoine Pitroudd696492011-06-08 17:21:55 +0200166 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000167 p.join()
168
169 def test_del_shutdown(self):
170 executor = futures.ProcessPoolExecutor(max_workers=5)
171 list(executor.map(abs, range(-5, 5)))
172 queue_management_thread = executor._queue_management_thread
173 processes = executor._processes
174 del executor
175
176 queue_management_thread.join()
Antoine Pitroudd696492011-06-08 17:21:55 +0200177 for p in processes.values():
Brian Quinlan81c4d362010-09-18 22:35:02 +0000178 p.join()
179
180class WaitTests(unittest.TestCase):
181 def test_first_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000182 future1 = self.executor.submit(mul, 21, 2)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100183 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000184
Brian Quinlan1d1df822011-01-03 02:56:39 +0000185 done, not_done = futures.wait(
186 [CANCELLED_FUTURE, future1, future2],
187 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000188
Brian Quinlan1d1df822011-01-03 02:56:39 +0000189 self.assertEqual(set([future1]), done)
190 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000191
Brian Quinlan1d1df822011-01-03 02:56:39 +0000192 def test_first_completed_some_already_completed(self):
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100193 future1 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000194
Brian Quinlan1d1df822011-01-03 02:56:39 +0000195 finished, pending = futures.wait(
196 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
197 return_when=futures.FIRST_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000198
Brian Quinlan1d1df822011-01-03 02:56:39 +0000199 self.assertEqual(
200 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
201 finished)
202 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000203
204 def test_first_exception(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000205 future1 = self.executor.submit(mul, 2, 21)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100206 future2 = self.executor.submit(sleep_and_raise, 1.5)
207 future3 = self.executor.submit(time.sleep, 3)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000208
Brian Quinlan1d1df822011-01-03 02:56:39 +0000209 finished, pending = futures.wait(
210 [future1, future2, future3],
211 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000212
Brian Quinlan1d1df822011-01-03 02:56:39 +0000213 self.assertEqual(set([future1, future2]), finished)
214 self.assertEqual(set([future3]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000215
216 def test_first_exception_some_already_complete(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000217 future1 = self.executor.submit(divmod, 21, 0)
Antoine Pitrou8e5e9422011-03-22 18:30:30 +0100218 future2 = self.executor.submit(time.sleep, 1.5)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000219
Brian Quinlan1d1df822011-01-03 02:56:39 +0000220 finished, pending = futures.wait(
221 [SUCCESSFUL_FUTURE,
222 CANCELLED_FUTURE,
223 CANCELLED_AND_NOTIFIED_FUTURE,
224 future1, future2],
225 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000226
Brian Quinlan1d1df822011-01-03 02:56:39 +0000227 self.assertEqual(set([SUCCESSFUL_FUTURE,
228 CANCELLED_AND_NOTIFIED_FUTURE,
229 future1]), finished)
230 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000231
232 def test_first_exception_one_already_failed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000233 future1 = self.executor.submit(time.sleep, 2)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000234
Brian Quinlan1d1df822011-01-03 02:56:39 +0000235 finished, pending = futures.wait(
236 [EXCEPTION_FUTURE, future1],
237 return_when=futures.FIRST_EXCEPTION)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000238
Brian Quinlan1d1df822011-01-03 02:56:39 +0000239 self.assertEqual(set([EXCEPTION_FUTURE]), finished)
240 self.assertEqual(set([future1]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000241
242 def test_all_completed(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000243 future1 = self.executor.submit(divmod, 2, 0)
244 future2 = self.executor.submit(mul, 2, 21)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000245
Brian Quinlan1d1df822011-01-03 02:56:39 +0000246 finished, pending = futures.wait(
247 [SUCCESSFUL_FUTURE,
248 CANCELLED_AND_NOTIFIED_FUTURE,
249 EXCEPTION_FUTURE,
250 future1,
251 future2],
252 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000253
Brian Quinlan1d1df822011-01-03 02:56:39 +0000254 self.assertEqual(set([SUCCESSFUL_FUTURE,
255 CANCELLED_AND_NOTIFIED_FUTURE,
256 EXCEPTION_FUTURE,
257 future1,
258 future2]), finished)
259 self.assertEqual(set(), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000260
261 def test_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000262 future1 = self.executor.submit(mul, 6, 7)
Brian Quinlan1ae29982011-05-30 21:52:24 +1000263 future2 = self.executor.submit(time.sleep, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000264
Brian Quinlan1d1df822011-01-03 02:56:39 +0000265 finished, pending = futures.wait(
266 [CANCELLED_AND_NOTIFIED_FUTURE,
267 EXCEPTION_FUTURE,
268 SUCCESSFUL_FUTURE,
269 future1, future2],
Brian Quinlan1ae29982011-05-30 21:52:24 +1000270 timeout=5,
Brian Quinlan1d1df822011-01-03 02:56:39 +0000271 return_when=futures.ALL_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000272
Brian Quinlan1d1df822011-01-03 02:56:39 +0000273 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
274 EXCEPTION_FUTURE,
275 SUCCESSFUL_FUTURE,
276 future1]), finished)
277 self.assertEqual(set([future2]), pending)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000278
279
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000280class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
281 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000282
Brian Quinlan1d1df822011-01-03 02:56:39 +0000283
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000284class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
285 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000286
Brian Quinlan1d1df822011-01-03 02:56:39 +0000287
Brian Quinlan81c4d362010-09-18 22:35:02 +0000288class AsCompletedTests(unittest.TestCase):
289 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
290 def test_no_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000291 future1 = self.executor.submit(mul, 2, 21)
292 future2 = self.executor.submit(mul, 7, 6)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000293
Brian Quinlan1d1df822011-01-03 02:56:39 +0000294 completed = set(futures.as_completed(
295 [CANCELLED_AND_NOTIFIED_FUTURE,
296 EXCEPTION_FUTURE,
297 SUCCESSFUL_FUTURE,
298 future1, future2]))
299 self.assertEqual(set(
300 [CANCELLED_AND_NOTIFIED_FUTURE,
301 EXCEPTION_FUTURE,
302 SUCCESSFUL_FUTURE,
303 future1, future2]),
304 completed)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000305
306 def test_zero_timeout(self):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000307 future1 = self.executor.submit(time.sleep, 2)
308 completed_futures = set()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000309 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000310 for future in futures.as_completed(
311 [CANCELLED_AND_NOTIFIED_FUTURE,
312 EXCEPTION_FUTURE,
313 SUCCESSFUL_FUTURE,
314 future1],
315 timeout=0):
316 completed_futures.add(future)
317 except futures.TimeoutError:
318 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000319
Brian Quinlan1d1df822011-01-03 02:56:39 +0000320 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
321 EXCEPTION_FUTURE,
322 SUCCESSFUL_FUTURE]),
323 completed_futures)
324
Brian Quinlan81c4d362010-09-18 22:35:02 +0000325
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000326class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
327 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000328
Brian Quinlan1d1df822011-01-03 02:56:39 +0000329
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000330class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
331 pass
Brian Quinlan81c4d362010-09-18 22:35:02 +0000332
Brian Quinlan1d1df822011-01-03 02:56:39 +0000333
Brian Quinlan81c4d362010-09-18 22:35:02 +0000334class ExecutorTest(unittest.TestCase):
335 # Executor.shutdown() and context manager usage is tested by
336 # ExecutorShutdownTest.
337 def test_submit(self):
338 future = self.executor.submit(pow, 2, 8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000339 self.assertEqual(256, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000340
341 def test_submit_keyword(self):
342 future = self.executor.submit(mul, 2, y=8)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(16, future.result())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000344
345 def test_map(self):
346 self.assertEqual(
347 list(self.executor.map(pow, range(10), range(10))),
348 list(map(pow, range(10), range(10))))
349
350 def test_map_exception(self):
351 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
352 self.assertEqual(i.__next__(), (0, 1))
353 self.assertEqual(i.__next__(), (0, 1))
354 self.assertRaises(ZeroDivisionError, i.__next__)
355
356 def test_map_timeout(self):
357 results = []
Brian Quinlan81c4d362010-09-18 22:35:02 +0000358 try:
Brian Quinlan1d1df822011-01-03 02:56:39 +0000359 for i in self.executor.map(time.sleep,
Brian Quinlan1ae29982011-05-30 21:52:24 +1000360 [0, 0, 6],
361 timeout=5):
Brian Quinlan1d1df822011-01-03 02:56:39 +0000362 results.append(i)
363 except futures.TimeoutError:
364 pass
365 else:
366 self.fail('expected TimeoutError')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000367
Brian Quinlan1d1df822011-01-03 02:56:39 +0000368 self.assertEqual([None, None], results)
369
Antoine Pitrou020436b2011-07-02 21:20:25 +0200370 def test_shutdown_race_issue12456(self):
371 # Issue #12456: race condition at shutdown where trying to post a
372 # sentinel in the call queue blocks (the queue is full while processes
373 # have exited).
374 self.executor.map(str, [2] * (self.worker_count + 1))
375 self.executor.shutdown()
376
Brian Quinlan81c4d362010-09-18 22:35:02 +0000377
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000378class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
Brian Quinlanf0078762011-04-08 08:19:33 +1000379 def test_map_submits_without_iteration(self):
380 """Tests verifying issue 11777."""
381 finished = []
382 def record_finished(n):
383 finished.append(n)
384
385 self.executor.map(record_finished, range(10))
386 self.executor.shutdown(wait=True)
387 self.assertCountEqual(finished, range(10))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000388
Brian Quinlan1d1df822011-01-03 02:56:39 +0000389
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000390class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
Antoine Pitroudd696492011-06-08 17:21:55 +0200391 def test_killed_child(self):
392 # When a child process is abruptly terminated, the whole pool gets
393 # "broken".
394 futures = [self.executor.submit(time.sleep, 3)]
395 # Get one of the processes, and terminate (kill) it
396 p = next(iter(self.executor._processes.values()))
397 p.terminate()
398 for fut in futures:
399 self.assertRaises(BrokenProcessPool, fut.result)
400 # Submitting other jobs fails as well.
401 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000402
Brian Quinlan1d1df822011-01-03 02:56:39 +0000403
Brian Quinlan81c4d362010-09-18 22:35:02 +0000404class FutureTests(unittest.TestCase):
405 def test_done_callback_with_result(self):
406 callback_result = None
407 def fn(callback_future):
408 nonlocal callback_result
409 callback_result = callback_future.result()
410
411 f = Future()
412 f.add_done_callback(fn)
413 f.set_result(5)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000414 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000415
416 def test_done_callback_with_exception(self):
417 callback_exception = None
418 def fn(callback_future):
419 nonlocal callback_exception
420 callback_exception = callback_future.exception()
421
422 f = Future()
423 f.add_done_callback(fn)
424 f.set_exception(Exception('test'))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000425 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000426
427 def test_done_callback_with_cancel(self):
428 was_cancelled = None
429 def fn(callback_future):
430 nonlocal was_cancelled
431 was_cancelled = callback_future.cancelled()
432
433 f = Future()
434 f.add_done_callback(fn)
435 self.assertTrue(f.cancel())
436 self.assertTrue(was_cancelled)
437
438 def test_done_callback_raises(self):
Brian Quinlan251cc842010-12-28 21:14:34 +0000439 with test.support.captured_stderr() as stderr:
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000440 raising_was_called = False
441 fn_was_called = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000442
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000443 def raising_fn(callback_future):
444 nonlocal raising_was_called
445 raising_was_called = True
446 raise Exception('doh!')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000447
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000448 def fn(callback_future):
449 nonlocal fn_was_called
450 fn_was_called = True
Brian Quinlan81c4d362010-09-18 22:35:02 +0000451
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000452 f = Future()
453 f.add_done_callback(raising_fn)
454 f.add_done_callback(fn)
455 f.set_result(5)
456 self.assertTrue(raising_was_called)
457 self.assertTrue(fn_was_called)
Brian Quinlan251cc842010-12-28 21:14:34 +0000458 self.assertIn('Exception: doh!', stderr.getvalue())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000459
460 def test_done_callback_already_successful(self):
461 callback_result = None
462 def fn(callback_future):
463 nonlocal callback_result
464 callback_result = callback_future.result()
465
466 f = Future()
467 f.set_result(5)
468 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000469 self.assertEqual(5, callback_result)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000470
471 def test_done_callback_already_failed(self):
472 callback_exception = None
473 def fn(callback_future):
474 nonlocal callback_exception
475 callback_exception = callback_future.exception()
476
477 f = Future()
478 f.set_exception(Exception('test'))
479 f.add_done_callback(fn)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000480 self.assertEqual(('test',), callback_exception.args)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000481
482 def test_done_callback_already_cancelled(self):
483 was_cancelled = None
484 def fn(callback_future):
485 nonlocal was_cancelled
486 was_cancelled = callback_future.cancelled()
487
488 f = Future()
489 self.assertTrue(f.cancel())
490 f.add_done_callback(fn)
491 self.assertTrue(was_cancelled)
492
493 def test_repr(self):
Ezio Melottied3a7d22010-12-01 02:32:32 +0000494 self.assertRegex(repr(PENDING_FUTURE),
495 '<Future at 0x[0-9a-f]+ state=pending>')
496 self.assertRegex(repr(RUNNING_FUTURE),
497 '<Future at 0x[0-9a-f]+ state=running>')
498 self.assertRegex(repr(CANCELLED_FUTURE),
499 '<Future at 0x[0-9a-f]+ state=cancelled>')
500 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
501 '<Future at 0x[0-9a-f]+ state=cancelled>')
502 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000503 repr(EXCEPTION_FUTURE),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200504 '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
Ezio Melottied3a7d22010-12-01 02:32:32 +0000505 self.assertRegex(
Brian Quinlan81c4d362010-09-18 22:35:02 +0000506 repr(SUCCESSFUL_FUTURE),
507 '<Future at 0x[0-9a-f]+ state=finished returned int>')
508
509
510 def test_cancel(self):
511 f1 = create_future(state=PENDING)
512 f2 = create_future(state=RUNNING)
513 f3 = create_future(state=CANCELLED)
514 f4 = create_future(state=CANCELLED_AND_NOTIFIED)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200515 f5 = create_future(state=FINISHED, exception=OSError())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000516 f6 = create_future(state=FINISHED, result=5)
517
518 self.assertTrue(f1.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000519 self.assertEqual(f1._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000520
521 self.assertFalse(f2.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000522 self.assertEqual(f2._state, RUNNING)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000523
524 self.assertTrue(f3.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000525 self.assertEqual(f3._state, CANCELLED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000526
527 self.assertTrue(f4.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000528 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000529
530 self.assertFalse(f5.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000531 self.assertEqual(f5._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000532
533 self.assertFalse(f6.cancel())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000534 self.assertEqual(f6._state, FINISHED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000535
536 def test_cancelled(self):
537 self.assertFalse(PENDING_FUTURE.cancelled())
538 self.assertFalse(RUNNING_FUTURE.cancelled())
539 self.assertTrue(CANCELLED_FUTURE.cancelled())
540 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
541 self.assertFalse(EXCEPTION_FUTURE.cancelled())
542 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
543
544 def test_done(self):
545 self.assertFalse(PENDING_FUTURE.done())
546 self.assertFalse(RUNNING_FUTURE.done())
547 self.assertTrue(CANCELLED_FUTURE.done())
548 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
549 self.assertTrue(EXCEPTION_FUTURE.done())
550 self.assertTrue(SUCCESSFUL_FUTURE.done())
551
552 def test_running(self):
553 self.assertFalse(PENDING_FUTURE.running())
554 self.assertTrue(RUNNING_FUTURE.running())
555 self.assertFalse(CANCELLED_FUTURE.running())
556 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
557 self.assertFalse(EXCEPTION_FUTURE.running())
558 self.assertFalse(SUCCESSFUL_FUTURE.running())
559
560 def test_result_with_timeout(self):
561 self.assertRaises(futures.TimeoutError,
562 PENDING_FUTURE.result, timeout=0)
563 self.assertRaises(futures.TimeoutError,
564 RUNNING_FUTURE.result, timeout=0)
565 self.assertRaises(futures.CancelledError,
566 CANCELLED_FUTURE.result, timeout=0)
567 self.assertRaises(futures.CancelledError,
568 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200569 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000570 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
571
572 def test_result_with_success(self):
573 # TODO(brian@sweetapp.com): This test is timing dependant.
574 def notification():
575 # Wait until the main thread is waiting for the result.
576 time.sleep(1)
577 f1.set_result(42)
578
579 f1 = create_future(state=PENDING)
580 t = threading.Thread(target=notification)
581 t.start()
582
Ezio Melottib3aedd42010-11-20 19:04:17 +0000583 self.assertEqual(f1.result(timeout=5), 42)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000584
585 def test_result_with_cancel(self):
586 # TODO(brian@sweetapp.com): This test is timing dependant.
587 def notification():
588 # Wait until the main thread is waiting for the result.
589 time.sleep(1)
590 f1.cancel()
591
592 f1 = create_future(state=PENDING)
593 t = threading.Thread(target=notification)
594 t.start()
595
596 self.assertRaises(futures.CancelledError, f1.result, timeout=5)
597
598 def test_exception_with_timeout(self):
599 self.assertRaises(futures.TimeoutError,
600 PENDING_FUTURE.exception, timeout=0)
601 self.assertRaises(futures.TimeoutError,
602 RUNNING_FUTURE.exception, timeout=0)
603 self.assertRaises(futures.CancelledError,
604 CANCELLED_FUTURE.exception, timeout=0)
605 self.assertRaises(futures.CancelledError,
606 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
607 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200608 OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000609 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
610
611 def test_exception_with_success(self):
612 def notification():
613 # Wait until the main thread is waiting for the exception.
614 time.sleep(1)
615 with f1._condition:
616 f1._state = FINISHED
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200617 f1._exception = OSError()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000618 f1._condition.notify_all()
619
620 f1 = create_future(state=PENDING)
621 t = threading.Thread(target=notification)
622 t.start()
623
Antoine Pitrou6b4883d2011-10-12 02:54:14 +0200624 self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000625
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200626@test.support.reap_threads
Brian Quinlan81c4d362010-09-18 22:35:02 +0000627def test_main():
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200628 try:
629 test.support.run_unittest(ProcessPoolExecutorTest,
630 ThreadPoolExecutorTest,
631 ProcessPoolWaitTests,
632 ThreadPoolWaitTests,
633 ProcessPoolAsCompletedTests,
634 ThreadPoolAsCompletedTests,
635 FutureTests,
636 ProcessPoolShutdownTest,
Antoine Pitroud06a0652011-07-16 01:13:34 +0200637 ThreadPoolShutdownTest,
638 )
Antoine Pitrou9470ab42011-07-15 20:25:20 +0200639 finally:
640 test.support.reap_children()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000641
642if __name__ == "__main__":
643 test_main()