blob: e1a64df2bbfe2986c0a0a637415a0e082bf86099 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
# Names exported by ``from multiprocessing.forking import *``.
__all__ = [
    'Popen',
    'assert_spawning',
    'exit',
    'duplicate',
    'close',
    'ForkingPickler',
]
Benjamin Petersone711caf2008-06-11 16:44:04 +000016
17#
18# Check that the current thread is spawning a child process
19#
20
def assert_spawning(self):
    '''
    Raise RuntimeError unless Popen.thread_is_spawning() is true

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    kind = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % kind
    )
27
28#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000029# Try making some callable types picklable
30#
31
32from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
    '''
    Pickler with its own, extensible, per-type dispatch table

    `dispatch` starts as a copy of `Pickler.dispatch`, so entries added
    through register() are stored on this class only.
    '''

    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Use `reduce` to pickle objects whose exact type is `type`

        `reduce` is called with the object being pickled.  A str result is
        passed to save_global(); any other result is unpacked as the
        positional arguments of save_reduce().
        '''
        def dispatcher(self, obj):
            reduced = reduce(obj)
            if not isinstance(reduced, str):
                self.save_reduce(*reduced, obj=obj)
                return
            self.save_global(obj, reduced)

        cls.dispatch[type] = dispatcher
44
45def _reduce_method(m):
46 if m.__self__ is None:
47 return getattr, (m.__class__, m.__func__.__name__)
48 else:
49 return getattr, (m.__self__, m.__func__.__name__)
class _C:
    # Throwaway class: it only exists so that ``_C().f`` yields a bound
    # method whose type can be registered below.
    def f(self):
        pass


# Pickle bound methods through _reduce_method.
ForkingPickler.register(type(_C().f), _reduce_method)
54
55
56def _reduce_method_descriptor(m):
57 return getattr, (m.__objclass__, m.__name__)
# Register the reducer for the type of ``list.append`` ...
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
# ... and for the type of ``int.__add__``.
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
60
# Make functools.partial objects picklable when functools provides them.
try:
    from functools import partial
except ImportError:
    pass
else:
    def _rebuild_partial(func, args, keywords):
        '''
        Recreate a partial object from its pickled components
        '''
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        '''
        Reduce partial `p` to (_rebuild_partial, (func, args, keywords))
        '''
        keywords = p.keywords if p.keywords else {}
        return _rebuild_partial, (p.func, p.args, keywords)

    ForkingPickler.register(partial, _reduce_partial)
71
72#
Benjamin Petersone711caf2008-06-11 16:44:04 +000073# Unix
74#
75
76if sys.platform != 'win32':
77 import time
78
# On this platform the module-level helpers are plain aliases of os calls.
exit = os._exit
duplicate = os.dup
close = os.close
82
83 #
84 # We define a Popen class similar to the one from subprocess, but
85 # whose constructor takes a process object as its argument.
86 #
87
class Popen(object):
    '''
    Start a child with os.fork() to run the code of a process object

    Attributes:
        pid -- value returned by os.fork() (0 inside the child)
        returncode -- None until the child's status has been collected;
            afterwards its exit status, or the negated signal number if
            it was terminated by a signal
    '''

    def __init__(self, process_obj):
        '''
        Fork; the child runs process_obj._bootstrap() and then exits
        '''
        # NOTE(review): flushing before the fork presumably keeps pending
        # buffered output from being inherited by the child -- confirm.
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # Child process: re-seed `random`, but only if it is loaded.
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit() ends the child here; it never returns to the caller.
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Collect the child's status if available and return `returncode`

        `flag` is passed to os.waitpid() as its options argument.  Returns
        None while no status is known, including when os.waitpid() itself
        fails with OSError (for instance because there is no such child
        left to wait for).
        '''
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, flag)
            except OSError:
                # Nothing can be learnt about the child; report "unknown"
                # rather than letting the error escape from poll()/wait().
                return None
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its `returncode`

        With timeout=None this blocks in os.waitpid().  Otherwise the child
        is polled until it has finished or `timeout` seconds have passed,
        in which case None is returned.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Back off exponentially, capped at 50 ms and at the time left.
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless its status is already known

        An OSError from os.kill() is re-raised only if the child still has
        no known status after waiting for 0.1 seconds.
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''
        Always False for this implementation
        '''
        return False
143
144#
145# Windows
146#
147
148else:
149 import _thread
150 import msvcrt
151 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152 import time
153
154 from ._multiprocessing import win32, Connection, PipeConnection
155 from .util import Finalize
156
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000157 #try:
158 # from cPickle import dump, load, HIGHEST_PROTOCOL
159 #except ImportError:
160 from pickle import load, HIGHEST_PROTOCOL
161
def dump(obj, file, protocol=None):
    '''
    Pickle `obj` to `file` using a ForkingPickler
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000164
165 #
166 #
167 #
168
# Exit code passed to TerminateProcess() by Popen.terminate(); wait()
# reports it as -signal.SIGTERM.
TERMINATE = 0x10000
# Truthy only on win32 when ``sys.frozen`` is set.
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))

# Module-level helpers map straight onto the win32 calls.
exit = win32.ExitProcess
close = win32.CloseHandle
174
175 #
176 # _python_exe is the assumed path to the python executable.
177 # People embedding Python want to modify it.
178 #
179
# When running under pythonservice.exe, fall back to python.exe in
# sys.exec_prefix; otherwise use the running interpreter.
_python_exe = (
    os.path.join(sys.exec_prefix, 'python.exe')
    if sys.executable.lower().endswith('pythonservice.exe')
    else sys.executable
)
184
def set_executable(exe):
    '''
    Set the module-level `_python_exe` to `exe`
    '''
    global _python_exe
    _python_exe = exe
188
189 #
190 #
191 #
192
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` of the current process and return the detached copy

    The copy is created in `target_process` (the current process when
    None), with the same access as the original, and with the given
    `inheritable` flag.
    '''
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    source_process = _subprocess.GetCurrentProcess()
    new_handle = _subprocess.DuplicateHandle(
        source_process, handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
    )
    return new_handle.Detach()
200
201 #
202 # We define a Popen class similar to the one from subprocess, but
203 # whose constructor takes a process object as its argument.
204 #
205
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Thread-local storage; `process_handle` is set on it only while
    # __init__ is pickling data for the child.
    _tls = _thread._local()

    def __init__(self, process_obj):
        '''
        Create the child process and send it the pickled process object
        '''
        # Pipe used to talk to the child.
        read_fd, write_fd = os.pipe()

        # Inheritable duplicate of the read end; the original fd is closed.
        read_handle = duplicate(msvcrt.get_osfhandle(read_fd),
                                inheritable=True)
        os.close(read_fd)

        # Launch the child, passing the read handle as the last argument.
        argv = get_command_line() + [read_handle]
        command = ' '.join(['"%s"' % arg for arg in argv])
        process_handle, thread_handle, pid, _tid = _subprocess.CreateProcess(
            _python_exe, command, None, None, 1, 0, None, None, None
        )
        thread_handle.Close()
        close(read_handle)

        self.pid = pid
        self.returncode = None
        self._handle = process_handle

        # Send the preparation data, then the process object itself.
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(write_fd, 'wb')
        Popen._tls.process_handle = int(process_handle)
        try:
            for payload in (prep_data, process_obj):
                dump(payload, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether this thread currently has `process_handle` set
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the process referred to by `process_handle`
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever if None); return `returncode`

        `returncode` stays None if the wait did not report the process
        handle as signalled.  An exit code equal to TERMINATE is reported
        as -signal.SIGTERM.
        '''
        if self.returncode is not None:
            return self.returncode

        if timeout is None:
            msecs = _subprocess.INFINITE
        else:
            # Seconds to whole milliseconds, rounded, never negative.
            msecs = max(0, int(timeout * 1000 + 0.5))

        res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
        if res == _subprocess.WAIT_OBJECT_0:
            code = _subprocess.GetExitCodeProcess(self._handle)
            self.returncode = -signal.SIGTERM if code == TERMINATE else code

        return self.returncode

    def poll(self):
        '''
        Return `returncode` without blocking
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the process with exit code TERMINATE unless already done

        A WindowsError is re-raised only if the process still has no
        return code after waiting for 0.1 seconds.
        '''
        if self.returncode is not None:
            return
        try:
            _subprocess.TerminateProcess(int(self._handle), TERMINATE)
        except WindowsError:
            if self.wait(timeout=0.1) is None:
                raise
279
280 #
281 #
282 #
283
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    # A forking command line is: program, marker, handle.
    assert len(argv) == 3
    return True
293
294
def freeze_support():
    '''
    Run code for process object if this is not the main process
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
302
303
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if not getattr(sys, 'frozen', False):
        # Not frozen: run main() from this module via ``-c``.
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
    # Frozen: sys.executable is launched directly.
    return [sys.executable, '--multiprocessing-fork']
328
329
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # The last command line argument is the handle of the pipe's read end.
    pipe_handle = int(sys.argv[-1])
    from_parent = os.fdopen(
        msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY), 'rb'
    )

    # `_inheriting` is set only while the two pickles are being loaded.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
350
351
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object
    '''
    from .util import _logger, _log_to_stderr

    d = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
    }

    if _logger is not None:
        d['log_level'] = _logger.getEffectiveLevel()

    if WINEXE:
        # No main module path is recorded when WINEXE is set.
        return d

    # Prefer __main__.__file__; otherwise fall back to sys.argv[0] unless
    # it is '' or '-c'.
    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]

    if main_path is not None:
        relative = not os.path.isabs(main_path)
        if relative and process.ORIGINAL_DIR is not None:
            main_path = os.path.join(process.ORIGINAL_DIR, main_path)
        d['main_path'] = os.path.normpath(main_path)

    return d
381
382 #
383 # Make (Pipe)Connection picklable
384 #
385
def reduce_connection(conn):
    '''
    Reduce a connection for pickling while a child is being spawned

    Raises RuntimeError when the current thread is not spawning.  The
    connection's handle is duplicated for the child and returned together
    with the `readable` and `writable` attributes.
    '''
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            'By default %s objects can only be shared between processes\n'
            'using inheritance' % type(conn).__name__
        )
    child_handle = Popen.duplicate_for_child(conn.fileno())
    return type(conn), (child_handle, conn.readable, conn.writable)


# Both connection types are pickled through reduce_connection.
ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000397
398#
399# Prepare current process
400#
401
# prepare() appends the current ``sys.modules['__main__']`` here each time
# it is called.
old_main_modules = []
403
def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a mapping such as the one built by get_preparation_data();
    every key is optional and is applied only when present.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    # Everything below re-imports the parent's main module.
    if 'main_path' not in data:
        return

    main_path = data['main_path']
    main_name = os.path.splitext(os.path.basename(main_path))[0]
    if main_name == '__init__':
        # A package __init__: use the package directory's name instead.
        main_name = os.path.basename(os.path.dirname(main_path))

    if main_name == 'ipython':
        return

    import imp

    if main_path is None:
        dirs = None
    elif os.path.basename(main_path).startswith('__init__.py'):
        dirs = [os.path.dirname(os.path.dirname(main_path))]
    else:
        dirs = [os.path.dirname(main_path)]

    assert main_name not in sys.modules, main_name
    file, path_name, etc = imp.find_module(main_name, dirs)
    try:
        # We would like to do "imp.load_module('__main__', ...)"
        # here.  However, that would cause 'if __name__ ==
        # "__main__"' clauses to be executed.
        main_module = imp.load_module(
            '__parents_main__', file, path_name, etc
        )
    finally:
        if file:
            file.close()

    sys.modules['__main__'] = main_module
    main_module.__name__ = '__main__'

    # Try to make the potentially picklable objects in
    # sys.modules['__main__'] realize they are in the main
    # module -- somewhat ugly.
    for obj in list(main_module.__dict__.values()):
        try:
            if obj.__module__ == '__parents_main__':
                obj.__module__ = '__main__'
        except Exception:
            # Best effort: objects without a settable __module__ are skipped.
            pass