blob: a4d892acad05d4ce2e8e99b33420a30dd4e52c37 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010021from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Yury Selivanov6370f342017-12-10 18:36:12 -050027__all__ = (
28 'SelectorEventLoop',
29 'AbstractChildWatcher', 'SafeChildWatcher',
30 'FastChildWatcher', 'DefaultEventLoopPolicy',
31)
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034if sys.platform == 'win32': # pragma: no cover
35 raise ImportError('Signals are not really supported on Windows')
36
37
Victor Stinnerfe5649c2014-07-17 22:43:40 +020038def _sighandler_noop(signum, frame):
39 """Dummy signal handler."""
40 pass
41
42
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080043class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050044 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
Yury Selivanovb057c522014-02-18 12:15:06 -050046 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047 """
48
49 def __init__(self, selector=None):
50 super().__init__(selector)
51 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080053 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020054 super().close()
Andrew Svetlov4a025432017-12-21 17:06:46 +020055 if not sys.is_finalizing():
56 for sig in list(self._signal_handlers):
57 self.remove_signal_handler(sig)
58 else:
Andrew Svetlov4f146f92017-12-24 13:50:03 +020059 if self._signal_handlers:
Andrew Svetlova8f4e152017-12-26 11:53:38 +020060 warnings.warn(f"Closing the loop {self!r} "
Andrew Svetlov4f146f92017-12-24 13:50:03 +020061 f"on interpreter shutdown "
Andrew Svetlova8f4e152017-12-26 11:53:38 +020062 f"stage, skipping signal handlers removal",
Andrew Svetlov4f146f92017-12-24 13:50:03 +020063 ResourceWarning,
64 source=self)
65 self._signal_handlers.clear()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080066
Victor Stinnerfe5649c2014-07-17 22:43:40 +020067 def _process_self_data(self, data):
68 for signum in data:
69 if not signum:
70 # ignore null bytes written by _write_to_self()
71 continue
72 self._handle_signal(signum)
73
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 def add_signal_handler(self, sig, callback, *args):
75 """Add a handler for a signal. UNIX only.
76
77 Raise ValueError if the signal number is invalid or uncatchable.
78 Raise RuntimeError if there is a problem setting up the handler.
79 """
Yury Selivanov6370f342017-12-10 18:36:12 -050080 if (coroutines.iscoroutine(callback) or
81 coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010082 raise TypeError("coroutines cannot be used "
83 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010085 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 try:
87 # set_wakeup_fd() raises ValueError if this is not the
88 # main thread. By calling it early we ensure that an
89 # event loop running in another thread cannot add a signal
90 # handler.
91 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020092 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070093 raise RuntimeError(str(exc))
94
Yury Selivanovf23746a2018-01-22 19:11:18 -050095 handle = events.Handle(callback, args, self, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 self._signal_handlers[sig] = handle
97
98 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020099 # Register a dummy signal handler to ask Python to write the signal
100 # number in the wakup file descriptor. _process_self_data() will
101 # read signal numbers from this file descriptor to handle signals.
102 signal.signal(sig, _sighandler_noop)
103
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100104 # Set SA_RESTART to limit EINTR occurrences.
105 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106 except OSError as exc:
107 del self._signal_handlers[sig]
108 if not self._signal_handlers:
109 try:
110 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200111 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700112 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113
114 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500115 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116 else:
117 raise
118
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200119 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120 """Internal helper that is the actual signal handler."""
121 handle = self._signal_handlers.get(sig)
122 if handle is None:
123 return # Assume it's some race condition.
124 if handle._cancelled:
125 self.remove_signal_handler(sig) # Remove it properly.
126 else:
127 self._add_callback_signalsafe(handle)
128
129 def remove_signal_handler(self, sig):
130 """Remove a handler for a signal. UNIX only.
131
132 Return True if a signal handler was removed, False if not.
133 """
134 self._check_signal(sig)
135 try:
136 del self._signal_handlers[sig]
137 except KeyError:
138 return False
139
140 if sig == signal.SIGINT:
141 handler = signal.default_int_handler
142 else:
143 handler = signal.SIG_DFL
144
145 try:
146 signal.signal(sig, handler)
147 except OSError as exc:
148 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500149 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700150 else:
151 raise
152
153 if not self._signal_handlers:
154 try:
155 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200156 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700157 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158
159 return True
160
161 def _check_signal(self, sig):
162 """Internal helper to validate a signal.
163
164 Raise ValueError if the signal number is invalid or uncatchable.
165 Raise RuntimeError if there is a problem setting up the handler.
166 """
167 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500168 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
170 if not (1 <= sig < signal.NSIG):
Yury Selivanov6370f342017-12-10 18:36:12 -0500171 raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172
173 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
174 extra=None):
175 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
176
177 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
178 extra=None):
179 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
180
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200181 async def _make_subprocess_transport(self, protocol, args, shell,
182 stdin, stdout, stderr, bufsize,
183 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800184 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400185 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 transp = _UnixSubprocessTransport(self, protocol, args, shell,
187 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100188 waiter=waiter, extra=extra,
189 **kwargs)
190
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800191 watcher.add_child_handler(transp.get_pid(),
192 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100193 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200194 await waiter
195 except Exception:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100196 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200197 await transp._wait()
198 raise
Guido van Rossum4835f172014-01-10 13:28:59 -0800199
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 return transp
201
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800202 def _child_watcher_callback(self, pid, returncode, transp):
203 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204
Neil Aspinallf7686c12017-12-19 19:45:42 +0000205 async def create_unix_connection(
206 self, protocol_factory, path=None, *,
207 ssl=None, sock=None,
208 server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200209 ssl_handshake_timeout=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500210 assert server_hostname is None or isinstance(server_hostname, str)
211 if ssl:
212 if server_hostname is None:
213 raise ValueError(
214 'you have to pass server_hostname when using ssl')
215 else:
216 if server_hostname is not None:
217 raise ValueError('server_hostname is only meaningful with ssl')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200218 if ssl_handshake_timeout is not None:
219 raise ValueError(
220 'ssl_handshake_timeout is only meaningful with ssl')
Yury Selivanovb057c522014-02-18 12:15:06 -0500221
222 if path is not None:
223 if sock is not None:
224 raise ValueError(
225 'path and sock can not be specified at the same time')
226
Andrew Svetlovcc839202017-11-29 18:23:43 +0200227 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100228 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500230 sock.setblocking(False)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200231 await self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100232 except:
233 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500234 raise
235
236 else:
237 if sock is None:
238 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400239 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500240 sock.type != socket.SOCK_STREAM):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400241 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500242 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500243 sock.setblocking(False)
244
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200245 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000246 sock, protocol_factory, ssl, server_hostname,
247 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanovb057c522014-02-18 12:15:06 -0500248 return transport, protocol
249
Neil Aspinallf7686c12017-12-19 19:45:42 +0000250 async def create_unix_server(
251 self, protocol_factory, path=None, *,
252 sock=None, backlog=100, ssl=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -0500253 ssl_handshake_timeout=None,
254 start_serving=True):
Yury Selivanovb057c522014-02-18 12:15:06 -0500255 if isinstance(ssl, bool):
256 raise TypeError('ssl argument must be an SSLContext or None')
257
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200258 if ssl_handshake_timeout is not None and not ssl:
259 raise ValueError(
260 'ssl_handshake_timeout is only meaningful with ssl')
261
Yury Selivanovb057c522014-02-18 12:15:06 -0500262 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200263 if sock is not None:
264 raise ValueError(
265 'path and sock can not be specified at the same time')
266
Andrew Svetlovcc839202017-11-29 18:23:43 +0200267 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500268 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
269
Yury Selivanov908d55d2016-10-09 12:15:08 -0400270 # Check for abstract socket. `str` and `bytes` paths are supported.
271 if path[0] not in (0, '\x00'):
272 try:
273 if stat.S_ISSOCK(os.stat(path).st_mode):
274 os.remove(path)
275 except FileNotFoundError:
276 pass
277 except OSError as err:
278 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200279 logger.error('Unable to check or remove stale UNIX socket '
280 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400281
Yury Selivanovb057c522014-02-18 12:15:06 -0500282 try:
283 sock.bind(path)
284 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100285 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500286 if exc.errno == errno.EADDRINUSE:
287 # Let's improve the error message by adding
288 # with what exact address it occurs.
Yury Selivanov6370f342017-12-10 18:36:12 -0500289 msg = f'Address {path!r} is already in use'
Yury Selivanovb057c522014-02-18 12:15:06 -0500290 raise OSError(errno.EADDRINUSE, msg) from None
291 else:
292 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200293 except:
294 sock.close()
295 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500296 else:
297 if sock is None:
298 raise ValueError(
299 'path was not specified, and no sock specified')
300
Yury Selivanov36e7e972016-10-07 12:39:57 -0400301 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500302 sock.type != socket.SOCK_STREAM):
Yury Selivanovb057c522014-02-18 12:15:06 -0500303 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500304 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500305
Yury Selivanovb057c522014-02-18 12:15:06 -0500306 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500307 server = base_events.Server(self, [sock], protocol_factory,
308 ssl, backlog, ssl_handshake_timeout)
309 if start_serving:
310 server._start_serving()
311
Yury Selivanovb057c522014-02-18 12:15:06 -0500312 return server
313
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200314 async def _sock_sendfile_native(self, sock, file, offset, count):
315 try:
316 os.sendfile
317 except AttributeError as exc:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200318 raise events.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200319 "os.sendfile() is not available")
320 try:
321 fileno = file.fileno()
322 except (AttributeError, io.UnsupportedOperation) as err:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200323 raise events.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200324 try:
325 fsize = os.fstat(fileno).st_size
326 except OSError as err:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200327 raise events.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200328 blocksize = count if count else fsize
329 if not blocksize:
330 return 0 # empty file
331
332 fut = self.create_future()
333 self._sock_sendfile_native_impl(fut, None, sock, fileno,
334 offset, count, blocksize, 0)
335 return await fut
336
337 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
338 offset, count, blocksize, total_sent):
339 fd = sock.fileno()
340 if registered_fd is not None:
341 # Remove the callback early. It should be rare that the
342 # selector says the fd is ready but the call still returns
343 # EAGAIN, and I am willing to take a hit in that case in
344 # order to simplify the common case.
345 self.remove_writer(registered_fd)
346 if fut.cancelled():
347 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
348 return
349 if count:
350 blocksize = count - total_sent
351 if blocksize <= 0:
352 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
353 fut.set_result(total_sent)
354 return
355
356 try:
357 sent = os.sendfile(fd, fileno, offset, blocksize)
358 except (BlockingIOError, InterruptedError):
359 if registered_fd is None:
360 self._sock_add_cancellation_callback(fut, sock)
361 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
362 fd, sock, fileno,
363 offset, count, blocksize, total_sent)
364 except OSError as exc:
365 if total_sent == 0:
366 # We can get here for different reasons, the main
367 # one being 'file' is not a regular mmap(2)-like
368 # file, in which case we'll fall back on using
369 # plain send().
Andrew Svetlov7464e872018-01-19 20:04:29 +0200370 err = events.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200371 "os.sendfile call failed")
372 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
373 fut.set_exception(err)
374 else:
375 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
376 fut.set_exception(exc)
377 except Exception as exc:
378 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
379 fut.set_exception(exc)
380 else:
381 if sent == 0:
382 # EOF
383 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
384 fut.set_result(total_sent)
385 else:
386 offset += sent
387 total_sent += sent
388 if registered_fd is None:
389 self._sock_add_cancellation_callback(fut, sock)
390 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
391 fd, sock, fileno,
392 offset, count, blocksize, total_sent)
393
394 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
395 if total_sent > 0:
396 os.lseek(fileno, offset, os.SEEK_SET)
397
398 def _sock_add_cancellation_callback(self, fut, sock):
399 def cb(fut):
400 if fut.cancelled():
401 fd = sock.fileno()
402 if fd != -1:
403 self.remove_writer(fd)
404 fut.add_done_callback(cb)
405
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407class _UnixReadPipeTransport(transports.ReadTransport):
408
Yury Selivanovdec1a452014-02-18 22:27:48 -0500409 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
411 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
412 super().__init__(extra)
413 self._extra['pipe'] = pipe
414 self._loop = loop
415 self._pipe = pipe
416 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700417 self._protocol = protocol
418 self._closing = False
419
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700420 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800421 if not (stat.S_ISFIFO(mode) or
422 stat.S_ISSOCK(mode) or
423 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700424 self._pipe = None
425 self._fileno = None
426 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700427 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700428
Andrew Svetlovcc839202017-11-29 18:23:43 +0200429 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700430
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100432 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400433 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100434 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100436 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500437 self._loop.call_soon(futures._set_result_unless_cancelled,
438 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439
Victor Stinnere912e652014-07-12 03:11:53 +0200440 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100441 info = [self.__class__.__name__]
442 if self._pipe is None:
443 info.append('closed')
444 elif self._closing:
445 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500446 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400447 selector = getattr(self._loop, '_selector', None)
448 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200449 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500450 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200451 if polling:
452 info.append('polling')
453 else:
454 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400455 elif self._pipe is not None:
456 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200457 else:
458 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500459 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200460
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 def _read_ready(self):
462 try:
463 data = os.read(self._fileno, self.max_size)
464 except (BlockingIOError, InterruptedError):
465 pass
466 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100467 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 else:
469 if data:
470 self._protocol.data_received(data)
471 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200472 if self._loop.get_debug():
473 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400475 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 self._loop.call_soon(self._protocol.eof_received)
477 self._loop.call_soon(self._call_connection_lost, None)
478
Guido van Rossum57497ad2013-10-18 07:58:20 -0700479 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400480 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481
Guido van Rossum57497ad2013-10-18 07:58:20 -0700482 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400483 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400485 def set_protocol(self, protocol):
486 self._protocol = protocol
487
488 def get_protocol(self):
489 return self._protocol
490
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500491 def is_closing(self):
492 return self._closing
493
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494 def close(self):
495 if not self._closing:
496 self._close(None)
497
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900498 def __del__(self):
499 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500500 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900501 source=self)
502 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100503
Victor Stinner0ee29c22014-02-19 01:40:41 +0100504 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200506 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
507 if self._loop.get_debug():
508 logger.debug("%r: %s", self, message, exc_info=True)
509 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500510 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100511 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500512 'exception': exc,
513 'transport': self,
514 'protocol': self._protocol,
515 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516 self._close(exc)
517
518 def _close(self, exc):
519 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400520 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521 self._loop.call_soon(self._call_connection_lost, exc)
522
523 def _call_connection_lost(self, exc):
524 try:
525 self._protocol.connection_lost(exc)
526 finally:
527 self._pipe.close()
528 self._pipe = None
529 self._protocol = None
530 self._loop = None
531
532
Yury Selivanov3cb99142014-02-18 18:41:13 -0500533class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800534 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535
536 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100537 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 self._pipe = pipe
540 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400542 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 self._conn_lost = 0
544 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700545
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700546 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700547 is_char = stat.S_ISCHR(mode)
548 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700549 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700550 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700551 self._pipe = None
552 self._fileno = None
553 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100554 raise ValueError("Pipe transport is only for "
555 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700556
Andrew Svetlovcc839202017-11-29 18:23:43 +0200557 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100559
560 # On AIX, the reader trick (to be notified when the read end of the
561 # socket is closed) only works for sockets. On other platforms it
562 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700563 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100564 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400565 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100566 self._fileno, self._read_ready)
567
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100569 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500570 self._loop.call_soon(futures._set_result_unless_cancelled,
571 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
Victor Stinnere912e652014-07-12 03:11:53 +0200573 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100574 info = [self.__class__.__name__]
575 if self._pipe is None:
576 info.append('closed')
577 elif self._closing:
578 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500579 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400580 selector = getattr(self._loop, '_selector', None)
581 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200582 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500583 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200584 if polling:
585 info.append('polling')
586 else:
587 info.append('idle')
588
589 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500590 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400591 elif self._pipe is not None:
592 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200593 else:
594 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500595 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200596
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800597 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400598 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800599
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700601 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200602 if self._loop.get_debug():
603 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100604 if self._buffer:
605 self._close(BrokenPipeError())
606 else:
607 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608
609 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800610 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
611 if isinstance(data, bytearray):
612 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 if not data:
614 return
615
616 if self._conn_lost or self._closing:
617 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700618 logger.warning('pipe closed by peer or '
619 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 self._conn_lost += 1
621 return
622
623 if not self._buffer:
624 # Attempt to send it right away first.
625 try:
626 n = os.write(self._fileno, data)
627 except (BlockingIOError, InterruptedError):
628 n = 0
629 except Exception as exc:
630 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100631 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 return
633 if n == len(data):
634 return
635 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400636 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400637 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700638
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400639 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800640 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641
642 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400643 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700644
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400646 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400648 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400650 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 self._conn_lost += 1
652 # Remove writer here, _fatal_error() doesn't it
653 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400654 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100655 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400657 if n == len(self._buffer):
658 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400659 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800660 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400661 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400662 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700663 self._call_connection_lost(None)
664 return
665 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400666 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667
668 def can_write_eof(self):
669 return True
670
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 def write_eof(self):
672 if self._closing:
673 return
674 assert self._pipe
675 self._closing = True
676 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400677 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 self._loop.call_soon(self._call_connection_lost, None)
679
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400680 def set_protocol(self, protocol):
681 self._protocol = protocol
682
683 def get_protocol(self):
684 return self._protocol
685
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500686 def is_closing(self):
687 return self._closing
688
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700689 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100690 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700691 # write_eof is all what we needed to close the write pipe
692 self.write_eof()
693
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900694 def __del__(self):
695 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500696 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900697 source=self)
698 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100699
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700 def abort(self):
701 self._close(None)
702
Victor Stinner0ee29c22014-02-19 01:40:41 +0100703 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200705 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200706 if self._loop.get_debug():
707 logger.debug("%r: %s", self, message, exc_info=True)
708 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500709 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100710 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500711 'exception': exc,
712 'transport': self,
713 'protocol': self._protocol,
714 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700715 self._close(exc)
716
717 def _close(self, exc=None):
718 self._closing = True
719 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400720 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400722 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 self._loop.call_soon(self._call_connection_lost, exc)
724
725 def _call_connection_lost(self, exc):
726 try:
727 self._protocol.connection_lost(exc)
728 finally:
729 self._pipe.close()
730 self._pipe = None
731 self._protocol = None
732 self._loop = None
733
734
Guido van Rossum59691282013-10-30 14:52:03 -0700735class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736
Guido van Rossum59691282013-10-30 14:52:03 -0700737 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700738 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700740 # Use a socket pair for stdin, since not all platforms
741 # support selecting read events on the write end of a
742 # socket (which we use in order to detect closing of the
743 # other end). Notably this is needed on AIX, and works
744 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100745 stdin, stdin_w = socket.socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700746 self._proc = subprocess.Popen(
747 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
748 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700749 if stdin_w is not None:
750 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200751 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800752
753
754class AbstractChildWatcher:
755 """Abstract base class for monitoring child processes.
756
757 Objects derived from this class monitor a collection of subprocesses and
758 report their termination or interruption by a signal.
759
760 New callbacks are registered with .add_child_handler(). Starting a new
761 process must be done within a 'with' block to allow the watcher to suspend
762 its activity until the new process if fully registered (this is needed to
763 prevent a race condition in some implementations).
764
765 Example:
766 with watcher:
767 proc = subprocess.Popen("sleep 1")
768 watcher.add_child_handler(proc.pid, callback)
769
770 Notes:
771 Implementations of this class must be thread-safe.
772
773 Since child watcher objects may catch the SIGCHLD signal and call
774 waitpid(-1), there should be only one active object per process.
775 """
776
777 def add_child_handler(self, pid, callback, *args):
778 """Register a new child handler.
779
780 Arrange for callback(pid, returncode, *args) to be called when
781 process 'pid' terminates. Specifying another callback for the same
782 process replaces the previous handler.
783
Victor Stinneracdb7822014-07-14 18:33:40 +0200784 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800785 """
786 raise NotImplementedError()
787
788 def remove_child_handler(self, pid):
789 """Removes the handler for process 'pid'.
790
791 The function returns True if the handler was successfully removed,
792 False if there was nothing to remove."""
793
794 raise NotImplementedError()
795
Guido van Rossum2bcae702013-11-13 15:50:08 -0800796 def attach_loop(self, loop):
797 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800798
Guido van Rossum2bcae702013-11-13 15:50:08 -0800799 If the watcher was previously attached to an event loop, then it is
800 first detached before attaching to the new loop.
801
802 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800803 """
804 raise NotImplementedError()
805
806 def close(self):
807 """Close the watcher.
808
809 This must be called to make sure that any underlying resource is freed.
810 """
811 raise NotImplementedError()
812
813 def __enter__(self):
814 """Enter the watcher's context and allow starting new processes
815
816 This function must return self"""
817 raise NotImplementedError()
818
819 def __exit__(self, a, b, c):
820 """Exit the watcher's context"""
821 raise NotImplementedError()
822
823
824class BaseChildWatcher(AbstractChildWatcher):
825
Guido van Rossum2bcae702013-11-13 15:50:08 -0800826 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800827 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400828 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800829
830 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800831 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800832
833 def _do_waitpid(self, expected_pid):
834 raise NotImplementedError()
835
836 def _do_waitpid_all(self):
837 raise NotImplementedError()
838
Guido van Rossum2bcae702013-11-13 15:50:08 -0800839 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800840 assert loop is None or isinstance(loop, events.AbstractEventLoop)
841
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400842 if self._loop is not None and loop is None and self._callbacks:
843 warnings.warn(
844 'A loop is being detached '
845 'from a child watcher with pending handlers',
846 RuntimeWarning)
847
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800848 if self._loop is not None:
849 self._loop.remove_signal_handler(signal.SIGCHLD)
850
851 self._loop = loop
852 if loop is not None:
853 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
854
855 # Prevent a race condition in case a child terminated
856 # during the switch.
857 self._do_waitpid_all()
858
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800859 def _sig_chld(self):
860 try:
861 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500862 except Exception as exc:
863 # self._loop should always be available here
864 # as '_sig_chld' is added as a signal handler
865 # in 'attach_loop'
866 self._loop.call_exception_handler({
867 'message': 'Unknown exception in SIGCHLD handler',
868 'exception': exc,
869 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800870
871 def _compute_returncode(self, status):
872 if os.WIFSIGNALED(status):
873 # The child process died because of a signal.
874 return -os.WTERMSIG(status)
875 elif os.WIFEXITED(status):
876 # The child process exited (e.g sys.exit()).
877 return os.WEXITSTATUS(status)
878 else:
879 # The child exited, but we don't understand its status.
880 # This shouldn't happen, but if it does, let's just
881 # return that status; perhaps that helps debug it.
882 return status
883
884
885class SafeChildWatcher(BaseChildWatcher):
886 """'Safe' child watcher implementation.
887
888 This implementation avoids disrupting other code spawning processes by
889 polling explicitly each process in the SIGCHLD handler instead of calling
890 os.waitpid(-1).
891
892 This is a safe solution but it has a significant overhead when handling a
893 big number of children (O(n) each time SIGCHLD is raised)
894 """
895
Guido van Rossum2bcae702013-11-13 15:50:08 -0800896 def close(self):
897 self._callbacks.clear()
898 super().close()
899
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900 def __enter__(self):
901 return self
902
903 def __exit__(self, a, b, c):
904 pass
905
906 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400907 if self._loop is None:
908 raise RuntimeError(
909 "Cannot add child handler, "
910 "the child watcher does not have a loop attached")
911
Victor Stinner47cd10d2015-01-30 00:05:19 +0100912 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800913
914 # Prevent a race condition in case the child is already terminated.
915 self._do_waitpid(pid)
916
Guido van Rossum2bcae702013-11-13 15:50:08 -0800917 def remove_child_handler(self, pid):
918 try:
919 del self._callbacks[pid]
920 return True
921 except KeyError:
922 return False
923
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800924 def _do_waitpid_all(self):
925
926 for pid in list(self._callbacks):
927 self._do_waitpid(pid)
928
929 def _do_waitpid(self, expected_pid):
930 assert expected_pid > 0
931
932 try:
933 pid, status = os.waitpid(expected_pid, os.WNOHANG)
934 except ChildProcessError:
935 # The child process is already reaped
936 # (may happen if waitpid() is called elsewhere).
937 pid = expected_pid
938 returncode = 255
939 logger.warning(
940 "Unknown child process pid %d, will report returncode 255",
941 pid)
942 else:
943 if pid == 0:
944 # The child process is still alive.
945 return
946
947 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200948 if self._loop.get_debug():
949 logger.debug('process %s exited with returncode %s',
950 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800951
952 try:
953 callback, args = self._callbacks.pop(pid)
954 except KeyError: # pragma: no cover
955 # May happen if .remove_child_handler() is called
956 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200957 if self._loop.get_debug():
958 logger.warning("Child watcher got an unexpected pid: %r",
959 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800960 else:
961 callback(pid, returncode, *args)
962
963
964class FastChildWatcher(BaseChildWatcher):
965 """'Fast' child watcher implementation.
966
967 This implementation reaps every terminated processes by calling
968 os.waitpid(-1) directly, possibly breaking other code spawning processes
969 and waiting for their termination.
970
971 There is no noticeable overhead when handling a big number of children
972 (O(1) each time a child terminates).
973 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800974 def __init__(self):
975 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800976 self._lock = threading.Lock()
977 self._zombies = {}
978 self._forks = 0
979
980 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800981 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800982 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800983 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800984
985 def __enter__(self):
986 with self._lock:
987 self._forks += 1
988
989 return self
990
991 def __exit__(self, a, b, c):
992 with self._lock:
993 self._forks -= 1
994
995 if self._forks or not self._zombies:
996 return
997
998 collateral_victims = str(self._zombies)
999 self._zombies.clear()
1000
1001 logger.warning(
1002 "Caught subprocesses termination from unknown pids: %s",
1003 collateral_victims)
1004
1005 def add_child_handler(self, pid, callback, *args):
1006 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -04001007
1008 if self._loop is None:
1009 raise RuntimeError(
1010 "Cannot add child handler, "
1011 "the child watcher does not have a loop attached")
1012
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001013 with self._lock:
1014 try:
1015 returncode = self._zombies.pop(pid)
1016 except KeyError:
1017 # The child is running.
1018 self._callbacks[pid] = callback, args
1019 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001020
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001021 # The child is dead already. We can fire the callback.
1022 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001023
Guido van Rossum2bcae702013-11-13 15:50:08 -08001024 def remove_child_handler(self, pid):
1025 try:
1026 del self._callbacks[pid]
1027 return True
1028 except KeyError:
1029 return False
1030
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001031 def _do_waitpid_all(self):
1032 # Because of signal coalescing, we must keep calling waitpid() as
1033 # long as we're able to reap a child.
1034 while True:
1035 try:
1036 pid, status = os.waitpid(-1, os.WNOHANG)
1037 except ChildProcessError:
1038 # No more child processes exist.
1039 return
1040 else:
1041 if pid == 0:
1042 # A child process is still alive.
1043 return
1044
1045 returncode = self._compute_returncode(status)
1046
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001047 with self._lock:
1048 try:
1049 callback, args = self._callbacks.pop(pid)
1050 except KeyError:
1051 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001052 if self._forks:
1053 # It may not be registered yet.
1054 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +02001055 if self._loop.get_debug():
1056 logger.debug('unknown process %s exited '
1057 'with returncode %s',
1058 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001059 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001060 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001061 else:
1062 if self._loop.get_debug():
1063 logger.debug('process %s exited with returncode %s',
1064 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001065
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001066 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001067 logger.warning(
1068 "Caught subprocess termination from unknown pid: "
1069 "%d -> %d", pid, returncode)
1070 else:
1071 callback(pid, returncode, *args)
1072
1073
1074class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +01001075 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001076 _loop_factory = _UnixSelectorEventLoop
1077
1078 def __init__(self):
1079 super().__init__()
1080 self._watcher = None
1081
1082 def _init_watcher(self):
1083 with events._lock:
1084 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -08001085 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001086 if isinstance(threading.current_thread(),
1087 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001088 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001089
1090 def set_event_loop(self, loop):
1091 """Set the event loop.
1092
1093 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -08001094 .set_event_loop() from the main thread will call .attach_loop(loop) on
1095 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001096 """
1097
1098 super().set_event_loop(loop)
1099
Andrew Svetlovcc839202017-11-29 18:23:43 +02001100 if (self._watcher is not None and
1101 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001102 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001103
1104 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001105 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001106
1107 If not yet set, a SafeChildWatcher object is automatically created.
1108 """
1109 if self._watcher is None:
1110 self._init_watcher()
1111
1112 return self._watcher
1113
1114 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001115 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001116
1117 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1118
1119 if self._watcher is not None:
1120 self._watcher.close()
1121
1122 self._watcher = watcher
1123
Yury Selivanov6370f342017-12-10 18:36:12 -05001124
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001125SelectorEventLoop = _UnixSelectorEventLoop
1126DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy