blob: 1fc39abe09ce8b2527f3f43030395a0631064ed5 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010019from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020021from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Names exported by "from <this module> import *".
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Everything below relies on the signal module; refuse to load on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
34
35
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    Both arguments are ignored and None is returned.  The function only
    exists so that it can be installed with signal.signal().
    """
39
40
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Unix.

    On top of the base selector loop this class provides signal handlers,
    pipe and subprocess transports, and UNIX Domain Socket clients/servers.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a received signal number."""
        for signum in data:
            # Null bytes are the ones written by _write_to_self(); skip them.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Arrange for callback(*args) to be called when *sig* is received.

        UNIX only.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        the signal cannot be caught.
        """
        is_coro = (coroutines.iscoroutine(callback)
                   or coroutines.iscoroutinefunction(callback))
        if is_coro:
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError when called outside the
            # main thread.  Calling it first guarantees that a loop running
            # in another thread cannot register a signal handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # The dummy handler makes Python write the signal number to the
            # wakeup file descriptor; _process_self_data() reads the signal
            # numbers back from it and dispatches them.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Schedule (or drop, if cancelled) the handle registered for sig."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            # Nothing registered; assume it's some race condition.
            return
        if not handle._cancelled:
            self._add_callback_signalsafe(handle)
            return
        # A cancelled handle is removed properly instead of being run.
        self.remove_signal_handler(sig)

    def remove_signal_handler(self, sig):
        """Uninstall the handler for signal *sig*.  UNIX only.

        Return True if a handler was removed, False if none was registered.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL; any other OSError is propagated unchanged.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default handler back, others get SIG_DFL.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        if not self._signal_handlers:
            # Last handler gone: stop using the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Validate a signal number.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The transport is created inside the child watcher's context and a
        child handler is registered for its pid.  If waiting on the waiter
        future fails for any reason, the transport is closed and awaited
        before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = futures.Future(loop=self)
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except:
                # Deliberately bare: clean up whatever went wrong, then
                # re-raise the very same exception.
                transp.close()
                yield from transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Hand the child's return code to the transport, thread-safely."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket; return (transport, protocol).

        Exactly one of *path* and *sock* must be given.  With *path* a new
        non-blocking AF_UNIX stream socket is created and connected; it is
        closed again if connecting fails.  A caller-supplied *sock* is only
        switched to non-blocking mode.

        Raise ValueError for inconsistent ssl/server_hostname or path/sock
        combinations.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        hostname_given = server_hostname is not None
        if ssl and not hostname_given:
            raise ValueError(
                'you have to pass server_hostname when using ssl')
        if hostname_given and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # We created this socket, so we must not leak it.
                sock.close()
                raise

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX Domain Socket; return the Server.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        AF_UNIX stream socket is created and bound to it; a caller-supplied
        *sock* must already belong to the AF_UNIX family.

        Raise TypeError if ssl is a bool.
        Raise ValueError for an invalid path/sock combination or a socket
        of the wrong family.
        Raise OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno != errno.EADDRINUSE:
                    raise
                # Improve the error message by saying which exact address
                # is already taken.
                msg = 'Address {!r} is already in use'.format(path)
                raise OSError(errno.EADDRINUSE, msg) from None
            except:
                sock.close()
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
275
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
if not hasattr(os, 'set_blocking'):
    import fcntl

    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode using fcntl()."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287
288
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport on top of a pipe, socket or character device.

    The protocol's connection_made() is scheduled first; reading only
    starts afterwards.  The pipe object is closed when the connection is
    lost.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the start of reading on *loop*.

        Raise ValueError if the file descriptor of *pipe* is not a FIFO,
        a socket or a character device.  If *waiter* is given, its result
        is set (unless cancelled) after connection_made() was scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        # Start out in the "closed" state.  If validation below fails, the
        # half-built object must not own the pipe: __del__ would otherwise
        # emit a ResourceWarning (whose repr needs attributes that are not
        # set yet) and close a pipe that still belongs to the caller.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        self._closing = False

        fileno = pipe.fileno()
        mode = os.fstat(fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(fileno)

        # Validation succeeded: the transport now owns the pipe.
        self._pipe = pipe
        self._fileno = fileno
        self._protocol = protocol
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        if self._pipe is not None:
            # The selector state is only meaningful while the pipe is open;
            # a closed transport was already labelled 'closed' above.
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_READ)
            info.append('polling' if polling else 'idle')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: forward data, or handle EOF / read errors."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next notification.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means the other end closed the pipe.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the file descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  That is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if sys.version_info >= (3, 4):
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        EIO errors are only logged (in debug mode); everything else goes
        to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
398
399
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport on top of a pipe, socket or character device.

    Data that cannot be written immediately is buffered and flushed from
    a writer callback.  Where supported, the descriptor is also watched
    for readability to notice that the peer closed its end.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() on *loop*.

        Raise ValueError if the file descriptor of *pipe* is not a socket,
        a FIFO or a character device.  If *waiter* is given, its result is
        set (unless cancelled) after connection_made() was scheduled.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        # Start out in the "closed" state.  If validation below fails, the
        # half-built object must not own the pipe: __del__ would otherwise
        # emit a ResourceWarning (whose repr needs attributes that are not
        # set yet) and close a pipe that still belongs to the caller.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        fileno = pipe.fileno()
        mode = os.fstat(fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(fileno)

        # Validation succeeded: the transport now owns the pipe.
        self._pipe = pipe
        self._fileno = fileno
        self._protocol = protocol

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        if self._pipe is not None:
            # Selector state and buffer size only matter while the pipe is
            # open; a closed transport was already labelled 'closed' above.
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_WRITE)
            info.append('polling' if polling else 'idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the total number of bytes waiting in the write buffer."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        """Reader callback: the peer closed its end of the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Unsent data is lost, so report a broken pipe.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now.

        Empty data is ignored.  Writes after the connection was lost or
        the transport started closing are dropped (and logged once the
        threshold is reached).
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the part that still has to be written.
                data = data[n:]
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() does not do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def write_eof(self):
        """Close the write end once buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            # Nothing left to flush, so the connection can be dropped now;
            # otherwise _write_ready() finishes the job.
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport gracefully (pending data is still sent)."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  That is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if sys.version_info >= (3, 4):
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Broken-pipe and connection-reset errors are only logged (in debug
        mode); everything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule the teardown."""
        self._closing = True
        if self._buffer:
            # A non-empty buffer means a writer callback is registered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
589
590
if not hasattr(os, 'set_inheritable'):
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of *fd* using fcntl().

        A true *inheritable* clears the flag, a false one sets it.
        """
        cloexec = getattr(fcntl, 'FD_CLOEXEC', 1)

        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            flags &= ~cloexec
        else:
            flags |= cloexec
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
else:
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
605
606
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        the child gets one end and the other end becomes self._proc.stdin.
        Both ends are closed again if starting the process fails, so no
        file descriptor is leaked on error.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Something failed before the hand-over was complete;
                # release both ends of the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800630
631
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A child watcher keeps track of a set of subprocesses and reports when
    one of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block so that the watcher can suspend
    its activity until the process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the child process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler of process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource gets freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may be started in it.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
700
701
class BaseChildWatcher(AbstractChildWatcher):
    """Common SIGCHLD handling shared by child watcher implementations.

    Subclasses must provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop currently holding our SIGCHLD handler, or None.
        self._loop = None

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Check the process 'expected_pid'; implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Check every watched process; implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here, because
            # '_sig_chld' is only installed as a signal handler from
            # 'attach_loop'.
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Convert a wait status into a return code.

        Return the negated signal number if the child was killed by a
        signal, its exit status if it exited, and the raw status otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
754
755
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps a watched pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return self; this watcher needs no per-context bookkeeping."""
        return self

    def __exit__(self, a, b, c):
        """Do nothing; the three arguments are ignored."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Any handler previously stored for the same pid is replaced.  The
        child is polled right away, so if it has already terminated the
        callback runs before this method returns.
        """
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*.

        Returns True if a handler was registered, False otherwise.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every pid that currently has a handler."""
        # Iterate over a snapshot: _do_waitpid() pops entries from
        # self._callbacks while we loop.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its callback.

        Returns without doing anything if the child is still running.
        If the child cannot be waited for (ChildProcessError), the
        callback is invoked with a return code of 255.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            # add_child_handler() may poll before any loop is attached
            # (self._loop starts out as None).  The child has already
            # been reaped at this point, so the debug-only logging must
            # not raise, or the exit status would be lost.
            if self._loop is not None and self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop is not None and self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
832
833
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination itself.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Guards _callbacks, _zombies and _forks.
        self._lock = threading.Lock()
        # pid -> (callback, args) for children with a registered handler.
        self._callbacks = {}
        # pid -> returncode for children reaped before registration.
        self._zombies = {}
        # Number of currently active "with watcher:" contexts.
        self._forks = 0

    def close(self):
        """Discard handlers and recorded zombies, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Count one more active context and return self."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Leave a context; report unclaimed zombies on the last exit.

        When the last active context is left and some reaped children
        were never claimed through add_child_handler(), they are logged
        as a warning and forgotten.
        """
        collateral_victims = None

        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        if collateral_victims is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside the watcher's context.  If the child has
        already been reaped, the callback is invoked immediately with
        the recorded return code instead of being stored.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*.

        Returns True if a handler was registered, False otherwise.
        """
        # Stored values are always (callback, args) tuples, never None.
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)

                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but a context is active: it may not
                    # be registered yet, so remember its return code.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody can still claim it.
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
937
938
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly via
        # set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet.

        When called from the main thread, the new watcher is attached to
        the loop stored in this policy's thread-local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = SafeChildWatcher()
                self._watcher = watcher
                current = threading.current_thread()
                if isinstance(current, threading._MainThread):
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The previously installed watcher, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
988
# Public aliases (both names are listed in __all__) for the Unix
# implementations of the selector event loop and the loop policy.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy