blob: d1461fd02485222ed5dc66a4570276f863de56b5 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
Victor Stinner2d99d932014-11-20 15:03:52 +010070 if (coroutines.iscoroutine(callback)
71 or coroutines.iscoroutinefunction(callback)):
72 raise TypeError("coroutines cannot be used with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010074 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 try:
76 # set_wakeup_fd() raises ValueError if this is not the
77 # main thread. By calling it early we ensure that an
78 # event loop running in another thread cannot add a signal
79 # handler.
80 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020081 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082 raise RuntimeError(str(exc))
83
Yury Selivanov569efa22014-02-18 18:02:19 -050084 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 self._signal_handlers[sig] = handle
86
87 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020088 # Register a dummy signal handler to ask Python to write the signal
89 # number in the wakup file descriptor. _process_self_data() will
90 # read signal numbers from this file descriptor to handle signals.
91 signal.signal(sig, _sighandler_noop)
92
Charles-François Natali74e7cf32013-12-05 22:47:19 +010093 # Set SA_RESTART to limit EINTR occurrences.
94 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 except OSError as exc:
96 del self._signal_handlers[sig]
97 if not self._signal_handlers:
98 try:
99 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200100 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700101 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102
103 if exc.errno == errno.EINVAL:
104 raise RuntimeError('sig {} cannot be caught'.format(sig))
105 else:
106 raise
107
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200108 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109 """Internal helper that is the actual signal handler."""
110 handle = self._signal_handlers.get(sig)
111 if handle is None:
112 return # Assume it's some race condition.
113 if handle._cancelled:
114 self.remove_signal_handler(sig) # Remove it properly.
115 else:
116 self._add_callback_signalsafe(handle)
117
118 def remove_signal_handler(self, sig):
119 """Remove a handler for a signal. UNIX only.
120
121 Return True if a signal handler was removed, False if not.
122 """
123 self._check_signal(sig)
124 try:
125 del self._signal_handlers[sig]
126 except KeyError:
127 return False
128
129 if sig == signal.SIGINT:
130 handler = signal.default_int_handler
131 else:
132 handler = signal.SIG_DFL
133
134 try:
135 signal.signal(sig, handler)
136 except OSError as exc:
137 if exc.errno == errno.EINVAL:
138 raise RuntimeError('sig {} cannot be caught'.format(sig))
139 else:
140 raise
141
142 if not self._signal_handlers:
143 try:
144 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200145 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700146 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147
148 return True
149
150 def _check_signal(self, sig):
151 """Internal helper to validate a signal.
152
153 Raise ValueError if the signal number is invalid or uncatchable.
154 Raise RuntimeError if there is a problem setting up the handler.
155 """
156 if not isinstance(sig, int):
157 raise TypeError('sig must be an int, not {!r}'.format(sig))
158
159 if not (1 <= sig < signal.NSIG):
160 raise ValueError(
161 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
162
163 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
166
167 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
170
Victor Stinnerf951d282014-06-29 00:46:45 +0200171 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 def _make_subprocess_transport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
174 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800175 with events.get_child_watcher() as watcher:
176 transp = _UnixSubprocessTransport(self, protocol, args, shell,
177 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800178 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800179 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800180 watcher.add_child_handler(transp.get_pid(),
181 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800182
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183 return transp
184
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800185 def _child_watcher_callback(self, pid, returncode, transp):
186 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187
Victor Stinnerf951d282014-06-29 00:46:45 +0200188 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500189 def create_unix_connection(self, protocol_factory, path, *,
190 ssl=None, sock=None,
191 server_hostname=None):
192 assert server_hostname is None or isinstance(server_hostname, str)
193 if ssl:
194 if server_hostname is None:
195 raise ValueError(
196 'you have to pass server_hostname when using ssl')
197 else:
198 if server_hostname is not None:
199 raise ValueError('server_hostname is only meaningful with ssl')
200
201 if path is not None:
202 if sock is not None:
203 raise ValueError(
204 'path and sock can not be specified at the same time')
205
Victor Stinner79a29522014-02-19 01:45:59 +0100206 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500207 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500208 sock.setblocking(False)
209 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100210 except:
211 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500212 raise
213
214 else:
215 if sock is None:
216 raise ValueError('no path and sock were specified')
217 sock.setblocking(False)
218
219 transport, protocol = yield from self._create_connection_transport(
220 sock, protocol_factory, ssl, server_hostname)
221 return transport, protocol
222
Victor Stinnerf951d282014-06-29 00:46:45 +0200223 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500224 def create_unix_server(self, protocol_factory, path=None, *,
225 sock=None, backlog=100, ssl=None):
226 if isinstance(ssl, bool):
227 raise TypeError('ssl argument must be an SSLContext or None')
228
229 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200230 if sock is not None:
231 raise ValueError(
232 'path and sock can not be specified at the same time')
233
Yury Selivanovb057c522014-02-18 12:15:06 -0500234 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
235
236 try:
237 sock.bind(path)
238 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100239 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500240 if exc.errno == errno.EADDRINUSE:
241 # Let's improve the error message by adding
242 # with what exact address it occurs.
243 msg = 'Address {!r} is already in use'.format(path)
244 raise OSError(errno.EADDRINUSE, msg) from None
245 else:
246 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200247 except:
248 sock.close()
249 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500250 else:
251 if sock is None:
252 raise ValueError(
253 'path was not specified, and no sock specified')
254
255 if sock.family != socket.AF_UNIX:
256 raise ValueError(
257 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
258
259 server = base_events.Server(self, [sock])
260 sock.listen(backlog)
261 sock.setblocking(False)
262 self._start_serving(protocol_factory, sock, ssl, server)
263 return server
264
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200266if hasattr(os, 'set_blocking'):
267 def _set_nonblocking(fd):
268 os.set_blocking(fd, False)
269else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400270 import fcntl
271
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200272 def _set_nonblocking(fd):
273 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
274 flags = flags | os.O_NONBLOCK
275 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277
278class _UnixReadPipeTransport(transports.ReadTransport):
279
Yury Selivanovdec1a452014-02-18 22:27:48 -0500280 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281
282 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
283 super().__init__(extra)
284 self._extra['pipe'] = pipe
285 self._loop = loop
286 self._pipe = pipe
287 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700288 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800289 if not (stat.S_ISFIFO(mode) or
290 stat.S_ISSOCK(mode) or
291 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700292 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293 _set_nonblocking(self._fileno)
294 self._protocol = protocol
295 self._closing = False
296 self._loop.add_reader(self._fileno, self._read_ready)
297 self._loop.call_soon(self._protocol.connection_made, self)
298 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200299 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200300 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
Victor Stinnere912e652014-07-12 03:11:53 +0200302 def __repr__(self):
303 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
304 if self._pipe is not None:
305 polling = selector_events._test_selector_event(
306 self._loop._selector,
307 self._fileno, selectors.EVENT_READ)
308 if polling:
309 info.append('polling')
310 else:
311 info.append('idle')
312 else:
313 info.append('closed')
314 return '<%s>' % ' '.join(info)
315
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 def _read_ready(self):
317 try:
318 data = os.read(self._fileno, self.max_size)
319 except (BlockingIOError, InterruptedError):
320 pass
321 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100322 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 else:
324 if data:
325 self._protocol.data_received(data)
326 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200327 if self._loop.get_debug():
328 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 self._closing = True
330 self._loop.remove_reader(self._fileno)
331 self._loop.call_soon(self._protocol.eof_received)
332 self._loop.call_soon(self._call_connection_lost, None)
333
Guido van Rossum57497ad2013-10-18 07:58:20 -0700334 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 self._loop.remove_reader(self._fileno)
336
Guido van Rossum57497ad2013-10-18 07:58:20 -0700337 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 self._loop.add_reader(self._fileno, self._read_ready)
339
340 def close(self):
341 if not self._closing:
342 self._close(None)
343
Victor Stinner0ee29c22014-02-19 01:40:41 +0100344 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200346 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
347 if self._loop.get_debug():
348 logger.debug("%r: %s", self, message, exc_info=True)
349 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500350 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100351 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500352 'exception': exc,
353 'transport': self,
354 'protocol': self._protocol,
355 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 self._close(exc)
357
358 def _close(self, exc):
359 self._closing = True
360 self._loop.remove_reader(self._fileno)
361 self._loop.call_soon(self._call_connection_lost, exc)
362
363 def _call_connection_lost(self, exc):
364 try:
365 self._protocol.connection_lost(exc)
366 finally:
367 self._pipe.close()
368 self._pipe = None
369 self._protocol = None
370 self._loop = None
371
372
Yury Selivanov3cb99142014-02-18 18:41:13 -0500373class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800374 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375
376 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100377 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 self._pipe = pipe
380 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700381 mode = os.fstat(self._fileno).st_mode
382 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100383 if not (is_socket or
384 stat.S_ISFIFO(mode) or
385 stat.S_ISCHR(mode)):
386 raise ValueError("Pipe transport is only for "
387 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 _set_nonblocking(self._fileno)
389 self._protocol = protocol
390 self._buffer = []
391 self._conn_lost = 0
392 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700393
394 # On AIX, the reader trick only works for sockets.
395 # On other platforms it works for pipes and sockets.
396 # (Exception: OS X 10.4? Issue #19294.)
397 if is_socket or not sys.platform.startswith("aix"):
398 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
400 self._loop.call_soon(self._protocol.connection_made, self)
401 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200402 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200403 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404
Victor Stinnere912e652014-07-12 03:11:53 +0200405 def __repr__(self):
406 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
407 if self._pipe is not None:
408 polling = selector_events._test_selector_event(
409 self._loop._selector,
410 self._fileno, selectors.EVENT_WRITE)
411 if polling:
412 info.append('polling')
413 else:
414 info.append('idle')
415
416 bufsize = self.get_write_buffer_size()
417 info.append('bufsize=%s' % bufsize)
418 else:
419 info.append('closed')
420 return '<%s>' % ' '.join(info)
421
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800422 def get_write_buffer_size(self):
423 return sum(len(data) for data in self._buffer)
424
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700426 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200427 if self._loop.get_debug():
428 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100429 if self._buffer:
430 self._close(BrokenPipeError())
431 else:
432 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
434 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800435 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
436 if isinstance(data, bytearray):
437 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 if not data:
439 return
440
441 if self._conn_lost or self._closing:
442 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700443 logger.warning('pipe closed by peer or '
444 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 self._conn_lost += 1
446 return
447
448 if not self._buffer:
449 # Attempt to send it right away first.
450 try:
451 n = os.write(self._fileno, data)
452 except (BlockingIOError, InterruptedError):
453 n = 0
454 except Exception as exc:
455 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100456 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 return
458 if n == len(data):
459 return
460 elif n > 0:
461 data = data[n:]
462 self._loop.add_writer(self._fileno, self._write_ready)
463
464 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800465 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
467 def _write_ready(self):
468 data = b''.join(self._buffer)
469 assert data, 'Data should not be empty'
470
471 self._buffer.clear()
472 try:
473 n = os.write(self._fileno, data)
474 except (BlockingIOError, InterruptedError):
475 self._buffer.append(data)
476 except Exception as exc:
477 self._conn_lost += 1
478 # Remove writer here, _fatal_error() doesn't it
479 # because _buffer is empty.
480 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100481 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 else:
483 if n == len(data):
484 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800485 self._maybe_resume_protocol() # May append to buffer.
486 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487 self._loop.remove_reader(self._fileno)
488 self._call_connection_lost(None)
489 return
490 elif n > 0:
491 data = data[n:]
492
493 self._buffer.append(data) # Try again later.
494
495 def can_write_eof(self):
496 return True
497
498 # TODO: Make the relationships between write_eof(), close(),
499 # abort(), _fatal_error() and _close() more straightforward.
500
501 def write_eof(self):
502 if self._closing:
503 return
504 assert self._pipe
505 self._closing = True
506 if not self._buffer:
507 self._loop.remove_reader(self._fileno)
508 self._loop.call_soon(self._call_connection_lost, None)
509
510 def close(self):
511 if not self._closing:
512 # write_eof is all what we needed to close the write pipe
513 self.write_eof()
514
515 def abort(self):
516 self._close(None)
517
Victor Stinner0ee29c22014-02-19 01:40:41 +0100518 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200520 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
521 if self._loop.get_debug():
522 logger.debug("%r: %s", self, message, exc_info=True)
523 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500524 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100525 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500526 'exception': exc,
527 'transport': self,
528 'protocol': self._protocol,
529 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 self._close(exc)
531
532 def _close(self, exc=None):
533 self._closing = True
534 if self._buffer:
535 self._loop.remove_writer(self._fileno)
536 self._buffer.clear()
537 self._loop.remove_reader(self._fileno)
538 self._loop.call_soon(self._call_connection_lost, exc)
539
540 def _call_connection_lost(self, exc):
541 try:
542 self._protocol.connection_lost(exc)
543 finally:
544 self._pipe.close()
545 self._pipe = None
546 self._protocol = None
547 self._loop = None
548
549
Victor Stinner1e40f102014-12-11 23:30:17 +0100550if hasattr(os, 'set_inheritable'):
551 # Python 3.4 and newer
552 _set_inheritable = os.set_inheritable
553else:
554 import fcntl
555
556 def _set_inheritable(fd, inheritable):
557 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
558
559 old = fcntl.fcntl(fd, fcntl.F_GETFD)
560 if not inheritable:
561 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
562 else:
563 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
564
565
Guido van Rossum59691282013-10-30 14:52:03 -0700566class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567
Guido van Rossum59691282013-10-30 14:52:03 -0700568 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700569 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700570 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700571 # Use a socket pair for stdin, since not all platforms
572 # support selecting read events on the write end of a
573 # socket (which we use in order to detect closing of the
574 # other end). Notably this is needed on AIX, and works
575 # just fine on other platforms.
576 stdin, stdin_w = self._loop._socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100577
578 # Mark the write end of the stdin pipe as non-inheritable,
579 # needed by close_fds=False on Python 3.3 and older
580 # (Python 3.4 implements the PEP 446, socketpair returns
581 # non-inheritable sockets)
582 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 self._proc = subprocess.Popen(
584 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
585 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700586 if stdin_w is not None:
587 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200588 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800589
590
591class AbstractChildWatcher:
592 """Abstract base class for monitoring child processes.
593
594 Objects derived from this class monitor a collection of subprocesses and
595 report their termination or interruption by a signal.
596
597 New callbacks are registered with .add_child_handler(). Starting a new
598 process must be done within a 'with' block to allow the watcher to suspend
599 its activity until the new process if fully registered (this is needed to
600 prevent a race condition in some implementations).
601
602 Example:
603 with watcher:
604 proc = subprocess.Popen("sleep 1")
605 watcher.add_child_handler(proc.pid, callback)
606
607 Notes:
608 Implementations of this class must be thread-safe.
609
610 Since child watcher objects may catch the SIGCHLD signal and call
611 waitpid(-1), there should be only one active object per process.
612 """
613
614 def add_child_handler(self, pid, callback, *args):
615 """Register a new child handler.
616
617 Arrange for callback(pid, returncode, *args) to be called when
618 process 'pid' terminates. Specifying another callback for the same
619 process replaces the previous handler.
620
Victor Stinneracdb7822014-07-14 18:33:40 +0200621 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800622 """
623 raise NotImplementedError()
624
625 def remove_child_handler(self, pid):
626 """Removes the handler for process 'pid'.
627
628 The function returns True if the handler was successfully removed,
629 False if there was nothing to remove."""
630
631 raise NotImplementedError()
632
Guido van Rossum2bcae702013-11-13 15:50:08 -0800633 def attach_loop(self, loop):
634 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800635
Guido van Rossum2bcae702013-11-13 15:50:08 -0800636 If the watcher was previously attached to an event loop, then it is
637 first detached before attaching to the new loop.
638
639 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800640 """
641 raise NotImplementedError()
642
643 def close(self):
644 """Close the watcher.
645
646 This must be called to make sure that any underlying resource is freed.
647 """
648 raise NotImplementedError()
649
650 def __enter__(self):
651 """Enter the watcher's context and allow starting new processes
652
653 This function must return self"""
654 raise NotImplementedError()
655
656 def __exit__(self, a, b, c):
657 """Exit the watcher's context"""
658 raise NotImplementedError()
659
660
661class BaseChildWatcher(AbstractChildWatcher):
662
Guido van Rossum2bcae702013-11-13 15:50:08 -0800663 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800664 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800665
666 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800667 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800668
669 def _do_waitpid(self, expected_pid):
670 raise NotImplementedError()
671
672 def _do_waitpid_all(self):
673 raise NotImplementedError()
674
Guido van Rossum2bcae702013-11-13 15:50:08 -0800675 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800676 assert loop is None or isinstance(loop, events.AbstractEventLoop)
677
678 if self._loop is not None:
679 self._loop.remove_signal_handler(signal.SIGCHLD)
680
681 self._loop = loop
682 if loop is not None:
683 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
684
685 # Prevent a race condition in case a child terminated
686 # during the switch.
687 self._do_waitpid_all()
688
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800689 def _sig_chld(self):
690 try:
691 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500692 except Exception as exc:
693 # self._loop should always be available here
694 # as '_sig_chld' is added as a signal handler
695 # in 'attach_loop'
696 self._loop.call_exception_handler({
697 'message': 'Unknown exception in SIGCHLD handler',
698 'exception': exc,
699 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800700
701 def _compute_returncode(self, status):
702 if os.WIFSIGNALED(status):
703 # The child process died because of a signal.
704 return -os.WTERMSIG(status)
705 elif os.WIFEXITED(status):
706 # The child process exited (e.g sys.exit()).
707 return os.WEXITSTATUS(status)
708 else:
709 # The child exited, but we don't understand its status.
710 # This shouldn't happen, but if it does, let's just
711 # return that status; perhaps that helps debug it.
712 return status
713
714
715class SafeChildWatcher(BaseChildWatcher):
716 """'Safe' child watcher implementation.
717
718 This implementation avoids disrupting other code spawning processes by
719 polling explicitly each process in the SIGCHLD handler instead of calling
720 os.waitpid(-1).
721
722 This is a safe solution but it has a significant overhead when handling a
723 big number of children (O(n) each time SIGCHLD is raised)
724 """
725
Guido van Rossum2bcae702013-11-13 15:50:08 -0800726 def __init__(self):
727 super().__init__()
728 self._callbacks = {}
729
730 def close(self):
731 self._callbacks.clear()
732 super().close()
733
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800734 def __enter__(self):
735 return self
736
737 def __exit__(self, a, b, c):
738 pass
739
740 def add_child_handler(self, pid, callback, *args):
741 self._callbacks[pid] = callback, args
742
743 # Prevent a race condition in case the child is already terminated.
744 self._do_waitpid(pid)
745
Guido van Rossum2bcae702013-11-13 15:50:08 -0800746 def remove_child_handler(self, pid):
747 try:
748 del self._callbacks[pid]
749 return True
750 except KeyError:
751 return False
752
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800753 def _do_waitpid_all(self):
754
755 for pid in list(self._callbacks):
756 self._do_waitpid(pid)
757
758 def _do_waitpid(self, expected_pid):
759 assert expected_pid > 0
760
761 try:
762 pid, status = os.waitpid(expected_pid, os.WNOHANG)
763 except ChildProcessError:
764 # The child process is already reaped
765 # (may happen if waitpid() is called elsewhere).
766 pid = expected_pid
767 returncode = 255
768 logger.warning(
769 "Unknown child process pid %d, will report returncode 255",
770 pid)
771 else:
772 if pid == 0:
773 # The child process is still alive.
774 return
775
776 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200777 if self._loop.get_debug():
778 logger.debug('process %s exited with returncode %s',
779 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800780
781 try:
782 callback, args = self._callbacks.pop(pid)
783 except KeyError: # pragma: no cover
784 # May happen if .remove_child_handler() is called
785 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200786 if self._loop.get_debug():
787 logger.warning("Child watcher got an unexpected pid: %r",
788 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800789 else:
790 callback(pid, returncode, *args)
791
792
793class FastChildWatcher(BaseChildWatcher):
794 """'Fast' child watcher implementation.
795
796 This implementation reaps every terminated processes by calling
797 os.waitpid(-1) directly, possibly breaking other code spawning processes
798 and waiting for their termination.
799
800 There is no noticeable overhead when handling a big number of children
801 (O(1) each time a child terminates).
802 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800803 def __init__(self):
804 super().__init__()
805 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800806 self._lock = threading.Lock()
807 self._zombies = {}
808 self._forks = 0
809
810 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800811 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800812 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800813 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800814
815 def __enter__(self):
816 with self._lock:
817 self._forks += 1
818
819 return self
820
821 def __exit__(self, a, b, c):
822 with self._lock:
823 self._forks -= 1
824
825 if self._forks or not self._zombies:
826 return
827
828 collateral_victims = str(self._zombies)
829 self._zombies.clear()
830
831 logger.warning(
832 "Caught subprocesses termination from unknown pids: %s",
833 collateral_victims)
834
835 def add_child_handler(self, pid, callback, *args):
836 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800837 with self._lock:
838 try:
839 returncode = self._zombies.pop(pid)
840 except KeyError:
841 # The child is running.
842 self._callbacks[pid] = callback, args
843 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800844
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800845 # The child is dead already. We can fire the callback.
846 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800847
Guido van Rossum2bcae702013-11-13 15:50:08 -0800848 def remove_child_handler(self, pid):
849 try:
850 del self._callbacks[pid]
851 return True
852 except KeyError:
853 return False
854
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800855 def _do_waitpid_all(self):
856 # Because of signal coalescing, we must keep calling waitpid() as
857 # long as we're able to reap a child.
858 while True:
859 try:
860 pid, status = os.waitpid(-1, os.WNOHANG)
861 except ChildProcessError:
862 # No more child processes exist.
863 return
864 else:
865 if pid == 0:
866 # A child process is still alive.
867 return
868
869 returncode = self._compute_returncode(status)
870
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800871 with self._lock:
872 try:
873 callback, args = self._callbacks.pop(pid)
874 except KeyError:
875 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800876 if self._forks:
877 # It may not be registered yet.
878 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200879 if self._loop.get_debug():
880 logger.debug('unknown process %s exited '
881 'with returncode %s',
882 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800883 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800884 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200885 else:
886 if self._loop.get_debug():
887 logger.debug('process %s exited with returncode %s',
888 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800889
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800890 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800891 logger.warning(
892 "Caught subprocess termination from unknown pid: "
893 "%d -> %d", pid, returncode)
894 else:
895 callback(pid, returncode, *args)
896
897
898class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
899 """XXX"""
900 _loop_factory = _UnixSelectorEventLoop
901
902 def __init__(self):
903 super().__init__()
904 self._watcher = None
905
906 def _init_watcher(self):
907 with events._lock:
908 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800909 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800910 if isinstance(threading.current_thread(),
911 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800912 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800913
914 def set_event_loop(self, loop):
915 """Set the event loop.
916
917 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800918 .set_event_loop() from the main thread will call .attach_loop(loop) on
919 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800920 """
921
922 super().set_event_loop(loop)
923
924 if self._watcher is not None and \
925 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800926 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800927
928 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200929 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800930
931 If not yet set, a SafeChildWatcher object is automatically created.
932 """
933 if self._watcher is None:
934 self._init_watcher()
935
936 return self._watcher
937
938 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200939 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800940
941 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
942
943 if self._watcher is not None:
944 self._watcher.close()
945
946 self._watcher = watcher
947
948SelectorEventLoop = _UnixSelectorEventLoop
949DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy