#
# Module for starting a process object using os.fork() or CreateProcess()
#
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#
8
9import os
10import sys
11import signal
12
13from multiprocessing import util, process
14
15__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close']
16
17#
18# Check that the current thread is spawning a child process
19#
20
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    type_name = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
        )
27
28#
29# Unix
30#
31
32if sys.platform != 'win32':
33 import time
34
# Thin aliases onto the os module, so that this platform provides the
# 'exit', 'duplicate' and 'close' names listed in __all__.
close = os.close
duplicate = os.dup
exit = os._exit
38
39 #
40 # We define a Popen class similar to the one from subprocess, but
41 # whose constructor takes a process object as its argument.
42 #
43
class Popen(object):
    '''
    Start a child with os.fork() to run the code of a process object
    '''

    def __init__(self, process_obj):
        '''
        Fork; the child runs process_obj._bootstrap() and never returns
        '''
        # Flush before forking -- presumably so that output still buffered
        # in the parent is not emitted again by the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # Child process.  Reseed the random module, but only if it
            # has already been imported.
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit() ends the child here, so it never returns into
            # the caller's code.
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Return the exit code of the child, or None if it is not yet known

        `flag` is passed to os.waitpid() as its options argument.  A child
        terminated by a signal is reported as the negated signal number.
        '''
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, flag)
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child and return its exit code

        With `timeout` None this calls poll(0).  Otherwise poll()
        is called repeatedly, sleeping between attempts (the sleep doubles
        each time, capped at 0.05s and at the time remaining), until an
        exit code is available or `timeout` seconds have passed; None is
        returned if the deadline passes first.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless its exit code is already known

        If os.kill() fails, the error is re-raised only when the child
        has still not exited after waiting 0.1s.
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''
        Always False in this fork()-based implementation
        '''
        return False
99
100#
101# Windows
102#
103
104else:
105 import thread
106 import msvcrt
107 import _subprocess
108 import copy_reg
109 import time
110
111 from ._multiprocessing import win32, Connection, PipeConnection
112 from .util import Finalize
113
114 try:
115 from cPickle import dump, load, HIGHEST_PROTOCOL
116 except ImportError:
117 from pickle import dump, load, HIGHEST_PROTOCOL
118
119 #
120 #
121 #
122
# Exit code passed to TerminateProcess() by Popen.terminate(); Popen.wait()
# reports a child that exited with it as -signal.SIGTERM.
TERMINATE = 0x10000

# Truthy when running on win32 with sys.frozen set.
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))

close = win32.CloseHandle
exit = win32.ExitProcess

#
# _python_exe is the assumed path to the python executable.
# People embedding Python want to modify it.
#

_python_exe = sys.executable
if _python_exe.lower().endswith('pythonservice.exe'):
    # Running under pythonservice.exe: use python.exe from sys.exec_prefix
    # instead.
    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
138
def set_executable(exe):
    '''
    Set the path of the python executable used to start child processes

    Rebinds the module-level _python_exe to `exe`.
    '''
    global _python_exe
    _python_exe = exe
142
143 #
144 #
145 #
146
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` from the current process into `target_process`

    `target_process` defaults to the current process.  `inheritable` is
    forwarded to DuplicateHandle(), and the copy is requested with
    DUPLICATE_SAME_ACCESS.  Returns the result of calling Detach() on
    the new handle.
    '''
    current_process = _subprocess.GetCurrentProcess
    if target_process is None:
        target_process = current_process()
    new_handle = _subprocess.DuplicateHandle(
        current_process(), handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
        )
    return new_handle.Detach()
154
155 #
156 # We define a Popen class similar to the one from subprocess, but
157 # whose constructor takes a process object as its argument.
158 #
159
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Per-thread storage; 'process_handle' is set only while __init__ is
    # pickling data for the child.
    _tls = thread._local()

    def __init__(self, process_obj):
        # pipe used to send the pickled data to the child
        read_fd, write_fd = os.pipe()

        # inheritable duplicate of the read end, then drop our own copy
        child_rhandle = duplicate(
            msvcrt.get_osfhandle(read_fd), inheritable=True
            )
        os.close(read_fd)

        # start the child; the read handle is appended as the last
        # command line argument
        args = get_command_line() + [child_rhandle]
        cmdline = ' '.join('"%s"' % arg for arg in args)
        proc_handle, thread_handle, pid, _tid = _subprocess.CreateProcess(
            _python_exe, cmdline, None, None, 1, 0, None, None, None
            )
        thread_handle.Close()
        close(child_rhandle)

        # set attributes of self
        self.pid = pid
        self.returncode = None
        self._handle = proc_handle

        # send information to child; while the dumps run,
        # thread_is_spawning() is true for this thread
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(write_fd, 'wb')
        Popen._tls.process_handle = int(proc_handle)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether this thread is currently inside Popen.__init__
        sending data to a child
        '''
        spawning_handle = getattr(Popen._tls, 'process_handle', None)
        return spawning_handle is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the child process being spawned by this
        thread
        '''
        child_process = Popen._tls.process_handle
        return duplicate(handle, child_process)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever if None) for the child and
        return its exit code, or None if it has not exited

        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is not None:
            return self.returncode

        if timeout is None:
            msecs = _subprocess.INFINITE
        else:
            # round to the nearest millisecond, never negative
            msecs = max(0, int(timeout * 1000 + 0.5))

        res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
        if res == _subprocess.WAIT_OBJECT_0:
            code = _subprocess.GetExitCodeProcess(self._handle)
            self.returncode = -signal.SIGTERM if code == TERMINATE else code

        return self.returncode

    def poll(self):
        '''
        Return the exit code without blocking, or None if still running
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless its exit code
        is already known

        If TerminateProcess() fails, the error is re-raised only when
        the child has still not exited after waiting 0.1s.
        '''
        if self.returncode is not None:
            return
        try:
            _subprocess.TerminateProcess(int(self._handle), TERMINATE)
        except WindowsError:
            if self.wait(timeout=0.1) is None:
                raise
233
234 #
235 #
236 #
237
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking

    True when argv[1] is '--multiprocessing-fork'; in that case argv is
    asserted to hold exactly three items.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    assert len(argv) == 3
    return True
247
248
def freeze_support():
    '''
    Run code for process object if this is not the main process

    Does nothing unless sys.argv marks this process as a forked child;
    in that case main() is run, followed by sys.exit().
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
256
257
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError when called from a forked child whose current
    process object still has an empty _identity.
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if not getattr(sys, 'frozen', False):
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']

    # frozen executable: it is re-run directly with the fork flag
    return [sys.executable, '--multiprocessing-fork']
282
283
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # the last command line argument is the handle of the pipe's read end
    pipe_handle = int(sys.argv[-1])
    pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # The parent sends two pickles: the preparation data first, then the
    # process object.  _inheriting is set only while they are loaded.
    process.current_process()._inheriting = True
    preparation_data = load(from_parent)
    prepare(preparation_data)
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
304
305
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict; 'log_level' and 'main_path' are included only
    when they can be determined.
    '''
    from .util import _logger, _log_to_stderr

    info = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().get_authkey(),
        }

    if _logger is not None:
        info['log_level'] = _logger.getEffectiveLevel()

    # a frozen Windows executable gets no 'main_path'
    if WINEXE:
        return info

    main_path = getattr(sys.modules['__main__'], '__file__', None)
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]
    if main_path is None:
        return info

    # make a relative path absolute using the recorded original directory
    if not os.path.isabs(main_path) and process.ORIGINAL_DIR is not None:
        main_path = os.path.join(process.ORIGINAL_DIR, main_path)
    info['main_path'] = os.path.normpath(main_path)

    return info
335
336 #
337 # Make (Pipe)Connection picklable
338 #
339
def reduce_connection(conn):
    '''
    Pickle support for connection objects

    Only allowed while the current thread is spawning a child: the
    connection's handle is duplicated into that child and the pickle
    rebuilds the connection from the duplicate.  Raises RuntimeError
    otherwise.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
        )

# register the reducer for both connection types
copy_reg.pickle(Connection, reduce_connection)
copy_reg.pickle(PipeConnection, reduce_connection)
351
352
353#
354# Prepare current process
355#
356
# Every call to prepare() appends the then-current __main__ module here --
# presumably to keep a reference to it once sys.modules['__main__'] is
# replaced (TODO confirm).
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is the mapping produced by get_preparation_data() in the
    parent.  Every key is optional and is applied only when present.
    When 'main_path' is given (and the main module is not ipython), the
    parent's main module is imported as '__parents_main__' and then
    installed as sys.modules['__main__'].
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().set_name(data['name'])

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            # a package's __init__ file: the module name is the
            # directory name
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            # main_path cannot be None here: os.path.basename() above
            # would already have raised, so no None case is needed.
            if os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in main_module.__dict__.values():
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # Deliberately best-effort: objects without a
                    # (writable) __module__ are simply left alone.
                    pass