blob: 7c710810568e599f332fe4eccc96841ea806af56 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing various facilities to other parts of the package
3#
4# multiprocessing/util.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
Richard Oudkerk77c84f22012-05-18 14:28:02 +010010import sys
Antoine Pitrou176f07d2011-06-06 19:35:31 +020011import functools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import itertools
13import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import atexit
15import threading # we want threading to install it's
16 # cleanup function before multiprocessing does
17
18from multiprocessing.process import current_process, active_children
19
20__all__ = [
21 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
22 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
Jesse Noller41faa542009-01-25 03:45:53 +000023 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
24 'SUBDEBUG', 'SUBWARNING',
Benjamin Petersone711caf2008-06-11 16:44:04 +000025 ]
26
27#
28# Logging
29#
30
31NOTSET = 0
32SUBDEBUG = 5
33DEBUG = 10
34INFO = 20
35SUBWARNING = 25
36
37LOGGER_NAME = 'multiprocessing'
38DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
39
40_logger = None
41_log_to_stderr = False
42
43def sub_debug(msg, *args):
44 if _logger:
45 _logger.log(SUBDEBUG, msg, *args)
46
47def debug(msg, *args):
48 if _logger:
49 _logger.log(DEBUG, msg, *args)
50
51def info(msg, *args):
52 if _logger:
53 _logger.log(INFO, msg, *args)
54
55def sub_warning(msg, *args):
56 if _logger:
57 _logger.log(SUBWARNING, msg, *args)
58
59def get_logger():
60 '''
61 Returns logger used by multiprocessing
62 '''
63 global _logger
Florent Xicluna04842a82011-11-11 20:05:50 +010064 import logging
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Jesse Noller41faa542009-01-25 03:45:53 +000066 logging._acquireLock()
67 try:
68 if not _logger:
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
Jesse Noller41faa542009-01-25 03:45:53 +000070 _logger = logging.getLogger(LOGGER_NAME)
71 _logger.propagate = 0
72 logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
73 logging.addLevelName(SUBWARNING, 'SUBWARNING')
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
Jesse Noller41faa542009-01-25 03:45:53 +000075 # XXX multiprocessing should cleanup before logging
76 if hasattr(atexit, 'unregister'):
77 atexit.unregister(_exit_function)
78 atexit.register(_exit_function)
79 else:
80 atexit._exithandlers.remove((_exit_function, (), {}))
81 atexit._exithandlers.append((_exit_function, (), {}))
82
83 finally:
84 logging._releaseLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +000085
86 return _logger
87
Benjamin Petersone711caf2008-06-11 16:44:04 +000088def log_to_stderr(level=None):
89 '''
90 Turn on logging and add a handler which prints to stderr
91 '''
92 global _log_to_stderr
93 import logging
Jesse Noller41faa542009-01-25 03:45:53 +000094
Benjamin Petersone711caf2008-06-11 16:44:04 +000095 logger = get_logger()
96 formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
97 handler = logging.StreamHandler()
98 handler.setFormatter(formatter)
99 logger.addHandler(handler)
Jesse Noller41faa542009-01-25 03:45:53 +0000100
101 if level:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102 logger.setLevel(level)
103 _log_to_stderr = True
Jesse Noller41faa542009-01-25 03:45:53 +0000104 return _logger
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
106#
107# Function returning a temp directory which will be removed on exit
108#
109
110def get_temp_dir():
111 # get name of a temp directory which will be automatically cleaned up
112 if current_process()._tempdir is None:
113 import shutil, tempfile
114 tempdir = tempfile.mkdtemp(prefix='pymp-')
115 info('created temp directory %s', tempdir)
116 Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
117 current_process()._tempdir = tempdir
118 return current_process()._tempdir
119
120#
121# Support for reinitialization of objects when bootstrapping a child process
122#
123
124_afterfork_registry = weakref.WeakValueDictionary()
125_afterfork_counter = itertools.count()
126
127def _run_after_forkers():
128 items = list(_afterfork_registry.items())
129 items.sort()
130 for (index, ident, func), obj in items:
131 try:
132 func(obj)
133 except Exception as e:
134 info('after forker raised exception %s', e)
135
136def register_after_fork(obj, func):
137 _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
138
139#
140# Finalization using weakrefs
141#
142
143_finalizer_registry = {}
144_finalizer_counter = itertools.count()
145
146
147class Finalize(object):
148 '''
149 Class which supports object finalization using weakrefs
150 '''
151 def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
152 assert exitpriority is None or type(exitpriority) is int
153
154 if obj is not None:
155 self._weakref = weakref.ref(obj, self)
156 else:
157 assert exitpriority is not None
158
159 self._callback = callback
160 self._args = args
161 self._kwargs = kwargs or {}
162 self._key = (exitpriority, next(_finalizer_counter))
163
164 _finalizer_registry[self._key] = self
165
Antoine Pitrou71a28a92011-07-09 01:03:00 +0200166 def __call__(self, wr=None,
167 # Need to bind these locally because the globals can have
168 # been cleared at shutdown
169 _finalizer_registry=_finalizer_registry,
170 sub_debug=sub_debug):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171 '''
172 Run the callback unless it has already been called or cancelled
173 '''
174 try:
175 del _finalizer_registry[self._key]
176 except KeyError:
177 sub_debug('finalizer no longer registered')
178 else:
179 sub_debug('finalizer calling %s with args %s and kwargs %s',
180 self._callback, self._args, self._kwargs)
181 res = self._callback(*self._args, **self._kwargs)
182 self._weakref = self._callback = self._args = \
183 self._kwargs = self._key = None
184 return res
185
186 def cancel(self):
187 '''
188 Cancel finalization of the object
189 '''
190 try:
191 del _finalizer_registry[self._key]
192 except KeyError:
193 pass
194 else:
195 self._weakref = self._callback = self._args = \
196 self._kwargs = self._key = None
197
198 def still_active(self):
199 '''
200 Return whether this finalizer is still waiting to invoke callback
201 '''
202 return self._key in _finalizer_registry
203
204 def __repr__(self):
205 try:
206 obj = self._weakref()
207 except (AttributeError, TypeError):
208 obj = None
209
210 if obj is None:
211 return '<Finalize object, dead>'
212
213 x = '<Finalize object, callback=%s' % \
214 getattr(self._callback, '__name__', self._callback)
215 if self._args:
216 x += ', args=' + str(self._args)
217 if self._kwargs:
218 x += ', kwargs=' + str(self._kwargs)
219 if self._key[0] is not None:
220 x += ', exitprority=' + str(self._key[0])
221 return x + '>'
222
223
224def _run_finalizers(minpriority=None):
225 '''
226 Run all finalizers whose exit priority is not None and at least minpriority
227
228 Finalizers with highest priority are called first; finalizers with
229 the same priority will be called in reverse order of creation.
230 '''
231 if minpriority is None:
232 f = lambda p : p[0][0] is not None
233 else:
234 f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
235
236 items = [x for x in list(_finalizer_registry.items()) if f(x)]
237 items.sort(reverse=True)
238
239 for key, finalizer in items:
240 sub_debug('calling %s', finalizer)
241 try:
242 finalizer()
243 except Exception:
244 import traceback
245 traceback.print_exc()
246
247 if minpriority is None:
248 _finalizer_registry.clear()
249
250#
251# Clean up on exit
252#
253
254def is_exiting():
255 '''
256 Returns true if the process is shutting down
257 '''
258 return _exiting or _exiting is None
259
260_exiting = False
261
262def _exit_function():
263 global _exiting
264
265 info('process shutting down')
266 debug('running all "atexit" finalizers with priority >= 0')
267 _run_finalizers(0)
268
269 for p in active_children():
270 if p._daemonic:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000271 info('calling terminate() for daemon %s', p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272 p._popen.terminate()
273
274 for p in active_children():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000275 info('calling join() for process %s', p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000276 p.join()
277
278 debug('running the remaining "atexit" finalizers')
279 _run_finalizers()
280
281atexit.register(_exit_function)
282
283#
284# Some fork aware types
285#
286
287class ForkAwareThreadLock(object):
288 def __init__(self):
289 self._lock = threading.Lock()
290 self.acquire = self._lock.acquire
291 self.release = self._lock.release
292 register_after_fork(self, ForkAwareThreadLock.__init__)
293
294class ForkAwareLocal(threading.local):
295 def __init__(self):
296 register_after_fork(self, lambda obj : obj.__dict__.clear())
297 def __reduce__(self):
298 return type(self), ()
Richard Oudkerk77c84f22012-05-18 14:28:02 +0100299
300#
301# Get options for python to produce the same sys.flags
302#
303
304def _args_from_interpreter_flags():
305 """Return a list of command-line arguments reproducing the current
306 settings in sys.flags and sys.warnoptions."""
307 flag_opt_map = {
308 'debug': 'd',
309 # 'inspect': 'i',
310 # 'interactive': 'i',
311 'optimize': 'O',
312 'dont_write_bytecode': 'B',
313 'no_user_site': 's',
314 'no_site': 'S',
315 'ignore_environment': 'E',
316 'verbose': 'v',
317 'bytes_warning': 'b',
318 'quiet': 'q',
319 'hash_randomization': 'R',
320 }
321 args = []
322 for flag, opt in flag_opt_map.items():
323 v = getattr(sys.flags, flag)
324 if v > 0:
325 args.append('-' + opt * v)
326 for opt in sys.warnoptions:
327 args.append('-W' + opt)
328 return args