blob: 69b842aa939a3c34c3fe067977af1360730587f0 [file] [log] [blame]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001import errno
2import os
Charles-François Natalie241ac92013-09-05 20:46:49 +02003import selectors
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004import signal
5import socket
6import struct
7import sys
8import threading
Antoine Pitroudfd5f342017-06-12 15:28:19 +02009import warnings
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010010
11from . import connection
12from . import process
Davin Potts54586472016-09-09 18:03:10 -050013from .context import reduction
Richard Oudkerk7d2d43c2013-08-22 11:38:57 +010014from . import semaphore_tracker
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010015from . import spawn
16from . import util
17
# Public API of this module.  Each name is bound near the end of the file to
# the method of the same name on the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']
20
21#
22#
23#
24
# Cap on the number of descriptors in one fork request:
# connect_to_new_process() refuses requests that would reach it, and main()
# checks the received list against it.
MAXFDS_TO_SEND = 256
# Wire format used by read_signed() / write_signed().
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027
#
# Forkserver class
#
31
class ForkServer(object):
    '''Client-side state for talking to the fork server process.

    Holds the address of the server's listening socket, the write end of
    the "alive" pipe, the list of fds inherited from the parent (only set in
    processes started by the fork server) and the list of modules the server
    should try to preload.
    '''

    def __init__(self):
        self._forkserver_address = None
        self._forkserver_alive_fd = None
        self._inherited_fds = None
        # Serializes ensure_running() so only one thread starts the server.
        self._lock = threading.Lock()
        self._preload_modules = ['__main__']

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not exactly a
        str; in that case the previously configured list is left untouched.
        '''
        # Validate the *argument*, not the list currently stored on self.
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if fds, together with the four fds always sent,
        would reach MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            # Fixed prefix expected by the server, followed by the caller's fds.
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      semaphore_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except:
                # Nothing will ever use our ends of the pipes; don't leak them.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child's ends are closed locally whether or not the
                # send succeeded.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            semaphore_tracker.ensure_running()
            if self._forkserver_alive_fd is not None:
                return

            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these preparation-data keys are forwarded to main().
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    os.close(alive_w)
                    raise
                finally:
                    # The read end is not kept in this process.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
130
131#
132#
133#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100134
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the fd of the AF_UNIX listening socket and alive_r the
    read end of the "alive" pipe; preload is the list of module names to
    try to import before serving.  main_path is handed to
    spawn.import_main_path() when '__main__' is among the preloaded modules.
    sys_path is accepted but not used by this function.

    The loop exits by raising SystemExit once alive_r becomes readable.
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            # Preloading is best-effort: modules that fail to import are
            # skipped.
            try:
                __import__(modname)
            except ImportError:
                pass

    util._close_stdin()

    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    # letting SIGINT through avoids KeyboardInterrupt tracebacks
    # unblocking SIGCHLD allows the wakeup fd to notify our event loop
    handlers = {
        signal.SIGCHLD: sigchld_handler,
        signal.SIGINT: signal.SIG_DFL,
        }
    # Remember the previous handlers so forked children can restore them.
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
            selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                while True:
                    rfds = [key.fileobj
                            for (key, _events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left.  The read
                    # is done outside of an assert so it is not skipped
                    # under -O.
                    leftover = os.read(alive_r, 1)
                    if leftover != b'':
                        raise AssertionError('Not at EOF?')
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)   # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if pid == 0:
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            if os.WIFSIGNALED(sts):
                                returncode = -os.WTERMSIG(sts)
                            else:
                                # Explicit check instead of a bare assert so
                                # it still runs under -O.
                                if not os.WIFEXITED(sts):
                                    raise AssertionError(
                                        'Child {0:n} status is {1:n}'.format(
                                            pid, sts))
                                returncode = os.WEXITSTATUS(sts)
                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        # The fd list comes from another process; validate
                        # it with a real check rather than an assert.
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                'Too many ({0:n}) fds to send'.format(
                                    len(fds)))
                        child_r, child_w, *fds = fds
                        # Closed explicitly before forking, ahead of the
                        # with-block exit.
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # Never return into the server loop from
                                # the forked child.
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Kept open so the exit code can be written
                            # when the child is reaped.
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                if e.errno != errno.ECONNABORTED:
                    raise
265
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200266
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Set up a freshly forked child and run the process received on child_r.

    Detaches the signal wakeup fd, reinstalls the signal handlers given in
    handlers, closes every fd in unused_fds, and records the received fds
    (alive fd, semaphore tracker fd, then the inherited fds) on the module
    state.  Returns the value returned by spawn._main(child_r).
    '''
    # close unnecessary stuff and reset signal handlers
    signal.set_wakeup_fd(-1)
    for signum in handlers:
        signal.signal(signum, handlers[signum])
    for stale_fd in unused_fds:
        os.close(stale_fd)

    # fds layout: [alive fd, semaphore tracker fd, *inherited fds]
    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    semaphore_tracker._semaphore_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe
    return spawn._main(child_r)
283
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100284
#
# Read and write signed numbers
#
288
def read_signed(fd):
    '''Read one integer packed with SIGNED_STRUCT from file descriptor fd.

    Keeps reading until SIGNED_STRUCT.size bytes have arrived.  Raises
    EOFError if fd reaches end-of-file first.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100298
def write_signed(fd, n):
    '''Write integer n to file descriptor fd, packed with SIGNED_STRUCT.

    Retries after short writes until every byte has been written.  Raises
    RuntimeError if os.write() reports that zero bytes were written.
    '''
    payload = SIGNED_STRUCT.pack(n)
    offset = 0
    while offset < len(payload):
        written = os.write(fd, payload[offset:])
        if not written:
            raise RuntimeError('should not get here')
        offset += written
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100306
307#
308#
309#
310
# Module-level ForkServer instance; the public functions below are simply
# its bound methods.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload