blob: f40ef12f2655fa607e96fc682ff1571cb6e61101 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010021from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Names re-exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to be imported on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    Both arguments are accepted to match the signature expected by
    ``signal.signal()`` and are ignored.
    """
    return None
41
42
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall the signal handlers it registered.

        During interpreter shutdown the handlers are not removed; a
        ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to signal.default_int_handler; every other
        # signal goes back to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if not (1 <= sig < signal.NSIG):
            raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport wrapping *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport wrapping *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until it is ready.

        The child's pid is registered with the current child watcher so
        that _child_watcher_callback() is invoked when it exits.  If
        waiting on the transport fails, the transport is closed and
        awaited before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport, thread-safely."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        ``(transport, protocol)`` pair.

        Raise ValueError for inconsistent ssl-related arguments, when both
        or neither of *path*/*sock* are given, or when *sock* is not an
        AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # We created this socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing socket file, that file is removed before binding.
        Return the base_events.Server object.

        Raise TypeError if *ssl* is a bool.  Raise ValueError for
        inconsistent arguments or a socket of the wrong family/type.
        Raise OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything between creating the socket and a successful
            # bind() runs inside this try, so that any failure (including
            # e.g. IndexError on an empty path) closes the socket instead
            # of leaking it.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r', path, err)

                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server,
                            ssl_handshake_timeout=ssl_handshake_timeout)
        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent.  Raise
        base_events._SendfileNotAvailable when os.sendfile() is missing,
        when *file* has no usable file descriptor, or when it cannot be
        fstat()ed.
        """
        try:
            os.sendfile
        except AttributeError:
            raise base_events._SendfileNotAvailable(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise base_events._SendfileNotAvailable("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise base_events._SendfileNotAvailable("not a regular file")
        # A falsy count means "send until EOF": use the file size instead.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd on
        calls made through the writer callback.  The outcome is reported
        through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        # On 32-bit architectures truncate to 1GiB to avoid OverflowError
        # when the size is converted to a C ssize_t.  The remainder is sent
        # by the following iterations.
        blocksize = min(blocksize, sys.maxsize // 2 + 1)

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = base_events._SendfileNotAvailable(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer when *fut* gets cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # Skip sockets whose fileno() reports -1.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the start of reading on *loop*.

        Raise ValueError if *pipe* is not a FIFO, a socket or a character
        device.  If *waiter* is given, it is resolved (unless cancelled)
        after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Drop the references so that __del__ does not warn about (or
            # close) a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Read available data and hand it to the protocol.

        An empty read is treated as end-of-file: the reader is removed and
        eof_received() followed by connection_lost(None) are scheduled.
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing once the transport is closing or closed.
        """
        if self._closing:
            # The reader is already removed, and self._loop may be None
            # after _call_connection_lost() has run.
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for readability again.

        Does nothing once the transport is closing or closed.
        """
        if self._closing:
            # Never re-register a reader on a pipe that is being closed;
            # self._loop may also be None at this point.
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); anything
        else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading and schedule connection_lost()."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
530
Yury Selivanov3cb99142014-02-18 18:41:13 -0500531class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800532 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Wrap *pipe* for writing and schedule connection_made().

    Raise ValueError if *pipe* is not a FIFO, a socket or a character
    device.  If *waiter* is given, it is resolved (unless cancelled) after
    connection_made() has been scheduled.
    """
    super().__init__(extra, loop)
    self._extra['pipe'] = pipe
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._buffer = bytearray()
    self._conn_lost = 0
    self._closing = False  # Set when close() or write_eof() called.

    st_mode = os.fstat(self._fileno).st_mode
    char_device = stat.S_ISCHR(st_mode)
    fifo = stat.S_ISFIFO(st_mode)
    unix_socket = stat.S_ISSOCK(st_mode)
    if not any((char_device, fifo, unix_socket)):
        # Forget the unsupported pipe before rejecting it.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is only for "
                         "pipes, sockets and character devices")

    os.set_blocking(self._fileno, False)
    self._loop.call_soon(self._protocol.connection_made, self)

    # On AIX, the reader trick (to be notified when the read end of the
    # socket is closed) only works for sockets.  On other platforms it
    # works for pipes and sockets.  (Exception: OS X 10.4?  Issue #19294.)
    on_aix = sys.platform.startswith("aix")
    if unix_socket or (fifo and not on_aix):
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)

    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        self._loop.call_soon(futures._set_result_unless_cancelled,
                             waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700570
def __repr__(self):
    """Return a debug string: class name, state, fd and polling status."""
    parts = [self.__class__.__name__]
    pipe_open = self._pipe is not None
    if not pipe_open:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._fileno}')

    selector = getattr(self._loop, '_selector', None)
    if pipe_open and selector is not None:
        is_polling = selector_events._test_selector_event(
            selector, self._fileno, selectors.EVENT_WRITE)
        parts.append('polling' if is_polling else 'idle')

        pending = self.get_write_buffer_size()
        parts.append(f'bufsize={pending}')
    elif pipe_open:
        parts.append('open')
    else:
        parts.append('closed')
    return '<{}>'.format(' '.join(parts))
Victor Stinnere912e652014-07-12 03:11:53 +0200594
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    pending = self._buffer
    return len(pending)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800597
def _read_ready(self):
    """Handle the peer closing its end of the pipe.

    Close with BrokenPipeError if unsent data is still buffered, otherwise
    close without passing an error.
    """
    # Pipe was closed by peer.
    if self._loop.get_debug():
        logger.info("%r was closed by peer", self)
    if not self._buffer:
        self._close()
        return
    self._close(BrokenPipeError())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700606
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    Empty data is ignored.  Writes after the connection was lost or
    closing started are dropped and counted.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            written = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            written = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if written == len(data):
            # Everything went out in one go; nothing to buffer.
            return
        if written > 0:
            data = memoryview(data)[written:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639
def _write_ready(self):
    """Flush as much of the write buffer as the pipe accepts right now."""
    assert self._buffer, 'Data should not be empty'

    try:
        written = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        return
    except Exception as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove writer here, _fatal_error() doesn't do it
        # because _buffer is empty.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
        return

    if written == len(self._buffer):
        self._buffer.clear()
        self._loop._remove_writer(self._fileno)
        self._maybe_resume_protocol()  # May append to buffer.
        if self._closing:
            self._loop._remove_reader(self._fileno)
            self._call_connection_lost(None)
        return
    if written > 0:
        # Partial write: keep only the unsent tail.
        del self._buffer[:written]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
def can_write_eof(self):
    """Return True: this transport supports write_eof()."""
    return True
668
def write_eof(self):
    """Mark the transport as closing once pending data has been flushed.

    Does nothing when the transport is already closing.  When no data
    is buffered, the reader is removed immediately and the
    connection-lost notification is scheduled on the loop; otherwise
    only the closing flag is set here.
    """
    if not self._closing:
        assert self._pipe
        self._closing = True
        if not self._buffer:
            loop = self._loop
            loop._remove_reader(self._fileno)
            loop.call_soon(self._call_connection_lost, None)
677
def set_protocol(self, protocol):
    """Replace the protocol object associated with this transport."""
    self._protocol = protocol
680
def get_protocol(self):
    """Return the protocol object currently associated with the transport."""
    return self._protocol
683
def is_closing(self):
    """Return the transport's closing flag."""
    return self._closing
686
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # write_eof() is all that is needed to close the write pipe.
    self.write_eof()
691
def __del__(self):
    """Warn about, and close, a pipe that was never closed explicitly."""
    if self._pipe is None:
        return
    warnings.warn(
        f"unclosed transport {self!r}", ResourceWarning, source=self)
    self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100697
def abort(self):
    """Close the transport immediately, without an error."""
    self._close(None)
700
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report a fatal error and close the transport with *exc*.

    Should be called from an exception handler only.  Exceptions of the
    types listed in base_events._FATAL_ERROR_IGNORE are only logged
    (and only when the loop is in debug mode); any other exception is
    passed to the loop's exception handler.
    """
    if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
        context = {
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        }
        self._loop.call_exception_handler(context)
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
714
def _close(self, exc=None):
    """Drop pending data and schedule the connection-lost notification.

    The writer is removed only when data was still buffered; the reader
    is always removed.
    """
    self._closing = True
    loop, fileno = self._loop, self._fileno
    if self._buffer:
        loop._remove_writer(fileno)
    self._buffer.clear()
    loop._remove_reader(fileno)
    loop.call_soon(self._call_connection_lost, exc)
722
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is gone, then release state.

    The pipe is closed and the pipe, protocol and loop references are
    cleared even if the protocol's connection_lost() raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = self._protocol = self._loop = None
731
732
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child via subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair is used in place of
        a pipe: one end is handed to the child, the other end is wrapped
        in a binary file object and installed as self._proc.stdin.

        If spawning the process fails, both ends of the socket pair are
        closed before the exception propagates, so no descriptors leak.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair rather than a pipe for stdin: not all
            # platforms support selecting for read events on the write
            # end (which we use in order to detect closing of the other
            # end).  Notably this is needed on AIX, and works just fine
            # on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(
                    stdin_w.detach(), 'wb', buffering=bufsize)
                # Ownership of the descriptor moved to the file object.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Something failed before the parent end was handed
                # over: release both sockets (closing twice is harmless).
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800750
751
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses
    and report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler().  Starting a
    new process must be done within a 'with' block to allow the watcher
    to suspend its activity until the new process is fully registered
    (this is needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates.  Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
820
821
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Handles attaching to / detaching from an event loop and decoding
    wait statuses; subclasses supply _do_waitpid() and
    _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Registered handlers; concrete watchers store (callback, args)
        # tuples keyed by pid.
        self._callbacks = {}

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        A RuntimeWarning is emitted when the watcher is detached
        (loop is None) while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here because
            # '_sig_chld' is only installed as a signal handler
            # by 'attach_loop'.
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g. sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, just return the
        # status as is; perhaps that helps debugging.
        return status
881
882
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes
    by polling each registered process explicitly in the SIGCHLD handler
    instead of calling os.waitpid(-1).

    This is a safe solution, but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget all handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        # Stored values are always (callback, args) tuples, so None is a
        # safe "missing" marker.
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)
960
961
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning
    processes and waiting for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        leftovers = None
        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                # Last active block: nobody can claim these anymore.
                leftovers = str(self._zombies)
                self._zombies.clear()

        if leftovers is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                leftovers)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        If the child was already reaped, the callback fires immediately.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        # Stored values are always (callback, args) tuples, so None is a
        # safe "missing" marker.
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                else:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1070
1071
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly with
        # set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        this thread's current loop.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public main_thread() API instead of testing
                # for the private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call
        .attach_loop(loop) on the child watcher.
        """
        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically
        created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  *watcher* may be
        None.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1121
Yury Selivanov6370f342017-12-10 18:36:12 -05001122
# Public names (listed in __all__) bound to the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy