blob: d5db4d55fa0c14881580f255cf4c3b597646dc8d [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
Victor Stinner2d99d932014-11-20 15:03:52 +010070 if (coroutines.iscoroutine(callback)
71 or coroutines.iscoroutinefunction(callback)):
72 raise TypeError("coroutines cannot be used with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010074 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 try:
76 # set_wakeup_fd() raises ValueError if this is not the
77 # main thread. By calling it early we ensure that an
78 # event loop running in another thread cannot add a signal
79 # handler.
80 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020081 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082 raise RuntimeError(str(exc))
83
Yury Selivanov569efa22014-02-18 18:02:19 -050084 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 self._signal_handlers[sig] = handle
86
87 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020088 # Register a dummy signal handler to ask Python to write the signal
89 # number in the wakup file descriptor. _process_self_data() will
90 # read signal numbers from this file descriptor to handle signals.
91 signal.signal(sig, _sighandler_noop)
92
Charles-François Natali74e7cf32013-12-05 22:47:19 +010093 # Set SA_RESTART to limit EINTR occurrences.
94 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 except OSError as exc:
96 del self._signal_handlers[sig]
97 if not self._signal_handlers:
98 try:
99 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200100 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700101 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102
103 if exc.errno == errno.EINVAL:
104 raise RuntimeError('sig {} cannot be caught'.format(sig))
105 else:
106 raise
107
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200108 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109 """Internal helper that is the actual signal handler."""
110 handle = self._signal_handlers.get(sig)
111 if handle is None:
112 return # Assume it's some race condition.
113 if handle._cancelled:
114 self.remove_signal_handler(sig) # Remove it properly.
115 else:
116 self._add_callback_signalsafe(handle)
117
118 def remove_signal_handler(self, sig):
119 """Remove a handler for a signal. UNIX only.
120
121 Return True if a signal handler was removed, False if not.
122 """
123 self._check_signal(sig)
124 try:
125 del self._signal_handlers[sig]
126 except KeyError:
127 return False
128
129 if sig == signal.SIGINT:
130 handler = signal.default_int_handler
131 else:
132 handler = signal.SIG_DFL
133
134 try:
135 signal.signal(sig, handler)
136 except OSError as exc:
137 if exc.errno == errno.EINVAL:
138 raise RuntimeError('sig {} cannot be caught'.format(sig))
139 else:
140 raise
141
142 if not self._signal_handlers:
143 try:
144 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200145 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700146 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147
148 return True
149
150 def _check_signal(self, sig):
151 """Internal helper to validate a signal.
152
153 Raise ValueError if the signal number is invalid or uncatchable.
154 Raise RuntimeError if there is a problem setting up the handler.
155 """
156 if not isinstance(sig, int):
157 raise TypeError('sig must be an int, not {!r}'.format(sig))
158
159 if not (1 <= sig < signal.NSIG):
160 raise ValueError(
161 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
162
163 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
166
167 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
170
Victor Stinnerf951d282014-06-29 00:46:45 +0200171 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 def _make_subprocess_transport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
174 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800175 with events.get_child_watcher() as watcher:
176 transp = _UnixSubprocessTransport(self, protocol, args, shell,
177 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800178 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800179 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800180 watcher.add_child_handler(transp.get_pid(),
181 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800182
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183 return transp
184
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800185 def _child_watcher_callback(self, pid, returncode, transp):
186 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187
Victor Stinnerf951d282014-06-29 00:46:45 +0200188 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500189 def create_unix_connection(self, protocol_factory, path, *,
190 ssl=None, sock=None,
191 server_hostname=None):
192 assert server_hostname is None or isinstance(server_hostname, str)
193 if ssl:
194 if server_hostname is None:
195 raise ValueError(
196 'you have to pass server_hostname when using ssl')
197 else:
198 if server_hostname is not None:
199 raise ValueError('server_hostname is only meaningful with ssl')
200
201 if path is not None:
202 if sock is not None:
203 raise ValueError(
204 'path and sock can not be specified at the same time')
205
Victor Stinner79a29522014-02-19 01:45:59 +0100206 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500207 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500208 sock.setblocking(False)
209 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100210 except:
211 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500212 raise
213
214 else:
215 if sock is None:
216 raise ValueError('no path and sock were specified')
217 sock.setblocking(False)
218
219 transport, protocol = yield from self._create_connection_transport(
220 sock, protocol_factory, ssl, server_hostname)
221 return transport, protocol
222
Victor Stinnerf951d282014-06-29 00:46:45 +0200223 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500224 def create_unix_server(self, protocol_factory, path=None, *,
225 sock=None, backlog=100, ssl=None):
226 if isinstance(ssl, bool):
227 raise TypeError('ssl argument must be an SSLContext or None')
228
229 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200230 if sock is not None:
231 raise ValueError(
232 'path and sock can not be specified at the same time')
233
Yury Selivanovb057c522014-02-18 12:15:06 -0500234 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
235
236 try:
237 sock.bind(path)
238 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100239 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500240 if exc.errno == errno.EADDRINUSE:
241 # Let's improve the error message by adding
242 # with what exact address it occurs.
243 msg = 'Address {!r} is already in use'.format(path)
244 raise OSError(errno.EADDRINUSE, msg) from None
245 else:
246 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200247 except:
248 sock.close()
249 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500250 else:
251 if sock is None:
252 raise ValueError(
253 'path was not specified, and no sock specified')
254
255 if sock.family != socket.AF_UNIX:
256 raise ValueError(
257 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
258
259 server = base_events.Server(self, [sock])
260 sock.listen(backlog)
261 sock.setblocking(False)
262 self._start_serving(protocol_factory, sock, ssl, server)
263 return server
264
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200266if hasattr(os, 'set_blocking'):
267 def _set_nonblocking(fd):
268 os.set_blocking(fd, False)
269else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400270 import fcntl
271
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200272 def _set_nonblocking(fd):
273 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
274 flags = flags | os.O_NONBLOCK
275 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277
278class _UnixReadPipeTransport(transports.ReadTransport):
279
Yury Selivanovdec1a452014-02-18 22:27:48 -0500280 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281
282 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
283 super().__init__(extra)
284 self._extra['pipe'] = pipe
285 self._loop = loop
286 self._pipe = pipe
287 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700288 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800289 if not (stat.S_ISFIFO(mode) or
290 stat.S_ISSOCK(mode) or
291 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700292 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293 _set_nonblocking(self._fileno)
294 self._protocol = protocol
295 self._closing = False
296 self._loop.add_reader(self._fileno, self._read_ready)
297 self._loop.call_soon(self._protocol.connection_made, self)
298 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200299 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200300 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
Victor Stinnere912e652014-07-12 03:11:53 +0200302 def __repr__(self):
303 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
304 if self._pipe is not None:
305 polling = selector_events._test_selector_event(
306 self._loop._selector,
307 self._fileno, selectors.EVENT_READ)
308 if polling:
309 info.append('polling')
310 else:
311 info.append('idle')
312 else:
313 info.append('closed')
314 return '<%s>' % ' '.join(info)
315
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 def _read_ready(self):
317 try:
318 data = os.read(self._fileno, self.max_size)
319 except (BlockingIOError, InterruptedError):
320 pass
321 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100322 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 else:
324 if data:
325 self._protocol.data_received(data)
326 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200327 if self._loop.get_debug():
328 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 self._closing = True
330 self._loop.remove_reader(self._fileno)
331 self._loop.call_soon(self._protocol.eof_received)
332 self._loop.call_soon(self._call_connection_lost, None)
333
Guido van Rossum57497ad2013-10-18 07:58:20 -0700334 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 self._loop.remove_reader(self._fileno)
336
Guido van Rossum57497ad2013-10-18 07:58:20 -0700337 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 self._loop.add_reader(self._fileno, self._read_ready)
339
340 def close(self):
341 if not self._closing:
342 self._close(None)
343
Victor Stinner0ee29c22014-02-19 01:40:41 +0100344 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200346 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
347 if self._loop.get_debug():
348 logger.debug("%r: %s", self, message, exc_info=True)
349 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500350 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100351 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500352 'exception': exc,
353 'transport': self,
354 'protocol': self._protocol,
355 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 self._close(exc)
357
358 def _close(self, exc):
359 self._closing = True
360 self._loop.remove_reader(self._fileno)
361 self._loop.call_soon(self._call_connection_lost, exc)
362
363 def _call_connection_lost(self, exc):
364 try:
365 self._protocol.connection_lost(exc)
366 finally:
367 self._pipe.close()
368 self._pipe = None
369 self._protocol = None
370 self._loop = None
371
372
Yury Selivanov3cb99142014-02-18 18:41:13 -0500373class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800374 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375
376 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100377 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 self._pipe = pipe
380 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700381 mode = os.fstat(self._fileno).st_mode
382 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100383 if not (is_socket or
384 stat.S_ISFIFO(mode) or
385 stat.S_ISCHR(mode)):
386 raise ValueError("Pipe transport is only for "
387 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 _set_nonblocking(self._fileno)
389 self._protocol = protocol
390 self._buffer = []
391 self._conn_lost = 0
392 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700393
394 # On AIX, the reader trick only works for sockets.
395 # On other platforms it works for pipes and sockets.
396 # (Exception: OS X 10.4? Issue #19294.)
397 if is_socket or not sys.platform.startswith("aix"):
398 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
400 self._loop.call_soon(self._protocol.connection_made, self)
401 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200402 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200403 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404
Victor Stinnere912e652014-07-12 03:11:53 +0200405 def __repr__(self):
406 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
407 if self._pipe is not None:
408 polling = selector_events._test_selector_event(
409 self._loop._selector,
410 self._fileno, selectors.EVENT_WRITE)
411 if polling:
412 info.append('polling')
413 else:
414 info.append('idle')
415
416 bufsize = self.get_write_buffer_size()
417 info.append('bufsize=%s' % bufsize)
418 else:
419 info.append('closed')
420 return '<%s>' % ' '.join(info)
421
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800422 def get_write_buffer_size(self):
423 return sum(len(data) for data in self._buffer)
424
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700426 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200427 if self._loop.get_debug():
428 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100429 if self._buffer:
430 self._close(BrokenPipeError())
431 else:
432 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
434 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800435 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
436 if isinstance(data, bytearray):
437 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 if not data:
439 return
440
441 if self._conn_lost or self._closing:
442 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700443 logger.warning('pipe closed by peer or '
444 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 self._conn_lost += 1
446 return
447
448 if not self._buffer:
449 # Attempt to send it right away first.
450 try:
451 n = os.write(self._fileno, data)
452 except (BlockingIOError, InterruptedError):
453 n = 0
454 except Exception as exc:
455 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100456 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 return
458 if n == len(data):
459 return
460 elif n > 0:
461 data = data[n:]
462 self._loop.add_writer(self._fileno, self._write_ready)
463
464 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800465 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
467 def _write_ready(self):
468 data = b''.join(self._buffer)
469 assert data, 'Data should not be empty'
470
471 self._buffer.clear()
472 try:
473 n = os.write(self._fileno, data)
474 except (BlockingIOError, InterruptedError):
475 self._buffer.append(data)
476 except Exception as exc:
477 self._conn_lost += 1
478 # Remove writer here, _fatal_error() doesn't it
479 # because _buffer is empty.
480 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100481 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 else:
483 if n == len(data):
484 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800485 self._maybe_resume_protocol() # May append to buffer.
486 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487 self._loop.remove_reader(self._fileno)
488 self._call_connection_lost(None)
489 return
490 elif n > 0:
491 data = data[n:]
492
493 self._buffer.append(data) # Try again later.
494
495 def can_write_eof(self):
496 return True
497
498 # TODO: Make the relationships between write_eof(), close(),
499 # abort(), _fatal_error() and _close() more straightforward.
500
501 def write_eof(self):
502 if self._closing:
503 return
504 assert self._pipe
505 self._closing = True
506 if not self._buffer:
507 self._loop.remove_reader(self._fileno)
508 self._loop.call_soon(self._call_connection_lost, None)
509
510 def close(self):
511 if not self._closing:
512 # write_eof is all what we needed to close the write pipe
513 self.write_eof()
514
515 def abort(self):
516 self._close(None)
517
Victor Stinner0ee29c22014-02-19 01:40:41 +0100518 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200520 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
521 if self._loop.get_debug():
522 logger.debug("%r: %s", self, message, exc_info=True)
523 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500524 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100525 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500526 'exception': exc,
527 'transport': self,
528 'protocol': self._protocol,
529 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 self._close(exc)
531
532 def _close(self, exc=None):
533 self._closing = True
534 if self._buffer:
535 self._loop.remove_writer(self._fileno)
536 self._buffer.clear()
537 self._loop.remove_reader(self._fileno)
538 self._loop.call_soon(self._call_connection_lost, exc)
539
540 def _call_connection_lost(self, exc):
541 try:
542 self._protocol.connection_lost(exc)
543 finally:
544 self._pipe.close()
545 self._pipe = None
546 self._protocol = None
547 self._loop = None
548
549
Guido van Rossum59691282013-10-30 14:52:03 -0700550class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700551
Guido van Rossum59691282013-10-30 14:52:03 -0700552 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700553 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700555 # Use a socket pair for stdin, since not all platforms
556 # support selecting read events on the write end of a
557 # socket (which we use in order to detect closing of the
558 # other end). Notably this is needed on AIX, and works
559 # just fine on other platforms.
560 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 self._proc = subprocess.Popen(
562 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
563 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700564 if stdin_w is not None:
565 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200566 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800567
568
569class AbstractChildWatcher:
570 """Abstract base class for monitoring child processes.
571
572 Objects derived from this class monitor a collection of subprocesses and
573 report their termination or interruption by a signal.
574
575 New callbacks are registered with .add_child_handler(). Starting a new
576 process must be done within a 'with' block to allow the watcher to suspend
577 its activity until the new process if fully registered (this is needed to
578 prevent a race condition in some implementations).
579
580 Example:
581 with watcher:
582 proc = subprocess.Popen("sleep 1")
583 watcher.add_child_handler(proc.pid, callback)
584
585 Notes:
586 Implementations of this class must be thread-safe.
587
588 Since child watcher objects may catch the SIGCHLD signal and call
589 waitpid(-1), there should be only one active object per process.
590 """
591
592 def add_child_handler(self, pid, callback, *args):
593 """Register a new child handler.
594
595 Arrange for callback(pid, returncode, *args) to be called when
596 process 'pid' terminates. Specifying another callback for the same
597 process replaces the previous handler.
598
Victor Stinneracdb7822014-07-14 18:33:40 +0200599 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800600 """
601 raise NotImplementedError()
602
603 def remove_child_handler(self, pid):
604 """Removes the handler for process 'pid'.
605
606 The function returns True if the handler was successfully removed,
607 False if there was nothing to remove."""
608
609 raise NotImplementedError()
610
Guido van Rossum2bcae702013-11-13 15:50:08 -0800611 def attach_loop(self, loop):
612 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800613
Guido van Rossum2bcae702013-11-13 15:50:08 -0800614 If the watcher was previously attached to an event loop, then it is
615 first detached before attaching to the new loop.
616
617 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800618 """
619 raise NotImplementedError()
620
621 def close(self):
622 """Close the watcher.
623
624 This must be called to make sure that any underlying resource is freed.
625 """
626 raise NotImplementedError()
627
628 def __enter__(self):
629 """Enter the watcher's context and allow starting new processes
630
631 This function must return self"""
632 raise NotImplementedError()
633
634 def __exit__(self, a, b, c):
635 """Exit the watcher's context"""
636 raise NotImplementedError()
637
638
639class BaseChildWatcher(AbstractChildWatcher):
640
Guido van Rossum2bcae702013-11-13 15:50:08 -0800641 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800642 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800643
644 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800645 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800646
647 def _do_waitpid(self, expected_pid):
648 raise NotImplementedError()
649
650 def _do_waitpid_all(self):
651 raise NotImplementedError()
652
Guido van Rossum2bcae702013-11-13 15:50:08 -0800653 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800654 assert loop is None or isinstance(loop, events.AbstractEventLoop)
655
656 if self._loop is not None:
657 self._loop.remove_signal_handler(signal.SIGCHLD)
658
659 self._loop = loop
660 if loop is not None:
661 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
662
663 # Prevent a race condition in case a child terminated
664 # during the switch.
665 self._do_waitpid_all()
666
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800667 def _sig_chld(self):
668 try:
669 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500670 except Exception as exc:
671 # self._loop should always be available here
672 # as '_sig_chld' is added as a signal handler
673 # in 'attach_loop'
674 self._loop.call_exception_handler({
675 'message': 'Unknown exception in SIGCHLD handler',
676 'exception': exc,
677 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800678
679 def _compute_returncode(self, status):
680 if os.WIFSIGNALED(status):
681 # The child process died because of a signal.
682 return -os.WTERMSIG(status)
683 elif os.WIFEXITED(status):
684 # The child process exited (e.g sys.exit()).
685 return os.WEXITSTATUS(status)
686 else:
687 # The child exited, but we don't understand its status.
688 # This shouldn't happen, but if it does, let's just
689 # return that status; perhaps that helps debug it.
690 return status
691
692
693class SafeChildWatcher(BaseChildWatcher):
694 """'Safe' child watcher implementation.
695
696 This implementation avoids disrupting other code spawning processes by
697 polling explicitly each process in the SIGCHLD handler instead of calling
698 os.waitpid(-1).
699
700 This is a safe solution but it has a significant overhead when handling a
701 big number of children (O(n) each time SIGCHLD is raised)
702 """
703
Guido van Rossum2bcae702013-11-13 15:50:08 -0800704 def __init__(self):
705 super().__init__()
706 self._callbacks = {}
707
708 def close(self):
709 self._callbacks.clear()
710 super().close()
711
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800712 def __enter__(self):
713 return self
714
715 def __exit__(self, a, b, c):
716 pass
717
718 def add_child_handler(self, pid, callback, *args):
719 self._callbacks[pid] = callback, args
720
721 # Prevent a race condition in case the child is already terminated.
722 self._do_waitpid(pid)
723
Guido van Rossum2bcae702013-11-13 15:50:08 -0800724 def remove_child_handler(self, pid):
725 try:
726 del self._callbacks[pid]
727 return True
728 except KeyError:
729 return False
730
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800731 def _do_waitpid_all(self):
732
733 for pid in list(self._callbacks):
734 self._do_waitpid(pid)
735
736 def _do_waitpid(self, expected_pid):
737 assert expected_pid > 0
738
739 try:
740 pid, status = os.waitpid(expected_pid, os.WNOHANG)
741 except ChildProcessError:
742 # The child process is already reaped
743 # (may happen if waitpid() is called elsewhere).
744 pid = expected_pid
745 returncode = 255
746 logger.warning(
747 "Unknown child process pid %d, will report returncode 255",
748 pid)
749 else:
750 if pid == 0:
751 # The child process is still alive.
752 return
753
754 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200755 if self._loop.get_debug():
756 logger.debug('process %s exited with returncode %s',
757 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800758
759 try:
760 callback, args = self._callbacks.pop(pid)
761 except KeyError: # pragma: no cover
762 # May happen if .remove_child_handler() is called
763 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200764 if self._loop.get_debug():
765 logger.warning("Child watcher got an unexpected pid: %r",
766 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800767 else:
768 callback(pid, returncode, *args)
769
770
771class FastChildWatcher(BaseChildWatcher):
772 """'Fast' child watcher implementation.
773
774 This implementation reaps every terminated processes by calling
775 os.waitpid(-1) directly, possibly breaking other code spawning processes
776 and waiting for their termination.
777
778 There is no noticeable overhead when handling a big number of children
779 (O(1) each time a child terminates).
780 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800781 def __init__(self):
782 super().__init__()
783 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800784 self._lock = threading.Lock()
785 self._zombies = {}
786 self._forks = 0
787
788 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800789 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800790 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800791 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800792
793 def __enter__(self):
794 with self._lock:
795 self._forks += 1
796
797 return self
798
799 def __exit__(self, a, b, c):
800 with self._lock:
801 self._forks -= 1
802
803 if self._forks or not self._zombies:
804 return
805
806 collateral_victims = str(self._zombies)
807 self._zombies.clear()
808
809 logger.warning(
810 "Caught subprocesses termination from unknown pids: %s",
811 collateral_victims)
812
813 def add_child_handler(self, pid, callback, *args):
814 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800815 with self._lock:
816 try:
817 returncode = self._zombies.pop(pid)
818 except KeyError:
819 # The child is running.
820 self._callbacks[pid] = callback, args
821 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800822
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800823 # The child is dead already. We can fire the callback.
824 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800825
Guido van Rossum2bcae702013-11-13 15:50:08 -0800826 def remove_child_handler(self, pid):
827 try:
828 del self._callbacks[pid]
829 return True
830 except KeyError:
831 return False
832
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800833 def _do_waitpid_all(self):
834 # Because of signal coalescing, we must keep calling waitpid() as
835 # long as we're able to reap a child.
836 while True:
837 try:
838 pid, status = os.waitpid(-1, os.WNOHANG)
839 except ChildProcessError:
840 # No more child processes exist.
841 return
842 else:
843 if pid == 0:
844 # A child process is still alive.
845 return
846
847 returncode = self._compute_returncode(status)
848
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800849 with self._lock:
850 try:
851 callback, args = self._callbacks.pop(pid)
852 except KeyError:
853 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800854 if self._forks:
855 # It may not be registered yet.
856 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200857 if self._loop.get_debug():
858 logger.debug('unknown process %s exited '
859 'with returncode %s',
860 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800861 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800862 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200863 else:
864 if self._loop.get_debug():
865 logger.debug('process %s exited with returncode %s',
866 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800867
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800868 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800869 logger.warning(
870 "Caught subprocess termination from unknown pid: "
871 "%d -> %d", pid, returncode)
872 else:
873 callback(pid, returncode, *args)
874
875
876class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
877 """XXX"""
878 _loop_factory = _UnixSelectorEventLoop
879
880 def __init__(self):
881 super().__init__()
882 self._watcher = None
883
884 def _init_watcher(self):
885 with events._lock:
886 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800887 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800888 if isinstance(threading.current_thread(),
889 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800890 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800891
892 def set_event_loop(self, loop):
893 """Set the event loop.
894
895 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800896 .set_event_loop() from the main thread will call .attach_loop(loop) on
897 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800898 """
899
900 super().set_event_loop(loop)
901
902 if self._watcher is not None and \
903 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800904 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800905
906 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200907 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800908
909 If not yet set, a SafeChildWatcher object is automatically created.
910 """
911 if self._watcher is None:
912 self._init_watcher()
913
914 return self._watcher
915
916 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200917 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800918
919 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
920
921 if self._watcher is not None:
922 self._watcher.close()
923
924 self._watcher = watcher
925
926SelectorEventLoop = _UnixSelectorEventLoop
927DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy