blob: 1a62db4f59bcfef8227dcdc889e5f59057e7bc21 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
# Public names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# The signal handling implemented below is not available on Windows, so
# refuse to import there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
38
39
def _sighandler_noop(signum, frame):
    """Do nothing.

    Installed as the Python-level handler by add_signal_handler(); the
    actual dispatch happens when the signal number is read back from the
    wakeup file descriptor.
    """
    return None
43
44
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall every registered signal handler."""
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch every signal number found in *data*."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if the callback is a coroutine (function) or if
        the signal is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if there is a problem setting up the handler,
        including when the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc)) from exc

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler Python installs by default.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Spawn a subprocess transport and register it with the watcher."""
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Also covers cancellation, so the transport is not leaked
                # when the waiter is interrupted.
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status to the transport via the loop's
        # thread-safe scheduling entry point.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.  Raise ValueError on inconsistent
        arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created the socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return the
        base_events.Server object.  Raise TypeError if *ssl* is a bool and
        ValueError on inconsistent arguments.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.  No explicit loop argument: a zero-delay sleep
            # never uses it.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the number of bytes sent.  Raise
        exceptions.SendfileNotAvailableError when os.sendfile() is missing
        or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself as needed.

        *registered_fd* is None on the first call and the socket fd that
        was passed to add_writer() on subsequent calls.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        # Keep the request well inside the signed size range so that very
        # large counts cannot overflow on 32-bit builds.
        blocksize = min(blocksize, sys.maxsize // 2 + 1)

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        # Only move the file position when something was actually sent.
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket writer if *fut* ends up cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
421
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device.

    Data read from the file descriptor is passed to the protocol's
    data_received(); an empty read is treated as EOF and closes the
    transport.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() on *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, its result is set to None after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the state so that __del__() does not warn about, or
            # close, a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: forward available data or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for incoming data.

        Does nothing once the transport is closing: the reader has already
        been removed and the loop reference may have been dropped.
        """
        if self._closing:
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the file descriptor for incoming data again.

        Does nothing once the transport is closing, so the reader is never
        re-registered for a descriptor that is about to be closed.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
547
548
Yury Selivanov3cb99142014-02-18 18:41:13 -0500549class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800550 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700551
552 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100553 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 self._pipe = pipe
556 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400558 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559 self._conn_lost = 0
560 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700561
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700562 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700563 is_char = stat.S_ISCHR(mode)
564 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700565 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700566 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700567 self._pipe = None
568 self._fileno = None
569 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100570 raise ValueError("Pipe transport is only for "
571 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700572
Andrew Svetlovcc839202017-11-29 18:23:43 +0200573 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100575
576 # On AIX, the reader trick (to be notified when the read end of the
577 # socket is closed) only works for sockets. On other platforms it
578 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700579 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100580 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400581 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100582 self._fileno, self._read_ready)
583
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700584 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100585 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500586 self._loop.call_soon(futures._set_result_unless_cancelled,
587 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588
Victor Stinnere912e652014-07-12 03:11:53 +0200589 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100590 info = [self.__class__.__name__]
591 if self._pipe is None:
592 info.append('closed')
593 elif self._closing:
594 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500595 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400596 selector = getattr(self._loop, '_selector', None)
597 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200598 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500599 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200600 if polling:
601 info.append('polling')
602 else:
603 info.append('idle')
604
605 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500606 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400607 elif self._pipe is not None:
608 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200609 else:
610 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500611 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200612
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800613 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400614 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800615
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700617 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200618 if self._loop.get_debug():
619 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100620 if self._buffer:
621 self._close(BrokenPipeError())
622 else:
623 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    *data* must be bytes, bytearray or memoryview.  Empty data is
    ignored.  Once the connection is lost or the transport is closing,
    data is dropped and the lost-write counter is incremented.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        # Only start logging once enough writes have been dropped.
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued, so try an immediate write first.
        try:
            sent = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if sent == len(data):
            # Everything went out in one call.
            return
        if sent > 0:
            # Keep only the unsent tail.
            data = memoryview(data)[sent:]
        # The remainder is flushed by _write_ready() when the fd is writable.
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657
658 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400659 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400662 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700663 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400664 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400666 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667 self._conn_lost += 1
668 # Remove writer here, _fatal_error() doesn't it
669 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400670 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100671 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700672 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400673 if n == len(self._buffer):
674 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400675 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800676 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400677 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400678 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 self._call_connection_lost(None)
680 return
681 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400682 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700683
def can_write_eof(self):
    """Return True: this transport supports write_eof()."""
    supported = True
    return supported
686
def write_eof(self):
    """Mark the transport as closing once buffered data has been written.

    Does nothing if the transport is already closing.  When the buffer is
    empty, the reader is removed and connection_lost is scheduled right
    away; otherwise _write_ready() finishes the close after draining.
    """
    if self._closing:
        return
    assert self._pipe
    self._closing = True
    if self._buffer:
        # Pending data: _write_ready() completes the shutdown.
        return
    self._loop._remove_reader(self._fileno)
    self._loop.call_soon(self._call_connection_lost, None)
695
def set_protocol(self, protocol):
    """Replace the protocol object associated with this transport."""
    self._protocol = protocol
698
def get_protocol(self):
    """Return the protocol object currently associated with this transport."""
    current = self._protocol
    return current
701
def is_closing(self):
    """Return the transport's closing flag."""
    flag = self._closing
    return flag
704
def close(self):
    """Close the transport, letting buffered data be written first.

    Does nothing if the pipe is already released or a close is already
    in progress.
    """
    if self._pipe is None or self._closing:
        return
    # For a write pipe, write_eof() performs the whole shutdown.
    self.write_eof()
709
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900710 def __del__(self):
711 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500712 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900713 source=self)
714 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100715
def abort(self):
    """Close the transport immediately, discarding any buffered data."""
    no_error = None
    self._close(no_error)
718
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report a fatal error and close the transport with *exc*.

    Must only be called from an exception handler, since the debug log
    call relies on the active exception (``exc_info=True``).

    Exceptions matching base_events._FATAL_ERROR_IGNORE are only logged,
    and only in debug mode; any other exception is passed to the loop's
    exception handler.
    """
    ignorable = isinstance(exc, base_events._FATAL_ERROR_IGNORE)
    if not ignorable:
        self._loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
732
733 def _close(self, exc=None):
734 self._closing = True
735 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400736 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400738 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739 self._loop.call_soon(self._call_connection_lost, exc)
740
741 def _call_connection_lost(self, exc):
742 try:
743 self._protocol.connection_lost(exc)
744 finally:
745 self._pipe.close()
746 self._pipe = None
747 self._protocol = None
748 self._loop = None
749
750
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE, a socket pair replaces the pipe:
        the child receives one end and the parent's end is exposed as
        ``self._proc.stdin``.  Both ends are closed if spawning fails, so
        a failed Popen() does not leak file descriptors.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to the file object; skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: release both socket ends.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800768
769
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher tracks a set of subprocesses and reports when one of them
    terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
838
839
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (callback, args), filled in by subclasses.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Reap one specific child; implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Reap every finished child; implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report any failure to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be set here, because this method
            # is only installed as a signal handler by attach_loop().
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a Popen-style return code."""
        if os.WIFSIGNALED(status):
            # Killed by a signal: negative signal number.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # Normal exit (e.g. sys.exit()).
            return os.WEXITSTATUS(status)
        # The status is not understood.  This shouldn't happen, but if
        # it does, return it unchanged; perhaps that helps debug it.
        return status
899
900
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    On SIGCHLD every registered process is polled individually rather
    than calling os.waitpid(-1), so other code that spawns processes is
    not disturbed.

    The price is a significant overhead when many children are watched:
    O(n) work each time SIGCHLD is raised.
    """

    def close(self):
        """Forget all handlers, then detach from the event loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        # No state to restore for this implementation.
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Raises RuntimeError if no event loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child and run its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        # Kept as try/except: the warning below logs the active KeyError
        # through exc_info=True.
        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
978
979
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of children reaped before being registered.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget handlers and unclaimed children, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Last block exited with unclaimed children left over.
            orphans = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            orphans)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is invoked immediately.
        Raises RuntimeError if no event loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback runs outside the lock.
            if callback is not None:
                callback(pid, returncode, *args)
            else:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
1088
1089
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes, closing any previous one."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1139
Yury Selivanov6370f342017-12-10 18:36:12 -05001140
# Public aliases listed in __all__: bind the generic names to the Unix
# implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy