Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module providing various facilities to other parts of the package |
| 3 | # |
| 4 | # multiprocessing/util.py |
| 5 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 7 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 8 | # |
| 9 | |
Richard Oudkerk | 739ae56 | 2012-05-25 13:54:53 +0100 | [diff] [blame] | 10 | import os |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 11 | import itertools |
| 12 | import weakref |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 13 | import atexit |
| 14 | import threading # we want threading to install it's |
| 15 | # cleanup function before multiprocessing does |
Antoine Pitrou | ebdcd85 | 2012-05-18 18:33:07 +0200 | [diff] [blame] | 16 | from subprocess import _args_from_interpreter_flags |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 17 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 18 | from . import process |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 19 | |
# Names exported by ``from multiprocessing.util import *``.
__all__ = [
    'sub_debug',
    'debug',
    'info',
    'sub_warning',
    'get_logger',
    'log_to_stderr',
    'get_temp_dir',
    'register_after_fork',
    'is_exiting',
    'Finalize',
    'ForkAwareThreadLock',
    'ForkAwareLocal',
    'close_all_fds_except',
    'SUBDEBUG',
    'SUBWARNING',
]
| 26 | |
| 27 | # |
| 28 | # Logging |
| 29 | # |
| 30 | |
# Logging levels used by this package.  The two custom levels slot in
# around the usual ones: SUBDEBUG sits below DEBUG and SUBWARNING above INFO.
NOTSET, SUBDEBUG, DEBUG, INFO, SUBWARNING = 0, 5, 10, 20, 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module state: the lazily created logger (set by get_logger()) and a flag
# recording whether log_to_stderr() has been called.
_logger = None
_log_to_stderr = False
| 42 | |
def sub_debug(msg, *args):
    # Log at SUBDEBUG level; silently does nothing until a logger exists.
    if not _logger:
        return
    _logger.log(SUBDEBUG, msg, *args)
| 46 | |
def debug(msg, *args):
    # Log at DEBUG level; silently does nothing until a logger exists.
    if not _logger:
        return
    _logger.log(DEBUG, msg, *args)
| 50 | |
def info(msg, *args):
    # Log at INFO level; silently does nothing until a logger exists.
    if not _logger:
        return
    _logger.log(INFO, msg, *args)
| 54 | |
def sub_warning(msg, *args):
    # Log at SUBWARNING level; silently does nothing until a logger exists.
    if not _logger:
        return
    _logger.log(SUBWARNING, msg, *args)
| 58 | |
def get_logger():
    '''
    Returns logger used by multiprocessing

    The logger named LOGGER_NAME is created and configured on the first
    call and cached in the module-level ``_logger``; later calls return
    the cached object.
    '''
    global _logger
    import logging

    # Serialize the lazy initialisation using logging's module-level lock
    # (private logging API) so the logger is configured only once.
    logging._acquireLock()
    try:
        if not _logger:
            _logger = logging.getLogger(LOGGER_NAME)
            # Records stop at this logger instead of also reaching the
            # handlers of its ancestors (e.g. the root logger).
            _logger.propagate = False

            # XXX multiprocessing should cleanup before logging
            # Re-register _exit_function so it becomes the most recently
            # registered atexit handler. atexit runs handlers in reverse
            # order of registration, so it then runs before handlers
            # registered earlier (presumably including logging's own
            # shutdown hook). atexit.unregister() exists on every Python 3,
            # so the former fallback that manipulated the Python 2-only
            # ``atexit._exithandlers`` list was unreachable and is gone.
            atexit.unregister(_exit_function)
            atexit.register(_exit_function)
    finally:
        logging._releaseLock()

    return _logger
| 85 | |
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    Every call attaches one more handler to the multiprocessing logger.
    A falsy ``level`` (None or 0) leaves the logger's level untouched.
    Returns the multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    fmt = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(fmt)
    logger.addHandler(stderr_handler)

    if level:
        logger.setLevel(level)

    _log_to_stderr = True
    return _logger
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 103 | |
| 104 | # |
| 105 | # Function returning a temp directory which will be removed on exit |
| 106 | # |
| 107 | |
def get_temp_dir():
    # Return this process's private temp directory, creating it on first
    # use. The path is cached in the current process's _config under
    # 'tempdir', and a Finalize with exitpriority -100 removes the tree
    # with shutil.rmtree.
    cached = process.current_process()._config.get('tempdir')
    if cached is not None:
        return cached

    import shutil
    import tempfile
    tempdir = tempfile.mkdtemp(prefix='pymp-')
    info('created temp directory %s', tempdir)
    Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
    process.current_process()._config['tempdir'] = tempdir
    return tempdir
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 118 | |
| 119 | # |
| 120 | # Support for reinitialization of objects when bootstrapping a child process |
| 121 | # |
| 122 | |
# Registry of (order, id(obj), func) -> obj. Values are held weakly, so an
# entry vanishes once its object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    # Call func(obj) for every live registration, in registration order
    # (keys begin with a counter value, so sorting them restores that
    # order). Exceptions are logged at INFO level and do not stop the loop.
    for (_order, _ident, hook), target in sorted(_afterfork_registry.items()):
        try:
            hook(target)
        except Exception as exc:
            info('after forker raised exception %s', exc)

def register_after_fork(obj, func):
    # Arrange for func(obj) to be called by _run_after_forkers(); obj is
    # referenced only weakly.
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj
| 137 | |
| 138 | # |
| 139 | # Finalization using weakrefs |
| 140 | # |
| 141 | |
# Live Finalize objects keyed by (exitpriority, creation counter).
_finalizer_registry = dict()
# Supplies the creation counter; it breaks ties between equal priorities.
_finalizer_counter = itertools.count(0)
| 144 | |
| 145 | |
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    ``callback(*args, **kwargs)`` runs at most once. It can be triggered
    when ``obj`` is garbage collected (through the weakref callback), by
    calling the instance directly, or by ``_run_finalizers()`` when
    ``exitpriority`` is not None. It is skipped if triggered in a process
    other than the one that created the finalizer.

    Raises TypeError if ``exitpriority`` is neither None nor an int, and
    ValueError if both ``obj`` and ``exitpriority`` are None.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        # Real exceptions instead of ``assert`` so these checks are not
        # stripped when running under ``python -O``.
        if exitpriority is not None and type(exitpriority) is not int:
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # The weakref invokes self (i.e. __call__) when obj is collected.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            # Without an object to watch, an exit priority is the only way
            # the callback can be triggered automatically.
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # Registry key: sorts by priority, then by creation order.
        self._key = (exitpriority, next(_finalizer_counter))
        # Remember the creating process so other processes never run it.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None if it was not run.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop references so the callback and its arguments can be freed.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        dead = '<%s object, dead>' % self.__class__.__name__

        # _key is reset to None once the finalizer has run or been cancelled.
        if getattr(self, '_key', None) is None:
            return dead

        # Finalizers created with obj=None have no _weakref and stay live
        # until they run; otherwise report dead once the referent is gone.
        ref = getattr(self, '_weakref', None)
        if ref is not None and ref() is None:
            return dead

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
| 227 | |
| 228 | |
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    When minpriority is None the registry is cleared afterwards.
    Exceptions raised by a finalizer are printed and do not stop the run.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed. See the _exit_function function in this module for more
        # notes.
        return

    # Keys are (exitpriority, creation counter) tuples.
    if minpriority is None:
        f = lambda key: key[0] is not None
    else:
        f = lambda key: key[0] is not None and key[0] >= minpriority

    # The registry may be mutated while this runs. A garbage collection can
    # fire a Finalize weakref callback, which deletes its own entry, and
    # other threads may create or cancel finalizers. Copying .items() under
    # those conditions can fail with "dictionary changed size during
    # iteration". So snapshot only the keys (list(dict) should be atomic in
    # CPython, list(dict.items()) is not) and look each finalizer up again
    # just before calling it.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # The entry may have been run or cancelled since the snapshot.
        if finalizer is None:
            continue
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
| 260 | |
| 261 | # |
| 262 | # Clean up on exit |
| 263 | # |
| 264 | |
def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    if _exiting is None:
        # Module globals have been wiped during interpreter teardown.
        return True
    return _exiting or False

# Set to True by _exit_function() once shutdown begins.
_exiting = False
| 272 | |
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    '''
    atexit handler that shuts down multiprocessing state for this process.

    While _exiting is false it: runs finalizers with exit priority >= 0,
    terminates daemonic children, joins the remaining children, then runs
    all remaining finalizers that have an exit priority. If _exiting is
    already true it does nothing.
    '''
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        # Mark shutdown first so a repeated call does nothing.
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.
            # NOTE(review): active_children/current_process are now taken
            # from the ``process`` module, so the "util._current_process"
            # detail above may be stale -- verify.

            # Daemonic children are terminated rather than waited on.
            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            # Query the live children again and wait for each one.
            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

# Run the shutdown sequence above when the interpreter exits.
atexit.register(_exit_function)
| 316 | |
| 317 | # |
| 318 | # Some fork aware types |
| 319 | # |
| 320 | |
class ForkAwareThreadLock(object):
    '''
    Wrapper around a threading.Lock that is swapped for a brand-new lock
    by the after-fork hooks (see register_after_fork), so a child does not
    inherit a lock that was held at fork time.
    '''
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Install a fresh lock and rebind acquire/release to its methods.
        fresh = threading.Lock()
        self._lock = fresh
        self.acquire = fresh.acquire
        self.release = fresh.release

    def __enter__(self):
        lock = self._lock
        return lock.__enter__()

    def __exit__(self, *exc_info):
        lock = self._lock
        return lock.__exit__(*exc_info)
| 336 | |
| 337 | |
class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose __dict__ is cleared by the after-fork
    hooks (see register_after_fork) and which pickles as a fresh, empty
    instance.
    '''
    def __init__(self):
        def _clear_attributes(local):
            local.__dict__.clear()
        register_after_fork(self, _clear_attributes)

    def __reduce__(self):
        # Only the class is pickled; no attribute state is carried along.
        return self.__class__, ()
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 343 | |
| 344 | # |
| 345 | # Close fds except those specified |
| 346 | # |
| 347 | |
# Upper bound on file descriptor numbers; 256 is a best-effort fallback
# when os.sysconf is unavailable or cannot report SC_OPEN_MAX.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    '''
    Close every file descriptor in the range [0, MAXFD) except those in fds.

    Raises ValueError (before closing anything) if any fd in fds exceeds
    MAXFD. This was previously an ``assert``, which ``python -O`` strips;
    without it, descriptors above MAXFD would have been closed instead.
    '''
    # The sentinels -1 and MAXFD bound the gaps: closing (prev, next) for
    # each consecutive pair covers everything from 0 to MAXFD - 1 except
    # the kept descriptors.
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    if fds[-1] != MAXFD:
        raise ValueError('fd too large')
    for lo, hi in zip(fds, fds[1:]):
        os.closerange(lo + 1, hi)
| 359 | |
| 360 | # |
| 361 | # Start a program with only specified fds kept open |
| 362 | # |
| 363 | |
def spawnv_passfds(path, args, passfds):
    '''
    Start the program at ``path`` with argument list ``args`` via
    ``_posixsubprocess.fork_exec``, keeping only the descriptors listed in
    ``passfds`` open.

    Returns whatever ``fork_exec`` returns (presumably the child's pid --
    TODO confirm).
    '''
    import _posixsubprocess
    passfds = sorted(passfds)
    errpipe_read, errpipe_write = os.pipe()
    try:
        # NOTE(review): fork_exec is a private CPython API whose positional
        # signature differs between Python versions. These arguments are
        # assumed to match this interpreter (True looks like close_fds and
        # the -1s like unused stdio pipe fds) -- confirm when upgrading.
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        # Both pipe ends are closed here without errpipe_read being read,
        # so this function does not report exec errors sent over it.
        os.close(errpipe_read)
        os.close(errpipe_write)