blob: cc7c326c077ec8594f28f8e29fcbeaad80bb709b [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
43#
44# Check that the current thread is spawning a child process
45#
46
47def assert_spawning(self):
48 if not Popen.thread_is_spawning():
49 raise RuntimeError(
50 '%s objects should only be shared between processes'
51 ' through inheritance' % type(self).__name__
52 )
53
54#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000055# Try making some callable types picklable
56#
57
58from pickle import _Pickler as Pickler
59class ForkingPickler(Pickler):
60 dispatch = Pickler.dispatch.copy()
61 @classmethod
62 def register(cls, type, reduce):
63 def dispatcher(self, obj):
64 rv = reduce(obj)
65 if isinstance(rv, str):
66 self.save_global(obj, rv)
67 else:
68 self.save_reduce(obj=obj, *rv)
69 cls.dispatch[type] = dispatcher
70
71def _reduce_method(m):
72 if m.__self__ is None:
73 return getattr, (m.__class__, m.__func__.__name__)
74 else:
75 return getattr, (m.__self__, m.__func__.__name__)
76class _C:
77 def f(self):
78 pass
79ForkingPickler.register(type(_C().f), _reduce_method)
80
81
82def _reduce_method_descriptor(m):
83 return getattr, (m.__objclass__, m.__name__)
84ForkingPickler.register(type(list.append), _reduce_method_descriptor)
85ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
86
87try:
88 from functools import partial
89except ImportError:
90 pass
91else:
92 def _reduce_partial(p):
93 return _rebuild_partial, (p.func, p.args, p.keywords or {})
94 def _rebuild_partial(func, args, keywords):
95 return partial(func, *args, **keywords)
96 ForkingPickler.register(partial, _reduce_partial)
97
98#
Benjamin Petersone711caf2008-06-11 16:44:04 +000099# Unix
100#
101
102if sys.platform != 'win32':
103 import time
104
105 exit = os._exit
106 duplicate = os.dup
107 close = os.close
108
109 #
110 # We define a Popen class similar to the one from subprocess, but
111 # whose constructor takes a process object as its argument.
112 #
113
114 class Popen(object):
115
116 def __init__(self, process_obj):
117 sys.stdout.flush()
118 sys.stderr.flush()
119 self.returncode = None
120
121 self.pid = os.fork()
122 if self.pid == 0:
123 if 'random' in sys.modules:
124 import random
125 random.seed()
126 code = process_obj._bootstrap()
127 sys.stdout.flush()
128 sys.stderr.flush()
129 os._exit(code)
130
131 def poll(self, flag=os.WNOHANG):
132 if self.returncode is None:
Florent Xicluna998171f2010-03-08 13:32:17 +0000133 try:
134 pid, sts = os.waitpid(self.pid, flag)
135 except os.error:
136 # Child process not yet created. See #1731717
137 # e.errno == errno.ECHILD == 10
138 return None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139 if pid == self.pid:
140 if os.WIFSIGNALED(sts):
141 self.returncode = -os.WTERMSIG(sts)
142 else:
143 assert os.WIFEXITED(sts)
144 self.returncode = os.WEXITSTATUS(sts)
145 return self.returncode
146
147 def wait(self, timeout=None):
148 if timeout is None:
149 return self.poll(0)
150 deadline = time.time() + timeout
151 delay = 0.0005
152 while 1:
153 res = self.poll()
154 if res is not None:
155 break
156 remaining = deadline - time.time()
157 if remaining <= 0:
158 break
159 delay = min(delay * 2, remaining, 0.05)
160 time.sleep(delay)
161 return res
162
163 def terminate(self):
164 if self.returncode is None:
165 try:
166 os.kill(self.pid, signal.SIGTERM)
167 except OSError as e:
168 if self.wait(timeout=0.1) is None:
169 raise
170
171 @staticmethod
172 def thread_is_spawning():
173 return False
174
175#
176# Windows
177#
178
179else:
180 import _thread
181 import msvcrt
182 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183 import time
184
Jesse Nollerf70a5382009-01-18 19:44:02 +0000185 from pickle import dump, load, HIGHEST_PROTOCOL
Brian Curtina6a32742010-08-04 15:47:24 +0000186 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 from .util import Finalize
188
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000189 def dump(obj, file, protocol=None):
190 ForkingPickler(file, protocol).dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 #
193 #
194 #
195
196 TERMINATE = 0x10000
197 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
brian.curtine2f29982011-04-11 17:56:23 -0500198 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
200 exit = win32.ExitProcess
201 close = win32.CloseHandle
202
203 #
204 # _python_exe is the assumed path to the python executable.
205 # People embedding Python want to modify it.
206 #
207
brian.curtine2f29982011-04-11 17:56:23 -0500208 if WINSERVICE:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
210 else:
211 _python_exe = sys.executable
212
213 def set_executable(exe):
214 global _python_exe
215 _python_exe = exe
216
217 #
218 #
219 #
220
221 def duplicate(handle, target_process=None, inheritable=False):
222 if target_process is None:
223 target_process = _subprocess.GetCurrentProcess()
224 return _subprocess.DuplicateHandle(
225 _subprocess.GetCurrentProcess(), handle, target_process,
226 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
227 ).Detach()
228
229 #
230 # We define a Popen class similar to the one from subprocess, but
231 # whose constructor takes a process object as its argument.
232 #
233
234 class Popen(object):
235 '''
236 Start a subprocess to run the code of a process object
237 '''
238 _tls = _thread._local()
239
240 def __init__(self, process_obj):
241 # create pipe for communication with child
242 rfd, wfd = os.pipe()
243
244 # get handle for read end of the pipe and make it inheritable
245 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
246 os.close(rfd)
247
248 # start process
249 cmd = get_command_line() + [rhandle]
250 cmd = ' '.join('"%s"' % x for x in cmd)
251 hp, ht, pid, tid = _subprocess.CreateProcess(
252 _python_exe, cmd, None, None, 1, 0, None, None, None
253 )
254 ht.Close()
255 close(rhandle)
256
257 # set attributes of self
258 self.pid = pid
259 self.returncode = None
260 self._handle = hp
261
262 # send information to child
263 prep_data = get_preparation_data(process_obj._name)
264 to_child = os.fdopen(wfd, 'wb')
265 Popen._tls.process_handle = int(hp)
266 try:
267 dump(prep_data, to_child, HIGHEST_PROTOCOL)
268 dump(process_obj, to_child, HIGHEST_PROTOCOL)
269 finally:
270 del Popen._tls.process_handle
271 to_child.close()
272
273 @staticmethod
274 def thread_is_spawning():
275 return getattr(Popen._tls, 'process_handle', None) is not None
276
277 @staticmethod
278 def duplicate_for_child(handle):
279 return duplicate(handle, Popen._tls.process_handle)
280
281 def wait(self, timeout=None):
282 if self.returncode is None:
283 if timeout is None:
284 msecs = _subprocess.INFINITE
285 else:
286 msecs = max(0, int(timeout * 1000 + 0.5))
287
288 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
289 if res == _subprocess.WAIT_OBJECT_0:
290 code = _subprocess.GetExitCodeProcess(self._handle)
291 if code == TERMINATE:
292 code = -signal.SIGTERM
293 self.returncode = code
294
295 return self.returncode
296
297 def poll(self):
298 return self.wait(timeout=0)
299
300 def terminate(self):
301 if self.returncode is None:
302 try:
303 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
304 except WindowsError:
305 if self.wait(timeout=0.1) is None:
306 raise
307
308 #
309 #
310 #
311
312 def is_forking(argv):
313 '''
314 Return whether commandline indicates we are forking
315 '''
316 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
317 assert len(argv) == 3
318 return True
319 else:
320 return False
321
322
323 def freeze_support():
324 '''
325 Run code for process object if this in not the main process
326 '''
327 if is_forking(sys.argv):
328 main()
329 sys.exit()
330
331
332 def get_command_line():
333 '''
334 Returns prefix of command line used for spawning a child process
335 '''
336 if process.current_process()._identity==() and is_forking(sys.argv):
337 raise RuntimeError('''
338 Attempt to start a new process before the current process
339 has finished its bootstrapping phase.
340
341 This probably means that you are on Windows and you have
342 forgotten to use the proper idiom in the main module:
343
344 if __name__ == '__main__':
345 freeze_support()
346 ...
347
348 The "freeze_support()" line can be omitted if the program
349 is not going to be frozen to produce a Windows executable.''')
350
351 if getattr(sys, 'frozen', False):
352 return [sys.executable, '--multiprocessing-fork']
353 else:
354 prog = 'from multiprocessing.forking import main; main()'
355 return [_python_exe, '-c', prog, '--multiprocessing-fork']
356
357
358 def main():
359 '''
360 Run code specifed by data received over pipe
361 '''
362 assert is_forking(sys.argv)
363
364 handle = int(sys.argv[-1])
365 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
366 from_parent = os.fdopen(fd, 'rb')
367
368 process.current_process()._inheriting = True
369 preparation_data = load(from_parent)
370 prepare(preparation_data)
371 self = load(from_parent)
372 process.current_process()._inheriting = False
373
374 from_parent.close()
375
376 exitcode = self._bootstrap()
377 exit(exitcode)
378
379
380 def get_preparation_data(name):
381 '''
382 Return info about parent needed by child to unpickle process object
383 '''
384 from .util import _logger, _log_to_stderr
385
386 d = dict(
387 name=name,
388 sys_path=sys.path,
389 sys_argv=sys.argv,
390 log_to_stderr=_log_to_stderr,
391 orig_dir=process.ORIGINAL_DIR,
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000392 authkey=process.current_process().authkey,
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393 )
394
395 if _logger is not None:
396 d['log_level'] = _logger.getEffectiveLevel()
397
brian.curtine2f29982011-04-11 17:56:23 -0500398 if not WINEXE and not WINSERVICE:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000399 main_path = getattr(sys.modules['__main__'], '__file__', None)
400 if not main_path and sys.argv[0] not in ('', '-c'):
401 main_path = sys.argv[0]
402 if main_path is not None:
403 if not os.path.isabs(main_path) and \
404 process.ORIGINAL_DIR is not None:
405 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
406 d['main_path'] = os.path.normpath(main_path)
407
408 return d
409
410 #
411 # Make (Pipe)Connection picklable
412 #
413
414 def reduce_connection(conn):
415 if not Popen.thread_is_spawning():
416 raise RuntimeError(
417 'By default %s objects can only be shared between processes\n'
418 'using inheritance' % type(conn).__name__
419 )
420 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
421 conn.readable, conn.writable)
422
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000423 ForkingPickler.register(Connection, reduce_connection)
424 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000425
426#
427# Prepare current process
428#
429
430old_main_modules = []
431
432def prepare(data):
433 '''
434 Try to get current process ready to unpickle process object
435 '''
436 old_main_modules.append(sys.modules['__main__'])
437
438 if 'name' in data:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000439 process.current_process().name = data['name']
Benjamin Petersone711caf2008-06-11 16:44:04 +0000440
441 if 'authkey' in data:
442 process.current_process()._authkey = data['authkey']
443
444 if 'log_to_stderr' in data and data['log_to_stderr']:
445 util.log_to_stderr()
446
447 if 'log_level' in data:
448 util.get_logger().setLevel(data['log_level'])
449
450 if 'sys_path' in data:
451 sys.path = data['sys_path']
452
453 if 'sys_argv' in data:
454 sys.argv = data['sys_argv']
455
456 if 'dir' in data:
457 os.chdir(data['dir'])
458
459 if 'orig_dir' in data:
460 process.ORIGINAL_DIR = data['orig_dir']
461
462 if 'main_path' in data:
Nick Coghlan793ee1f2011-01-30 01:24:08 +0000463 # XXX (ncoghlan): The following code makes several bogus
464 # assumptions regarding the relationship between __file__
465 # and a module's real name. See PEP 302 and issue #10845
Benjamin Petersone711caf2008-06-11 16:44:04 +0000466 main_path = data['main_path']
467 main_name = os.path.splitext(os.path.basename(main_path))[0]
468 if main_name == '__init__':
469 main_name = os.path.basename(os.path.dirname(main_path))
470
Nick Coghlan793ee1f2011-01-30 01:24:08 +0000471 if main_name == '__main__':
472 main_module = sys.modules['__main__']
473 main_module.__file__ = main_path
474 elif main_name != 'ipython':
475 # Main modules not actually called __main__.py may
476 # contain additional code that should still be executed
Benjamin Petersone711caf2008-06-11 16:44:04 +0000477 import imp
478
479 if main_path is None:
480 dirs = None
481 elif os.path.basename(main_path).startswith('__init__.py'):
482 dirs = [os.path.dirname(os.path.dirname(main_path))]
483 else:
484 dirs = [os.path.dirname(main_path)]
485
486 assert main_name not in sys.modules, main_name
487 file, path_name, etc = imp.find_module(main_name, dirs)
488 try:
489 # We would like to do "imp.load_module('__main__', ...)"
490 # here. However, that would cause 'if __name__ ==
491 # "__main__"' clauses to be executed.
492 main_module = imp.load_module(
493 '__parents_main__', file, path_name, etc
494 )
495 finally:
496 if file:
497 file.close()
498
499 sys.modules['__main__'] = main_module
500 main_module.__name__ = '__main__'
501
502 # Try to make the potentially picklable objects in
503 # sys.modules['__main__'] realize they are in the main
504 # module -- somewhat ugly.
505 for obj in list(main_module.__dict__.values()):
506 try:
507 if obj.__module__ == '__parents_main__':
508 obj.__module__ = '__main__'
509 except Exception:
510 pass