blob: d5f3b2643750b4fcc17576fa55d73ad663dbbe5d [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray79af2452010-12-14 01:42:40 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000033#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
Jesse Noller13e9d582008-07-16 14:32:36 +000041__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000042
43#
44# Check that the current thread is spawning a child process
45#
46
47def assert_spawning(self):
48 if not Popen.thread_is_spawning():
49 raise RuntimeError(
50 '%s objects should only be shared between processes'
51 ' through inheritance' % type(self).__name__
52 )
53
54#
Jesse Noller13e9d582008-07-16 14:32:36 +000055# Try making some callable types picklable
56#
57
58from pickle import Pickler
59class ForkingPickler(Pickler):
60 dispatch = Pickler.dispatch.copy()
61
62 @classmethod
63 def register(cls, type, reduce):
64 def dispatcher(self, obj):
65 rv = reduce(obj)
66 self.save_reduce(obj=obj, *rv)
67 cls.dispatch[type] = dispatcher
68
69def _reduce_method(m):
70 if m.im_self is None:
71 return getattr, (m.im_class, m.im_func.func_name)
72 else:
73 return getattr, (m.im_self, m.im_func.func_name)
74ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
75
76def _reduce_method_descriptor(m):
77 return getattr, (m.__objclass__, m.__name__)
78ForkingPickler.register(type(list.append), _reduce_method_descriptor)
79ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
80
81#def _reduce_builtin_function_or_method(m):
82# return getattr, (m.__self__, m.__name__)
83#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
84#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
85
86try:
87 from functools import partial
88except ImportError:
89 pass
90else:
91 def _reduce_partial(p):
92 return _rebuild_partial, (p.func, p.args, p.keywords or {})
93 def _rebuild_partial(func, args, keywords):
94 return partial(func, *args, **keywords)
95 ForkingPickler.register(partial, _reduce_partial)
96
97#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000098# Unix
99#
100
101if sys.platform != 'win32':
102 import time
103
104 exit = os._exit
105 duplicate = os.dup
106 close = os.close
107
108 #
109 # We define a Popen class similar to the one from subprocess, but
110 # whose constructor takes a process object as its argument.
111 #
112
113 class Popen(object):
114
115 def __init__(self, process_obj):
116 sys.stdout.flush()
117 sys.stderr.flush()
118 self.returncode = None
119
120 self.pid = os.fork()
121 if self.pid == 0:
122 if 'random' in sys.modules:
123 import random
124 random.seed()
125 code = process_obj._bootstrap()
126 sys.stdout.flush()
127 sys.stderr.flush()
128 os._exit(code)
129
130 def poll(self, flag=os.WNOHANG):
131 if self.returncode is None:
Florent Xicluna16cd8882010-03-07 23:49:03 +0000132 try:
133 pid, sts = os.waitpid(self.pid, flag)
134 except os.error:
135 # Child process not yet created. See #1731717
136 # e.errno == errno.ECHILD == 10
137 return None
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000138 if pid == self.pid:
139 if os.WIFSIGNALED(sts):
140 self.returncode = -os.WTERMSIG(sts)
141 else:
142 assert os.WIFEXITED(sts)
143 self.returncode = os.WEXITSTATUS(sts)
144 return self.returncode
145
146 def wait(self, timeout=None):
147 if timeout is None:
148 return self.poll(0)
149 deadline = time.time() + timeout
150 delay = 0.0005
151 while 1:
152 res = self.poll()
153 if res is not None:
154 break
155 remaining = deadline - time.time()
156 if remaining <= 0:
157 break
158 delay = min(delay * 2, remaining, 0.05)
159 time.sleep(delay)
160 return res
161
162 def terminate(self):
163 if self.returncode is None:
164 try:
165 os.kill(self.pid, signal.SIGTERM)
166 except OSError, e:
167 if self.wait(timeout=0.1) is None:
168 raise
169
170 @staticmethod
171 def thread_is_spawning():
172 return False
173
174#
175# Windows
176#
177
178else:
179 import thread
180 import msvcrt
181 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000182 import time
183
Jesse Noller2f8c8f42010-07-03 12:26:02 +0000184 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000185 from .util import Finalize
186
Jesse Noller13e9d582008-07-16 14:32:36 +0000187 #try:
188 # from cPickle import dump, load, HIGHEST_PROTOCOL
189 #except ImportError:
190 from pickle import load, HIGHEST_PROTOCOL
191
192 def dump(obj, file, protocol=None):
193 ForkingPickler(file, protocol).dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000194
195 #
196 #
197 #
198
199 TERMINATE = 0x10000
200 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
201
202 exit = win32.ExitProcess
203 close = win32.CloseHandle
204
205 #
206 # _python_exe is the assumed path to the python executable.
207 # People embedding Python want to modify it.
208 #
209
210 if sys.executable.lower().endswith('pythonservice.exe'):
211 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
212 else:
213 _python_exe = sys.executable
214
215 def set_executable(exe):
216 global _python_exe
217 _python_exe = exe
218
219 #
220 #
221 #
222
223 def duplicate(handle, target_process=None, inheritable=False):
224 if target_process is None:
225 target_process = _subprocess.GetCurrentProcess()
226 return _subprocess.DuplicateHandle(
227 _subprocess.GetCurrentProcess(), handle, target_process,
228 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
229 ).Detach()
230
231 #
232 # We define a Popen class similar to the one from subprocess, but
233 # whose constructor takes a process object as its argument.
234 #
235
236 class Popen(object):
237 '''
238 Start a subprocess to run the code of a process object
239 '''
240 _tls = thread._local()
241
242 def __init__(self, process_obj):
243 # create pipe for communication with child
244 rfd, wfd = os.pipe()
245
246 # get handle for read end of the pipe and make it inheritable
247 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
248 os.close(rfd)
249
250 # start process
251 cmd = get_command_line() + [rhandle]
252 cmd = ' '.join('"%s"' % x for x in cmd)
253 hp, ht, pid, tid = _subprocess.CreateProcess(
254 _python_exe, cmd, None, None, 1, 0, None, None, None
255 )
256 ht.Close()
257 close(rhandle)
258
259 # set attributes of self
260 self.pid = pid
261 self.returncode = None
262 self._handle = hp
263
264 # send information to child
265 prep_data = get_preparation_data(process_obj._name)
266 to_child = os.fdopen(wfd, 'wb')
267 Popen._tls.process_handle = int(hp)
268 try:
269 dump(prep_data, to_child, HIGHEST_PROTOCOL)
270 dump(process_obj, to_child, HIGHEST_PROTOCOL)
271 finally:
272 del Popen._tls.process_handle
273 to_child.close()
274
275 @staticmethod
276 def thread_is_spawning():
277 return getattr(Popen._tls, 'process_handle', None) is not None
278
279 @staticmethod
280 def duplicate_for_child(handle):
281 return duplicate(handle, Popen._tls.process_handle)
282
283 def wait(self, timeout=None):
284 if self.returncode is None:
285 if timeout is None:
286 msecs = _subprocess.INFINITE
287 else:
288 msecs = max(0, int(timeout * 1000 + 0.5))
289
290 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
291 if res == _subprocess.WAIT_OBJECT_0:
292 code = _subprocess.GetExitCodeProcess(self._handle)
293 if code == TERMINATE:
294 code = -signal.SIGTERM
295 self.returncode = code
296
297 return self.returncode
298
299 def poll(self):
300 return self.wait(timeout=0)
301
302 def terminate(self):
303 if self.returncode is None:
304 try:
305 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
306 except WindowsError:
307 if self.wait(timeout=0.1) is None:
308 raise
309
310 #
311 #
312 #
313
314 def is_forking(argv):
315 '''
316 Return whether commandline indicates we are forking
317 '''
318 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
319 assert len(argv) == 3
320 return True
321 else:
322 return False
323
324
325 def freeze_support():
326 '''
327 Run code for process object if this in not the main process
328 '''
329 if is_forking(sys.argv):
330 main()
331 sys.exit()
332
333
334 def get_command_line():
335 '''
336 Returns prefix of command line used for spawning a child process
337 '''
338 if process.current_process()._identity==() and is_forking(sys.argv):
339 raise RuntimeError('''
340 Attempt to start a new process before the current process
341 has finished its bootstrapping phase.
342
343 This probably means that you are on Windows and you have
344 forgotten to use the proper idiom in the main module:
345
346 if __name__ == '__main__':
347 freeze_support()
348 ...
349
350 The "freeze_support()" line can be omitted if the program
351 is not going to be frozen to produce a Windows executable.''')
352
353 if getattr(sys, 'frozen', False):
354 return [sys.executable, '--multiprocessing-fork']
355 else:
356 prog = 'from multiprocessing.forking import main; main()'
357 return [_python_exe, '-c', prog, '--multiprocessing-fork']
358
359
360 def main():
361 '''
362 Run code specifed by data received over pipe
363 '''
364 assert is_forking(sys.argv)
365
366 handle = int(sys.argv[-1])
367 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
368 from_parent = os.fdopen(fd, 'rb')
369
370 process.current_process()._inheriting = True
371 preparation_data = load(from_parent)
372 prepare(preparation_data)
373 self = load(from_parent)
374 process.current_process()._inheriting = False
375
376 from_parent.close()
377
378 exitcode = self._bootstrap()
379 exit(exitcode)
380
381
382 def get_preparation_data(name):
383 '''
384 Return info about parent needed by child to unpickle process object
385 '''
386 from .util import _logger, _log_to_stderr
387
388 d = dict(
389 name=name,
390 sys_path=sys.path,
391 sys_argv=sys.argv,
392 log_to_stderr=_log_to_stderr,
393 orig_dir=process.ORIGINAL_DIR,
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000394 authkey=process.current_process().authkey,
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000395 )
396
397 if _logger is not None:
398 d['log_level'] = _logger.getEffectiveLevel()
399
400 if not WINEXE:
401 main_path = getattr(sys.modules['__main__'], '__file__', None)
402 if not main_path and sys.argv[0] not in ('', '-c'):
403 main_path = sys.argv[0]
404 if main_path is not None:
405 if not os.path.isabs(main_path) and \
406 process.ORIGINAL_DIR is not None:
407 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
408 d['main_path'] = os.path.normpath(main_path)
409
410 return d
411
412 #
413 # Make (Pipe)Connection picklable
414 #
415
416 def reduce_connection(conn):
417 if not Popen.thread_is_spawning():
418 raise RuntimeError(
419 'By default %s objects can only be shared between processes\n'
420 'using inheritance' % type(conn).__name__
421 )
422 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
423 conn.readable, conn.writable)
424
Jesse Noller13e9d582008-07-16 14:32:36 +0000425 ForkingPickler.register(Connection, reduce_connection)
426 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000427
428#
429# Prepare current process
430#
431
432old_main_modules = []
433
434def prepare(data):
435 '''
436 Try to get current process ready to unpickle process object
437 '''
438 old_main_modules.append(sys.modules['__main__'])
439
440 if 'name' in data:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000441 process.current_process().name = data['name']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000442
443 if 'authkey' in data:
444 process.current_process()._authkey = data['authkey']
445
446 if 'log_to_stderr' in data and data['log_to_stderr']:
447 util.log_to_stderr()
448
449 if 'log_level' in data:
450 util.get_logger().setLevel(data['log_level'])
451
452 if 'sys_path' in data:
453 sys.path = data['sys_path']
454
455 if 'sys_argv' in data:
456 sys.argv = data['sys_argv']
457
458 if 'dir' in data:
459 os.chdir(data['dir'])
460
461 if 'orig_dir' in data:
462 process.ORIGINAL_DIR = data['orig_dir']
463
464 if 'main_path' in data:
465 main_path = data['main_path']
466 main_name = os.path.splitext(os.path.basename(main_path))[0]
467 if main_name == '__init__':
468 main_name = os.path.basename(os.path.dirname(main_path))
469
470 if main_name != 'ipython':
471 import imp
472
473 if main_path is None:
474 dirs = None
475 elif os.path.basename(main_path).startswith('__init__.py'):
476 dirs = [os.path.dirname(os.path.dirname(main_path))]
477 else:
478 dirs = [os.path.dirname(main_path)]
479
480 assert main_name not in sys.modules, main_name
481 file, path_name, etc = imp.find_module(main_name, dirs)
482 try:
483 # We would like to do "imp.load_module('__main__', ...)"
484 # here. However, that would cause 'if __name__ ==
485 # "__main__"' clauses to be executed.
486 main_module = imp.load_module(
487 '__parents_main__', file, path_name, etc
488 )
489 finally:
490 if file:
491 file.close()
492
493 sys.modules['__main__'] = main_module
494 main_module.__name__ = '__main__'
495
496 # Try to make the potentially picklable objects in
497 # sys.modules['__main__'] realize they are in the main
498 # module -- somewhat ugly.
499 for obj in main_module.__dict__.values():
500 try:
501 if obj.__module__ == '__parents_main__':
502 obj.__module__ = '__main__'
503 except Exception:
504 pass