blob: cc42dbdda05b913efd43cbe988773a08bc177e05 [file] [log] [blame]
###############################################################################
# Server process that keeps track of resources which have not yet been
# unlinked (like shared memory segments, semaphores etc.) and cleans them up.
#
# On Unix we run a server process which keeps track of resources that have
# been created but not yet unlinked. The server ignores SIGINT and SIGTERM and
# reads from a pipe. Every other process of the program has a copy of the
# writable end of the pipe, so the server gets EOF when all other processes
# have exited. Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, this resource will not be removed until the next reboot. Without
# this resource tracker process, "killall python" would probably leave such
# resources behind, never unlinked.
17
18import os
19import signal
20import sys
21import threading
22import warnings
Pierre Glaserf22cc692019-05-10 22:59:08 +020023
24from . import spawn
25from . import util
26
__all__ = ['ensure_running', 'register', 'unregister']

# pthread_sigmask() is not available on every platform; when it is missing,
# the signal-mask protection used while spawning the tracker is skipped.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process is shielded from (^C, "killall python", ...).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type name -> function that destroys a leftover resource of that
# type, given its name.  'noop' is the type carried by the liveness probes the
# client writes ("PROBE:0:noop"); its function takes no argument.
_CLEANUP_FUNCS = {
    'noop': lambda: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    # Named semaphores are removed with sem_unlink().
    #
    # sem_unlink() may be missing if the Python build process detected the
    # absence of POSIX named semaphores.  In that case no named semaphore can
    # ever have been opened, so there is nothing to clean up.
    if hasattr(_multiprocessing, 'sem_unlink'):
        _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink

    # POSIX shared-memory segments are removed with shm_unlink().
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink


class ResourceTracker(object):
    """Client-side handle on the resource tracker server process.

    The tracker is a separate Python process, started lazily by
    ensure_running(), that reads newline-terminated ``CMD:NAME:RTYPE``
    messages from a pipe.  This object holds the write end of that pipe in
    ``_fd`` and, when this process launched the tracker itself, the
    tracker's pid in ``_pid``.  ``_lock`` serialises start/stop.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._fd = None    # write end of the pipe to the tracker, or None
        self._pid = None   # tracker pid if launched by this process, or None

    def _stop(self):
        """Stop the tracker launched by this process; no-op if not running.

        Closes our write end of the pipe (the tracker exits once it reads
        EOF) and then reaps the tracker with waitpid().
        """
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            os.waitpid(self._pid, 0)
            self._pid = None

    def getfd(self):
        """Return the write end of the tracker pipe, starting it if needed."""
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.  If a previously launched
        tracker no longer accepts messages, it is reaped (when it is our
        child), a warning is emitted, and a new tracker is started.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                # Let the tracker report problems on our stderr, if we have one.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will outlive us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
            except:
                # Spawning failed: nobody will ever read from the pipe.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end now belongs to the tracker process only.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.

        Returns True if the probe could be written, False on OSError (for
        example when the read end has been closed).'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        """Write one ``cmd:name:rtype`` message to the tracker.

        Raises:
            UnicodeEncodeError: if the message is not pure ASCII.
            ValueError: if the encoded message is longer than 512 bytes.
        """
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The limit must
            # cover the whole message (not just the name) so that messages
            # from concurrent writers cannot interleave.  Validate before
            # ensure_running() so a rejected request never spawns a tracker.
            raise ValueError('msg too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))


# Module-level tracker instance; the public functions of this module are
# simply its bound methods.
_resource_tracker = ResourceTracker()
ensure_running, register, unregister, getfd = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)


def main(fd):
    '''Run resource tracker.

    Entry point of the tracker server process.  *fd* is the read end of the
    pipe the clients write to.  Each line read from it is a
    ``CMD:NAME:RTYPE`` message:

    * ``REGISTER`` adds NAME to the set of live resources of type RTYPE;
    * ``UNREGISTER`` removes it again;
    * ``PROBE`` is a liveness check and is otherwise ignored.

    Malformed messages and unknown commands/types are reported through
    ``sys.excepthook`` and skipped.  Once the pipe reaches EOF, every name
    still registered is passed to the cleanup function of its type in
    ``_CLEANUP_FUNCS``; a failure for one name never prevents cleanup of
    the others.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The parent blocked these signals while spawning us (bpo-33613);
        # now that they are ignored the mask can be lifted.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    # rtype -> set of names currently registered for that type
    cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    cmd, name, rtype = line.strip().decode('ascii').split(':')
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A bad message must not kill the tracker: report it and
                    # carry on reading.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    # Reporting is best-effort: if warnings are configured to
                    # raise (e.g. "-W error"), an escaping exception would
                    # abort the loop and leak every remaining resource.
                    try:
                        warnings.warn('resource_tracker: %r: %s' % (name, e))
                    except Exception:
                        pass