blob: 87ebef6588acd3db346224630c9e0cc8c560a12a [file] [log] [blame]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001import errno
2import os
Charles-François Natalie241ac92013-09-05 20:46:49 +02003import selectors
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004import signal
5import socket
6import struct
7import sys
8import threading
Antoine Pitroudfd5f342017-06-12 15:28:19 +02009import warnings
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010010
11from . import connection
12from . import process
Davin Potts54586472016-09-09 18:03:10 -050013from .context import reduction
Pierre Glaserf22cc692019-05-10 22:59:08 +020014from . import resource_tracker
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010015from . import spawn
16from . import util
17
18__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
19 'set_forkserver_preload']
20
21#
22#
23#
24
25MAXFDS_TO_SEND = 256
Antoine Pitroudfd5f342017-06-12 15:28:19 +020026SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010028#
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010029# Forkserver class
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010030#
31
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010032class ForkServer(object):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010034 def __init__(self):
35 self._forkserver_address = None
36 self._forkserver_alive_fd = None
Antoine Pitroufc6b3482017-11-03 13:34:22 +010037 self._forkserver_pid = None
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010038 self._inherited_fds = None
39 self._lock = threading.Lock()
40 self._preload_modules = ['__main__']
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010041
Miss Islington (bot)229f6e82019-07-05 07:35:38 -070042 def _stop(self):
43 # Method used by unit tests to stop the server
44 with self._lock:
45 self._stop_unlocked()
46
47 def _stop_unlocked(self):
48 if self._forkserver_pid is None:
49 return
50
51 # close the "alive" file descriptor asks the server to stop
52 os.close(self._forkserver_alive_fd)
53 self._forkserver_alive_fd = None
54
55 os.waitpid(self._forkserver_pid, 0)
56 self._forkserver_pid = None
57
58 os.unlink(self._forkserver_address)
59 self._forkserver_address = None
60
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010061 def set_forkserver_preload(self, modules_names):
62 '''Set list of module names to try to load in forkserver process.'''
63 if not all(type(mod) is str for mod in self._preload_modules):
64 raise TypeError('module_names must be a list of strings')
65 self._preload_modules = modules_names
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010066
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010067 def get_inherited_fds(self):
68 '''Return list of fds inherited from parent process.
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010069
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010070 This returns None if the current process was not started by fork
71 server.
72 '''
73 return self._inherited_fds
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010074
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010075 def connect_to_new_process(self, fds):
76 '''Request forkserver to create a child process.
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010077
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010078 Returns a pair of fds (status_r, data_w). The calling process can read
79 the child process's pid and (eventually) its returncode from status_r.
80 The calling process should write to data_w the pickled preparation and
81 process data.
82 '''
83 self.ensure_running()
84 if len(fds) + 4 >= MAXFDS_TO_SEND:
85 raise ValueError('too many fds')
86 with socket.socket(socket.AF_UNIX) as client:
87 client.connect(self._forkserver_address)
88 parent_r, child_w = os.pipe()
89 child_r, parent_w = os.pipe()
90 allfds = [child_r, child_w, self._forkserver_alive_fd,
Pierre Glaserf22cc692019-05-10 22:59:08 +020091 resource_tracker.getfd()]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010092 allfds += fds
Richard Oudkerk0718f702013-08-22 11:38:55 +010093 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010094 reduction.sendfds(client, allfds)
95 return parent_r, parent_w
Richard Oudkerk0718f702013-08-22 11:38:55 +010096 except:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010097 os.close(parent_r)
98 os.close(parent_w)
Richard Oudkerk0718f702013-08-22 11:38:55 +010099 raise
100 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100101 os.close(child_r)
102 os.close(child_w)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100103
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100104 def ensure_running(self):
105 '''Make sure that a fork server is running.
106
107 This can be called from any process. Note that usually a child
108 process will just reuse the forkserver started by its parent, so
109 ensure_running() will do nothing.
110 '''
111 with self._lock:
Pierre Glaserf22cc692019-05-10 22:59:08 +0200112 resource_tracker.ensure_running()
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100113 if self._forkserver_pid is not None:
114 # forkserver was launched before, is it still running?
115 pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
116 if not pid:
117 # still alive
118 return
119 # dead, launch it again
120 os.close(self._forkserver_alive_fd)
121 self._forkserver_address = None
122 self._forkserver_alive_fd = None
123 self._forkserver_pid = None
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100124
125 cmd = ('from multiprocessing.forkserver import main; ' +
126 'main(%d, %d, %r, **%r)')
127
128 if self._preload_modules:
129 desired_keys = {'main_path', 'sys_path'}
130 data = spawn.get_preparation_data('ignore')
Jon Dufresne39726282017-05-18 07:35:54 -0700131 data = {x: y for x, y in data.items() if x in desired_keys}
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100132 else:
133 data = {}
134
135 with socket.socket(socket.AF_UNIX) as listener:
136 address = connection.arbitrary_address('AF_UNIX')
137 listener.bind(address)
138 os.chmod(address, 0o600)
Charles-François Natali6e204602014-07-23 19:28:13 +0100139 listener.listen()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100140
141 # all client processes own the write end of the "alive" pipe;
142 # when they all terminate the read end becomes ready.
143 alive_r, alive_w = os.pipe()
144 try:
145 fds_to_pass = [listener.fileno(), alive_r]
146 cmd %= (listener.fileno(), alive_r, self._preload_modules,
147 data)
148 exe = spawn.get_executable()
149 args = [exe] + util._args_from_interpreter_flags()
150 args += ['-c', cmd]
151 pid = util.spawnv_passfds(exe, args, fds_to_pass)
152 except:
153 os.close(alive_w)
154 raise
155 finally:
156 os.close(alive_r)
157 self._forkserver_address = address
158 self._forkserver_alive_fd = alive_w
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100159 self._forkserver_pid = pid
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100160
161#
162#
163#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100164
165def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
166 '''Run forkserver.'''
167 if preload:
168 if '__main__' in preload and main_path is not None:
169 process.current_process()._inheriting = True
170 try:
171 spawn.import_main_path(main_path)
172 finally:
173 del process.current_process()._inheriting
174 for modname in preload:
175 try:
176 __import__(modname)
177 except ImportError:
178 pass
179
Victor Stinnera6d865c2016-03-25 09:29:50 +0100180 util._close_stdin()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100181
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200182 sig_r, sig_w = os.pipe()
Antoine Pitrou2b5cc5e2017-06-13 09:46:06 +0200183 os.set_blocking(sig_r, False)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200184 os.set_blocking(sig_w, False)
185
186 def sigchld_handler(*_unused):
Antoine Pitrou2b5cc5e2017-06-13 09:46:06 +0200187 # Dummy signal handler, doesn't do anything
188 pass
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200189
Antoine Pitrou6dd4d732017-05-04 16:44:53 +0200190 handlers = {
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100191 # unblocking SIGCHLD allows the wakeup fd to notify our event loop
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200192 signal.SIGCHLD: sigchld_handler,
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100193 # protect the process from ^C
194 signal.SIGINT: signal.SIG_IGN,
Antoine Pitrou6dd4d732017-05-04 16:44:53 +0200195 }
196 old_handlers = {sig: signal.signal(sig, val)
197 for (sig, val) in handlers.items()}
198
Antoine Pitrou2b5cc5e2017-06-13 09:46:06 +0200199 # calling os.write() in the Python signal handler is racy
200 signal.set_wakeup_fd(sig_w)
201
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200202 # map child pids to client fds
203 pid_to_fd = {}
204
Charles-François Natalie241ac92013-09-05 20:46:49 +0200205 with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
206 selectors.DefaultSelector() as selector:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100207 _forkserver._forkserver_address = listener.getsockname()
Charles-François Natalie241ac92013-09-05 20:46:49 +0200208
209 selector.register(listener, selectors.EVENT_READ)
210 selector.register(alive_r, selectors.EVENT_READ)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200211 selector.register(sig_r, selectors.EVENT_READ)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100212
213 while True:
214 try:
Charles-François Natalie241ac92013-09-05 20:46:49 +0200215 while True:
216 rfds = [key.fileobj for (key, events) in selector.select()]
217 if rfds:
218 break
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100219
220 if alive_r in rfds:
221 # EOF because no more client processes left
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500222 assert os.read(alive_r, 1) == b'', "Not at EOF?"
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100223 raise SystemExit
224
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200225 if sig_r in rfds:
226 # Got SIGCHLD
227 os.read(sig_r, 65536) # exhaust
228 while True:
229 # Scan for child processes
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100230 try:
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200231 pid, sts = os.waitpid(-1, os.WNOHANG)
232 except ChildProcessError:
233 break
234 if pid == 0:
235 break
236 child_w = pid_to_fd.pop(pid, None)
237 if child_w is not None:
238 if os.WIFSIGNALED(sts):
239 returncode = -os.WTERMSIG(sts)
240 else:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500241 if not os.WIFEXITED(sts):
242 raise AssertionError(
243 "Child {0:n} status is {1:n}".format(
244 pid,sts))
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200245 returncode = os.WEXITSTATUS(sts)
Antoine Pitrou13e96cc2017-06-24 19:22:23 +0200246 # Send exit code to client process
247 try:
248 write_signed(child_w, returncode)
249 except BrokenPipeError:
250 # client vanished
251 pass
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200252 os.close(child_w)
253 else:
254 # This shouldn't happen really
255 warnings.warn('forkserver: waitpid returned '
256 'unexpected pid %d' % pid)
257
258 if listener in rfds:
259 # Incoming fork request
260 with listener.accept()[0] as s:
261 # Receive fds from client
262 fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500263 if len(fds) > MAXFDS_TO_SEND:
264 raise RuntimeError(
265 "Too many ({0:n}) fds to send".format(
266 len(fds)))
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200267 child_r, child_w, *fds = fds
268 s.close()
269 pid = os.fork()
270 if pid == 0:
271 # Child
272 code = 1
273 try:
274 listener.close()
Antoine Pitrou896145d2017-07-22 13:22:54 +0200275 selector.close()
276 unused_fds = [alive_r, child_w, sig_r, sig_w]
277 unused_fds.extend(pid_to_fd.values())
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200278 code = _serve_one(child_r, fds,
Antoine Pitrou896145d2017-07-22 13:22:54 +0200279 unused_fds,
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200280 old_handlers)
281 except Exception:
282 sys.excepthook(*sys.exc_info())
283 sys.stderr.flush()
284 finally:
285 os._exit(code)
286 else:
Antoine Pitrou13e96cc2017-06-24 19:22:23 +0200287 # Send pid to client process
288 try:
289 write_signed(child_w, pid)
290 except BrokenPipeError:
291 # client vanished
292 pass
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200293 pid_to_fd[pid] = child_w
294 os.close(child_r)
295 for fd in fds:
296 os.close(fd)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100297
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100298 except OSError as e:
299 if e.errno != errno.ECONNABORTED:
300 raise
301
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200302
303def _serve_one(child_r, fds, unused_fds, handlers):
Antoine Pitrou6dd4d732017-05-04 16:44:53 +0200304 # close unnecessary stuff and reset signal handlers
Antoine Pitrou2b5cc5e2017-06-13 09:46:06 +0200305 signal.set_wakeup_fd(-1)
Antoine Pitrou6dd4d732017-05-04 16:44:53 +0200306 for sig, val in handlers.items():
307 signal.signal(sig, val)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200308 for fd in unused_fds:
309 os.close(fd)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100310
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200311 (_forkserver._forkserver_alive_fd,
Pierre Glaserf22cc692019-05-10 22:59:08 +0200312 resource_tracker._resource_tracker._fd,
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200313 *_forkserver._inherited_fds) = fds
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100314
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200315 # Run process object received over pipe
Thomas Moreauc09a9f52019-05-20 21:37:05 +0200316 parent_sentinel = os.dup(child_r)
317 code = spawn._main(child_r, parent_sentinel)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100318
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200319 return code
320
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100321
322#
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200323# Read and write signed numbers
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100324#
325
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200326def read_signed(fd):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100327 data = b''
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200328 length = SIGNED_STRUCT.size
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100329 while len(data) < length:
Charles-François Natali6e6c59b2015-02-07 13:27:50 +0000330 s = os.read(fd, length - len(data))
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100331 if not s:
332 raise EOFError('unexpected EOF')
333 data += s
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200334 return SIGNED_STRUCT.unpack(data)[0]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100335
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200336def write_signed(fd, n):
337 msg = SIGNED_STRUCT.pack(n)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100338 while msg:
Charles-François Natali6e6c59b2015-02-07 13:27:50 +0000339 nbytes = os.write(fd, msg)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100340 if nbytes == 0:
341 raise RuntimeError('should not get here')
342 msg = msg[nbytes:]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100343
344#
345#
346#
347
348_forkserver = ForkServer()
349ensure_running = _forkserver.ensure_running
350get_inherited_fds = _forkserver.get_inherited_fds
351connect_to_new_process = _forkserver.connect_to_new_process
352set_forkserver_preload = _forkserver.set_forkserver_preload