[2.7] bpo-31234: Join threads explicitly in tests (#7406)
* Add support.wait_threads_exit(): context manager looping at exit
until the number of threads decreases to its original number.
* Add some missing thread.join()
* test_asyncore.test_send(): call explicitly t.join() because the cleanup
function is only called outside the test method, whereas the method
has a @test_support.reap_threads decorator
* test_hashlib: replace threading.Event with thread.join()
* test_thread:
* Use wait_threads_exit() context manager
* Replace test_support with support
* test_forkinthread(): check child process exit status in the
main thread to better handle error.
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index c8caa5d..93690a6 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -1,8 +1,8 @@
import os
import unittest
import random
-from test import test_support
-thread = test_support.import_module('thread')
+from test import support
+thread = support.import_module('thread')
import time
import sys
import weakref
@@ -17,7 +17,7 @@
def verbose_print(arg):
"""Helper function for printing out debugging output."""
- if test_support.verbose:
+ if support.verbose:
with _print_mutex:
print arg
@@ -34,8 +34,8 @@
self.running = 0
self.next_ident = 0
- key = test_support.threading_setup()
- self.addCleanup(test_support.threading_cleanup, *key)
+ key = support.threading_setup()
+ self.addCleanup(support.threading_cleanup, *key)
class ThreadRunningTests(BasicThreadTest):
@@ -60,12 +60,13 @@
self.done_mutex.release()
def test_starting_threads(self):
- # Basic test for thread creation.
- for i in range(NUMTASKS):
- self.newtask()
- verbose_print("waiting for tasks to complete...")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ with support.wait_threads_exit():
+ # Basic test for thread creation.
+ for i in range(NUMTASKS):
+ self.newtask()
+ verbose_print("waiting for tasks to complete...")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
def test_stack_size(self):
# Various stack size tests.
@@ -95,12 +96,13 @@
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
- for i in range(NUMTASKS):
- self.newtask()
+ with support.wait_threads_exit():
+ for i in range(NUMTASKS):
+ self.newtask()
- verbose_print("waiting for all tasks to complete")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ verbose_print("waiting for all tasks to complete")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
thread.stack_size(0)
@@ -110,25 +112,28 @@
mut = thread.allocate_lock()
mut.acquire()
started = []
+
def task():
started.append(None)
mut.acquire()
mut.release()
- thread.start_new_thread(task, ())
- while not started:
- time.sleep(0.01)
- self.assertEqual(thread._count(), orig + 1)
- # Allow the task to finish.
- mut.release()
- # The only reliable way to be sure that the thread ended from the
- # interpreter's point of view is to wait for the function object to be
- # destroyed.
- done = []
- wr = weakref.ref(task, lambda _: done.append(None))
- del task
- while not done:
- time.sleep(0.01)
- self.assertEqual(thread._count(), orig)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ while not started:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig + 1)
+ # Allow the task to finish.
+ mut.release()
+ # The only reliable way to be sure that the thread ended from the
+ # interpreter's point of view is to wait for the function object to be
+ # destroyed.
+ done = []
+ wr = weakref.ref(task, lambda _: done.append(None))
+ del task
+ while not done:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
# See issue #14474
@@ -143,14 +148,13 @@
real_write(self, *args)
c = thread._count()
started = thread.allocate_lock()
- with test_support.captured_output("stderr") as stderr:
+ with support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
- thread.start_new_thread(task, ())
- started.acquire()
- while thread._count() > c:
- time.sleep(0.01)
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ started.acquire()
self.assertIn("Traceback", stderr.getvalue())
@@ -182,13 +186,14 @@
class BarrierTest(BasicThreadTest):
def test_barrier(self):
- self.bar = Barrier(NUMTASKS)
- self.running = NUMTASKS
- for i in range(NUMTASKS):
- thread.start_new_thread(self.task2, (i,))
- verbose_print("waiting for tasks to end")
- self.done_mutex.acquire()
- verbose_print("tasks done")
+ with support.wait_threads_exit():
+ self.bar = Barrier(NUMTASKS)
+ self.running = NUMTASKS
+ for i in range(NUMTASKS):
+ thread.start_new_thread(self.task2, (i,))
+ verbose_print("waiting for tasks to end")
+ self.done_mutex.acquire()
+ verbose_print("tasks done")
def task2(self, ident):
for i in range(NUMTRIPS):
@@ -226,8 +231,9 @@
@unittest.skipIf(sys.platform.startswith('win'),
"This test is only appropriate for POSIX-like systems.")
- @test_support.reap_threads
+ @support.reap_threads
def test_forkinthread(self):
+ non_local = {'status': None}
def thread1():
try:
pid = os.fork() # fork in a thread
@@ -246,11 +252,13 @@
else: # parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ non_local['status'] = status
- thread.start_new_thread(thread1, ())
- self.assertEqual(os.read(self.read_fd, 2), "OK",
- "Unable to fork() in thread")
+ with support.wait_threads_exit():
+ thread.start_new_thread(thread1, ())
+ self.assertEqual(os.read(self.read_fd, 2), "OK",
+ "Unable to fork() in thread")
+ self.assertEqual(non_local['status'], 0)
def tearDown(self):
try:
@@ -265,7 +273,7 @@
def test_main():
- test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
+ support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
TestForkInThread)
if __name__ == "__main__":