blob: 1597ae8f2e3d36e4303c67483d38f651147380b5 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray79af2452010-12-14 01:42:40 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000033#
34
35import os
36import sys
37import signal
Richard Oudkerkba482642013-02-26 12:37:07 +000038import errno
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000039
40from multiprocessing import util, process
41
Jesse Noller13e9d582008-07-16 14:32:36 +000042__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000043
44#
45# Check that the current thread is spawning a child process
46#
47
48def assert_spawning(self):
49 if not Popen.thread_is_spawning():
50 raise RuntimeError(
51 '%s objects should only be shared between processes'
52 ' through inheritance' % type(self).__name__
53 )
54
55#
Jesse Noller13e9d582008-07-16 14:32:36 +000056# Try making some callable types picklable
57#
58
59from pickle import Pickler
60class ForkingPickler(Pickler):
61 dispatch = Pickler.dispatch.copy()
62
63 @classmethod
64 def register(cls, type, reduce):
65 def dispatcher(self, obj):
66 rv = reduce(obj)
67 self.save_reduce(obj=obj, *rv)
68 cls.dispatch[type] = dispatcher
69
70def _reduce_method(m):
71 if m.im_self is None:
72 return getattr, (m.im_class, m.im_func.func_name)
73 else:
74 return getattr, (m.im_self, m.im_func.func_name)
75ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
76
77def _reduce_method_descriptor(m):
78 return getattr, (m.__objclass__, m.__name__)
79ForkingPickler.register(type(list.append), _reduce_method_descriptor)
80ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
81
82#def _reduce_builtin_function_or_method(m):
83# return getattr, (m.__self__, m.__name__)
84#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
85#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
86
87try:
88 from functools import partial
89except ImportError:
90 pass
91else:
92 def _reduce_partial(p):
93 return _rebuild_partial, (p.func, p.args, p.keywords or {})
94 def _rebuild_partial(func, args, keywords):
95 return partial(func, *args, **keywords)
96 ForkingPickler.register(partial, _reduce_partial)
97
98#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000099# Unix
100#
101
102if sys.platform != 'win32':
103 import time
104
105 exit = os._exit
106 duplicate = os.dup
107 close = os.close
108
109 #
110 # We define a Popen class similar to the one from subprocess, but
111 # whose constructor takes a process object as its argument.
112 #
113
114 class Popen(object):
115
116 def __init__(self, process_obj):
117 sys.stdout.flush()
118 sys.stderr.flush()
119 self.returncode = None
120
121 self.pid = os.fork()
122 if self.pid == 0:
123 if 'random' in sys.modules:
124 import random
125 random.seed()
126 code = process_obj._bootstrap()
127 sys.stdout.flush()
128 sys.stderr.flush()
129 os._exit(code)
130
131 def poll(self, flag=os.WNOHANG):
132 if self.returncode is None:
Richard Oudkerkba482642013-02-26 12:37:07 +0000133 while True:
134 try:
135 pid, sts = os.waitpid(self.pid, flag)
136 except os.error as e:
137 if e.errno == errno.EINTR:
138 continue
139 # Child process not yet created. See #1731717
140 # e.errno == errno.ECHILD == 10
141 return None
142 else:
143 break
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000144 if pid == self.pid:
145 if os.WIFSIGNALED(sts):
146 self.returncode = -os.WTERMSIG(sts)
147 else:
148 assert os.WIFEXITED(sts)
149 self.returncode = os.WEXITSTATUS(sts)
150 return self.returncode
151
152 def wait(self, timeout=None):
153 if timeout is None:
154 return self.poll(0)
155 deadline = time.time() + timeout
156 delay = 0.0005
157 while 1:
158 res = self.poll()
159 if res is not None:
160 break
161 remaining = deadline - time.time()
162 if remaining <= 0:
163 break
164 delay = min(delay * 2, remaining, 0.05)
165 time.sleep(delay)
166 return res
167
168 def terminate(self):
169 if self.returncode is None:
170 try:
171 os.kill(self.pid, signal.SIGTERM)
172 except OSError, e:
173 if self.wait(timeout=0.1) is None:
174 raise
175
176 @staticmethod
177 def thread_is_spawning():
178 return False
179
180#
181# Windows
182#
183
184else:
185 import thread
186 import msvcrt
187 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000188 import time
189
Jesse Noller2f8c8f42010-07-03 12:26:02 +0000190 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000191 from .util import Finalize
192
Jesse Noller13e9d582008-07-16 14:32:36 +0000193 #try:
194 # from cPickle import dump, load, HIGHEST_PROTOCOL
195 #except ImportError:
196 from pickle import load, HIGHEST_PROTOCOL
197
198 def dump(obj, file, protocol=None):
199 ForkingPickler(file, protocol).dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000200
201 #
202 #
203 #
204
205 TERMINATE = 0x10000
206 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
brian.curtin40b53162011-04-11 18:00:59 -0500207 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000208
209 exit = win32.ExitProcess
210 close = win32.CloseHandle
211
212 #
213 # _python_exe is the assumed path to the python executable.
214 # People embedding Python want to modify it.
215 #
216
brian.curtin40b53162011-04-11 18:00:59 -0500217 if WINSERVICE:
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000218 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
219 else:
220 _python_exe = sys.executable
221
222 def set_executable(exe):
223 global _python_exe
224 _python_exe = exe
225
226 #
227 #
228 #
229
230 def duplicate(handle, target_process=None, inheritable=False):
231 if target_process is None:
232 target_process = _subprocess.GetCurrentProcess()
233 return _subprocess.DuplicateHandle(
234 _subprocess.GetCurrentProcess(), handle, target_process,
235 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
236 ).Detach()
237
238 #
239 # We define a Popen class similar to the one from subprocess, but
240 # whose constructor takes a process object as its argument.
241 #
242
243 class Popen(object):
244 '''
245 Start a subprocess to run the code of a process object
246 '''
247 _tls = thread._local()
248
249 def __init__(self, process_obj):
250 # create pipe for communication with child
251 rfd, wfd = os.pipe()
252
253 # get handle for read end of the pipe and make it inheritable
254 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
255 os.close(rfd)
256
257 # start process
258 cmd = get_command_line() + [rhandle]
259 cmd = ' '.join('"%s"' % x for x in cmd)
260 hp, ht, pid, tid = _subprocess.CreateProcess(
261 _python_exe, cmd, None, None, 1, 0, None, None, None
262 )
263 ht.Close()
264 close(rhandle)
265
266 # set attributes of self
267 self.pid = pid
268 self.returncode = None
269 self._handle = hp
270
271 # send information to child
272 prep_data = get_preparation_data(process_obj._name)
273 to_child = os.fdopen(wfd, 'wb')
274 Popen._tls.process_handle = int(hp)
275 try:
276 dump(prep_data, to_child, HIGHEST_PROTOCOL)
277 dump(process_obj, to_child, HIGHEST_PROTOCOL)
278 finally:
279 del Popen._tls.process_handle
280 to_child.close()
281
282 @staticmethod
283 def thread_is_spawning():
284 return getattr(Popen._tls, 'process_handle', None) is not None
285
286 @staticmethod
287 def duplicate_for_child(handle):
288 return duplicate(handle, Popen._tls.process_handle)
289
290 def wait(self, timeout=None):
291 if self.returncode is None:
292 if timeout is None:
293 msecs = _subprocess.INFINITE
294 else:
295 msecs = max(0, int(timeout * 1000 + 0.5))
296
297 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
298 if res == _subprocess.WAIT_OBJECT_0:
299 code = _subprocess.GetExitCodeProcess(self._handle)
300 if code == TERMINATE:
301 code = -signal.SIGTERM
302 self.returncode = code
303
304 return self.returncode
305
306 def poll(self):
307 return self.wait(timeout=0)
308
309 def terminate(self):
310 if self.returncode is None:
311 try:
312 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
313 except WindowsError:
314 if self.wait(timeout=0.1) is None:
315 raise
316
317 #
318 #
319 #
320
321 def is_forking(argv):
322 '''
323 Return whether commandline indicates we are forking
324 '''
325 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
326 assert len(argv) == 3
327 return True
328 else:
329 return False
330
331
332 def freeze_support():
333 '''
334 Run code for process object if this in not the main process
335 '''
336 if is_forking(sys.argv):
337 main()
338 sys.exit()
339
340
341 def get_command_line():
342 '''
343 Returns prefix of command line used for spawning a child process
344 '''
Richard Oudkerkfaee75c2012-08-14 11:41:19 +0100345 if getattr(process.current_process(), '_inheriting', False):
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000346 raise RuntimeError('''
347 Attempt to start a new process before the current process
348 has finished its bootstrapping phase.
349
350 This probably means that you are on Windows and you have
351 forgotten to use the proper idiom in the main module:
352
353 if __name__ == '__main__':
354 freeze_support()
355 ...
356
357 The "freeze_support()" line can be omitted if the program
358 is not going to be frozen to produce a Windows executable.''')
359
360 if getattr(sys, 'frozen', False):
361 return [sys.executable, '--multiprocessing-fork']
362 else:
363 prog = 'from multiprocessing.forking import main; main()'
364 return [_python_exe, '-c', prog, '--multiprocessing-fork']
365
366
367 def main():
368 '''
369 Run code specifed by data received over pipe
370 '''
371 assert is_forking(sys.argv)
372
373 handle = int(sys.argv[-1])
374 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
375 from_parent = os.fdopen(fd, 'rb')
376
377 process.current_process()._inheriting = True
378 preparation_data = load(from_parent)
379 prepare(preparation_data)
380 self = load(from_parent)
381 process.current_process()._inheriting = False
382
383 from_parent.close()
384
385 exitcode = self._bootstrap()
386 exit(exitcode)
387
388
389 def get_preparation_data(name):
390 '''
391 Return info about parent needed by child to unpickle process object
392 '''
393 from .util import _logger, _log_to_stderr
394
395 d = dict(
396 name=name,
397 sys_path=sys.path,
398 sys_argv=sys.argv,
399 log_to_stderr=_log_to_stderr,
400 orig_dir=process.ORIGINAL_DIR,
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000401 authkey=process.current_process().authkey,
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000402 )
403
404 if _logger is not None:
405 d['log_level'] = _logger.getEffectiveLevel()
406
brian.curtin40b53162011-04-11 18:00:59 -0500407 if not WINEXE and not WINSERVICE:
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000408 main_path = getattr(sys.modules['__main__'], '__file__', None)
409 if not main_path and sys.argv[0] not in ('', '-c'):
410 main_path = sys.argv[0]
411 if main_path is not None:
412 if not os.path.isabs(main_path) and \
413 process.ORIGINAL_DIR is not None:
414 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
415 d['main_path'] = os.path.normpath(main_path)
416
417 return d
418
419 #
420 # Make (Pipe)Connection picklable
421 #
422
423 def reduce_connection(conn):
424 if not Popen.thread_is_spawning():
425 raise RuntimeError(
426 'By default %s objects can only be shared between processes\n'
427 'using inheritance' % type(conn).__name__
428 )
429 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
430 conn.readable, conn.writable)
431
Jesse Noller13e9d582008-07-16 14:32:36 +0000432 ForkingPickler.register(Connection, reduce_connection)
433 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000434
435#
436# Prepare current process
437#
438
439old_main_modules = []
440
441def prepare(data):
442 '''
443 Try to get current process ready to unpickle process object
444 '''
445 old_main_modules.append(sys.modules['__main__'])
446
447 if 'name' in data:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000448 process.current_process().name = data['name']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000449
450 if 'authkey' in data:
451 process.current_process()._authkey = data['authkey']
452
453 if 'log_to_stderr' in data and data['log_to_stderr']:
454 util.log_to_stderr()
455
456 if 'log_level' in data:
457 util.get_logger().setLevel(data['log_level'])
458
459 if 'sys_path' in data:
460 sys.path = data['sys_path']
461
462 if 'sys_argv' in data:
463 sys.argv = data['sys_argv']
464
465 if 'dir' in data:
466 os.chdir(data['dir'])
467
468 if 'orig_dir' in data:
469 process.ORIGINAL_DIR = data['orig_dir']
470
471 if 'main_path' in data:
472 main_path = data['main_path']
473 main_name = os.path.splitext(os.path.basename(main_path))[0]
474 if main_name == '__init__':
475 main_name = os.path.basename(os.path.dirname(main_path))
476
477 if main_name != 'ipython':
478 import imp
479
480 if main_path is None:
481 dirs = None
482 elif os.path.basename(main_path).startswith('__init__.py'):
483 dirs = [os.path.dirname(os.path.dirname(main_path))]
484 else:
485 dirs = [os.path.dirname(main_path)]
486
487 assert main_name not in sys.modules, main_name
488 file, path_name, etc = imp.find_module(main_name, dirs)
489 try:
490 # We would like to do "imp.load_module('__main__', ...)"
491 # here. However, that would cause 'if __name__ ==
492 # "__main__"' clauses to be executed.
493 main_module = imp.load_module(
494 '__parents_main__', file, path_name, etc
495 )
496 finally:
497 if file:
498 file.close()
499
500 sys.modules['__main__'] = main_module
501 main_module.__name__ = '__main__'
502
503 # Try to make the potentially picklable objects in
504 # sys.modules['__main__'] realize they are in the main
505 # module -- somewhat ugly.
506 for obj in main_module.__dict__.values():
507 try:
508 if obj.__module__ == '__parents_main__':
509 obj.__module__ = '__main__'
510 except Exception:
511 pass