blob: 799013d5ccccb593201b8ab829ba6bb5ad76add8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Yury Selivanov8cd51652019-05-27 15:57:20 +020048from . import trsock
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
Yury Selivanov6370f342017-12-10 18:36:12 -050052__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
54
Yury Selivanov592ada92014-09-25 12:07:56 -040055# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
Andrew Svetlov0dd71802018-09-12 14:03:54 -070063
Yury Selivanovd904c232018-06-28 21:59:32 -040064_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
MartinAltmayer944451c2018-07-31 15:06:12 +010066# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
Miss Islington (bot)79c29742019-12-09 06:39:54 -080069# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
Victor Stinner0e6f52a2014-06-20 17:34:15 +020074def _format_handle(handle):
75 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040076 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020077 # format the task
78 return repr(cb.__self__)
79 else:
80 return str(handle)
81
82
Victor Stinneracdb7822014-07-14 18:33:40 +020083def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
90
91
Yury Selivanov5587d7c2016-09-15 15:45:07 -040092def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
101
102
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
108
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500111 return None
112
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500113 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500114 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500115 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500116 proto = socket.IPPROTO_UDP
117 else:
118 return None
119
Yury Selivanova7146162016-06-02 16:51:07 -0400120 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400121 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400132
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500134 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400135 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500136 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400137 else:
138 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500145 return None
146
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400147 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 socket.inet_pton(af, host)
150 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400151 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200152 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400153 else:
154 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400155 except OSError:
156 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500157
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400158 # "host" is not an IP address.
159 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163 """Interleave list of addrinfo tuples by family."""
164 # Group addresses by family
165 addrinfos_by_family = collections.OrderedDict()
166 for addr in addrinfos:
167 family = addr[0]
168 if family not in addrinfos_by_family:
169 addrinfos_by_family[family] = []
170 addrinfos_by_family[family].append(addr)
171 addrinfos_lists = list(addrinfos_by_family.values())
172
173 reordered = []
174 if first_address_family_count > 1:
175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176 del addrinfos_lists[0][:first_address_family_count - 1]
177 reordered.extend(
178 a for a in itertools.chain.from_iterable(
179 itertools.zip_longest(*addrinfos_lists)
180 ) if a is not None)
181 return reordered
182
183
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100184def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500185 if not fut.cancelled():
186 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200187 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500188 # Issue #22429: run_forever() already finished, no need to
189 # stop it.
190 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500191 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100192
193
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200194if hasattr(socket, 'TCP_NODELAY'):
195 def _set_nodelay(sock):
196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197 sock.type == socket.SOCK_STREAM and
198 sock.proto == socket.IPPROTO_TCP):
199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201 def _set_nodelay(sock):
202 pass
203
204
Andrew Svetlov7c684072018-01-27 21:22:47 +0200205class _SendfileFallbackProtocol(protocols.Protocol):
206 def __init__(self, transp):
207 if not isinstance(transp, transports._FlowControlMixin):
208 raise TypeError("transport should be _FlowControlMixin instance")
209 self._transport = transp
210 self._proto = transp.get_protocol()
211 self._should_resume_reading = transp.is_reading()
212 self._should_resume_writing = transp._protocol_paused
213 transp.pause_reading()
214 transp.set_protocol(self)
215 if self._should_resume_writing:
216 self._write_ready_fut = self._transport._loop.create_future()
217 else:
218 self._write_ready_fut = None
219
220 async def drain(self):
221 if self._transport.is_closing():
222 raise ConnectionError("Connection closed by peer")
223 fut = self._write_ready_fut
224 if fut is None:
225 return
226 await fut
227
228 def connection_made(self, transport):
229 raise RuntimeError("Invalid state: "
230 "connection should have been established already.")
231
232 def connection_lost(self, exc):
233 if self._write_ready_fut is not None:
234 # Never happens if peer disconnects after sending the whole content
235 # Thus disconnection is always an exception from user perspective
236 if exc is None:
237 self._write_ready_fut.set_exception(
238 ConnectionError("Connection is closed by peer"))
239 else:
240 self._write_ready_fut.set_exception(exc)
241 self._proto.connection_lost(exc)
242
243 def pause_writing(self):
244 if self._write_ready_fut is not None:
245 return
246 self._write_ready_fut = self._transport._loop.create_future()
247
248 def resume_writing(self):
249 if self._write_ready_fut is None:
250 return
251 self._write_ready_fut.set_result(False)
252 self._write_ready_fut = None
253
254 def data_received(self, data):
255 raise RuntimeError("Invalid state: reading should be paused")
256
257 def eof_received(self):
258 raise RuntimeError("Invalid state: reading should be paused")
259
260 async def restore(self):
261 self._transport.set_protocol(self._proto)
262 if self._should_resume_reading:
263 self._transport.resume_reading()
264 if self._write_ready_fut is not None:
265 # Cancel the future.
266 # Basically it has no effect because protocol is switched back,
267 # no code should wait for it anymore.
268 self._write_ready_fut.cancel()
269 if self._should_resume_writing:
270 self._proto.resume_writing()
271
272
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273class Server(events.AbstractServer):
274
Yury Selivanovc9070d02018-01-25 18:08:09 -0500275 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
276 ssl_handshake_timeout):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200277 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500278 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200279 self._active_count = 0
280 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500281 self._protocol_factory = protocol_factory
282 self._backlog = backlog
283 self._ssl_context = ssl_context
284 self._ssl_handshake_timeout = ssl_handshake_timeout
285 self._serving = False
286 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287
Victor Stinnere912e652014-07-12 03:11:53 +0200288 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500289 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200290
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200291 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500292 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200293 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200295 def _detach(self):
296 assert self._active_count > 0
297 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500298 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 self._wakeup()
300
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200302 waiters = self._waiters
303 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 for waiter in waiters:
305 if not waiter.done():
306 waiter.set_result(waiter)
307
Yury Selivanovc9070d02018-01-25 18:08:09 -0500308 def _start_serving(self):
309 if self._serving:
310 return
311 self._serving = True
312 for sock in self._sockets:
313 sock.listen(self._backlog)
314 self._loop._start_serving(
315 self._protocol_factory, sock, self._ssl_context,
316 self, self._backlog, self._ssl_handshake_timeout)
317
318 def get_loop(self):
319 return self._loop
320
321 def is_serving(self):
322 return self._serving
323
324 @property
325 def sockets(self):
326 if self._sockets is None:
Yury Selivanov8cd51652019-05-27 15:57:20 +0200327 return ()
328 return tuple(trsock.TransportSocket(s) for s in self._sockets)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500329
330 def close(self):
331 sockets = self._sockets
332 if sockets is None:
333 return
334 self._sockets = None
335
336 for sock in sockets:
337 self._loop._stop_serving(sock)
338
339 self._serving = False
340
341 if (self._serving_forever_fut is not None and
342 not self._serving_forever_fut.done()):
343 self._serving_forever_fut.cancel()
344 self._serving_forever_fut = None
345
346 if self._active_count == 0:
347 self._wakeup()
348
349 async def start_serving(self):
350 self._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400351 # Skip one loop iteration so that all 'loop.add_reader'
352 # go through.
353 await tasks.sleep(0, loop=self._loop)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500354
355 async def serve_forever(self):
356 if self._serving_forever_fut is not None:
357 raise RuntimeError(
358 f'server {self!r} is already being awaited on serve_forever()')
359 if self._sockets is None:
360 raise RuntimeError(f'server {self!r} is closed')
361
362 self._start_serving()
363 self._serving_forever_fut = self._loop.create_future()
364
365 try:
366 await self._serving_forever_fut
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700367 except exceptions.CancelledError:
Yury Selivanovc9070d02018-01-25 18:08:09 -0500368 try:
369 self.close()
370 await self.wait_closed()
371 finally:
372 raise
373 finally:
374 self._serving_forever_fut = None
375
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200376 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500377 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400379 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200380 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200381 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
386 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400387 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200388 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800389 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 self._ready = collections.deque()
391 self._scheduled = []
392 self._default_executor = None
393 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100394 # Identifier of the thread running the event loop, or None if the
395 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100396 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100397 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500398 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800399 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200400 # In debug mode, if the execution of a callback or a step of a task
401 # exceed this duration in seconds, the slow callback/task is logged.
402 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100403 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400404 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800405 self._coroutine_origin_tracking_enabled = False
406 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500408 # A weak set of all asynchronous generators that are
409 # being iterated by the loop.
410 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700411 # Set to True when `loop.shutdown_asyncgens` is called.
412 self._asyncgens_shutdown_called = False
413
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200414 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500415 return (
416 f'<{self.__class__.__name__} running={self.is_running()} '
417 f'closed={self.is_closed()} debug={self.get_debug()}>'
418 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200419
Yury Selivanov7661db62016-05-16 15:38:39 -0400420 def create_future(self):
421 """Create a Future object attached to the loop."""
422 return futures.Future(loop=self)
423
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300424 def create_task(self, coro, *, name=None):
Victor Stinner896a25a2014-07-08 11:29:25 +0200425 """Schedule a coroutine object.
426
Victor Stinneracdb7822014-07-14 18:33:40 +0200427 Return a task object.
428 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100429 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400430 if self._task_factory is None:
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300431 task = tasks.Task(coro, loop=self, name=name)
Yury Selivanov740169c2015-05-11 14:23:38 -0400432 if task._source_traceback:
433 del task._source_traceback[-1]
434 else:
435 task = self._task_factory(self, coro)
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300436 tasks._set_task_name(task, name)
437
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200438 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200439
Yury Selivanov740169c2015-05-11 14:23:38 -0400440 def set_task_factory(self, factory):
441 """Set a task factory that will be used by loop.create_task().
442
443 If factory is None the default task factory will be set.
444
445 If factory is a callable, it should have a signature matching
446 '(loop, coro)', where 'loop' will be a reference to the active
447 event loop, 'coro' will be a coroutine object. The callable
448 must return a Future.
449 """
450 if factory is not None and not callable(factory):
451 raise TypeError('task factory must be a callable or None')
452 self._task_factory = factory
453
454 def get_task_factory(self):
455 """Return a task factory, or None if the default one is in use."""
456 return self._task_factory
457
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 def _make_socket_transport(self, sock, protocol, waiter=None, *,
459 extra=None, server=None):
460 """Create socket transport."""
461 raise NotImplementedError
462
Neil Aspinallf7686c12017-12-19 19:45:42 +0000463 def _make_ssl_transport(
464 self, rawsock, protocol, sslcontext, waiter=None,
465 *, server_side=False, server_hostname=None,
466 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500467 ssl_handshake_timeout=None,
468 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 """Create SSL transport."""
470 raise NotImplementedError
471
472 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200473 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 """Create datagram transport."""
475 raise NotImplementedError
476
477 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
478 extra=None):
479 """Create read pipe transport."""
480 raise NotImplementedError
481
482 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
483 extra=None):
484 """Create write pipe transport."""
485 raise NotImplementedError
486
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200487 async def _make_subprocess_transport(self, protocol, args, shell,
488 stdin, stdout, stderr, bufsize,
489 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 """Create subprocess transport."""
491 raise NotImplementedError
492
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200494 """Write a byte to self-pipe, to wake up the event loop.
495
496 This may be called from a different thread.
497
498 The subclass is responsible for implementing the self-pipe.
499 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 raise NotImplementedError
501
502 def _process_events(self, event_list):
503 """Process selector events."""
504 raise NotImplementedError
505
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200506 def _check_closed(self):
507 if self._closed:
508 raise RuntimeError('Event loop is closed')
509
Yury Selivanoveb636452016-09-08 22:01:51 -0700510 def _asyncgen_finalizer_hook(self, agen):
511 self._asyncgens.discard(agen)
512 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800513 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700514
515 def _asyncgen_firstiter_hook(self, agen):
516 if self._asyncgens_shutdown_called:
517 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500518 f"asynchronous generator {agen!r} was scheduled after "
519 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700520 ResourceWarning, source=self)
521
522 self._asyncgens.add(agen)
523
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200524 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700525 """Shutdown all active asynchronous generators."""
526 self._asyncgens_shutdown_called = True
527
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500528 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400529 # If Python version is <3.6 or we don't have any asynchronous
530 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700531 return
532
533 closing_agens = list(self._asyncgens)
534 self._asyncgens.clear()
535
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200536 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700537 *[ag.aclose() for ag in closing_agens],
538 return_exceptions=True,
539 loop=self)
540
Yury Selivanoveb636452016-09-08 22:01:51 -0700541 for result, agen in zip(results, closing_agens):
542 if isinstance(result, Exception):
543 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500544 'message': f'an error occurred during closing of '
545 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700546 'exception': result,
547 'asyncgen': agen
548 })
549
Andrew Svetlov4112a3d2020-01-07 16:55:19 +0200550 def _check_running(self):
Victor Stinner956de692014-12-26 21:07:52 +0100551 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400552 raise RuntimeError('This event loop is already running')
553 if events._get_running_loop() is not None:
554 raise RuntimeError(
555 'Cannot run the event loop while another loop is running')
Andrew Svetlov867d8332020-01-04 11:49:11 +0200556
557 def run_forever(self):
558 """Run until stop() is called."""
559 self._check_closed()
Andrew Svetlov4112a3d2020-01-07 16:55:19 +0200560 self._check_running()
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800561 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100562 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500563
564 old_agen_hooks = sys.get_asyncgen_hooks()
565 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
566 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400568 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800570 self._run_once()
571 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 break
573 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800574 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100575 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400576 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800577 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500578 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579
580 def run_until_complete(self, future):
581 """Run until the Future is done.
582
583 If the argument is a coroutine, it is wrapped in a Task.
584
Victor Stinneracdb7822014-07-14 18:33:40 +0200585 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 with the same coroutine twice -- it would wrap it in two
587 different Tasks and that can't be good.
588
589 Return the Future's result, or raise its exception.
590 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200591 self._check_closed()
Andrew Svetlov4112a3d2020-01-07 16:55:19 +0200592 self._check_running()
Victor Stinner98b63912014-06-30 14:51:04 +0200593
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700594 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400595 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200596 if new_task:
597 # An exception is raised if the future didn't complete, so there
598 # is no need to log the "destroy pending task" message
599 future._log_destroy_pending = False
600
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100601 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200602 try:
603 self.run_forever()
604 except:
605 if new_task and future.done() and not future.cancelled():
606 # The coroutine raised a BaseException. Consume the exception
607 # to not log a warning, the caller doesn't have access to the
608 # local task.
609 future.exception()
610 raise
jimmylai21b3e042017-05-22 22:32:46 -0700611 finally:
612 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 if not future.done():
614 raise RuntimeError('Event loop stopped before Future completed.')
615
616 return future.result()
617
618 def stop(self):
619 """Stop running the event loop.
620
Guido van Rossum41f69f42015-11-19 13:28:47 -0800621 Every callback already scheduled will still run. This simply informs
622 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800624 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200626 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700627 """Close the event loop.
628
629 This clears the queues and shuts down the executor,
630 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200631
632 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700633 """
Victor Stinner956de692014-12-26 21:07:52 +0100634 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200635 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200636 if self._closed:
637 return
Victor Stinnere912e652014-07-12 03:11:53 +0200638 if self._debug:
639 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400640 self._closed = True
641 self._ready.clear()
642 self._scheduled.clear()
643 executor = self._default_executor
644 if executor is not None:
645 self._default_executor = None
Łukasz Langa7f9a2ae2019-06-04 13:03:20 +0200646 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200647
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200648 def is_closed(self):
649 """Returns True if the event loop was closed."""
650 return self._closed
651
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100652 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900653 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100654 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900655 if not self.is_running():
656 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100657
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200659 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100660 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661
662 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200663 """Return the time according to the event loop's clock.
664
665 This is a float expressed in seconds since an epoch, but the
666 epoch, precision, accuracy and drift are unspecified and may
667 differ per event loop.
668 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700669 return time.monotonic()
670
Yury Selivanovf23746a2018-01-22 19:11:18 -0500671 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700672 """Arrange for a callback to be called at a given time.
673
674 Return a Handle: an opaque object with a cancel() method that
675 can be used to cancel the call.
676
677 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200678 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679
680 Each callback will be called exactly once. If two callbacks
681 are scheduled for exactly the same time, it undefined which
682 will be called first.
683
684 Any positional arguments after the callback will be passed to
685 the callback when it is called.
686 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500687 timer = self.call_at(self.time() + delay, callback, *args,
688 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200689 if timer._source_traceback:
690 del timer._source_traceback[-1]
691 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700692
Yury Selivanovf23746a2018-01-22 19:11:18 -0500693 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200694 """Like call_later(), but uses an absolute time.
695
696 Absolute time corresponds to the event loop's time() method.
697 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100698 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100699 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100700 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700701 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500702 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200703 if timer._source_traceback:
704 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700705 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400706 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 return timer
708
Yury Selivanovf23746a2018-01-22 19:11:18 -0500709 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 """Arrange for a callback to be called as soon as possible.
711
Victor Stinneracdb7822014-07-14 18:33:40 +0200712 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 order in which they are registered. Each callback will be
714 called exactly once.
715
716 Any positional arguments after the callback will be passed to
717 the callback when it is called.
718 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700719 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100720 if self._debug:
721 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700722 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500723 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200724 if handle._source_traceback:
725 del handle._source_traceback[-1]
726 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100727
Yury Selivanov491a9122016-11-03 15:09:24 -0700728 def _check_callback(self, callback, method):
729 if (coroutines.iscoroutine(callback) or
730 coroutines.iscoroutinefunction(callback)):
731 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500732 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700733 if not callable(callback):
734 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500735 f'a callable object was expected by {method}(), '
736 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700737
Yury Selivanovf23746a2018-01-22 19:11:18 -0500738 def _call_soon(self, callback, args, context):
739 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200740 if handle._source_traceback:
741 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700742 self._ready.append(handle)
743 return handle
744
Victor Stinner956de692014-12-26 21:07:52 +0100745 def _check_thread(self):
746 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100747
Victor Stinneracdb7822014-07-14 18:33:40 +0200748 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100749 likely behave incorrectly when the assumption is violated.
750
Victor Stinneracdb7822014-07-14 18:33:40 +0200751 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100752 responsible for checking this condition for performance reasons.
753 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100754 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200755 return
Victor Stinner956de692014-12-26 21:07:52 +0100756 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100757 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100758 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200759 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100760 "than the current one")
761
Yury Selivanovf23746a2018-01-22 19:11:18 -0500762 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200763 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700764 self._check_closed()
765 if self._debug:
766 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500767 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200768 if handle._source_traceback:
769 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700770 self._write_to_self()
771 return handle
772
Yury Selivanovbec23722018-01-28 14:09:40 -0500773 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100774 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700775 if self._debug:
776 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700777 if executor is None:
778 executor = self._default_executor
779 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400780 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700781 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500782 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500783 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784
785 def set_default_executor(self, executor):
Elvis Pranskevichus22d25082018-07-30 11:42:43 +0100786 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
787 warnings.warn(
788 'Using the default executor that is not an instance of '
789 'ThreadPoolExecutor is deprecated and will be prohibited '
790 'in Python 3.9',
791 DeprecationWarning, 2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700792 self._default_executor = executor
793
Victor Stinnere912e652014-07-12 03:11:53 +0200794 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500795 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200796 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500797 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200798 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500799 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200800 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500801 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200802 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500803 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200804 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200805 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200806
807 t0 = self.time()
808 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
809 dt = self.time() - t0
810
Yury Selivanov6370f342017-12-10 18:36:12 -0500811 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200812 if dt >= self.slow_callback_duration:
813 logger.info(msg)
814 else:
815 logger.debug(msg)
816 return addrinfo
817
Yury Selivanov19a44f62017-12-14 20:53:26 -0500818 async def getaddrinfo(self, host, port, *,
819 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400820 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500821 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200822 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500823 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700824
Yury Selivanov19a44f62017-12-14 20:53:26 -0500825 return await self.run_in_executor(
826 None, getaddr_func, host, port, family, type, proto, flags)
827
828 async def getnameinfo(self, sockaddr, flags=0):
829 return await self.run_in_executor(
830 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700831
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200832 async def sock_sendfile(self, sock, file, offset=0, count=None,
833 *, fallback=True):
834 if self._debug and sock.gettimeout() != 0:
835 raise ValueError("the socket must be non-blocking")
836 self._check_sendfile_params(sock, file, offset, count)
837 try:
838 return await self._sock_sendfile_native(sock, file,
839 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700840 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200841 if not fallback:
842 raise
843 return await self._sock_sendfile_fallback(sock, file,
844 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200845
846 async def _sock_sendfile_native(self, sock, file, offset, count):
847 # NB: sendfile syscall is not supported for SSL sockets and
848 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700849 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200850 f"syscall sendfile is not available for socket {sock!r} "
851 "and file {file!r} combination")
852
853 async def _sock_sendfile_fallback(self, sock, file, offset, count):
854 if offset:
855 file.seek(offset)
Yury Selivanov71657542018-05-28 18:31:55 -0400856 blocksize = (
857 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
858 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
859 )
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200860 buf = bytearray(blocksize)
861 total_sent = 0
862 try:
863 while True:
864 if count:
865 blocksize = min(count - total_sent, blocksize)
866 if blocksize <= 0:
867 break
868 view = memoryview(buf)[:blocksize]
Yury Selivanov71657542018-05-28 18:31:55 -0400869 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200870 if not read:
871 break # EOF
Miss Islington (bot)bb073212019-06-15 04:24:16 -0700872 await self.sock_sendall(sock, view[:read])
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200873 total_sent += read
874 return total_sent
875 finally:
876 if total_sent > 0 and hasattr(file, 'seek'):
877 file.seek(offset + total_sent)
878
879 def _check_sendfile_params(self, sock, file, offset, count):
880 if 'b' not in getattr(file, 'mode', 'b'):
881 raise ValueError("file should be opened in binary mode")
882 if not sock.type == socket.SOCK_STREAM:
883 raise ValueError("only SOCK_STREAM type sockets are supported")
884 if count is not None:
885 if not isinstance(count, int):
886 raise TypeError(
887 "count must be a positive integer (got {!r})".format(count))
888 if count <= 0:
889 raise ValueError(
890 "count must be a positive integer (got {!r})".format(count))
891 if not isinstance(offset, int):
892 raise TypeError(
893 "offset must be a non-negative integer (got {!r})".format(
894 offset))
895 if offset < 0:
896 raise ValueError(
897 "offset must be a non-negative integer (got {!r})".format(
898 offset))
899
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800900 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
901 """Create, bind and connect one socket."""
902 my_exceptions = []
903 exceptions.append(my_exceptions)
904 family, type_, proto, _, address = addr_info
905 sock = None
906 try:
907 sock = socket.socket(family=family, type=type_, proto=proto)
908 sock.setblocking(False)
909 if local_addr_infos is not None:
910 for _, _, _, _, laddr in local_addr_infos:
911 try:
912 sock.bind(laddr)
913 break
914 except OSError as exc:
915 msg = (
916 f'error while attempting to bind on '
917 f'address {laddr!r}: '
918 f'{exc.strerror.lower()}'
919 )
920 exc = OSError(exc.errno, msg)
921 my_exceptions.append(exc)
922 else: # all bind attempts failed
923 raise my_exceptions.pop()
924 await self.sock_connect(sock, address)
925 return sock
926 except OSError as exc:
927 my_exceptions.append(exc)
928 if sock is not None:
929 sock.close()
930 raise
931 except:
932 if sock is not None:
933 sock.close()
934 raise
935
Neil Aspinallf7686c12017-12-19 19:45:42 +0000936 async def create_connection(
937 self, protocol_factory, host=None, port=None,
938 *, ssl=None, family=0,
939 proto=0, flags=0, sock=None,
940 local_addr=None, server_hostname=None,
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800941 ssl_handshake_timeout=None,
942 happy_eyeballs_delay=None, interleave=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200943 """Connect to a TCP server.
944
945 Create a streaming transport connection to a given Internet host and
946 port: socket family AF_INET or socket.AF_INET6 depending on host (or
947 family if specified), socket type SOCK_STREAM. protocol_factory must be
948 a callable returning a protocol instance.
949
950 This method is a coroutine which will try to establish the connection
951 in the background. When successful, the coroutine returns a
952 (transport, protocol) pair.
953 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700954 if server_hostname is not None and not ssl:
955 raise ValueError('server_hostname is only meaningful with ssl')
956
957 if server_hostname is None and ssl:
958 # Use host as default for server_hostname. It is an error
959 # if host is empty or not set, e.g. when an
960 # already-connected socket was passed or when only a port
961 # is given. To avoid this error, you can pass
962 # server_hostname='' -- this will bypass the hostname
963 # check. (This also means that if host is a numeric
964 # IP/IPv6 address, we will attempt to verify that exact
965 # address; this will probably fail, but it is possible to
966 # create a certificate for a specific IP address, so we
967 # don't judge it here.)
968 if not host:
969 raise ValueError('You must set server_hostname '
970 'when using ssl without a host')
971 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700972
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200973 if ssl_handshake_timeout is not None and not ssl:
974 raise ValueError(
975 'ssl_handshake_timeout is only meaningful with ssl')
976
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800977 if happy_eyeballs_delay is not None and interleave is None:
978 # If using happy eyeballs, default to interleave addresses by family
979 interleave = 1
980
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700981 if host is not None or port is not None:
982 if sock is not None:
983 raise ValueError(
984 'host/port and sock can not be specified at the same time')
985
Yury Selivanov19a44f62017-12-14 20:53:26 -0500986 infos = await self._ensure_resolved(
987 (host, port), family=family,
988 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700989 if not infos:
990 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -0500991
992 if local_addr is not None:
993 laddr_infos = await self._ensure_resolved(
994 local_addr, family=family,
995 type=socket.SOCK_STREAM, proto=proto,
996 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 if not laddr_infos:
998 raise OSError('getaddrinfo() returned empty list')
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800999 else:
1000 laddr_infos = None
1001
1002 if interleave:
1003 infos = _interleave_addrinfos(infos, interleave)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001004
1005 exceptions = []
twisteroid ambassador88f07a82019-05-05 19:14:35 +08001006 if happy_eyeballs_delay is None:
1007 # not using happy eyeballs
1008 for addrinfo in infos:
1009 try:
1010 sock = await self._connect_sock(
1011 exceptions, addrinfo, laddr_infos)
1012 break
1013 except OSError:
1014 continue
1015 else: # using happy eyeballs
1016 sock, _, _ = await staggered.staggered_race(
1017 (functools.partial(self._connect_sock,
1018 exceptions, addrinfo, laddr_infos)
1019 for addrinfo in infos),
1020 happy_eyeballs_delay, loop=self)
1021
1022 if sock is None:
1023 exceptions = [exc for sub in exceptions for exc in sub]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001024 if len(exceptions) == 1:
1025 raise exceptions[0]
1026 else:
1027 # If they all have the same str(), raise one.
1028 model = str(exceptions[0])
1029 if all(str(exc) == model for exc in exceptions):
1030 raise exceptions[0]
1031 # Raise a combined exception so the user can see all
1032 # the various error messages.
1033 raise OSError('Multiple exceptions: {}'.format(
1034 ', '.join(str(exc) for exc in exceptions)))
1035
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001036 else:
1037 if sock is None:
1038 raise ValueError(
1039 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001040 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -05001041 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1042 # are SOCK_STREAM.
1043 # We support passing AF_UNIX sockets even though we have
1044 # a dedicated API for that: create_unix_connection.
1045 # Disallowing AF_UNIX in this method, breaks backwards
1046 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001047 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001048 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001049
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001050 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001051 sock, protocol_factory, ssl, server_hostname,
1052 ssl_handshake_timeout=ssl_handshake_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +02001053 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +02001054 # Get the socket from the transport because SSL transport closes
1055 # the old socket and creates a new SSL socket
1056 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +02001057 logger.debug("%r connected to %s:%r: (%r, %r)",
1058 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -05001059 return transport, protocol
1060
Neil Aspinallf7686c12017-12-19 19:45:42 +00001061 async def _create_connection_transport(
1062 self, sock, protocol_factory, ssl,
1063 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001064 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001065
1066 sock.setblocking(False)
1067
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001068 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001069 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001070 if ssl:
1071 sslcontext = None if isinstance(ssl, bool) else ssl
1072 transport = self._make_ssl_transport(
1073 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001074 server_side=server_side, server_hostname=server_hostname,
1075 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001076 else:
1077 transport = self._make_socket_transport(sock, protocol, waiter)
1078
Victor Stinner29ad0112015-01-15 00:04:21 +01001079 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001080 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001081 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001082 transport.close()
1083 raise
1084
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001085 return transport, protocol
1086
Andrew Svetlov7c684072018-01-27 21:22:47 +02001087 async def sendfile(self, transport, file, offset=0, count=None,
1088 *, fallback=True):
1089 """Send a file to transport.
1090
1091 Return the total number of bytes which were sent.
1092
1093 The method uses high-performance os.sendfile if available.
1094
1095 file must be a regular file object opened in binary mode.
1096
1097 offset tells from where to start reading the file. If specified,
1098 count is the total number of bytes to transmit as opposed to
1099 sending the file until EOF is reached. File position is updated on
1100 return or also in case of error in which case file.tell()
1101 can be used to figure out the number of bytes
1102 which were sent.
1103
1104 fallback set to True makes asyncio to manually read and send
1105 the file when the platform does not support the sendfile syscall
1106 (e.g. Windows or SSL socket on Unix).
1107
1108 Raise SendfileNotAvailableError if the system does not support
1109 sendfile syscall and fallback is False.
1110 """
1111 if transport.is_closing():
1112 raise RuntimeError("Transport is closing")
1113 mode = getattr(transport, '_sendfile_compatible',
1114 constants._SendfileMode.UNSUPPORTED)
1115 if mode is constants._SendfileMode.UNSUPPORTED:
1116 raise RuntimeError(
1117 f"sendfile is not supported for transport {transport!r}")
1118 if mode is constants._SendfileMode.TRY_NATIVE:
1119 try:
1120 return await self._sendfile_native(transport, file,
1121 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001122 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +02001123 if not fallback:
1124 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001125
1126 if not fallback:
1127 raise RuntimeError(
1128 f"fallback is disabled and native sendfile is not "
1129 f"supported for transport {transport!r}")
1130
Andrew Svetlov7c684072018-01-27 21:22:47 +02001131 return await self._sendfile_fallback(transport, file,
1132 offset, count)
1133
1134 async def _sendfile_native(self, transp, file, offset, count):
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001135 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov7c684072018-01-27 21:22:47 +02001136 "sendfile syscall is not supported")
1137
1138 async def _sendfile_fallback(self, transp, file, offset, count):
1139 if offset:
1140 file.seek(offset)
1141 blocksize = min(count, 16384) if count else 16384
1142 buf = bytearray(blocksize)
1143 total_sent = 0
1144 proto = _SendfileFallbackProtocol(transp)
1145 try:
1146 while True:
1147 if count:
1148 blocksize = min(count - total_sent, blocksize)
1149 if blocksize <= 0:
1150 return total_sent
1151 view = memoryview(buf)[:blocksize]
Andrew Svetlovb6ff2cd2019-06-15 14:55:52 +03001152 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov7c684072018-01-27 21:22:47 +02001153 if not read:
1154 return total_sent # EOF
1155 await proto.drain()
Miss Islington (bot)bb073212019-06-15 04:24:16 -07001156 transp.write(view[:read])
Andrew Svetlov7c684072018-01-27 21:22:47 +02001157 total_sent += read
1158 finally:
1159 if total_sent > 0 and hasattr(file, 'seek'):
1160 file.seek(offset + total_sent)
1161 await proto.restore()
1162
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001163 async def start_tls(self, transport, protocol, sslcontext, *,
1164 server_side=False,
1165 server_hostname=None,
1166 ssl_handshake_timeout=None):
1167 """Upgrade transport to TLS.
1168
1169 Return a new transport that *protocol* should start using
1170 immediately.
1171 """
1172 if ssl is None:
1173 raise RuntimeError('Python ssl module is not available')
1174
1175 if not isinstance(sslcontext, ssl.SSLContext):
1176 raise TypeError(
1177 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1178 f'got {sslcontext!r}')
1179
1180 if not getattr(transport, '_start_tls_compatible', False):
1181 raise TypeError(
Yury Selivanov415bc462018-06-05 08:59:58 -04001182 f'transport {transport!r} is not supported by start_tls()')
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001183
1184 waiter = self.create_future()
1185 ssl_protocol = sslproto.SSLProtocol(
1186 self, protocol, sslcontext, waiter,
1187 server_side, server_hostname,
1188 ssl_handshake_timeout=ssl_handshake_timeout,
1189 call_connection_made=False)
1190
Yury Selivanovf2955872018-05-29 01:00:12 -04001191 # Pause early so that "ssl_protocol.data_received()" doesn't
1192 # have a chance to get called before "ssl_protocol.connection_made()".
1193 transport.pause_reading()
1194
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001195 transport.set_protocol(ssl_protocol)
Yury Selivanov415bc462018-06-05 08:59:58 -04001196 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1197 resume_cb = self.call_soon(transport.resume_reading)
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001198
Yury Selivanov96026432018-06-04 11:32:35 -04001199 try:
1200 await waiter
Yury Selivanov431b5402019-05-27 14:45:12 +02001201 except BaseException:
Yury Selivanov96026432018-06-04 11:32:35 -04001202 transport.close()
Yury Selivanov415bc462018-06-05 08:59:58 -04001203 conmade_cb.cancel()
1204 resume_cb.cancel()
Yury Selivanov96026432018-06-04 11:32:35 -04001205 raise
1206
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001207 return ssl_protocol._app_transport
1208
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001209 async def create_datagram_endpoint(self, protocol_factory,
1210 local_addr=None, remote_addr=None, *,
1211 family=0, proto=0, flags=0,
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001212 reuse_address=_unset, reuse_port=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001213 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001214 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001215 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001216 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001217 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001218 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001219 if (local_addr or remote_addr or
1220 family or proto or flags or
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001221 reuse_port or allow_broadcast):
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001222 # show the problematic kwargs in exception msg
1223 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1224 family=family, proto=proto, flags=flags,
1225 reuse_address=reuse_address, reuse_port=reuse_port,
1226 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001227 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001228 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001229 f'socket modifier keyword arguments can not be used '
1230 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001231 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001232 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001233 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001234 if not (local_addr or remote_addr):
1235 if family == 0:
1236 raise ValueError('unexpected address family')
1237 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001238 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1239 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001240 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001241 raise TypeError('string is expected')
Quentin Dawans56065d42019-04-09 15:40:59 +02001242
1243 if local_addr and local_addr[0] not in (0, '\x00'):
1244 try:
1245 if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1246 os.remove(local_addr)
1247 except FileNotFoundError:
1248 pass
1249 except OSError as err:
1250 # Directory may have permissions only to create socket.
1251 logger.error('Unable to check or remove stale UNIX '
1252 'socket %r: %r',
1253 local_addr, err)
1254
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001255 addr_pairs_info = (((family, proto),
1256 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001257 else:
1258 # join address by (family, protocol)
Inada Naokif3451702019-02-05 17:04:40 +09001259 addr_infos = {} # Using order preserving dict
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001260 for idx, addr in ((0, local_addr), (1, remote_addr)):
1261 if addr is not None:
1262 assert isinstance(addr, tuple) and len(addr) == 2, (
1263 '2-tuple is expected')
1264
Yury Selivanov19a44f62017-12-14 20:53:26 -05001265 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001266 addr, family=family, type=socket.SOCK_DGRAM,
1267 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001268 if not infos:
1269 raise OSError('getaddrinfo() returned empty list')
1270
1271 for fam, _, pro, _, address in infos:
1272 key = (fam, pro)
1273 if key not in addr_infos:
1274 addr_infos[key] = [None, None]
1275 addr_infos[key][idx] = address
1276
1277 # each addr has to have info for each (family, proto) pair
1278 addr_pairs_info = [
1279 (key, addr_pair) for key, addr_pair in addr_infos.items()
1280 if not ((local_addr and addr_pair[0] is None) or
1281 (remote_addr and addr_pair[1] is None))]
1282
1283 if not addr_pairs_info:
1284 raise ValueError('can not get address information')
1285
1286 exceptions = []
1287
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001288 # bpo-37228
1289 if reuse_address is not _unset:
1290 if reuse_address:
1291 raise ValueError("Passing `reuse_address=True` is no "
1292 "longer supported, as the usage of "
1293 "SO_REUSEPORT in UDP poses a significant "
1294 "security concern.")
1295 else:
1296 warnings.warn("The *reuse_address* parameter has been "
1297 "deprecated as of 3.5.10 and is scheduled "
1298 "for removal in 3.11.", DeprecationWarning,
1299 stacklevel=2)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001300
1301 for ((family, proto),
1302 (local_address, remote_address)) in addr_pairs_info:
1303 sock = None
1304 r_addr = None
1305 try:
1306 sock = socket.socket(
1307 family=family, type=socket.SOCK_DGRAM, proto=proto)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001308 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001309 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001310 if allow_broadcast:
1311 sock.setsockopt(
1312 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1313 sock.setblocking(False)
1314
1315 if local_addr:
1316 sock.bind(local_address)
1317 if remote_addr:
Vincent Michel63deaa52019-05-07 19:18:49 +02001318 if not allow_broadcast:
1319 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001320 r_addr = remote_address
1321 except OSError as exc:
1322 if sock is not None:
1323 sock.close()
1324 exceptions.append(exc)
1325 except:
1326 if sock is not None:
1327 sock.close()
1328 raise
1329 else:
1330 break
1331 else:
1332 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001333
1334 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001335 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001336 transport = self._make_datagram_transport(
1337 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001338 if self._debug:
1339 if local_addr:
1340 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1341 "created: (%r, %r)",
1342 local_addr, remote_addr, transport, protocol)
1343 else:
1344 logger.debug("Datagram endpoint remote_addr=%r created: "
1345 "(%r, %r)",
1346 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001347
1348 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001349 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001350 except:
1351 transport.close()
1352 raise
1353
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001354 return transport, protocol
1355
Yury Selivanov19a44f62017-12-14 20:53:26 -05001356 async def _ensure_resolved(self, address, *,
1357 family=0, type=socket.SOCK_STREAM,
1358 proto=0, flags=0, loop):
1359 host, port = address[:2]
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +02001360 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
Yury Selivanov19a44f62017-12-14 20:53:26 -05001361 if info is not None:
1362 # "host" is already a resolved IP.
1363 return [info]
1364 else:
1365 return await loop.getaddrinfo(host, port, family=family, type=type,
1366 proto=proto, flags=flags)
1367
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001368 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001369 infos = await self._ensure_resolved((host, port), family=family,
1370 type=socket.SOCK_STREAM,
1371 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001372 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001373 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001374 return infos
1375
Neil Aspinallf7686c12017-12-19 19:45:42 +00001376 async def create_server(
1377 self, protocol_factory, host=None, port=None,
1378 *,
1379 family=socket.AF_UNSPEC,
1380 flags=socket.AI_PASSIVE,
1381 sock=None,
1382 backlog=100,
1383 ssl=None,
1384 reuse_address=None,
1385 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001386 ssl_handshake_timeout=None,
1387 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001388 """Create a TCP server.
1389
Yury Selivanov6370f342017-12-10 18:36:12 -05001390 The host parameter can be a string, in that case the TCP server is
1391 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001392
1393 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001394 the TCP server is bound to all hosts of the sequence. If a host
1395 appears multiple times (possibly indirectly e.g. when hostnames
1396 resolve to the same IP address), the server is only bound once to that
1397 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001398
Victor Stinneracdb7822014-07-14 18:33:40 +02001399 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001400
1401 This method is a coroutine.
1402 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001403 if isinstance(ssl, bool):
1404 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001405
1406 if ssl_handshake_timeout is not None and ssl is None:
1407 raise ValueError(
1408 'ssl_handshake_timeout is only meaningful with ssl')
1409
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001410 if host is not None or port is not None:
1411 if sock is not None:
1412 raise ValueError(
1413 'host/port and sock can not be specified at the same time')
1414
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001415 if reuse_address is None:
1416 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1417 sockets = []
1418 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001419 hosts = [None]
1420 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001421 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001422 hosts = [host]
1423 else:
1424 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001425
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001426 fs = [self._create_server_getaddrinfo(host, port, family=family,
1427 flags=flags)
1428 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001429 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001430 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001431
1432 completed = False
1433 try:
1434 for res in infos:
1435 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001436 try:
1437 sock = socket.socket(af, socktype, proto)
1438 except socket.error:
1439 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001440 if self._debug:
1441 logger.warning('create_server() failed to create '
1442 'socket.socket(%r, %r, %r)',
1443 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001444 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001445 sockets.append(sock)
1446 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001447 sock.setsockopt(
1448 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1449 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001450 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001451 # Disable IPv4/IPv6 dual stack support (enabled by
1452 # default on Linux) which makes a single socket
1453 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001454 if (_HAS_IPv6 and
1455 af == socket.AF_INET6 and
1456 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001457 sock.setsockopt(socket.IPPROTO_IPV6,
1458 socket.IPV6_V6ONLY,
1459 True)
1460 try:
1461 sock.bind(sa)
1462 except OSError as err:
1463 raise OSError(err.errno, 'error while attempting '
1464 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001465 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001466 completed = True
1467 finally:
1468 if not completed:
1469 for sock in sockets:
1470 sock.close()
1471 else:
1472 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001473 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001474 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001475 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001476 sockets = [sock]
1477
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001478 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001479 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001480
1481 server = Server(self, sockets, protocol_factory,
1482 ssl, backlog, ssl_handshake_timeout)
1483 if start_serving:
1484 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001485 # Skip one loop iteration so that all 'loop.add_reader'
1486 # go through.
1487 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001488
Victor Stinnere912e652014-07-12 03:11:53 +02001489 if self._debug:
1490 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001491 return server
1492
Neil Aspinallf7686c12017-12-19 19:45:42 +00001493 async def connect_accepted_socket(
1494 self, protocol_factory, sock,
1495 *, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001496 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001497 """Handle an accepted connection.
1498
1499 This is used by servers that accept connections outside of
1500 asyncio but that use asyncio to handle connections.
1501
1502 This method is a coroutine. When completed, the coroutine
1503 returns a (transport, protocol) pair.
1504 """
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001505 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001506 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001507
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001508 if ssl_handshake_timeout is not None and not ssl:
1509 raise ValueError(
1510 'ssl_handshake_timeout is only meaningful with ssl')
1511
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001512 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001513 sock, protocol_factory, ssl, '', server_side=True,
1514 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001515 if self._debug:
1516 # Get the socket from the transport because SSL transport closes
1517 # the old socket and creates a new SSL socket
1518 sock = transport.get_extra_info('socket')
1519 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1520 return transport, protocol
1521
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001522 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001523 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001524 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001525 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001526
1527 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001528 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001529 except:
1530 transport.close()
1531 raise
1532
Victor Stinneracdb7822014-07-14 18:33:40 +02001533 if self._debug:
1534 logger.debug('Read pipe %r connected: (%r, %r)',
1535 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001536 return transport, protocol
1537
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001538 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001539 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001540 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001541 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001542
1543 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001544 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001545 except:
1546 transport.close()
1547 raise
1548
Victor Stinneracdb7822014-07-14 18:33:40 +02001549 if self._debug:
1550 logger.debug('Write pipe %r connected: (%r, %r)',
1551 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001552 return transport, protocol
1553
Victor Stinneracdb7822014-07-14 18:33:40 +02001554 def _log_subprocess(self, msg, stdin, stdout, stderr):
1555 info = [msg]
1556 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001557 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001558 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001559 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001560 else:
1561 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001562 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001563 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001564 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001565 logger.debug(' '.join(info))
1566
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001567 async def subprocess_shell(self, protocol_factory, cmd, *,
1568 stdin=subprocess.PIPE,
1569 stdout=subprocess.PIPE,
1570 stderr=subprocess.PIPE,
1571 universal_newlines=False,
1572 shell=True, bufsize=0,
sbstpf0d4c642019-05-27 19:51:19 -04001573 encoding=None, errors=None, text=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001574 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001575 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001576 raise ValueError("cmd must be a string")
1577 if universal_newlines:
1578 raise ValueError("universal_newlines must be False")
1579 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001580 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001581 if bufsize != 0:
1582 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001583 if text:
1584 raise ValueError("text must be False")
1585 if encoding is not None:
1586 raise ValueError("encoding must be None")
1587 if errors is not None:
1588 raise ValueError("errors must be None")
1589
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001590 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001591 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001592 if self._debug:
1593 # don't log parameters: they may contain sensitive information
1594 # (password) and may be too long
1595 debug_log = 'run shell command %r' % cmd
1596 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001597 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001598 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001599 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001600 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001601 return transport, protocol
1602
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001603 async def subprocess_exec(self, protocol_factory, program, *args,
1604 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1605 stderr=subprocess.PIPE, universal_newlines=False,
sbstpf0d4c642019-05-27 19:51:19 -04001606 shell=False, bufsize=0,
1607 encoding=None, errors=None, text=None,
1608 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001609 if universal_newlines:
1610 raise ValueError("universal_newlines must be False")
1611 if shell:
1612 raise ValueError("shell must be False")
1613 if bufsize != 0:
1614 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001615 if text:
1616 raise ValueError("text must be False")
1617 if encoding is not None:
1618 raise ValueError("encoding must be None")
1619 if errors is not None:
1620 raise ValueError("errors must be None")
1621
Victor Stinner20e07432014-02-11 11:44:56 +01001622 popen_args = (program,) + args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001623 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001624 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001625 if self._debug:
1626 # don't log parameters: they may contain sensitive information
1627 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001628 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001629 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001630 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001631 protocol, popen_args, False, stdin, stdout, stderr,
1632 bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001633 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001634 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001635 return transport, protocol
1636
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001637 def get_exception_handler(self):
1638 """Return an exception handler, or None if the default one is in use.
1639 """
1640 return self._exception_handler
1641
Yury Selivanov569efa22014-02-18 18:02:19 -05001642 def set_exception_handler(self, handler):
1643 """Set handler as the new event loop exception handler.
1644
1645 If handler is None, the default exception handler will
1646 be set.
1647
1648 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001649 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001650 will be a reference to the active event loop, 'context'
1651 will be a dict object (see `call_exception_handler()`
1652 documentation for details about context).
1653 """
1654 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001655 raise TypeError(f'A callable object or None is expected, '
1656 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001657 self._exception_handler = handler
1658
1659 def default_exception_handler(self, context):
1660 """Default exception handler.
1661
1662 This is called when an exception occurs and no exception
1663 handler is set, and can be called by a custom exception
1664 handler that wants to defer to the default behavior.
1665
Antoine Pitrou921e9432017-11-07 17:23:29 +01001666 This default handler logs the error message and other
1667 context-dependent information. In debug mode, a truncated
1668 stack trace is also appended showing where the given object
1669 (e.g. a handle or future or task) was created, if any.
1670
Victor Stinneracdb7822014-07-14 18:33:40 +02001671 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001672 `call_exception_handler()`.
1673 """
1674 message = context.get('message')
1675 if not message:
1676 message = 'Unhandled exception in event loop'
1677
1678 exception = context.get('exception')
1679 if exception is not None:
1680 exc_info = (type(exception), exception, exception.__traceback__)
1681 else:
1682 exc_info = False
1683
Yury Selivanov6370f342017-12-10 18:36:12 -05001684 if ('source_traceback' not in context and
1685 self._current_handle is not None and
1686 self._current_handle._source_traceback):
1687 context['handle_traceback'] = \
1688 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001689
Yury Selivanov569efa22014-02-18 18:02:19 -05001690 log_lines = [message]
1691 for key in sorted(context):
1692 if key in {'message', 'exception'}:
1693 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001694 value = context[key]
1695 if key == 'source_traceback':
1696 tb = ''.join(traceback.format_list(value))
1697 value = 'Object created at (most recent call last):\n'
1698 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001699 elif key == 'handle_traceback':
1700 tb = ''.join(traceback.format_list(value))
1701 value = 'Handle created at (most recent call last):\n'
1702 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001703 else:
1704 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001705 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001706
1707 logger.error('\n'.join(log_lines), exc_info=exc_info)
1708
1709 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001710 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001711
Victor Stinneracdb7822014-07-14 18:33:40 +02001712 The context argument is a dict containing the following keys:
1713
Yury Selivanov569efa22014-02-18 18:02:19 -05001714 - 'message': Error message;
1715 - 'exception' (optional): Exception object;
1716 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001717 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001718 - 'handle' (optional): Handle instance;
1719 - 'protocol' (optional): Protocol instance;
1720 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001721 - 'socket' (optional): Socket instance;
1722 - 'asyncgen' (optional): Asynchronous generator that caused
1723 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001724
Victor Stinneracdb7822014-07-14 18:33:40 +02001725 New keys maybe introduced in the future.
1726
1727 Note: do not overload this method in an event loop subclass.
1728 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001729 `set_exception_handler()` method.
1730 """
1731 if self._exception_handler is None:
1732 try:
1733 self.default_exception_handler(context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001734 except (SystemExit, KeyboardInterrupt):
1735 raise
1736 except BaseException:
Yury Selivanov569efa22014-02-18 18:02:19 -05001737 # Second protection layer for unexpected errors
1738 # in the default implementation, as well as for subclassed
1739 # event loops with overloaded "default_exception_handler".
1740 logger.error('Exception in default exception handler',
1741 exc_info=True)
1742 else:
1743 try:
1744 self._exception_handler(self, context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001745 except (SystemExit, KeyboardInterrupt):
1746 raise
1747 except BaseException as exc:
Yury Selivanov569efa22014-02-18 18:02:19 -05001748 # Exception in the user set custom exception handler.
1749 try:
1750 # Let's try default handler.
1751 self.default_exception_handler({
1752 'message': 'Unhandled error in exception handler',
1753 'exception': exc,
1754 'context': context,
1755 })
Yury Selivanov431b5402019-05-27 14:45:12 +02001756 except (SystemExit, KeyboardInterrupt):
1757 raise
1758 except BaseException:
Victor Stinneracdb7822014-07-14 18:33:40 +02001759 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001760 # overloaded.
1761 logger.error('Exception in default exception handler '
1762 'while handling an unexpected error '
1763 'in custom exception handler',
1764 exc_info=True)
1765
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001766 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001767 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001768 assert isinstance(handle, events.Handle), 'A Handle is required here'
1769 if handle._cancelled:
1770 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001771 assert not isinstance(handle, events.TimerHandle)
1772 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001773
1774 def _add_callback_signalsafe(self, handle):
1775 """Like _add_callback() but called from a signal handler."""
1776 self._add_callback(handle)
1777 self._write_to_self()
1778
Yury Selivanov592ada92014-09-25 12:07:56 -04001779 def _timer_handle_cancelled(self, handle):
1780 """Notification that a TimerHandle has been cancelled."""
1781 if handle._scheduled:
1782 self._timer_cancelled_count += 1
1783
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001784 def _run_once(self):
1785 """Run one full iteration of the event loop.
1786
1787 This calls all currently ready callbacks, polls for I/O,
1788 schedules the resulting callbacks, and finally schedules
1789 'call_later' callbacks.
1790 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001791
Yury Selivanov592ada92014-09-25 12:07:56 -04001792 sched_count = len(self._scheduled)
1793 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1794 self._timer_cancelled_count / sched_count >
1795 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001796 # Remove delayed calls that were cancelled if their number
1797 # is too high
1798 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001799 for handle in self._scheduled:
1800 if handle._cancelled:
1801 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001802 else:
1803 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001804
Victor Stinner68da8fc2014-09-30 18:08:36 +02001805 heapq.heapify(new_scheduled)
1806 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001807 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001808 else:
1809 # Remove delayed calls that were cancelled from head of queue.
1810 while self._scheduled and self._scheduled[0]._cancelled:
1811 self._timer_cancelled_count -= 1
1812 handle = heapq.heappop(self._scheduled)
1813 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001814
1815 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001816 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001817 timeout = 0
1818 elif self._scheduled:
1819 # Compute the desired timeout.
1820 when = self._scheduled[0]._when
MartinAltmayer944451c2018-07-31 15:06:12 +01001821 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001822
Andrew Svetlovd5bd0362018-09-30 08:28:40 +03001823 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001824 self._process_events(event_list)
1825
1826 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001827 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001828 while self._scheduled:
1829 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001830 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001831 break
1832 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001833 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001834 self._ready.append(handle)
1835
1836 # This is the only place where callbacks are actually *called*.
1837 # All other places just add them to ready.
1838 # Note: We run all currently scheduled callbacks, but not any
1839 # callbacks scheduled by callbacks run this time around --
1840 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001841 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001842 ntodo = len(self._ready)
1843 for i in range(ntodo):
1844 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001845 if handle._cancelled:
1846 continue
1847 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001848 try:
1849 self._current_handle = handle
1850 t0 = self.time()
1851 handle._run()
1852 dt = self.time() - t0
1853 if dt >= self.slow_callback_duration:
1854 logger.warning('Executing %s took %.3f seconds',
1855 _format_handle(handle), dt)
1856 finally:
1857 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001858 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001859 handle._run()
1860 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001861
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001862 def _set_coroutine_origin_tracking(self, enabled):
1863 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001864 return
1865
Yury Selivanove8944cb2015-05-12 11:43:04 -04001866 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001867 self._coroutine_origin_tracking_saved_depth = (
1868 sys.get_coroutine_origin_tracking_depth())
1869 sys.set_coroutine_origin_tracking_depth(
1870 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001871 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001872 sys.set_coroutine_origin_tracking_depth(
1873 self._coroutine_origin_tracking_saved_depth)
1874
1875 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001876
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001877 def get_debug(self):
1878 return self._debug
1879
1880 def set_debug(self, enabled):
1881 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001882
Yury Selivanove8944cb2015-05-12 11:43:04 -04001883 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001884 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)