blob: 1aa3b396086c59a479453c5ec6ca209a3e7960e2 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
Yury Selivanov6370f342017-12-10 18:36:12 -050029__all__ = (
30 'SelectorEventLoop',
31 'AbstractChildWatcher', 'SafeChildWatcher',
32 'FastChildWatcher', 'DefaultEventLoopPolicy',
33)
34
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036if sys.platform == 'win32': # pragma: no cover
37 raise ImportError('Signals are not really supported on Windows')
38
39
Victor Stinnerfe5649c2014-07-17 22:43:40 +020040def _sighandler_noop(signum, frame):
41 """Dummy signal handler."""
42 pass
43
44
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080045class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050046 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
Yury Selivanovb057c522014-02-18 12:15:06 -050048 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049 """
50
51 def __init__(self, selector=None):
52 super().__init__(selector)
53 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020056 super().close()
Andrew Svetlov4a025432017-12-21 17:06:46 +020057 if not sys.is_finalizing():
58 for sig in list(self._signal_handlers):
59 self.remove_signal_handler(sig)
60 else:
Andrew Svetlov4f146f92017-12-24 13:50:03 +020061 if self._signal_handlers:
Andrew Svetlova8f4e152017-12-26 11:53:38 +020062 warnings.warn(f"Closing the loop {self!r} "
Andrew Svetlov4f146f92017-12-24 13:50:03 +020063 f"on interpreter shutdown "
Andrew Svetlova8f4e152017-12-26 11:53:38 +020064 f"stage, skipping signal handlers removal",
Andrew Svetlov4f146f92017-12-24 13:50:03 +020065 ResourceWarning,
66 source=self)
67 self._signal_handlers.clear()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080068
Victor Stinnerfe5649c2014-07-17 22:43:40 +020069 def _process_self_data(self, data):
70 for signum in data:
71 if not signum:
72 # ignore null bytes written by _write_to_self()
73 continue
74 self._handle_signal(signum)
75
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 def add_signal_handler(self, sig, callback, *args):
77 """Add a handler for a signal. UNIX only.
78
79 Raise ValueError if the signal number is invalid or uncatchable.
80 Raise RuntimeError if there is a problem setting up the handler.
81 """
Yury Selivanov6370f342017-12-10 18:36:12 -050082 if (coroutines.iscoroutine(callback) or
83 coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010084 raise TypeError("coroutines cannot be used "
85 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010087 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088 try:
89 # set_wakeup_fd() raises ValueError if this is not the
90 # main thread. By calling it early we ensure that an
91 # event loop running in another thread cannot add a signal
92 # handler.
93 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020094 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 raise RuntimeError(str(exc))
96
Yury Selivanovf23746a2018-01-22 19:11:18 -050097 handle = events.Handle(callback, args, self, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098 self._signal_handlers[sig] = handle
99
100 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200101 # Register a dummy signal handler to ask Python to write the signal
102 # number in the wakup file descriptor. _process_self_data() will
103 # read signal numbers from this file descriptor to handle signals.
104 signal.signal(sig, _sighandler_noop)
105
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100106 # Set SA_RESTART to limit EINTR occurrences.
107 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108 except OSError as exc:
109 del self._signal_handlers[sig]
110 if not self._signal_handlers:
111 try:
112 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200113 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700114 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115
116 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500117 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700118 else:
119 raise
120
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200121 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700122 """Internal helper that is the actual signal handler."""
123 handle = self._signal_handlers.get(sig)
124 if handle is None:
125 return # Assume it's some race condition.
126 if handle._cancelled:
127 self.remove_signal_handler(sig) # Remove it properly.
128 else:
129 self._add_callback_signalsafe(handle)
130
131 def remove_signal_handler(self, sig):
132 """Remove a handler for a signal. UNIX only.
133
134 Return True if a signal handler was removed, False if not.
135 """
136 self._check_signal(sig)
137 try:
138 del self._signal_handlers[sig]
139 except KeyError:
140 return False
141
142 if sig == signal.SIGINT:
143 handler = signal.default_int_handler
144 else:
145 handler = signal.SIG_DFL
146
147 try:
148 signal.signal(sig, handler)
149 except OSError as exc:
150 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500151 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152 else:
153 raise
154
155 if not self._signal_handlers:
156 try:
157 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200158 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700159 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160
161 return True
162
163 def _check_signal(self, sig):
164 """Internal helper to validate a signal.
165
166 Raise ValueError if the signal number is invalid or uncatchable.
167 Raise RuntimeError if there is a problem setting up the handler.
168 """
169 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500170 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Antoine Pitrou9d3627e2018-05-04 13:00:50 +0200172 if sig not in signal.valid_signals():
173 raise ValueError(f'invalid signal number {sig}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
175 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
176 extra=None):
177 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
178
179 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
180 extra=None):
181 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
182
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200183 async def _make_subprocess_transport(self, protocol, args, shell,
184 stdin, stdout, stderr, bufsize,
185 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400187 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800188 transp = _UnixSubprocessTransport(self, protocol, args, shell,
189 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100190 waiter=waiter, extra=extra,
191 **kwargs)
192
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800193 watcher.add_child_handler(transp.get_pid(),
194 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100195 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200196 await waiter
197 except Exception:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100198 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200199 await transp._wait()
200 raise
Guido van Rossum4835f172014-01-10 13:28:59 -0800201
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202 return transp
203
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800204 def _child_watcher_callback(self, pid, returncode, transp):
205 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206
Neil Aspinallf7686c12017-12-19 19:45:42 +0000207 async def create_unix_connection(
208 self, protocol_factory, path=None, *,
209 ssl=None, sock=None,
210 server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200211 ssl_handshake_timeout=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500212 assert server_hostname is None or isinstance(server_hostname, str)
213 if ssl:
214 if server_hostname is None:
215 raise ValueError(
216 'you have to pass server_hostname when using ssl')
217 else:
218 if server_hostname is not None:
219 raise ValueError('server_hostname is only meaningful with ssl')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200220 if ssl_handshake_timeout is not None:
221 raise ValueError(
222 'ssl_handshake_timeout is only meaningful with ssl')
Yury Selivanovb057c522014-02-18 12:15:06 -0500223
224 if path is not None:
225 if sock is not None:
226 raise ValueError(
227 'path and sock can not be specified at the same time')
228
Andrew Svetlovcc839202017-11-29 18:23:43 +0200229 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100230 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500231 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500232 sock.setblocking(False)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200233 await self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100234 except:
235 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500236 raise
237
238 else:
239 if sock is None:
240 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400241 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500242 sock.type != socket.SOCK_STREAM):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400243 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500244 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500245 sock.setblocking(False)
246
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200247 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000248 sock, protocol_factory, ssl, server_hostname,
249 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanovb057c522014-02-18 12:15:06 -0500250 return transport, protocol
251
Neil Aspinallf7686c12017-12-19 19:45:42 +0000252 async def create_unix_server(
253 self, protocol_factory, path=None, *,
254 sock=None, backlog=100, ssl=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -0500255 ssl_handshake_timeout=None,
256 start_serving=True):
Yury Selivanovb057c522014-02-18 12:15:06 -0500257 if isinstance(ssl, bool):
258 raise TypeError('ssl argument must be an SSLContext or None')
259
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200260 if ssl_handshake_timeout is not None and not ssl:
261 raise ValueError(
262 'ssl_handshake_timeout is only meaningful with ssl')
263
Yury Selivanovb057c522014-02-18 12:15:06 -0500264 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200265 if sock is not None:
266 raise ValueError(
267 'path and sock can not be specified at the same time')
268
Andrew Svetlovcc839202017-11-29 18:23:43 +0200269 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500270 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
271
Yury Selivanov908d55d2016-10-09 12:15:08 -0400272 # Check for abstract socket. `str` and `bytes` paths are supported.
273 if path[0] not in (0, '\x00'):
274 try:
275 if stat.S_ISSOCK(os.stat(path).st_mode):
276 os.remove(path)
277 except FileNotFoundError:
278 pass
279 except OSError as err:
280 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200281 logger.error('Unable to check or remove stale UNIX socket '
282 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400283
Yury Selivanovb057c522014-02-18 12:15:06 -0500284 try:
285 sock.bind(path)
286 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100287 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500288 if exc.errno == errno.EADDRINUSE:
289 # Let's improve the error message by adding
290 # with what exact address it occurs.
Yury Selivanov6370f342017-12-10 18:36:12 -0500291 msg = f'Address {path!r} is already in use'
Yury Selivanovb057c522014-02-18 12:15:06 -0500292 raise OSError(errno.EADDRINUSE, msg) from None
293 else:
294 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200295 except:
296 sock.close()
297 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500298 else:
299 if sock is None:
300 raise ValueError(
301 'path was not specified, and no sock specified')
302
Yury Selivanov36e7e972016-10-07 12:39:57 -0400303 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500304 sock.type != socket.SOCK_STREAM):
Yury Selivanovb057c522014-02-18 12:15:06 -0500305 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500306 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500307
Yury Selivanovb057c522014-02-18 12:15:06 -0500308 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500309 server = base_events.Server(self, [sock], protocol_factory,
310 ssl, backlog, ssl_handshake_timeout)
311 if start_serving:
312 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400313 # Skip one loop iteration so that all 'loop.add_reader'
314 # go through.
315 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500316
Yury Selivanovb057c522014-02-18 12:15:06 -0500317 return server
318
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200319 async def _sock_sendfile_native(self, sock, file, offset, count):
320 try:
321 os.sendfile
322 except AttributeError as exc:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700323 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200324 "os.sendfile() is not available")
325 try:
326 fileno = file.fileno()
327 except (AttributeError, io.UnsupportedOperation) as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700328 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200329 try:
330 fsize = os.fstat(fileno).st_size
331 except OSError as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700332 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200333 blocksize = count if count else fsize
334 if not blocksize:
335 return 0 # empty file
336
337 fut = self.create_future()
338 self._sock_sendfile_native_impl(fut, None, sock, fileno,
339 offset, count, blocksize, 0)
340 return await fut
341
342 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
343 offset, count, blocksize, total_sent):
344 fd = sock.fileno()
345 if registered_fd is not None:
346 # Remove the callback early. It should be rare that the
347 # selector says the fd is ready but the call still returns
348 # EAGAIN, and I am willing to take a hit in that case in
349 # order to simplify the common case.
350 self.remove_writer(registered_fd)
351 if fut.cancelled():
352 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
353 return
354 if count:
355 blocksize = count - total_sent
356 if blocksize <= 0:
357 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
358 fut.set_result(total_sent)
359 return
360
361 try:
362 sent = os.sendfile(fd, fileno, offset, blocksize)
363 except (BlockingIOError, InterruptedError):
364 if registered_fd is None:
365 self._sock_add_cancellation_callback(fut, sock)
366 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
367 fd, sock, fileno,
368 offset, count, blocksize, total_sent)
369 except OSError as exc:
Yury Selivanov2a2247c2018-01-27 17:22:01 -0500370 if (registered_fd is not None and
371 exc.errno == errno.ENOTCONN and
372 type(exc) is not ConnectionError):
373 # If we have an ENOTCONN and this isn't a first call to
374 # sendfile(), i.e. the connection was closed in the middle
375 # of the operation, normalize the error to ConnectionError
376 # to make it consistent across all Posix systems.
377 new_exc = ConnectionError(
378 "socket is not connected", errno.ENOTCONN)
379 new_exc.__cause__ = exc
380 exc = new_exc
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200381 if total_sent == 0:
382 # We can get here for different reasons, the main
383 # one being 'file' is not a regular mmap(2)-like
384 # file, in which case we'll fall back on using
385 # plain send().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700386 err = exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200387 "os.sendfile call failed")
388 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
389 fut.set_exception(err)
390 else:
391 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
392 fut.set_exception(exc)
393 except Exception as exc:
394 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
395 fut.set_exception(exc)
396 else:
397 if sent == 0:
398 # EOF
399 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
400 fut.set_result(total_sent)
401 else:
402 offset += sent
403 total_sent += sent
404 if registered_fd is None:
405 self._sock_add_cancellation_callback(fut, sock)
406 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
407 fd, sock, fileno,
408 offset, count, blocksize, total_sent)
409
410 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
411 if total_sent > 0:
412 os.lseek(fileno, offset, os.SEEK_SET)
413
414 def _sock_add_cancellation_callback(self, fut, sock):
415 def cb(fut):
416 if fut.cancelled():
417 fd = sock.fileno()
418 if fd != -1:
419 self.remove_writer(fd)
420 fut.add_done_callback(cb)
421
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700423class _UnixReadPipeTransport(transports.ReadTransport):
424
Yury Selivanovdec1a452014-02-18 22:27:48 -0500425 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
427 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
428 super().__init__(extra)
429 self._extra['pipe'] = pipe
430 self._loop = loop
431 self._pipe = pipe
432 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700433 self._protocol = protocol
434 self._closing = False
435
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700436 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800437 if not (stat.S_ISFIFO(mode) or
438 stat.S_ISSOCK(mode) or
439 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700440 self._pipe = None
441 self._fileno = None
442 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700443 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700444
Andrew Svetlovcc839202017-11-29 18:23:43 +0200445 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700446
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100448 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400449 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100450 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100452 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500453 self._loop.call_soon(futures._set_result_unless_cancelled,
454 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
Victor Stinnere912e652014-07-12 03:11:53 +0200456 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100457 info = [self.__class__.__name__]
458 if self._pipe is None:
459 info.append('closed')
460 elif self._closing:
461 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500462 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400463 selector = getattr(self._loop, '_selector', None)
464 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200465 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500466 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200467 if polling:
468 info.append('polling')
469 else:
470 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400471 elif self._pipe is not None:
472 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200473 else:
474 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500475 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200476
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 def _read_ready(self):
478 try:
479 data = os.read(self._fileno, self.max_size)
480 except (BlockingIOError, InterruptedError):
481 pass
482 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100483 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484 else:
485 if data:
486 self._protocol.data_received(data)
487 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200488 if self._loop.get_debug():
489 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400491 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 self._loop.call_soon(self._protocol.eof_received)
493 self._loop.call_soon(self._call_connection_lost, None)
494
Guido van Rossum57497ad2013-10-18 07:58:20 -0700495 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400496 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497
Guido van Rossum57497ad2013-10-18 07:58:20 -0700498 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400499 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400501 def set_protocol(self, protocol):
502 self._protocol = protocol
503
504 def get_protocol(self):
505 return self._protocol
506
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500507 def is_closing(self):
508 return self._closing
509
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 def close(self):
511 if not self._closing:
512 self._close(None)
513
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100514 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900515 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100516 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900517 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100518
Victor Stinner0ee29c22014-02-19 01:40:41 +0100519 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200521 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
522 if self._loop.get_debug():
523 logger.debug("%r: %s", self, message, exc_info=True)
524 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500525 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100526 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500527 'exception': exc,
528 'transport': self,
529 'protocol': self._protocol,
530 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531 self._close(exc)
532
533 def _close(self, exc):
534 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400535 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 self._loop.call_soon(self._call_connection_lost, exc)
537
538 def _call_connection_lost(self, exc):
539 try:
540 self._protocol.connection_lost(exc)
541 finally:
542 self._pipe.close()
543 self._pipe = None
544 self._protocol = None
545 self._loop = None
546
547
Yury Selivanov3cb99142014-02-18 18:41:13 -0500548class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800549 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100552 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 self._pipe = pipe
555 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400557 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 self._conn_lost = 0
559 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700560
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700561 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700562 is_char = stat.S_ISCHR(mode)
563 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700564 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700565 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700566 self._pipe = None
567 self._fileno = None
568 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100569 raise ValueError("Pipe transport is only for "
570 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700571
Andrew Svetlovcc839202017-11-29 18:23:43 +0200572 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100574
575 # On AIX, the reader trick (to be notified when the read end of the
576 # socket is closed) only works for sockets. On other platforms it
577 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700578 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100579 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400580 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100581 self._fileno, self._read_ready)
582
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100584 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500585 self._loop.call_soon(futures._set_result_unless_cancelled,
586 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587
Victor Stinnere912e652014-07-12 03:11:53 +0200588 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100589 info = [self.__class__.__name__]
590 if self._pipe is None:
591 info.append('closed')
592 elif self._closing:
593 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500594 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400595 selector = getattr(self._loop, '_selector', None)
596 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200597 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500598 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200599 if polling:
600 info.append('polling')
601 else:
602 info.append('idle')
603
604 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500605 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400606 elif self._pipe is not None:
607 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200608 else:
609 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500610 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200611
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800612 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400613 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800614
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700616 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200617 if self._loop.get_debug():
618 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100619 if self._buffer:
620 self._close(BrokenPipeError())
621 else:
622 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
624 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800625 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
626 if isinstance(data, bytearray):
627 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 if not data:
629 return
630
631 if self._conn_lost or self._closing:
632 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700633 logger.warning('pipe closed by peer or '
634 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635 self._conn_lost += 1
636 return
637
638 if not self._buffer:
639 # Attempt to send it right away first.
640 try:
641 n = os.write(self._fileno, data)
642 except (BlockingIOError, InterruptedError):
643 n = 0
644 except Exception as exc:
645 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100646 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 return
648 if n == len(data):
649 return
650 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400651 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400652 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400654 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800655 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
657 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400658 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400661 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400663 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400665 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 self._conn_lost += 1
667 # Remove writer here, _fatal_error() doesn't it
668 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400669 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100670 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400672 if n == len(self._buffer):
673 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400674 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800675 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400676 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400677 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 self._call_connection_lost(None)
679 return
680 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400681 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
683 def can_write_eof(self):
684 return True
685
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 def write_eof(self):
687 if self._closing:
688 return
689 assert self._pipe
690 self._closing = True
691 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400692 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700693 self._loop.call_soon(self._call_connection_lost, None)
694
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400695 def set_protocol(self, protocol):
696 self._protocol = protocol
697
698 def get_protocol(self):
699 return self._protocol
700
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500701 def is_closing(self):
702 return self._closing
703
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100705 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 # write_eof is all what we needed to close the write pipe
707 self.write_eof()
708
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100709 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900710 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100711 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900712 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100713
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700714 def abort(self):
715 self._close(None)
716
Victor Stinner0ee29c22014-02-19 01:40:41 +0100717 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700718 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200719 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200720 if self._loop.get_debug():
721 logger.debug("%r: %s", self, message, exc_info=True)
722 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500723 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100724 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500725 'exception': exc,
726 'transport': self,
727 'protocol': self._protocol,
728 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700729 self._close(exc)
730
731 def _close(self, exc=None):
732 self._closing = True
733 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400734 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400736 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 self._loop.call_soon(self._call_connection_lost, exc)
738
739 def _call_connection_lost(self, exc):
740 try:
741 self._protocol.connection_lost(exc)
742 finally:
743 self._pipe.close()
744 self._pipe = None
745 self._protocol = None
746 self._loop = None
747
748
Guido van Rossum59691282013-10-30 14:52:03 -0700749class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750
Guido van Rossum59691282013-10-30 14:52:03 -0700751 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700752 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700753 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700754 # Use a socket pair for stdin, since not all platforms
755 # support selecting read events on the write end of a
756 # socket (which we use in order to detect closing of the
757 # other end). Notably this is needed on AIX, and works
758 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100759 stdin, stdin_w = socket.socketpair()
Niklas Fiekas9932fd92019-05-20 14:02:17 +0200760 try:
761 self._proc = subprocess.Popen(
762 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
763 universal_newlines=False, bufsize=bufsize, **kwargs)
764 if stdin_w is not None:
765 stdin.close()
766 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
767 stdin_w = None
768 finally:
769 if stdin_w is not None:
770 stdin.close()
771 stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800772
773
774class AbstractChildWatcher:
775 """Abstract base class for monitoring child processes.
776
777 Objects derived from this class monitor a collection of subprocesses and
778 report their termination or interruption by a signal.
779
780 New callbacks are registered with .add_child_handler(). Starting a new
781 process must be done within a 'with' block to allow the watcher to suspend
782 its activity until the new process if fully registered (this is needed to
783 prevent a race condition in some implementations).
784
785 Example:
786 with watcher:
787 proc = subprocess.Popen("sleep 1")
788 watcher.add_child_handler(proc.pid, callback)
789
790 Notes:
791 Implementations of this class must be thread-safe.
792
793 Since child watcher objects may catch the SIGCHLD signal and call
794 waitpid(-1), there should be only one active object per process.
795 """
796
797 def add_child_handler(self, pid, callback, *args):
798 """Register a new child handler.
799
800 Arrange for callback(pid, returncode, *args) to be called when
801 process 'pid' terminates. Specifying another callback for the same
802 process replaces the previous handler.
803
Victor Stinneracdb7822014-07-14 18:33:40 +0200804 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800805 """
806 raise NotImplementedError()
807
808 def remove_child_handler(self, pid):
809 """Removes the handler for process 'pid'.
810
811 The function returns True if the handler was successfully removed,
812 False if there was nothing to remove."""
813
814 raise NotImplementedError()
815
Guido van Rossum2bcae702013-11-13 15:50:08 -0800816 def attach_loop(self, loop):
817 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800818
Guido van Rossum2bcae702013-11-13 15:50:08 -0800819 If the watcher was previously attached to an event loop, then it is
820 first detached before attaching to the new loop.
821
822 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800823 """
824 raise NotImplementedError()
825
826 def close(self):
827 """Close the watcher.
828
829 This must be called to make sure that any underlying resource is freed.
830 """
831 raise NotImplementedError()
832
833 def __enter__(self):
834 """Enter the watcher's context and allow starting new processes
835
836 This function must return self"""
837 raise NotImplementedError()
838
839 def __exit__(self, a, b, c):
840 """Exit the watcher's context"""
841 raise NotImplementedError()
842
843
844class BaseChildWatcher(AbstractChildWatcher):
845
Guido van Rossum2bcae702013-11-13 15:50:08 -0800846 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800847 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400848 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849
850 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800851 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800852
853 def _do_waitpid(self, expected_pid):
854 raise NotImplementedError()
855
856 def _do_waitpid_all(self):
857 raise NotImplementedError()
858
Guido van Rossum2bcae702013-11-13 15:50:08 -0800859 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800860 assert loop is None or isinstance(loop, events.AbstractEventLoop)
861
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400862 if self._loop is not None and loop is None and self._callbacks:
863 warnings.warn(
864 'A loop is being detached '
865 'from a child watcher with pending handlers',
866 RuntimeWarning)
867
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800868 if self._loop is not None:
869 self._loop.remove_signal_handler(signal.SIGCHLD)
870
871 self._loop = loop
872 if loop is not None:
873 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
874
875 # Prevent a race condition in case a child terminated
876 # during the switch.
877 self._do_waitpid_all()
878
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800879 def _sig_chld(self):
880 try:
881 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500882 except Exception as exc:
883 # self._loop should always be available here
884 # as '_sig_chld' is added as a signal handler
885 # in 'attach_loop'
886 self._loop.call_exception_handler({
887 'message': 'Unknown exception in SIGCHLD handler',
888 'exception': exc,
889 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800890
891 def _compute_returncode(self, status):
892 if os.WIFSIGNALED(status):
893 # The child process died because of a signal.
894 return -os.WTERMSIG(status)
895 elif os.WIFEXITED(status):
896 # The child process exited (e.g sys.exit()).
897 return os.WEXITSTATUS(status)
898 else:
899 # The child exited, but we don't understand its status.
900 # This shouldn't happen, but if it does, let's just
901 # return that status; perhaps that helps debug it.
902 return status
903
904
905class SafeChildWatcher(BaseChildWatcher):
906 """'Safe' child watcher implementation.
907
908 This implementation avoids disrupting other code spawning processes by
909 polling explicitly each process in the SIGCHLD handler instead of calling
910 os.waitpid(-1).
911
912 This is a safe solution but it has a significant overhead when handling a
913 big number of children (O(n) each time SIGCHLD is raised)
914 """
915
Guido van Rossum2bcae702013-11-13 15:50:08 -0800916 def close(self):
917 self._callbacks.clear()
918 super().close()
919
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800920 def __enter__(self):
921 return self
922
923 def __exit__(self, a, b, c):
924 pass
925
926 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400927 if self._loop is None:
928 raise RuntimeError(
929 "Cannot add child handler, "
930 "the child watcher does not have a loop attached")
931
Victor Stinner47cd10d2015-01-30 00:05:19 +0100932 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800933
934 # Prevent a race condition in case the child is already terminated.
935 self._do_waitpid(pid)
936
Guido van Rossum2bcae702013-11-13 15:50:08 -0800937 def remove_child_handler(self, pid):
938 try:
939 del self._callbacks[pid]
940 return True
941 except KeyError:
942 return False
943
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800944 def _do_waitpid_all(self):
945
946 for pid in list(self._callbacks):
947 self._do_waitpid(pid)
948
949 def _do_waitpid(self, expected_pid):
950 assert expected_pid > 0
951
952 try:
953 pid, status = os.waitpid(expected_pid, os.WNOHANG)
954 except ChildProcessError:
955 # The child process is already reaped
956 # (may happen if waitpid() is called elsewhere).
957 pid = expected_pid
958 returncode = 255
959 logger.warning(
960 "Unknown child process pid %d, will report returncode 255",
961 pid)
962 else:
963 if pid == 0:
964 # The child process is still alive.
965 return
966
967 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200968 if self._loop.get_debug():
969 logger.debug('process %s exited with returncode %s',
970 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800971
972 try:
973 callback, args = self._callbacks.pop(pid)
974 except KeyError: # pragma: no cover
975 # May happen if .remove_child_handler() is called
976 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200977 if self._loop.get_debug():
978 logger.warning("Child watcher got an unexpected pid: %r",
979 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800980 else:
981 callback(pid, returncode, *args)
982
983
984class FastChildWatcher(BaseChildWatcher):
985 """'Fast' child watcher implementation.
986
987 This implementation reaps every terminated processes by calling
988 os.waitpid(-1) directly, possibly breaking other code spawning processes
989 and waiting for their termination.
990
991 There is no noticeable overhead when handling a big number of children
992 (O(1) each time a child terminates).
993 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800994 def __init__(self):
995 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800996 self._lock = threading.Lock()
997 self._zombies = {}
998 self._forks = 0
999
1000 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001001 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001002 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -08001003 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001004
1005 def __enter__(self):
1006 with self._lock:
1007 self._forks += 1
1008
1009 return self
1010
1011 def __exit__(self, a, b, c):
1012 with self._lock:
1013 self._forks -= 1
1014
1015 if self._forks or not self._zombies:
1016 return
1017
1018 collateral_victims = str(self._zombies)
1019 self._zombies.clear()
1020
1021 logger.warning(
1022 "Caught subprocesses termination from unknown pids: %s",
1023 collateral_victims)
1024
1025 def add_child_handler(self, pid, callback, *args):
1026 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -04001027
1028 if self._loop is None:
1029 raise RuntimeError(
1030 "Cannot add child handler, "
1031 "the child watcher does not have a loop attached")
1032
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001033 with self._lock:
1034 try:
1035 returncode = self._zombies.pop(pid)
1036 except KeyError:
1037 # The child is running.
1038 self._callbacks[pid] = callback, args
1039 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001040
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001041 # The child is dead already. We can fire the callback.
1042 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001043
Guido van Rossum2bcae702013-11-13 15:50:08 -08001044 def remove_child_handler(self, pid):
1045 try:
1046 del self._callbacks[pid]
1047 return True
1048 except KeyError:
1049 return False
1050
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001051 def _do_waitpid_all(self):
1052 # Because of signal coalescing, we must keep calling waitpid() as
1053 # long as we're able to reap a child.
1054 while True:
1055 try:
1056 pid, status = os.waitpid(-1, os.WNOHANG)
1057 except ChildProcessError:
1058 # No more child processes exist.
1059 return
1060 else:
1061 if pid == 0:
1062 # A child process is still alive.
1063 return
1064
1065 returncode = self._compute_returncode(status)
1066
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001067 with self._lock:
1068 try:
1069 callback, args = self._callbacks.pop(pid)
1070 except KeyError:
1071 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001072 if self._forks:
1073 # It may not be registered yet.
1074 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +02001075 if self._loop.get_debug():
1076 logger.debug('unknown process %s exited '
1077 'with returncode %s',
1078 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001079 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001080 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001081 else:
1082 if self._loop.get_debug():
1083 logger.debug('process %s exited with returncode %s',
1084 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001085
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001086 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001087 logger.warning(
1088 "Caught subprocess termination from unknown pid: "
1089 "%d -> %d", pid, returncode)
1090 else:
1091 callback(pid, returncode, *args)
1092
1093
1094class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +01001095 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001096 _loop_factory = _UnixSelectorEventLoop
1097
1098 def __init__(self):
1099 super().__init__()
1100 self._watcher = None
1101
1102 def _init_watcher(self):
1103 with events._lock:
1104 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -08001105 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001106 if isinstance(threading.current_thread(),
1107 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001108 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001109
1110 def set_event_loop(self, loop):
1111 """Set the event loop.
1112
1113 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -08001114 .set_event_loop() from the main thread will call .attach_loop(loop) on
1115 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001116 """
1117
1118 super().set_event_loop(loop)
1119
Andrew Svetlovcc839202017-11-29 18:23:43 +02001120 if (self._watcher is not None and
1121 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001122 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001123
1124 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001125 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001126
1127 If not yet set, a SafeChildWatcher object is automatically created.
1128 """
1129 if self._watcher is None:
1130 self._init_watcher()
1131
1132 return self._watcher
1133
1134 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001135 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001136
1137 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1138
1139 if self._watcher is not None:
1140 self._watcher.close()
1141
1142 self._watcher = watcher
1143
Yury Selivanov6370f342017-12-10 18:36:12 -05001144
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001145SelectorEventLoop = _UnixSelectorEventLoop
1146DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy