blob: 73d4bda7c2341b7c90fb071b5b8ecdef4c6a3dac [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
Yury Selivanov6370f342017-12-10 18:36:12 -050029__all__ = (
30 'SelectorEventLoop',
31 'AbstractChildWatcher', 'SafeChildWatcher',
32 'FastChildWatcher', 'DefaultEventLoopPolicy',
33)
34
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036if sys.platform == 'win32': # pragma: no cover
37 raise ImportError('Signals are not really supported on Windows')
38
39
Victor Stinnerfe5649c2014-07-17 22:43:40 +020040def _sighandler_noop(signum, frame):
41 """Dummy signal handler."""
42 pass
43
44
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and unregister every signal handler it installed.

        During interpreter finalization the handlers are not removed through
        the signal module; a ResourceWarning is emitted instead and the
        bookkeeping dict is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or sig is
        not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if the signal cannot be caught or there is a
        problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler that is reinstated for this signal:
        # default_int_handler for SIGINT, SIG_DFL for everything else.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport for *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport for *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is returned once its waiter future completes.  If
        waiting fails, the transport is closed and awaited before the
        exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward the child's return code to the transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        non-blocking AF_UNIX socket is created and connected; with *sock*
        the given socket is validated and made non-blocking.

        Return a (transport, protocol) pair.

        Raise ValueError for inconsistent ssl/server_hostname/
        ssl_handshake_timeout arguments, when both or neither of *path* and
        *sock* are given, or when *sock* is not an AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # Deliberately bare: the socket we created must be closed
                # on any failure before the exception propagates.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        AF_UNIX socket is created and bound; a stale socket file at that
        path is removed first.  With *sock* the given socket is validated.

        Return a base_events.Server.  When *start_serving* is true the
        server starts serving before it is returned.

        Raise TypeError if *ssl* is a bool.
        Raise ValueError if *ssl_handshake_timeout* is given without ssl,
        when both or neither of *path* and *sock* are given, or when *sock*
        is not an AF_UNIX stream socket.
        Raise OSError if binding fails; EADDRINUSE carries the path in its
        message.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything from here to bind() runs inside the try so the
            # socket created above is closed on any failure.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.  An empty path has no first element to
                # inspect and cannot name a stale socket file, so it
                # skips the check instead of raising IndexError.
                if path and path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r', path, err)

                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent (0 for an empty file when no
        *count* is given).

        Raise exceptions.SendfileNotAvailableError if os.sendfile() is
        missing, if *file* has no usable file descriptor, or if the first
        os.sendfile() call fails.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        # First call: no writer callback is registered yet, hence None.
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *fut* receives the final byte count or the exception.
        *registered_fd* is None on the first call and the socket fd that
        was registered with add_writer() on every later call.
        *offset* and *total_sent* carry the progress between calls; *count*
        is the requested total (falsy means "until EOF").
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Socket not writable right now: retry once it is.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError subclasses take (errno, strerror) in that order.
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Remove the socket's writer callback when *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # -1 means the socket is already closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
421
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device.

    Data is read with os.read() from the non-blocking file descriptor and
    handed to the protocol's data_received(); an empty read is treated as
    EOF and closes the transport.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() on *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, it is resolved after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Clear the references so __del__ does not warn about, or
            # close, a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        # self._loop may be None after connection_lost; getattr copes.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Read available data and deliver it, or handle EOF/errors."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read: the other end closed the pipe.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing once the transport is closing: the reader has already
        been removed and self._loop is cleared after connection_lost().
        """
        if self._closing:
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for readability again.

        Does nothing once the transport is closing, so the descriptor is
        never re-registered right before the pipe is closed.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self, _warn=warnings.warn):
        # warnings.warn is bound as a default argument at definition time.
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        EIO is only logged (and only in debug mode); any other error goes
        to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark closing, stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
546
547
Yury Selivanov3cb99142014-02-18 18:41:13 -0500548class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800549 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100552 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 self._pipe = pipe
555 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400557 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 self._conn_lost = 0
559 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700560
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700561 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700562 is_char = stat.S_ISCHR(mode)
563 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700564 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700565 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700566 self._pipe = None
567 self._fileno = None
568 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100569 raise ValueError("Pipe transport is only for "
570 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700571
Andrew Svetlovcc839202017-11-29 18:23:43 +0200572 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100574
575 # On AIX, the reader trick (to be notified when the read end of the
576 # socket is closed) only works for sockets. On other platforms it
577 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700578 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100579 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400580 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100581 self._fileno, self._read_ready)
582
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100584 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500585 self._loop.call_soon(futures._set_result_unless_cancelled,
586 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587
Victor Stinnere912e652014-07-12 03:11:53 +0200588 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100589 info = [self.__class__.__name__]
590 if self._pipe is None:
591 info.append('closed')
592 elif self._closing:
593 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500594 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400595 selector = getattr(self._loop, '_selector', None)
596 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200597 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500598 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200599 if polling:
600 info.append('polling')
601 else:
602 info.append('idle')
603
604 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500605 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400606 elif self._pipe is not None:
607 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200608 else:
609 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500610 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200611
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800612 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400613 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800614
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700616 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200617 if self._loop.get_debug():
618 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100619 if self._buffer:
620 self._close(BrokenPipeError())
621 else:
622 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    Empty data is ignored.  When the connection is lost or the transport
    is closing, the data is dropped and the lost-write counter is bumped
    (a warning is logged once the counter reaches the threshold).
    Otherwise, if nothing is buffered yet, an immediate os.write() is
    attempted; any remainder is appended to the buffer and a writer
    callback is registered on the loop.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        threshold = constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES
        if self._conn_lost >= threshold:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if self._buffer:
        # Earlier data is still queued: just append behind it.
        self._buffer += data
        self._maybe_pause_protocol()
        return

    # Nothing queued: attempt to send it right away first.
    try:
        sent = os.write(self._fileno, data)
    except (BlockingIOError, InterruptedError):
        sent = 0
    except Exception as exc:
        self._conn_lost += 1
        self._fatal_error(exc, 'Fatal write error on pipe transport')
        return

    if sent == len(data):
        return
    if sent > 0:
        data = memoryview(data)[sent:]
    self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
def _write_ready(self):
    """Flush as much of the write buffer as the pipe accepts.

    Called with a non-empty buffer.  On a complete flush the writer is
    removed, the protocol may be resumed, and a pending close is
    completed.  On a partial write the sent prefix is dropped from the
    buffer.  Any error other than a would-block/interrupt is fatal.
    """
    assert self._buffer, 'Data should not be empty'

    try:
        written = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        # Nothing was written; leave the buffer untouched.
        return
    except Exception as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove the writer here: _close() (reached via _fatal_error())
        # only removes it while the buffer is non-empty, and the buffer
        # has just been cleared.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
        return

    if written != len(self._buffer):
        if written > 0:
            # Partial write: keep only the unsent tail.
            del self._buffer[:written]
        return

    self._buffer.clear()
    self._loop._remove_writer(self._fileno)
    self._maybe_resume_protocol()  # May append to buffer.
    if self._closing:
        self._loop._remove_reader(self._fileno)
        self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
def can_write_eof(self):
    """Report whether write_eof() is supported; always True here."""
    return True
685
def write_eof(self):
    """Mark the transport as closing and finish the close when idle.

    Does nothing if the transport is already closing.  With an empty
    buffer the reader is removed and the connection-lost callback is
    scheduled; with pending data only the closing flag is set.
    """
    if not self._closing:
        assert self._pipe
        self._closing = True
        if self._buffer:
            # Pending data: _write_ready() completes the close once the
            # buffer has been fully flushed.
            return
        loop = self._loop
        loop._remove_reader(self._fileno)
        loop.call_soon(self._call_connection_lost, None)
694
def set_protocol(self, protocol):
    """Replace the protocol object associated with this transport."""
    self._protocol = protocol
697
def get_protocol(self):
    """Return the protocol object currently associated with this transport."""
    return self._protocol
700
def is_closing(self):
    """Return the closing flag (set by write_eof() and _close())."""
    return self._closing
703
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # write_eof() is all that is needed to close the write pipe.
    self.write_eof()
708
def __del__(self, _warn=warnings.warn):
    """Warn about, then close, a pipe that was never closed explicitly.

    ``_warn`` is bound as a default argument at definition time
    (presumably so it stays reachable during interpreter shutdown).
    """
    pipe = self._pipe
    if pipe is None:
        return
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100713
def abort(self):
    """Close the transport immediately, without reporting an error."""
    self._close(None)
716
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report *exc* and close the transport with it.

    Should be called from an exception handler only.  Exceptions listed
    in base_events._FATAL_ERROR_IGNORE are only logged (in debug mode);
    anything else is passed to the loop's exception handler.
    """
    ignorable = isinstance(exc, base_events._FATAL_ERROR_IGNORE)
    if not ignorable:
        context = {
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        }
        self._loop.call_exception_handler(context)
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
730
def _close(self, exc=None):
    """Start closing: drop buffered data and schedule connection_lost.

    The writer is removed only when data was still buffered; the reader
    is always removed.  *exc* is forwarded to _call_connection_lost().
    """
    self._closing = True
    loop = self._loop
    fd = self._fileno
    pending = self._buffer
    if pending:
        loop._remove_writer(fd)
    pending.clear()
    loop._remove_reader(fd)
    loop.call_soon(self._call_connection_lost, exc)
738
def _call_connection_lost(self, exc):
    """Notify the protocol of the loss, then release all references.

    The pipe is closed and the pipe, protocol and loop attributes are
    reset to None even if the protocol's connection_lost() raises.
    """
    protocol = self._protocol
    try:
        protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = self._protocol = self._loop = None
747
748
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child via subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE, a socket pair replaces the
        regular pipe: the child gets one end and the other end becomes
        ``self._proc.stdin`` (opened in binary write mode with
        *bufsize* buffering).  Remaining arguments are passed to
        subprocess.Popen unchanged.

        If spawning fails, both ends of the socket pair are closed
        before the exception propagates, so no descriptors are leaked.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() (or the hand-over above) failed: release both
                # ends of the socket pair instead of leaking them.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800766
767
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses
    and report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler().  Starting a
    new process must be done within a 'with' block to allow the watcher
    to suspend its activity until the new process is fully registered
    (this is needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates.  Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
836
837
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._callbacks = {}
        self._loop = None

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None to detach only; a RuntimeWarning is issued if
        handlers are still registered when detaching.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting failures to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        Death by signal gives the negated signal number, a normal exit
        gives the exit status, anything else is returned unchanged.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
897
898
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes
    by polling explicitly each process in the SIGCHLD handler instead of
    calling os.waitpid(-1).

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget all handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Raises RuntimeError when no loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and fire its handler if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback(pid, returncode, *args)
976
977
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning
    processes and waiting for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of reaped children with no handler yet.
        self._zombies = {}
        # Number of currently active 'with watcher:' blocks.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Last context exited: these children were reaped but never
            # claimed by a handler.
            unclaimed = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is invoked immediately.  Raises
        RuntimeError when no loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch to its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                handler = self._callbacks.pop(pid, None)
                if handler is None:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    callback, args = handler
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1086
1087
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created lazily by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        the loop currently stored for this thread.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public main_thread() API rather than the
                # private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  *watcher* may be
        None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1137
Yury Selivanov6370f342017-12-10 18:36:12 -05001138
# Public aliases for the Unix implementations; both names are listed in
# __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy