blob: a7e8eb79cd248cf147350b6512814854e68c1d61 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000015__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Petersone711caf2008-06-11 16:44:04 +000016
17#
18# Check that the current thread is spawning a child process
19#
20
def assert_spawning(self):
    """Raise RuntimeError unless Popen reports that a child is being spawned.

    *self* is the object about to be shared; only its type name is used,
    to build the error message.
    """
    if Popen.thread_is_spawning():
        return
    kind = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % kind
    )
27
28#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000029# Try making some callable types picklable
30#
31
32from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
    """Pickler subclass with its own, extensible copy of the dispatch table."""

    # Private copy, so registrations never touch Pickler.dispatch itself.
    dispatch = dict(Pickler.dispatch)

    @classmethod
    def register(cls, type, reduce):
        """Pickle instances of *type* using the result of ``reduce(obj)``.

        If *reduce* returns a string the object is saved as a global under
        that name; otherwise the result is unpacked into ``save_reduce()``.
        """
        def dispatcher(self, obj):
            reduced = reduce(obj)
            if not isinstance(reduced, str):
                self.save_reduce(*reduced, obj=obj)
                return
            self.save_global(obj, reduced)
        cls.dispatch[type] = dispatcher
44
45def _reduce_method(m):
46 if m.__self__ is None:
47 return getattr, (m.__class__, m.__func__.__name__)
48 else:
49 return getattr, (m.__self__, m.__func__.__name__)
# _C exists only so that the type of a bound method can be obtained from
# an instance (type(_C().f)); that type is then registered with
# ForkingPickler so bound methods are pickled via _reduce_method.
class _C:
    def f(self):
        pass
ForkingPickler.register(type(_C().f), _reduce_method)
54
55
56def _reduce_method_descriptor(m):
57 return getattr, (m.__objclass__, m.__name__)
# Register the types of list.append and int.__add__ so that objects of
# those types are pickled through _reduce_method_descriptor.
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
60
# Make functools.partial objects picklable when functools provides them.
try:
    from functools import partial
except ImportError:
    pass
else:
    def _rebuild_partial(func, args, keywords):
        """Recreate a partial from the pieces saved by _reduce_partial."""
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        """Reduce a partial to its function, positional and keyword args."""
        # A falsy ``keywords`` (None or empty) is sent as an empty dict.
        kwargs = p.keywords if p.keywords else {}
        return _rebuild_partial, (p.func, p.args, kwargs)

    ForkingPickler.register(partial, _reduce_partial)
71
72#
Benjamin Petersone711caf2008-06-11 16:44:04 +000073# Unix
74#
75
76if sys.platform != 'win32':
77 import time
78
79 exit = os._exit
80 duplicate = os.dup
81 close = os.close
82
83 #
84 # We define a Popen class similar to the one from subprocess, but
85 # whose constructor takes a process object as its argument.
86 #
87
class Popen(object):
    """Start a process object's code in a child created with os.fork().

    Attributes set by the constructor:
      pid        -- value returned by os.fork() (child pid in the parent)
      returncode -- None until the child has been reaped; afterwards the
                    exit status, or minus the signal number if the child
                    was terminated by a signal
    """

    def __init__(self, process_obj):
        # Flush before forking (presumably so buffered output is not
        # written twice, once by each process).
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid != 0:
            # Parent: nothing more to do.
            return

        # Child: reseed ``random`` if it was already imported, run the
        # process object's bootstrap and exit with its result.
        if 'random' in sys.modules:
            import random
            random.seed()
        code = process_obj._bootstrap()
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(code)

    def poll(self, flag=os.WNOHANG):
        """Return the child's return code, or None if it is not available.

        *flag* is passed to os.waitpid(); the default makes the call
        non-blocking.
        """
        if self.returncode is not None:
            return self.returncode
        try:
            pid, sts = os.waitpid(self.pid, flag)
        except os.error:
            # Child process not yet created. See #1731717
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts)
                self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        """Wait for the child and return its return code.

        With ``timeout=None`` this is a single blocking poll.  Otherwise
        the child is polled with a growing sleep (capped at 0.05s and at
        the time remaining) until it finishes or *timeout* seconds have
        elapsed; None is returned on timeout.
        """
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                return res
            remaining = deadline - time.time()
            if remaining <= 0:
                return res
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)

    def terminate(self):
        """Send SIGTERM to the child unless its return code is already known.

        An OSError from os.kill() is re-raised only if the child has still
        not finished after a 0.1s wait.
        """
        if self.returncode is not None:
            return
        try:
            os.kill(self.pid, signal.SIGTERM)
        except OSError:
            if self.wait(timeout=0.1) is None:
                raise

    @staticmethod
    def thread_is_spawning():
        """Always False for the fork-based implementation."""
        return False
148
149#
150# Windows
151#
152
153else:
154 import _thread
155 import msvcrt
156 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 import time
158
Jesse Nollerf70a5382009-01-18 19:44:02 +0000159 from pickle import dump, load, HIGHEST_PROTOCOL
Brian Curtina6a32742010-08-04 15:47:24 +0000160 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 from .util import Finalize
162
def dump(obj, file, protocol=None):
    """Pickle *obj* to *file* using a ForkingPickler.

    This replaces the ``dump`` imported from pickle, so the reducers
    registered on ForkingPickler are used.
    """
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165
166 #
167 #
168 #
169
# Exit code handed to TerminateProcess() by Popen.terminate(); Popen.wait()
# reports a child that exited with this code as -signal.SIGTERM.
TERMINATE = 0x10000
# Truthy when running on win32 with ``sys.frozen`` set.
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))

# Platform aliases exported through __all__.
exit = win32.ExitProcess
close = win32.CloseHandle
175
176 #
177 # _python_exe is the assumed path to the python executable.
178 # People embedding Python want to modify it.
179 #
180
# When the interpreter is 'pythonservice.exe', use python.exe from
# sys.exec_prefix instead; otherwise use sys.executable as is.
_python_exe = (
    os.path.join(sys.exec_prefix, 'python.exe')
    if sys.executable.lower().endswith('pythonservice.exe')
    else sys.executable
)
185
def set_executable(exe):
    """Set the module-level ``_python_exe`` path to *exe*."""
    global _python_exe
    _python_exe = exe
189
190 #
191 #
192 #
193
def duplicate(handle, target_process=None, inheritable=False):
    """Duplicate *handle* from the current process and return the detached copy.

    *target_process* is the process that receives the duplicate and
    defaults to the current process; *inheritable* is passed straight to
    DuplicateHandle.  The duplicate has the same access as the source.
    """
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    source_process = _subprocess.GetCurrentProcess()
    dup = _subprocess.DuplicateHandle(
        source_process, handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
    )
    return dup.Detach()
201
202 #
203 # We define a Popen class similar to the one from subprocess, but
204 # whose constructor takes a process object as its argument.
205 #
206
class Popen(object):
    '''
    Start a subprocess to run the code of a process object

    Attributes set by the constructor:
      pid        -- process id returned by CreateProcess
      returncode -- None until the child is known to have exited
      _handle    -- process handle returned by CreateProcess
    '''
    # Thread-local storage; ``process_handle`` is set only while the
    # constructor is pickling data for the child.
    _tls = _thread._local()

    def __init__(self, process_obj):
        # create pipe for communication with child
        rfd, wfd = os.pipe()
        try:
            # get handle for read end of the pipe and make it inheritable;
            # the original descriptor is closed whether or not that works
            try:
                rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                    inheritable=True)
            finally:
                os.close(rfd)

            # start process; our copy of the inheritable handle is released
            # even if building the command line or CreateProcess fails
            try:
                cmd = get_command_line() + [rhandle]
                cmd = ' '.join('"%s"' % x for x in cmd)
                hp, ht, pid, tid = _subprocess.CreateProcess(
                    _python_exe, cmd, None, None, 1, 0, None, None, None
                )
                ht.Close()
            finally:
                close(rhandle)

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            prep_data = get_preparation_data(process_obj._name)
        except BaseException:
            # Nothing has taken ownership of the write end yet, so close
            # it here instead of leaking it.
            os.close(wfd)
            raise

        # send information to child
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether this thread is currently sending data to a new child
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate handle into the child process currently being spawned
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to timeout seconds (forever if None) and return the
        return code, or None if the child has not exited
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # round to the nearest millisecond, never negative
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                if code == TERMINATE:
                    # exit code used by terminate()
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''
        Return the return code without blocking (None if still running)
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless its return
        code is already known
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                # re-raise only if the child is still running after 0.1s
                if self.wait(timeout=0.1) is None:
                    raise
279 raise
280
281 #
282 #
283 #
284
def is_forking(argv):
    '''
    Return True when argv[1] is the '--multiprocessing-fork' marker

    A marked command line is asserted to have exactly three items.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    assert len(argv) == 3
    return True
294
295
def freeze_support():
    '''
    Run code for process object if this is not the main process

    When sys.argv carries the fork marker, main() is run and the
    interpreter then exits; otherwise nothing happens.
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
303
304
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    The result is a list; Popen appends the pipe handle to it.  Raises
    RuntimeError when the current process has an empty _identity but was
    itself started with the fork marker on its command line.
    '''
    if process.current_process()._identity==() and is_forking(sys.argv):
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if getattr(sys, 'frozen', False):
        # Frozen executable: re-run the executable itself with the marker.
        return [sys.executable, '--multiprocessing-fork']
    else:
        # Otherwise run this module's main() through "python -c".
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
329
330
def main():
    '''
    Run code specified by data received over pipe

    The last command line argument is the pipe handle.  Two pickles are
    read from it: the preparation data (passed to prepare()) and the
    process object, whose _bootstrap() result becomes the exit code.
    '''
    assert is_forking(sys.argv)

    pipe_handle = int(sys.argv[-1])
    from_parent = os.fdopen(
        msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY), 'rb'
    )

    # _inheriting is switched on only while the two pickles are loaded.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
351
352
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict with the keys name, sys_path, sys_argv,
    log_to_stderr, orig_dir and authkey, plus log_level when a logger
    exists and main_path when a main script path can be determined.
    '''
    from .util import _logger, _log_to_stderr

    info = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
    }

    if _logger is not None:
        info['log_level'] = _logger.getEffectiveLevel()

    if WINEXE:
        return info

    # Prefer __main__.__file__; fall back to argv[0] unless it is '' or '-c'.
    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]
    if main_path is None:
        return info

    # Relative paths are anchored at the original working directory.
    anchor = not os.path.isabs(main_path) and process.ORIGINAL_DIR is not None
    if anchor:
        main_path = os.path.join(process.ORIGINAL_DIR, main_path)
    info['main_path'] = os.path.normpath(main_path)
    return info
381 return d
382
383 #
384 # Make (Pipe)Connection picklable
385 #
386
def reduce_connection(conn):
    '''
    Reduce a connection for pickling while a child is being spawned

    The connection's handle is duplicated into the child; RuntimeError is
    raised if no child is currently being spawned by this thread.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
    )

# Both connection types are pickled through reduce_connection.
ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000398
399#
400# Prepare current process
401#
402
# Every __main__ module seen by prepare() is appended here before it may
# be replaced in sys.modules.
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    data is a dict; each recognised key that is present is applied:
    name, authkey, log_to_stderr, log_level, sys_path, sys_argv, dir,
    orig_dir and main_path.  For main_path the parent's main module is
    imported under the name '__parents_main__' and then installed as
    sys.modules['__main__'], unless its name is 'ipython'.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            # A package's __init__ file: the module name is the directory.
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            # main_path cannot be None here: os.path.basename() above
            # would already have raised, so only two cases remain.
            if os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in list(main_module.__dict__.values()):
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # Deliberately best-effort: objects without a readable
                    # or writable __module__ are left untouched.
                    pass
475 pass