blob: b62dd3896d9c411bd602e9ed57b7fc5135cf8d2c [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Refuse to be imported on Windows: this module depends on signal support.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
35
36
def _sighandler_noop(signum, frame):
    """Signal handler that intentionally does nothing.

    Both arguments are accepted only to satisfy the signal handler
    signature; they are ignored.
    """
    return None
40
41
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a new pair of connected sockets."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Dispatch each non-zero item of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or sig is
        not an int.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore Python's default disposition for the signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: stop routing signals to the self-pipe.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is set up.

        If waiting for the transport fails, the transport is closed and
        waited for before the error is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = futures.Future(loop=self)
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status over to the loop in a thread-safe way.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain socket and return (transport, protocol).

        Exactly one of *path* and *sock* must be given.

        Raise ValueError on an inconsistent combination of arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Deliberately bare: always release the socket created
                # here, whatever interrupted the connect, then re-raise.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain socket.

        Exactly one of *path* and *sock* must be given.

        Raise TypeError if ssl is a bool.
        Raise ValueError on an inconsistent combination of arguments or if
        *sock* is not an AF_UNIX socket.
        Raise OSError if binding or listening fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        # Only a socket created here may be closed here on failure; a socket
        # passed in by the caller stays under the caller's control.
        owns_sock = path is not None

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        try:
            sock.listen(backlog)
            sock.setblocking(False)
        except:
            # Do not leak the socket created above if it cannot be put
            # into listening mode.
            if owns_sock:
                sock.close()
            raise
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
if not hasattr(os, 'set_blocking'):
    # Fallback for Python versions without os.set_blocking().
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
297
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the protocol start-up on *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        # A closed transport was already tagged 'closed' above; do not
        # report the state a second time.
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read available data and pass it, or EOF, to the protocol."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        if self._closing:
            # The reader is already unregistered, and the loop reference
            # is dropped once connection_lost() has been delivered.
            return
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        if self._closing:
            # Never start polling again a pipe that is being (or has been)
            # closed.
            return
        self._loop.add_reader(self._fileno, self._read_ready)

    def is_closing(self):
        return self._closing

    def close(self):
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
414
415
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the protocol start-up on *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        elif self._pipe is not None:
            info.append('open')
        # A closed transport was already tagged 'closed' above; do not
        # report the state a second time.
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the total length of the chunks waiting to be written."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Try to flush the buffered data now that the fd is writable."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() does not do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        return True

    def write_eof(self):
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def abort(self):
        if self._pipe is None:
            # connection_lost() has already been delivered and the loop
            # reference dropped; there is nothing left to tear down.
            return
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, stop polling and schedule connection_lost()."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        if self._pipe is None:
            # Already torn down: this callback can be scheduled twice, e.g.
            # by close() immediately followed by abort().
            return
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
612
613
if not hasattr(os, 'set_inheritable'):
    # Fallback for Python versions without os.set_inheritable().
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*."""
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            flags = flags & ~cloexec_flag
        else:
            flags = flags | cloexec_flag
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
else:
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
627 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
628
629
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair is used instead of a
        pipe; its write end becomes self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()

            # Mark the write end of the stdin pipe as non-inheritable,
            # needed by close_fds=False on Python 3.3 and older
            # (Python 3.4 implements the PEP 446, socketpair returns
            # non-inheritable sockets)
            _set_inheritable(stdin_w.fileno(), False)
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() (or wrapping the write end) failed: do not leak
                # the socket pair created above.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800653
654
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, so that the watcher can
    suspend its activity until the new process is fully registered (this is
    needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource gets freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
723
724
class BaseChildWatcher(AbstractChildWatcher):
    """Shared logic for watchers that react to SIGCHLD.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop whose SIGCHLD handler is owned by this watcher, if any.
        self._loop = None

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Translate a wait status into a return code.

        A death by signal yields the negated signal number, a normal exit
        yields the exit status, and anything else is returned unchanged.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
777
778
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args) for every watched child.
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        A handler previously registered for the same pid is replaced.
        """
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for 'pid'.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every watched pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries from
        # self._callbacks as children are reaped.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode.

        add_child_handler() calls _do_waitpid() directly, so this can run
        while self._loop is still None (its initial value).  Treating a
        missing loop as "debug off" keeps the optional logging from raising
        AttributeError after the child has already been reaped, which would
        otherwise drop the callback.
        """
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' without blocking and fire its callback.

        Does nothing if the child is still alive.  If the child cannot be
        waited for (ChildProcessError), a return code of 255 is reported.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
855
856
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for children with a registered handler.
        self._callbacks = {}
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before a handler existed.
        self._zombies = {}
        # Number of currently entered "with watcher:" contexts.
        self._forks = 0

    def close(self):
        """Discard all bookkeeping and detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Nothing to report while other contexts are still open or
            # when no unclaimed child was collected.
            if self._forks or not self._zombies:
                return

            leftovers = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            leftovers)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's context.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for 'pid'.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if not self._forks:
                        callback = None
                    else:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug(
                                'unknown process %s exited with returncode %s',
                                pid, returncode)
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback runs outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
960
961
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly.
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            # Only attach to the loop when running in the main thread.
            if isinstance(threading.current_thread(),
                          threading._MainThread):
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        A SafeChildWatcher object is created automatically when no watcher
        has been set yet.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        # Close the watcher being replaced before installing the new one.
        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1011
# Platform-neutral aliases for the Unix implementations; both names are
# listed in __all__.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop