blob: 3fe0175c327c8b5df9d0c0392cbe2af9b5324178 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing various facilities to other parts of the package
3#
4# multiprocessing/util.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import itertools
10import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import atexit
12import threading # we want threading to install it's
13 # cleanup function before multiprocessing does
14
15from multiprocessing.process import current_process, active_children
16
17__all__ = [
18 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
19 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
20 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
21 ]
22
23#
24# Logging
25#
26
27NOTSET = 0
28SUBDEBUG = 5
29DEBUG = 10
30INFO = 20
31SUBWARNING = 25
32
33LOGGER_NAME = 'multiprocessing'
34DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
35
36_logger = None
37_log_to_stderr = False
38
39def sub_debug(msg, *args):
40 if _logger:
41 _logger.log(SUBDEBUG, msg, *args)
42
43def debug(msg, *args):
44 if _logger:
45 _logger.log(DEBUG, msg, *args)
46
47def info(msg, *args):
48 if _logger:
49 _logger.log(INFO, msg, *args)
50
51def sub_warning(msg, *args):
52 if _logger:
53 _logger.log(SUBWARNING, msg, *args)
54
55def get_logger():
56 '''
57 Returns logger used by multiprocessing
58 '''
59 global _logger
60
61 if not _logger:
62 import logging, atexit
63
64 # XXX multiprocessing should cleanup before logging
65 if hasattr(atexit, 'unregister'):
66 atexit.unregister(_exit_function)
67 atexit.register(_exit_function)
68 else:
69 atexit._exithandlers.remove((_exit_function, (), {}))
70 atexit._exithandlers.append((_exit_function, (), {}))
71
72 _check_logger_class()
73 _logger = logging.getLogger(LOGGER_NAME)
74
75 return _logger
76
77def _check_logger_class():
78 '''
79 Make sure process name is recorded when loggers are used
80 '''
81 # XXX This function is unnecessary once logging is patched
82 import logging
83 if hasattr(logging, 'multiprocessing'):
84 return
85
86 logging._acquireLock()
87 try:
88 OldLoggerClass = logging.getLoggerClass()
89 if not getattr(OldLoggerClass, '_process_aware', False):
90 class ProcessAwareLogger(OldLoggerClass):
91 _process_aware = True
92 def makeRecord(self, *args, **kwds):
93 record = OldLoggerClass.makeRecord(self, *args, **kwds)
94 record.processName = current_process()._name
95 return record
96 logging.setLoggerClass(ProcessAwareLogger)
97 finally:
98 logging._releaseLock()
99
100def log_to_stderr(level=None):
101 '''
102 Turn on logging and add a handler which prints to stderr
103 '''
104 global _log_to_stderr
105 import logging
106 logger = get_logger()
107 formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
108 handler = logging.StreamHandler()
109 handler.setFormatter(formatter)
110 logger.addHandler(handler)
111 if level is not None:
112 logger.setLevel(level)
113 _log_to_stderr = True
114
115#
116# Function returning a temp directory which will be removed on exit
117#
118
119def get_temp_dir():
120 # get name of a temp directory which will be automatically cleaned up
121 if current_process()._tempdir is None:
122 import shutil, tempfile
123 tempdir = tempfile.mkdtemp(prefix='pymp-')
124 info('created temp directory %s', tempdir)
125 Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
126 current_process()._tempdir = tempdir
127 return current_process()._tempdir
128
129#
130# Support for reinitialization of objects when bootstrapping a child process
131#
132
133_afterfork_registry = weakref.WeakValueDictionary()
134_afterfork_counter = itertools.count()
135
136def _run_after_forkers():
137 items = list(_afterfork_registry.items())
138 items.sort()
139 for (index, ident, func), obj in items:
140 try:
141 func(obj)
142 except Exception as e:
143 info('after forker raised exception %s', e)
144
145def register_after_fork(obj, func):
146 _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
147
148#
149# Finalization using weakrefs
150#
151
152_finalizer_registry = {}
153_finalizer_counter = itertools.count()
154
155
156class Finalize(object):
157 '''
158 Class which supports object finalization using weakrefs
159 '''
160 def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
161 assert exitpriority is None or type(exitpriority) is int
162
163 if obj is not None:
164 self._weakref = weakref.ref(obj, self)
165 else:
166 assert exitpriority is not None
167
168 self._callback = callback
169 self._args = args
170 self._kwargs = kwargs or {}
171 self._key = (exitpriority, next(_finalizer_counter))
172
173 _finalizer_registry[self._key] = self
174
175 def __call__(self, wr=None):
176 '''
177 Run the callback unless it has already been called or cancelled
178 '''
179 try:
180 del _finalizer_registry[self._key]
181 except KeyError:
182 sub_debug('finalizer no longer registered')
183 else:
184 sub_debug('finalizer calling %s with args %s and kwargs %s',
185 self._callback, self._args, self._kwargs)
186 res = self._callback(*self._args, **self._kwargs)
187 self._weakref = self._callback = self._args = \
188 self._kwargs = self._key = None
189 return res
190
191 def cancel(self):
192 '''
193 Cancel finalization of the object
194 '''
195 try:
196 del _finalizer_registry[self._key]
197 except KeyError:
198 pass
199 else:
200 self._weakref = self._callback = self._args = \
201 self._kwargs = self._key = None
202
203 def still_active(self):
204 '''
205 Return whether this finalizer is still waiting to invoke callback
206 '''
207 return self._key in _finalizer_registry
208
209 def __repr__(self):
210 try:
211 obj = self._weakref()
212 except (AttributeError, TypeError):
213 obj = None
214
215 if obj is None:
216 return '<Finalize object, dead>'
217
218 x = '<Finalize object, callback=%s' % \
219 getattr(self._callback, '__name__', self._callback)
220 if self._args:
221 x += ', args=' + str(self._args)
222 if self._kwargs:
223 x += ', kwargs=' + str(self._kwargs)
224 if self._key[0] is not None:
225 x += ', exitprority=' + str(self._key[0])
226 return x + '>'
227
228
229def _run_finalizers(minpriority=None):
230 '''
231 Run all finalizers whose exit priority is not None and at least minpriority
232
233 Finalizers with highest priority are called first; finalizers with
234 the same priority will be called in reverse order of creation.
235 '''
236 if minpriority is None:
237 f = lambda p : p[0][0] is not None
238 else:
239 f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
240
241 items = [x for x in list(_finalizer_registry.items()) if f(x)]
242 items.sort(reverse=True)
243
244 for key, finalizer in items:
245 sub_debug('calling %s', finalizer)
246 try:
247 finalizer()
248 except Exception:
249 import traceback
250 traceback.print_exc()
251
252 if minpriority is None:
253 _finalizer_registry.clear()
254
255#
256# Clean up on exit
257#
258
259def is_exiting():
260 '''
261 Returns true if the process is shutting down
262 '''
263 return _exiting or _exiting is None
264
265_exiting = False
266
267def _exit_function():
268 global _exiting
269
270 info('process shutting down')
271 debug('running all "atexit" finalizers with priority >= 0')
272 _run_finalizers(0)
273
274 for p in active_children():
275 if p._daemonic:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000276 info('calling terminate() for daemon %s', p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277 p._popen.terminate()
278
279 for p in active_children():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000280 info('calling join() for process %s', p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000281 p.join()
282
283 debug('running the remaining "atexit" finalizers')
284 _run_finalizers()
285
286atexit.register(_exit_function)
287
288#
289# Some fork aware types
290#
291
292class ForkAwareThreadLock(object):
293 def __init__(self):
294 self._lock = threading.Lock()
295 self.acquire = self._lock.acquire
296 self.release = self._lock.release
297 register_after_fork(self, ForkAwareThreadLock.__init__)
298
299class ForkAwareLocal(threading.local):
300 def __init__(self):
301 register_after_fork(self, lambda obj : obj.__dict__.clear())
302 def __reduce__(self):
303 return type(self), ()