# | |
# Module providing various facilities to other parts of the package | |
# | |
# multiprocessing/util.py | |
# | |
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt | |
# | |
import itertools | |
import weakref | |
import copy_reg | |
import atexit | |
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from multiprocessing.process import current_process, active_children | |
# Names re-exported as the public API of this helper module.
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
    ]
#
# Logging
#

# Log levels: multiprocessing adds SUBDEBUG (below DEBUG) and SUBWARNING
# (between INFO and WARNING) to the standard logging levels.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module state: the shared logger (created lazily by get_logger()) and a
# flag recording whether log_to_stderr() has been called.
_logger = None
_log_to_stderr = False
def sub_debug(msg, *args):
    # Log at SUBDEBUG level; silently a no-op until get_logger() has
    # created the shared logger.
    if _logger is not None:
        _logger.log(SUBDEBUG, msg, *args)
def debug(msg, *args):
    # Log at DEBUG level; silently a no-op until get_logger() has
    # created the shared logger.
    if _logger is not None:
        _logger.log(DEBUG, msg, *args)
def info(msg, *args):
    # Log at INFO level; silently a no-op until get_logger() has
    # created the shared logger.
    if _logger is not None:
        _logger.log(INFO, msg, *args)
def sub_warning(msg, *args):
    # Log at SUBWARNING level; silently a no-op until get_logger() has
    # created the shared logger.
    if _logger is not None:
        _logger.log(SUBWARNING, msg, *args)
def get_logger():
    '''
    Returns logger used by multiprocessing, creating it on first use.
    '''
    global _logger

    if not _logger:
        import logging
        # NOTE: `atexit` is already imported at module level; the original
        # redundantly re-imported it here.

        # XXX multiprocessing should cleanup before logging
        # Re-register _exit_function so it sits *after* logging's own
        # atexit handler (atexit runs handlers LIFO, so ours runs first).
        if hasattr(atexit, 'unregister'):
            atexit.unregister(_exit_function)
            atexit.register(_exit_function)
        else:
            # Older Pythons lack atexit.unregister(); poke the internal
            # handler list directly to achieve the same reordering.
            atexit._exithandlers.remove((_exit_function, (), {}))
            atexit._exithandlers.append((_exit_function, (), {}))

        _check_logger_class()
        _logger = logging.getLogger(LOGGER_NAME)

    return _logger
def _check_logger_class():
    '''
    Make sure process name is recorded when loggers are used
    '''
    # XXX This function is unnecessary once logging is patched
    import logging
    if hasattr(logging, 'multiprocessing'):
        # A logging module already patched for multiprocessing records
        # the process name itself; nothing to do.
        return
    # Internal logging lock: guards against a concurrent
    # get/setLoggerClass from another thread.
    logging._acquireLock()
    try:
        OldLoggerClass = logging.getLoggerClass()
        if not getattr(OldLoggerClass, '_process_aware', False):
            # Subclass the current logger class so every record carries
            # the current process name (consumed by %(processName)s in
            # DEFAULT_LOGGING_FORMAT).
            class ProcessAwareLogger(OldLoggerClass):
                _process_aware = True
                def makeRecord(self, *args, **kwds):
                    record = OldLoggerClass.makeRecord(self, *args, **kwds)
                    record.processName = current_process()._name
                    return record
            logging.setLoggerClass(ProcessAwareLogger)
    finally:
        logging._releaseLock()
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    Returns the logger, so callers can configure it further (this is
    the documented multiprocessing API behaviour; the original fell off
    the end and returned None).
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level is not None:
        logger.setLevel(level)
    _log_to_stderr = True
    return logger
#
# Function returning a temp directory which will be removed on exit
#
def get_temp_dir():
    # Lazily create one process-wide temp directory; it is removed by a
    # Finalize handler when the process exits.
    process = current_process()
    if process._tempdir is None:
        import shutil, tempfile
        path = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', path)
        # exitpriority=-100: delete the directory after most other
        # finalizers have run.
        Finalize(None, shutil.rmtree, args=[path], exitpriority=-100)
        process._tempdir = path
    return process._tempdir
#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration-order, id(obj), func) -> obj.  Values are weak so a
# registration does not keep obj alive; the counter makes keys unique and
# preserves registration order when the registry is sorted.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()
def _run_after_forkers(): | |
items = list(_afterfork_registry.items()) | |
items.sort() | |
for (index, ident, func), obj in items: | |
try: | |
func(obj) | |
except Exception, e: | |
info('after forker raised exception %s', e) | |
def register_after_fork(obj, func):
    # Arrange for func(obj) to be called in a freshly forked child; the
    # counter component keeps keys unique and ordered.
    key = (_afterfork_counter.next(), id(obj), func)
    _afterfork_registry[key] = obj
#
# Finalization using weakrefs
#

# Maps (exitpriority, creation-order) -> Finalize instance.  The key
# doubles as the sort key used by _run_finalizers().
_finalizer_registry = {}
_finalizer_counter = itertools.count()
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    The finalizer runs when obj is garbage collected (the instance is
    itself the weakref callback) or when _run_finalizers() invokes it at
    exit, whichever comes first.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int
        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            # With no object to track, the finalizer can only run at
            # exit, so an exit priority is mandatory.
            assert exitpriority is not None
        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # (priority, creation-order): orders finalizers and serves as
        # the registration handle.
        self._key = (exitpriority, _finalizer_counter.next())
        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            # Drop all references so callback and arguments can be
            # collected promptly.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # No weakref (exit-only finalizer) or already fired/cancelled.
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            # Bug fix: the original emitted the misspelling 'exitprority'.
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if minpriority is None:
        keep = lambda item: item[0][0] is not None
    else:
        keep = lambda item: item[0][0] is not None and item[0][0] >= minpriority

    # Sort (key, finalizer) pairs descending: highest priority first,
    # then newest registration first.
    pending = sorted(filter(keep, _finalizer_registry.items()), reverse=True)
    for key, finalizer in pending:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        # Final pass: discard everything, including priority-None entries.
        _finalizer_registry.clear()
# | |
# Clean up on exit | |
# | |
def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # A value of None (seen during odd teardown states) also counts as
    # "exiting".
    return _exiting is None or bool(_exiting)

_exiting = False
def _exit_function():
    '''
    Clean up at interpreter exit: run high-priority finalizers,
    terminate daemonic children, join remaining children, then run the
    rest of the finalizers.
    '''
    global _exiting

    # Bug fix: the original declared ``global _exiting`` but never
    # assigned it, so is_exiting() could never report True.
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    # Daemon processes are never joined; terminate them outright.
    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.get_name())
            p._popen.terminate()

    for p in active_children():
        info('calling join() for process %s', p.get_name())
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()
# atexit runs handlers LIFO, so registering here (after threading's own
# handler was installed at import time) makes ours run first.
atexit.register(_exit_function)
# | |
# Some fork aware types | |
# | |
class ForkAwareThreadLock(object):
    '''
    Thread lock that is re-created from scratch in a forked child, so a
    lock held at fork time cannot deadlock the child.
    '''
    def __init__(self):
        lock = threading.Lock()
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release
        # Re-run __init__ in the child to obtain a fresh, unheld lock.
        register_after_fork(self, ForkAwareThreadLock.__init__)
class ForkAwareLocal(threading.local):
    '''
    Thread-local storage whose contents are wiped in a forked child.
    '''
    def __init__(self):
        # The child starts with an empty namespace after a fork.
        register_after_fork(self, lambda obj: obj.__dict__.clear())
    def __reduce__(self):
        # Pickles as a fresh, empty instance.
        return type(self), ()
# | |
# Try making some callable types picklable | |
# | |
def _reduce_method(m): | |
if m.im_self is None: | |
return getattr, (m.im_class, m.im_func.func_name) | |
else: | |
return getattr, (m.im_self, m.im_func.func_name) | |
# Make instance methods (e.g. Finalize.__init__) picklable.
copy_reg.pickle(type(Finalize.__init__), _reduce_method)
def _reduce_method_descriptor(m): | |
return getattr, (m.__objclass__, m.__name__) | |
# Unbound built-in methods (method descriptors) such as list.append ...
copy_reg.pickle(type(list.append), _reduce_method_descriptor)
# ... and slot wrappers such as int.__add__.
copy_reg.pickle(type(int.__add__), _reduce_method_descriptor)
def _reduce_builtin_function_or_method(m): | |
return getattr, (m.__self__, m.__name__) | |
# Bound built-in methods such as [].append ...
copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method)
# ... and bound slot wrappers ("method-wrapper") such as (0).__add__.
copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method)
try:
    from functools import partial
except ImportError:
    # functools.partial unavailable (pre-2.5 Python); skip registration.
    pass
else:
    def _reduce_partial(p):
        # p.keywords may be None when no keyword arguments were bound.
        # NOTE: _rebuild_partial's qualified name ends up inside the
        # pickle payload, so it must stay module-level with this name.
        return _rebuild_partial, (p.func, p.args, p.keywords or {})
    def _rebuild_partial(func, args, keywords):
        return partial(func, *args, **keywords)
    copy_reg.pickle(partial, _reduce_partial)