blob: cb7f323fec915c8111c7412b177a9e796a9ac156 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
Jesse Noller13e9d582008-07-16 14:32:36 +000015__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000016
17#
18# Check that the current thread is spawning a child process
19#
20
21def assert_spawning(self):
22 if not Popen.thread_is_spawning():
23 raise RuntimeError(
24 '%s objects should only be shared between processes'
25 ' through inheritance' % type(self).__name__
26 )
27
28#
Jesse Noller13e9d582008-07-16 14:32:36 +000029# Try making some callable types picklable
30#
31
32from pickle import Pickler
33class ForkingPickler(Pickler):
34 dispatch = Pickler.dispatch.copy()
35
36 @classmethod
37 def register(cls, type, reduce):
38 def dispatcher(self, obj):
39 rv = reduce(obj)
40 self.save_reduce(obj=obj, *rv)
41 cls.dispatch[type] = dispatcher
42
43def _reduce_method(m):
44 if m.im_self is None:
45 return getattr, (m.im_class, m.im_func.func_name)
46 else:
47 return getattr, (m.im_self, m.im_func.func_name)
48ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
49
50def _reduce_method_descriptor(m):
51 return getattr, (m.__objclass__, m.__name__)
52ForkingPickler.register(type(list.append), _reduce_method_descriptor)
53ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
54
55#def _reduce_builtin_function_or_method(m):
56# return getattr, (m.__self__, m.__name__)
57#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
58#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
59
60try:
61 from functools import partial
62except ImportError:
63 pass
64else:
65 def _reduce_partial(p):
66 return _rebuild_partial, (p.func, p.args, p.keywords or {})
67 def _rebuild_partial(func, args, keywords):
68 return partial(func, *args, **keywords)
69 ForkingPickler.register(partial, _reduce_partial)
70
71#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000072# Unix
73#
74
75if sys.platform != 'win32':
76 import time
77
78 exit = os._exit
79 duplicate = os.dup
80 close = os.close
81
82 #
83 # We define a Popen class similar to the one from subprocess, but
84 # whose constructor takes a process object as its argument.
85 #
86
87 class Popen(object):
88
89 def __init__(self, process_obj):
90 sys.stdout.flush()
91 sys.stderr.flush()
92 self.returncode = None
93
94 self.pid = os.fork()
95 if self.pid == 0:
96 if 'random' in sys.modules:
97 import random
98 random.seed()
99 code = process_obj._bootstrap()
100 sys.stdout.flush()
101 sys.stderr.flush()
102 os._exit(code)
103
104 def poll(self, flag=os.WNOHANG):
105 if self.returncode is None:
106 pid, sts = os.waitpid(self.pid, flag)
107 if pid == self.pid:
108 if os.WIFSIGNALED(sts):
109 self.returncode = -os.WTERMSIG(sts)
110 else:
111 assert os.WIFEXITED(sts)
112 self.returncode = os.WEXITSTATUS(sts)
113 return self.returncode
114
115 def wait(self, timeout=None):
116 if timeout is None:
117 return self.poll(0)
118 deadline = time.time() + timeout
119 delay = 0.0005
120 while 1:
121 res = self.poll()
122 if res is not None:
123 break
124 remaining = deadline - time.time()
125 if remaining <= 0:
126 break
127 delay = min(delay * 2, remaining, 0.05)
128 time.sleep(delay)
129 return res
130
131 def terminate(self):
132 if self.returncode is None:
133 try:
134 os.kill(self.pid, signal.SIGTERM)
135 except OSError, e:
136 if self.wait(timeout=0.1) is None:
137 raise
138
139 @staticmethod
140 def thread_is_spawning():
141 return False
142
143#
144# Windows
145#
146
147else:
148 import thread
149 import msvcrt
150 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000151 import time
152
153 from ._multiprocessing import win32, Connection, PipeConnection
154 from .util import Finalize
155
Jesse Noller13e9d582008-07-16 14:32:36 +0000156 #try:
157 # from cPickle import dump, load, HIGHEST_PROTOCOL
158 #except ImportError:
159 from pickle import load, HIGHEST_PROTOCOL
160
161 def dump(obj, file, protocol=None):
162 ForkingPickler(file, protocol).dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000163
164 #
165 #
166 #
167
168 TERMINATE = 0x10000
169 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
170
171 exit = win32.ExitProcess
172 close = win32.CloseHandle
173
174 #
175 # _python_exe is the assumed path to the python executable.
176 # People embedding Python want to modify it.
177 #
178
179 if sys.executable.lower().endswith('pythonservice.exe'):
180 _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
181 else:
182 _python_exe = sys.executable
183
184 def set_executable(exe):
185 global _python_exe
186 _python_exe = exe
187
188 #
189 #
190 #
191
192 def duplicate(handle, target_process=None, inheritable=False):
193 if target_process is None:
194 target_process = _subprocess.GetCurrentProcess()
195 return _subprocess.DuplicateHandle(
196 _subprocess.GetCurrentProcess(), handle, target_process,
197 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
198 ).Detach()
199
200 #
201 # We define a Popen class similar to the one from subprocess, but
202 # whose constructor takes a process object as its argument.
203 #
204
205 class Popen(object):
206 '''
207 Start a subprocess to run the code of a process object
208 '''
209 _tls = thread._local()
210
211 def __init__(self, process_obj):
212 # create pipe for communication with child
213 rfd, wfd = os.pipe()
214
215 # get handle for read end of the pipe and make it inheritable
216 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
217 os.close(rfd)
218
219 # start process
220 cmd = get_command_line() + [rhandle]
221 cmd = ' '.join('"%s"' % x for x in cmd)
222 hp, ht, pid, tid = _subprocess.CreateProcess(
223 _python_exe, cmd, None, None, 1, 0, None, None, None
224 )
225 ht.Close()
226 close(rhandle)
227
228 # set attributes of self
229 self.pid = pid
230 self.returncode = None
231 self._handle = hp
232
233 # send information to child
234 prep_data = get_preparation_data(process_obj._name)
235 to_child = os.fdopen(wfd, 'wb')
236 Popen._tls.process_handle = int(hp)
237 try:
238 dump(prep_data, to_child, HIGHEST_PROTOCOL)
239 dump(process_obj, to_child, HIGHEST_PROTOCOL)
240 finally:
241 del Popen._tls.process_handle
242 to_child.close()
243
244 @staticmethod
245 def thread_is_spawning():
246 return getattr(Popen._tls, 'process_handle', None) is not None
247
248 @staticmethod
249 def duplicate_for_child(handle):
250 return duplicate(handle, Popen._tls.process_handle)
251
252 def wait(self, timeout=None):
253 if self.returncode is None:
254 if timeout is None:
255 msecs = _subprocess.INFINITE
256 else:
257 msecs = max(0, int(timeout * 1000 + 0.5))
258
259 res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
260 if res == _subprocess.WAIT_OBJECT_0:
261 code = _subprocess.GetExitCodeProcess(self._handle)
262 if code == TERMINATE:
263 code = -signal.SIGTERM
264 self.returncode = code
265
266 return self.returncode
267
268 def poll(self):
269 return self.wait(timeout=0)
270
271 def terminate(self):
272 if self.returncode is None:
273 try:
274 _subprocess.TerminateProcess(int(self._handle), TERMINATE)
275 except WindowsError:
276 if self.wait(timeout=0.1) is None:
277 raise
278
279 #
280 #
281 #
282
283 def is_forking(argv):
284 '''
285 Return whether commandline indicates we are forking
286 '''
287 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
288 assert len(argv) == 3
289 return True
290 else:
291 return False
292
293
294 def freeze_support():
295 '''
296 Run code for process object if this in not the main process
297 '''
298 if is_forking(sys.argv):
299 main()
300 sys.exit()
301
302
303 def get_command_line():
304 '''
305 Returns prefix of command line used for spawning a child process
306 '''
307 if process.current_process()._identity==() and is_forking(sys.argv):
308 raise RuntimeError('''
309 Attempt to start a new process before the current process
310 has finished its bootstrapping phase.
311
312 This probably means that you are on Windows and you have
313 forgotten to use the proper idiom in the main module:
314
315 if __name__ == '__main__':
316 freeze_support()
317 ...
318
319 The "freeze_support()" line can be omitted if the program
320 is not going to be frozen to produce a Windows executable.''')
321
322 if getattr(sys, 'frozen', False):
323 return [sys.executable, '--multiprocessing-fork']
324 else:
325 prog = 'from multiprocessing.forking import main; main()'
326 return [_python_exe, '-c', prog, '--multiprocessing-fork']
327
328
329 def main():
330 '''
331 Run code specifed by data received over pipe
332 '''
333 assert is_forking(sys.argv)
334
335 handle = int(sys.argv[-1])
336 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
337 from_parent = os.fdopen(fd, 'rb')
338
339 process.current_process()._inheriting = True
340 preparation_data = load(from_parent)
341 prepare(preparation_data)
342 self = load(from_parent)
343 process.current_process()._inheriting = False
344
345 from_parent.close()
346
347 exitcode = self._bootstrap()
348 exit(exitcode)
349
350
351 def get_preparation_data(name):
352 '''
353 Return info about parent needed by child to unpickle process object
354 '''
355 from .util import _logger, _log_to_stderr
356
357 d = dict(
358 name=name,
359 sys_path=sys.path,
360 sys_argv=sys.argv,
361 log_to_stderr=_log_to_stderr,
362 orig_dir=process.ORIGINAL_DIR,
363 authkey=process.current_process().get_authkey(),
364 )
365
366 if _logger is not None:
367 d['log_level'] = _logger.getEffectiveLevel()
368
369 if not WINEXE:
370 main_path = getattr(sys.modules['__main__'], '__file__', None)
371 if not main_path and sys.argv[0] not in ('', '-c'):
372 main_path = sys.argv[0]
373 if main_path is not None:
374 if not os.path.isabs(main_path) and \
375 process.ORIGINAL_DIR is not None:
376 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
377 d['main_path'] = os.path.normpath(main_path)
378
379 return d
380
381 #
382 # Make (Pipe)Connection picklable
383 #
384
385 def reduce_connection(conn):
386 if not Popen.thread_is_spawning():
387 raise RuntimeError(
388 'By default %s objects can only be shared between processes\n'
389 'using inheritance' % type(conn).__name__
390 )
391 return type(conn), (Popen.duplicate_for_child(conn.fileno()),
392 conn.readable, conn.writable)
393
Jesse Noller13e9d582008-07-16 14:32:36 +0000394 ForkingPickler.register(Connection, reduce_connection)
395 ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000396
397#
398# Prepare current process
399#
400
401old_main_modules = []
402
403def prepare(data):
404 '''
405 Try to get current process ready to unpickle process object
406 '''
407 old_main_modules.append(sys.modules['__main__'])
408
409 if 'name' in data:
410 process.current_process().set_name(data['name'])
411
412 if 'authkey' in data:
413 process.current_process()._authkey = data['authkey']
414
415 if 'log_to_stderr' in data and data['log_to_stderr']:
416 util.log_to_stderr()
417
418 if 'log_level' in data:
419 util.get_logger().setLevel(data['log_level'])
420
421 if 'sys_path' in data:
422 sys.path = data['sys_path']
423
424 if 'sys_argv' in data:
425 sys.argv = data['sys_argv']
426
427 if 'dir' in data:
428 os.chdir(data['dir'])
429
430 if 'orig_dir' in data:
431 process.ORIGINAL_DIR = data['orig_dir']
432
433 if 'main_path' in data:
434 main_path = data['main_path']
435 main_name = os.path.splitext(os.path.basename(main_path))[0]
436 if main_name == '__init__':
437 main_name = os.path.basename(os.path.dirname(main_path))
438
439 if main_name != 'ipython':
440 import imp
441
442 if main_path is None:
443 dirs = None
444 elif os.path.basename(main_path).startswith('__init__.py'):
445 dirs = [os.path.dirname(os.path.dirname(main_path))]
446 else:
447 dirs = [os.path.dirname(main_path)]
448
449 assert main_name not in sys.modules, main_name
450 file, path_name, etc = imp.find_module(main_name, dirs)
451 try:
452 # We would like to do "imp.load_module('__main__', ...)"
453 # here. However, that would cause 'if __name__ ==
454 # "__main__"' clauses to be executed.
455 main_module = imp.load_module(
456 '__parents_main__', file, path_name, etc
457 )
458 finally:
459 if file:
460 file.close()
461
462 sys.modules['__main__'] = main_module
463 main_module.__name__ = '__main__'
464
465 # Try to make the potentially picklable objects in
466 # sys.modules['__main__'] realize they are in the main
467 # module -- somewhat ugly.
468 for obj in main_module.__dict__.values():
469 try:
470 if obj.__module__ == '__parents_main__':
471 obj.__module__ = '__main__'
472 except Exception:
473 pass