blob: dc465b4ed25da82ffda457a05e83ecbfcbcb7772 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray79af2452010-12-14 01:42:40 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000033#
34
35import os
36import sys
37import signal
Richard Oudkerkba482642013-02-26 12:37:07 +000038import errno
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000039
40from multiprocessing import util, process
41
Jesse Noller13e9d582008-07-16 14:32:36 +000042__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000043
44#
45# Check that the current thread is spawning a child process
46#
47
def assert_spawning(self):
    """Raise RuntimeError unless the current thread is spawning a child.

    The check is delegated to ``Popen.thread_is_spawning()``; the error
    message names the type of *self*.
    """
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )
54
55#
Jesse Noller13e9d582008-07-16 14:32:36 +000056# Try making some callable types picklable
57#
58
59from pickle import Pickler
class ForkingPickler(Pickler):
    """Pickler subclass whose dispatch table can be extended per type."""

    # Work on a private copy so that registrations made through
    # register() do not modify Pickler.dispatch itself.
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        """Register *reduce* as the reduction function for *type*.

        *reduce* is called with the object being pickled; the tuple it
        returns is passed as positional arguments to ``save_reduce``.
        """
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher
69
70def _reduce_method(m):
71 if m.im_self is None:
72 return getattr, (m.im_class, m.im_func.func_name)
73 else:
74 return getattr, (m.im_self, m.im_func.func_name)
# Pickle methods (the type of ForkingPickler.save) through _reduce_method.
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
76
77def _reduce_method_descriptor(m):
78 return getattr, (m.__objclass__, m.__name__)
# Both descriptor flavours (e.g. list.append and int.__add__) are
# pickled through _reduce_method_descriptor.
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
81
82#def _reduce_builtin_function_or_method(m):
83# return getattr, (m.__self__, m.__name__)
84#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
85#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
86
# Make functools.partial objects picklable when functools is available.
try:
    from functools import partial
except ImportError:
    pass
else:
    def _reduce_partial(p):
        """Reduce partial *p* to a call of ``_rebuild_partial``."""
        # ``p.keywords or {}`` substitutes an empty dict when the
        # partial carries no keywords, so ``**keywords`` always works.
        return _rebuild_partial, (p.func, p.args, p.keywords or {})

    def _rebuild_partial(func, args, keywords):
        """Recreate a partial from its function, args and keywords."""
        return partial(func, *args, **keywords)

    ForkingPickler.register(partial, _reduce_partial)
97
98#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000099# Unix
100#
101
102if sys.platform != 'win32':
103 import time
104
# On Unix the exported helpers are thin aliases of os functions.
exit = os._exit
duplicate = os.dup
close = os.close
108
109 #
110 # We define a Popen class similar to the one from subprocess, but
111 # whose constructor takes a process object as its argument.
112 #
113
class Popen(object):
    '''
    Run the code of a process object in a child created with os.fork()
    '''

    def __init__(self, process_obj):
        # Flush before forking (presumably so that pending buffered
        # output is not emitted by both parent and child).
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # Child: reseed `random` if it is already loaded, run the
            # process object and leave via os._exit() with its result.
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Return the exit code if the child has finished, otherwise None.

        *flag* is passed to os.waitpid().  A child killed by a signal is
        reported as the negated signal number.
        '''
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except os.error as e:
                    if e.errno == errno.EINTR:
                        # interrupted by a signal: just retry
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its exit code.

        With timeout=None this blocks in waitpid().  Otherwise the child
        is polled with a growing delay (capped at 0.05s) until it exits
        or *timeout* seconds have passed; None is returned on timeout.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless its exit code is already known.

        An OSError from os.kill() is re-raised only if the child has
        still not been reaped after waiting 0.1 seconds.
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''Always False in this fork-based implementation.'''
        return False
179
180#
181# Windows
182#
183
184else:
185 import thread
186 import msvcrt
187 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000188 import time
189
Jesse Noller2f8c8f42010-07-03 12:26:02 +0000190 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000191 from .util import Finalize
192
Jesse Noller13e9d582008-07-16 14:32:36 +0000193 #try:
194 # from cPickle import dump, load, HIGHEST_PROTOCOL
195 #except ImportError:
196 from pickle import load, HIGHEST_PROTOCOL
197
def dump(obj, file, protocol=None):
    """Pickle *obj* to *file* with a ForkingPickler.

    Replacement for ``pickle.dump`` that honours the reducers registered
    on ForkingPickler.
    """
    ForkingPickler(file, protocol).dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000200
201 #
202 #
203 #
204
# Exit code handed to TerminateProcess() by Popen.terminate();
# Popen.wait() reports it as -signal.SIGTERM.
TERMINATE = 0x10000
# Truthy when running on win32 with sys.frozen set.
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
# True when the interpreter executable is pythonservice.exe.
WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000208
# On Windows the exported helpers come from _multiprocessing.win32.
exit = win32.ExitProcess
close = win32.CloseHandle
211
212 #
213 # _python_exe is the assumed path to the python executable.
214 # People embedding Python want to modify it.
215 #
216
if WINSERVICE:
    # sys.executable is pythonservice.exe here, so use the python.exe
    # located under sys.exec_prefix instead.
    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
else:
    _python_exe = sys.executable
221
def set_executable(exe):
    """Set the module-level ``_python_exe`` path to *exe*."""
    global _python_exe
    _python_exe = exe
225
226 #
227 #
228 #
229
def duplicate(handle, target_process=None, inheritable=False):
    """Duplicate *handle* from the current process and return the copy.

    The copy is created in *target_process* (the current process when
    None) with the same access rights as the original; *inheritable*
    is forwarded to DuplicateHandle().  The result is detached from its
    wrapper object before being returned.
    """
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    return _subprocess.DuplicateHandle(
        _subprocess.GetCurrentProcess(), handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
        ).Detach()
237
238 #
239 # We define a Popen class similar to the one from subprocess, but
240 # whose constructor takes a process object as its argument.
241 #
242
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Thread-local state: `process_handle` exists only while this thread
    # is pickling a process object for a freshly created child.
    _tls = thread._local()

    def __init__(self, process_obj):
        # create pipe for communication with child
        rfd, wfd = os.pipe()

        # Wrap the write end immediately so that every exit path below,
        # including failures before the child exists, closes it.
        to_child = os.fdopen(wfd, 'wb')
        try:
            # get handle for read end of the pipe and make it inheritable
            try:
                rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                    inheritable=True)
            finally:
                os.close(rfd)

            # start process
            try:
                cmd = get_command_line() + [rhandle]
                cmd = ' '.join('"%s"' % x for x in cmd)
                hp, ht, pid, tid = _subprocess.CreateProcess(
                    _python_exe, cmd, None, None, 1, 0, None, None, None
                    )
                ht.Close()
            finally:
                # The parent's copy of the inheritable handle must be
                # released whether or not the child was created.
                close(rhandle)

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
        finally:
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''Return True while this thread is sending data to a new child.'''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''Duplicate *handle* into the child currently being spawned.'''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to *timeout* seconds (forever when None) for the child.

        Returns the exit code, or None if the child is still running.
        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # round to the nearest millisecond, never negative
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                if code == TERMINATE:
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''Return the exit code without blocking, or None if running.'''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE.

        A WindowsError from TerminateProcess() is re-raised only if the
        child has still not exited after waiting 0.1 seconds.
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                if self.wait(timeout=0.1) is None:
                    raise
316
317 #
318 #
319 #
320
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking

    True when argv[1] is '--multiprocessing-fork'; in that case argv is
    asserted to have exactly three entries.
    '''
    if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
        assert len(argv) == 3
        return True
    return False
330
331
def freeze_support():
    '''
    Run code for process object if this is not the main process

    When sys.argv carries the forking marker, main() is run and the
    interpreter then exits via sys.exit(); otherwise nothing happens.
    '''
    if is_forking(sys.argv):
        main()
        sys.exit()
339
340
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError if the current process is still marked as
    `_inheriting`, i.e. it is itself in the middle of being bootstrapped.
    '''
    if getattr(process.current_process(), '_inheriting', False):
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if getattr(sys, 'frozen', False):
        # A frozen executable re-runs itself with the marker argument.
        return [sys.executable, '--multiprocessing-fork']
    else:
        prog = 'from multiprocessing.forking import main; main()'
        # Propagate the parent's interpreter flags to the child.
        opts = util._args_from_interpreter_flags()
        return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000366
367
def main():
    '''
    Run code specified by data received over pipe

    The last command line argument is the OS handle of the read end of
    the pipe.  Two pickles are read from it: the preparation data, which
    is passed to prepare(), and then the process object, whose
    _bootstrap() result becomes the exit code.
    '''
    assert is_forking(sys.argv)

    handle = int(sys.argv[-1])
    fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
    from_parent = os.fdopen(fd, 'rb')

    # While this flag is set, get_command_line() refuses to spawn
    # further processes.
    process.current_process()._inheriting = True
    preparation_data = load(from_parent)
    prepare(preparation_data)
    self = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exitcode = self._bootstrap()
    exit(exitcode)
388
389
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict with the keys name, sys_path, sys_argv,
    log_to_stderr, orig_dir and authkey, plus log_level when a logger
    exists and main_path when the main module's file can be determined.
    '''
    from .util import _logger, _log_to_stderr

    d = dict(
        name=name,
        sys_path=sys.path,
        sys_argv=sys.argv,
        log_to_stderr=_log_to_stderr,
        orig_dir=process.ORIGINAL_DIR,
        authkey=process.current_process().authkey,
        )

    if _logger is not None:
        d['log_level'] = _logger.getEffectiveLevel()

    if not WINEXE and not WINSERVICE:
        main_path = getattr(sys.modules['__main__'], '__file__', None)
        # Fall back to argv[0] unless it is empty or the '-c' marker.
        if not main_path and sys.argv[0] not in ('', '-c'):
            main_path = sys.argv[0]
        if main_path is not None:
            # Resolve relative paths against the original directory.
            if not os.path.isabs(main_path) and \
                    process.ORIGINAL_DIR is not None:
                main_path = os.path.join(process.ORIGINAL_DIR, main_path)
            d['main_path'] = os.path.normpath(main_path)

    return d
418 return d
419
420 #
421 # Make (Pipe)Connection picklable
422 #
423
def reduce_connection(conn):
    """Reduce a connection object for transfer to a child being spawned.

    Raises RuntimeError unless the current thread is spawning a child.
    Otherwise the connection's handle is duplicated for that child and
    ``(type(conn), (handle, readable, writable))`` is returned.
    """
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            'By default %s objects can only be shared between processes\n'
            'using inheritance' % type(conn).__name__
            )
    return type(conn), (Popen.duplicate_for_child(conn.fileno()),
                        conn.readable, conn.writable)
432
# Both connection types are pickled through reduce_connection.
ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000435
436#
437# Prepare current process
438#
439
# Every __main__ module that prepare() was called under is appended here.
old_main_modules = []


def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    *data* is a mapping such as the one built by get_preparation_data();
    every key is optional and only the keys present are applied.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            # a package's __init__ file: use the package directory name
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            # main_path cannot be None at this point: the basename()
            # call above has already operated on it.
            if os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            fobj, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', fobj, path_name, etc
                    )
            finally:
                if fobj:
                    fobj.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in list(main_module.__dict__.values()):
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # Best effort only: objects without a (writable)
                    # __module__ are left untouched.
                    pass
512 pass