blob: 22a911a7a29cdc0219cb46da48fa7e2081abcc83 [file] [log] [blame]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001import errno
2import os
Charles-François Natalie241ac92013-09-05 20:46:49 +02003import selectors
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004import signal
5import socket
6import struct
7import sys
8import threading
Antoine Pitroudfd5f342017-06-12 15:28:19 +02009import warnings
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010010
11from . import connection
12from . import process
Davin Potts54586472016-09-09 18:03:10 -050013from .context import reduction
Pierre Glaserf22cc692019-05-10 22:59:08 +020014from . import resource_tracker
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010015from . import spawn
16from . import util
17
# Public API of this module; each of these names is bound at the bottom of
# the file to a method of the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
#
#

# Upper bound on the number of fds carried by one fork request.  The client
# side (connect_to_new_process) rejects requests that would reach it, and the
# server side (main) asks recvfds() for one more than this so that an
# oversized request can be detected.
MAXFDS_TO_SEND = 256
# Fixed-size wire format used by read_signed()/write_signed() for pids and
# exit codes.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010028#
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010029# Forkserver class
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010030#
31
class ForkServer(object):
    '''Client-side handle used to start and talk to the fork server process.

    The instance remembers the address of the server's AF_UNIX listening
    socket, the write end of the "alive" pipe, and the server's pid.  All
    state changes made by ensure_running() and _stop() happen under
    self._lock.
    '''

    def __init__(self):
        # Address of the AF_UNIX socket the server listens on (None until
        # a server has been started or the address has been learnt).
        self._forkserver_address = None
        # Write end of the "alive" pipe; closing it asks the server to stop.
        self._forkserver_alive_fd = None
        # Pid returned by util.spawnv_passfds() for the running server.
        self._forkserver_pid = None
        # Extra fds handed to this process by the fork server; stays None
        # unless this process was itself started by the fork server.
        self._inherited_fds = None
        self._lock = threading.Lock()
        # Modules the server tries to import before serving fork requests.
        self._preload_modules = ['__main__']

    def _stop(self):
        # Method used by unit tests to stop the server
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server and forget about it; caller must hold self._lock.

        Does nothing if no server pid is recorded.
        '''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # Block until the server has exited so that it gets reaped.
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Only non-abstract addresses are unlinked here.
        if not util.is_abstract_socket_namespace(self._forkserver_address):
            os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not exactly a
        str; in that case the previously configured list is left untouched.
        '''
        # Validate the *incoming* names.  Checking the list that is already
        # stored would accept bad input now and reject the next, valid call.
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if fds is too long to be sent together with the
        four fds this method always adds.
        '''
        self.ensure_running()
        # Four slots are reserved for child_r, child_w, the alive fd and the
        # resource tracker fd that are prepended below.
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      resource_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except BaseException:
                # Nothing will be returned, so the caller's ends must not
                # leak; the original exception is re-raised unchanged.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child's ends are closed in this process whether or not
                # the send succeeded.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            # Command line executed by the new interpreter; the placeholders
            # are filled in below with the listener fd, the alive pipe's read
            # end, the preload list and the keyword arguments for main().
            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these two preparation items are forwarded to main().
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                if not util.is_abstract_socket_namespace(address):
                    # Restrict the socket file to its owner.
                    os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except BaseException:
                    # Launch failed: the write end will not be stored, so
                    # close it before re-raising.
                    os.close(alive_w)
                    raise
                finally:
                    # The read end is only needed by the server process.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100162
163#
164#
165#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100166
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the file descriptor of the already listening AF_UNIX
    socket on which fork requests arrive.  alive_r is the read end of the
    "alive" pipe; once it becomes readable the server exits via SystemExit.
    preload is a collection of module names to import before serving
    requests (ImportError is ignored).  main_path, when given and
    '__main__' is in preload, is passed to spawn.import_main_path().
    sys_path is accepted but not used in this function.

    This function does not return normally: it loops until SystemExit is
    raised or an unexpected exception propagates.
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            # Flag the current process as "inheriting" only for the duration
            # of the main module import.
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            try:
                __import__(modname)
            except ImportError:
                # Preloading is best effort; a missing module is skipped.
                pass

    util._close_stdin()

    # Pipe used as the signal wakeup fd (see set_wakeup_fd() below); both
    # ends are made non-blocking.
    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # Install the handlers and remember the previous ones so that forked
    # children can restore them (they are passed to _serve_one()).
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        # Record the address on the module-level ForkServer instance.
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # Wait until at least one registered object is ready.
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            # No children left to wait for.
                            break
                        if pid == 0:
                            # Children exist but none has exited yet.
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)

                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        # The first two fds are the child's ends of the two
                        # pipes; the remainder is handed to _serve_one().
                        child_r, child_w, *fds = fds
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                # Server-only fds, including the status fds
                                # of every other child still being tracked.
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # Never fall back into the server loop from
                                # the child; code stays 1 unless _serve_one()
                                # returned.
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Keep child_w open so that the exit code can be
                            # reported once the child has been reaped.
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # ECONNABORTED is ignored and the loop continues; any other
                # OSError propagates.
                if e.errno != errno.ECONNABORTED:
                    raise
297
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200298
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Turn the freshly forked child into the requested process.

    child_r is the fd from which the process data is read, fds holds the
    alive fd, the resource tracker fd and any further inherited fds (in
    that order), unused_fds are closed straight away, and handlers maps
    signal numbers to the handlers to (re)install.  Returns whatever
    spawn._main() returns.
    '''
    # Detach from the server's wakeup pipe and put the given handlers back.
    signal.set_wakeup_fd(-1)
    for signum, previous in handlers.items():
        signal.signal(signum, previous)

    # Drop every descriptor that only the server needs.
    for unused in unused_fds:
        os.close(unused)

    # Publish the received fds on the module-level singletons.
    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    resource_tracker._resource_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe
    parent_sentinel = os.dup(child_r)
    return spawn._main(child_r, parent_sentinel)
316
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100317
318#
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200319# Read and write signed numbers
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100320#
321
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from file descriptor fd.

    Keeps reading until SIGNED_STRUCT.size bytes have arrived and raises
    EOFError if the stream ends first.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100331
def write_signed(fd, n):
    '''Write integer n to file descriptor fd in SIGNED_STRUCT encoding.

    Repeats os.write() until the whole encoded value has been written and
    raises RuntimeError if a write reports zero bytes.
    '''
    payload = SIGNED_STRUCT.pack(n)
    offset = 0
    while offset < len(payload):
        written = os.write(fd, payload[offset:])
        if written == 0:
            raise RuntimeError('should not get here')
        offset += written
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100339
340#
341#
342#
343
# Single module-level ForkServer instance; main() and _serve_one() store
# state on it, and the public functions below are its bound methods.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload