bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker module replaces the multiprocessing.semaphore_tracker module. In addition to semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 040b46e..dabf7bc 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -11,7 +11,7 @@
from . import connection
from . import process
from .context import reduction
-from . import semaphore_tracker
+from . import resource_tracker
from . import spawn
from . import util
@@ -69,7 +69,7 @@
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd,
- semaphore_tracker.getfd()]
+ resource_tracker.getfd()]
allfds += fds
try:
reduction.sendfds(client, allfds)
@@ -90,7 +90,7 @@
ensure_running() will do nothing.
'''
with self._lock:
- semaphore_tracker.ensure_running()
+ resource_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@@ -290,7 +290,7 @@
os.close(fd)
(_forkserver._forkserver_alive_fd,
- semaphore_tracker._semaphore_tracker._fd,
+ resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds
# Run process object received over pipe
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 3815106..59f8e45 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -36,8 +36,8 @@
return fd
def _launch(self, process_obj):
- from . import semaphore_tracker
- tracker_fd = semaphore_tracker.getfd()
+ from . import resource_tracker
+ tracker_fd = resource_tracker.getfd()
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py
new file mode 100644
index 0000000..e67e0b2
--- /dev/null
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -0,0 +1,213 @@
+###############################################################################
+# Server process to keep track of unlinked resources (like shared memory
+# segments, semaphores etc.) and clean them.
+#
+# On Unix we run a server process which keeps track of unlinked
+# resources. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining resource names.
+#
+# This is important because there may be system limits for such resources: for
+# instance, the system only supports a limited number of named semaphores, and
+# shared-memory segments live in the RAM. If a python process leaks such a
+# resource, this resource will not be removed till the next reboot. Without
+# this resource tracker process, "killall python" would probably leave unlinked
+# resources.
+
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+import _posixshmem
+
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
+_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+
+_CLEANUP_FUNCS = {
+ 'noop': lambda: None,
+ 'semaphore': _multiprocessing.sem_unlink,
+ 'shared_memory': _posixshmem.shm_unlink
+}
+
+
+class ResourceTracker(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._fd = None
+ self._pid = None
+
+ def getfd(self):
+ self.ensure_running()
+ return self._fd
+
+ def ensure_running(self):
+ '''Make sure that resource tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the resource created by its parent.'''
+ with self._lock:
+ if self._fd is not None:
+ # resource tracker was launched before, is it still running?
+ if self._check_alive():
+ # => still alive
+ return
+ # => dead, launch it again
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
+ try:
+ # _pid can be None if this process is a child from another
+ # python process, which has started the resource_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
+ except ChildProcessError:
+ # The resource_tracker has already been terminated.
+ pass
+ self._fd = None
+ self._pid = None
+
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
+
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will outlive us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ try:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ self._pid = pid
+ finally:
+ os.close(r)
+
+ def _check_alive(self):
+ '''Check that the pipe has not been closed by sending a probe.'''
+ try:
+ # We cannot use send here as it calls ensure_running, creating
+ # a cycle.
+ os.write(self._fd, b'PROBE:0:noop\n')
+ except OSError:
+ return False
+ else:
+ return True
+
+ def register(self, name, rtype):
+ '''Register name of resource with resource tracker.'''
+ self._send('REGISTER', name, rtype)
+
+ def unregister(self, name, rtype):
+ '''Unregister name of resource with resource tracker.'''
+ self._send('UNREGISTER', name, rtype)
+
+ def _send(self, cmd, name, rtype):
+ self.ensure_running()
+ msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+ if len(msg) > 512:
+ # POSIX guarantees that a pipe write of at most PIPE_BUF bytes is
+ # atomic and that PIPE_BUF >= 512, so bound the whole encoded message
+ raise ValueError('name too long')
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
+ nbytes, len(msg))
+
+
+_resource_tracker = ResourceTracker()
+ensure_running = _resource_tracker.ensure_running
+register = _resource_tracker.register
+unregister = _resource_tracker.unregister
+getfd = _resource_tracker.getfd
+
+def main(fd):
+ '''Run resource tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
+ try:
+ # keep track of registered/unregistered resources
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name, rtype = line.strip().decode('ascii').split(':')
+ cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
+ if cleanup_func is None:
+ raise ValueError(
+ f'Cannot register {name} for automatic cleanup: '
+ f'unknown resource type {rtype}')
+
+ if cmd == 'REGISTER':
+ cache[rtype].add(name)
+ elif cmd == 'UNREGISTER':
+ cache[rtype].remove(name)
+ elif cmd == 'PROBE':
+ pass
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining resources
+ for rtype, rtype_cache in cache.items():
+ if rtype_cache:
+ try:
+ warnings.warn('resource_tracker: There appear to be %d '
+ 'leaked %s objects to clean up at shutdown' %
+ (len(rtype_cache), rtype))
+ except Exception:
+ pass
+ for name in rtype_cache:
+ # For some reason the process which created and registered this
+ # resource has failed to unregister it. Presumably it has
+ # died. We therefore unlink it.
+ try:
+ try:
+ _CLEANUP_FUNCS[rtype](name)
+ except Exception as e:
+ warnings.warn('resource_tracker: %r: %s' % (name, e))
+ finally:
+ pass
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
deleted file mode 100644
index 3c2c3ad..0000000
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#
-# On Unix we run a server process which keeps track of unlinked
-# semaphores. The server ignores SIGINT and SIGTERM and reads from a
-# pipe. Every other process of the program has a copy of the writable
-# end of the pipe, so we get EOF when all other processes have exited.
-# Then the server process unlinks any remaining semaphore names.
-#
-# This is important because the system only supports a limited number
-# of named semaphores, and they will not be automatically removed till
-# the next reboot. Without this semaphore tracker process, "killall
-# python" would probably leave unlinked semaphores.
-#
-
-import os
-import signal
-import sys
-import threading
-import warnings
-import _multiprocessing
-
-from . import spawn
-from . import util
-
-__all__ = ['ensure_running', 'register', 'unregister']
-
-_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
-_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
-
-
-class SemaphoreTracker(object):
-
- def __init__(self):
- self._lock = threading.Lock()
- self._fd = None
- self._pid = None
-
- def getfd(self):
- self.ensure_running()
- return self._fd
-
- def ensure_running(self):
- '''Make sure that semaphore tracker process is running.
-
- This can be run from any process. Usually a child process will use
- the semaphore created by its parent.'''
- with self._lock:
- if self._fd is not None:
- # semaphore tracker was launched before, is it still running?
- if self._check_alive():
- # => still alive
- return
- # => dead, launch it again
- os.close(self._fd)
-
- # Clean-up to avoid dangling processes.
- try:
- # _pid can be None if this process is a child from another
- # python process, which has started the semaphore_tracker.
- if self._pid is not None:
- os.waitpid(self._pid, 0)
- except ChildProcessError:
- # The semaphore_tracker has already been terminated.
- pass
- self._fd = None
- self._pid = None
-
- warnings.warn('semaphore_tracker: process died unexpectedly, '
- 'relaunching. Some semaphores might leak.')
-
- fds_to_pass = []
- try:
- fds_to_pass.append(sys.stderr.fileno())
- except Exception:
- pass
- cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
- r, w = os.pipe()
- try:
- fds_to_pass.append(r)
- # process will out live us, so no need to wait on pid
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags()
- args += ['-c', cmd % r]
- # bpo-33613: Register a signal mask that will block the signals.
- # This signal mask will be inherited by the child that is going
- # to be spawned and will protect the child from a race condition
- # that can make the child die before it registers signal handlers
- # for SIGINT and SIGTERM. The mask is unregistered after spawning
- # the child.
- try:
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
- finally:
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
- except:
- os.close(w)
- raise
- else:
- self._fd = w
- self._pid = pid
- finally:
- os.close(r)
-
- def _check_alive(self):
- '''Check that the pipe has not been closed by sending a probe.'''
- try:
- # We cannot use send here as it calls ensure_running, creating
- # a cycle.
- os.write(self._fd, b'PROBE:0\n')
- except OSError:
- return False
- else:
- return True
-
- def register(self, name):
- '''Register name of semaphore with semaphore tracker.'''
- self._send('REGISTER', name)
-
- def unregister(self, name):
- '''Unregister name of semaphore with semaphore tracker.'''
- self._send('UNREGISTER', name)
-
- def _send(self, cmd, name):
- self.ensure_running()
- msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
- if len(name) > 512:
- # posix guarantees that writes to a pipe of less than PIPE_BUF
- # bytes are atomic, and that PIPE_BUF >= 512
- raise ValueError('name too long')
- nbytes = os.write(self._fd, msg)
- assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
- nbytes, len(msg))
-
-
-_semaphore_tracker = SemaphoreTracker()
-ensure_running = _semaphore_tracker.ensure_running
-register = _semaphore_tracker.register
-unregister = _semaphore_tracker.unregister
-getfd = _semaphore_tracker.getfd
-
-def main(fd):
- '''Run semaphore tracker.'''
- # protect the process from ^C and "killall python" etc
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
-
- for f in (sys.stdin, sys.stdout):
- try:
- f.close()
- except Exception:
- pass
-
- cache = set()
- try:
- # keep track of registered/unregistered semaphores
- with open(fd, 'rb') as f:
- for line in f:
- try:
- cmd, name = line.strip().split(b':')
- if cmd == b'REGISTER':
- cache.add(name)
- elif cmd == b'UNREGISTER':
- cache.remove(name)
- elif cmd == b'PROBE':
- pass
- else:
- raise RuntimeError('unrecognized command %r' % cmd)
- except Exception:
- try:
- sys.excepthook(*sys.exc_info())
- except:
- pass
- finally:
- # all processes have terminated; cleanup any remaining semaphores
- if cache:
- try:
- warnings.warn('semaphore_tracker: There appear to be %d '
- 'leaked semaphores to clean up at shutdown' %
- len(cache))
- except Exception:
- pass
- for name in cache:
- # For some reason the process which created and registered this
- # semaphore has failed to unregister it. Presumably it has died.
- # We therefore unlink it.
- try:
- name = name.decode('ascii')
- try:
- _multiprocessing.sem_unlink(name)
- except Exception as e:
- warnings.warn('semaphore_tracker: %r: %s' % (name, e))
- finally:
- pass
diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py
index ebc8885..184e367 100644
--- a/Lib/multiprocessing/shared_memory.py
+++ b/Lib/multiprocessing/shared_memory.py
@@ -113,6 +113,9 @@
self.unlink()
raise
+ from .resource_tracker import register
+ register(self._name, "shared_memory")
+
else:
# Windows Named Shared Memory
@@ -231,7 +234,9 @@
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
+ from .resource_tracker import unregister
_posixshmem.shm_unlink(self._name)
+ unregister(self._name, "shared_memory")
_encoding = "utf8"
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 6759351..f66b5aa 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -111,8 +111,8 @@
_winapi.CloseHandle(source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
- from . import semaphore_tracker
- semaphore_tracker._semaphore_tracker._fd = tracker_fd
+ from . import resource_tracker
+ resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
exitcode = _main(fd)
sys.exit(exitcode)
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 5137c49..4fcbefc 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -76,16 +76,16 @@
# We only get here if we are on Unix with forking
# disabled. When the object is garbage collected or the
# process shuts down we unlink the semaphore name
- from .semaphore_tracker import register
- register(self._semlock.name)
+ from .resource_tracker import register
+ register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)
@staticmethod
def _cleanup(name):
- from .semaphore_tracker import unregister
+ from .resource_tracker import unregister
sem_unlink(name)
- unregister(name)
+ unregister(name, "semaphore")
def _make_methods(self):
self.acquire = self._semlock.acquire
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d97e423..a50293c 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -88,6 +88,13 @@
support.join_thread(process, timeout=TIMEOUT)
+if os.name == "posix":
+ from multiprocessing import resource_tracker
+
+ def _resource_unlink(name, rtype):
+ resource_tracker._CLEANUP_FUNCS[rtype](name)
+
+
#
# Constants
#
@@ -3896,6 +3903,32 @@
deserialized_sl.shm.close()
sl.shm.close()
+ def test_shared_memory_cleaned_after_process_termination(self):
+ import subprocess
+ from multiprocessing import shared_memory
+ cmd = '''if 1:
+ import os, time, sys
+ from multiprocessing import shared_memory
+
+ # Create a shared_memory segment, and send the segment name
+ sm = shared_memory.SharedMemory(create=True, size=10)
+ sys.stdout.write(sm._name + '\\n')
+ sys.stdout.flush()
+ time.sleep(100)
+ '''
+ p = subprocess.Popen([sys.executable, '-E', '-c', cmd],
+ stdout=subprocess.PIPE)
+ name = p.stdout.readline().strip().decode()
+
+ # Abruptly killing a process that holds a reference to a shared memory
+ # segment should not leak that segment.
+ p.terminate()
+ p.wait()
+ time.sleep(1.0) # wait for the OS to collect the segment
+
+ with self.assertRaises(FileNotFoundError):
+ smm = shared_memory.SharedMemory(name, create=False)
+
#
#
#
@@ -4827,57 +4860,86 @@
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
-class TestSemaphoreTracker(unittest.TestCase):
+class TestResourceTracker(unittest.TestCase):
- def test_semaphore_tracker(self):
+ def test_resource_tracker(self):
#
# Check that killing process does not leak named semaphores
#
import subprocess
cmd = '''if 1:
- import multiprocessing as mp, time, os
+ import time, os, tempfile
+ import multiprocessing as mp
+ from multiprocessing import resource_tracker
+ from multiprocessing.shared_memory import SharedMemory
+
mp.set_start_method("spawn")
- lock1 = mp.Lock()
- lock2 = mp.Lock()
- os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
- os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+ rand = tempfile._RandomNameSequence()
+
+
+ def create_and_register_resource(rtype):
+ if rtype == "semaphore":
+ lock = mp.Lock()
+ return lock, lock._semlock.name
+ elif rtype == "shared_memory":
+ sm = SharedMemory(create=True, size=10)
+ return sm, sm._name
+ else:
+ raise ValueError(
+ "Resource type {{}} not understood".format(rtype))
+
+
+ resource1, rname1 = create_and_register_resource("{rtype}")
+ resource2, rname2 = create_and_register_resource("{rtype}")
+
+ os.write({w}, rname1.encode("ascii") + b"\\n")
+ os.write({w}, rname2.encode("ascii") + b"\\n")
+
time.sleep(10)
'''
- r, w = os.pipe()
- p = subprocess.Popen([sys.executable,
- '-E', '-c', cmd % (w, w)],
- pass_fds=[w],
- stderr=subprocess.PIPE)
- os.close(w)
- with open(r, 'rb', closefd=True) as f:
- name1 = f.readline().rstrip().decode('ascii')
- name2 = f.readline().rstrip().decode('ascii')
- _multiprocessing.sem_unlink(name1)
- p.terminate()
- p.wait()
- time.sleep(2.0)
- with self.assertRaises(OSError) as ctx:
- _multiprocessing.sem_unlink(name2)
- # docs say it should be ENOENT, but OSX seems to give EINVAL
- self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
- err = p.stderr.read().decode('utf-8')
- p.stderr.close()
- expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
- self.assertRegex(err, expected)
- self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
+ for rtype in resource_tracker._CLEANUP_FUNCS:
+ with self.subTest(rtype=rtype):
+ if rtype == "noop":
+ # Artefact resource type used by the resource_tracker
+ continue
+ r, w = os.pipe()
+ p = subprocess.Popen([sys.executable,
+ '-E', '-c', cmd.format(w=w, rtype=rtype)],
+ pass_fds=[w],
+ stderr=subprocess.PIPE)
+ os.close(w)
+ with open(r, 'rb', closefd=True) as f:
+ name1 = f.readline().rstrip().decode('ascii')
+ name2 = f.readline().rstrip().decode('ascii')
+ _resource_unlink(name1, rtype)
+ p.terminate()
+ p.wait()
+ time.sleep(2.0)
+ with self.assertRaises(OSError) as ctx:
+ _resource_unlink(name2, rtype)
+ # docs say it should be ENOENT, but OSX seems to give EINVAL
+ self.assertIn(
+ ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+ err = p.stderr.read().decode('utf-8')
+ p.stderr.close()
+ expected = ('resource_tracker: There appear to be 2 leaked {} '
+ 'objects'.format(
+ rtype))
+ self.assertRegex(err, expected)
+ self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
- def check_semaphore_tracker_death(self, signum, should_die):
+ def check_resource_tracker_death(self, signum, should_die):
# bpo-31310: if the semaphore tracker process has died, it should
# be restarted implicitly.
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- pid = _semaphore_tracker._pid
+ from multiprocessing.resource_tracker import _resource_tracker
+ pid = _resource_tracker._pid
if pid is not None:
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
- _semaphore_tracker.ensure_running()
- pid = _semaphore_tracker._pid
+ _resource_tracker.ensure_running()
+ pid = _resource_tracker._pid
os.kill(pid, signum)
time.sleep(1.0) # give it time to die
@@ -4898,50 +4960,50 @@
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
self.assertTrue(issubclass(the_warn.category, UserWarning))
- self.assertTrue("semaphore_tracker: process died"
+ self.assertTrue("resource_tracker: process died"
in str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)
- def test_semaphore_tracker_sigint(self):
+ def test_resource_tracker_sigint(self):
# Catchable signal (ignored by semaphore tracker)
- self.check_semaphore_tracker_death(signal.SIGINT, False)
+ self.check_resource_tracker_death(signal.SIGINT, False)
- def test_semaphore_tracker_sigterm(self):
+ def test_resource_tracker_sigterm(self):
# Catchable signal (ignored by semaphore tracker)
- self.check_semaphore_tracker_death(signal.SIGTERM, False)
+ self.check_resource_tracker_death(signal.SIGTERM, False)
- def test_semaphore_tracker_sigkill(self):
+ def test_resource_tracker_sigkill(self):
# Uncatchable signal.
- self.check_semaphore_tracker_death(signal.SIGKILL, True)
+ self.check_resource_tracker_death(signal.SIGKILL, True)
@staticmethod
- def _is_semaphore_tracker_reused(conn, pid):
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- _semaphore_tracker.ensure_running()
+ def _is_resource_tracker_reused(conn, pid):
+ from multiprocessing.resource_tracker import _resource_tracker
+ _resource_tracker.ensure_running()
# The pid should be None in the child process, expect for the fork
# context. It should not be a new value.
- reused = _semaphore_tracker._pid in (None, pid)
- reused &= _semaphore_tracker._check_alive()
+ reused = _resource_tracker._pid in (None, pid)
+ reused &= _resource_tracker._check_alive()
conn.send(reused)
- def test_semaphore_tracker_reused(self):
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- _semaphore_tracker.ensure_running()
- pid = _semaphore_tracker._pid
+ def test_resource_tracker_reused(self):
+ from multiprocessing.resource_tracker import _resource_tracker
+ _resource_tracker.ensure_running()
+ pid = _resource_tracker._pid
r, w = multiprocessing.Pipe(duplex=False)
- p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+ p = multiprocessing.Process(target=self._is_resource_tracker_reused,
args=(w, pid))
p.start()
- is_semaphore_tracker_reused = r.recv()
+ is_resource_tracker_reused = r.recv()
# Clean up
p.join()
w.close()
r.close()
- self.assertTrue(is_semaphore_tracker_reused)
+ self.assertTrue(is_resource_tracker_reused)
class TestSimpleQueue(unittest.TestCase):