Issue #23799: Added test.test_support.start_threads() for running and
cleaning up multiple threads.
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index e2c222a..bf5eb20 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,4 +1,4 @@
-from test import test_support
+from test import test_support as support
from test.test_support import TESTFN, _4G, bigmemtest, import_module, findfile
import unittest
@@ -306,10 +306,8 @@
for i in range(5):
f.write(data)
threads = [threading.Thread(target=comp) for i in range(nthreads)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
+ with support.start_threads(threads):
+ pass
def testMixedIterationReads(self):
# Issue #8397: mixed iteration and reads should be forbidden.
@@ -482,13 +480,13 @@
self.assertEqual(text.strip("a"), "")
def test_main():
- test_support.run_unittest(
+ support.run_unittest(
BZ2FileTest,
BZ2CompressorTest,
BZ2DecompressorTest,
FuncTest
)
- test_support.reap_children()
+ support.reap_children()
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index a2cb5c7..2029265 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -6,7 +6,7 @@
import time
import random
import unittest
-from test import test_support
+from test import test_support as support
try:
import thread
import threading
@@ -14,7 +14,7 @@
thread = None
threading = None
# Skip this test if the _testcapi module isn't available.
-_testcapi = test_support.import_module('_testcapi')
+_testcapi = support.import_module('_testcapi')
@unittest.skipUnless(threading, 'Threading required for this test.')
@@ -42,7 +42,7 @@
#this busy loop is where we expect to be interrupted to
#run our callbacks. Note that callbacks are only run on the
#main thread
- if False and test_support.verbose:
+ if False and support.verbose:
print "(%i)"%(len(l),),
for i in xrange(1000):
a = i*i
@@ -51,7 +51,7 @@
count += 1
self.assertTrue(count < 10000,
"timeout waiting for %i callbacks, got %i"%(n, len(l)))
- if False and test_support.verbose:
+ if False and support.verbose:
print "(%i)"%(len(l),)
def test_pendingcalls_threaded(self):
@@ -67,15 +67,11 @@
context.lock = threading.Lock()
context.event = threading.Event()
- for i in range(context.nThreads):
- t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
- t.start()
- threads.append(t)
-
- self.pendingcalls_wait(context.l, n, context)
-
- for t in threads:
- t.join()
+ threads = [threading.Thread(target=self.pendingcalls_thread,
+ args=(context,))
+ for i in range(context.nThreads)]
+ with support.start_threads(threads):
+ self.pendingcalls_wait(context.l, n, context)
def pendingcalls_thread(self, context):
try:
@@ -84,7 +80,7 @@
with context.lock:
context.nFinished += 1
nFinished = context.nFinished
- if False and test_support.verbose:
+ if False and support.verbose:
print "finished threads: ", nFinished
if nFinished == context.nThreads:
context.event.set()
@@ -103,7 +99,7 @@
@unittest.skipUnless(threading and thread, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):
- @test_support.reap_threads
+ @support.reap_threads
def test_thread_state(self):
# some extra thread-state tests driven via _testcapi
def target():
@@ -129,14 +125,14 @@
for name in dir(_testcapi):
if name.startswith('test_'):
test = getattr(_testcapi, name)
- if test_support.verbose:
+ if support.verbose:
print "internal", name
try:
test()
except _testcapi.error:
- raise test_support.TestFailed, sys.exc_info()[1]
+ raise support.TestFailed, sys.exc_info()[1]
- test_support.run_unittest(TestPendingCalls, TestThreadState)
+ support.run_unittest(TestPendingCalls, TestThreadState)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index 5746c39..ed01c98 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -1,5 +1,5 @@
import unittest
-from test.test_support import verbose, run_unittest
+from test.test_support import verbose, run_unittest, start_threads
import sys
import time
import gc
@@ -352,19 +352,13 @@
old_checkinterval = sys.getcheckinterval()
sys.setcheckinterval(3)
try:
- exit = False
+ exit = []
threads = []
for i in range(N_THREADS):
t = threading.Thread(target=run_thread)
threads.append(t)
- try:
- for t in threads:
- t.start()
- finally:
+ with start_threads(threads, lambda: exit.append(1)):
time.sleep(1.0)
- exit = True
- for t in threads:
- t.join()
finally:
sys.setcheckinterval(old_checkinterval)
gc.collect()
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index fc68e4d..2914a80 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -985,11 +985,8 @@
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02) # yield
- for t in threads:
- t.join()
+ with support.start_threads(threads):
+ time.sleep(0.02) # yield
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
s = b''.join(results)
@@ -1299,11 +1296,8 @@
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02) # yield
- for t in threads:
- t.join()
+ with support.start_threads(threads):
+ time.sleep(0.02) # yield
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
bufio.close()
@@ -2555,14 +2549,10 @@
text = "Thread%03d\n" % n
event.wait()
f.write(text)
- threads = [threading.Thread(target=lambda n=x: run(n))
+ threads = [threading.Thread(target=run, args=(x,))
for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02)
- event.set()
- for t in threads:
- t.join()
+ with support.start_threads(threads, event.set):
+ time.sleep(0.02)
with self.open(support.TESTFN) as f:
content = f.read()
for n in range(20):
@@ -3042,9 +3032,11 @@
# return with a successful (partial) result rather than an EINTR.
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
- self.assertRaises(ZeroDivisionError,
- wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
- t.join()
+ try:
+ with self.assertRaises(ZeroDivisionError):
+ wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ finally:
+ t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
read_results.append(os.read(r, 1))
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 1fbd5b4..75563cb 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -37,7 +37,7 @@
"captured_stdout", "TransientResource", "transient_internet",
"run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
"BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
- "threading_cleanup", "reap_children", "cpython_only",
+ "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
"check_impl_detail", "get_attribute", "py3k_bytes",
"import_fresh_module", "threading_cleanup", "reap_children",
"strip_python_stderr", "IPV6_ENABLED"]
@@ -1509,6 +1509,39 @@
break
@contextlib.contextmanager
+def start_threads(threads, unlock=None):
+ threads = list(threads)
+ started = []
+ try:
+ try:
+ for t in threads:
+ t.start()
+ started.append(t)
+ except:
+ if verbose:
+ print("Can't start %d threads, only %d threads started" %
+ (len(threads), len(started)))
+ raise
+ yield
+ finally:
+ if unlock:
+ unlock()
+ endtime = starttime = time.time()
+ for timeout in range(1, 16):
+ endtime += 60
+ for t in started:
+ t.join(max(endtime - time.time(), 0.01))
+ started = [t for t in started if t.isAlive()]
+ if not started:
+ break
+ if verbose:
+ print('Unable to join %d threads during a period of '
+ '%d minutes' % (len(started), timeout))
+ started = [t for t in started if t.isAlive()]
+ if started:
+ raise AssertionError('Unable to join %d threads' % len(started))
+
+@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
"""Temporary swap out an attribute with a new object.
diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py
index 81d9687..c2c30de 100644
--- a/Lib/test/test_threadedtempfile.py
+++ b/Lib/test/test_threadedtempfile.py
@@ -18,7 +18,7 @@
import tempfile
-from test.test_support import threading_setup, threading_cleanup, run_unittest, import_module
+from test.test_support import start_threads, run_unittest, import_module
threading = import_module('threading')
import unittest
import StringIO
@@ -46,25 +46,12 @@
class ThreadedTempFileTest(unittest.TestCase):
def test_main(self):
- threads = []
- thread_info = threading_setup()
-
- for i in range(NUM_THREADS):
- t = TempFileGreedy()
- threads.append(t)
- t.start()
-
- startEvent.set()
-
- ok = 0
- errors = []
- for t in threads:
- t.join()
- ok += t.ok_count
- if t.error_count:
- errors.append(str(t.getName()) + str(t.errors.getvalue()))
-
- threading_cleanup(*thread_info)
+ threads = [TempFileGreedy() for i in range(NUM_THREADS)]
+ with start_threads(threads, startEvent.set):
+ pass
+ ok = sum(t.ok_count for t in threads)
+ errors = [str(t.getName()) + str(t.errors.getvalue())
+ for t in threads if t.error_count]
msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
'\n'.join(errors))
diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py
index 4c9f296..b161315 100644
--- a/Lib/test/test_threading_local.py
+++ b/Lib/test/test_threading_local.py
@@ -1,12 +1,12 @@
import unittest
from doctest import DocTestSuite
-from test import test_support
+from test import test_support as support
import weakref
import gc
# Modules under test
-_thread = test_support.import_module('thread')
-threading = test_support.import_module('threading')
+_thread = support.import_module('thread')
+threading = support.import_module('threading')
import _threading_local
@@ -63,14 +63,9 @@
# Simply check that the variable is correctly set
self.assertEqual(local.x, i)
- threads= []
- for i in range(10):
- t = threading.Thread(target=f, args=(i,))
- t.start()
- threads.append(t)
-
- for t in threads:
- t.join()
+ with support.start_threads(threading.Thread(target=f, args=(i,))
+ for i in range(10)):
+ pass
def test_derived_cycle_dealloc(self):
# http://bugs.python.org/issue6990
@@ -228,7 +223,7 @@
setUp=setUp, tearDown=tearDown)
)
- test_support.run_unittest(suite)
+ support.run_unittest(suite)
if __name__ == '__main__':
test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
index 9c3cd3d..410f0b0 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -182,6 +182,9 @@
Tests
-----
+- Issue #23799: Added test.test_support.start_threads() for running and
+ cleaning up multiple threads.
+
- Issue #22390: test.regrtest now emits a warning if temporary files or
directories are left after running a test.