Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module providing various facilities to other parts of the package |
| 3 | # |
| 4 | # multiprocessing/util.py |
| 5 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 7 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 8 | # |
| 9 | |
Richard Oudkerk | 77c84f2 | 2012-05-18 14:28:02 +0100 | [diff] [blame] | 10 | import sys |
Antoine Pitrou | 176f07d | 2011-06-06 19:35:31 +0200 | [diff] [blame] | 11 | import functools |
Richard Oudkerk | 739ae56 | 2012-05-25 13:54:53 +0100 | [diff] [blame] | 12 | import os |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 13 | import itertools |
| 14 | import weakref |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 15 | import atexit |
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
Antoine Pitrou | ebdcd85 | 2012-05-18 18:33:07 +0200 | [diff] [blame] | 18 | from subprocess import _args_from_interpreter_flags |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 19 | |
| 20 | from multiprocessing.process import current_process, active_children |
| 21 | |
# Names exported by ``from multiprocessing.util import *``.
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]
| 28 | |
| 29 | # |
| 30 | # Logging |
| 31 | # |
| 32 | |
# Numeric log levels used by the helpers below.  SUBDEBUG and SUBWARNING
# are extra levels; get_logger() registers their names with the logging
# module via logging.addLevelName().
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name passed to logging.getLogger() and the format string used by
# log_to_stderr() for its handler.
LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The logger is created lazily by get_logger(); while it is None the
# sub_debug()/debug()/info()/sub_warning() helpers do nothing.
_logger = None
# Set to True once log_to_stderr() has been called.
_log_to_stderr = False
| 44 | |
def sub_debug(msg, *args):
    '''
    Log msg % args at SUBDEBUG level; do nothing until get_logger() has run
    '''
    if not _logger:
        return
    _logger.log(SUBDEBUG, msg, *args)
| 48 | |
def debug(msg, *args):
    '''
    Log msg % args at DEBUG level; do nothing until get_logger() has run
    '''
    if not _logger:
        return
    _logger.log(DEBUG, msg, *args)
| 52 | |
def info(msg, *args):
    '''
    Log msg % args at INFO level; do nothing until get_logger() has run
    '''
    if not _logger:
        return
    _logger.log(INFO, msg, *args)
| 56 | |
def sub_warning(msg, *args):
    '''
    Log msg % args at SUBWARNING level; do nothing until get_logger() has run
    '''
    if not _logger:
        return
    _logger.log(SUBWARNING, msg, *args)
| 60 | |
def get_logger():
    '''
    Return the logger used by multiprocessing, creating it on first call

    The first call fetches the LOGGER_NAME logger, turns off propagation,
    registers the SUBDEBUG/SUBWARNING level names and re-registers
    _exit_function with atexit.
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:
            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
            extra_levels = (
                (SUBDEBUG, 'SUBDEBUG'),
                (SUBWARNING, 'SUBWARNING'),
                )
            for level, level_name in extra_levels:
                logging.addLevelName(level, level_name)

            # XXX multiprocessing should cleanup before logging.
            # atexit handlers run last-registered-first, so move
            # _exit_function to the end of the registration order.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                # Fallback for an atexit module without unregister():
                # edit its private handler list directly.
                handler_entry = (_exit_function, (), {})
                atexit._exithandlers.remove(handler_entry)
                atexit._exithandlers.append(handler_entry)
    finally:
        logging._releaseLock()

    return _logger
| 89 | |
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    A new StreamHandler using DEFAULT_LOGGING_FORMAT is attached on every
    call.  The logger level is only changed when level is truthy, so
    None (and 0) leave the current level untouched.  Returns the
    multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()

    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    logger.addHandler(stderr_handler)

    if level:
        logger.setLevel(level)

    _log_to_stderr = True
    return _logger
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 107 | |
| 108 | # |
| 109 | # Function returning a temp directory which will be removed on exit |
| 110 | # |
| 111 | |
def get_temp_dir():
    '''
    Return the name of a temp directory which is cleaned up automatically

    The directory is created on first use and cached on the current
    process object; a Finalize with exit priority -100 removes it with
    shutil.rmtree.
    '''
    process = current_process()
    if process._tempdir is None:
        import shutil
        import tempfile
        new_dir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', new_dir)
        Finalize(None, shutil.rmtree, args=[new_dir], exitpriority=-100)
        process._tempdir = new_dir
    return process._tempdir
| 121 | |
| 122 | # |
| 123 | # Support for reinitialization of objects when bootstrapping a child process |
| 124 | # |
| 125 | |
# Maps (registration index, id(obj), func) -> obj; filled in by
# register_after_fork().  The values are weak references, so an entry
# disappears once its object is no longer alive.
_afterfork_registry = weakref.WeakValueDictionary()
# Source of the increasing registration indices used in the keys above.
_afterfork_counter = itertools.count()
| 128 | |
def _run_after_forkers():
    '''
    Call func(obj) for every live after-fork registration

    Callbacks run in registration order (keys start with the
    registration index).  An exception raised by a callback is logged
    with info() and does not stop the remaining callbacks.
    '''
    for (_index, _ident, func), obj in sorted(_afterfork_registry.items()):
        try:
            func(obj)
        except Exception as exc:
            info('after forker raised exception %s', exc)
| 137 | |
def register_after_fork(obj, func):
    '''
    Arrange for func(obj) to be called by _run_after_forkers()

    Only a weak reference to obj is kept, so registering does not keep
    obj alive.
    '''
    registry_key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[registry_key] = obj
| 140 | |
| 141 | # |
| 142 | # Finalization using weakrefs |
| 143 | # |
| 144 | |
# Maps (exitpriority, creation index) -> Finalize instance for every
# finalizer which has been neither run nor cancelled.
_finalizer_registry = {}
# Source of the increasing creation indices used in the keys above.
_finalizer_counter = itertools.count()
| 147 | |
| 148 | |
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    A finalizer calls callback(*args, **kwargs) at most once: when obj
    is garbage collected (if obj was given), when the instance is called
    directly, or when _run_finalizers() selects it by exit priority.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Create and register a finalizer

        obj          -- object whose collection triggers the callback, or
                        None for a finalizer that is only run explicitly
                        or by _run_finalizers()
        callback     -- callable to invoke
        args, kwargs -- positional and keyword arguments for callback
        exitpriority -- None or an int; _run_finalizers() skips
                        finalizers whose priority is None

        Raises TypeError if exitpriority is neither None nor an int, and
        ValueError if both obj and exitpriority are None (such a
        finalizer could never be triggered).
        '''
        # Validate with real exceptions rather than assert, so the checks
        # still run under "python -O".
        if exitpriority is not None and not isinstance(exitpriority, int):
            raise TypeError(
                'exitpriority (%r) must be None or int, not %s'
                % (exitpriority, type(exitpriority)))

        if obj is not None:
            # The finalizer itself is the weakref callback, so it is
            # called when obj is about to be collected.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError('Without object, exitpriority cannot be None')

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remember the creating process; __call__ refuses to run the
        # callback in any other process.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled

        wr is the dead weak reference when invoked as a weakref callback
        and is otherwise unused.  Returns the callback's result, or None
        when the finalizer was no longer registered or is being called
        in a process other than the one which created it.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so the finalizer cannot fire again.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object

        Does nothing if the finalizer has already run or been cancelled.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        '''
        Describe the finalizer; reported as dead when no live object is
        reachable through the weak reference
        '''
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # No weakref attribute (created with obj=None) or it has
            # been reset to None after running/cancelling.
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
| 229 | |
| 230 | |
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    When minpriority is None the registry is emptied afterwards.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    def is_selected(entry):
        # entry is ((exitpriority, creation index), finalizer)
        priority = entry[0][0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    # Work on a snapshot: running a finalizer removes it from the registry.
    snapshot = list(_finalizer_registry.items())
    pending = sorted(
        (entry for entry in snapshot if is_selected(entry)),
        reverse=True)

    for _key, finalizer in pending:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            # Report the failure and carry on with the other finalizers.
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
| 262 | |
| 263 | # |
| 264 | # Clean up on exit |
| 265 | # |
| 266 | |
def is_exiting():
    '''
    Returns true if the process is shutting down

    A value of None for the _exiting flag is also treated as shutting
    down.
    '''
    if _exiting is None:
        return True
    return _exiting
| 272 | |
# Set to True by _exit_function() once shutdown has started; read by
# is_exiting().
_exiting = False
| 274 | |
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    '''
    Shut multiprocessing down; only the first call does any work

    Runs the finalizers with priority >= 0, terminates daemonic children,
    joins the children which are still active and finally runs the
    remaining finalizers.
    '''
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if _exiting:
        return
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # We check if the current process is None here because if
        # it's None, any call to ``active_children()`` will raise
        # an AttributeError (active_children winds up trying to
        # get attributes from util._current_process).  One
        # situation where this can happen is if someone has
        # manipulated sys.modules, causing this module to be
        # garbage collected.  The destructor for the module type
        # then replaces all values in the module dict with None.
        # For instance, after setuptools runs a test it replaces
        # sys.modules with a copy created earlier.  See issues
        # #9775 and #15881.  Also related: #4106, #9205, and
        # #9207.

        for child in active_children():
            if not child._daemonic:
                continue
            info('calling terminate() for daemon %s', child.name)
            child._popen.terminate()

        # Ask for the active children again rather than reusing the
        # list from the loop above.
        for child in active_children():
            info('calling join() for process %s', child.name)
            child.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 316 | |
# Run the multiprocessing shutdown sequence at interpreter exit.
atexit.register(_exit_function)
| 318 | |
| 319 | # |
| 320 | # Some fork aware types |
| 321 | # |
| 322 | |
class ForkAwareThreadLock(object):
    '''
    Thread lock which is replaced by a fresh, unlocked lock whenever the
    after-fork callbacks are run

    Exposes acquire() and release() of the underlying threading.Lock and
    can also be used as a context manager.
    '''
    def __init__(self):
        self._reset()
        # Register the reset hook exactly once per instance.  Registering
        # __init__ itself would add one more registry entry every time
        # the hook runs, so the number of entries for this object would
        # double with each generation of forked children.
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Install a new lock and rebind the public methods to it.
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)
| 329 | |
class ForkAwareLocal(threading.local):
    '''
    Thread-local storage whose attributes are cleared whenever the
    after-fork callbacks are run
    '''
    def __init__(self):
        def _clear_attributes(local_obj):
            local_obj.__dict__.clear()
        register_after_fork(self, _clear_attributes)

    def __reduce__(self):
        # Pickle as "construct a new, empty instance of the same type".
        cls = type(self)
        return cls, ()