blob: 3c0f568927222fecdc1099602c83ff72d029f59e [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
# Names exported by ``from multiprocessing.forking import *``.
__all__ = [
    'Popen',
    'assert_spawning',
    'exit',
    'duplicate',
    'close',
    'ForkingPickler',
]
Benjamin Petersone711caf2008-06-11 16:44:04 +000016
17#
18# Check that the current thread is spawning a child process
19#
20
def assert_spawning(self):
    '''
    Raise RuntimeError unless Popen reports that the current thread is
    in the middle of spawning a child process.

    The error message names the type of `self`, the object whose
    pickling is being attempted.
    '''
    if Popen.thread_is_spawning():
        return
    type_name = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
    )
27
28#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000029# Try making some callable types picklable
30#
31
32from pickle import _Pickler as Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass with its own dispatch table, so extra reducers can
    be registered without touching the table of the base Pickler.
    '''
    # Private copy: register() below only ever mutates this dict.
    dispatch = dict(Pickler.dispatch)

    @classmethod
    def register(cls, type, reduce):
        '''
        Make instances of `type` picklable through `reduce`.

        `reduce(obj)` must return either a string (the object is then
        saved as a global under that name) or a tuple of arguments for
        save_reduce().
        '''
        def dispatcher(self, obj):
            reduced = reduce(obj)
            if not isinstance(reduced, str):
                self.save_reduce(*reduced, obj=obj)
            else:
                self.save_global(obj, reduced)
        cls.dispatch[type] = dispatcher
44
45def _reduce_method(m):
46 if m.__self__ is None:
47 return getattr, (m.__class__, m.__func__.__name__)
48 else:
49 return getattr, (m.__self__, m.__func__.__name__)
class _C:
    '''Throwaway class used only to obtain the bound-method type.'''
    def f(self):
        '''Do nothing.'''

# type(_C().f) is the type of a bound method of a Python-level class.
ForkingPickler.register(type(_C().f), _reduce_method)
54
55
56def _reduce_method_descriptor(m):
57 return getattr, (m.__objclass__, m.__name__)
# Register the reducer for the types of list.append and int.__add__.
for _descriptor in (list.append, int.__add__):
    ForkingPickler.register(type(_descriptor), _reduce_method_descriptor)
del _descriptor
60
try:
    from functools import partial
except ImportError:
    # No functools.partial available: nothing to register.
    pass
else:
    def _rebuild_partial(func, args, keywords):
        '''Recreate a partial object from its three components.'''
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        '''Reduce the partial `p` to a _rebuild_partial() call.'''
        # p.keywords may be falsy (None or empty); always hand the
        # rebuilder a real dict so that ** unpacking works.
        keywords = p.keywords if p.keywords else {}
        return _rebuild_partial, (p.func, p.args, keywords)

    ForkingPickler.register(partial, _reduce_partial)
71
72#
Benjamin Petersone711caf2008-06-11 16:44:04 +000073# Unix
74#
75
if sys.platform != 'win32':
    import errno
    import time

    exit = os._exit
    duplicate = os.dup
    close = os.close

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Fork a child that runs the code of a process object.

        Attributes set by the constructor:
          pid        -- value returned by os.fork() (0 inside the child)
          returncode -- None until the child has been reaped; afterwards
                        the exit status, or minus the signal number if
                        the child was killed by a signal
        '''

        def __init__(self, process_obj):
            '''
            Fork, and in the child run ``process_obj._bootstrap()`` and
            exit with the code it returns.
            '''
            # Flush before forking; presumably so that buffered output
            # is not emitted a second time by the child.
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()
            if self.pid == 0:
                # Child process.  Reseed `random` if it is loaded so the
                # child does not continue the parent's sequence.
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

        def poll(self, flag=os.WNOHANG):
            '''
            Reap the child if possible and return its return code.

            `flag` is passed to os.waitpid(); the default os.WNOHANG
            makes the call non-blocking.  Returns None while the exit
            status is not known, including when waitpid() reports that
            there is no such child (ECHILD).  Any other OSError is
            propagated.
            '''
            if self.returncode is None:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as exc:
                    # ECHILD: there is nothing to wait for (e.g. the
                    # child was already reaped elsewhere).  Report "no
                    # result" instead of failing the caller.
                    if exc.errno != errno.ECHILD:
                        raise
                    return None
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            '''
            Wait for the child and return its return code.

            With ``timeout=None`` a single blocking poll is made.
            Otherwise the child is polled, sleeping between attempts
            with a delay that doubles up to 0.05s, until it has finished
            or `timeout` seconds have passed; None is returned if the
            child is still running at that point.
            '''
            if timeout is None:
                return self.poll(0)
            deadline = time.time() + timeout
            delay = 0.0005
            while True:
                res = self.poll()
                if res is not None:
                    break
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                delay = min(delay * 2, remaining, 0.05)
                time.sleep(delay)
            return res

        def terminate(self):
            '''
            Send SIGTERM to the child if it has not been reaped yet.

            An OSError from os.kill() is re-raised only when the child
            cannot be confirmed finished within 0.1s.
            '''
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except OSError:
                    if self.wait(timeout=0.1) is None:
                        raise

        @staticmethod
        def thread_is_spawning():
            '''Always False in this fork-based implementation.'''
            return False
143
144#
145# Windows
146#
147
148else:
149 import _thread
150 import msvcrt
151 import _subprocess
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152 import time
153
Jesse Nollerf70a5382009-01-18 19:44:02 +0000154 from pickle import dump, load, HIGHEST_PROTOCOL
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155 from ._multiprocessing import win32, Connection, PipeConnection
156 from .util import Finalize
157
def dump(obj, file, protocol=None):
    '''
    Pickle `obj` to `file` with a ForkingPickler, so the reducers
    registered on ForkingPickler are honoured.
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160
161 #
162 #
163 #
164
# Exit code handed to TerminateProcess() by Popen.terminate();
# Popen.wait() reports it as -signal.SIGTERM.
TERMINATE = 0x10000

# Truthy when running on win32 with ``sys.frozen`` set.
WINEXE = sys.platform == 'win32' and getattr(sys, 'frozen', False)

exit = win32.ExitProcess
close = win32.CloseHandle
170
171 #
172 # _python_exe is the assumed path to the python executable.
173 # People embedding Python want to modify it.
174 #
175
# Default to the running interpreter, except under pythonservice.exe
# where python.exe from the installation prefix is used instead.
_python_exe = sys.executable
if _python_exe.lower().endswith('pythonservice.exe'):
    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')


def set_executable(exe):
    '''
    Set the path of the python executable used to start child processes.
    '''
    global _python_exe
    _python_exe = exe
184
185 #
186 #
187 #
188
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` from the current process into `target_process`
    (the current process when None) and return the detached new handle.

    `inheritable` is passed through to DuplicateHandle(); the copy is
    made with DUPLICATE_SAME_ACCESS.
    '''
    source_process = _subprocess.GetCurrentProcess()
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    new_handle = _subprocess.DuplicateHandle(
        source_process, handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
    )
    return new_handle.Detach()
196
197 #
198 # We define a Popen class similar to the one from subprocess, but
199 # whose constructor takes a process object as its argument.
200 #
201
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Thread-local storage; `process_handle` is set on it only while
    # this thread is pickling data for a child (see thread_is_spawning).
    _tls = _thread._local()

    def __init__(self, process_obj):
        '''
        Start a child interpreter and send it, over a pipe, the
        preparation data followed by the pickled `process_obj`.

        Sets `pid`, `returncode` (None) and `_handle` (the process
        handle).  Handles and descriptors created along the way are
        released if starting the child fails.
        '''
        # Build the command prefix before allocating anything:
        # get_command_line() can raise RuntimeError, and failing here
        # leaves nothing to clean up.
        cmd_prefix = get_command_line()

        # create pipe for communication with child
        rfd, wfd = os.pipe()
        try:
            # get handle for read end of the pipe and make it inheritable
            try:
                rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                    inheritable=True)
            finally:
                os.close(rfd)

            # start process
            try:
                cmd = ' '.join('"%s"' % x for x in cmd_prefix + [rhandle])
                hp, ht, pid, tid = _subprocess.CreateProcess(
                    _python_exe, cmd, None, None, 1, 0, None, None, None
                )
            finally:
                # Our copy of the read handle is not needed whether or
                # not the child was started.
                close(rhandle)
            ht.Close()
        except BaseException:
            # The child never started: do not leak the write end.
            os.close(wfd)
            raise

        # set attributes of self
        self.pid = pid
        self.returncode = None
        self._handle = hp

        # send information to child
        to_child = os.fdopen(wfd, 'wb')
        try:
            prep_data = get_preparation_data(process_obj._name)
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
        finally:
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return True while the current thread has a `process_handle`
        recorded in the thread-local storage.
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the child process currently being
        spawned by this thread.
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever when None) for the child
        and return its return code, or None if it is still running.

        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # Round to the nearest millisecond, never negative.
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                if code == TERMINATE:
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''Return the return code without blocking (None if running).'''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE if it has not
        finished yet.

        A WindowsError from TerminateProcess() is re-raised only when
        the child cannot be confirmed finished within 0.1s.
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                if self.wait(timeout=0.1) is None:
                    raise
275
276 #
277 #
278 #
279
def is_forking(argv):
    '''
    Tell whether `argv` is the command line of a spawned child, i.e.
    whether its second element is '--multiprocessing-fork'.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    # A child command line has exactly one more argument after the flag.
    assert len(argv) == 3
    return True
289
290
def freeze_support():
    '''
    Run the code for the process object and exit if this is not the
    main process; otherwise do nothing
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
298
299
def get_command_line():
    '''
    Return the leading arguments of the command line used to spawn a
    child process

    Raises RuntimeError when called from a spawned child whose current
    process still has the empty identity ().
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if not getattr(sys, 'frozen', False):
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
    # Frozen: the executable itself is re-run with the fork flag.
    return [sys.executable, '--multiprocessing-fork']
324
325
def main():
    '''
    Run code specified by data received over pipe

    The last command line argument is the handle of the read end of
    the pipe.  Two pickles are read from it: the preparation data,
    which is passed to prepare(), and then the process object, whose
    _bootstrap() result becomes the exit code.
    '''
    assert is_forking(sys.argv)

    pipe_handle = int(sys.argv[-1])
    from_parent = os.fdopen(
        msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY), 'rb'
    )

    # NOTE: the pickles come from the parent process over a private
    # pipe; load() must never be pointed at untrusted data.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
346
347
def get_preparation_data(name):
    '''
    Collect the information about this (parent) process that a child
    needs before it can unpickle the process object

    Returns a dict with the keys name, sys_path, sys_argv,
    log_to_stderr, orig_dir and authkey, plus log_level when a logger
    exists and main_path when not WINEXE and a main script path is
    known.
    '''
    from .util import _logger, _log_to_stderr

    info = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
    }

    if _logger is not None:
        info['log_level'] = _logger.getEffectiveLevel()

    if WINEXE:
        return info

    # Prefer __main__.__file__; fall back to argv[0] unless it is one
    # of the placeholders '' or '-c'.
    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]
    if main_path is None:
        return info

    # Resolve a relative path against the original working directory.
    if process.ORIGINAL_DIR is not None and not os.path.isabs(main_path):
        main_path = os.path.join(process.ORIGINAL_DIR, main_path)
    info['main_path'] = os.path.normpath(main_path)
    return info
377
378 #
379 # Make (Pipe)Connection picklable
380 #
381
def reduce_connection(conn):
    '''
    Reduce a connection object to its type plus a handle duplicated
    into the child being spawned and its readable/writable flags.

    Raises RuntimeError unless the current thread is spawning a child.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
    )

# Both connection types are pickled through the same reducer.
ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393
394#
395# Prepare current process
396#
397
# Every __main__ module seen by prepare() is appended here, which
# keeps a reference to it after it is replaced.
old_main_modules = []

def prepare(data):
    '''
    Try to get the current process ready to unpickle the process object

    `data` is a dict like the one built by get_preparation_data();
    every key is optional.  When 'main_path' is present (and the main
    module is not called 'ipython') that file is imported under the
    name '__parents_main__' and installed as sys.modules['__main__'].
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if data.get('log_to_stderr'):
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' not in data:
        return

    main_path = data['main_path']
    script_file = os.path.basename(main_path)
    main_name = os.path.splitext(script_file)[0]
    if main_name == '__init__':
        # The main script is a package's __init__: use the package name.
        main_name = os.path.basename(os.path.dirname(main_path))

    if main_name == 'ipython':
        return

    import imp

    # NOTE(review): the None case looks unreachable, because
    # os.path.basename(main_path) above has already been evaluated.
    if main_path is None:
        dirs = None
    elif os.path.basename(main_path).startswith('__init__.py'):
        dirs = [os.path.dirname(os.path.dirname(main_path))]
    else:
        dirs = [os.path.dirname(main_path)]

    assert main_name not in sys.modules, main_name
    file, path_name, etc = imp.find_module(main_name, dirs)
    try:
        # We would like to do "imp.load_module('__main__', ...)"
        # here.  However, that would cause 'if __name__ ==
        # "__main__"' clauses to be executed.
        main_module = imp.load_module(
            '__parents_main__', file, path_name, etc
        )
    finally:
        if file:
            file.close()

    sys.modules['__main__'] = main_module
    main_module.__name__ = '__main__'

    # Try to make the potentially picklable objects in
    # sys.modules['__main__'] realize they are in the main
    # module -- somewhat ugly.
    for member in list(main_module.__dict__.values()):
        try:
            if member.__module__ == '__parents_main__':
                member.__module__ = '__main__'
        except Exception:
            # Best effort: objects without a (writable) __module__
            # attribute are left untouched.
            pass
470 pass