blob: 4e24d6a10fa28dcfde4e9f9e0040108be8940d14 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
# Public names exported by ``from multiprocessing.forking import *``.
__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
43#
44# Check that the current thread is spawning a child process
45#
46
def assert_spawning(self):
    '''
    Raise RuntimeError unless Popen.thread_is_spawning() is true.

    *self* is the object whose type name is reported in the error message.
    '''
    if Popen.thread_is_spawning():
        return
    template = (
        '%s objects should only be shared between processes'
        ' through inheritance'
    )
    raise RuntimeError(template % type(self).__name__)
53
54#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000055# Try making some callable types picklable
56#
57
58from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass whose dispatch table can be extended per type.

    ``dispatch`` starts out as a copy of ``Pickler.dispatch``, so entries
    added through register() do not touch the base class table.
    '''

    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Install *reduce* as the reduction function for objects of *type*.

        *reduce* is called with the object being pickled.  A ``str``
        result is handed to save_global(); any other result is unpacked
        as the positional arguments of save_reduce().
        '''
        def dispatcher(self, obj):
            reduced = reduce(obj)
            if not isinstance(reduced, str):
                self.save_reduce(obj=obj, *reduced)
                return
            self.save_global(obj, reduced)

        cls.dispatch[type] = dispatcher
70
71def _reduce_method(m):
72 if m.__self__ is None:
73 return getattr, (m.__class__, m.__func__.__name__)
74 else:
75 return getattr, (m.__self__, m.__func__.__name__)
# Throwaway class: type(_C().f) is the type of a bound method, which is
# the key under which _reduce_method is registered below.
class _C:
    def f(self):
        pass
ForkingPickler.register(type(_C().f), _reduce_method)
80
81
82def _reduce_method_descriptor(m):
83 return getattr, (m.__objclass__, m.__name__)
# Register the reducer for two builtin descriptor types: the type of
# list.append and the type of int.__add__ (in CPython these are
# method_descriptor and wrapper_descriptor respectively).
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
86
try:
    from functools import partial
except ImportError:
    # No functools.partial available: nothing to register.
    pass
else:
    def _rebuild_partial(func, args, keywords):
        '''Recreate a partial object from its unpickled pieces.'''
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        '''Reduce partial *p* to a _rebuild_partial() call.'''
        # p.keywords may be falsy (e.g. None); always send a dict.
        keywords = p.keywords if p.keywords else {}
        return _rebuild_partial, (p.func, p.args, keywords)

    ForkingPickler.register(partial, _reduce_partial)
97
98#
Benjamin Petersone711caf2008-06-11 16:44:04 +000099# Unix
100#
101
102if sys.platform != 'win32':
103 import time
104
# On this (non-win32) branch the module-level exit/duplicate/close names
# are plain aliases of the corresponding os functions.
exit = os._exit
duplicate = os.dup
close = os.close
108
109 #
110 # We define a Popen class similar to the one from subprocess, but
111 # whose constructor takes a process object as its argument.
112 #
113
class Popen(object):
    '''
    Fork a child process that runs ``process_obj._bootstrap()``.

    Attributes:
        pid: the value returned by os.fork().
        returncode: None until the child has been reaped; afterwards the
            exit status, or the negated signal number when the child was
            terminated by a signal.
    '''

    def __init__(self, process_obj):
        # Flush the standard streams before forking.
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid != 0:
            # Parent: nothing more to do.
            return

        # Child: reseed random (only if it is already imported), run the
        # process object and leave via os._exit() with its result.
        if 'random' in sys.modules:
            import random
            random.seed()
        exit_code = process_obj._bootstrap()
        os._exit(exit_code)

    def poll(self, flag=os.WNOHANG):
        '''
        Return the child's return code, or None if it is not available.

        *flag* is passed as the options argument of os.waitpid().
        '''
        if self.returncode is not None:
            return self.returncode
        try:
            reaped_pid, status = os.waitpid(self.pid, flag)
        except os.error:
            # Child process not yet created. See #1731717
            # e.errno == errno.ECHILD == 10
            return None
        if reaped_pid == self.pid:
            if os.WIFSIGNALED(status):
                self.returncode = -os.WTERMSIG(status)
            else:
                assert os.WIFEXITED(status)
                self.returncode = os.WEXITSTATUS(status)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its return code.

        With *timeout* None this is a single ``poll(0)`` call.  Otherwise
        poll() is retried, sleeping between attempts (the sleep doubles
        each round, capped at 0.05s and at the time remaining), until a
        result is available or *timeout* seconds have passed; the last
        poll() result (possibly None) is returned.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        pause = 0.0005
        while True:
            result = self.poll()
            if result is not None:
                return result
            remaining = deadline - time.time()
            if remaining <= 0:
                return result
            pause = min(pause * 2, remaining, 0.05)
            time.sleep(pause)

    def terminate(self):
        '''
        Send SIGTERM to the child unless a return code is already known.

        An OSError from os.kill() is re-raised only if a following
        ``wait(timeout=0.1)`` still yields None.
        '''
        if self.returncode is not None:
            return
        try:
            os.kill(self.pid, signal.SIGTERM)
        except OSError:
            if self.wait(timeout=0.1) is None:
                raise

    @staticmethod
    def thread_is_spawning():
        '''Always False in this implementation.'''
        return False
172
173#
174# Windows
175#
176
177else:
178 import _thread
179 import msvcrt
180 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000181 import time
182
Jesse Nollerf70a5382009-01-18 19:44:02 +0000183 from pickle import dump, load, HIGHEST_PROTOCOL
Brian Curtina6a32742010-08-04 15:47:24 +0000184 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 from .util import Finalize
186
def dump(obj, file, protocol=None):
    '''
    Pickle *obj* to *file* with a ForkingPickler created for *protocol*.
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189
190 #
191 #
192 #
193
# Exit code handed to TerminateProcess() by Popen.terminate(); Popen.wait()
# translates it into -signal.SIGTERM.
TERMINATE = 0x10000
# Truthy when running on win32 with sys.frozen set.
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
# True when the running executable's name ends with pythonservice.exe.
WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

# Windows implementations of the module-level exit/close names.
exit = win32.ExitProcess
close = win32.CloseHandle
200
201 #
202 # _python_exe is the assumed path to the python executable.
203 # People embedding Python want to modify it.
204 #
205
# Under pythonservice.exe use python.exe from sys.exec_prefix rather than
# sys.executable; otherwise sys.executable is used as is.
if WINSERVICE:
    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
else:
    _python_exe = sys.executable

def set_executable(exe):
    '''
    Set the module-level _python_exe (the executable used when starting
    child processes) to *exe*
    '''
    global _python_exe
    _python_exe = exe
214
215 #
216 #
217 #
218
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate *handle* from the current process into *target_process*.

    *target_process* defaults to the current process.  The duplicate is
    created with the same access as the original and with the given
    *inheritable* flag; the detached handle value is returned.
    '''
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    source_process = _subprocess.GetCurrentProcess()
    duplicated = _subprocess.DuplicateHandle(
        source_process,
        handle,
        target_process,
        0,
        inheritable,
        _subprocess.DUPLICATE_SAME_ACCESS,
    )
    return duplicated.Detach()
226
227 #
228 # We define a Popen class similar to the one from subprocess, but
229 # whose constructor takes a process object as its argument.
230 #
231
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Thread-local storage.  ``process_handle`` is set on it only while
    # __init__ is pickling data for the child (see thread_is_spawning()).
    _tls = _thread._local()

    def __init__(self, process_obj):
        '''
        Create the child process and send it the pickled preparation
        data followed by the pickled *process_obj* over a pipe.
        '''
        # create pipe for communication with child
        rfd, wfd = os.pipe()

        # get handle for read end of the pipe and make it inheritable
        rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
        os.close(rfd)

        # start process
        # The inheritable read handle is appended as the last command-line
        # argument; every argument is wrapped in double quotes.
        cmd = get_command_line() + [rhandle]
        cmd = ' '.join('"%s"' % x for x in cmd)
        hp, ht, pid, tid = _subprocess.CreateProcess(
            _python_exe, cmd, None, None, 1, 0, None, None, None
            )
        # The thread handle is not needed; the parent's copy of the
        # inheritable read handle is closed once the child exists.
        ht.Close()
        close(rhandle)

        # set attributes of self
        self.pid = pid
        self.returncode = None
        self._handle = hp

        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        # Mark this thread as spawning for the duration of the dumps so
        # that reducers can call duplicate_for_child().
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''Return True while the current thread has a process_handle set.'''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate *handle* into the process whose handle is stored in
        Popen._tls.process_handle.
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to *timeout* seconds (forever when None) for the child
        and return self.returncode, which stays None if the wait did not
        succeed.
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # Seconds -> milliseconds, rounded to nearest, never negative.
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                # TERMINATE is the exit code used by terminate(); report
                # it as -SIGTERM.
                if code == TERMINATE:
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''Non-blocking wait(): equivalent to wait(timeout=0).'''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless a return
        code is already known.  A WindowsError is re-raised only if a
        following wait(timeout=0.1) still yields None.
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                if self.wait(timeout=0.1) is None:
                    raise
305
306 #
307 #
308 #
309
def is_forking(argv):
    '''
    Tell whether *argv* is the command line of a spawned child, i.e.
    whether its second element is '--multiprocessing-fork'
    '''
    flagged = len(argv) >= 2 and argv[1] == '--multiprocessing-fork'
    if not flagged:
        return False
    assert len(argv) == 3
    return True
319
320
def freeze_support():
    '''
    Run the code for a process object if this is not the main process,
    then exit; do nothing otherwise
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
328
329
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError when called from a child (sys.argv indicates
    forking) whose current process still has an empty _identity.
    '''
    if process.current_process()._identity==() and is_forking(sys.argv):
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    # A frozen executable is re-run directly; otherwise the interpreter is
    # started with -c so that it imports and calls main() from this module.
    if getattr(sys, 'frozen', False):
        return [sys.executable, '--multiprocessing-fork']
    else:
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
354
355
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # The last command-line argument is the pipe handle that
    # Popen.__init__ appended to the command line.
    handle = int(sys.argv[-1])
    fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
    from_parent = os.fdopen(fd, 'rb')

    # Two pickles arrive in the order the parent wrote them: first the
    # preparation data, then the process object.  _inheriting is set on
    # the current process for the duration of the unpickling.
    process.current_process()._inheriting = True
    preparation_data = load(from_parent)
    prepare(preparation_data)
    self = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    # Run the process object and exit with the code it returns.
    exitcode = self._bootstrap()
    exit(exitcode)
376
377
def get_preparation_data(name):
    '''
    Collect the parent-side state that prepare() applies in the child
    before the process object is unpickled.

    The result always has the keys name, sys_path, sys_argv,
    log_to_stderr, orig_dir and authkey; log_level is added when a
    logger exists, and main_path when a main module path can be
    determined and neither WINEXE nor WINSERVICE is set.
    '''
    from .util import _logger, _log_to_stderr

    data = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
    }

    if _logger is not None:
        data['log_level'] = _logger.getEffectiveLevel()

    if WINEXE or WINSERVICE:
        return data

    # Prefer __main__.__file__; fall back to sys.argv[0] unless that is
    # empty or '-c'.
    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]

    if main_path is not None:
        # Relative paths are anchored at the original working directory
        # when one was recorded.
        relative = not os.path.isabs(main_path)
        if relative and process.ORIGINAL_DIR is not None:
            main_path = os.path.join(process.ORIGINAL_DIR, main_path)
        data['main_path'] = os.path.normpath(main_path)

    return data
407
408 #
409 # Make (Pipe)Connection picklable
410 #
411
def reduce_connection(conn):
    '''
    Reduce a connection object for pickling while a child is spawned.

    Returns ``(type(conn), (handle, readable, writable))`` where *handle*
    is conn.fileno() duplicated for the child.  Raises RuntimeError when
    the current thread is not spawning a child process.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
    )


# Both connection classes share the same reducer.
ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423
424#
425# Prepare current process
426#
427
# Every call to prepare() appends the then-current sys.modules['__main__']
# here, which keeps a reference to it after it is replaced.
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    *data* is a mapping; each recognised key is optional and is applied
    in the order written below: name, authkey, log_to_stderr, log_level,
    sys_path, sys_argv, dir, orig_dir, main_path.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        # XXX (ncoghlan): The following code makes several bogus
        # assumptions regarding the relationship between __file__
        # and a module's real name. See PEP 302 and issue #10845
        main_path = data['main_path']
        # Module name = file name without extension, or the package
        # directory name for an __init__ file.
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name == '__main__':
            main_module = sys.modules['__main__']
            main_module.__file__ = main_path
        elif main_name != 'ipython':
            # Main modules not actually called __main__.py may
            # contain additional code that should still be executed
            import imp

            # NOTE(review): main_path cannot be None here, since
            # os.path.basename(main_path) above would already have
            # failed; the first branch looks unreachable -- confirm.
            if main_path is None:
                dirs = None
            elif os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            # Install the freshly loaded module as __main__.
            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in list(main_module.__dict__.values()):
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # Best effort: objects without a (writable)
                    # __module__ are left untouched.
                    pass