blob: e54ee309e42e6f0332f79f32fbcda56b2f2ac1a9 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Yury Selivanov8cd51652019-05-27 15:57:20 +020048from . import trsock
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
Yury Selivanov6370f342017-12-10 18:36:12 -050052__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
54
Yury Selivanov592ada92014-09-25 12:07:56 -040055# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
Andrew Svetlov0dd71802018-09-12 14:03:54 -070063
Yury Selivanovd904c232018-06-28 21:59:32 -040064_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
MartinAltmayer944451c2018-07-31 15:06:12 +010066# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
Kyle Stanleyab513a32019-12-09 09:21:10 -050069# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
Victor Stinner0e6f52a2014-06-20 17:34:15 +020074def _format_handle(handle):
75 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040076 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020077 # format the task
78 return repr(cb.__self__)
79 else:
80 return str(handle)
81
82
Victor Stinneracdb7822014-07-14 18:33:40 +020083def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
90
91
Yury Selivanov5587d7c2016-09-15 15:45:07 -040092def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
101
102
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
108
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500111 return None
112
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500113 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500114 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500115 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500116 proto = socket.IPPROTO_UDP
117 else:
118 return None
119
Yury Selivanova7146162016-06-02 16:51:07 -0400120 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400121 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400132
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500134 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400135 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500136 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400137 else:
138 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500145 return None
146
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400147 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 socket.inet_pton(af, host)
150 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400151 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200152 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400153 else:
154 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400155 except OSError:
156 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500157
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400158 # "host" is not an IP address.
159 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163 """Interleave list of addrinfo tuples by family."""
164 # Group addresses by family
165 addrinfos_by_family = collections.OrderedDict()
166 for addr in addrinfos:
167 family = addr[0]
168 if family not in addrinfos_by_family:
169 addrinfos_by_family[family] = []
170 addrinfos_by_family[family].append(addr)
171 addrinfos_lists = list(addrinfos_by_family.values())
172
173 reordered = []
174 if first_address_family_count > 1:
175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176 del addrinfos_lists[0][:first_address_family_count - 1]
177 reordered.extend(
178 a for a in itertools.chain.from_iterable(
179 itertools.zip_longest(*addrinfos_lists)
180 ) if a is not None)
181 return reordered
182
183
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100184def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500185 if not fut.cancelled():
186 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200187 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500188 # Issue #22429: run_forever() already finished, no need to
189 # stop it.
190 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500191 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100192
193
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200194if hasattr(socket, 'TCP_NODELAY'):
195 def _set_nodelay(sock):
196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197 sock.type == socket.SOCK_STREAM and
198 sock.proto == socket.IPPROTO_TCP):
199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201 def _set_nodelay(sock):
202 pass
203
204
Andrew Svetlov7c684072018-01-27 21:22:47 +0200205class _SendfileFallbackProtocol(protocols.Protocol):
206 def __init__(self, transp):
207 if not isinstance(transp, transports._FlowControlMixin):
208 raise TypeError("transport should be _FlowControlMixin instance")
209 self._transport = transp
210 self._proto = transp.get_protocol()
211 self._should_resume_reading = transp.is_reading()
212 self._should_resume_writing = transp._protocol_paused
213 transp.pause_reading()
214 transp.set_protocol(self)
215 if self._should_resume_writing:
216 self._write_ready_fut = self._transport._loop.create_future()
217 else:
218 self._write_ready_fut = None
219
220 async def drain(self):
221 if self._transport.is_closing():
222 raise ConnectionError("Connection closed by peer")
223 fut = self._write_ready_fut
224 if fut is None:
225 return
226 await fut
227
228 def connection_made(self, transport):
229 raise RuntimeError("Invalid state: "
230 "connection should have been established already.")
231
232 def connection_lost(self, exc):
233 if self._write_ready_fut is not None:
234 # Never happens if peer disconnects after sending the whole content
235 # Thus disconnection is always an exception from user perspective
236 if exc is None:
237 self._write_ready_fut.set_exception(
238 ConnectionError("Connection is closed by peer"))
239 else:
240 self._write_ready_fut.set_exception(exc)
241 self._proto.connection_lost(exc)
242
243 def pause_writing(self):
244 if self._write_ready_fut is not None:
245 return
246 self._write_ready_fut = self._transport._loop.create_future()
247
248 def resume_writing(self):
249 if self._write_ready_fut is None:
250 return
251 self._write_ready_fut.set_result(False)
252 self._write_ready_fut = None
253
254 def data_received(self, data):
255 raise RuntimeError("Invalid state: reading should be paused")
256
257 def eof_received(self):
258 raise RuntimeError("Invalid state: reading should be paused")
259
260 async def restore(self):
261 self._transport.set_protocol(self._proto)
262 if self._should_resume_reading:
263 self._transport.resume_reading()
264 if self._write_ready_fut is not None:
265 # Cancel the future.
266 # Basically it has no effect because protocol is switched back,
267 # no code should wait for it anymore.
268 self._write_ready_fut.cancel()
269 if self._should_resume_writing:
270 self._proto.resume_writing()
271
272
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273class Server(events.AbstractServer):
274
Yury Selivanovc9070d02018-01-25 18:08:09 -0500275 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +0300276 ssl_handshake_timeout, ssl_shutdown_timeout=None):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200277 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500278 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200279 self._active_count = 0
280 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500281 self._protocol_factory = protocol_factory
282 self._backlog = backlog
283 self._ssl_context = ssl_context
284 self._ssl_handshake_timeout = ssl_handshake_timeout
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +0300285 self._ssl_shutdown_timeout = ssl_shutdown_timeout
Yury Selivanovc9070d02018-01-25 18:08:09 -0500286 self._serving = False
287 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288
Victor Stinnere912e652014-07-12 03:11:53 +0200289 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500290 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200291
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200292 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500293 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200294 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200296 def _detach(self):
297 assert self._active_count > 0
298 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500299 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 self._wakeup()
301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200303 waiters = self._waiters
304 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 for waiter in waiters:
306 if not waiter.done():
307 waiter.set_result(waiter)
308
Yury Selivanovc9070d02018-01-25 18:08:09 -0500309 def _start_serving(self):
310 if self._serving:
311 return
312 self._serving = True
313 for sock in self._sockets:
314 sock.listen(self._backlog)
315 self._loop._start_serving(
316 self._protocol_factory, sock, self._ssl_context,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +0300317 self, self._backlog, self._ssl_handshake_timeout,
318 self._ssl_shutdown_timeout)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500319
320 def get_loop(self):
321 return self._loop
322
323 def is_serving(self):
324 return self._serving
325
326 @property
327 def sockets(self):
328 if self._sockets is None:
Yury Selivanov8cd51652019-05-27 15:57:20 +0200329 return ()
330 return tuple(trsock.TransportSocket(s) for s in self._sockets)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500331
332 def close(self):
333 sockets = self._sockets
334 if sockets is None:
335 return
336 self._sockets = None
337
338 for sock in sockets:
339 self._loop._stop_serving(sock)
340
341 self._serving = False
342
343 if (self._serving_forever_fut is not None and
344 not self._serving_forever_fut.done()):
345 self._serving_forever_fut.cancel()
346 self._serving_forever_fut = None
347
348 if self._active_count == 0:
349 self._wakeup()
350
351 async def start_serving(self):
352 self._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400353 # Skip one loop iteration so that all 'loop.add_reader'
354 # go through.
Yurii Karabase4fe3032020-11-28 10:21:17 +0200355 await tasks.sleep(0)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500356
357 async def serve_forever(self):
358 if self._serving_forever_fut is not None:
359 raise RuntimeError(
360 f'server {self!r} is already being awaited on serve_forever()')
361 if self._sockets is None:
362 raise RuntimeError(f'server {self!r} is closed')
363
364 self._start_serving()
365 self._serving_forever_fut = self._loop.create_future()
366
367 try:
368 await self._serving_forever_fut
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700369 except exceptions.CancelledError:
Yury Selivanovc9070d02018-01-25 18:08:09 -0500370 try:
371 self.close()
372 await self.wait_closed()
373 finally:
374 raise
375 finally:
376 self._serving_forever_fut = None
377
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200378 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500379 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400381 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200382 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200383 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384
385
386class BaseEventLoop(events.AbstractEventLoop):
387
388 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400389 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200390 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800391 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 self._ready = collections.deque()
393 self._scheduled = []
394 self._default_executor = None
395 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100396 # Identifier of the thread running the event loop, or None if the
397 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100398 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100399 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500400 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800401 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200402 # In debug mode, if the execution of a callback or a step of a task
403 # exceed this duration in seconds, the slow callback/task is logged.
404 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100405 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400406 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800407 self._coroutine_origin_tracking_enabled = False
408 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500410 # A weak set of all asynchronous generators that are
411 # being iterated by the loop.
412 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700413 # Set to True when `loop.shutdown_asyncgens` is called.
414 self._asyncgens_shutdown_called = False
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400415 # Set to True when `loop.shutdown_default_executor` is called.
416 self._executor_shutdown_called = False
Yury Selivanoveb636452016-09-08 22:01:51 -0700417
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200418 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500419 return (
420 f'<{self.__class__.__name__} running={self.is_running()} '
421 f'closed={self.is_closed()} debug={self.get_debug()}>'
422 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200423
Yury Selivanov7661db62016-05-16 15:38:39 -0400424 def create_future(self):
425 """Create a Future object attached to the loop."""
426 return futures.Future(loop=self)
427
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300428 def create_task(self, coro, *, name=None):
Victor Stinner896a25a2014-07-08 11:29:25 +0200429 """Schedule a coroutine object.
430
Victor Stinneracdb7822014-07-14 18:33:40 +0200431 Return a task object.
432 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100433 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400434 if self._task_factory is None:
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300435 task = tasks.Task(coro, loop=self, name=name)
Yury Selivanov740169c2015-05-11 14:23:38 -0400436 if task._source_traceback:
437 del task._source_traceback[-1]
438 else:
439 task = self._task_factory(self, coro)
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300440 tasks._set_task_name(task, name)
441
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200442 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200443
Yury Selivanov740169c2015-05-11 14:23:38 -0400444 def set_task_factory(self, factory):
445 """Set a task factory that will be used by loop.create_task().
446
447 If factory is None the default task factory will be set.
448
449 If factory is a callable, it should have a signature matching
450 '(loop, coro)', where 'loop' will be a reference to the active
451 event loop, 'coro' will be a coroutine object. The callable
452 must return a Future.
453 """
454 if factory is not None and not callable(factory):
455 raise TypeError('task factory must be a callable or None')
456 self._task_factory = factory
457
458 def get_task_factory(self):
459 """Return a task factory, or None if the default one is in use."""
460 return self._task_factory
461
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 def _make_socket_transport(self, sock, protocol, waiter=None, *,
463 extra=None, server=None):
464 """Create socket transport."""
465 raise NotImplementedError
466
Neil Aspinallf7686c12017-12-19 19:45:42 +0000467 def _make_ssl_transport(
468 self, rawsock, protocol, sslcontext, waiter=None,
469 *, server_side=False, server_hostname=None,
470 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500471 ssl_handshake_timeout=None,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +0300472 ssl_shutdown_timeout=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500473 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 """Create SSL transport."""
475 raise NotImplementedError
476
477 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200478 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 """Create datagram transport."""
480 raise NotImplementedError
481
482 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
483 extra=None):
484 """Create read pipe transport."""
485 raise NotImplementedError
486
487 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
488 extra=None):
489 """Create write pipe transport."""
490 raise NotImplementedError
491
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200492 async def _make_subprocess_transport(self, protocol, args, shell,
493 stdin, stdout, stderr, bufsize,
494 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 """Create subprocess transport."""
496 raise NotImplementedError
497
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200499 """Write a byte to self-pipe, to wake up the event loop.
500
501 This may be called from a different thread.
502
503 The subclass is responsible for implementing the self-pipe.
504 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 raise NotImplementedError
506
507 def _process_events(self, event_list):
508 """Process selector events."""
509 raise NotImplementedError
510
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200511 def _check_closed(self):
512 if self._closed:
513 raise RuntimeError('Event loop is closed')
514
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400515 def _check_default_executor(self):
516 if self._executor_shutdown_called:
517 raise RuntimeError('Executor shutdown has been called')
518
Yury Selivanoveb636452016-09-08 22:01:51 -0700519 def _asyncgen_finalizer_hook(self, agen):
520 self._asyncgens.discard(agen)
521 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800522 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700523
524 def _asyncgen_firstiter_hook(self, agen):
525 if self._asyncgens_shutdown_called:
526 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500527 f"asynchronous generator {agen!r} was scheduled after "
528 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700529 ResourceWarning, source=self)
530
531 self._asyncgens.add(agen)
532
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200533 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700534 """Shutdown all active asynchronous generators."""
535 self._asyncgens_shutdown_called = True
536
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500537 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400538 # If Python version is <3.6 or we don't have any asynchronous
539 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700540 return
541
542 closing_agens = list(self._asyncgens)
543 self._asyncgens.clear()
544
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200545 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700546 *[ag.aclose() for ag in closing_agens],
Yurii Karabase4fe3032020-11-28 10:21:17 +0200547 return_exceptions=True)
Yury Selivanoveb636452016-09-08 22:01:51 -0700548
Yury Selivanoveb636452016-09-08 22:01:51 -0700549 for result, agen in zip(results, closing_agens):
550 if isinstance(result, Exception):
551 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500552 'message': f'an error occurred during closing of '
553 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700554 'exception': result,
555 'asyncgen': agen
556 })
557
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400558 async def shutdown_default_executor(self):
559 """Schedule the shutdown of the default executor."""
560 self._executor_shutdown_called = True
561 if self._default_executor is None:
562 return
563 future = self.create_future()
564 thread = threading.Thread(target=self._do_shutdown, args=(future,))
565 thread.start()
566 try:
567 await future
568 finally:
569 thread.join()
570
571 def _do_shutdown(self, future):
572 try:
573 self._default_executor.shutdown(wait=True)
574 self.call_soon_threadsafe(future.set_result, None)
575 except Exception as ex:
576 self.call_soon_threadsafe(future.set_exception, ex)
577
Andrew Svetlov10ac0cd2020-01-07 15:23:01 +0200578 def _check_running(self):
Victor Stinner956de692014-12-26 21:07:52 +0100579 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400580 raise RuntimeError('This event loop is already running')
581 if events._get_running_loop() is not None:
582 raise RuntimeError(
583 'Cannot run the event loop while another loop is running')
Andrew Svetlov3a5de512020-01-04 11:10:14 +0200584
585 def run_forever(self):
586 """Run until stop() is called."""
587 self._check_closed()
Andrew Svetlov10ac0cd2020-01-07 15:23:01 +0200588 self._check_running()
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800589 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100590 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500591
592 old_agen_hooks = sys.get_asyncgen_hooks()
593 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
594 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700595 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400596 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800598 self._run_once()
599 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600 break
601 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800602 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100603 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400604 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800605 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500606 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607
608 def run_until_complete(self, future):
609 """Run until the Future is done.
610
611 If the argument is a coroutine, it is wrapped in a Task.
612
Victor Stinneracdb7822014-07-14 18:33:40 +0200613 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 with the same coroutine twice -- it would wrap it in two
615 different Tasks and that can't be good.
616
617 Return the Future's result, or raise its exception.
618 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200619 self._check_closed()
Andrew Svetlov10ac0cd2020-01-07 15:23:01 +0200620 self._check_running()
Victor Stinner98b63912014-06-30 14:51:04 +0200621
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700622 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400623 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200624 if new_task:
625 # An exception is raised if the future didn't complete, so there
626 # is no need to log the "destroy pending task" message
627 future._log_destroy_pending = False
628
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100629 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200630 try:
631 self.run_forever()
632 except:
633 if new_task and future.done() and not future.cancelled():
634 # The coroutine raised a BaseException. Consume the exception
635 # to not log a warning, the caller doesn't have access to the
636 # local task.
637 future.exception()
638 raise
jimmylai21b3e042017-05-22 22:32:46 -0700639 finally:
640 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641 if not future.done():
642 raise RuntimeError('Event loop stopped before Future completed.')
643
644 return future.result()
645
646 def stop(self):
647 """Stop running the event loop.
648
Guido van Rossum41f69f42015-11-19 13:28:47 -0800649 Every callback already scheduled will still run. This simply informs
650 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800652 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200654 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700655 """Close the event loop.
656
657 This clears the queues and shuts down the executor,
658 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200659
660 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700661 """
Victor Stinner956de692014-12-26 21:07:52 +0100662 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200663 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200664 if self._closed:
665 return
Victor Stinnere912e652014-07-12 03:11:53 +0200666 if self._debug:
667 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400668 self._closed = True
669 self._ready.clear()
670 self._scheduled.clear()
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400671 self._executor_shutdown_called = True
Yury Selivanove8944cb2015-05-12 11:43:04 -0400672 executor = self._default_executor
673 if executor is not None:
674 self._default_executor = None
Łukasz Langa7f9a2ae2019-06-04 13:03:20 +0200675 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200676
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200677 def is_closed(self):
678 """Returns True if the event loop was closed."""
679 return self._closed
680
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100681 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900682 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100683 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900684 if not self.is_running():
685 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100686
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700687 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200688 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100689 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690
691 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200692 """Return the time according to the event loop's clock.
693
694 This is a float expressed in seconds since an epoch, but the
695 epoch, precision, accuracy and drift are unspecified and may
696 differ per event loop.
697 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700698 return time.monotonic()
699
Yury Selivanovf23746a2018-01-22 19:11:18 -0500700 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700701 """Arrange for a callback to be called at a given time.
702
703 Return a Handle: an opaque object with a cancel() method that
704 can be used to cancel the call.
705
706 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200707 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708
709 Each callback will be called exactly once. If two callbacks
710 are scheduled for exactly the same time, it undefined which
711 will be called first.
712
713 Any positional arguments after the callback will be passed to
714 the callback when it is called.
715 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500716 timer = self.call_at(self.time() + delay, callback, *args,
717 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200718 if timer._source_traceback:
719 del timer._source_traceback[-1]
720 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721
Yury Selivanovf23746a2018-01-22 19:11:18 -0500722 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200723 """Like call_later(), but uses an absolute time.
724
725 Absolute time corresponds to the event loop's time() method.
726 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100727 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100728 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100729 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700730 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500731 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200732 if timer._source_traceback:
733 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400735 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 return timer
737
Yury Selivanovf23746a2018-01-22 19:11:18 -0500738 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739 """Arrange for a callback to be called as soon as possible.
740
Victor Stinneracdb7822014-07-14 18:33:40 +0200741 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700742 order in which they are registered. Each callback will be
743 called exactly once.
744
745 Any positional arguments after the callback will be passed to
746 the callback when it is called.
747 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700748 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100749 if self._debug:
750 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700751 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500752 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200753 if handle._source_traceback:
754 del handle._source_traceback[-1]
755 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100756
Yury Selivanov491a9122016-11-03 15:09:24 -0700757 def _check_callback(self, callback, method):
758 if (coroutines.iscoroutine(callback) or
759 coroutines.iscoroutinefunction(callback)):
760 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500761 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700762 if not callable(callback):
763 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500764 f'a callable object was expected by {method}(), '
765 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700766
Yury Selivanovf23746a2018-01-22 19:11:18 -0500767 def _call_soon(self, callback, args, context):
768 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200769 if handle._source_traceback:
770 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700771 self._ready.append(handle)
772 return handle
773
Victor Stinner956de692014-12-26 21:07:52 +0100774 def _check_thread(self):
775 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100776
Victor Stinneracdb7822014-07-14 18:33:40 +0200777 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100778 likely behave incorrectly when the assumption is violated.
779
Victor Stinneracdb7822014-07-14 18:33:40 +0200780 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100781 responsible for checking this condition for performance reasons.
782 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100783 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200784 return
Victor Stinner956de692014-12-26 21:07:52 +0100785 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100786 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100787 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200788 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100789 "than the current one")
790
Yury Selivanovf23746a2018-01-22 19:11:18 -0500791 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200792 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700793 self._check_closed()
794 if self._debug:
795 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500796 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200797 if handle._source_traceback:
798 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700799 self._write_to_self()
800 return handle
801
Yury Selivanovbec23722018-01-28 14:09:40 -0500802 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100803 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700804 if self._debug:
805 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806 if executor is None:
807 executor = self._default_executor
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400808 # Only check when the default executor is being used
809 self._check_default_executor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700810 if executor is None:
Markus Mohrhard374d9982020-02-28 04:01:47 +0800811 executor = concurrent.futures.ThreadPoolExecutor(
812 thread_name_prefix='asyncio'
813 )
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700814 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500815 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500816 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700817
818 def set_default_executor(self, executor):
Elvis Pranskevichus22d25082018-07-30 11:42:43 +0100819 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
820 warnings.warn(
821 'Using the default executor that is not an instance of '
822 'ThreadPoolExecutor is deprecated and will be prohibited '
823 'in Python 3.9',
824 DeprecationWarning, 2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700825 self._default_executor = executor
826
Victor Stinnere912e652014-07-12 03:11:53 +0200827 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500828 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200829 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500830 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200831 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500832 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200833 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500834 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200835 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500836 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200837 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200838 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200839
840 t0 = self.time()
841 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
842 dt = self.time() - t0
843
Yury Selivanov6370f342017-12-10 18:36:12 -0500844 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200845 if dt >= self.slow_callback_duration:
846 logger.info(msg)
847 else:
848 logger.debug(msg)
849 return addrinfo
850
Yury Selivanov19a44f62017-12-14 20:53:26 -0500851 async def getaddrinfo(self, host, port, *,
852 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400853 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500854 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200855 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500856 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700857
Yury Selivanov19a44f62017-12-14 20:53:26 -0500858 return await self.run_in_executor(
859 None, getaddr_func, host, port, family, type, proto, flags)
860
861 async def getnameinfo(self, sockaddr, flags=0):
862 return await self.run_in_executor(
863 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700864
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200865 async def sock_sendfile(self, sock, file, offset=0, count=None,
866 *, fallback=True):
867 if self._debug and sock.gettimeout() != 0:
868 raise ValueError("the socket must be non-blocking")
869 self._check_sendfile_params(sock, file, offset, count)
870 try:
871 return await self._sock_sendfile_native(sock, file,
872 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700873 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200874 if not fallback:
875 raise
876 return await self._sock_sendfile_fallback(sock, file,
877 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200878
879 async def _sock_sendfile_native(self, sock, file, offset, count):
880 # NB: sendfile syscall is not supported for SSL sockets and
881 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700882 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200883 f"syscall sendfile is not available for socket {sock!r} "
884 "and file {file!r} combination")
885
886 async def _sock_sendfile_fallback(self, sock, file, offset, count):
887 if offset:
888 file.seek(offset)
Yury Selivanov71657542018-05-28 18:31:55 -0400889 blocksize = (
890 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
891 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
892 )
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200893 buf = bytearray(blocksize)
894 total_sent = 0
895 try:
896 while True:
897 if count:
898 blocksize = min(count - total_sent, blocksize)
899 if blocksize <= 0:
900 break
901 view = memoryview(buf)[:blocksize]
Yury Selivanov71657542018-05-28 18:31:55 -0400902 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200903 if not read:
904 break # EOF
Andrew Svetlovef215232019-06-15 14:05:08 +0300905 await self.sock_sendall(sock, view[:read])
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200906 total_sent += read
907 return total_sent
908 finally:
909 if total_sent > 0 and hasattr(file, 'seek'):
910 file.seek(offset + total_sent)
911
912 def _check_sendfile_params(self, sock, file, offset, count):
913 if 'b' not in getattr(file, 'mode', 'b'):
914 raise ValueError("file should be opened in binary mode")
915 if not sock.type == socket.SOCK_STREAM:
916 raise ValueError("only SOCK_STREAM type sockets are supported")
917 if count is not None:
918 if not isinstance(count, int):
919 raise TypeError(
920 "count must be a positive integer (got {!r})".format(count))
921 if count <= 0:
922 raise ValueError(
923 "count must be a positive integer (got {!r})".format(count))
924 if not isinstance(offset, int):
925 raise TypeError(
926 "offset must be a non-negative integer (got {!r})".format(
927 offset))
928 if offset < 0:
929 raise ValueError(
930 "offset must be a non-negative integer (got {!r})".format(
931 offset))
932
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800933 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
934 """Create, bind and connect one socket."""
935 my_exceptions = []
936 exceptions.append(my_exceptions)
937 family, type_, proto, _, address = addr_info
938 sock = None
939 try:
940 sock = socket.socket(family=family, type=type_, proto=proto)
941 sock.setblocking(False)
942 if local_addr_infos is not None:
943 for _, _, _, _, laddr in local_addr_infos:
944 try:
945 sock.bind(laddr)
946 break
947 except OSError as exc:
948 msg = (
949 f'error while attempting to bind on '
950 f'address {laddr!r}: '
951 f'{exc.strerror.lower()}'
952 )
953 exc = OSError(exc.errno, msg)
954 my_exceptions.append(exc)
955 else: # all bind attempts failed
956 raise my_exceptions.pop()
957 await self.sock_connect(sock, address)
958 return sock
959 except OSError as exc:
960 my_exceptions.append(exc)
961 if sock is not None:
962 sock.close()
963 raise
964 except:
965 if sock is not None:
966 sock.close()
967 raise
968
Neil Aspinallf7686c12017-12-19 19:45:42 +0000969 async def create_connection(
970 self, protocol_factory, host=None, port=None,
971 *, ssl=None, family=0,
972 proto=0, flags=0, sock=None,
973 local_addr=None, server_hostname=None,
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800974 ssl_handshake_timeout=None,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +0300975 ssl_shutdown_timeout=None,
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800976 happy_eyeballs_delay=None, interleave=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200977 """Connect to a TCP server.
978
979 Create a streaming transport connection to a given Internet host and
980 port: socket family AF_INET or socket.AF_INET6 depending on host (or
981 family if specified), socket type SOCK_STREAM. protocol_factory must be
982 a callable returning a protocol instance.
983
984 This method is a coroutine which will try to establish the connection
985 in the background. When successful, the coroutine returns a
986 (transport, protocol) pair.
987 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700988 if server_hostname is not None and not ssl:
989 raise ValueError('server_hostname is only meaningful with ssl')
990
991 if server_hostname is None and ssl:
992 # Use host as default for server_hostname. It is an error
993 # if host is empty or not set, e.g. when an
994 # already-connected socket was passed or when only a port
995 # is given. To avoid this error, you can pass
996 # server_hostname='' -- this will bypass the hostname
997 # check. (This also means that if host is a numeric
998 # IP/IPv6 address, we will attempt to verify that exact
999 # address; this will probably fail, but it is possible to
1000 # create a certificate for a specific IP address, so we
1001 # don't judge it here.)
1002 if not host:
1003 raise ValueError('You must set server_hostname '
1004 'when using ssl without a host')
1005 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -07001006
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001007 if ssl_handshake_timeout is not None and not ssl:
1008 raise ValueError(
1009 'ssl_handshake_timeout is only meaningful with ssl')
1010
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001011 if ssl_shutdown_timeout is not None and not ssl:
1012 raise ValueError(
1013 'ssl_shutdown_timeout is only meaningful with ssl')
1014
twisteroid ambassador88f07a82019-05-05 19:14:35 +08001015 if happy_eyeballs_delay is not None and interleave is None:
1016 # If using happy eyeballs, default to interleave addresses by family
1017 interleave = 1
1018
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001019 if host is not None or port is not None:
1020 if sock is not None:
1021 raise ValueError(
1022 'host/port and sock can not be specified at the same time')
1023
Yury Selivanov19a44f62017-12-14 20:53:26 -05001024 infos = await self._ensure_resolved(
1025 (host, port), family=family,
1026 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001027 if not infos:
1028 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -05001029
1030 if local_addr is not None:
1031 laddr_infos = await self._ensure_resolved(
1032 local_addr, family=family,
1033 type=socket.SOCK_STREAM, proto=proto,
1034 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001035 if not laddr_infos:
1036 raise OSError('getaddrinfo() returned empty list')
twisteroid ambassador88f07a82019-05-05 19:14:35 +08001037 else:
1038 laddr_infos = None
1039
1040 if interleave:
1041 infos = _interleave_addrinfos(infos, interleave)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001042
1043 exceptions = []
twisteroid ambassador88f07a82019-05-05 19:14:35 +08001044 if happy_eyeballs_delay is None:
1045 # not using happy eyeballs
1046 for addrinfo in infos:
1047 try:
1048 sock = await self._connect_sock(
1049 exceptions, addrinfo, laddr_infos)
1050 break
1051 except OSError:
1052 continue
1053 else: # using happy eyeballs
1054 sock, _, _ = await staggered.staggered_race(
1055 (functools.partial(self._connect_sock,
1056 exceptions, addrinfo, laddr_infos)
1057 for addrinfo in infos),
1058 happy_eyeballs_delay, loop=self)
1059
1060 if sock is None:
1061 exceptions = [exc for sub in exceptions for exc in sub]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001062 if len(exceptions) == 1:
1063 raise exceptions[0]
1064 else:
1065 # If they all have the same str(), raise one.
1066 model = str(exceptions[0])
1067 if all(str(exc) == model for exc in exceptions):
1068 raise exceptions[0]
1069 # Raise a combined exception so the user can see all
1070 # the various error messages.
1071 raise OSError('Multiple exceptions: {}'.format(
1072 ', '.join(str(exc) for exc in exceptions)))
1073
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001074 else:
1075 if sock is None:
1076 raise ValueError(
1077 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001078 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -05001079 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1080 # are SOCK_STREAM.
1081 # We support passing AF_UNIX sockets even though we have
1082 # a dedicated API for that: create_unix_connection.
1083 # Disallowing AF_UNIX in this method, breaks backwards
1084 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001085 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001086 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001087
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001088 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001089 sock, protocol_factory, ssl, server_hostname,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001090 ssl_handshake_timeout=ssl_handshake_timeout,
1091 ssl_shutdown_timeout=ssl_shutdown_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +02001092 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +02001093 # Get the socket from the transport because SSL transport closes
1094 # the old socket and creates a new SSL socket
1095 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +02001096 logger.debug("%r connected to %s:%r: (%r, %r)",
1097 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -05001098 return transport, protocol
1099
Neil Aspinallf7686c12017-12-19 19:45:42 +00001100 async def _create_connection_transport(
1101 self, sock, protocol_factory, ssl,
1102 server_hostname, server_side=False,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001103 ssl_handshake_timeout=None,
1104 ssl_shutdown_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001105
1106 sock.setblocking(False)
1107
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001108 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001109 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001110 if ssl:
1111 sslcontext = None if isinstance(ssl, bool) else ssl
1112 transport = self._make_ssl_transport(
1113 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001114 server_side=server_side, server_hostname=server_hostname,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001115 ssl_handshake_timeout=ssl_handshake_timeout,
1116 ssl_shutdown_timeout=ssl_shutdown_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001117 else:
1118 transport = self._make_socket_transport(sock, protocol, waiter)
1119
Victor Stinner29ad0112015-01-15 00:04:21 +01001120 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001121 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001122 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001123 transport.close()
1124 raise
1125
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001126 return transport, protocol
1127
Andrew Svetlov7c684072018-01-27 21:22:47 +02001128 async def sendfile(self, transport, file, offset=0, count=None,
1129 *, fallback=True):
1130 """Send a file to transport.
1131
1132 Return the total number of bytes which were sent.
1133
1134 The method uses high-performance os.sendfile if available.
1135
1136 file must be a regular file object opened in binary mode.
1137
1138 offset tells from where to start reading the file. If specified,
1139 count is the total number of bytes to transmit as opposed to
1140 sending the file until EOF is reached. File position is updated on
1141 return or also in case of error in which case file.tell()
1142 can be used to figure out the number of bytes
1143 which were sent.
1144
1145 fallback set to True makes asyncio to manually read and send
1146 the file when the platform does not support the sendfile syscall
1147 (e.g. Windows or SSL socket on Unix).
1148
1149 Raise SendfileNotAvailableError if the system does not support
1150 sendfile syscall and fallback is False.
1151 """
1152 if transport.is_closing():
1153 raise RuntimeError("Transport is closing")
1154 mode = getattr(transport, '_sendfile_compatible',
1155 constants._SendfileMode.UNSUPPORTED)
1156 if mode is constants._SendfileMode.UNSUPPORTED:
1157 raise RuntimeError(
1158 f"sendfile is not supported for transport {transport!r}")
1159 if mode is constants._SendfileMode.TRY_NATIVE:
1160 try:
1161 return await self._sendfile_native(transport, file,
1162 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001163 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +02001164 if not fallback:
1165 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001166
1167 if not fallback:
1168 raise RuntimeError(
1169 f"fallback is disabled and native sendfile is not "
1170 f"supported for transport {transport!r}")
1171
Andrew Svetlov7c684072018-01-27 21:22:47 +02001172 return await self._sendfile_fallback(transport, file,
1173 offset, count)
1174
1175 async def _sendfile_native(self, transp, file, offset, count):
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001176 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov7c684072018-01-27 21:22:47 +02001177 "sendfile syscall is not supported")
1178
1179 async def _sendfile_fallback(self, transp, file, offset, count):
1180 if offset:
1181 file.seek(offset)
1182 blocksize = min(count, 16384) if count else 16384
1183 buf = bytearray(blocksize)
1184 total_sent = 0
1185 proto = _SendfileFallbackProtocol(transp)
1186 try:
1187 while True:
1188 if count:
1189 blocksize = min(count - total_sent, blocksize)
1190 if blocksize <= 0:
1191 return total_sent
1192 view = memoryview(buf)[:blocksize]
Andrew Svetlov02372652019-06-15 14:05:35 +03001193 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov7c684072018-01-27 21:22:47 +02001194 if not read:
1195 return total_sent # EOF
1196 await proto.drain()
Andrew Svetlovef215232019-06-15 14:05:08 +03001197 transp.write(view[:read])
Andrew Svetlov7c684072018-01-27 21:22:47 +02001198 total_sent += read
1199 finally:
1200 if total_sent > 0 and hasattr(file, 'seek'):
1201 file.seek(offset + total_sent)
1202 await proto.restore()
1203
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001204 async def start_tls(self, transport, protocol, sslcontext, *,
1205 server_side=False,
1206 server_hostname=None,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001207 ssl_handshake_timeout=None,
1208 ssl_shutdown_timeout=None):
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001209 """Upgrade transport to TLS.
1210
1211 Return a new transport that *protocol* should start using
1212 immediately.
1213 """
1214 if ssl is None:
1215 raise RuntimeError('Python ssl module is not available')
1216
1217 if not isinstance(sslcontext, ssl.SSLContext):
1218 raise TypeError(
1219 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1220 f'got {sslcontext!r}')
1221
1222 if not getattr(transport, '_start_tls_compatible', False):
1223 raise TypeError(
Yury Selivanov415bc462018-06-05 08:59:58 -04001224 f'transport {transport!r} is not supported by start_tls()')
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001225
1226 waiter = self.create_future()
1227 ssl_protocol = sslproto.SSLProtocol(
1228 self, protocol, sslcontext, waiter,
1229 server_side, server_hostname,
1230 ssl_handshake_timeout=ssl_handshake_timeout,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001231 ssl_shutdown_timeout=ssl_shutdown_timeout,
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001232 call_connection_made=False)
1233
Yury Selivanovf2955872018-05-29 01:00:12 -04001234 # Pause early so that "ssl_protocol.data_received()" doesn't
1235 # have a chance to get called before "ssl_protocol.connection_made()".
1236 transport.pause_reading()
1237
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001238 transport.set_protocol(ssl_protocol)
Yury Selivanov415bc462018-06-05 08:59:58 -04001239 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1240 resume_cb = self.call_soon(transport.resume_reading)
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001241
Yury Selivanov96026432018-06-04 11:32:35 -04001242 try:
1243 await waiter
Yury Selivanov431b5402019-05-27 14:45:12 +02001244 except BaseException:
Yury Selivanov96026432018-06-04 11:32:35 -04001245 transport.close()
Yury Selivanov415bc462018-06-05 08:59:58 -04001246 conmade_cb.cancel()
1247 resume_cb.cancel()
Yury Selivanov96026432018-06-04 11:32:35 -04001248 raise
1249
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001250 return ssl_protocol._app_transport
1251
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001252 async def create_datagram_endpoint(self, protocol_factory,
1253 local_addr=None, remote_addr=None, *,
1254 family=0, proto=0, flags=0,
Kyle Stanleyab513a32019-12-09 09:21:10 -05001255 reuse_address=_unset, reuse_port=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001256 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001257 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001258 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001259 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001260 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001261 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001262 if (local_addr or remote_addr or
1263 family or proto or flags or
Kyle Stanleyab513a32019-12-09 09:21:10 -05001264 reuse_port or allow_broadcast):
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001265 # show the problematic kwargs in exception msg
1266 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1267 family=family, proto=proto, flags=flags,
1268 reuse_address=reuse_address, reuse_port=reuse_port,
1269 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001270 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001271 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001272 f'socket modifier keyword arguments can not be used '
1273 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001274 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001275 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001276 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001277 if not (local_addr or remote_addr):
1278 if family == 0:
1279 raise ValueError('unexpected address family')
1280 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001281 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1282 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001283 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001284 raise TypeError('string is expected')
Quentin Dawans56065d42019-04-09 15:40:59 +02001285
1286 if local_addr and local_addr[0] not in (0, '\x00'):
1287 try:
1288 if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1289 os.remove(local_addr)
1290 except FileNotFoundError:
1291 pass
1292 except OSError as err:
1293 # Directory may have permissions only to create socket.
1294 logger.error('Unable to check or remove stale UNIX '
1295 'socket %r: %r',
1296 local_addr, err)
1297
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001298 addr_pairs_info = (((family, proto),
1299 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001300 else:
1301 # join address by (family, protocol)
Inada Naokif3451702019-02-05 17:04:40 +09001302 addr_infos = {} # Using order preserving dict
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001303 for idx, addr in ((0, local_addr), (1, remote_addr)):
1304 if addr is not None:
1305 assert isinstance(addr, tuple) and len(addr) == 2, (
1306 '2-tuple is expected')
1307
Yury Selivanov19a44f62017-12-14 20:53:26 -05001308 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001309 addr, family=family, type=socket.SOCK_DGRAM,
1310 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001311 if not infos:
1312 raise OSError('getaddrinfo() returned empty list')
1313
1314 for fam, _, pro, _, address in infos:
1315 key = (fam, pro)
1316 if key not in addr_infos:
1317 addr_infos[key] = [None, None]
1318 addr_infos[key][idx] = address
1319
1320 # each addr has to have info for each (family, proto) pair
1321 addr_pairs_info = [
1322 (key, addr_pair) for key, addr_pair in addr_infos.items()
1323 if not ((local_addr and addr_pair[0] is None) or
1324 (remote_addr and addr_pair[1] is None))]
1325
1326 if not addr_pairs_info:
1327 raise ValueError('can not get address information')
1328
1329 exceptions = []
1330
Kyle Stanleyab513a32019-12-09 09:21:10 -05001331 # bpo-37228
1332 if reuse_address is not _unset:
1333 if reuse_address:
1334 raise ValueError("Passing `reuse_address=True` is no "
1335 "longer supported, as the usage of "
1336 "SO_REUSEPORT in UDP poses a significant "
1337 "security concern.")
1338 else:
1339 warnings.warn("The *reuse_address* parameter has been "
1340 "deprecated as of 3.5.10 and is scheduled "
1341 "for removal in 3.11.", DeprecationWarning,
1342 stacklevel=2)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001343
1344 for ((family, proto),
1345 (local_address, remote_address)) in addr_pairs_info:
1346 sock = None
1347 r_addr = None
1348 try:
1349 sock = socket.socket(
1350 family=family, type=socket.SOCK_DGRAM, proto=proto)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001351 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001352 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001353 if allow_broadcast:
1354 sock.setsockopt(
1355 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1356 sock.setblocking(False)
1357
1358 if local_addr:
1359 sock.bind(local_address)
1360 if remote_addr:
Vincent Michel63deaa52019-05-07 19:18:49 +02001361 if not allow_broadcast:
1362 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001363 r_addr = remote_address
1364 except OSError as exc:
1365 if sock is not None:
1366 sock.close()
1367 exceptions.append(exc)
1368 except:
1369 if sock is not None:
1370 sock.close()
1371 raise
1372 else:
1373 break
1374 else:
1375 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001376
1377 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001378 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001379 transport = self._make_datagram_transport(
1380 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001381 if self._debug:
1382 if local_addr:
1383 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1384 "created: (%r, %r)",
1385 local_addr, remote_addr, transport, protocol)
1386 else:
1387 logger.debug("Datagram endpoint remote_addr=%r created: "
1388 "(%r, %r)",
1389 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001390
1391 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001392 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001393 except:
1394 transport.close()
1395 raise
1396
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001397 return transport, protocol
1398
Yury Selivanov19a44f62017-12-14 20:53:26 -05001399 async def _ensure_resolved(self, address, *,
1400 family=0, type=socket.SOCK_STREAM,
1401 proto=0, flags=0, loop):
1402 host, port = address[:2]
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +02001403 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
Yury Selivanov19a44f62017-12-14 20:53:26 -05001404 if info is not None:
1405 # "host" is already a resolved IP.
1406 return [info]
1407 else:
1408 return await loop.getaddrinfo(host, port, family=family, type=type,
1409 proto=proto, flags=flags)
1410
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001411 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001412 infos = await self._ensure_resolved((host, port), family=family,
1413 type=socket.SOCK_STREAM,
1414 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001415 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001416 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001417 return infos
1418
Neil Aspinallf7686c12017-12-19 19:45:42 +00001419 async def create_server(
1420 self, protocol_factory, host=None, port=None,
1421 *,
1422 family=socket.AF_UNSPEC,
1423 flags=socket.AI_PASSIVE,
1424 sock=None,
1425 backlog=100,
1426 ssl=None,
1427 reuse_address=None,
1428 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001429 ssl_handshake_timeout=None,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001430 ssl_shutdown_timeout=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001431 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001432 """Create a TCP server.
1433
Yury Selivanov6370f342017-12-10 18:36:12 -05001434 The host parameter can be a string, in that case the TCP server is
1435 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001436
1437 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001438 the TCP server is bound to all hosts of the sequence. If a host
1439 appears multiple times (possibly indirectly e.g. when hostnames
1440 resolve to the same IP address), the server is only bound once to that
1441 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001442
Victor Stinneracdb7822014-07-14 18:33:40 +02001443 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001444
1445 This method is a coroutine.
1446 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001447 if isinstance(ssl, bool):
1448 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001449
1450 if ssl_handshake_timeout is not None and ssl is None:
1451 raise ValueError(
1452 'ssl_handshake_timeout is only meaningful with ssl')
1453
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001454 if ssl_shutdown_timeout is not None and ssl is None:
1455 raise ValueError(
1456 'ssl_shutdown_timeout is only meaningful with ssl')
1457
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001458 if host is not None or port is not None:
1459 if sock is not None:
1460 raise ValueError(
1461 'host/port and sock can not be specified at the same time')
1462
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001463 if reuse_address is None:
1464 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1465 sockets = []
1466 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001467 hosts = [None]
1468 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001469 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001470 hosts = [host]
1471 else:
1472 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001473
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001474 fs = [self._create_server_getaddrinfo(host, port, family=family,
1475 flags=flags)
1476 for host in hosts]
Yurii Karabase4fe3032020-11-28 10:21:17 +02001477 infos = await tasks.gather(*fs)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001478 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001479
1480 completed = False
1481 try:
1482 for res in infos:
1483 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001484 try:
1485 sock = socket.socket(af, socktype, proto)
1486 except socket.error:
1487 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001488 if self._debug:
1489 logger.warning('create_server() failed to create '
1490 'socket.socket(%r, %r, %r)',
1491 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001492 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001493 sockets.append(sock)
1494 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001495 sock.setsockopt(
1496 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1497 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001498 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001499 # Disable IPv4/IPv6 dual stack support (enabled by
1500 # default on Linux) which makes a single socket
1501 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001502 if (_HAS_IPv6 and
1503 af == socket.AF_INET6 and
1504 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001505 sock.setsockopt(socket.IPPROTO_IPV6,
1506 socket.IPV6_V6ONLY,
1507 True)
1508 try:
1509 sock.bind(sa)
1510 except OSError as err:
1511 raise OSError(err.errno, 'error while attempting '
1512 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001513 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001514 completed = True
1515 finally:
1516 if not completed:
1517 for sock in sockets:
1518 sock.close()
1519 else:
1520 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001521 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001522 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001523 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001524 sockets = [sock]
1525
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001526 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001527 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001528
1529 server = Server(self, sockets, protocol_factory,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001530 ssl, backlog, ssl_handshake_timeout,
1531 ssl_shutdown_timeout)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001532 if start_serving:
1533 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001534 # Skip one loop iteration so that all 'loop.add_reader'
1535 # go through.
Yurii Karabase4fe3032020-11-28 10:21:17 +02001536 await tasks.sleep(0)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001537
Victor Stinnere912e652014-07-12 03:11:53 +02001538 if self._debug:
1539 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001540 return server
1541
Neil Aspinallf7686c12017-12-19 19:45:42 +00001542 async def connect_accepted_socket(
1543 self, protocol_factory, sock,
1544 *, ssl=None,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001545 ssl_handshake_timeout=None,
1546 ssl_shutdown_timeout=None):
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001547 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001548 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001549
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001550 if ssl_handshake_timeout is not None and not ssl:
1551 raise ValueError(
1552 'ssl_handshake_timeout is only meaningful with ssl')
1553
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001554 if ssl_shutdown_timeout is not None and not ssl:
1555 raise ValueError(
1556 'ssl_shutdown_timeout is only meaningful with ssl')
1557
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001558 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001559 sock, protocol_factory, ssl, '', server_side=True,
Andrew Svetlov5fb06ed2021-05-03 00:34:15 +03001560 ssl_handshake_timeout=ssl_handshake_timeout,
1561 ssl_shutdown_timeout=ssl_shutdown_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001562 if self._debug:
1563 # Get the socket from the transport because SSL transport closes
1564 # the old socket and creates a new SSL socket
1565 sock = transport.get_extra_info('socket')
1566 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1567 return transport, protocol
1568
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001569 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001570 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001571 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001572 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001573
1574 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001575 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001576 except:
1577 transport.close()
1578 raise
1579
Victor Stinneracdb7822014-07-14 18:33:40 +02001580 if self._debug:
1581 logger.debug('Read pipe %r connected: (%r, %r)',
1582 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001583 return transport, protocol
1584
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001585 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001586 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001587 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001588 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001589
1590 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001591 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001592 except:
1593 transport.close()
1594 raise
1595
Victor Stinneracdb7822014-07-14 18:33:40 +02001596 if self._debug:
1597 logger.debug('Write pipe %r connected: (%r, %r)',
1598 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001599 return transport, protocol
1600
Victor Stinneracdb7822014-07-14 18:33:40 +02001601 def _log_subprocess(self, msg, stdin, stdout, stderr):
1602 info = [msg]
1603 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001604 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001605 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001606 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001607 else:
1608 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001609 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001610 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001611 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001612 logger.debug(' '.join(info))
1613
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001614 async def subprocess_shell(self, protocol_factory, cmd, *,
1615 stdin=subprocess.PIPE,
1616 stdout=subprocess.PIPE,
1617 stderr=subprocess.PIPE,
1618 universal_newlines=False,
1619 shell=True, bufsize=0,
sbstpf0d4c642019-05-27 19:51:19 -04001620 encoding=None, errors=None, text=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001621 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001622 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001623 raise ValueError("cmd must be a string")
1624 if universal_newlines:
1625 raise ValueError("universal_newlines must be False")
1626 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001627 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001628 if bufsize != 0:
1629 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001630 if text:
1631 raise ValueError("text must be False")
1632 if encoding is not None:
1633 raise ValueError("encoding must be None")
1634 if errors is not None:
1635 raise ValueError("errors must be None")
1636
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001637 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001638 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001639 if self._debug:
1640 # don't log parameters: they may contain sensitive information
1641 # (password) and may be too long
1642 debug_log = 'run shell command %r' % cmd
1643 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001644 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001645 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001646 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001647 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001648 return transport, protocol
1649
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001650 async def subprocess_exec(self, protocol_factory, program, *args,
1651 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1652 stderr=subprocess.PIPE, universal_newlines=False,
sbstpf0d4c642019-05-27 19:51:19 -04001653 shell=False, bufsize=0,
1654 encoding=None, errors=None, text=None,
1655 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001656 if universal_newlines:
1657 raise ValueError("universal_newlines must be False")
1658 if shell:
1659 raise ValueError("shell must be False")
1660 if bufsize != 0:
1661 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001662 if text:
1663 raise ValueError("text must be False")
1664 if encoding is not None:
1665 raise ValueError("encoding must be None")
1666 if errors is not None:
1667 raise ValueError("errors must be None")
1668
Victor Stinner20e07432014-02-11 11:44:56 +01001669 popen_args = (program,) + args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001670 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001671 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001672 if self._debug:
1673 # don't log parameters: they may contain sensitive information
1674 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001675 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001676 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001677 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001678 protocol, popen_args, False, stdin, stdout, stderr,
1679 bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001680 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001681 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001682 return transport, protocol
1683
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001684 def get_exception_handler(self):
1685 """Return an exception handler, or None if the default one is in use.
1686 """
1687 return self._exception_handler
1688
Yury Selivanov569efa22014-02-18 18:02:19 -05001689 def set_exception_handler(self, handler):
1690 """Set handler as the new event loop exception handler.
1691
1692 If handler is None, the default exception handler will
1693 be set.
1694
1695 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001696 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001697 will be a reference to the active event loop, 'context'
1698 will be a dict object (see `call_exception_handler()`
1699 documentation for details about context).
1700 """
1701 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001702 raise TypeError(f'A callable object or None is expected, '
1703 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001704 self._exception_handler = handler
1705
1706 def default_exception_handler(self, context):
1707 """Default exception handler.
1708
1709 This is called when an exception occurs and no exception
1710 handler is set, and can be called by a custom exception
1711 handler that wants to defer to the default behavior.
1712
Antoine Pitrou921e9432017-11-07 17:23:29 +01001713 This default handler logs the error message and other
1714 context-dependent information. In debug mode, a truncated
1715 stack trace is also appended showing where the given object
1716 (e.g. a handle or future or task) was created, if any.
1717
Victor Stinneracdb7822014-07-14 18:33:40 +02001718 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001719 `call_exception_handler()`.
1720 """
1721 message = context.get('message')
1722 if not message:
1723 message = 'Unhandled exception in event loop'
1724
1725 exception = context.get('exception')
1726 if exception is not None:
1727 exc_info = (type(exception), exception, exception.__traceback__)
1728 else:
1729 exc_info = False
1730
Yury Selivanov6370f342017-12-10 18:36:12 -05001731 if ('source_traceback' not in context and
1732 self._current_handle is not None and
1733 self._current_handle._source_traceback):
1734 context['handle_traceback'] = \
1735 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001736
Yury Selivanov569efa22014-02-18 18:02:19 -05001737 log_lines = [message]
1738 for key in sorted(context):
1739 if key in {'message', 'exception'}:
1740 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001741 value = context[key]
1742 if key == 'source_traceback':
1743 tb = ''.join(traceback.format_list(value))
1744 value = 'Object created at (most recent call last):\n'
1745 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001746 elif key == 'handle_traceback':
1747 tb = ''.join(traceback.format_list(value))
1748 value = 'Handle created at (most recent call last):\n'
1749 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001750 else:
1751 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001752 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001753
1754 logger.error('\n'.join(log_lines), exc_info=exc_info)
1755
1756 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001757 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001758
Victor Stinneracdb7822014-07-14 18:33:40 +02001759 The context argument is a dict containing the following keys:
1760
Yury Selivanov569efa22014-02-18 18:02:19 -05001761 - 'message': Error message;
1762 - 'exception' (optional): Exception object;
1763 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001764 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001765 - 'handle' (optional): Handle instance;
1766 - 'protocol' (optional): Protocol instance;
1767 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001768 - 'socket' (optional): Socket instance;
1769 - 'asyncgen' (optional): Asynchronous generator that caused
1770 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001771
Victor Stinneracdb7822014-07-14 18:33:40 +02001772 New keys maybe introduced in the future.
1773
1774 Note: do not overload this method in an event loop subclass.
1775 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001776 `set_exception_handler()` method.
1777 """
1778 if self._exception_handler is None:
1779 try:
1780 self.default_exception_handler(context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001781 except (SystemExit, KeyboardInterrupt):
1782 raise
1783 except BaseException:
Yury Selivanov569efa22014-02-18 18:02:19 -05001784 # Second protection layer for unexpected errors
1785 # in the default implementation, as well as for subclassed
1786 # event loops with overloaded "default_exception_handler".
1787 logger.error('Exception in default exception handler',
1788 exc_info=True)
1789 else:
1790 try:
1791 self._exception_handler(self, context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001792 except (SystemExit, KeyboardInterrupt):
1793 raise
1794 except BaseException as exc:
Yury Selivanov569efa22014-02-18 18:02:19 -05001795 # Exception in the user set custom exception handler.
1796 try:
1797 # Let's try default handler.
1798 self.default_exception_handler({
1799 'message': 'Unhandled error in exception handler',
1800 'exception': exc,
1801 'context': context,
1802 })
Yury Selivanov431b5402019-05-27 14:45:12 +02001803 except (SystemExit, KeyboardInterrupt):
1804 raise
1805 except BaseException:
Victor Stinneracdb7822014-07-14 18:33:40 +02001806 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001807 # overloaded.
1808 logger.error('Exception in default exception handler '
1809 'while handling an unexpected error '
1810 'in custom exception handler',
1811 exc_info=True)
1812
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001813 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001814 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001815 assert isinstance(handle, events.Handle), 'A Handle is required here'
1816 if handle._cancelled:
1817 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001818 assert not isinstance(handle, events.TimerHandle)
1819 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001820
1821 def _add_callback_signalsafe(self, handle):
1822 """Like _add_callback() but called from a signal handler."""
1823 self._add_callback(handle)
1824 self._write_to_self()
1825
Yury Selivanov592ada92014-09-25 12:07:56 -04001826 def _timer_handle_cancelled(self, handle):
1827 """Notification that a TimerHandle has been cancelled."""
1828 if handle._scheduled:
1829 self._timer_cancelled_count += 1
1830
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001831 def _run_once(self):
1832 """Run one full iteration of the event loop.
1833
1834 This calls all currently ready callbacks, polls for I/O,
1835 schedules the resulting callbacks, and finally schedules
1836 'call_later' callbacks.
1837 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001838
Yury Selivanov592ada92014-09-25 12:07:56 -04001839 sched_count = len(self._scheduled)
1840 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1841 self._timer_cancelled_count / sched_count >
1842 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001843 # Remove delayed calls that were cancelled if their number
1844 # is too high
1845 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001846 for handle in self._scheduled:
1847 if handle._cancelled:
1848 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001849 else:
1850 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001851
Victor Stinner68da8fc2014-09-30 18:08:36 +02001852 heapq.heapify(new_scheduled)
1853 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001854 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001855 else:
1856 # Remove delayed calls that were cancelled from head of queue.
1857 while self._scheduled and self._scheduled[0]._cancelled:
1858 self._timer_cancelled_count -= 1
1859 handle = heapq.heappop(self._scheduled)
1860 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001861
1862 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001863 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001864 timeout = 0
1865 elif self._scheduled:
1866 # Compute the desired timeout.
1867 when = self._scheduled[0]._when
MartinAltmayer944451c2018-07-31 15:06:12 +01001868 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001869
Andrew Svetlovd5bd0362018-09-30 08:28:40 +03001870 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001871 self._process_events(event_list)
1872
1873 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001874 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001875 while self._scheduled:
1876 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001877 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001878 break
1879 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001880 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001881 self._ready.append(handle)
1882
1883 # This is the only place where callbacks are actually *called*.
1884 # All other places just add them to ready.
1885 # Note: We run all currently scheduled callbacks, but not any
1886 # callbacks scheduled by callbacks run this time around --
1887 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001888 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001889 ntodo = len(self._ready)
1890 for i in range(ntodo):
1891 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001892 if handle._cancelled:
1893 continue
1894 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001895 try:
1896 self._current_handle = handle
1897 t0 = self.time()
1898 handle._run()
1899 dt = self.time() - t0
1900 if dt >= self.slow_callback_duration:
1901 logger.warning('Executing %s took %.3f seconds',
1902 _format_handle(handle), dt)
1903 finally:
1904 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001905 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001906 handle._run()
1907 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001908
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001909 def _set_coroutine_origin_tracking(self, enabled):
1910 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001911 return
1912
Yury Selivanove8944cb2015-05-12 11:43:04 -04001913 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001914 self._coroutine_origin_tracking_saved_depth = (
1915 sys.get_coroutine_origin_tracking_depth())
1916 sys.set_coroutine_origin_tracking_depth(
1917 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001918 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001919 sys.set_coroutine_origin_tracking_depth(
1920 self._coroutine_origin_tracking_saved_depth)
1921
1922 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001923
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001924 def get_debug(self):
1925 return self._debug
1926
1927 def set_debug(self, enabled):
1928 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001929
Yury Selivanove8944cb2015-05-12 11:43:04 -04001930 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001931 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)