blob: a2c61ef27e3543cd64c7381ad281c00d7c524bcb [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import os
36import sys
37import signal
Antoine Pitroudd696492011-06-08 17:21:55 +020038import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
40from multiprocessing import util, process
41
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000042__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
44#
45# Check that the current thread is spawning a child process
46#
47
48def assert_spawning(self):
49 if not Popen.thread_is_spawning():
50 raise RuntimeError(
51 '%s objects should only be shared between processes'
52 ' through inheritance' % type(self).__name__
53 )
54
55#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000056# Try making some callable types picklable
57#
58
59from pickle import _Pickler as Pickler
60class ForkingPickler(Pickler):
61 dispatch = Pickler.dispatch.copy()
62 @classmethod
63 def register(cls, type, reduce):
64 def dispatcher(self, obj):
65 rv = reduce(obj)
66 if isinstance(rv, str):
67 self.save_global(obj, rv)
68 else:
69 self.save_reduce(obj=obj, *rv)
70 cls.dispatch[type] = dispatcher
71
72def _reduce_method(m):
73 if m.__self__ is None:
74 return getattr, (m.__class__, m.__func__.__name__)
75 else:
76 return getattr, (m.__self__, m.__func__.__name__)
77class _C:
78 def f(self):
79 pass
80ForkingPickler.register(type(_C().f), _reduce_method)
81
82
83def _reduce_method_descriptor(m):
84 return getattr, (m.__objclass__, m.__name__)
85ForkingPickler.register(type(list.append), _reduce_method_descriptor)
86ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
87
88try:
89 from functools import partial
90except ImportError:
91 pass
92else:
93 def _reduce_partial(p):
94 return _rebuild_partial, (p.func, p.args, p.keywords or {})
95 def _rebuild_partial(func, args, keywords):
96 return partial(func, *args, **keywords)
97 ForkingPickler.register(partial, _reduce_partial)
98
99#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100# Unix
101#
102
103if sys.platform != 'win32':
104 import time
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200105 import select
Benjamin Petersone711caf2008-06-11 16:44:04 +0000106
107 exit = os._exit
108 duplicate = os.dup
109 close = os.close
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200110 _select = util._eintr_retry(select.select)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000111
112 #
113 # We define a Popen class similar to the one from subprocess, but
114 # whose constructor takes a process object as its argument.
115 #
116
117 class Popen(object):
118
119 def __init__(self, process_obj):
120 sys.stdout.flush()
121 sys.stderr.flush()
122 self.returncode = None
123
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200124 r, w = os.pipe()
125 self.sentinel = r
126
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127 self.pid = os.fork()
128 if self.pid == 0:
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200129 os.close(r)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000130 if 'random' in sys.modules:
131 import random
132 random.seed()
133 code = process_obj._bootstrap()
134 sys.stdout.flush()
135 sys.stderr.flush()
136 os._exit(code)
137
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200138 # `w` will be closed when the child exits, at which point `r`
139 # will become ready for reading (using e.g. select()).
140 os.close(w)
141 util.Finalize(self, os.close, (r,))
142
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143 def poll(self, flag=os.WNOHANG):
144 if self.returncode is None:
Florent Xicluna998171f2010-03-08 13:32:17 +0000145 try:
146 pid, sts = os.waitpid(self.pid, flag)
147 except os.error:
148 # Child process not yet created. See #1731717
149 # e.errno == errno.ECHILD == 10
150 return None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151 if pid == self.pid:
152 if os.WIFSIGNALED(sts):
153 self.returncode = -os.WTERMSIG(sts)
154 else:
155 assert os.WIFEXITED(sts)
156 self.returncode = os.WEXITSTATUS(sts)
157 return self.returncode
158
159 def wait(self, timeout=None):
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200160 if self.returncode is None:
161 if timeout is not None:
162 r = _select([self.sentinel], [], [], timeout)[0]
163 if not r:
164 return None
165 # This shouldn't block if select() returned successfully.
166 return self.poll(os.WNOHANG if timeout == 0.0 else 0)
167 return self.returncode
Benjamin Petersone711caf2008-06-11 16:44:04 +0000168
169 def terminate(self):
170 if self.returncode is None:
171 try:
172 os.kill(self.pid, signal.SIGTERM)
173 except OSError as e:
174 if self.wait(timeout=0.1) is None:
175 raise
176
177 @staticmethod
178 def thread_is_spawning():
179 return False
180
181#
182# Windows
183#
184
185else:
186 import _thread
187 import msvcrt
188 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 import time
190
Jesse Nollerf70a5382009-01-18 19:44:02 +0000191 from pickle import dump, load, HIGHEST_PROTOCOL
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200192 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +0000193 from .util import Finalize
194
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000195 def dump(obj, file, protocol=None):
196 ForkingPickler(file, protocol).dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197
198 #
199 #
200 #
201
202 TERMINATE = 0x10000
203 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
brian.curtine2f29982011-04-11 17:56:23 -0500204 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000205
206 exit = win32.ExitProcess
207 close = win32.CloseHandle
208
209 #
210 # _python_exe is the assumed path to the python executable.
211 # People embedding Python want to modify it.
212 #
213
brian.curtine2f29982011-04-11 17:56:23 -0500214 if WINSERVICE:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000215 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
216 else:
217 _python_exe = sys.executable
218
219 def set_executable(exe):
220 global _python_exe
221 _python_exe = exe
222
223 #
224 #
225 #
226
227 def duplicate(handle, target_process=None, inheritable=False):
228 if target_process is None:
229 target_process = _subprocess.GetCurrentProcess()
230 return _subprocess.DuplicateHandle(
231 _subprocess.GetCurrentProcess(), handle, target_process,
232 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
233 ).Detach()
234
235 #
236 # We define a Popen class similar to the one from subprocess, but
237 # whose constructor takes a process object as its argument.
238 #
239
240 class Popen(object):
241 '''
242 Start a subprocess to run the code of a process object
243 '''
244 _tls = _thread._local()
245
246 def __init__(self, process_obj):
247 # create pipe for communication with child
248 rfd, wfd = os.pipe()
249
250 # get handle for read end of the pipe and make it inheritable
251 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
252 os.close(rfd)
253
254 # start process
255 cmd = get_command_line() + [rhandle]
256 cmd = ' '.join('"%s"' % x for x in cmd)
257 hp, ht, pid, tid = _subprocess.CreateProcess(
258 _python_exe, cmd, None, None, 1, 0, None, None, None
259 )
260 ht.Close()
261 close(rhandle)
262
263 # set attributes of self
264 self.pid = pid
265 self.returncode = None
266 self._handle = hp
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200267 self.sentinel = int(hp)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268
269 # send information to child
270 prep_data = get_preparation_data(process_obj._name)
271 to_child = os.fdopen(wfd, 'wb')
272 Popen._tls.process_handle = int(hp)
273 try:
274 dump(prep_data, to_child, HIGHEST_PROTOCOL)
275 dump(process_obj, to_child, HIGHEST_PROTOCOL)
276 finally:
277 del Popen._tls.process_handle
278 to_child.close()
279
280 @staticmethod
281 def thread_is_spawning():
282 return getattr(Popen._tls, 'process_handle', None) is not None
283
284 @staticmethod
285 def duplicate_for_child(handle):
286 return duplicate(handle, Popen._tls.process_handle)
287
288 def wait(self, timeout=None):
289 if self.returncode is None:
290 if timeout is None:
291 msecs = _subprocess.INFINITE
292 else:
293 msecs = max(0, int(timeout * 1000 + 0.5))
294
295 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
296 if res == _subprocess.WAIT_OBJECT_0:
297 code = _subprocess.GetExitCodeProcess(self._handle)
298 if code == TERMINATE:
299 code = -signal.SIGTERM
300 self.returncode = code
301
302 return self.returncode
303
304 def poll(self):
305 return self.wait(timeout=0)
306
307 def terminate(self):
308 if self.returncode is None:
309 try:
310 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
311 except WindowsError:
312 if self.wait(timeout=0.1) is None:
313 raise
314
315 #
316 #
317 #
318
319 def is_forking(argv):
320 '''
321 Return whether commandline indicates we are forking
322 '''
323 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
324 assert len(argv) == 3
325 return True
326 else:
327 return False
328
329
330 def freeze_support():
331 '''
332 Run code for process object if this in not the main process
333 '''
334 if is_forking(sys.argv):
335 main()
336 sys.exit()
337
338
339 def get_command_line():
340 '''
341 Returns prefix of command line used for spawning a child process
342 '''
343 if process.current_process()._identity==() and is_forking(sys.argv):
344 raise RuntimeError('''
345 Attempt to start a new process before the current process
346 has finished its bootstrapping phase.
347
348 This probably means that you are on Windows and you have
349 forgotten to use the proper idiom in the main module:
350
351 if __name__ == '__main__':
352 freeze_support()
353 ...
354
355 The "freeze_support()" line can be omitted if the program
356 is not going to be frozen to produce a Windows executable.''')
357
358 if getattr(sys, 'frozen', False):
359 return [sys.executable, '--multiprocessing-fork']
360 else:
361 prog = 'from multiprocessing.forking import main; main()'
362 return [_python_exe, '-c', prog, '--multiprocessing-fork']
363
364
365 def main():
366 '''
367 Run code specifed by data received over pipe
368 '''
369 assert is_forking(sys.argv)
370
371 handle = int(sys.argv[-1])
372 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
373 from_parent = os.fdopen(fd, 'rb')
374
375 process.current_process()._inheriting = True
376 preparation_data = load(from_parent)
377 prepare(preparation_data)
378 self = load(from_parent)
379 process.current_process()._inheriting = False
380
381 from_parent.close()
382
383 exitcode = self._bootstrap()
384 exit(exitcode)
385
386
387 def get_preparation_data(name):
388 '''
389 Return info about parent needed by child to unpickle process object
390 '''
391 from .util import _logger, _log_to_stderr
392
393 d = dict(
394 name=name,
395 sys_path=sys.path,
396 sys_argv=sys.argv,
397 log_to_stderr=_log_to_stderr,
398 orig_dir=process.ORIGINAL_DIR,
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000399 authkey=process.current_process().authkey,
Benjamin Petersone711caf2008-06-11 16:44:04 +0000400 )
401
402 if _logger is not None:
403 d['log_level'] = _logger.getEffectiveLevel()
404
brian.curtine2f29982011-04-11 17:56:23 -0500405 if not WINEXE and not WINSERVICE:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000406 main_path = getattr(sys.modules['__main__'], '__file__', None)
407 if not main_path and sys.argv[0] not in ('', '-c'):
408 main_path = sys.argv[0]
409 if main_path is not None:
410 if not os.path.isabs(main_path) and \
411 process.ORIGINAL_DIR is not None:
412 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
413 d['main_path'] = os.path.normpath(main_path)
414
415 return d
416
417 #
418 # Make (Pipe)Connection picklable
419 #
420
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200421 # Late import because of circular import
422 from .connection import Connection, PipeConnection
423
Benjamin Petersone711caf2008-06-11 16:44:04 +0000424 def reduce_connection(conn):
425 if not Popen.thread_is_spawning():
426 raise RuntimeError(
427 'By default %s objects can only be shared between processes\n'
428 'using inheritance' % type(conn).__name__
429 )
430 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
431 conn.readable, conn.writable)
432
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000433 ForkingPickler.register(Connection, reduce_connection)
434 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000435
436#
437# Prepare current process
438#
439
440old_main_modules = []
441
442def prepare(data):
443 '''
444 Try to get current process ready to unpickle process object
445 '''
446 old_main_modules.append(sys.modules['__main__'])
447
448 if 'name' in data:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000449 process.current_process().name = data['name']
Benjamin Petersone711caf2008-06-11 16:44:04 +0000450
451 if 'authkey' in data:
452 process.current_process()._authkey = data['authkey']
453
454 if 'log_to_stderr' in data and data['log_to_stderr']:
455 util.log_to_stderr()
456
457 if 'log_level' in data:
458 util.get_logger().setLevel(data['log_level'])
459
460 if 'sys_path' in data:
461 sys.path = data['sys_path']
462
463 if 'sys_argv' in data:
464 sys.argv = data['sys_argv']
465
466 if 'dir' in data:
467 os.chdir(data['dir'])
468
469 if 'orig_dir' in data:
470 process.ORIGINAL_DIR = data['orig_dir']
471
472 if 'main_path' in data:
Nick Coghlan793ee1f2011-01-30 01:24:08 +0000473 # XXX (ncoghlan): The following code makes several bogus
474 # assumptions regarding the relationship between __file__
475 # and a module's real name. See PEP 302 and issue #10845
Benjamin Petersone711caf2008-06-11 16:44:04 +0000476 main_path = data['main_path']
477 main_name = os.path.splitext(os.path.basename(main_path))[0]
478 if main_name == '__init__':
479 main_name = os.path.basename(os.path.dirname(main_path))
480
Nick Coghlan793ee1f2011-01-30 01:24:08 +0000481 if main_name == '__main__':
482 main_module = sys.modules['__main__']
483 main_module.__file__ = main_path
484 elif main_name != 'ipython':
485 # Main modules not actually called __main__.py may
486 # contain additional code that should still be executed
Benjamin Petersone711caf2008-06-11 16:44:04 +0000487 import imp
488
489 if main_path is None:
490 dirs = None
491 elif os.path.basename(main_path).startswith('__init__.py'):
492 dirs = [os.path.dirname(os.path.dirname(main_path))]
493 else:
494 dirs = [os.path.dirname(main_path)]
495
496 assert main_name not in sys.modules, main_name
497 file, path_name, etc = imp.find_module(main_name, dirs)
498 try:
499 # We would like to do "imp.load_module('__main__', ...)"
500 # here. However, that would cause 'if __name__ ==
501 # "__main__"' clauses to be executed.
502 main_module = imp.load_module(
503 '__parents_main__', file, path_name, etc
504 )
505 finally:
506 if file:
507 file.close()
508
509 sys.modules['__main__'] = main_module
510 main_module.__name__ = '__main__'
511
512 # Try to make the potentially picklable objects in
513 # sys.modules['__main__'] realize they are in the main
514 # module -- somewhat ugly.
515 for obj in list(main_module.__dict__.values()):
516 try:
517 if obj.__module__ == '__parents_main__':
518 obj.__module__ = '__main__'
519 except Exception:
520 pass