blob: 5e04725a1e20c09b166b28c2ba17006e637b88ae [file] [log] [blame]
#
# Module for starting a process object using os.fork() or CreateProcess()
#
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
# Public names of this module.
__all__ = [
    'Popen',
    'assert_spawning',
    'exit',
    'duplicate',
    'close',
    'ForkingPickler',
]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000016
17#
18# Check that the current thread is spawning a child process
19#
20
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type(self).__name__
        )
27
#
# Try making some callable types picklable
#
31
32from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass with its own dispatch table for extra reducers
    '''
    # Work on a copy so that registrations made here are stored on this
    # class and not in Pickler.dispatch.
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Register `reduce` as the reduction function for objects of `type`

        `reduce(obj)` must return a sequence which is unpacked into the
        positional arguments of `save_reduce()`.
        '''
        def save_with_reducer(pickler, obj):
            reduced = reduce(obj)
            pickler.save_reduce(obj=obj, *reduced)
        cls.dispatch[type] = save_with_reducer
42
def _reduce_method(m):
    '''
    Reduce the method `m` to a `getattr(owner, name)` call

    The owner is the object the method is bound to (`m.im_self`), or the
    class `m.im_class` when `m.im_self` is None.
    '''
    owner = m.im_class if m.im_self is None else m.im_self
    return getattr, (owner, m.im_func.func_name)
# Use _reduce_method for objects whose type is that of
# `ForkingPickler.save`, i.e. a method looked up on its class.
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
49
def _reduce_method_descriptor(m):
    '''
    Reduce the descriptor `m` to `getattr(m.__objclass__, m.__name__)`
    '''
    owner, name = m.__objclass__, m.__name__
    return getattr, (owner, name)
# Use _reduce_method_descriptor for the types of `list.append` and
# `int.__add__` (built-in methods looked up on their classes).
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
54
55#def _reduce_builtin_function_or_method(m):
56# return getattr, (m.__self__, m.__name__)
57#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
58#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
59
# Make functools.partial objects picklable when functools provides them.
try:
    from functools import partial
except ImportError:
    pass
else:
    def _rebuild_partial(func, args, keywords):
        '''
        Recreate a partial object from its pickled components
        '''
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        '''
        Reduce the partial object `p` to a `_rebuild_partial` call
        '''
        # p.keywords may be None; substitute an empty dict so that the
        # "**keywords" expansion in _rebuild_partial always works.
        state = (p.func, p.args, p.keywords or {})
        return _rebuild_partial, state

    ForkingPickler.register(partial, _reduce_partial)
70
#
# Unix
#
74
if sys.platform != 'win32':
    import time

    # Module-level aliases for the os primitives used on this branch.
    close = os.close
    duplicate = os.dup
    exit = os._exit
81
82 #
83 # We define a Popen class similar to the one from subprocess, but
84 # whose constructor takes a process object as its argument.
85 #
86
class Popen(object):
    '''
    Start a child process with os.fork() to run the code of a process object

    Similar to the Popen class from subprocess, but the constructor takes
    a process object as its argument.  (In the module this class is
    defined only when sys.platform != 'win32'.)
    '''

    def __init__(self, process_obj):
        '''
        Fork; the child runs `process_obj._bootstrap()` and then exits
        '''
        # Flush before forking (presumably so that output still buffered
        # in the parent is not written a second time by the child).
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # Child process: reseed the random module if it is loaded.
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit() never returns, so the child does not continue
            # into the caller's code.
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Return the exit code of the child, or None if it is not known yet

        `flag` is passed to os.waitpid(); the default makes the call
        non-blocking.  A child killed by signal N is reported as -N.
        '''
        if self.returncode is None:
            import errno
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except os.error as e:
                    if e.errno == errno.EINTR:
                        # The call was interrupted by a signal; the child
                        # may still be running, so retry instead of
                        # reporting "no result".
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its exit code

        With `timeout` None this blocks in waitpid().  Otherwise the child
        is polled with increasing delays (capped at 0.05s) until it has
        finished or `timeout` seconds have passed; None is returned if the
        child is still running at that point.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless its exit code is already known
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                # Only propagate the error if the child has not in fact
                # exited within a short grace period.
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''
        Always False for this fork-based implementation
        '''
        return False
147
148#
149# Windows
150#
151
152else:
153 import thread
154 import msvcrt
155 import _subprocess
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000156 import time
157
Jesse Noller2f8c8f42010-07-03 12:26:02 +0000158 from _multiprocessing import win32, Connection, PipeConnection
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000159 from .util import Finalize
160
Jesse Noller13e9d582008-07-16 14:32:36 +0000161 #try:
162 # from cPickle import dump, load, HIGHEST_PROTOCOL
163 #except ImportError:
164 from pickle import load, HIGHEST_PROTOCOL
165
def dump(obj, file, protocol=None):
    '''
    Pickle `obj` to `file` using ForkingPickler
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000168
169 #
170 #
171 #
172
# Exit code which Popen.terminate() passes to TerminateProcess();
# Popen.wait() reports it as -signal.SIGTERM.
TERMINATE = 0x10000

# The value of sys.frozen (False if unset) on win32, False elsewhere.
WINEXE = getattr(sys, 'frozen', False) if sys.platform == 'win32' else False

close = win32.CloseHandle
exit = win32.ExitProcess
178
179 #
180 # _python_exe is the assumed path to the python executable.
181 # People embedding Python want to modify it.
182 #
183
# When running under pythonservice.exe, assume the interpreter is the
# python.exe found in sys.exec_prefix instead.
_python_exe = (
    os.path.join(sys.exec_prefix, 'python.exe')
    if sys.executable.lower().endswith('pythonservice.exe')
    else sys.executable
)

def set_executable(exe):
    '''
    Set the path of the python executable used to start child processes
    '''
    global _python_exe
    _python_exe = exe
192
193 #
194 #
195 #
196
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` from the current process into `target_process`

    `target_process` defaults to the current process.  The new handle
    has the same access as the original; it is detached from its wrapper
    object before being returned.
    '''
    target = (_subprocess.GetCurrentProcess()
              if target_process is None else target_process)
    new_handle = _subprocess.DuplicateHandle(
        _subprocess.GetCurrentProcess(), handle, target,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
        )
    return new_handle.Detach()
204
205 #
206 # We define a Popen class similar to the one from subprocess, but
207 # whose constructor takes a process object as its argument.
208 #
209
class Popen(object):
    '''
    Start a subprocess to run the code of a process object

    Similar to the Popen class from subprocess, but the constructor takes
    a process object as its argument.  (In the module this class is
    defined only on the win32 branch.)
    '''
    # Thread-local storage.  `process_handle` is set only while __init__
    # is pickling data for the child; thread_is_spawning() tests for it.
    _tls = thread._local()

    def __init__(self, process_obj):
        '''
        Create the child process and send it the pickled `process_obj`
        '''
        # Build the command prefix first: get_command_line() may raise
        # RuntimeError, and at this point nothing needs cleaning up.
        cmd_prefix = get_command_line()

        # create pipe for communication with child
        rfd, wfd = os.pipe()
        to_child = os.fdopen(wfd, 'wb')
        try:
            # get handle for read end of the pipe and make it inheritable
            try:
                rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                    inheritable=True)
            finally:
                os.close(rfd)

            # start process
            try:
                cmd = ' '.join('"%s"' % x for x in cmd_prefix + [rhandle])
                hp, ht, pid, tid = _subprocess.CreateProcess(
                    _python_exe, cmd, None, None, 1, 0, None, None, None
                    )
            finally:
                # This process's copy of the read handle is closed whether
                # or not the child was created.
                close(rhandle)
            ht.Close()

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
        finally:
            # Always release the write end of the pipe, including when
            # spawning or pickling failed part-way through.
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether the current thread is inside Popen.__init__
        pickling data for a child process
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the child process being spawned by the
        current thread
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever if None) for the child

        Returns the exit code, or None if the child has not finished.
        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # round to the nearest millisecond, never negative
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                if code == TERMINATE:
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''
        Return the exit code without blocking, or None if still running
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless its exit code
        is already known
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                # Only propagate the error if the child has not in fact
                # exited within a short grace period.
                if self.wait(timeout=0.1) is None:
                    raise
283
284 #
285 #
286 #
287
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking
    '''
    forking = len(argv) >= 2 and argv[1] == '--multiprocessing-fork'
    if forking:
        # A forking command line has exactly one more argument after
        # the marker.
        assert len(argv) == 3
    return forking
297
298
def freeze_support():
    '''
    Run code for process object if this is not the main process
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
306
307
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError if the current process was itself started with a
    forking command line and its process object still has an empty
    `_identity`.
    '''
    still_bootstrapping = (
        process.current_process()._identity == () and is_forking(sys.argv)
        )
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if not getattr(sys, 'frozen', False):
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
    # A frozen executable is re-run directly with the marker argument.
    return [sys.executable, '--multiprocessing-fork']
332
333
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # The last command line argument is the handle of the pipe's read
    # end, which Popen.__init__ appended to the command line.
    pipe_handle = int(sys.argv[-1])
    pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # The parent sends two pickles: the preparation data, then the
    # process object.  prepare() must run before the second is loaded.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
353 exit(exitcode)
354
355
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict; `log_level` and `main_path` are included only
    when they can be determined.
    '''
    from .util import _logger, _log_to_stderr

    data = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
    }

    if _logger is not None:
        data['log_level'] = _logger.getEffectiveLevel()

    if WINEXE:
        # No main module path is sent when WINEXE is set.
        return data

    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]
    if main_path is not None:
        is_relative = not os.path.isabs(main_path)
        if is_relative and process.ORIGINAL_DIR is not None:
            main_path = os.path.join(process.ORIGINAL_DIR, main_path)
        data['main_path'] = os.path.normpath(main_path)

    return data
385
386 #
387 # Make (Pipe)Connection picklable
388 #
389
def reduce_connection(conn):
    '''
    Reduce a connection object for pickling to a child being spawned

    The connection's handle is duplicated into the child process.
    Raises RuntimeError if the current thread is not spawning a child.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
        )

ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000401
402#
403# Prepare current process
404#
405
# Every call to prepare() appends the __main__ module it found here, so
# a module it replaces stays referenced.
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; each of the keys 'name', 'authkey',
    'log_to_stderr', 'log_level', 'sys_path', 'sys_argv', 'dir',
    'orig_dir' and 'main_path' is optional and applied only if present.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' not in data:
        return

    main_path = data['main_path']
    main_file = os.path.basename(main_path)
    main_name = os.path.splitext(main_file)[0]
    if main_name == '__init__':
        # The main module is a package: name it after its directory.
        main_name = os.path.basename(os.path.dirname(main_path))

    if main_name == 'ipython':
        return

    import imp

    # main_path cannot be None here (os.path.basename() above would
    # already have failed), so only the two path cases remain.
    if main_file.startswith('__init__.py'):
        dirs = [os.path.dirname(os.path.dirname(main_path))]
    else:
        dirs = [os.path.dirname(main_path)]

    assert main_name not in sys.modules, main_name
    file, path_name, etc = imp.find_module(main_name, dirs)
    try:
        # We would like to do "imp.load_module('__main__', ...)"
        # here.  However, that would cause 'if __name__ ==
        # "__main__"' clauses to be executed.
        main_module = imp.load_module(
            '__parents_main__', file, path_name, etc
            )
    finally:
        if file:
            file.close()

    sys.modules['__main__'] = main_module
    main_module.__name__ = '__main__'

    # Try to make the potentially picklable objects in
    # sys.modules['__main__'] realize they are in the main
    # module -- somewhat ugly.
    for obj in main_module.__dict__.values():
        try:
            if obj.__module__ == '__parents_main__':
                obj.__module__ = '__main__'
        except Exception:
            # Deliberately best-effort: skip objects whose __module__
            # cannot be read or assigned.
            pass
478 pass