# blob: 75e7c9ccadd094965ff5c900ff4b9ca9c166d417
"""Selector event loop for Unix with signal handling."""

import errno
import os
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings


from . import base_events
from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger


# Names exported by ``from <this module> import *``.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Refuse to be imported on Windows: this module is built around signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')


def _sighandler_noop(signum, frame):
    """Dummy signal handler.

    Takes the two arguments Python passes to a signal handler and does
    nothing with them.
    """
    pass


class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a received signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine object or a coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        if the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration done above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default_int_handler back; every other signal
        # is reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        # No handler left: stop using the wakeup file descriptor.
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once *waiter* is done.

        If waiting fails, the transport is closed and waited for before the
        exception is re-raised.
        """
        # The process is started and registered inside the watcher's
        # 'with' block.
        with events.get_child_watcher() as watcher:
            waiter = futures.Future(loop=self)
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward the child's return code to *transp* via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain socket; return (transport, protocol).

        Exactly one of *path* and *sock* must be given.  server_hostname is
        required when ssl is used and rejected otherwise.  Raise ValueError
        when these rules are violated.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # The socket was created here, so close it on any failure.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain socket.

        Exactly one of *path* and *sock* must be given; a given *sock* must
        be of family AF_UNIX.  Raise TypeError if ssl is a bool, ValueError
        for an invalid path/sock combination, and OSError if binding fails.
        Return the base_events.Server object.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server


if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put file descriptor *fd* in non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* in non-blocking mode.

        Fallback used when os.set_blocking() is missing: set O_NONBLOCK
        on top of the descriptor's current status flags.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)


class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport on top of a FIFO, socket or character device.

    The constructor puts the descriptor in non-blocking mode and schedules
    protocol.connection_made(), the reader registration and (if given) the
    waiter wake-up on the loop, in that order.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Forget the pipe before failing: the object is only partially
            # initialized, so __del__ must not report it as an unclosed
            # transport nor close a pipe still owned by the caller.
            self._pipe = None
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                          self._loop._selector,
                          self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read up to max_size bytes and hand them to the protocol.

        An empty read is treated as end of file: reading stops and
        eof_received() followed by connection_lost(None) are scheduled.
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if sys.version_info >= (3, 4):
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc*, then close the transport with it."""
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only when the loop is in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport on top of a FIFO, socket or character device.

    Data that cannot be written immediately is kept in a list buffer and
    flushed by _write_ready() when the descriptor becomes writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            # Forget the pipe before failing: the object is only partially
            # initialized, so __del__ must not report it as an unclosed
            # transport nor close a pipe still owned by the caller.
            self._pipe = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                          self._loop._selector,
                          self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the total length of the chunks waiting in the buffer."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Unsent data remains: report the loss as a broken pipe.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent right away.

        Empty data is ignored.  Once the connection is lost or closing, the
        data is dropped and only the lost-write counter is incremented.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Try to flush the whole buffer in a single os.write() call."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() won't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof() is all we need to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if sys.version_info >= (3, 4):
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc*, then close the transport with it."""
        # should be called by exception handler only
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            # These two are only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*.

        A non-inheritable descriptor has the flag set; an inheritable one
        has it cleared.  The other descriptor flags are preserved.
        """
        # Fall back to 1 when fcntl does not expose FD_CLOEXEC.
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if not inheritable:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)


class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: the
        child gets one end and the other end becomes self._proc.stdin.
        Exceptions raised while starting the process propagate to the caller
        after both ends of the socket pair have been closed.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child has its own copy of the read end.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()


class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()


class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for child watcher implementations.

    Subclasses provide _do_waitpid() and _do_waitpid_all(); this class
    registers the SIGCHLD handler on the attached loop and converts wait
    statuses into return codes.
    """

    def __init__(self):
        # Event loop the SIGCHLD handler is registered on, or None.
        self._loop = None

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children and report any failure."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Convert a wait status into a return code.

        Return the negated signal number if the child was killed by a
        signal, its exit status if it exited, and *status* unchanged
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        else:
            # The child exited, but we don't understand its status.
            # This shouldn't happen, but if it does, let's just
            # return that status; perhaps that helps debug it.
            return status


class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps each watched pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Drop every registered handler, then close the base watcher."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return self; this watcher needs no per-context bookkeeping."""
        return self

    def __exit__(self, a, b, c):
        """Nothing to undo when leaving the context."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        A handler already registered for the same pid is replaced.  The
        child is polled immediately, so the callback may run before this
        method returns.
        """
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*.

        Returns True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode.

        self._loop is None until attach_loop() is given a loop, and
        _do_waitpid() can be reached before that through
        add_child_handler(), so a missing loop means "debug off" instead
        of an AttributeError.
        """
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid_all(self):
        """Poll every watched pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and fire its callback if it exited.

        Returns without doing anything when the child is still running.
        When the child has already been reaped elsewhere, the callback is
        invoked with a returncode of 255.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
840
841
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination itself.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for children that have a handler.
        self._callbacks = {}
        # Guards the bookkeeping shared between the context manager,
        # add_child_handler() and _do_waitpid_all().
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before a handler was added.
        self._zombies = {}
        # Number of "with watcher:" contexts currently open.
        self._forks = 0

    def close(self):
        """Forget all handlers and recorded zombies, then close the base."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Record that one more process is about to be started."""
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        """Close one context; report leftover zombies once none is open."""
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No context is open any more, so nobody can still claim these
            # pids: snapshot them for the log message and discard them.
            orphans = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            orphans)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside the watcher's context.  If the child was
        already reaped, the callback runs immediately.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback
        # (outside of the lock).
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but a context is open: it may not be
                    # registered yet, so keep its returncode for later.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody can still register it.
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
945
946
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created lazily by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            # The watcher is only attached to this thread's loop when
            # running in the main thread.
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        A SafeChildWatcher is created automatically if none is set yet.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The previously set watcher, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        current = self._watcher
        if current is not None:
            current.close()

        self._watcher = watcher
996
# Public names listed in __all__, bound to the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy