blob: 61a6dd66e72e67ec433218a1755175ebae7f4656 [file] [log] [blame]
###############################################################################
# Server process to keep track of unlinked resources (like shared memory
# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, this resource will not be removed until the next reboot. Without
# this resource tracker process, "killall python" would probably leave unlinked
# resources.
17
18import os
19import signal
20import sys
21import threading
22import warnings
Pierre Glaserf22cc692019-05-10 22:59:08 +020023
24from . import spawn
25from . import util
26
__all__ = ['ensure_running', 'register', 'unregister']

# pthread_sigmask is not available on every platform; code that blocks or
# unblocks signals around the tracker spawn checks this flag first.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process ignores (see main()) and that are blocked in
# the parent while the tracker is being spawned (see ensure_running()).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Maps a resource type name to the callable used to unlink a leaked resource
# of that type.  main() invokes every entry as ``func(name)``, so each
# callable must accept the resource name as its single positional argument.
_CLEANUP_FUNCS = {
    # Accept (and ignore) the name so that a leaked 'noop' registration does
    # not fail with TypeError when main() calls the cleanup function.
    'noop': lambda name=None: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    _CLEANUP_FUNCS.update({
        'semaphore': _multiprocessing.sem_unlink,
        'shared_memory': _posixshmem.shm_unlink,
    })
44
Pierre Glaserf22cc692019-05-10 22:59:08 +020045
class ResourceTracker(object):
    '''Client-side handle on the resource tracker process.

    Holds the write end of the pipe connected to the tracker and, when this
    process spawned the tracker itself, the tracker's pid.  Commands are sent
    as single ASCII lines of the form ``CMD:name:rtype\\n``.
    '''

    def __init__(self):
        # Serialises the liveness check and (re)launch in ensure_running().
        self._lock = threading.Lock()
        # Write end of the pipe to the tracker; None until it is launched.
        self._fd = None
        # pid of the tracker when spawned by this process, otherwise None.
        self._pid = None

    def getfd(self):
        '''Return the write end of the tracker pipe, launching the tracker
        first if necessary.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                # stderr may be missing or have no underlying descriptor.
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will outlive us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the
                # signals.  This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from
                # a race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM.
                #
                # After spawning, the mask that was in effect beforehand is
                # restored (rather than unconditionally unblocking), so a
                # caller that already had these signals blocked keeps them
                # blocked.
                prev_sigmask = None
                try:
                    if _HAVE_SIGMASK:
                        prev_sigmask = signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if prev_sigmask is not None:
                        signal.pthread_sigmask(signal.SIG_SETMASK,
                                               prev_sigmask)
            except BaseException:
                # Spawn failed: do not leak the write end of the pipe.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker process only.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if the encoded message exceeds 512 bytes, and
        UnicodeEncodeError if it is not pure ASCII.  Validation happens
        before the tracker is (re)launched, so an invalid message has no
        side effects.
        '''
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The limit applies
            # to the whole message, not just to the name.
            raise ValueError('msg too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
150
151
# Single tracker handle for this process.  The module-level functions below
# are bound methods of that one instance, so they share its pipe and lock.
_resource_tracker = ResourceTracker()
getfd = _resource_tracker.getfd
ensure_running = _resource_tracker.ensure_running
unregister = _resource_tracker.unregister
register = _resource_tracker.register
157
def main(fd):
    '''Run resource tracker.

    Reads ``CMD:name:rtype`` lines from the file descriptor *fd* until EOF,
    maintaining a set of registered names per resource type.  Once the pipe
    is exhausted, every name still registered is passed to the cleanup
    function of its resource type.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The handlers are in place now, so the signals can be unblocked.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    cmd, name, rtype = line.strip().decode('ascii').split(':')
                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A malformed line must not stop the tracker; report it
                    # and keep reading.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    # warnings.warn() itself raises when warnings are turned
                    # into errors; that must not abort the cleanup of the
                    # remaining names.
                    try:
                        warnings.warn('resource_tracker: %r: %s' % (name, e))
                    except Exception:
                        pass
218 pass