blob: 3d9555708eae1e17ecb530b4ce9e8d4afe56e4c7 [file] [log] [blame]
#
# Module for starting a process object using os.fork() or CreateProcess()
#
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
#
# Check that the current thread is spawning a child process
#
46
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object about to be shared; only its type name is used,
    to build the error message.  Whether a spawn is in progress is decided
    by `Popen.thread_is_spawning()`.
    '''
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )
53
#
# Try making some callable types picklable
#
57
58from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass with its own dispatch table that can be extended
    through `register()`
    '''
    # Work on a copy so that registrations made here do not modify the
    # dispatch table of the base Pickler class.
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Make instances of `type` picklable by this class using `reduce`

        `reduce(obj)` must return either a string, which is saved as a
        global name, or a tuple, which is passed on to `save_reduce()`.
        '''
        def dispatcher(self, obj):
            rv = reduce(obj)
            if isinstance(rv, str):
                self.save_global(obj, rv)
            else:
                self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher
70
71def _reduce_method(m):
72 if m.__self__ is None:
73 return getattr, (m.__class__, m.__func__.__name__)
74 else:
75 return getattr, (m.__self__, m.__func__.__name__)
# Throwaway class used only to obtain the type of a bound method so that
# it can be registered with ForkingPickler.
class _C:
    def f(self):
        pass
ForkingPickler.register(type(_C().f), _reduce_method)
80
81
82def _reduce_method_descriptor(m):
83 return getattr, (m.__objclass__, m.__name__)
# Register the descriptor types of `list.append` and `int.__add__`.
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
86
# Register functools.partial objects, when functools.partial is available.
try:
    from functools import partial
except ImportError:
    pass
else:
    def _reduce_partial(p):
        '''
        Reduce a partial object to a call of `_rebuild_partial`
        '''
        # `p.keywords` may be falsy; substitute an empty dict so that
        # `_rebuild_partial` can always unpack it with `**`.
        return _rebuild_partial, (p.func, p.args, p.keywords or {})

    def _rebuild_partial(func, args, keywords):
        '''
        Recreate a partial object from its function, args and keywords
        '''
        return partial(func, *args, **keywords)

    ForkingPickler.register(partial, _reduce_partial)
97
#
# Unix
#
101
if sys.platform != 'win32':
    import time

    exit = os._exit
    duplicate = os.dup
    close = os.close

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a child with os.fork() to run the code of a process object
        '''

        def __init__(self, process_obj):
            '''
            Fork; the child runs `process_obj._bootstrap()` and exits with
            the value it returns, the parent records the child's pid
            '''
            # Flush before forking -- presumably so that buffered output
            # is not written out a second time by the child.
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()
            if self.pid == 0:
                # In the child.  Reseed `random` if it is already loaded
                # -- presumably so that the child does not continue with
                # the parent's generator state.
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

        def poll(self, flag=os.WNOHANG):
            '''
            Return the exit code of the child, or None if it is not known

            `flag` is passed to os.waitpid().  A child terminated by a
            signal is reported as the negated signal number.
            '''
            if self.returncode is None:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError:
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            '''
            Wait for the child and return its exit code

            With `timeout=None` this is a single os.waitpid() call with
            flag 0.  Otherwise the child is polled until it has exited
            or `timeout` seconds have passed; None is returned if the
            exit code is still unknown.
            '''
            if timeout is None:
                return self.poll(0)
            deadline = time.time() + timeout
            delay = 0.0005
            while True:
                res = self.poll()
                if res is not None:
                    break
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                # Double the sleep each round, but never sleep past the
                # deadline nor for more than 0.05 seconds at a time.
                delay = min(delay * 2, remaining, 0.05)
                time.sleep(delay)
            return res

        def terminate(self):
            '''
            Send SIGTERM to the child unless its exit code is already known
            '''
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except OSError:
                    # Only propagate the error if the child's exit code
                    # is still unknown after waiting for 0.1 seconds.
                    if self.wait(timeout=0.1) is None:
                        raise

        @staticmethod
        def thread_is_spawning():
            '''
            Always False in this (fork based) implementation
            '''
            return False
174
#
# Windows
#
178
# Explicit platform test; it selects exactly the cases the `else:` arm of
# the `sys.platform != 'win32'` check selected, and lets this section be
# read (and replaced) on its own.
if sys.platform == 'win32':
    import _thread
    import msvcrt
    import _subprocess
    import time

    from pickle import dump, load, HIGHEST_PROTOCOL
    from _multiprocessing import win32
    from .util import Finalize

    def dump(obj, file, protocol=None):
        '''
        Replacement for pickle.dump() which uses ForkingPickler
        '''
        ForkingPickler(file, protocol).dump(obj)

    #
    #
    #

    # Exit code passed to TerminateProcess() by Popen.terminate(); it is
    # reported by Popen.wait() as -signal.SIGTERM.
    TERMINATE = 0x10000
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

    exit = win32.ExitProcess
    close = win32.CloseHandle

    #
    # _python_exe is the assumed path to the python executable.
    # People embedding Python want to modify it.
    #

    if WINSERVICE:
        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
    else:
        _python_exe = sys.executable

    def set_executable(exe):
        '''
        Set the path of the executable used to start child processes
        '''
        global _python_exe
        _python_exe = exe

    #
    #
    #

    def duplicate(handle, target_process=None, inheritable=False):
        '''
        Duplicate `handle` of the current process into `target_process`

        `target_process` defaults to the current process.  The new handle
        is returned detached from its wrapper object.
        '''
        if target_process is None:
            target_process = _subprocess.GetCurrentProcess()
        return _subprocess.DuplicateHandle(
            _subprocess.GetCurrentProcess(), handle, target_process,
            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
            ).Detach()

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        # Holds `process_handle` while this thread is pickling a process
        # object for a child -- see thread_is_spawning().
        _tls = _thread._local()

        def __init__(self, process_obj):
            '''
            Start the child and send it the pickled `process_obj`

            The pipe ends and the duplicated handle created here are
            closed again if starting the child or pickling fails.
            '''
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            try:
                # get handle for read end of the pipe and make it
                # inheritable
                try:
                    rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                        inheritable=True)
                finally:
                    os.close(rfd)

                # start process
                try:
                    cmd = get_command_line() + [rhandle]
                    cmd = ' '.join('"%s"' % x for x in cmd)
                    hp, ht, pid, tid = _subprocess.CreateProcess(
                        _python_exe, cmd, None, None, 1, 0, None, None, None
                        )
                finally:
                    # The parent never reads from the pipe, so its copy of
                    # the read handle is closed whether or not the child
                    # was started.
                    close(rhandle)
                ht.Close()
            except BaseException:
                # Nothing will ever be written: do not leak the write end.
                os.close(wfd)
                raise

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child
            to_child = os.fdopen(wfd, 'wb')
            try:
                prep_data = get_preparation_data(process_obj._name)
                Popen._tls.process_handle = int(hp)
                try:
                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
                finally:
                    del Popen._tls.process_handle
            finally:
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            '''
            Return whether the current thread is pickling for a new child
            '''
            return getattr(Popen._tls, 'process_handle', None) is not None

        @staticmethod
        def duplicate_for_child(handle):
            '''
            Duplicate `handle` into the child currently being spawned
            '''
            return duplicate(handle, Popen._tls.process_handle)

        def wait(self, timeout=None):
            '''
            Wait for the child and return its exit code

            `timeout` is in seconds; None means wait indefinitely.  None
            is returned if the child has not finished in time.  An exit
            code equal to TERMINATE is reported as -signal.SIGTERM.
            '''
            if self.returncode is None:
                if timeout is None:
                    msecs = _subprocess.INFINITE
                else:
                    # seconds -> whole milliseconds, rounded, never negative
                    msecs = max(0, int(timeout * 1000 + 0.5))

                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
                if res == _subprocess.WAIT_OBJECT_0:
                    code = _subprocess.GetExitCodeProcess(self._handle)
                    if code == TERMINATE:
                        code = -signal.SIGTERM
                    self.returncode = code

            return self.returncode

        def poll(self):
            '''
            Return the exit code of the child, or None, without blocking
            '''
            return self.wait(timeout=0)

        def terminate(self):
            '''
            Terminate the child unless its exit code is already known
            '''
            if self.returncode is None:
                try:
                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
                except WindowsError:
                    # Only propagate the error if the child's exit code
                    # is still unknown after waiting for 0.1 seconds.
                    if self.wait(timeout=0.1) is None:
                        raise

    #
    #
    #

    def is_forking(argv):
        '''
        Return whether commandline indicates we are forking
        '''
        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
            # main() reads the pipe handle from the argument after the flag
            assert len(argv) == 3
            return True
        else:
            return False


    def freeze_support():
        '''
        Run code for process object if this is not the main process
        '''
        if is_forking(sys.argv):
            main()
            sys.exit()


    def get_command_line():
        '''
        Returns prefix of command line used for spawning a child process
        '''
        if process.current_process()._identity==() and is_forking(sys.argv):
            raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

        if getattr(sys, 'frozen', False):
            return [sys.executable, '--multiprocessing-fork']
        else:
            prog = 'from multiprocessing.forking import main; main()'
            return [_python_exe, '-c', prog, '--multiprocessing-fork']


    def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        # The last argument is the pipe handle appended by Popen.__init__
        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        # Data read here was written by the parent's Popen.__init__: first
        # the preparation data, then the process object itself.
        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)


    def get_preparation_data(name):
        '''
        Return info about parent needed by child to unpickle process object
        '''
        from .util import _logger, _log_to_stderr

        d = dict(
            name=name,
            sys_path=sys.path,
            sys_argv=sys.argv,
            log_to_stderr=_log_to_stderr,
            orig_dir=process.ORIGINAL_DIR,
            authkey=process.current_process().authkey,
            )

        if _logger is not None:
            d['log_level'] = _logger.getEffectiveLevel()

        if not WINEXE and not WINSERVICE:
            main_path = getattr(sys.modules['__main__'], '__file__', None)
            if not main_path and sys.argv[0] not in ('', '-c'):
                main_path = sys.argv[0]
            if main_path is not None:
                # Make a relative path absolute using the directory
                # recorded in process.ORIGINAL_DIR, when there is one.
                if not os.path.isabs(main_path) and \
                                          process.ORIGINAL_DIR is not None:
                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
                d['main_path'] = os.path.normpath(main_path)

        return d

    #
    # Make (Pipe)Connection picklable
    #

    # Late import because of circular import
    from .connection import Connection, PipeConnection

    def reduce_connection(conn):
        '''
        Reduce a connection by duplicating its handle into the child

        Raises RuntimeError unless the current thread is spawning a child.
        '''
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                'By default %s objects can only be shared between processes\n'
                'using inheritance' % type(conn).__name__
                )
        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
                            conn.readable, conn.writable)

    ForkingPickler.register(Connection, reduce_connection)
    ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000428
#
# Prepare current process
#
432
# Every call to prepare() appends the module that was sys.modules['__main__']
# at that moment, so a module replaced by prepare() stays referenced here.
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; each of the keys below is optional and is applied
    only when present: 'name', 'authkey', 'log_to_stderr', 'log_level',
    'sys_path', 'sys_argv', 'dir', 'orig_dir' and 'main_path'.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        # XXX (ncoghlan): The following code makes several bogus
        # assumptions regarding the relationship between __file__
        # and a module's real name. See PEP 302 and issue #10845
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name == '__main__':
            main_module = sys.modules['__main__']
            main_module.__file__ = main_path
        elif main_name != 'ipython':
            # Main modules not actually called __main__.py may
            # contain additional code that should still be executed
            import imp

            # NOTE(review): the `is None` case looks unreachable, since
            # `main_path` was already passed to os.path.basename() above.
            if main_path is None:
                dirs = None
            elif os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in list(main_module.__dict__.values()):
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # Deliberately best-effort: objects without a
                    # readable or writable __module__ are skipped.
                    pass
513 pass