blob: 93c8c1c8197d95e45050a03b15a0705ec087f1f3 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020018from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Victor Stinner915bcb02014-02-01 22:49:59 +010024__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080025 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029if sys.platform == 'win32': # pragma: no cover
30 raise ImportError('Signals are not really supported on Windows')
31
32
Victor Stinnerfe5649c2014-07-17 22:43:40 +020033def _sighandler_noop(signum, frame):
34 """Dummy signal handler."""
35 pass
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050039 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
Yury Selivanovb057c522014-02-18 12:15:06 -050041 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020052 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080053 for sig in list(self._signal_handlers):
54 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055
Victor Stinnerfe5649c2014-07-17 22:43:40 +020056 def _process_self_data(self, data):
57 for signum in data:
58 if not signum:
59 # ignore null bytes written by _write_to_self()
60 continue
61 self._handle_signal(signum)
62
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070063 def add_signal_handler(self, sig, callback, *args):
64 """Add a handler for a signal. UNIX only.
65
66 Raise ValueError if the signal number is invalid or uncatchable.
67 Raise RuntimeError if there is a problem setting up the handler.
68 """
69 self._check_signal(sig)
70 try:
71 # set_wakeup_fd() raises ValueError if this is not the
72 # main thread. By calling it early we ensure that an
73 # event loop running in another thread cannot add a signal
74 # handler.
75 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020076 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 raise RuntimeError(str(exc))
78
Yury Selivanov569efa22014-02-18 18:02:19 -050079 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080 self._signal_handlers[sig] = handle
81
82 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020083 # Register a dummy signal handler to ask Python to write the signal
84 # number in the wakup file descriptor. _process_self_data() will
85 # read signal numbers from this file descriptor to handle signals.
86 signal.signal(sig, _sighandler_noop)
87
Charles-François Natali74e7cf32013-12-05 22:47:19 +010088 # Set SA_RESTART to limit EINTR occurrences.
89 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090 except OSError as exc:
91 del self._signal_handlers[sig]
92 if not self._signal_handlers:
93 try:
94 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +020095 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070096 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070097
98 if exc.errno == errno.EINVAL:
99 raise RuntimeError('sig {} cannot be caught'.format(sig))
100 else:
101 raise
102
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200103 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104 """Internal helper that is the actual signal handler."""
105 handle = self._signal_handlers.get(sig)
106 if handle is None:
107 return # Assume it's some race condition.
108 if handle._cancelled:
109 self.remove_signal_handler(sig) # Remove it properly.
110 else:
111 self._add_callback_signalsafe(handle)
112
113 def remove_signal_handler(self, sig):
114 """Remove a handler for a signal. UNIX only.
115
116 Return True if a signal handler was removed, False if not.
117 """
118 self._check_signal(sig)
119 try:
120 del self._signal_handlers[sig]
121 except KeyError:
122 return False
123
124 if sig == signal.SIGINT:
125 handler = signal.default_int_handler
126 else:
127 handler = signal.SIG_DFL
128
129 try:
130 signal.signal(sig, handler)
131 except OSError as exc:
132 if exc.errno == errno.EINVAL:
133 raise RuntimeError('sig {} cannot be caught'.format(sig))
134 else:
135 raise
136
137 if not self._signal_handlers:
138 try:
139 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200140 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700141 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142
143 return True
144
145 def _check_signal(self, sig):
146 """Internal helper to validate a signal.
147
148 Raise ValueError if the signal number is invalid or uncatchable.
149 Raise RuntimeError if there is a problem setting up the handler.
150 """
151 if not isinstance(sig, int):
152 raise TypeError('sig must be an int, not {!r}'.format(sig))
153
154 if not (1 <= sig < signal.NSIG):
155 raise ValueError(
156 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
157
158 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
159 extra=None):
160 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
161
162 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
163 extra=None):
164 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
165
Victor Stinnerf951d282014-06-29 00:46:45 +0200166 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 def _make_subprocess_transport(self, protocol, args, shell,
168 stdin, stdout, stderr, bufsize,
169 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800170 with events.get_child_watcher() as watcher:
171 transp = _UnixSubprocessTransport(self, protocol, args, shell,
172 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800173 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800174 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800175 watcher.add_child_handler(transp.get_pid(),
176 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800177
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 return transp
179
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800180 def _child_watcher_callback(self, pid, returncode, transp):
181 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182
Victor Stinnerf951d282014-06-29 00:46:45 +0200183 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500184 def create_unix_connection(self, protocol_factory, path, *,
185 ssl=None, sock=None,
186 server_hostname=None):
187 assert server_hostname is None or isinstance(server_hostname, str)
188 if ssl:
189 if server_hostname is None:
190 raise ValueError(
191 'you have to pass server_hostname when using ssl')
192 else:
193 if server_hostname is not None:
194 raise ValueError('server_hostname is only meaningful with ssl')
195
196 if path is not None:
197 if sock is not None:
198 raise ValueError(
199 'path and sock can not be specified at the same time')
200
Victor Stinner79a29522014-02-19 01:45:59 +0100201 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500202 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500203 sock.setblocking(False)
204 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100205 except:
206 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500207 raise
208
209 else:
210 if sock is None:
211 raise ValueError('no path and sock were specified')
212 sock.setblocking(False)
213
214 transport, protocol = yield from self._create_connection_transport(
215 sock, protocol_factory, ssl, server_hostname)
216 return transport, protocol
217
Victor Stinnerf951d282014-06-29 00:46:45 +0200218 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500219 def create_unix_server(self, protocol_factory, path=None, *,
220 sock=None, backlog=100, ssl=None):
221 if isinstance(ssl, bool):
222 raise TypeError('ssl argument must be an SSLContext or None')
223
224 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200225 if sock is not None:
226 raise ValueError(
227 'path and sock can not be specified at the same time')
228
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
230
231 try:
232 sock.bind(path)
233 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100234 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500235 if exc.errno == errno.EADDRINUSE:
236 # Let's improve the error message by adding
237 # with what exact address it occurs.
238 msg = 'Address {!r} is already in use'.format(path)
239 raise OSError(errno.EADDRINUSE, msg) from None
240 else:
241 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200242 except:
243 sock.close()
244 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500245 else:
246 if sock is None:
247 raise ValueError(
248 'path was not specified, and no sock specified')
249
250 if sock.family != socket.AF_UNIX:
251 raise ValueError(
252 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
253
254 server = base_events.Server(self, [sock])
255 sock.listen(backlog)
256 sock.setblocking(False)
257 self._start_serving(protocol_factory, sock, ssl, server)
258 return server
259
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200261if hasattr(os, 'set_blocking'):
262 def _set_nonblocking(fd):
263 os.set_blocking(fd, False)
264else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400265 import fcntl
266
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200267 def _set_nonblocking(fd):
268 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
269 flags = flags | os.O_NONBLOCK
270 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271
272
273class _UnixReadPipeTransport(transports.ReadTransport):
274
Yury Selivanovdec1a452014-02-18 22:27:48 -0500275 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
278 super().__init__(extra)
279 self._extra['pipe'] = pipe
280 self._loop = loop
281 self._pipe = pipe
282 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700283 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800284 if not (stat.S_ISFIFO(mode) or
285 stat.S_ISSOCK(mode) or
286 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700287 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 _set_nonblocking(self._fileno)
289 self._protocol = protocol
290 self._closing = False
291 self._loop.add_reader(self._fileno, self._read_ready)
292 self._loop.call_soon(self._protocol.connection_made, self)
293 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200294 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200295 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
Victor Stinnere912e652014-07-12 03:11:53 +0200297 def __repr__(self):
298 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
299 if self._pipe is not None:
300 polling = selector_events._test_selector_event(
301 self._loop._selector,
302 self._fileno, selectors.EVENT_READ)
303 if polling:
304 info.append('polling')
305 else:
306 info.append('idle')
307 else:
308 info.append('closed')
309 return '<%s>' % ' '.join(info)
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 def _read_ready(self):
312 try:
313 data = os.read(self._fileno, self.max_size)
314 except (BlockingIOError, InterruptedError):
315 pass
316 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100317 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 else:
319 if data:
320 self._protocol.data_received(data)
321 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200322 if self._loop.get_debug():
323 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 self._closing = True
325 self._loop.remove_reader(self._fileno)
326 self._loop.call_soon(self._protocol.eof_received)
327 self._loop.call_soon(self._call_connection_lost, None)
328
Guido van Rossum57497ad2013-10-18 07:58:20 -0700329 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330 self._loop.remove_reader(self._fileno)
331
Guido van Rossum57497ad2013-10-18 07:58:20 -0700332 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 self._loop.add_reader(self._fileno, self._read_ready)
334
335 def close(self):
336 if not self._closing:
337 self._close(None)
338
Victor Stinner0ee29c22014-02-19 01:40:41 +0100339 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200341 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
342 if self._loop.get_debug():
343 logger.debug("%r: %s", self, message, exc_info=True)
344 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500345 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100346 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500347 'exception': exc,
348 'transport': self,
349 'protocol': self._protocol,
350 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 self._close(exc)
352
353 def _close(self, exc):
354 self._closing = True
355 self._loop.remove_reader(self._fileno)
356 self._loop.call_soon(self._call_connection_lost, exc)
357
358 def _call_connection_lost(self, exc):
359 try:
360 self._protocol.connection_lost(exc)
361 finally:
362 self._pipe.close()
363 self._pipe = None
364 self._protocol = None
365 self._loop = None
366
367
Yury Selivanov3cb99142014-02-18 18:41:13 -0500368class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800369 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
371 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
372 super().__init__(extra)
373 self._extra['pipe'] = pipe
374 self._loop = loop
375 self._pipe = pipe
376 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700377 mode = os.fstat(self._fileno).st_mode
378 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100379 if not (is_socket or
380 stat.S_ISFIFO(mode) or
381 stat.S_ISCHR(mode)):
382 raise ValueError("Pipe transport is only for "
383 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 _set_nonblocking(self._fileno)
385 self._protocol = protocol
386 self._buffer = []
387 self._conn_lost = 0
388 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700389
390 # On AIX, the reader trick only works for sockets.
391 # On other platforms it works for pipes and sockets.
392 # (Exception: OS X 10.4? Issue #19294.)
393 if is_socket or not sys.platform.startswith("aix"):
394 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
396 self._loop.call_soon(self._protocol.connection_made, self)
397 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200398 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200399 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Victor Stinnere912e652014-07-12 03:11:53 +0200401 def __repr__(self):
402 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
403 if self._pipe is not None:
404 polling = selector_events._test_selector_event(
405 self._loop._selector,
406 self._fileno, selectors.EVENT_WRITE)
407 if polling:
408 info.append('polling')
409 else:
410 info.append('idle')
411
412 bufsize = self.get_write_buffer_size()
413 info.append('bufsize=%s' % bufsize)
414 else:
415 info.append('closed')
416 return '<%s>' % ' '.join(info)
417
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800418 def get_write_buffer_size(self):
419 return sum(len(data) for data in self._buffer)
420
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700422 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200423 if self._loop.get_debug():
424 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100425 if self._buffer:
426 self._close(BrokenPipeError())
427 else:
428 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
430 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800431 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
432 if isinstance(data, bytearray):
433 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 if not data:
435 return
436
437 if self._conn_lost or self._closing:
438 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700439 logger.warning('pipe closed by peer or '
440 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 self._conn_lost += 1
442 return
443
444 if not self._buffer:
445 # Attempt to send it right away first.
446 try:
447 n = os.write(self._fileno, data)
448 except (BlockingIOError, InterruptedError):
449 n = 0
450 except Exception as exc:
451 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100452 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 return
454 if n == len(data):
455 return
456 elif n > 0:
457 data = data[n:]
458 self._loop.add_writer(self._fileno, self._write_ready)
459
460 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800461 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
463 def _write_ready(self):
464 data = b''.join(self._buffer)
465 assert data, 'Data should not be empty'
466
467 self._buffer.clear()
468 try:
469 n = os.write(self._fileno, data)
470 except (BlockingIOError, InterruptedError):
471 self._buffer.append(data)
472 except Exception as exc:
473 self._conn_lost += 1
474 # Remove writer here, _fatal_error() doesn't it
475 # because _buffer is empty.
476 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100477 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478 else:
479 if n == len(data):
480 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800481 self._maybe_resume_protocol() # May append to buffer.
482 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 self._loop.remove_reader(self._fileno)
484 self._call_connection_lost(None)
485 return
486 elif n > 0:
487 data = data[n:]
488
489 self._buffer.append(data) # Try again later.
490
491 def can_write_eof(self):
492 return True
493
494 # TODO: Make the relationships between write_eof(), close(),
495 # abort(), _fatal_error() and _close() more straightforward.
496
497 def write_eof(self):
498 if self._closing:
499 return
500 assert self._pipe
501 self._closing = True
502 if not self._buffer:
503 self._loop.remove_reader(self._fileno)
504 self._loop.call_soon(self._call_connection_lost, None)
505
506 def close(self):
507 if not self._closing:
508 # write_eof is all what we needed to close the write pipe
509 self.write_eof()
510
511 def abort(self):
512 self._close(None)
513
Victor Stinner0ee29c22014-02-19 01:40:41 +0100514 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200516 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
517 if self._loop.get_debug():
518 logger.debug("%r: %s", self, message, exc_info=True)
519 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500520 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100521 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500522 'exception': exc,
523 'transport': self,
524 'protocol': self._protocol,
525 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 self._close(exc)
527
528 def _close(self, exc=None):
529 self._closing = True
530 if self._buffer:
531 self._loop.remove_writer(self._fileno)
532 self._buffer.clear()
533 self._loop.remove_reader(self._fileno)
534 self._loop.call_soon(self._call_connection_lost, exc)
535
536 def _call_connection_lost(self, exc):
537 try:
538 self._protocol.connection_lost(exc)
539 finally:
540 self._pipe.close()
541 self._pipe = None
542 self._protocol = None
543 self._loop = None
544
545
Guido van Rossum59691282013-10-30 14:52:03 -0700546class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700547
Guido van Rossum59691282013-10-30 14:52:03 -0700548 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700549 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700551 # Use a socket pair for stdin, since not all platforms
552 # support selecting read events on the write end of a
553 # socket (which we use in order to detect closing of the
554 # other end). Notably this is needed on AIX, and works
555 # just fine on other platforms.
556 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557 self._proc = subprocess.Popen(
558 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
559 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700560 if stdin_w is not None:
561 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200562 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800563
564
565class AbstractChildWatcher:
566 """Abstract base class for monitoring child processes.
567
568 Objects derived from this class monitor a collection of subprocesses and
569 report their termination or interruption by a signal.
570
571 New callbacks are registered with .add_child_handler(). Starting a new
572 process must be done within a 'with' block to allow the watcher to suspend
573 its activity until the new process if fully registered (this is needed to
574 prevent a race condition in some implementations).
575
576 Example:
577 with watcher:
578 proc = subprocess.Popen("sleep 1")
579 watcher.add_child_handler(proc.pid, callback)
580
581 Notes:
582 Implementations of this class must be thread-safe.
583
584 Since child watcher objects may catch the SIGCHLD signal and call
585 waitpid(-1), there should be only one active object per process.
586 """
587
588 def add_child_handler(self, pid, callback, *args):
589 """Register a new child handler.
590
591 Arrange for callback(pid, returncode, *args) to be called when
592 process 'pid' terminates. Specifying another callback for the same
593 process replaces the previous handler.
594
Victor Stinneracdb7822014-07-14 18:33:40 +0200595 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800596 """
597 raise NotImplementedError()
598
599 def remove_child_handler(self, pid):
600 """Removes the handler for process 'pid'.
601
602 The function returns True if the handler was successfully removed,
603 False if there was nothing to remove."""
604
605 raise NotImplementedError()
606
Guido van Rossum2bcae702013-11-13 15:50:08 -0800607 def attach_loop(self, loop):
608 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800609
Guido van Rossum2bcae702013-11-13 15:50:08 -0800610 If the watcher was previously attached to an event loop, then it is
611 first detached before attaching to the new loop.
612
613 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800614 """
615 raise NotImplementedError()
616
617 def close(self):
618 """Close the watcher.
619
620 This must be called to make sure that any underlying resource is freed.
621 """
622 raise NotImplementedError()
623
624 def __enter__(self):
625 """Enter the watcher's context and allow starting new processes
626
627 This function must return self"""
628 raise NotImplementedError()
629
630 def __exit__(self, a, b, c):
631 """Exit the watcher's context"""
632 raise NotImplementedError()
633
634
635class BaseChildWatcher(AbstractChildWatcher):
636
Guido van Rossum2bcae702013-11-13 15:50:08 -0800637 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800638 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800639
640 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800641 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800642
643 def _do_waitpid(self, expected_pid):
644 raise NotImplementedError()
645
646 def _do_waitpid_all(self):
647 raise NotImplementedError()
648
Guido van Rossum2bcae702013-11-13 15:50:08 -0800649 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800650 assert loop is None or isinstance(loop, events.AbstractEventLoop)
651
652 if self._loop is not None:
653 self._loop.remove_signal_handler(signal.SIGCHLD)
654
655 self._loop = loop
656 if loop is not None:
657 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
658
659 # Prevent a race condition in case a child terminated
660 # during the switch.
661 self._do_waitpid_all()
662
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800663 def _sig_chld(self):
664 try:
665 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500666 except Exception as exc:
667 # self._loop should always be available here
668 # as '_sig_chld' is added as a signal handler
669 # in 'attach_loop'
670 self._loop.call_exception_handler({
671 'message': 'Unknown exception in SIGCHLD handler',
672 'exception': exc,
673 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800674
675 def _compute_returncode(self, status):
676 if os.WIFSIGNALED(status):
677 # The child process died because of a signal.
678 return -os.WTERMSIG(status)
679 elif os.WIFEXITED(status):
680 # The child process exited (e.g sys.exit()).
681 return os.WEXITSTATUS(status)
682 else:
683 # The child exited, but we don't understand its status.
684 # This shouldn't happen, but if it does, let's just
685 # return that status; perhaps that helps debug it.
686 return status
687
688
689class SafeChildWatcher(BaseChildWatcher):
690 """'Safe' child watcher implementation.
691
692 This implementation avoids disrupting other code spawning processes by
693 polling explicitly each process in the SIGCHLD handler instead of calling
694 os.waitpid(-1).
695
696 This is a safe solution but it has a significant overhead when handling a
697 big number of children (O(n) each time SIGCHLD is raised)
698 """
699
Guido van Rossum2bcae702013-11-13 15:50:08 -0800700 def __init__(self):
701 super().__init__()
702 self._callbacks = {}
703
704 def close(self):
705 self._callbacks.clear()
706 super().close()
707
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800708 def __enter__(self):
709 return self
710
711 def __exit__(self, a, b, c):
712 pass
713
714 def add_child_handler(self, pid, callback, *args):
715 self._callbacks[pid] = callback, args
716
717 # Prevent a race condition in case the child is already terminated.
718 self._do_waitpid(pid)
719
Guido van Rossum2bcae702013-11-13 15:50:08 -0800720 def remove_child_handler(self, pid):
721 try:
722 del self._callbacks[pid]
723 return True
724 except KeyError:
725 return False
726
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800727 def _do_waitpid_all(self):
728
729 for pid in list(self._callbacks):
730 self._do_waitpid(pid)
731
732 def _do_waitpid(self, expected_pid):
733 assert expected_pid > 0
734
735 try:
736 pid, status = os.waitpid(expected_pid, os.WNOHANG)
737 except ChildProcessError:
738 # The child process is already reaped
739 # (may happen if waitpid() is called elsewhere).
740 pid = expected_pid
741 returncode = 255
742 logger.warning(
743 "Unknown child process pid %d, will report returncode 255",
744 pid)
745 else:
746 if pid == 0:
747 # The child process is still alive.
748 return
749
750 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200751 if self._loop.get_debug():
752 logger.debug('process %s exited with returncode %s',
753 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800754
755 try:
756 callback, args = self._callbacks.pop(pid)
757 except KeyError: # pragma: no cover
758 # May happen if .remove_child_handler() is called
759 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200760 if self._loop.get_debug():
761 logger.warning("Child watcher got an unexpected pid: %r",
762 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800763 else:
764 callback(pid, returncode, *args)
765
766
767class FastChildWatcher(BaseChildWatcher):
768 """'Fast' child watcher implementation.
769
770 This implementation reaps every terminated processes by calling
771 os.waitpid(-1) directly, possibly breaking other code spawning processes
772 and waiting for their termination.
773
774 There is no noticeable overhead when handling a big number of children
775 (O(1) each time a child terminates).
776 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800777 def __init__(self):
778 super().__init__()
779 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800780 self._lock = threading.Lock()
781 self._zombies = {}
782 self._forks = 0
783
784 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800785 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800786 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800787 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800788
789 def __enter__(self):
790 with self._lock:
791 self._forks += 1
792
793 return self
794
795 def __exit__(self, a, b, c):
796 with self._lock:
797 self._forks -= 1
798
799 if self._forks or not self._zombies:
800 return
801
802 collateral_victims = str(self._zombies)
803 self._zombies.clear()
804
805 logger.warning(
806 "Caught subprocesses termination from unknown pids: %s",
807 collateral_victims)
808
809 def add_child_handler(self, pid, callback, *args):
810 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800811 with self._lock:
812 try:
813 returncode = self._zombies.pop(pid)
814 except KeyError:
815 # The child is running.
816 self._callbacks[pid] = callback, args
817 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800818
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800819 # The child is dead already. We can fire the callback.
820 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800821
Guido van Rossum2bcae702013-11-13 15:50:08 -0800822 def remove_child_handler(self, pid):
823 try:
824 del self._callbacks[pid]
825 return True
826 except KeyError:
827 return False
828
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800829 def _do_waitpid_all(self):
830 # Because of signal coalescing, we must keep calling waitpid() as
831 # long as we're able to reap a child.
832 while True:
833 try:
834 pid, status = os.waitpid(-1, os.WNOHANG)
835 except ChildProcessError:
836 # No more child processes exist.
837 return
838 else:
839 if pid == 0:
840 # A child process is still alive.
841 return
842
843 returncode = self._compute_returncode(status)
844
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800845 with self._lock:
846 try:
847 callback, args = self._callbacks.pop(pid)
848 except KeyError:
849 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800850 if self._forks:
851 # It may not be registered yet.
852 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200853 if self._loop.get_debug():
854 logger.debug('unknown process %s exited '
855 'with returncode %s',
856 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800857 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800858 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200859 else:
860 if self._loop.get_debug():
861 logger.debug('process %s exited with returncode %s',
862 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800863
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800864 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800865 logger.warning(
866 "Caught subprocess termination from unknown pid: "
867 "%d -> %d", pid, returncode)
868 else:
869 callback(pid, returncode, *args)
870
871
872class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
873 """XXX"""
874 _loop_factory = _UnixSelectorEventLoop
875
876 def __init__(self):
877 super().__init__()
878 self._watcher = None
879
880 def _init_watcher(self):
881 with events._lock:
882 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800883 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800884 if isinstance(threading.current_thread(),
885 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800886 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800887
888 def set_event_loop(self, loop):
889 """Set the event loop.
890
891 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800892 .set_event_loop() from the main thread will call .attach_loop(loop) on
893 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800894 """
895
896 super().set_event_loop(loop)
897
898 if self._watcher is not None and \
899 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800900 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800901
902 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200903 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800904
905 If not yet set, a SafeChildWatcher object is automatically created.
906 """
907 if self._watcher is None:
908 self._init_watcher()
909
910 return self._watcher
911
912 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200913 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800914
915 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
916
917 if self._watcher is not None:
918 self._watcher.close()
919
920 self._watcher = watcher
921
922SelectorEventLoop = _UnixSelectorEventLoop
923DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy