blob: f64037a25c67b8c5a97e3cd2a71acab07733266d [file] [log] [blame]
"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010021from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Public names exported by this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# This module is Unix-only: refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
def _sighandler_noop(signum, frame):
    """Dummy signal handler.

    Accepts the (signum, frame) pair passed to signal handlers and
    deliberately does nothing with it.
    """
    return None
41
42
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop, then uninstall the signal handlers it registered."""
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a signal number to handle."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the interpreter's default disposition for the signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not in signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        If waiting for the transport setup fails, the transport is closed
        and awaited before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status to the transport via call_soon_threadsafe().
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.

        Raise ValueError on inconsistent arguments (ssl-only options
        without ssl, both or neither of path/sock, or a *sock* that is not
        an AF_UNIX stream socket).
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # We created the socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing socket file, that stale file is removed before binding.
        Return the base_events.Server instance.

        Raise TypeError if *ssl* is a bool, ValueError on inconsistent
        arguments, and OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r', path, err)
            except:
                # Anything escaping the check above (e.g. IndexError for an
                # empty path, ValueError for an embedded NUL) must not leak
                # the socket we just created.
                sock.close()
                raise

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the number of bytes sent (0 for an empty file).  Raise
        events.SendfileNotAvailableError when os.sendfile() is missing or
        *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise events.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise events.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise events.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself as a writer.

        *registered_fd* is None on the first call and the socket fd on
        calls made from the writer callback.  The outcome is delivered
        through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Not writable right now: retry when the socket is ready.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = events.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        # Move the file position to *offset*, but only if data was sent.
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        # When *fut* is cancelled, stop watching the socket for writability.
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
416
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    Data read from the pipe is passed to the protocol's data_received();
    an empty read is reported through eof_received() followed by
    connection_lost(None).
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        # True between pause_reading() and the matching resume_reading().
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the attributes so __del__ does not treat the
            # transport as open.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def _add_reader(self, fd, callback):
        # Deferred registration scheduled by __init__.  Skip it when the
        # protocol paused or closed the transport from connection_made().
        if self._paused or self._closing:
            return
        self._loop._add_reader(fd, callback)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: read once and dispatch data, EOF or an error."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for input.

        Does nothing if the transport is closing or already paused.
        """
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Resume watching the pipe for input.

        Does nothing if the transport is closing or is not paused, so a
        closed pipe is never re-registered with the event loop.
        """
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        # Stop reading now; notify the protocol on the next loop iteration.
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and drop references even if the protocol
            # callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
542
543
Yury Selivanov3cb99142014-02-18 18:41:13 -0500544class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800545 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
547 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100548 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 self._pipe = pipe
551 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400553 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 self._conn_lost = 0
555 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700556
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700557 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700558 is_char = stat.S_ISCHR(mode)
559 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700560 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700561 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700562 self._pipe = None
563 self._fileno = None
564 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100565 raise ValueError("Pipe transport is only for "
566 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700567
Andrew Svetlovcc839202017-11-29 18:23:43 +0200568 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100570
571 # On AIX, the reader trick (to be notified when the read end of the
572 # socket is closed) only works for sockets. On other platforms it
573 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700574 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100575 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400576 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100577 self._fileno, self._read_ready)
578
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100580 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500581 self._loop.call_soon(futures._set_result_unless_cancelled,
582 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583
Victor Stinnere912e652014-07-12 03:11:53 +0200584 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100585 info = [self.__class__.__name__]
586 if self._pipe is None:
587 info.append('closed')
588 elif self._closing:
589 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500590 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400591 selector = getattr(self._loop, '_selector', None)
592 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200593 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500594 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200595 if polling:
596 info.append('polling')
597 else:
598 info.append('idle')
599
600 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500601 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400602 elif self._pipe is not None:
603 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200604 else:
605 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500606 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200607
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800608 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400609 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800610
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700611 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700612 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200613 if self._loop.get_debug():
614 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100615 if self._buffer:
616 self._close(BrokenPipeError())
617 else:
618 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    *data* must be bytes, bytearray or memoryview.  Empty data is
    ignored.  Writes issued after the connection was lost or while the
    transport is closing are dropped and only counted.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        over_threshold = (
            self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES)
        if over_threshold:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued yet: try to push the data out immediately.
        try:
            sent = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if sent == len(data):
            return
        if sent > 0:
            # Keep only the unsent tail, without copying it.
            data = memoryview(data)[sent:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700652
def _write_ready(self):
    """Writer callback: flush as much of the buffer as the pipe accepts."""
    assert self._buffer, 'Data should not be empty'

    try:
        written = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        # Nothing could be written right now; wait for the next call.
        return
    except Exception as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove the writer here: _fatal_error() doesn't do it
        # because _buffer is empty.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
        return

    if written == len(self._buffer):
        self._buffer.clear()
        self._loop._remove_writer(self._fileno)
        self._maybe_resume_protocol()  # May append to buffer.
        if self._closing:
            self._loop._remove_reader(self._fileno)
            self._call_connection_lost(None)
        return

    if written > 0:
        # Partial write: discard the prefix that went out.
        del self._buffer[:written]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678
def can_write_eof(self):
    """Return True: this transport supports write_eof()."""
    return True
681
def write_eof(self):
    """Mark the transport as closing and finish right away if idle.

    Does nothing if the transport is already closing.  When data is
    still buffered only the closing flag is set here.
    """
    if self._closing:
        return
    assert self._pipe
    self._closing = True
    if self._buffer:
        return
    loop = self._loop
    loop._remove_reader(self._fileno)
    loop.call_soon(self._call_connection_lost, None)
690
def set_protocol(self, protocol):
    """Replace the protocol object associated with this transport."""
    self._protocol = protocol
693
def get_protocol(self):
    """Return the protocol object currently associated with the transport."""
    return self._protocol
696
def is_closing(self):
    """Return the transport's closing flag."""
    return self._closing
699
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # write_eof() is all that is needed to close the write pipe.
    self.write_eof()
704
def __del__(self, _warn=warnings.warn):
    """Warn about, and close, a pipe that was never closed explicitly.

    ``_warn`` is bound at definition time so the finalizer does not have
    to look up the ``warnings`` module global when it runs, which may be
    late in interpreter shutdown.  Callers never need to pass it.
    """
    if self._pipe is not None:
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100710
def abort(self):
    """Close the transport immediately, passing no exception to _close()."""
    self._close(None)
713
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report *exc* and close the transport with it.

    Should be called by exception handlers only.  Exceptions listed in
    base_events._FATAL_ERROR_IGNORE are logged only in debug mode; any
    other exception is passed to the loop's exception handler.
    """
    ignorable = isinstance(exc, base_events._FATAL_ERROR_IGNORE)
    if not ignorable:
        context = {
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        }
        self._loop.call_exception_handler(context)
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
727
def _close(self, exc=None):
    """Stop watching the pipe and schedule connection_lost(exc).

    Any buffered data is discarded.
    """
    loop = self._loop
    fd = self._fileno
    self._closing = True
    if self._buffer:
        # The writer is removed only when data was still pending.
        loop._remove_writer(fd)
    self._buffer.clear()
    loop._remove_reader(fd)
    loop.call_soon(self._call_connection_lost, exc)
735
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is gone, then clean up.

    The pipe is closed and the pipe, protocol and loop references are
    dropped even if protocol.connection_lost() raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        # Close first, then clear the references in the same order as
        # before: pipe, protocol, loop.
        self._pipe.close()
        self._pipe = self._protocol = self._loop = None
744
745
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one end is handed to the child and the other end becomes
        self._proc.stdin.  Both ends are closed if spawning fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child owns its copy now; drop ours.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: don't leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800763
764
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one
    of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the earlier one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context, allowing new processes to start.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
833
834
class BaseChildWatcher(AbstractChildWatcher):
    """Shared plumbing for watchers driven by the SIGCHLD signal.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  Detaching while
        handlers are still registered emits a RuntimeWarning.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: poll the children and report any failure."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here, because
            # '_sig_chld' is only installed as a signal handler
            # by 'attach_loop'.
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g. sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, return the raw
        # status; perhaps that helps debug it.
        return status
894
895
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled explicitly from the SIGCHLD handler
    rather than calling os.waitpid(-1), so other code that spawns
    processes is not disrupted.

    This is a safe solution, but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError when no event loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered process once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback(pid, returncode, *args)
973
974
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Return codes of reaped children that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks:
                # Another 'with' block is still active.
                return
            if not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is fired immediately.  Raises
        RuntimeError when no event loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    callback = None
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1083
1084
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        The new watcher is attached to the current loop only when this
        runs in the main thread.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public main_thread() API rather than an
                # isinstance() check against the private _MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  *watcher* may be None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1134
Yury Selivanov6370f342017-12-10 18:36:12 -05001135
# Public aliases for the UNIX implementations; both names are listed
# in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy