#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import functools
import itertools
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does

from multiprocessing.process import current_process, active_children

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
            logging.addLevelName(SUBWARNING, 'SUBWARNING')

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

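# A minimal usage sketch (illustrative only; the names below belong to caller
# code, not to this module): client programs normally enable this logging via
# the package-level helpers, e.g.
#
#     import logging, multiprocessing
#     logger = multiprocessing.log_to_stderr(logging.DEBUG)
#     logger.info('pool starting')
#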
#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    if current_process()._tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        current_process()._tempdir = tempdir
    return current_process()._tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

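# Illustrative sketch (caller-side code; the Cache class is hypothetical, not
# part of this module): an object registers a callable that is run for it in
# each forked child, e.g. to drop state inherited from the parent:
#
#     class Cache(object):
#         def __init__(self):
#             self._store = {}
#             register_after_fork(self, Cache._clear)
#         def _clear(self):
#             self._store = {}
#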
#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
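
# Illustrative sketch (caller-side code; Resource, release and handle are
# hypothetical names): register a finalizer that runs when the owning object
# is garbage collected, or at the latest during interpreter exit:
#
#     def release(handle):
#         ...                     # module-level cleanup function
#
#     class Resource(object):
#         def __init__(self, handle):
#             self._finalizer = Finalize(self, release, args=(handle,),
#                                        exitpriority=0)
#
# Passing only `handle` (not `self`) in args keeps the object collectable;
# Finalize.cancel() deregisters the callback without running it.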


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    items = [x for x in list(_finalizer_registry.items()) if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function():
    global _exiting
    # mark the process as shutting down so that is_exiting() reports it
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.name)
            p._popen.terminate()

    for p in active_children():
        info('calling join() for process %s', p.name)
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

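# A lock inherited over fork() may be left acquired by a thread that does not
# exist in the child process; re-running __init__ after fork replaces it with
# a fresh, unlocked lock so the child cannot deadlock on it.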
class ForkAwareThreadLock(object):
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        register_after_fork(self, ForkAwareThreadLock.__init__)

class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()


#
# Automatic retry after EINTR
#

def _eintr_retry(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return func(*args, **kwargs)
            except InterruptedError:
                continue
    return wrapped
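
# Illustrative sketch (caller-side code; os.waitpid is just one example of a
# blocking call that can fail with EINTR, and child_pid is hypothetical): the
# decorator transparently restarts the wrapped call if a signal interrupts it:
#
#     import os
#     waitpid = _eintr_retry(os.waitpid)
#     pid, status = waitpid(child_pid, 0)    # retried on InterruptedError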