bpo-40692: Run more test_concurrent_futures tests (GH-20239)
In the case of multiprocessing.synchronize() being missing, the
test_concurrent_futures test suite now skips only the tests that
require multiprocessing.synchronize().
Validate that multiprocessing.synchronize exists as part of
_check_system_limits(), allowing ProcessPoolExecutor to raise
NotImplementedError during __init__, rather than crashing with
ImportError during __init__ when creating a lock imported from
multiprocessing.synchronize.
Use _check_system_limits() to disable tests of
ProcessPoolExecutor on systems without multiprocessing.synchronize.
Running the test suite without multiprocessing.synchronize reveals
that Lib/compileall.py crashes when it uses a ProcessPoolExecutor.
Therefore, change Lib/compileall.py to call _check_system_limits()
before creating the ProcessPoolExecutor.
Note that both Lib/compileall.py and Lib/test/test_compileall.py
were attempting to sanity-check ProcessPoolExecutor by expecting
ImportError. In multiprocessing.resource_tracker, sem_unlink() is also absent
on platforms where POSIX semaphores aren't available. Avoid using
sem_unlink() if it, too, does not exist.
Co-authored-by: Pablo Galindo <Pablogsal@gmail.com>
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index be1149a..fa24b3c 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -16,10 +16,14 @@
import unittest
from unittest import mock, skipUnless
+from concurrent.futures import ProcessPoolExecutor
try:
- from concurrent.futures import ProcessPoolExecutor
+ # compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
+ # and it can function.
+ from concurrent.futures.process import _check_system_limits
+ _check_system_limits()
_have_multiprocessing = True
-except ImportError:
+except NotImplementedError:
_have_multiprocessing = False
from test import support
@@ -188,6 +192,7 @@ def test_compile_dir_pathlike(self):
self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
self.assertTrue(os.path.isfile(self.bc_path))
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
def test_compile_pool_called(self, pool_mock):
compileall.compile_dir(self.directory, quiet=True, workers=5)
@@ -198,11 +203,13 @@ def test_compile_workers_non_positive(self):
"workers must be greater or equal to 0"):
compileall.compile_dir(self.directory, workers=-1)
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
def test_compile_workers_cpu_count(self, pool_mock):
compileall.compile_dir(self.directory, quiet=True, workers=0)
self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
@mock.patch('compileall.compile_file')
def test_compile_one_worker(self, compile_file_mock, pool_mock):
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a182b14..99651f5 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -4,8 +4,6 @@
# Skip tests if _multiprocessing wasn't built.
import_helper.import_module('_multiprocessing')
-# Skip tests if sem_open implementation is broken.
-support.skip_if_broken_multiprocessing_synchronize()
from test.support import hashlib_helper
from test.support.script_helper import assert_python_ok
@@ -27,7 +25,7 @@
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
BrokenExecutor)
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _check_system_limits
from multiprocessing import get_context
import multiprocessing.process
@@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
ctx = "fork"
def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
return super().get_context()
@@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "spawn"
+ def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
+ return super().get_context()
+
class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "forkserver"
def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
return super().get_context()