blob: d2a320845365707011bb221abc48aa308a6f3844 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import os
36import sys
37import signal
38
39from multiprocessing import util, process
40
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
43#
44# Check that the current thread is spawning a child process
45#
46
47def assert_spawning(self):
48 if not Popen.thread_is_spawning():
49 raise RuntimeError(
50 '%s objects should only be shared between processes'
51 ' through inheritance' % type(self).__name__
52 )
53
54#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000055# Try making some callable types picklable
56#
57
58from pickle import _Pickler as Pickler
59class ForkingPickler(Pickler):
60 dispatch = Pickler.dispatch.copy()
61 @classmethod
62 def register(cls, type, reduce):
63 def dispatcher(self, obj):
64 rv = reduce(obj)
65 if isinstance(rv, str):
66 self.save_global(obj, rv)
67 else:
68 self.save_reduce(obj=obj, *rv)
69 cls.dispatch[type] = dispatcher
70
71def _reduce_method(m):
72 if m.__self__ is None:
73 return getattr, (m.__class__, m.__func__.__name__)
74 else:
75 return getattr, (m.__self__, m.__func__.__name__)
76class _C:
77 def f(self):
78 pass
79ForkingPickler.register(type(_C().f), _reduce_method)
80
81
82def _reduce_method_descriptor(m):
83 return getattr, (m.__objclass__, m.__name__)
84ForkingPickler.register(type(list.append), _reduce_method_descriptor)
85ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
86
87try:
88 from functools import partial
89except ImportError:
90 pass
91else:
92 def _reduce_partial(p):
93 return _rebuild_partial, (p.func, p.args, p.keywords or {})
94 def _rebuild_partial(func, args, keywords):
95 return partial(func, *args, **keywords)
96 ForkingPickler.register(partial, _reduce_partial)
97
98#
Benjamin Petersone711caf2008-06-11 16:44:04 +000099# Unix
100#
101
102if sys.platform != 'win32':
103 import time
104
105 exit = os._exit
106 duplicate = os.dup
107 close = os.close
108
109 #
110 # We define a Popen class similar to the one from subprocess, but
111 # whose constructor takes a process object as its argument.
112 #
113
114 class Popen(object):
115
116 def __init__(self, process_obj):
117 sys.stdout.flush()
118 sys.stderr.flush()
119 self.returncode = None
120
121 self.pid = os.fork()
122 if self.pid == 0:
123 if 'random' in sys.modules:
124 import random
125 random.seed()
126 code = process_obj._bootstrap()
127 sys.stdout.flush()
128 sys.stderr.flush()
129 os._exit(code)
130
131 def poll(self, flag=os.WNOHANG):
132 if self.returncode is None:
Florent Xicluna998171f2010-03-08 13:32:17 +0000133 try:
134 pid, sts = os.waitpid(self.pid, flag)
135 except os.error:
136 # Child process not yet created. See #1731717
137 # e.errno == errno.ECHILD == 10
138 return None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139 if pid == self.pid:
140 if os.WIFSIGNALED(sts):
141 self.returncode = -os.WTERMSIG(sts)
142 else:
143 assert os.WIFEXITED(sts)
144 self.returncode = os.WEXITSTATUS(sts)
145 return self.returncode
146
147 def wait(self, timeout=None):
148 if timeout is None:
149 return self.poll(0)
150 deadline = time.time() + timeout
151 delay = 0.0005
152 while 1:
153 res = self.poll()
154 if res is not None:
155 break
156 remaining = deadline - time.time()
157 if remaining <= 0:
158 break
159 delay = min(delay * 2, remaining, 0.05)
160 time.sleep(delay)
161 return res
162
163 def terminate(self):
164 if self.returncode is None:
165 try:
166 os.kill(self.pid, signal.SIGTERM)
167 except OSError as e:
168 if self.wait(timeout=0.1) is None:
169 raise
170
171 @staticmethod
172 def thread_is_spawning():
173 return False
174
175#
176# Windows
177#
178
179else:
180 import _thread
181 import msvcrt
182 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183 import time
184
Jesse Nollerf70a5382009-01-18 19:44:02 +0000185 from pickle import dump, load, HIGHEST_PROTOCOL
Brian Curtina6a32742010-08-04 15:47:24 +0000186 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 from .util import Finalize
188
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000189 def dump(obj, file, protocol=None):
190 ForkingPickler(file, protocol).dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 #
193 #
194 #
195
196 TERMINATE = 0x10000
197 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
198
199 exit = win32.ExitProcess
200 close = win32.CloseHandle
201
202 #
203 # _python_exe is the assumed path to the python executable.
204 # People embedding Python want to modify it.
205 #
206
207 if sys.executable.lower().endswith('pythonservice.exe'):
208 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
209 else:
210 _python_exe = sys.executable
211
212 def set_executable(exe):
213 global _python_exe
214 _python_exe = exe
215
216 #
217 #
218 #
219
220 def duplicate(handle, target_process=None, inheritable=False):
221 if target_process is None:
222 target_process = _subprocess.GetCurrentProcess()
223 return _subprocess.DuplicateHandle(
224 _subprocess.GetCurrentProcess(), handle, target_process,
225 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
226 ).Detach()
227
228 #
229 # We define a Popen class similar to the one from subprocess, but
230 # whose constructor takes a process object as its argument.
231 #
232
233 class Popen(object):
234 '''
235 Start a subprocess to run the code of a process object
236 '''
237 _tls = _thread._local()
238
239 def __init__(self, process_obj):
240 # create pipe for communication with child
241 rfd, wfd = os.pipe()
242
243 # get handle for read end of the pipe and make it inheritable
244 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
245 os.close(rfd)
246
247 # start process
248 cmd = get_command_line() + [rhandle]
249 cmd = ' '.join('"%s"' % x for x in cmd)
250 hp, ht, pid, tid = _subprocess.CreateProcess(
251 _python_exe, cmd, None, None, 1, 0, None, None, None
252 )
253 ht.Close()
254 close(rhandle)
255
256 # set attributes of self
257 self.pid = pid
258 self.returncode = None
259 self._handle = hp
260
261 # send information to child
262 prep_data = get_preparation_data(process_obj._name)
263 to_child = os.fdopen(wfd, 'wb')
264 Popen._tls.process_handle = int(hp)
265 try:
266 dump(prep_data, to_child, HIGHEST_PROTOCOL)
267 dump(process_obj, to_child, HIGHEST_PROTOCOL)
268 finally:
269 del Popen._tls.process_handle
270 to_child.close()
271
272 @staticmethod
273 def thread_is_spawning():
274 return getattr(Popen._tls, 'process_handle', None) is not None
275
276 @staticmethod
277 def duplicate_for_child(handle):
278 return duplicate(handle, Popen._tls.process_handle)
279
280 def wait(self, timeout=None):
281 if self.returncode is None:
282 if timeout is None:
283 msecs = _subprocess.INFINITE
284 else:
285 msecs = max(0, int(timeout * 1000 + 0.5))
286
287 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
288 if res == _subprocess.WAIT_OBJECT_0:
289 code = _subprocess.GetExitCodeProcess(self._handle)
290 if code == TERMINATE:
291 code = -signal.SIGTERM
292 self.returncode = code
293
294 return self.returncode
295
296 def poll(self):
297 return self.wait(timeout=0)
298
299 def terminate(self):
300 if self.returncode is None:
301 try:
302 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
303 except WindowsError:
304 if self.wait(timeout=0.1) is None:
305 raise
306
307 #
308 #
309 #
310
311 def is_forking(argv):
312 '''
313 Return whether commandline indicates we are forking
314 '''
315 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
316 assert len(argv) == 3
317 return True
318 else:
319 return False
320
321
322 def freeze_support():
323 '''
324 Run code for process object if this in not the main process
325 '''
326 if is_forking(sys.argv):
327 main()
328 sys.exit()
329
330
331 def get_command_line():
332 '''
333 Returns prefix of command line used for spawning a child process
334 '''
335 if process.current_process()._identity==() and is_forking(sys.argv):
336 raise RuntimeError('''
337 Attempt to start a new process before the current process
338 has finished its bootstrapping phase.
339
340 This probably means that you are on Windows and you have
341 forgotten to use the proper idiom in the main module:
342
343 if __name__ == '__main__':
344 freeze_support()
345 ...
346
347 The "freeze_support()" line can be omitted if the program
348 is not going to be frozen to produce a Windows executable.''')
349
350 if getattr(sys, 'frozen', False):
351 return [sys.executable, '--multiprocessing-fork']
352 else:
353 prog = 'from multiprocessing.forking import main; main()'
354 return [_python_exe, '-c', prog, '--multiprocessing-fork']
355
356
357 def main():
358 '''
359 Run code specifed by data received over pipe
360 '''
361 assert is_forking(sys.argv)
362
363 handle = int(sys.argv[-1])
364 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
365 from_parent = os.fdopen(fd, 'rb')
366
367 process.current_process()._inheriting = True
368 preparation_data = load(from_parent)
369 prepare(preparation_data)
370 self = load(from_parent)
371 process.current_process()._inheriting = False
372
373 from_parent.close()
374
375 exitcode = self._bootstrap()
376 exit(exitcode)
377
378
379 def get_preparation_data(name):
380 '''
381 Return info about parent needed by child to unpickle process object
382 '''
383 from .util import _logger, _log_to_stderr
384
385 d = dict(
386 name=name,
387 sys_path=sys.path,
388 sys_argv=sys.argv,
389 log_to_stderr=_log_to_stderr,
390 orig_dir=process.ORIGINAL_DIR,
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000391 authkey=process.current_process().authkey,
Benjamin Petersone711caf2008-06-11 16:44:04 +0000392 )
393
394 if _logger is not None:
395 d['log_level'] = _logger.getEffectiveLevel()
396
397 if not WINEXE:
398 main_path = getattr(sys.modules['__main__'], '__file__', None)
399 if not main_path and sys.argv[0] not in ('', '-c'):
400 main_path = sys.argv[0]
401 if main_path is not None:
402 if not os.path.isabs(main_path) and \
403 process.ORIGINAL_DIR is not None:
404 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
405 d['main_path'] = os.path.normpath(main_path)
406
407 return d
408
409 #
410 # Make (Pipe)Connection picklable
411 #
412
413 def reduce_connection(conn):
414 if not Popen.thread_is_spawning():
415 raise RuntimeError(
416 'By default %s objects can only be shared between processes\n'
417 'using inheritance' % type(conn).__name__
418 )
419 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
420 conn.readable, conn.writable)
421
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000422 ForkingPickler.register(Connection, reduce_connection)
423 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000424
425#
426# Prepare current process
427#
428
429old_main_modules = []
430
431def prepare(data):
432 '''
433 Try to get current process ready to unpickle process object
434 '''
435 old_main_modules.append(sys.modules['__main__'])
436
437 if 'name' in data:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000438 process.current_process().name = data['name']
Benjamin Petersone711caf2008-06-11 16:44:04 +0000439
440 if 'authkey' in data:
441 process.current_process()._authkey = data['authkey']
442
443 if 'log_to_stderr' in data and data['log_to_stderr']:
444 util.log_to_stderr()
445
446 if 'log_level' in data:
447 util.get_logger().setLevel(data['log_level'])
448
449 if 'sys_path' in data:
450 sys.path = data['sys_path']
451
452 if 'sys_argv' in data:
453 sys.argv = data['sys_argv']
454
455 if 'dir' in data:
456 os.chdir(data['dir'])
457
458 if 'orig_dir' in data:
459 process.ORIGINAL_DIR = data['orig_dir']
460
461 if 'main_path' in data:
462 main_path = data['main_path']
463 main_name = os.path.splitext(os.path.basename(main_path))[0]
464 if main_name == '__init__':
465 main_name = os.path.basename(os.path.dirname(main_path))
466
467 if main_name != 'ipython':
468 import imp
469
470 if main_path is None:
471 dirs = None
472 elif os.path.basename(main_path).startswith('__init__.py'):
473 dirs = [os.path.dirname(os.path.dirname(main_path))]
474 else:
475 dirs = [os.path.dirname(main_path)]
476
477 assert main_name not in sys.modules, main_name
478 file, path_name, etc = imp.find_module(main_name, dirs)
479 try:
480 # We would like to do "imp.load_module('__main__', ...)"
481 # here. However, that would cause 'if __name__ ==
482 # "__main__"' clauses to be executed.
483 main_module = imp.load_module(
484 '__parents_main__', file, path_name, etc
485 )
486 finally:
487 if file:
488 file.close()
489
490 sys.modules['__main__'] = main_module
491 main_module.__name__ = '__main__'
492
493 # Try to make the potentially picklable objects in
494 # sys.modules['__main__'] realize they are in the main
495 # module -- somewhat ugly.
496 for obj in list(main_module.__dict__.values()):
497 try:
498 if obj.__module__ == '__parents_main__':
499 obj.__module__ = '__main__'
500 except Exception:
501 pass