Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module providing various facilities to other parts of the package |
| 3 | # |
| 4 | # multiprocessing/util.py |
| 5 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 7 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 8 | # |
| 9 | |
Richard Oudkerk | 77c84f2 | 2012-05-18 14:28:02 +0100 | [diff] [blame] | 10 | import sys |
Antoine Pitrou | 176f07d | 2011-06-06 19:35:31 +0200 | [diff] [blame] | 11 | import functools |
Richard Oudkerk | 739ae56 | 2012-05-25 13:54:53 +0100 | [diff] [blame] | 12 | import os |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 13 | import itertools |
| 14 | import weakref |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 15 | import atexit |
| 16 | import threading # we want threading to install it's |
| 17 | # cleanup function before multiprocessing does |
Antoine Pitrou | ebdcd85 | 2012-05-18 18:33:07 +0200 | [diff] [blame] | 18 | from subprocess import _args_from_interpreter_flags |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 19 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 20 | from . import process |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 21 | |
# Public names exported by ``from <this module> import *``.
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]
| 28 | |
| 29 | # |
| 30 | # Logging |
| 31 | # |
| 32 | |
# Numeric logging levels used by the helpers below.  SUBDEBUG sits below
# DEBUG and SUBWARNING sits just above INFO.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name of the logger handed out by get_logger(), and the format string
# applied to the handler installed by log_to_stderr().
LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module state: the logger (None until get_logger() has created it) and a
# flag that log_to_stderr() sets to True.
_logger = None
_log_to_stderr = False
| 44 | |
def sub_debug(msg, *args):
    '''
    Log msg at SUBDEBUG level; a no-op while no logger has been set up
    '''
    if not _logger:
        return
    _logger.log(SUBDEBUG, msg, *args)
| 48 | |
def debug(msg, *args):
    '''
    Log msg at DEBUG level; a no-op while no logger has been set up
    '''
    if not _logger:
        return
    _logger.log(DEBUG, msg, *args)
| 52 | |
def info(msg, *args):
    '''
    Log msg at INFO level; a no-op while no logger has been set up
    '''
    if not _logger:
        return
    _logger.log(INFO, msg, *args)
| 56 | |
def sub_warning(msg, *args):
    '''
    Log msg at SUBWARNING level; a no-op while no logger has been set up
    '''
    if not _logger:
        return
    _logger.log(SUBWARNING, msg, *args)
| 60 | |
def get_logger():
    '''
    Return the logger used by multiprocessing, creating it on first use
    '''
    global _logger
    import logging

    # The logging module's own lock guards the one-time setup below.
    logging._acquireLock()
    try:
        if _logger:
            return _logger

        _logger = logging.getLogger(LOGGER_NAME)
        _logger.propagate = 0

        # XXX multiprocessing should cleanup before logging
        # Drop and re-add _exit_function so that it becomes the most
        # recently registered exit handler.
        if hasattr(atexit, 'unregister'):
            atexit.unregister(_exit_function)
            atexit.register(_exit_function)
        else:
            handler_entry = (_exit_function, (), {})
            atexit._exithandlers.remove(handler_entry)
            atexit._exithandlers.append(handler_entry)
    finally:
        logging._releaseLock()

    return _logger
| 87 | |
def log_to_stderr(level=None):
    '''
    Enable logging and attach a stream handler using the default format

    If level is given (and true) it is also set as the logger's level.
    Returns the multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    mp_logger = get_logger()
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    mp_logger.addHandler(stream_handler)

    if level:
        mp_logger.setLevel(level)
    _log_to_stderr = True
    return _logger
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 105 | |
| 106 | # |
| 107 | # Function returning a temp directory which will be removed on exit |
| 108 | # |
| 109 | |
def get_temp_dir():
    # Return the name of this process's temp directory, creating it (and
    # scheduling its removal via a Finalize with exitpriority -100) the
    # first time it is requested.
    cached = process.current_process()._config.get('tempdir')
    if cached is not None:
        return cached

    import shutil
    import tempfile

    new_dir = tempfile.mkdtemp(prefix='pymp-')
    info('created temp directory %s', new_dir)
    Finalize(None, shutil.rmtree, args=[new_dir], exitpriority=-100)
    # Remember the directory in the process config so later calls reuse it.
    process.current_process()._config['tempdir'] = new_dir
    return new_dir
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 120 | |
| 121 | # |
| 122 | # Support for reinitialization of objects when bootstrapping a child process |
| 123 | # |
| 124 | |
# Registry filled by register_after_fork().  Keys are
# (counter value, id(obj), func) tuples and the values are the registered
# objects, held weakly so an entry vanishes once its object is collected.
_afterfork_registry = weakref.WeakValueDictionary()
# Source of the increasing first key element, which fixes the run order.
_afterfork_counter = itertools.count()
| 127 | |
def _run_after_forkers():
    '''
    Call every registered after-fork function on its object

    Entries run in registration order; an exception raised by one function
    is logged at INFO level and does not stop the remaining ones.
    '''
    for (index, ident, func), obj in sorted(_afterfork_registry.items()):
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)
| 136 | |
def register_after_fork(obj, func):
    '''
    Arrange for func(obj) to be called by _run_after_forkers()

    obj is only weakly referenced, so registration does not keep it alive.
    '''
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj
| 139 | |
| 140 | # |
| 141 | # Finalization using weakrefs |
| 142 | # |
| 143 | |
# Maps (exitpriority, creation counter) keys to the Finalize instances that
# have not yet been called or cancelled.
_finalizer_registry = {}
# Source of the second key element; later finalizers get larger values.
_finalizer_counter = itertools.count()
| 146 | |
| 147 | |
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    A Finalize instance registers itself in _finalizer_registry and runs
    callback(*args, **kwargs) at most once: when obj is garbage collected
    (if obj is given), when the instance is called directly, or when
    _run_finalizers() selects it by exit priority.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        obj          -- object whose collection triggers the callback, or None
        callback     -- callable to invoke
        args, kwargs -- arguments passed to callback
        exitpriority -- None or an int; required when obj is None

        Raises TypeError if exitpriority is neither None nor an int, and
        ValueError if both obj and exitpriority are None.
        '''
        # Validate explicitly rather than with assert, so the checks still
        # happen when Python runs with -O.
        if exitpriority is not None and not isinstance(exitpriority, int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # The instance itself is the weakref callback.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            # Nothing would ever trigger such a finalizer.
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remembered so that __call__ can skip the callback in a process
        # other than the one that created the finalizer.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None when the callback is skipped.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so the finalizer cannot run twice.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # No weakref was created, or it has been reset to None.
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
| 228 | |
| 229 | |
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.

    An exception raised by a finalizer is printed and does not stop the
    remaining ones.  When minpriority is None the registry is emptied
    afterwards.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    def selected(key):
        # key is (exitpriority, creation counter)
        priority = key[0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    # Careful: the registry may be mutated while this function runs (a
    # finalizer or a weakref callback can remove entries).  Snapshot only
    # the keys, which is a single list() call on the dict, and look each
    # finalizer up again just before calling it.
    keys = [key for key in list(_finalizer_registry) if selected(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        if finalizer is None:
            # Already called or cancelled since the snapshot was taken.
            continue
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
| 261 | |
| 262 | # |
| 263 | # Clean up on exit |
| 264 | # |
| 265 | |
def is_exiting():
    '''
    Return true if the process is shutting down
    '''
    if _exiting:
        return _exiting
    # A value of None is also treated as "shutting down".
    return _exiting is None

# Set to True by _exit_function() when shutdown starts.
_exiting = False
| 273 | |
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # Exit handler: run finalizers with priority >= 0, terminate daemonic
    # children, join the remaining children, then run the rest of the
    # finalizers.
    #
    # The helpers are captured as default arguments because this function
    # may be called after this module's globals have been destroyed (see
    # the note further down).

    global _exiting

    if _exiting:
        # Shutdown has already been started.
        return
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # The current process is checked against None because, if it is
        # None, any call to ``active_children()`` will raise an
        # AttributeError (active_children winds up trying to get
        # attributes from the current process object).  One situation
        # where this can happen is if someone has manipulated
        # sys.modules, causing this module to be garbage collected.  The
        # destructor for the module type then replaces all values in the
        # module dict with None.  For instance, after setuptools runs a
        # test it replaces sys.modules with a copy created earlier.  See
        # issues #9775 and #15881.  Also related: #4106, #9205, and
        # #9207.

        for child in active_children():
            if not child.daemon:
                continue
            info('calling terminate() for daemon %s', child.name)
            child._popen.terminate()

        for child in active_children():
            info('calling join() for process %s', child.name)
            child.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)
| 317 | |
| 318 | # |
| 319 | # Some fork aware types |
| 320 | # |
| 321 | |
class ForkAwareThreadLock(object):
    '''
    Thread lock that is replaced by a fresh lock by the after-fork hooks

    Exposes acquire() and release() of the underlying threading.Lock and
    can also be used as a context manager.
    '''
    def __init__(self):
        self._reset()
        # Have _run_after_forkers() call _reset() on this instance.
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Create a new lock and rebind the public methods to it.
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        # Delegate to the current lock so "with lock:" acquires it.
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 331 | |
class ForkAwareLocal(threading.local):
    '''
    Thread-local object whose attributes are cleared by the after-fork hooks
    '''
    def __init__(self):
        def _clear_attributes(local):
            local.__dict__.clear()

        register_after_fork(self, _clear_attributes)

    def __reduce__(self):
        # Reduce to "call the class with no arguments"; stored attributes
        # are not part of the reduced value.
        cls = type(self)
        return cls, ()
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 337 | |
| 338 | # |
| 339 | # Close fds except those specified |
| 340 | # |
| 341 | |
# Upper bound on file descriptor numbers; falls back to 256 when
# os.sysconf cannot provide SC_OPEN_MAX.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    '''
    Close every file descriptor in range(0, MAXFD) that is not listed in fds

    Raises AssertionError ('fd too large') if an entry exceeds MAXFD.
    '''
    # Bracket the descriptors to keep with sentinels: the gaps between
    # consecutive entries are then exactly the ranges to close.
    bounds = sorted(list(fds) + [-1, MAXFD])
    assert bounds[-1] == MAXFD, 'fd too large'
    for kept, next_kept in zip(bounds, bounds[1:]):
        os.closerange(kept + 1, next_kept)
| 353 | |
| 354 | # |
| 355 | # Start a program with only specified fds kept open |
| 356 | # |
| 357 | |
def spawnv_passfds(path, args, passfds):
    '''
    Start the program at path with argument list args via fork_exec

    passfds is sorted and handed to fork_exec as the descriptors to keep.
    Returns whatever _posixsubprocess.fork_exec returns (presumably the
    child's pid -- confirm against the interpreter version in use).
    '''
    import _posixsubprocess

    kept_fds = sorted(passfds)
    # fork_exec needs an error pipe; both ends are closed again below
    # whether or not the call succeeds.
    err_read, err_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, kept_fds, None, None,
            -1, -1, -1, -1, -1, -1, err_read, err_write,
            False, False, None)
    finally:
        os.close(err_read)
        os.close(err_write)