blob: 96cc4f02588ac8f8954d1743c7069d303a69b24b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Victor Stinnerf951d282014-06-29 00:46:45 +020032from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033from . import events
34from . import futures
35from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070036from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
38
# Names exported by "from ... import *".
__all__ = ('BaseEventLoop',)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
41
Yury Selivanov592ada92014-09-25 12:07:56 -040042# Minimum number of _scheduled timer handles before cleanup of
43# cancelled handles is performed.
44_MIN_SCHEDULED_TIMER_HANDLES = 100
45
46# Minimum fraction of _scheduled timer handles that are cancelled
47# before cleanup of cancelled handles is performed.
48_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049
Victor Stinnerc94a93a2016-04-01 21:43:39 +020050# Exceptions which must not call the exception handler in fatal error
51# methods (_fatal_error())
52_FATAL_ERROR_IGNORE = (BrokenPipeError,
53 ConnectionResetError, ConnectionAbortedError)
54
55
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    tasks.Task), the repr of that task is returned; otherwise the result
    is str(handle).
    """
    owner = getattr(handle._callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # format the task
        return repr(owner)
    return str(handle)
63
64
Victor Stinneracdb7822014-07-14 18:33:40 +020065def _format_pipe(fd):
66 if fd == subprocess.PIPE:
67 return '<pipe>'
68 elif fd == subprocess.STDOUT:
69 return '<stdout>'
70 else:
71 return repr(fd)
72
73
Yury Selivanov5587d7c2016-09-15 15:45:07 -040074def _set_reuseport(sock):
75 if not hasattr(socket, 'SO_REUSEPORT'):
76 raise ValueError('reuse_port not supported by socket module')
77 else:
78 try:
79 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
80 except OSError:
81 raise ValueError('reuse_port not supported by socket module, '
82 'SO_REUSEPORT defined but not implemented.')
83
84
Yury Selivanovd5c2a622015-12-16 19:31:17 -050085def _ipaddr_info(host, port, family, type, proto):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -040086 # Try to skip getaddrinfo if "host" is already an IP. Users might have
87 # handled name resolution in their own code and pass in resolved IPs.
88 if not hasattr(socket, 'inet_pton'):
89 return
90
91 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
92 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -050093 return None
94
Yury Selivanova7bd64c2017-12-19 06:44:37 -050095 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -050096 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -050097 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -050098 proto = socket.IPPROTO_UDP
99 else:
100 return None
101
Yury Selivanova7146162016-06-02 16:51:07 -0400102 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400103 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700104 elif isinstance(port, bytes) and port == b'':
105 port = 0
106 elif isinstance(port, str) and port == '':
107 port = 0
108 else:
109 # If port's a service name like "http", don't skip getaddrinfo.
110 try:
111 port = int(port)
112 except (TypeError, ValueError):
113 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400114
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400115 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500116 afs = [socket.AF_INET]
117 if hasattr(socket, 'AF_INET6'):
118 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400119 else:
120 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500121
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400122 if isinstance(host, bytes):
123 host = host.decode('idna')
124 if '%' in host:
125 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
126 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500127 return None
128
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400129 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500130 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400131 socket.inet_pton(af, host)
132 # The host has already been resolved.
133 return af, type, proto, '', (host, port)
134 except OSError:
135 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500136
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400137 # "host" is not an IP address.
138 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139
140
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100141def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500142 if not fut.cancelled():
143 exc = fut.exception()
144 if isinstance(exc, BaseException) and not isinstance(exc, Exception):
145 # Issue #22429: run_forever() already finished, no need to
146 # stop it.
147 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500148 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100149
150
class Server(events.AbstractServer):
    """Server object tracking listening sockets and attached connections.

    ``sockets`` holds the listening sockets until close() is called and
    is None afterwards.  ``_active_count`` is the number of _attach()
    calls not yet balanced by _detach().  ``_waiters`` is the list of
    futures created by wait_closed(); it is set to None by _wakeup()
    once the waiters have been released.
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        self._active_count = 0
        self._waiters = []

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Register one more active user of the server."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Unregister one active user of the server.

        When the last user goes away after close() has been called, the
        wait_closed() waiters are released.
        """
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every listening socket.

        Calling close() more than once is a no-op.  Waiters are released
        immediately only if nothing is attached any more; otherwise the
        final _detach() releases them.
        """
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Release every pending wait_closed() waiter."""
        waiters = self._waiters
        # None marks "already woken up" for wait_closed().
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The value is never read by wait_closed(); None avoids
                # making the future reference itself.
                waiter.set_result(None)

    async def wait_closed(self):
        """Wait until close() was called and nothing is attached any more.

        Returns immediately if the waiters have already been released.
        """
        # Only _wakeup() can tell that the server is fully closed: it runs
        # from close() or _detach() once sockets is None *and* the active
        # count dropped to zero, and signals that by setting _waiters to
        # None.  Checking ``self.sockets is None`` here would return right
        # after close() even though connections are still attached.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700195
196
197class BaseEventLoop(events.AbstractEventLoop):
198
def __init__(self):
    """Set up the bookkeeping state of a new event loop."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    # Callbacks ready to run, and the list of scheduled timer handles.
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    clock_info = time.get_clock_info('monotonic')
    self._clock_resolution = clock_info.resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_wrapper_set = False

    # Python >= 3.6: a weak set of all asynchronous generators that are
    # being iterated by the loop.  None when the interpreter has no
    # asynchronous generator hooks.
    has_agen_hooks = hasattr(sys, 'get_asyncgen_hooks')
    self._asyncgens = weakref.WeakSet() if has_agen_hooks else None

    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
230
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200231 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500232 return (
233 f'<{self.__class__.__name__} running={self.is_running()} '
234 f'closed={self.is_closed()} debug={self.get_debug()}>'
235 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200236
def create_future(self):
    """Return a new Future object bound to this loop."""
    new_future = futures.Future(loop=self)
    return new_future
240
def create_task(self, coro):
    """Schedule a coroutine object and return the task wrapping it.

    A task factory installed with set_task_factory() is used when
    present; otherwise a tasks.Task is created.  Raises RuntimeError
    (through _check_closed()) if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        return factory(self, coro)

    task = tasks.Task(coro, loop=self)
    source_tb = task._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200254
def set_task_factory(self, factory):
    """Install the task factory used by loop.create_task().

    Passing None restores the default task factory.

    A callable factory should accept '(loop, coro)', where 'loop' is a
    reference to the active event loop and 'coro' is a coroutine
    object, and must return a Future.

    Raises TypeError if factory is neither None nor callable.
    """
    acceptable = factory is None or callable(factory)
    if not acceptable:
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
268
def get_task_factory(self):
    """Return the installed task factory; None means the default one."""
    factory = self._task_factory
    return factory
272
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273 def _make_socket_transport(self, sock, protocol, waiter=None, *,
274 extra=None, server=None):
275 """Create socket transport."""
276 raise NotImplementedError
277
Neil Aspinallf7686c12017-12-19 19:45:42 +0000278 def _make_ssl_transport(
279 self, rawsock, protocol, sslcontext, waiter=None,
280 *, server_side=False, server_hostname=None,
281 extra=None, server=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200282 ssl_handshake_timeout=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283 """Create SSL transport."""
284 raise NotImplementedError
285
286 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200287 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 """Create datagram transport."""
289 raise NotImplementedError
290
291 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
292 extra=None):
293 """Create read pipe transport."""
294 raise NotImplementedError
295
296 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
297 extra=None):
298 """Create write pipe transport."""
299 raise NotImplementedError
300
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200301 async def _make_subprocess_transport(self, protocol, args, shell,
302 stdin, stdout, stderr, bufsize,
303 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 """Create subprocess transport."""
305 raise NotImplementedError
306
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200308 """Write a byte to self-pipe, to wake up the event loop.
309
310 This may be called from a different thread.
311
312 The subclass is responsible for implementing the self-pipe.
313 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 raise NotImplementedError
315
316 def _process_events(self, event_list):
317 """Process selector events."""
318 raise NotImplementedError
319
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200320 def _check_closed(self):
321 if self._closed:
322 raise RuntimeError('Event loop is closed')
323
Yury Selivanoveb636452016-09-08 22:01:51 -0700324 def _asyncgen_finalizer_hook(self, agen):
325 self._asyncgens.discard(agen)
326 if not self.is_closed():
327 self.create_task(agen.aclose())
Yury Selivanoved054062016-10-21 17:13:40 -0400328 # Wake up the loop if the finalizer was called from
329 # a different thread.
330 self._write_to_self()
Yury Selivanoveb636452016-09-08 22:01:51 -0700331
332 def _asyncgen_firstiter_hook(self, agen):
333 if self._asyncgens_shutdown_called:
334 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500335 f"asynchronous generator {agen!r} was scheduled after "
336 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700337 ResourceWarning, source=self)
338
339 self._asyncgens.add(agen)
340
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Every tracked generator is closed with aclose(); an Exception
    raised while closing one is passed to call_exception_handler().
    """
    self._asyncgens_shutdown_called = True

    tracked = self._asyncgens
    if tracked is None or len(tracked) == 0:
        # If Python version is <3.6 or we don't have any asynchronous
        # generators alive.
        return

    closing_agens = list(tracked)
    tracked.clear()

    outcomes = await tasks.gather(
        *[agen.aclose() for agen in closing_agens],
        return_exceptions=True,
        loop=self)

    for outcome, agen in zip(outcomes, closing_agens):
        if not isinstance(outcome, Exception):
            continue
        context = {
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': outcome,
            'asyncgen': agen
        }
        self.call_exception_handler(context)
366
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if it is already
    running, or if another loop is currently running.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')

    self._set_coroutine_wrapper(self._debug)
    self._thread_id = threading.get_ident()
    if self._asyncgens is not None:
        # Remember the current hooks so they can be put back on exit.
        saved_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        # At least one iteration always runs, even if stop() was called
        # before the loop was started.
        keep_running = True
        while keep_running:
            self._run_once()
            keep_running = not self._stopping
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_wrapper(False)
        if self._asyncgens is not None:
            sys.set_asyncgen_hooks(*saved_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
def run_until_complete(self, future):
    """Run the loop until *future* is done.

    An argument that is not already a future (for example a coroutine)
    is wrapped with tasks.ensure_future().

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Returns the future's result or raises its exception.  Raises
    RuntimeError if the loop stopped before the future completed.
    """
    self._check_closed()

    wrapped_here = not futures.isfuture(future)
    fut = tasks.ensure_future(future, loop=self)
    if wrapped_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        fut._log_destroy_pending = False

    fut.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        finished = wrapped_here and fut.done() and not fut.cancelled()
        if finished:
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            fut.exception()
        raise
    finally:
        fut.remove_done_callback(_run_until_complete_cb)

    if fut.done():
        return fut.result()
    raise RuntimeError('Event loop stopped before Future completed.')
431
def stop(self):
    """Ask the running loop to stop.

    Callbacks that are already scheduled still run: this only sets the
    flag that tells run_forever() to leave its loop once the current
    iteration is complete.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439
def close(self):
    """Close the event loop.

    The ready and scheduled queues are cleared and the default
    executor, if any, is shut down without waiting for it to finish.
    Closing an already closed loop does nothing.

    The event loop must not be running; RuntimeError is raised
    otherwise.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)

    self._closed = True
    self._ready.clear()
    self._scheduled.clear()

    pool = self._default_executor
    if pool is None:
        return
    self._default_executor = None
    pool.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200461
def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed
465
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900466 def __del__(self):
467 if not self.is_closed():
Yury Selivanov6370f342017-12-10 18:36:12 -0500468 warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900469 source=self)
470 if not self.is_running():
471 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100472
def is_running(self):
    """Return True if the event loop is running.

    The loop counts as running while a thread identifier is recorded
    for it.
    """
    return self._thread_id is not None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476
def time(self):
    """Return the current time on the event loop's clock.

    The value is a float in seconds since an epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads time.monotonic().
    """
    now = time.monotonic()
    return now
485
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called after *delay* seconds.

    Returns a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds, and is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks are
    scheduled for exactly the same time, it is undefined which will be
    called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    deadline = self.time() + delay
    timer = self.call_at(deadline, callback, *args)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
def call_at(self, when, callback, *args):
    """Like call_later(), but *when* is an absolute time.

    Absolute time corresponds to the event loop's time() method.
    Raises RuntimeError (through _check_closed()) if the loop is
    closed; in debug mode the calling thread and the callback are
    validated as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')

    handle = events.TimerHandle(when, callback, args, self)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    heapq.heappush(self._scheduled, handle)
    handle._scheduled = True
    return handle
522
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    Callbacks are queued FIFO: they are called in the order in which
    they are registered, and each one is called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.

    Raises RuntimeError (through _check_closed()) if the loop is
    closed; in debug mode the calling thread and the callback are
    validated as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')

    queued = self._call_soon(callback, args)
    source_tb = queued._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    return queued
Victor Stinner93569c22014-03-21 10:00:52 +0100541
def _check_callback(self, callback, method):
    """Validate *callback* on behalf of the loop method named *method*.

    Raises TypeError if callback is a coroutine object or coroutine
    function, or if it is not callable.
    """
    is_coroutine_like = (coroutines.iscoroutine(callback) or
                         coroutines.iscoroutinefunction(callback))
    if is_coroutine_like:
        raise TypeError(f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700551
def _call_soon(self, callback, args):
    """Wrap *callback* in a Handle, append it to the ready queue and
    return the handle.
    """
    queued = events.Handle(callback, args, self)
    source_tb = queued._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    self._ready.append(queued)
    return queued
558
Victor Stinner956de692014-12-26 21:07:52 +0100559 def _check_thread(self):
560 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100561
Victor Stinneracdb7822014-07-14 18:33:40 +0200562 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100563 likely behave incorrectly when the assumption is violated.
564
Victor Stinneracdb7822014-07-14 18:33:40 +0200565 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100566 responsible for checking this condition for performance reasons.
567 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100568 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200569 return
Victor Stinner956de692014-12-26 21:07:52 +0100570 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100571 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100572 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200573 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100574 "than the current one")
575
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe.

    After queueing the callback the loop is woken up through
    _write_to_self().
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')

    queued = self._call_soon(callback, args)
    source_tb = queued._source_traceback
    if source_tb:
        # Drop the frame of this method from the recorded traceback.
        del source_tb[-1]
    self._write_to_self()
    return queued
586
async def run_in_executor(self, executor, func, *args):
    """Run ``func(*args)`` in *executor* and return its result.

    With executor=None the loop's default executor is used; a
    ThreadPoolExecutor is created and remembered as the default on
    first use.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')

    pool = executor
    if pool is None:
        pool = self._default_executor
        if pool is None:
            pool = concurrent.futures.ThreadPoolExecutor()
            self._default_executor = pool

    submitted = pool.submit(func, *args)
    return await futures.wrap_future(submitted, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598
def set_default_executor(self, executor):
    """Set the executor used when run_in_executor() is given None."""
    self._default_executor = executor
601
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    slow_callback_duration seconds, at DEBUG level otherwise.
    """
    parts = [f"{host}:{port!r}"]
    # Only mention the options that were actually given.
    labelled = (
        ('family', family),
        ('type', type),
        ('proto', proto),
        ('flags', flags),
    )
    for label, value in labelled:
        if value:
            parts.append(f'{label}={value!r}')
    summary = ', '.join(parts)
    logger.debug('Get address info %s', summary)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {summary} took '
              f'{elapsed * 1e3:.3f}ms: {addrinfo!r}')
    log = logger.info if elapsed >= self.slow_callback_duration else logger.debug
    log(report)
    return addrinfo
625
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host* and *port* via run_in_executor().

    socket.getaddrinfo() is used, or its logging wrapper
    _getaddrinfo_debug() when the loop is in debug mode.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    return await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
635
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) via run_in_executor()."""
    lookup = socket.getnameinfo
    return await self.run_in_executor(None, lookup, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    Either host/port or an already created sock must be given, never
    both.  When local_addr is given, every candidate socket is bound to
    the first resolved local address that accepts the bind before the
    connection attempt.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Raises ValueError for inconsistent arguments, and OSError when name
    resolution returns nothing or when every candidate address fails.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved address in order; the first one that
        # connects wins, failures are collected for the final error.
        exceptions = []
        for af, socktype, protonum, _, address in infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=protonum)
                sock.setblocking(False)
                if local_addr is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            # strerror is None when the OSError was
                            # raised with a single argument, so fall
                            # back to the plain message instead of
                            # crashing on None.lower().
                            reason = exc.strerror or str(exc)
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{reason.lower()}'
                            )
                            exceptions.append(OSError(exc.errno, msg))
                    else:
                        # No local address could be bound for this
                        # candidate: drop the socket and try the next.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                # Includes cancellation: never leak the socket.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
771
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None):
    """Wrap sock in a transport and wait until the transport is ready.

    The socket is switched to non-blocking mode, a protocol is created
    with protocol_factory(), and either an SSL transport (when ssl is
    truthy) or a plain socket transport is built around it.  A bool
    value for ssl means no explicit context is passed on (None).

    Returns (transport, protocol).  If waiting on the readiness future
    raises, the transport is closed and the exception is re-raised.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    ready = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, ready)
    else:
        if isinstance(ssl, bool):
            context = None
        else:
            context = ssl
        transport = self._make_ssl_transport(
            sock, protocol, context, ready,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    return transport, protocol
797
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either an existing SOCK_DGRAM sock is supplied (in which case no
    other socket-modifying argument may be given), or a socket is
    created from local_addr/remote_addr/family.  For AF_UNIX the
    addresses must be strings; otherwise each given address must be a
    2-tuple, which is resolved and matched per (family, proto) pair.

    Returns a (transport, protocol) pair once the transport is ready.

    Raises ValueError for inconsistent arguments, TypeError for
    malformed addresses, and OSError when resolution returns nothing or
    socket setup fails for every candidate (the first failure is
    raised).
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')
            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Explicit check rather than assert: asserts vanish
                    # under -O, and the AF_UNIX branch above already
                    # reports malformed addresses with TypeError.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # Try every (family, proto) candidate until one socket can be
        # fully set up; OSErrors are collected and the first is raised
        # if none succeeds.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                # Includes cancellation: never leak the socket.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except BaseException:
        transport.close()
        raise

    return transport, protocol
923
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address-info entries for address.

    Only the first two items of address (host, port) are used.  If
    _ipaddr_info() yields an entry for them it is returned as a
    one-element list without any lookup; otherwise the result of
    loop.getaddrinfo() is returned.
    """
    host, port = address[:2]
    direct = _ipaddr_info(host, port, family, type, proto)
    if direct is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [direct]
935
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a SOCK_STREAM server socket.

    Returns the non-empty result of self._ensure_resolved(); raises
    OSError if the resolution produced no entries.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family, type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
943
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Alternatively an existing SOCK_STREAM sock can be passed instead of
    host/port; the two are mutually exclusive.

    Return a Server object which can be used to stop the service.

    Raises TypeError if ssl is a bool, ValueError for inconsistent
    arguments, and OSError if binding one of the addresses fails.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(name, port, family=family,
                                              flags=flags)
              for name in hosts]
        infos = await tasks.gather(*fs, loop=self)
        # A set drops duplicate entries so each address is bound once.
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    # strerror is None when the OSError was raised with
                    # a single argument; fall back to str(err) rather
                    # than crashing on None.lower().
                    reason = err.strerror or str(err)
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, reason.lower())) from None
            completed = True
        finally:
            # On any failure close every socket opened so far.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    server = Server(self, sockets)
    for sock in sockets:
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server, backlog,
                            ssl_handshake_timeout)
    if self._debug:
        logger.info("%r is serving", server)
    return server
1053
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Handle an accepted connection.

    This is used by servers that accept connections outside of
    asyncio but that use asyncio to handle connections.

    sock must be a SOCK_STREAM socket, and ssl_handshake_timeout may
    only be given together with ssl; otherwise ValueError is raised.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    if not ssl and ssl_handshake_timeout is not None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = pair
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
1082
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the read end given by pipe.

    Builds a protocol with protocol_factory(), creates a read-pipe
    transport for it and waits until the transport signals readiness.
    On failure the transport is closed and the error re-raised.
    Returns (transport, protocol).
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if not self._debug:
        return transport, protocol
    logger.debug('Read pipe %r connected: (%r, %r)',
                 pipe.fileno(), transport, protocol)
    return transport, protocol
1098
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the write end given by pipe.

    Builds a protocol with protocol_factory(), creates a write-pipe
    transport for it and waits until the transport signals readiness.
    On failure the transport is closed and the error re-raised.
    Returns (transport, protocol).
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if not self._debug:
        return transport, protocol
    logger.debug('Write pipe %r connected: (%r, %r)',
                 pipe.fileno(), transport, protocol)
    return transport, protocol
1114
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log msg at DEBUG level followed by the configured stdio pipes.

    Streams that are None are omitted.  When stdout is set and stderr
    equals subprocess.STDOUT the two are reported as one
    'stdout=stderr=...' item.
    """
    pieces = [msg]
    if stdin is not None:
        pieces.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        pieces.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        if stdout is not None:
            pieces.append(f'stdout={_format_pipe(stdout)}')
        if stderr is not None:
            pieces.append(f'stderr={_format_pipe(stderr)}')
    line = ' '.join(pieces)
    logger.debug(line)
1127
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           **kwargs):
    """Run the shell command cmd and return (transport, protocol).

    cmd must be a str or bytes.  universal_newlines, shell and bufsize
    are accepted only with their default values (False, True, 0);
    anything else raises ValueError.  Remaining keyword arguments are
    forwarded to self._make_subprocess_transport().
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    protocol = protocol_factory()
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = f'run shell command {cmd!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    use_shell = True
    transport = await self._make_subprocess_transport(
        protocol, cmd, use_shell, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1154
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0, **kwargs):
    """Execute program with args and return (transport, protocol).

    universal_newlines, shell and bufsize are accepted only with their
    default values (False, False, 0); anything else raises ValueError.
    program and every item of args must be str or bytes, otherwise
    TypeError is raised.  Remaining keyword arguments are forwarded to
    self._make_subprocess_transport().
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    popen_args = (program, *args)
    for item in popen_args:
        if isinstance(item, (str, bytes)):
            continue
        raise TypeError(
            f"program arguments must be a bytes or text string, "
            f"not {type(item).__name__}")

    protocol = protocol_factory()
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = f'execute program {program!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    use_shell = False
    transport = await self._make_subprocess_transport(
        protocol, popen_args, use_shell, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1183
def get_exception_handler(self):
    """Return the custom exception handler currently installed.

    None is returned when no custom handler was set, i.e. the default
    handler is in use.
    """
    current = self._exception_handler
    return current
1188
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None restores the default exception handler.

    A callable handler should accept '(loop, context)': 'loop' is a
    reference to the active event loop and 'context' is a dict object
    (see `call_exception_handler()` documentation for details about
    context).

    Raises TypeError if handler is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError(f'A callable object or None is expected, '
                        f'got {handler!r}')
    self._exception_handler = handler
1205
def default_exception_handler(self, context):
    """Default exception handler.

    This is called when an exception occurs and no exception
    handler is set, and can be called by a custom exception
    handler that wants to defer to the default behavior.

    This default handler logs the error message and other
    context-dependent information.  In debug mode, a truncated
    stack trace is also appended showing where the given object
    (e.g. a handle or future or task) was created, if any.

    The context parameter has the same meaning as in
    `call_exception_handler()`.  The mapping passed in is not
    modified.
    """
    message = context.get('message')
    if not message:
        message = 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is not None:
        exc_info = (type(exception), exception, exception.__traceback__)
    else:
        exc_info = False

    if ('source_traceback' not in context and
            self._current_handle is not None and
            self._current_handle._source_traceback):
        # Add the traceback to a shallow copy so the caller's dict is
        # not silently extended with a 'handle_traceback' key.
        context = dict(context)
        context['handle_traceback'] = \
            self._current_handle._source_traceback

    log_lines = [message]
    for key in sorted(context):
        if key in {'message', 'exception'}:
            # Already reported via the message text and exc_info.
            continue
        value = context[key]
        if key == 'source_traceback':
            tb = ''.join(traceback.format_list(value))
            value = 'Object created at (most recent call last):\n'
            value += tb.rstrip()
        elif key == 'handle_traceback':
            tb = ''.join(traceback.format_list(value))
            value = 'Handle created at (most recent call last):\n'
            value += tb.rstrip()
        else:
            value = repr(value)
        log_lines.append(f'{key}: {value}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1255
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys maybe introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        self._exception_handler(self, context)
    except Exception as exc:
        # Exception in the user set custom exception handler:
        # report it through the default handler instead.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1305
def _add_callback(self, handle):
    """Queue a non-timer Handle on _ready unless it is already cancelled.

    A cancelled handle is dropped silently.  Passing a TimerHandle that
    is not cancelled trips an assertion: this method only ever appends
    to the ready queue.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001313
1314 def _add_callback_signalsafe(self, handle):
1315 """Like _add_callback() but called from a signal handler."""
1316 self._add_callback(handle)
1317 self._write_to_self()
1318
Yury Selivanov592ada92014-09-25 12:07:56 -04001319 def _timer_handle_cancelled(self, handle):
1320 """Notification that a TimerHandle has been cancelled."""
1321 if handle._scheduled:
1322 self._timer_cancelled_count += 1
1323
def _run_once(self):
    """Run one full iteration of the event loop.

    The steps, in order:

    1. discard cancelled timer handles (every one of them when their
       share of the timer heap is too high, otherwise only those at the
       head of the heap);
    2. poll the selector for I/O and pass the events to
       _process_events();
    3. move due 'call_later' handles from _scheduled to _ready;
    4. run the callbacks that were in _ready at that point.
    """
    sched_count = len(self._scheduled)
    too_many_cancelled = (
        sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
        _MIN_CANCELLED_TIMER_HANDLES_FRACTION)

    if too_many_cancelled:
        # The number of cancelled delayed calls is too high: rebuild the
        # heap from the handles that are still alive.
        live_handles = []
        for handle in self._scheduled:
            if not handle._cancelled:
                live_handles.append(handle)
            else:
                handle._scheduled = False

        heapq.heapify(live_handles)
        self._scheduled = live_handles
        self._timer_cancelled_count = 0
    else:
        # Only drop cancelled delayed calls sitting at the head of the
        # queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Choose the poll timeout: do not block when there is work pending
    # or the loop is stopping, wait until the earliest timer otherwise,
    # and pass None when nothing is scheduled at all.
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        timeout = max(0, self._scheduled[0]._when - self.time())
    else:
        timeout = None

    if self._debug and timeout != 0:
        # Debug mode: time the poll and log how long it took.
        poll_started = self.time()
        event_list = self._selector.select(timeout)
        poll_duration = self.time() - poll_started
        level = logging.INFO if poll_duration >= 1.0 else logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       poll_duration * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, poll_duration * 1e3, nevent)
        elif poll_duration >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, poll_duration * 1e3)
    else:
        event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        if self._scheduled[0]._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for _ in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
            continue
        # Debug mode: expose the running handle on the loop and warn
        # about callbacks that run for too long.
        try:
            self._current_handle = handle
            run_started = self.time()
            handle._run()
            run_duration = self.time() - run_started
            if run_duration >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), run_duration)
        finally:
            self._current_handle = None
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001422
Yury Selivanove8944cb2015-05-12 11:43:04 -04001423 def _set_coroutine_wrapper(self, enabled):
1424 try:
1425 set_wrapper = sys.set_coroutine_wrapper
1426 get_wrapper = sys.get_coroutine_wrapper
1427 except AttributeError:
1428 return
1429
1430 enabled = bool(enabled)
Yury Selivanov996083d2015-08-04 15:37:24 -04001431 if self._coroutine_wrapper_set == enabled:
Yury Selivanove8944cb2015-05-12 11:43:04 -04001432 return
1433
1434 wrapper = coroutines.debug_wrapper
1435 current_wrapper = get_wrapper()
1436
1437 if enabled:
1438 if current_wrapper not in (None, wrapper):
1439 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -05001440 f"loop.set_debug(True): cannot set debug coroutine "
1441 f"wrapper; another wrapper is already set "
1442 f"{current_wrapper!r}",
1443 RuntimeWarning)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001444 else:
1445 set_wrapper(wrapper)
1446 self._coroutine_wrapper_set = True
1447 else:
1448 if current_wrapper not in (None, wrapper):
1449 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -05001450 f"loop.set_debug(False): cannot unset debug coroutine "
1451 f"wrapper; another wrapper was set {current_wrapper!r}",
1452 RuntimeWarning)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001453 else:
1454 set_wrapper(None)
1455 self._coroutine_wrapper_set = False
1456
def get_debug(self):
    """Return the loop's debug flag as stored by set_debug()."""
    return self._debug
1459
def set_debug(self, enabled):
    """Store the debug flag of the loop.

    When the loop is running, the coroutine wrapper is updated right
    away through _set_coroutine_wrapper().
    """
    self._debug = enabled

    if not self.is_running():
        return
    self._set_coroutine_wrapper(enabled)