blob: ac9d79e71d70f9bc2c23642877e9762e7e2b8045 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing various facilities to other parts of the package
3#
4# multiprocessing/util.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import itertools
10import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import atexit
12import threading # we want threading to install its
13 # cleanup function before multiprocessing does
14
15from multiprocessing.process import current_process, active_children
16
# Public names exported by a star-import of this module.
__all__ = [
    'sub_debug',
    'debug',
    'info',
    'sub_warning',
    'get_logger',
    'log_to_stderr',
    'get_temp_dir',
    'register_after_fork',
    'is_exiting',
    'Finalize',
    'ForkAwareThreadLock',
    'ForkAwareLocal',
]
22
23#
24# Logging
25#
26
# Numeric logging levels used by the helpers in this module.  SUBDEBUG
# sits below DEBUG and SUBWARNING sits just above INFO.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name of the logger handed out by get_logger() and the format installed
# by log_to_stderr().
LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Logger created on demand by get_logger(); None until then, which makes
# the logging helpers in this module no-ops.
_logger = None

# Set to True by log_to_stderr().
_log_to_stderr = False
38
def sub_debug(msg, *args):
    '''
    Log msg (with args as lazy %-format arguments) at level SUBDEBUG

    Does nothing while the module-level _logger is unset.
    '''
    if not _logger:
        return
    _logger.log(SUBDEBUG, msg, *args)
42
def debug(msg, *args):
    '''
    Log msg (with args as lazy %-format arguments) at level DEBUG

    Does nothing while the module-level _logger is unset.
    '''
    if not _logger:
        return
    _logger.log(DEBUG, msg, *args)
46
def info(msg, *args):
    '''
    Log msg (with args as lazy %-format arguments) at level INFO

    Does nothing while the module-level _logger is unset.
    '''
    if not _logger:
        return
    _logger.log(INFO, msg, *args)
50
def sub_warning(msg, *args):
    '''
    Log msg (with args as lazy %-format arguments) at level SUBWARNING

    Does nothing while the module-level _logger is unset.
    '''
    if not _logger:
        return
    _logger.log(SUBWARNING, msg, *args)
54
def get_logger():
    '''
    Returns logger used by multiprocessing

    The logger is created on the first call and cached in the module-level
    _logger; later calls return the cached object.
    '''
    global _logger

    if _logger:
        return _logger

    # logging is imported before _exit_function is re-registered below.
    import logging
    import atexit

    # XXX multiprocessing should cleanup before logging
    # Move _exit_function to the end of the atexit handler list; atexit
    # runs handlers in reverse order of registration.
    if hasattr(atexit, 'unregister'):
        atexit.unregister(_exit_function)
        atexit.register(_exit_function)
    else:
        # Fallback for atexit implementations exposing the handler list.
        entry = (_exit_function, (), {})
        atexit._exithandlers.remove(entry)
        atexit._exithandlers.append(entry)

    _logger = logging.getLogger(LOGGER_NAME)
    return _logger
75
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    The handler formats records with DEFAULT_LOGGING_FORMAT.  If level is
    not None the logger's level is set to it.  Returns the logger obtained
    from get_logger(), so the call can be used in place of get_logger().
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()

    # StreamHandler() with no argument writes to sys.stderr.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    logger.addHandler(handler)

    if level is not None:
        logger.setLevel(level)

    _log_to_stderr = True
    return logger
90
91#
92# Function returning a temp directory which will be removed on exit
93#
94
def get_temp_dir():
    '''
    Return the name of the current process's temp directory

    The directory is created on first use, cached on the current process
    object as _tempdir, and registered for removal with a Finalize of
    exit priority -100.
    '''
    process = current_process()
    if process._tempdir is not None:
        return process._tempdir

    import shutil
    import tempfile

    tempdir = tempfile.mkdtemp(prefix='pymp-')
    info('created temp directory %s', tempdir)
    # No object to track: the finalizer is only run by exit priority.
    Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
    process._tempdir = tempdir
    return process._tempdir
104
105#
106# Support for reinitialization of objects when bootstrapping a child process
107#
108
# Maps (registration index, id(obj), func) -> obj.  Values are held
# weakly, so an entry goes away once its object is no longer referenced.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    '''
    Call func(obj) for every registered pair, in order of registration

    An exception raised by a callback is logged with info() and does not
    stop the remaining callbacks from running.
    '''
    # The registration index is unique, so sorting never has to compare
    # the callbacks or the objects themselves.
    for (order, obj_id, callback), target in sorted(_afterfork_registry.items()):
        try:
            callback(target)
        except Exception as e:
            info('after forker raised exception %s', e)
120
def register_after_fork(obj, func):
    '''
    Arrange for func(obj) to be called by _run_after_forkers()

    obj is stored as a value of the weak-valued _afterfork_registry, so
    registering does not keep it alive.
    '''
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj
123
124#
125# Finalization using weakrefs
126#
127
# Maps (exitpriority, creation index) -> Finalize instance for every
# finalizer which has been neither called nor cancelled.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Register callback(*args, **kwargs) as a finalizer

        If obj is not None the finalizer is the callback of a weak
        reference to obj.  If obj is None an integer exitpriority must be
        given, since nothing else would ever trigger the finalizer.
        '''
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            # The instance itself is the weakref callback; see __call__.
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # The counter makes every key unique and orders equal priorities
        # by creation.
        self._key = (exitpriority, next(_finalizer_counter))

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None if the finalizer was no
        longer registered.  wr is accepted so that the instance can be
        used as a weakref callback; it is not otherwise used.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            # Drop all references once the callback has run.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            # Already called or cancelled: nothing to do.
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        '''
        Return a description of the finalizer

        A finalizer is shown as dead when it has no live referent; this
        includes finalizers created with obj=None, which never have a
        weak reference.
        '''
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # No _weakref attribute (obj was None) or it was reset to None.
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
203
204
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.

    An exception raised by a finalizer is printed and does not stop the
    others.  When minpriority is None the registry is emptied afterwards,
    which also discards finalizers whose exit priority is None.
    '''
    def selected(priority):
        # Finalizers without an exit priority are never run from here.
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    # Work on a snapshot: running a finalizer removes it from the registry.
    snapshot = [
        (key, finalizer)
        for key, finalizer in list(_finalizer_registry.items())
        if selected(key[0])
    ]

    # Keys are unique (priority, creation index) pairs, so the reverse
    # sort is decided by the keys alone.
    for key, finalizer in sorted(snapshot, reverse=True):
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
230
231#
232# Clean up on exit
233#
234
# Shutdown flag read by is_exiting().
_exiting = False

def is_exiting():
    '''
    Returns true if the process is shutting down

    A value of None for the _exiting flag is also treated as shutting
    down.
    '''
    if _exiting:
        return _exiting
    return _exiting is None
242
def _exit_function():
    '''
    Clean up at process exit

    Marks the process as exiting, runs the finalizers with exit priority
    >= 0, terminates daemonic children, joins all remaining children and
    finally runs the remaining finalizers.
    '''
    global _exiting

    # Record that shutdown has begun so that is_exiting() reports it;
    # without this assignment the flag would stay False for the whole of
    # this function.
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.name)
            p._popen.terminate()

    for p in active_children():
        info('calling join() for process %s', p.name)
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)
263
264#
265# Some fork aware types
266#
267
class ForkAwareThreadLock(object):
    '''
    Wrapper around threading.Lock which is replaced by a fresh lock when
    the registered after-fork callbacks are run
    '''
    def __init__(self):
        self._reset()
        # Register _reset rather than __init__: re-running __init__ from
        # the after-fork callback would add one more registry entry for
        # this object every time the callbacks run.
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        '''
        Install a new lock and rebind acquire/release to it
        '''
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
274
class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attribute dictionary is cleared when
    the registered after-fork callbacks are run
    '''
    def __init__(self):
        def _clear(local_obj):
            local_obj.__dict__.clear()

        register_after_fork(self, _clear)

    def __reduce__(self):
        # Reduce to a call of the class with no arguments; no state is
        # included.
        cls = type(self)
        return cls, ()