blob: c9bfa9b82b6e6c9c5ef2eebd1cbe02fc7111961b [file] [log] [blame]
###############################################################################
# Server process to keep track of named resources (like shared memory
# segments, semaphores etc.) that have not been unlinked yet, and to clean
# them up.
#
# On Unix we run a server process which keeps track of resources that have
# not been unlinked yet. The server ignores SIGINT and SIGTERM and reads from
# a pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in the RAM. If a python process leaks such a
# resource, this resource will not be removed till the next reboot. Without
# this resource tracker process, "killall python" would probably leave such
# resources behind without ever unlinking them.
17
18import os
19import signal
20import sys
21import threading
22import warnings
Pierre Glaserf22cc692019-05-10 22:59:08 +020023
24from . import spawn
25from . import util
26
# Public API of this module.  Each of these names is bound further down in
# the module to a method of the module-level ResourceTracker instance.
__all__ = [
    'ensure_running',
    'register',
    'unregister',
]
28
29_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
30_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
31
32_CLEANUP_FUNCS = {
33 'noop': lambda: None,
Pierre Glaserf22cc692019-05-10 22:59:08 +020034}
35
Antoine Pitrou95da83d2019-05-13 20:02:46 +020036if os.name == 'posix':
37 import _multiprocessing
38 import _posixshmem
39
40 _CLEANUP_FUNCS.update({
41 'semaphore': _multiprocessing.sem_unlink,
42 'shared_memory': _posixshmem.shm_unlink,
43 })
44
Pierre Glaserf22cc692019-05-10 22:59:08 +020045
class ResourceTracker(object):
    '''Client side of the resource tracker.

    An instance holds the write end of a pipe whose read end is owned by a
    separate tracker process running main().  Resources are registered and
    unregistered by writing one-line ``CMD:NAME:RTYPE`` messages to it.
    '''

    def __init__(self):
        # Serializes starting/stopping the tracker process.
        self._lock = threading.Lock()
        # Write end of the pipe to the tracker (None: not running / unknown).
        self._fd = None
        # PID of the tracker if this process spawned it, otherwise None.
        self._pid = None

    def _stop(self):
        '''Stop the tracker process that this process spawned, if any.'''
        with self._lock:
            if self._fd is None:
                # not running
                return
            if self._pid is None:
                # The tracker was started by another (parent) process, so
                # there is no child of ours to stop or reap.
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            os.waitpid(self._pid, 0)
            self._pid = None

    def getfd(self):
        '''Return the pipe fd used to talk to the tracker, starting it first.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                # Pass our stderr through so the tracker can report errors.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                               _IGNORED_SIGNALS)
            except BaseException:
                # Don't leak the write end if spawning failed for any reason.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end was handed to the child; our copy is not needed.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` message to the tracker.

        Raises ValueError if the encoded message is longer than 512 bytes,
        and UnicodeEncodeError if it is not ASCII.
        '''
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The whole message,
            # not just the name, must fit or concurrent writers could
            # interleave.  Validate before starting the tracker so that an
            # invalid request has no side effects.
            raise ValueError('name too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
163
164
# One tracker client per process; the module-level API functions are simply
# bound methods of this shared instance.
_resource_tracker = ResourceTracker()
ensure_running, register, unregister, getfd = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)
170
def main(fd):
    '''Run resource tracker.

    *fd* is the read end of a pipe.  Each line read from it is a
    ``CMD:NAME:RTYPE`` message (CMD is REGISTER, UNREGISTER or PROBE).
    When EOF is reached, every resource still registered is cleaned up with
    the matching function from _CLEANUP_FUNCS.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The spawner blocked these signals around the spawn; now that they
        # are ignored it is safe to unblock them.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    # The command and resource type never contain ':', but a
                    # resource name may, so split only on the first and the
                    # last separator.  Malformed lines still raise ValueError.
                    text = line.strip().decode('ascii')
                    cmd, rest = text.split(':', 1)
                    name, rtype = rest.rsplit(':', 1)
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # Report the bad message but keep serving the pipe.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    # Reporting is best-effort: a warning turned into an
                    # exception (e.g. by a "-W error" filter) must not stop
                    # the remaining resources from being cleaned up.
                    try:
                        warnings.warn('resource_tracker: %r: %s' % (name, e))
                    except Exception:
                        pass