blob: 6427368fd673b43675dcb7b127f688342dc8089c [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more revelant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +000012import io
13import logging
Brian Quinlan81c4d362010-09-18 22:35:02 +000014import multiprocessing
15import sys
16import threading
17import time
18import unittest
19
20if sys.platform.startswith('win'):
21 import ctypes
22 import ctypes.wintypes
23
24from concurrent import futures
25from concurrent.futures._base import (
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +000026 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
27 LOGGER, STDERR_HANDLER, wait)
Brian Quinlan81c4d362010-09-18 22:35:02 +000028import concurrent.futures.process
29
30def create_future(state=PENDING, exception=None, result=None):
31 f = Future()
32 f._state = state
33 f._exception = exception
34 f._result = result
35 return f
36
37PENDING_FUTURE = create_future(state=PENDING)
38RUNNING_FUTURE = create_future(state=RUNNING)
39CANCELLED_FUTURE = create_future(state=CANCELLED)
40CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
41EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
42SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
43
44def mul(x, y):
45 return x * y
46
47class Call(object):
48 """A call that can be submitted to a future.Executor for testing.
49
50 The call signals when it is called and waits for an event before finishing.
51 """
52 CALL_LOCKS = {}
53 def _create_event(self):
54 if sys.platform.startswith('win'):
55 class SECURITY_ATTRIBUTES(ctypes.Structure):
56 _fields_ = [("nLength", ctypes.wintypes.DWORD),
57 ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
58 ("bInheritHandle", ctypes.wintypes.BOOL)]
59
60 s = SECURITY_ATTRIBUTES()
61 s.nLength = ctypes.sizeof(s)
62 s.lpSecurityDescriptor = None
63 s.bInheritHandle = True
64
65 handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
66 True,
67 False,
68 None)
69 assert handle is not None
70 return handle
71 else:
72 event = multiprocessing.Event()
73 self.CALL_LOCKS[id(event)] = event
74 return id(event)
75
76 def _wait_on_event(self, handle):
77 if sys.platform.startswith('win'):
78 r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
79 assert r == 0
80 else:
81 self.CALL_LOCKS[handle].wait()
82
83 def _signal_event(self, handle):
84 if sys.platform.startswith('win'):
85 r = ctypes.windll.kernel32.SetEvent(handle)
86 assert r != 0
87 else:
88 self.CALL_LOCKS[handle].set()
89
90 def __init__(self, manual_finish=False, result=42):
91 self._called_event = self._create_event()
92 self._can_finish = self._create_event()
93
94 self._result = result
95
96 if not manual_finish:
97 self._signal_event(self._can_finish)
98
99 def wait_on_called(self):
100 self._wait_on_event(self._called_event)
101
102 def set_can(self):
103 self._signal_event(self._can_finish)
104
105 def __call__(self):
106 self._signal_event(self._called_event)
107 self._wait_on_event(self._can_finish)
108
109 return self._result
110
111 def close(self):
112 self.set_can()
113 if sys.platform.startswith('win'):
114 ctypes.windll.kernel32.CloseHandle(self._called_event)
115 ctypes.windll.kernel32.CloseHandle(self._can_finish)
116 else:
117 del self.CALL_LOCKS[self._called_event]
118 del self.CALL_LOCKS[self._can_finish]
119
120class ExceptionCall(Call):
121 def __call__(self):
122 self._signal_event(self._called_event)
123 self._wait_on_event(self._can_finish)
124 raise ZeroDivisionError()
125
126class MapCall(Call):
127 def __init__(self, result=42):
128 super().__init__(manual_finish=True, result=result)
129
130 def __call__(self, manual_finish):
131 if manual_finish:
132 super().__call__()
133 return self._result
134
135class ExecutorShutdownTest(unittest.TestCase):
136 def test_run_after_shutdown(self):
137 self.executor.shutdown()
138 self.assertRaises(RuntimeError,
139 self.executor.submit,
140 pow, 2, 5)
141
142
143 def _start_some_futures(self):
144 call1 = Call(manual_finish=True)
145 call2 = Call(manual_finish=True)
146 call3 = Call(manual_finish=True)
147
148 try:
149 self.executor.submit(call1)
150 self.executor.submit(call2)
151 self.executor.submit(call3)
152
153 call1.wait_on_called()
154 call2.wait_on_called()
155 call3.wait_on_called()
156
157 call1.set_can()
158 call2.set_can()
159 call3.set_can()
160 finally:
161 call1.close()
162 call2.close()
163 call3.close()
164
165class ThreadPoolShutdownTest(ExecutorShutdownTest):
166 def setUp(self):
167 self.executor = futures.ThreadPoolExecutor(max_workers=5)
168
169 def tearDown(self):
170 self.executor.shutdown(wait=True)
171
172 def test_threads_terminate(self):
173 self._start_some_futures()
174 self.assertEqual(len(self.executor._threads), 3)
175 self.executor.shutdown()
176 for t in self.executor._threads:
177 t.join()
178
179 def test_context_manager_shutdown(self):
180 with futures.ThreadPoolExecutor(max_workers=5) as e:
181 executor = e
182 self.assertEqual(list(e.map(abs, range(-5, 5))),
183 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
184
185 for t in executor._threads:
186 t.join()
187
188 def test_del_shutdown(self):
189 executor = futures.ThreadPoolExecutor(max_workers=5)
190 executor.map(abs, range(-5, 5))
191 threads = executor._threads
192 del executor
193
194 for t in threads:
195 t.join()
196
197class ProcessPoolShutdownTest(ExecutorShutdownTest):
198 def setUp(self):
199 self.executor = futures.ProcessPoolExecutor(max_workers=5)
200
201 def tearDown(self):
202 self.executor.shutdown(wait=True)
203
204 def test_processes_terminate(self):
205 self._start_some_futures()
206 self.assertEqual(len(self.executor._processes), 5)
207 processes = self.executor._processes
208 self.executor.shutdown()
209
210 for p in processes:
211 p.join()
212
213 def test_context_manager_shutdown(self):
214 with futures.ProcessPoolExecutor(max_workers=5) as e:
215 executor = e
216 self.assertEqual(list(e.map(abs, range(-5, 5))),
217 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
218
219 for p in self.executor._processes:
220 p.join()
221
222 def test_del_shutdown(self):
223 executor = futures.ProcessPoolExecutor(max_workers=5)
224 list(executor.map(abs, range(-5, 5)))
225 queue_management_thread = executor._queue_management_thread
226 processes = executor._processes
227 del executor
228
229 queue_management_thread.join()
230 for p in processes:
231 p.join()
232
233class WaitTests(unittest.TestCase):
234 def test_first_completed(self):
235 def wait_test():
236 while not future1._waiters:
237 pass
238 call1.set_can()
239
240 call1 = Call(manual_finish=True)
241 call2 = Call(manual_finish=True)
242 try:
243 future1 = self.executor.submit(call1)
244 future2 = self.executor.submit(call2)
245
246 t = threading.Thread(target=wait_test)
247 t.start()
248 done, not_done = futures.wait(
249 [CANCELLED_FUTURE, future1, future2],
250 return_when=futures.FIRST_COMPLETED)
251
252 self.assertEquals(set([future1]), done)
253 self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
254 finally:
255 call1.close()
256 call2.close()
257
258 def test_first_completed_one_already_completed(self):
259 call1 = Call(manual_finish=True)
260 try:
261 future1 = self.executor.submit(call1)
262
263 finished, pending = futures.wait(
264 [SUCCESSFUL_FUTURE, future1],
265 return_when=futures.FIRST_COMPLETED)
266
267 self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
268 self.assertEquals(set([future1]), pending)
269 finally:
270 call1.close()
271
272 def test_first_exception(self):
273 def wait_test():
274 while not future1._waiters:
275 pass
276 call1.set_can()
277 call2.set_can()
278
279 call1 = Call(manual_finish=True)
280 call2 = ExceptionCall(manual_finish=True)
281 call3 = Call(manual_finish=True)
282 try:
283 future1 = self.executor.submit(call1)
284 future2 = self.executor.submit(call2)
285 future3 = self.executor.submit(call3)
286
287 t = threading.Thread(target=wait_test)
288 t.start()
289 finished, pending = futures.wait(
290 [future1, future2, future3],
291 return_when=futures.FIRST_EXCEPTION)
292
293 self.assertEquals(set([future1, future2]), finished)
294 self.assertEquals(set([future3]), pending)
295 finally:
296 call1.close()
297 call2.close()
298 call3.close()
299
300 def test_first_exception_some_already_complete(self):
301 def wait_test():
302 while not future1._waiters:
303 pass
304 call1.set_can()
305
306 call1 = ExceptionCall(manual_finish=True)
307 call2 = Call(manual_finish=True)
308 try:
309 future1 = self.executor.submit(call1)
310 future2 = self.executor.submit(call2)
311
312 t = threading.Thread(target=wait_test)
313 t.start()
314 finished, pending = futures.wait(
315 [SUCCESSFUL_FUTURE,
316 CANCELLED_FUTURE,
317 CANCELLED_AND_NOTIFIED_FUTURE,
318 future1, future2],
319 return_when=futures.FIRST_EXCEPTION)
320
321 self.assertEquals(set([SUCCESSFUL_FUTURE,
322 CANCELLED_AND_NOTIFIED_FUTURE,
323 future1]), finished)
324 self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
325
326
327 finally:
328 call1.close()
329 call2.close()
330
331 def test_first_exception_one_already_failed(self):
332 call1 = Call(manual_finish=True)
333 try:
334 future1 = self.executor.submit(call1)
335
336 finished, pending = futures.wait(
337 [EXCEPTION_FUTURE, future1],
338 return_when=futures.FIRST_EXCEPTION)
339
340 self.assertEquals(set([EXCEPTION_FUTURE]), finished)
341 self.assertEquals(set([future1]), pending)
342 finally:
343 call1.close()
344
345 def test_all_completed(self):
346 def wait_test():
347 while not future1._waiters:
348 pass
349 call1.set_can()
350 call2.set_can()
351
352 call1 = Call(manual_finish=True)
353 call2 = Call(manual_finish=True)
354 try:
355 future1 = self.executor.submit(call1)
356 future2 = self.executor.submit(call2)
357
358 t = threading.Thread(target=wait_test)
359 t.start()
360 finished, pending = futures.wait(
361 [future1, future2],
362 return_when=futures.ALL_COMPLETED)
363
364 self.assertEquals(set([future1, future2]), finished)
365 self.assertEquals(set(), pending)
366
367
368 finally:
369 call1.close()
370 call2.close()
371
372 def test_all_completed_some_already_completed(self):
373 def wait_test():
374 while not future1._waiters:
375 pass
376
377 future4.cancel()
378 call1.set_can()
379 call2.set_can()
380 call3.set_can()
381
382 self.assertLessEqual(
383 futures.process.EXTRA_QUEUED_CALLS,
384 1,
385 'this test assumes that future4 will be cancelled before it is '
386 'queued to run - which might not be the case if '
387 'ProcessPoolExecutor is too aggresive in scheduling futures')
388 call1 = Call(manual_finish=True)
389 call2 = Call(manual_finish=True)
390 call3 = Call(manual_finish=True)
391 call4 = Call(manual_finish=True)
392 try:
393 future1 = self.executor.submit(call1)
394 future2 = self.executor.submit(call2)
395 future3 = self.executor.submit(call3)
396 future4 = self.executor.submit(call4)
397
398 t = threading.Thread(target=wait_test)
399 t.start()
400 finished, pending = futures.wait(
401 [SUCCESSFUL_FUTURE,
402 CANCELLED_AND_NOTIFIED_FUTURE,
403 future1, future2, future3, future4],
404 return_when=futures.ALL_COMPLETED)
405
406 self.assertEquals(set([SUCCESSFUL_FUTURE,
407 CANCELLED_AND_NOTIFIED_FUTURE,
408 future1, future2, future3, future4]),
409 finished)
410 self.assertEquals(set(), pending)
411 finally:
412 call1.close()
413 call2.close()
414 call3.close()
415 call4.close()
416
417 def test_timeout(self):
418 def wait_test():
419 while not future1._waiters:
420 pass
421 call1.set_can()
422
423 call1 = Call(manual_finish=True)
424 call2 = Call(manual_finish=True)
425 try:
426 future1 = self.executor.submit(call1)
427 future2 = self.executor.submit(call2)
428
429 t = threading.Thread(target=wait_test)
430 t.start()
431 finished, pending = futures.wait(
432 [CANCELLED_AND_NOTIFIED_FUTURE,
433 EXCEPTION_FUTURE,
434 SUCCESSFUL_FUTURE,
435 future1, future2],
436 timeout=1,
437 return_when=futures.ALL_COMPLETED)
438
439 self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
440 EXCEPTION_FUTURE,
441 SUCCESSFUL_FUTURE,
442 future1]), finished)
443 self.assertEquals(set([future2]), pending)
444
445
446 finally:
447 call1.close()
448 call2.close()
449
450
451class ThreadPoolWaitTests(WaitTests):
452 def setUp(self):
453 self.executor = futures.ThreadPoolExecutor(max_workers=1)
454
455 def tearDown(self):
456 self.executor.shutdown(wait=True)
457
458class ProcessPoolWaitTests(WaitTests):
459 def setUp(self):
460 self.executor = futures.ProcessPoolExecutor(max_workers=1)
461
462 def tearDown(self):
463 self.executor.shutdown(wait=True)
464
465class AsCompletedTests(unittest.TestCase):
466 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
467 def test_no_timeout(self):
468 def wait_test():
469 while not future1._waiters:
470 pass
471 call1.set_can()
472 call2.set_can()
473
474 call1 = Call(manual_finish=True)
475 call2 = Call(manual_finish=True)
476 try:
477 future1 = self.executor.submit(call1)
478 future2 = self.executor.submit(call2)
479
480 t = threading.Thread(target=wait_test)
481 t.start()
482 completed = set(futures.as_completed(
483 [CANCELLED_AND_NOTIFIED_FUTURE,
484 EXCEPTION_FUTURE,
485 SUCCESSFUL_FUTURE,
486 future1, future2]))
487 self.assertEquals(set(
488 [CANCELLED_AND_NOTIFIED_FUTURE,
489 EXCEPTION_FUTURE,
490 SUCCESSFUL_FUTURE,
491 future1, future2]),
492 completed)
493 finally:
494 call1.close()
495 call2.close()
496
497 def test_zero_timeout(self):
498 call1 = Call(manual_finish=True)
499 try:
500 future1 = self.executor.submit(call1)
501 completed_futures = set()
502 try:
503 for future in futures.as_completed(
504 [CANCELLED_AND_NOTIFIED_FUTURE,
505 EXCEPTION_FUTURE,
506 SUCCESSFUL_FUTURE,
507 future1],
508 timeout=0):
509 completed_futures.add(future)
510 except futures.TimeoutError:
511 pass
512
513 self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
514 EXCEPTION_FUTURE,
515 SUCCESSFUL_FUTURE]),
516 completed_futures)
517 finally:
518 call1.close()
519
520class ThreadPoolAsCompletedTests(AsCompletedTests):
521 def setUp(self):
522 self.executor = futures.ThreadPoolExecutor(max_workers=1)
523
524 def tearDown(self):
525 self.executor.shutdown(wait=True)
526
527class ProcessPoolAsCompletedTests(AsCompletedTests):
528 def setUp(self):
529 self.executor = futures.ProcessPoolExecutor(max_workers=1)
530
531 def tearDown(self):
532 self.executor.shutdown(wait=True)
533
534class ExecutorTest(unittest.TestCase):
535 # Executor.shutdown() and context manager usage is tested by
536 # ExecutorShutdownTest.
537 def test_submit(self):
538 future = self.executor.submit(pow, 2, 8)
539 self.assertEquals(256, future.result())
540
541 def test_submit_keyword(self):
542 future = self.executor.submit(mul, 2, y=8)
543 self.assertEquals(16, future.result())
544
545 def test_map(self):
546 self.assertEqual(
547 list(self.executor.map(pow, range(10), range(10))),
548 list(map(pow, range(10), range(10))))
549
550 def test_map_exception(self):
551 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
552 self.assertEqual(i.__next__(), (0, 1))
553 self.assertEqual(i.__next__(), (0, 1))
554 self.assertRaises(ZeroDivisionError, i.__next__)
555
556 def test_map_timeout(self):
557 results = []
558 timeout_call = MapCall()
559 try:
560 try:
561 for i in self.executor.map(timeout_call,
562 [False, False, True],
563 timeout=1):
564 results.append(i)
565 except futures.TimeoutError:
566 pass
567 else:
568 self.fail('expected TimeoutError')
569 finally:
570 timeout_call.close()
571
572 self.assertEquals([42, 42], results)
573
574class ThreadPoolExecutorTest(ExecutorTest):
575 def setUp(self):
576 self.executor = futures.ThreadPoolExecutor(max_workers=1)
577
578 def tearDown(self):
579 self.executor.shutdown(wait=True)
580
581class ProcessPoolExecutorTest(ExecutorTest):
582 def setUp(self):
583 self.executor = futures.ProcessPoolExecutor(max_workers=1)
584
585 def tearDown(self):
586 self.executor.shutdown(wait=True)
587
588class FutureTests(unittest.TestCase):
589 def test_done_callback_with_result(self):
590 callback_result = None
591 def fn(callback_future):
592 nonlocal callback_result
593 callback_result = callback_future.result()
594
595 f = Future()
596 f.add_done_callback(fn)
597 f.set_result(5)
598 self.assertEquals(5, callback_result)
599
600 def test_done_callback_with_exception(self):
601 callback_exception = None
602 def fn(callback_future):
603 nonlocal callback_exception
604 callback_exception = callback_future.exception()
605
606 f = Future()
607 f.add_done_callback(fn)
608 f.set_exception(Exception('test'))
609 self.assertEquals(('test',), callback_exception.args)
610
611 def test_done_callback_with_cancel(self):
612 was_cancelled = None
613 def fn(callback_future):
614 nonlocal was_cancelled
615 was_cancelled = callback_future.cancelled()
616
617 f = Future()
618 f.add_done_callback(fn)
619 self.assertTrue(f.cancel())
620 self.assertTrue(was_cancelled)
621
622 def test_done_callback_raises(self):
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000623 LOGGER.removeHandler(STDERR_HANDLER)
624 logging_stream = io.StringIO()
625 handler = logging.StreamHandler(logging_stream)
626 LOGGER.addHandler(handler)
627 try:
628 raising_was_called = False
629 fn_was_called = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000630
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000631 def raising_fn(callback_future):
632 nonlocal raising_was_called
633 raising_was_called = True
634 raise Exception('doh!')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000635
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000636 def fn(callback_future):
637 nonlocal fn_was_called
638 fn_was_called = True
Brian Quinlan81c4d362010-09-18 22:35:02 +0000639
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +0000640 f = Future()
641 f.add_done_callback(raising_fn)
642 f.add_done_callback(fn)
643 f.set_result(5)
644 self.assertTrue(raising_was_called)
645 self.assertTrue(fn_was_called)
646 self.assertIn('Exception: doh!', logging_stream.getvalue())
647 finally:
648 LOGGER.removeHandler(handler)
649 LOGGER.addHandler(STDERR_HANDLER)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000650
651 def test_done_callback_already_successful(self):
652 callback_result = None
653 def fn(callback_future):
654 nonlocal callback_result
655 callback_result = callback_future.result()
656
657 f = Future()
658 f.set_result(5)
659 f.add_done_callback(fn)
660 self.assertEquals(5, callback_result)
661
662 def test_done_callback_already_failed(self):
663 callback_exception = None
664 def fn(callback_future):
665 nonlocal callback_exception
666 callback_exception = callback_future.exception()
667
668 f = Future()
669 f.set_exception(Exception('test'))
670 f.add_done_callback(fn)
671 self.assertEquals(('test',), callback_exception.args)
672
673 def test_done_callback_already_cancelled(self):
674 was_cancelled = None
675 def fn(callback_future):
676 nonlocal was_cancelled
677 was_cancelled = callback_future.cancelled()
678
679 f = Future()
680 self.assertTrue(f.cancel())
681 f.add_done_callback(fn)
682 self.assertTrue(was_cancelled)
683
684 def test_repr(self):
685 self.assertRegexpMatches(repr(PENDING_FUTURE),
686 '<Future at 0x[0-9a-f]+ state=pending>')
687 self.assertRegexpMatches(repr(RUNNING_FUTURE),
688 '<Future at 0x[0-9a-f]+ state=running>')
689 self.assertRegexpMatches(repr(CANCELLED_FUTURE),
690 '<Future at 0x[0-9a-f]+ state=cancelled>')
691 self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
692 '<Future at 0x[0-9a-f]+ state=cancelled>')
693 self.assertRegexpMatches(
694 repr(EXCEPTION_FUTURE),
695 '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
696 self.assertRegexpMatches(
697 repr(SUCCESSFUL_FUTURE),
698 '<Future at 0x[0-9a-f]+ state=finished returned int>')
699
700
701 def test_cancel(self):
702 f1 = create_future(state=PENDING)
703 f2 = create_future(state=RUNNING)
704 f3 = create_future(state=CANCELLED)
705 f4 = create_future(state=CANCELLED_AND_NOTIFIED)
706 f5 = create_future(state=FINISHED, exception=IOError())
707 f6 = create_future(state=FINISHED, result=5)
708
709 self.assertTrue(f1.cancel())
710 self.assertEquals(f1._state, CANCELLED)
711
712 self.assertFalse(f2.cancel())
713 self.assertEquals(f2._state, RUNNING)
714
715 self.assertTrue(f3.cancel())
716 self.assertEquals(f3._state, CANCELLED)
717
718 self.assertTrue(f4.cancel())
719 self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
720
721 self.assertFalse(f5.cancel())
722 self.assertEquals(f5._state, FINISHED)
723
724 self.assertFalse(f6.cancel())
725 self.assertEquals(f6._state, FINISHED)
726
727 def test_cancelled(self):
728 self.assertFalse(PENDING_FUTURE.cancelled())
729 self.assertFalse(RUNNING_FUTURE.cancelled())
730 self.assertTrue(CANCELLED_FUTURE.cancelled())
731 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
732 self.assertFalse(EXCEPTION_FUTURE.cancelled())
733 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
734
735 def test_done(self):
736 self.assertFalse(PENDING_FUTURE.done())
737 self.assertFalse(RUNNING_FUTURE.done())
738 self.assertTrue(CANCELLED_FUTURE.done())
739 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
740 self.assertTrue(EXCEPTION_FUTURE.done())
741 self.assertTrue(SUCCESSFUL_FUTURE.done())
742
743 def test_running(self):
744 self.assertFalse(PENDING_FUTURE.running())
745 self.assertTrue(RUNNING_FUTURE.running())
746 self.assertFalse(CANCELLED_FUTURE.running())
747 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
748 self.assertFalse(EXCEPTION_FUTURE.running())
749 self.assertFalse(SUCCESSFUL_FUTURE.running())
750
751 def test_result_with_timeout(self):
752 self.assertRaises(futures.TimeoutError,
753 PENDING_FUTURE.result, timeout=0)
754 self.assertRaises(futures.TimeoutError,
755 RUNNING_FUTURE.result, timeout=0)
756 self.assertRaises(futures.CancelledError,
757 CANCELLED_FUTURE.result, timeout=0)
758 self.assertRaises(futures.CancelledError,
759 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
760 self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
761 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
762
763 def test_result_with_success(self):
764 # TODO(brian@sweetapp.com): This test is timing dependant.
765 def notification():
766 # Wait until the main thread is waiting for the result.
767 time.sleep(1)
768 f1.set_result(42)
769
770 f1 = create_future(state=PENDING)
771 t = threading.Thread(target=notification)
772 t.start()
773
774 self.assertEquals(f1.result(timeout=5), 42)
775
776 def test_result_with_cancel(self):
777 # TODO(brian@sweetapp.com): This test is timing dependant.
778 def notification():
779 # Wait until the main thread is waiting for the result.
780 time.sleep(1)
781 f1.cancel()
782
783 f1 = create_future(state=PENDING)
784 t = threading.Thread(target=notification)
785 t.start()
786
787 self.assertRaises(futures.CancelledError, f1.result, timeout=5)
788
789 def test_exception_with_timeout(self):
790 self.assertRaises(futures.TimeoutError,
791 PENDING_FUTURE.exception, timeout=0)
792 self.assertRaises(futures.TimeoutError,
793 RUNNING_FUTURE.exception, timeout=0)
794 self.assertRaises(futures.CancelledError,
795 CANCELLED_FUTURE.exception, timeout=0)
796 self.assertRaises(futures.CancelledError,
797 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
798 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
799 IOError))
800 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
801
802 def test_exception_with_success(self):
803 def notification():
804 # Wait until the main thread is waiting for the exception.
805 time.sleep(1)
806 with f1._condition:
807 f1._state = FINISHED
808 f1._exception = IOError()
809 f1._condition.notify_all()
810
811 f1 = create_future(state=PENDING)
812 t = threading.Thread(target=notification)
813 t.start()
814
815 self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
816
817def test_main():
818 test.support.run_unittest(ProcessPoolExecutorTest,
819 ThreadPoolExecutorTest,
820 ProcessPoolWaitTests,
821 ThreadPoolWaitTests,
822 ProcessPoolAsCompletedTests,
823 ThreadPoolAsCompletedTests,
824 FutureTests,
825 ProcessPoolShutdownTest,
826 ThreadPoolShutdownTest)
827
828if __name__ == "__main__":
829 test_main()