blob: 3fca8b1132618bcfa3d8a1c4decbd10482e7cba1 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray79af2452010-12-14 01:42:40 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000033#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
Jesse Noller13e9d582008-07-16 14:32:36 +000041__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000042
43#
44# Check that the current thread is spawning a child process
45#
46
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type(self).__name__
        )
53
54#
Jesse Noller13e9d582008-07-16 14:32:36 +000055# Try making some callable types picklable
56#
57
58from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass whose dispatch table can be extended per type

    The table starts out as a copy of Pickler.dispatch, so registrations
    made through register() do not modify the base Pickler class.
    '''
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Register `reduce` as the reduction function for objects of `type`

        `reduce(obj)` must return the positional arguments that are passed
        on to save_reduce().
        '''
        def save_with_reduce(pickler, obj):
            reduced = reduce(obj)
            pickler.save_reduce(*reduced, obj=obj)
        cls.dispatch[type] = save_with_reduce
68
def _reduce_method(m):
    '''
    Reduce a method to a getattr() call on its owner

    An unbound method (im_self is None) is looked up again on its class,
    a bound method on the instance it is bound to.
    '''
    owner = m.im_class if m.im_self is None else m.im_self
    return getattr, (owner, m.im_func.func_name)
74ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
75
def _reduce_method_descriptor(m):
    '''
    Reduce a method descriptor (e.g. list.append) to a getattr() call
    on the class that defines it
    '''
    owner, attr_name = m.__objclass__, m.__name__
    return getattr, (owner, attr_name)
78ForkingPickler.register(type(list.append), _reduce_method_descriptor)
79ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
80
81#def _reduce_builtin_function_or_method(m):
82# return getattr, (m.__self__, m.__name__)
83#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
84#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
85
86try:
87 from functools import partial
88except ImportError:
89 pass
90else:
def _reduce_partial(p):
    '''
    Reduce a functools.partial object to a _rebuild_partial() call

    A false `keywords` value (None or empty) is replaced by an empty dict.
    '''
    rebuild_args = (p.func, p.args, p.keywords or {})
    return _rebuild_partial, rebuild_args

def _rebuild_partial(func, args, keywords):
    '''
    Recreate the partial object described by _reduce_partial()
    '''
    return partial(func, *args, **keywords)
95 ForkingPickler.register(partial, _reduce_partial)
96
97#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000098# Unix
99#
100
101if sys.platform != 'win32':
102 import time
103
104 exit = os._exit
105 duplicate = os.dup
106 close = os.close
107
108 #
109 # We define a Popen class similar to the one from subprocess, but
110 # whose constructor takes a process object as its argument.
111 #
112
class Popen(object):
    '''
    Start a child process with os.fork() to run the code of a process object
    '''

    def __init__(self, process_obj):
        '''
        Fork; the child runs process_obj._bootstrap() and exits with its
        return value, the parent records the child's pid.
        '''
        # Flush the standard streams before forking (presumably so pending
        # buffered output is not written by both parent and child).
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # In the child: reseed `random` if it has already been imported
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Return the child's exit code, or None if it is not (yet) known

        `flag` is passed to os.waitpid() as its options argument.  A child
        killed by a signal is reported as the negated signal number.
        '''
        if self.returncode is None:
            import errno
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except os.error as e:
                    if e.errno == errno.EINTR:
                        # Interrupted by a signal handler: the child's
                        # state is still unknown, so ask again instead of
                        # reporting "no result".
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its exit code

        With timeout=None this is a single blocking poll.  Otherwise the
        child is polled with a growing delay (capped at 0.05s) until it has
        exited or `timeout` seconds have passed; None is returned when the
        deadline passes first.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while 1:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless its exit code is already known

        If os.kill() fails, the error is re-raised only when the child has
        still not exited after a 0.1s wait.
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''
        Always False for this fork-based implementation
        '''
        return False
173
174#
175# Windows
176#
177
178else:
179 import thread
180 import msvcrt
181 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000182 import time
183
Jesse Noller2f8c8f42010-07-03 12:26:02 +0000184 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000185 from .util import Finalize
186
Jesse Noller13e9d582008-07-16 14:32:36 +0000187 #try:
188 # from cPickle import dump, load, HIGHEST_PROTOCOL
189 #except ImportError:
190 from pickle import load, HIGHEST_PROTOCOL
191
def dump(obj, file, protocol=None):
    '''
    Pickle `obj` to `file` with a ForkingPickler using `protocol`
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000194
195 #
196 #
197 #
198
199 TERMINATE = 0x10000
200 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
brian.curtin40b53162011-04-11 18:00:59 -0500201 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000202
203 exit = win32.ExitProcess
204 close = win32.CloseHandle
205
206 #
207 # _python_exe is the assumed path to the python executable.
208 # People embedding Python want to modify it.
209 #
210
brian.curtin40b53162011-04-11 18:00:59 -0500211 if WINSERVICE:
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000212 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
213 else:
214 _python_exe = sys.executable
215
def set_executable(exe):
    '''
    Set the module-level _python_exe path to `exe`
    '''
    global _python_exe
    _python_exe = exe
219
220 #
221 #
222 #
223
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` of the current process and return the detached copy

    The copy is created in `target_process` (the current process when
    None) with the same access rights; `inheritable` is passed through to
    DuplicateHandle().
    '''
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    source_process = _subprocess.GetCurrentProcess()
    new_handle = _subprocess.DuplicateHandle(
        source_process, handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
        )
    return new_handle.Detach()
231
232 #
233 # We define a Popen class similar to the one from subprocess, but
234 # whose constructor takes a process object as its argument.
235 #
236
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        '''
        Create the child process and send it the pickled preparation data
        followed by the pickled process object
        '''
        # pipe used to send the pickled state to the child
        read_fd, write_fd = os.pipe()

        # inheritable duplicate of the read end's handle; the descriptor
        # itself is closed once it has been duplicated
        child_rhandle = duplicate(
            msvcrt.get_osfhandle(read_fd), inheritable=True
            )
        os.close(read_fd)

        # start the child; the handle is the last command line argument
        argv = get_command_line() + [child_rhandle]
        cmdline = ' '.join(['"%s"' % arg for arg in argv])
        proc_handle, thread_handle, pid, tid = _subprocess.CreateProcess(
            _python_exe, cmdline, None, None, 1, 0, None, None, None
            )
        thread_handle.Close()
        close(child_rhandle)

        # set attributes of self
        self.pid = pid
        self.returncode = None
        self._handle = proc_handle

        # send information to child; process_handle is set only while
        # pickling, which is what thread_is_spawning() checks
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(write_fd, 'wb')
        Popen._tls.process_handle = int(proc_handle)
        try:
            for payload in (prep_data, process_obj):
                dump(payload, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether this thread is currently pickling data for a child
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the child process being spawned
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever if None) for the child and
        return its exit code, or None if it has not exited

        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is not None:
            return self.returncode

        if timeout is None:
            msecs = _subprocess.INFINITE
        else:
            # milliseconds, rounded to nearest and never negative
            msecs = max(0, int(timeout * 1000 + 0.5))

        res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
        if res == _subprocess.WAIT_OBJECT_0:
            code = _subprocess.GetExitCodeProcess(self._handle)
            self.returncode = -signal.SIGTERM if code == TERMINATE else code

        return self.returncode

    def poll(self):
        '''
        Return the exit code if the child has exited, else None
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless its exit code
        is already known

        A WindowsError is re-raised only when the child has still not
        exited after a 0.1s wait.
        '''
        if self.returncode is not None:
            return
        try:
            _subprocess.TerminateProcess(int(self._handle), TERMINATE)
        except WindowsError:
            if self.wait(timeout=0.1) is None:
                raise
310
311 #
312 #
313 #
314
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking

    True when the second element of `argv` is '--multiprocessing-fork';
    such a command line is asserted to have exactly three elements.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    assert len(argv) == 3
    return True
324
325
def freeze_support():
    '''
    Run code for process object if this is not the main process

    Does nothing unless sys.argv indicates a fork; otherwise runs main()
    and then exits the interpreter.
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
333
334
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError when the current process has an empty _identity
    and its own command line indicates that it is being forked.
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    # a frozen executable is re-run directly, without "-c"
    if getattr(sys, 'frozen', False):
        return [sys.executable, '--multiprocessing-fork']

    prog = 'from multiprocessing.forking import main; main()'
    return [_python_exe, '-c', prog, '--multiprocessing-fork']
359
360
def main():
    '''
    Run code specified by data received over pipe

    Reads the preparation data and then the process object from the pipe
    whose handle is the last command line argument, runs the process
    object's _bootstrap() and exits with its return value.
    '''
    assert is_forking(sys.argv)

    # the last argument is the handle of the read end of the pipe
    pipe_handle = int(sys.argv[-1])
    pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # _inheriting is set only while the two objects are being unpickled
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
381
382
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict with the process name, sys.path, sys.argv, the
    logging setup, the original directory and the authkey.  'main_path'
    is added when a path for the main module can be determined and
    neither WINEXE nor WINSERVICE is set.
    '''
    from .util import _logger, _log_to_stderr

    d = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
        }

    if _logger is not None:
        d['log_level'] = _logger.getEffectiveLevel()

    if not (WINEXE or WINSERVICE):
        main_path = getattr(sys.modules['__main__'], '__file__', None)
        # fall back to the script name given on the command line
        if not main_path and sys.argv[0] not in ('', '-c'):
            main_path = sys.argv[0]
        if main_path is not None:
            relative = not os.path.isabs(main_path)
            if relative and process.ORIGINAL_DIR is not None:
                main_path = os.path.join(process.ORIGINAL_DIR, main_path)
            d['main_path'] = os.path.normpath(main_path)

    return d
412
413 #
414 # Make (Pipe)Connection picklable
415 #
416
def reduce_connection(conn):
    '''
    Reduce a connection object for pickling to a child process

    The connection's handle is duplicated into the child being spawned.
    Raises RuntimeError when no child is being spawned by this thread.
    '''
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            'By default %s objects can only be shared between processes\n'
            'using inheritance' % type(conn).__name__
            )
    conn_type = type(conn)
    child_handle = Popen.duplicate_for_child(conn.fileno())
    return conn_type, (child_handle, conn.readable, conn.writable)
425
Jesse Noller13e9d582008-07-16 14:32:36 +0000426 ForkingPickler.register(Connection, reduce_connection)
427 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000428
429#
430# Prepare current process
431#
432
# __main__ modules replaced by prepare(); prepare() appends the current
# one on every call
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; every key is optional and only the keys present are
    applied: 'name', 'authkey', 'log_to_stderr', 'log_level', 'sys_path',
    'sys_argv', 'dir', 'orig_dir' and 'main_path'.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        _import_parents_main(data['main_path'])


def _import_parents_main(main_path):
    '''
    Import the module found at `main_path` and install it as __main__

    Nothing is imported when the module name derived from the path is
    'ipython'.
    '''
    main_name = os.path.splitext(os.path.basename(main_path))[0]
    if main_name == '__init__':
        # a package: its name is the name of the containing directory
        main_name = os.path.basename(os.path.dirname(main_path))

    if main_name == 'ipython':
        return

    import imp

    # main_path cannot be None here: os.path.basename() has already been
    # applied to it above
    if os.path.basename(main_path).startswith('__init__.py'):
        dirs = [os.path.dirname(os.path.dirname(main_path))]
    else:
        dirs = [os.path.dirname(main_path)]

    assert main_name not in sys.modules, main_name
    fobj, path_name, etc = imp.find_module(main_name, dirs)
    try:
        # We would like to do "imp.load_module('__main__', ...)"
        # here.  However, that would cause 'if __name__ ==
        # "__main__"' clauses to be executed.
        main_module = imp.load_module(
            '__parents_main__', fobj, path_name, etc
            )
    finally:
        if fobj:
            fobj.close()

    sys.modules['__main__'] = main_module
    main_module.__name__ = '__main__'

    # Try to make the potentially picklable objects in
    # sys.modules['__main__'] realize they are in the main
    # module -- somewhat ugly.
    for obj in main_module.__dict__.values():
        try:
            if obj.__module__ == '__parents_main__':
                obj.__module__ = '__main__'
        except Exception:
            # best effort: objects without a readable or writable
            # __module__ attribute are left unchanged
            pass
505 pass