blob: ce49c4f37a8e7118eb49dbc2d3a2d8ec36564d08 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
Victor Stinner915bcb02014-02-01 22:49:59 +010028__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080029 'AbstractChildWatcher', 'SafeChildWatcher',
30 'FastChildWatcher', 'DefaultEventLoopPolicy',
31 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033if sys.platform == 'win32': # pragma: no cover
34 raise ImportError('Signals are not really supported on Windows')
35
36
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    Both arguments (the signal number and the interrupted frame) are
    accepted and ignored.
    """
    return None
40
41
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop specialised for Unix.

    On top of the base selector loop this class provides signal handlers,
    pipe and subprocess transports, and UNIX Domain Socket endpoints.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            # Null bytes are written by _write_to_self() and carry no signal.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be
        installed or if the signal cannot be caught.
        """
        is_coro = (coroutines.iscoroutine(callback)
                   or coroutines.iscoroutinefunction(callback))
        if is_coro:
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if not handle._cancelled:
            self._add_callback_signalsafe(handle)
        else:
            self.remove_signal_handler(sig)  # Remove it properly.

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        # SIGINT goes back to Python's default handler; every other signal
        # goes back to the system default.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The transport is created and registered with the child watcher
        inside the watcher's 'with' block.  If waiting for the transport
        fails, the transport is closed and waited for, and the original
        exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            err = None
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info(), so only record the error here and do the
                # cleanup outside of the except block.
                err = exc

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward the child's return code to the transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket and return (transport, protocol).

        Exactly one of path and sock must be given.  server_hostname is
        required when ssl is used and rejected otherwise.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        elif server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # The socket was created here, so close it on any failure
                # before propagating the exception.
                sock.close()
                raise

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX Domain Socket.

        Exactly one of path and sock must be given.  Return the
        base_events.Server wrapping the listening socket.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno != errno.EADDRINUSE:
                    raise
                # Let's improve the error message by adding
                # with what exact address it occurs.
                msg = 'Address {!r} is already in use'.format(path)
                raise OSError(errno.EADDRINUSE, msg) from None
            except:
                sock.close()
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode via fcntl()."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
297
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device.

    The file descriptor is switched to non-blocking mode and registered
    with the loop as a reader; incoming data is passed to the protocol's
    data_received() and end-of-file to eof_received().
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  In that case the transport does not keep a reference to
        *pipe*, so the caller remains responsible for closing it.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Give up ownership before raising: otherwise __del__ would
            # emit a ResourceWarning and close a pipe this transport never
            # actually took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # The state of a closed transport is already reported above, so
        # only describe polling state while the pipe is still held.
        if self._pipe is not None:
            selector = getattr(self._loop, '_selector', None)
            if selector is not None:
                polling = selector_events._test_selector_event(
                    selector,
                    self._fileno, selectors.EVENT_READ)
                if polling:
                    info.append('polling')
                else:
                    info.append('idle')
            else:
                info.append('open')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: read up to max_size bytes and dispatch them."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read means end-of-file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def is_closing(self):
        return self._closing

    def close(self):
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
414 self._loop = None
415
416
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device.

    Data that cannot be written immediately is kept in an internal buffer
    and flushed from a writer callback.  Where supported, the descriptor is
    also registered as a reader to detect the peer closing its end.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  In that case the transport does not keep a reference to
        *pipe*, so the caller remains responsible for closing it.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            # Give up ownership before raising: otherwise __del__ would
            # emit a ResourceWarning and close a pipe this transport never
            # actually took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # The state of a closed transport is already reported above, so
        # only describe polling state while the pipe is still held.
        if self._pipe is not None:
            selector = getattr(self._loop, '_selector', None)
            if selector is not None:
                polling = selector_events._test_selector_event(
                    selector,
                    self._fileno, selectors.EVENT_WRITE)
                if polling:
                    info.append('polling')
                else:
                    info.append('idle')

                bufsize = self.get_write_buffer_size()
                info.append('bufsize=%s' % bufsize)
            else:
                info.append('open')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes currently waiting in the buffer."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            self._loop.add_writer(self._fileno, self._write_ready)

        # Store an immutable snapshot: a memoryview would alias the caller's
        # buffer, so a later mutation by the caller would change the data
        # still waiting to be written.  bytes(data) does not copy when data
        # is already a bytes object.
        self._buffer.append(bytes(data))
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the write end once all buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof() is all we need to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, unregister callbacks, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            # A writer is only registered while data is buffered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
614
615
if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of *fd*.

        The flag is cleared when *inheritable* is true and set otherwise.
        """
        cloexec = getattr(fcntl, 'FD_CLOEXEC', 1)

        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            fd_flags &= ~cloexec
        else:
            fd_flags |= cloexec
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)
630
631
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: one
        end is handed to the child and the other becomes self._proc.stdin.
        Both ends are closed if the process cannot be started.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; skip the cleanup below.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800655
656
657class AbstractChildWatcher:
658 """Abstract base class for monitoring child processes.
659
660 Objects derived from this class monitor a collection of subprocesses and
661 report their termination or interruption by a signal.
662
663 New callbacks are registered with .add_child_handler(). Starting a new
664 process must be done within a 'with' block to allow the watcher to suspend
665 its activity until the new process if fully registered (this is needed to
666 prevent a race condition in some implementations).
667
668 Example:
669 with watcher:
670 proc = subprocess.Popen("sleep 1")
671 watcher.add_child_handler(proc.pid, callback)
672
673 Notes:
674 Implementations of this class must be thread-safe.
675
676 Since child watcher objects may catch the SIGCHLD signal and call
677 waitpid(-1), there should be only one active object per process.
678 """
679
680 def add_child_handler(self, pid, callback, *args):
681 """Register a new child handler.
682
683 Arrange for callback(pid, returncode, *args) to be called when
684 process 'pid' terminates. Specifying another callback for the same
685 process replaces the previous handler.
686
Victor Stinneracdb7822014-07-14 18:33:40 +0200687 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800688 """
689 raise NotImplementedError()
690
691 def remove_child_handler(self, pid):
692 """Removes the handler for process 'pid'.
693
694 The function returns True if the handler was successfully removed,
695 False if there was nothing to remove."""
696
697 raise NotImplementedError()
698
Guido van Rossum2bcae702013-11-13 15:50:08 -0800699 def attach_loop(self, loop):
700 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800701
Guido van Rossum2bcae702013-11-13 15:50:08 -0800702 If the watcher was previously attached to an event loop, then it is
703 first detached before attaching to the new loop.
704
705 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800706 """
707 raise NotImplementedError()
708
709 def close(self):
710 """Close the watcher.
711
712 This must be called to make sure that any underlying resource is freed.
713 """
714 raise NotImplementedError()
715
716 def __enter__(self):
717 """Enter the watcher's context and allow starting new processes
718
719 This function must return self"""
720 raise NotImplementedError()
721
def __exit__(self, a, b, c):
    """Exit the watcher's context.

    This abstract version always raises NotImplementedError.
    """
    raise NotImplementedError
725
726
class BaseChildWatcher(AbstractChildWatcher):
    """Common SIGCHLD plumbing shared by the concrete child watchers.

    This class binds a SIGCHLD handler to an event loop and decodes wait
    statuses.  The actual polling is left to _do_waitpid() and
    _do_waitpid_all(), which are not implemented here.
    """

    def __init__(self):
        # Event loop our SIGCHLD handler is installed on; None when detached.
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Poll the single child 'expected_pid' (not implemented here)."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Poll the watched children (not implemented here)."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        'loop' may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop is expected to be set here: this method is only
            # installed as a signal handler by attach_loop().
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a return code.

        A child killed by a signal yields the negated signal number, a
        child that exited yields its exit status, and any other status is
        handed back unchanged.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)

        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)

        # The status is neither of the above.  This shouldn't happen, but
        # if it does, return it as is; perhaps that helps debug it.
        return status
779
780
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args) for every child still being watched.
        self._callbacks = {}

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return self; this watcher needs no bookkeeping on entry."""
        return self

    def __exit__(self, a, b, c):
        """Do nothing; this watcher needs no bookkeeping on exit."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        A handler already registered for 'pid' is replaced.  The child is
        polled immediately, so the callback may run before this returns.
        """
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every watched child once."""
        # Iterate over a snapshot: _do_waitpid() removes entries from
        # self._callbacks as children are reaped.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode.

        add_child_handler() can be called before attach_loop(); a missing
        loop must not turn an optional debug message into an error.
        """
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' without blocking; fire its handler if it ended.

        If the process was already reaped elsewhere, the handler is called
        with returncode 255.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            # The child has been reaped at this point: nothing below may
            # fail before the handler runs, or the exit status is lost.
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
857
858
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for the children being watched.
        self._callbacks = {}
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before a handler was added.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and reaped statuses, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Count one more entered context and return self."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Leave the context; report statuses that were never claimed."""
        with self._lock:
            self._forks -= 1

            # Only the exit of the outermost context reports leftovers,
            # and only if there are any.
            should_report = not self._forks and bool(self._zombies)
            if not should_report:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside 'with watcher:'.  If the child was already
        reaped, the callback is invoked right away.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch to its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                registered = self._callbacks.pop(pid, None)
                if registered is not None:
                    callback, args = registered
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet:
                    # keep its status for add_child_handler().
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody is about to register it.
                    callback = None

            # The handler is invoked with the lock released.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
962
963
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When run from the main thread, the new watcher is attached to the
        loop stored in self._local._loop.
        """
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has been set and this is called
        from the main thread, .attach_loop(loop) is also called on that
        watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Return the watcher for child processes.

        A SafeChildWatcher object is created automatically if no watcher
        has been set yet.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Install 'watcher' as the watcher for child processes.

        The watcher being replaced, if any, is closed first.  'watcher'
        may be None.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1013
# Names exported through __all__, bound to the Unix-specific classes.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy