blob: bfd40115bed38ac10a2b59a4eed5882d081d7d48 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Yury Selivanov8cd51652019-05-27 15:57:20 +020048from . import trsock
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
Yury Selivanov6370f342017-12-10 18:36:12 -050052__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
54
Yury Selivanov592ada92014-09-25 12:07:56 -040055# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
Andrew Svetlov0dd71802018-09-12 14:03:54 -070063
Yury Selivanovd904c232018-06-28 21:59:32 -040064_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
MartinAltmayer944451c2018-07-31 15:06:12 +010066# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
Miss Islington (bot)79c29742019-12-09 06:39:54 -080069# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
Victor Stinner0e6f52a2014-06-20 17:34:15 +020074def _format_handle(handle):
75 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040076 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020077 # format the task
78 return repr(cb.__self__)
79 else:
80 return str(handle)
81
82
Victor Stinneracdb7822014-07-14 18:33:40 +020083def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
90
91
Yury Selivanov5587d7c2016-09-15 15:45:07 -040092def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
101
102
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
108
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500111 return None
112
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500113 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500114 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500115 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500116 proto = socket.IPPROTO_UDP
117 else:
118 return None
119
Yury Selivanova7146162016-06-02 16:51:07 -0400120 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400121 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400132
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500134 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400135 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500136 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400137 else:
138 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500145 return None
146
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400147 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 socket.inet_pton(af, host)
150 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400151 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200152 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400153 else:
154 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400155 except OSError:
156 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500157
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400158 # "host" is not an IP address.
159 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163 """Interleave list of addrinfo tuples by family."""
164 # Group addresses by family
165 addrinfos_by_family = collections.OrderedDict()
166 for addr in addrinfos:
167 family = addr[0]
168 if family not in addrinfos_by_family:
169 addrinfos_by_family[family] = []
170 addrinfos_by_family[family].append(addr)
171 addrinfos_lists = list(addrinfos_by_family.values())
172
173 reordered = []
174 if first_address_family_count > 1:
175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176 del addrinfos_lists[0][:first_address_family_count - 1]
177 reordered.extend(
178 a for a in itertools.chain.from_iterable(
179 itertools.zip_longest(*addrinfos_lists)
180 ) if a is not None)
181 return reordered
182
183
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100184def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500185 if not fut.cancelled():
186 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200187 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500188 # Issue #22429: run_forever() already finished, no need to
189 # stop it.
190 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500191 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100192
193
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200194if hasattr(socket, 'TCP_NODELAY'):
195 def _set_nodelay(sock):
196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197 sock.type == socket.SOCK_STREAM and
198 sock.proto == socket.IPPROTO_TCP):
199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201 def _set_nodelay(sock):
202 pass
203
204
Andrew Svetlov7c684072018-01-27 21:22:47 +0200205class _SendfileFallbackProtocol(protocols.Protocol):
206 def __init__(self, transp):
207 if not isinstance(transp, transports._FlowControlMixin):
208 raise TypeError("transport should be _FlowControlMixin instance")
209 self._transport = transp
210 self._proto = transp.get_protocol()
211 self._should_resume_reading = transp.is_reading()
212 self._should_resume_writing = transp._protocol_paused
213 transp.pause_reading()
214 transp.set_protocol(self)
215 if self._should_resume_writing:
216 self._write_ready_fut = self._transport._loop.create_future()
217 else:
218 self._write_ready_fut = None
219
220 async def drain(self):
221 if self._transport.is_closing():
222 raise ConnectionError("Connection closed by peer")
223 fut = self._write_ready_fut
224 if fut is None:
225 return
226 await fut
227
228 def connection_made(self, transport):
229 raise RuntimeError("Invalid state: "
230 "connection should have been established already.")
231
232 def connection_lost(self, exc):
233 if self._write_ready_fut is not None:
234 # Never happens if peer disconnects after sending the whole content
235 # Thus disconnection is always an exception from user perspective
236 if exc is None:
237 self._write_ready_fut.set_exception(
238 ConnectionError("Connection is closed by peer"))
239 else:
240 self._write_ready_fut.set_exception(exc)
241 self._proto.connection_lost(exc)
242
243 def pause_writing(self):
244 if self._write_ready_fut is not None:
245 return
246 self._write_ready_fut = self._transport._loop.create_future()
247
248 def resume_writing(self):
249 if self._write_ready_fut is None:
250 return
251 self._write_ready_fut.set_result(False)
252 self._write_ready_fut = None
253
254 def data_received(self, data):
255 raise RuntimeError("Invalid state: reading should be paused")
256
257 def eof_received(self):
258 raise RuntimeError("Invalid state: reading should be paused")
259
260 async def restore(self):
261 self._transport.set_protocol(self._proto)
262 if self._should_resume_reading:
263 self._transport.resume_reading()
264 if self._write_ready_fut is not None:
265 # Cancel the future.
266 # Basically it has no effect because protocol is switched back,
267 # no code should wait for it anymore.
268 self._write_ready_fut.cancel()
269 if self._should_resume_writing:
270 self._proto.resume_writing()
271
272
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273class Server(events.AbstractServer):
274
Yury Selivanovc9070d02018-01-25 18:08:09 -0500275 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
276 ssl_handshake_timeout):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200277 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500278 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200279 self._active_count = 0
280 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500281 self._protocol_factory = protocol_factory
282 self._backlog = backlog
283 self._ssl_context = ssl_context
284 self._ssl_handshake_timeout = ssl_handshake_timeout
285 self._serving = False
286 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287
Victor Stinnere912e652014-07-12 03:11:53 +0200288 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500289 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200290
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200291 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500292 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200293 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200295 def _detach(self):
296 assert self._active_count > 0
297 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500298 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 self._wakeup()
300
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200302 waiters = self._waiters
303 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 for waiter in waiters:
305 if not waiter.done():
306 waiter.set_result(waiter)
307
Yury Selivanovc9070d02018-01-25 18:08:09 -0500308 def _start_serving(self):
309 if self._serving:
310 return
311 self._serving = True
312 for sock in self._sockets:
313 sock.listen(self._backlog)
314 self._loop._start_serving(
315 self._protocol_factory, sock, self._ssl_context,
316 self, self._backlog, self._ssl_handshake_timeout)
317
318 def get_loop(self):
319 return self._loop
320
321 def is_serving(self):
322 return self._serving
323
324 @property
325 def sockets(self):
326 if self._sockets is None:
Yury Selivanov8cd51652019-05-27 15:57:20 +0200327 return ()
328 return tuple(trsock.TransportSocket(s) for s in self._sockets)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500329
330 def close(self):
331 sockets = self._sockets
332 if sockets is None:
333 return
334 self._sockets = None
335
336 for sock in sockets:
337 self._loop._stop_serving(sock)
338
339 self._serving = False
340
341 if (self._serving_forever_fut is not None and
342 not self._serving_forever_fut.done()):
343 self._serving_forever_fut.cancel()
344 self._serving_forever_fut = None
345
346 if self._active_count == 0:
347 self._wakeup()
348
349 async def start_serving(self):
350 self._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400351 # Skip one loop iteration so that all 'loop.add_reader'
352 # go through.
353 await tasks.sleep(0, loop=self._loop)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500354
355 async def serve_forever(self):
356 if self._serving_forever_fut is not None:
357 raise RuntimeError(
358 f'server {self!r} is already being awaited on serve_forever()')
359 if self._sockets is None:
360 raise RuntimeError(f'server {self!r} is closed')
361
362 self._start_serving()
363 self._serving_forever_fut = self._loop.create_future()
364
365 try:
366 await self._serving_forever_fut
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700367 except exceptions.CancelledError:
Yury Selivanovc9070d02018-01-25 18:08:09 -0500368 try:
369 self.close()
370 await self.wait_closed()
371 finally:
372 raise
373 finally:
374 self._serving_forever_fut = None
375
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200376 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500377 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400379 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200380 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200381 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
386 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400387 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200388 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800389 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 self._ready = collections.deque()
391 self._scheduled = []
392 self._default_executor = None
393 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100394 # Identifier of the thread running the event loop, or None if the
395 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100396 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100397 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500398 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800399 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200400 # In debug mode, if the execution of a callback or a step of a task
401 # exceed this duration in seconds, the slow callback/task is logged.
402 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100403 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400404 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800405 self._coroutine_origin_tracking_enabled = False
406 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500408 # A weak set of all asynchronous generators that are
409 # being iterated by the loop.
410 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700411 # Set to True when `loop.shutdown_asyncgens` is called.
412 self._asyncgens_shutdown_called = False
413
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200414 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500415 return (
416 f'<{self.__class__.__name__} running={self.is_running()} '
417 f'closed={self.is_closed()} debug={self.get_debug()}>'
418 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200419
Yury Selivanov7661db62016-05-16 15:38:39 -0400420 def create_future(self):
421 """Create a Future object attached to the loop."""
422 return futures.Future(loop=self)
423
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300424 def create_task(self, coro, *, name=None):
Victor Stinner896a25a2014-07-08 11:29:25 +0200425 """Schedule a coroutine object.
426
Victor Stinneracdb7822014-07-14 18:33:40 +0200427 Return a task object.
428 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100429 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400430 if self._task_factory is None:
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300431 task = tasks.Task(coro, loop=self, name=name)
Yury Selivanov740169c2015-05-11 14:23:38 -0400432 if task._source_traceback:
433 del task._source_traceback[-1]
434 else:
435 task = self._task_factory(self, coro)
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300436 tasks._set_task_name(task, name)
437
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200438 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200439
Yury Selivanov740169c2015-05-11 14:23:38 -0400440 def set_task_factory(self, factory):
441 """Set a task factory that will be used by loop.create_task().
442
443 If factory is None the default task factory will be set.
444
445 If factory is a callable, it should have a signature matching
446 '(loop, coro)', where 'loop' will be a reference to the active
447 event loop, 'coro' will be a coroutine object. The callable
448 must return a Future.
449 """
450 if factory is not None and not callable(factory):
451 raise TypeError('task factory must be a callable or None')
452 self._task_factory = factory
453
454 def get_task_factory(self):
455 """Return a task factory, or None if the default one is in use."""
456 return self._task_factory
457
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 def _make_socket_transport(self, sock, protocol, waiter=None, *,
459 extra=None, server=None):
460 """Create socket transport."""
461 raise NotImplementedError
462
Neil Aspinallf7686c12017-12-19 19:45:42 +0000463 def _make_ssl_transport(
464 self, rawsock, protocol, sslcontext, waiter=None,
465 *, server_side=False, server_hostname=None,
466 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500467 ssl_handshake_timeout=None,
468 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 """Create SSL transport."""
470 raise NotImplementedError
471
472 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200473 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 """Create datagram transport."""
475 raise NotImplementedError
476
477 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
478 extra=None):
479 """Create read pipe transport."""
480 raise NotImplementedError
481
482 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
483 extra=None):
484 """Create write pipe transport."""
485 raise NotImplementedError
486
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200487 async def _make_subprocess_transport(self, protocol, args, shell,
488 stdin, stdout, stderr, bufsize,
489 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 """Create subprocess transport."""
491 raise NotImplementedError
492
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200494 """Write a byte to self-pipe, to wake up the event loop.
495
496 This may be called from a different thread.
497
498 The subclass is responsible for implementing the self-pipe.
499 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 raise NotImplementedError
501
502 def _process_events(self, event_list):
503 """Process selector events."""
504 raise NotImplementedError
505
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200506 def _check_closed(self):
507 if self._closed:
508 raise RuntimeError('Event loop is closed')
509
Yury Selivanoveb636452016-09-08 22:01:51 -0700510 def _asyncgen_finalizer_hook(self, agen):
511 self._asyncgens.discard(agen)
512 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800513 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700514
515 def _asyncgen_firstiter_hook(self, agen):
516 if self._asyncgens_shutdown_called:
517 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500518 f"asynchronous generator {agen!r} was scheduled after "
519 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700520 ResourceWarning, source=self)
521
522 self._asyncgens.add(agen)
523
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200524 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700525 """Shutdown all active asynchronous generators."""
526 self._asyncgens_shutdown_called = True
527
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500528 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400529 # If Python version is <3.6 or we don't have any asynchronous
530 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700531 return
532
533 closing_agens = list(self._asyncgens)
534 self._asyncgens.clear()
535
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200536 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700537 *[ag.aclose() for ag in closing_agens],
538 return_exceptions=True,
539 loop=self)
540
Yury Selivanoveb636452016-09-08 22:01:51 -0700541 for result, agen in zip(results, closing_agens):
542 if isinstance(result, Exception):
543 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500544 'message': f'an error occurred during closing of '
545 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700546 'exception': result,
547 'asyncgen': agen
548 })
549
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 def run_forever(self):
551 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200552 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100553 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400554 raise RuntimeError('This event loop is already running')
555 if events._get_running_loop() is not None:
556 raise RuntimeError(
557 'Cannot run the event loop while another loop is running')
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800558 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100559 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500560
561 old_agen_hooks = sys.get_asyncgen_hooks()
562 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
563 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400565 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800567 self._run_once()
568 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 break
570 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800571 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100572 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400573 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800574 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500575 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576
577 def run_until_complete(self, future):
578 """Run until the Future is done.
579
580 If the argument is a coroutine, it is wrapped in a Task.
581
Victor Stinneracdb7822014-07-14 18:33:40 +0200582 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 with the same coroutine twice -- it would wrap it in two
584 different Tasks and that can't be good.
585
586 Return the Future's result, or raise its exception.
587 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200588 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200589
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700590 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400591 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200592 if new_task:
593 # An exception is raised if the future didn't complete, so there
594 # is no need to log the "destroy pending task" message
595 future._log_destroy_pending = False
596
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100597 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200598 try:
599 self.run_forever()
600 except:
601 if new_task and future.done() and not future.cancelled():
602 # The coroutine raised a BaseException. Consume the exception
603 # to not log a warning, the caller doesn't have access to the
604 # local task.
605 future.exception()
606 raise
jimmylai21b3e042017-05-22 22:32:46 -0700607 finally:
608 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609 if not future.done():
610 raise RuntimeError('Event loop stopped before Future completed.')
611
612 return future.result()
613
614 def stop(self):
615 """Stop running the event loop.
616
Guido van Rossum41f69f42015-11-19 13:28:47 -0800617 Every callback already scheduled will still run. This simply informs
618 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800620 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200622 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700623 """Close the event loop.
624
625 This clears the queues and shuts down the executor,
626 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200627
628 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700629 """
Victor Stinner956de692014-12-26 21:07:52 +0100630 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200631 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200632 if self._closed:
633 return
Victor Stinnere912e652014-07-12 03:11:53 +0200634 if self._debug:
635 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400636 self._closed = True
637 self._ready.clear()
638 self._scheduled.clear()
639 executor = self._default_executor
640 if executor is not None:
641 self._default_executor = None
Łukasz Langa7f9a2ae2019-06-04 13:03:20 +0200642 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200643
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200644 def is_closed(self):
645 """Returns True if the event loop was closed."""
646 return self._closed
647
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100648 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900649 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100650 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900651 if not self.is_running():
652 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100653
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700654 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200655 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100656 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657
658 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200659 """Return the time according to the event loop's clock.
660
661 This is a float expressed in seconds since an epoch, but the
662 epoch, precision, accuracy and drift are unspecified and may
663 differ per event loop.
664 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665 return time.monotonic()
666
Yury Selivanovf23746a2018-01-22 19:11:18 -0500667 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668 """Arrange for a callback to be called at a given time.
669
670 Return a Handle: an opaque object with a cancel() method that
671 can be used to cancel the call.
672
673 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200674 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675
676 Each callback will be called exactly once. If two callbacks
677 are scheduled for exactly the same time, it undefined which
678 will be called first.
679
680 Any positional arguments after the callback will be passed to
681 the callback when it is called.
682 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500683 timer = self.call_at(self.time() + delay, callback, *args,
684 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200685 if timer._source_traceback:
686 del timer._source_traceback[-1]
687 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688
Yury Selivanovf23746a2018-01-22 19:11:18 -0500689 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200690 """Like call_later(), but uses an absolute time.
691
692 Absolute time corresponds to the event loop's time() method.
693 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100694 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100695 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100696 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700697 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500698 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200699 if timer._source_traceback:
700 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700701 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400702 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 return timer
704
Yury Selivanovf23746a2018-01-22 19:11:18 -0500705 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 """Arrange for a callback to be called as soon as possible.
707
Victor Stinneracdb7822014-07-14 18:33:40 +0200708 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709 order in which they are registered. Each callback will be
710 called exactly once.
711
712 Any positional arguments after the callback will be passed to
713 the callback when it is called.
714 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700715 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100716 if self._debug:
717 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700718 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500719 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200720 if handle._source_traceback:
721 del handle._source_traceback[-1]
722 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100723
Yury Selivanov491a9122016-11-03 15:09:24 -0700724 def _check_callback(self, callback, method):
725 if (coroutines.iscoroutine(callback) or
726 coroutines.iscoroutinefunction(callback)):
727 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500728 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700729 if not callable(callback):
730 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500731 f'a callable object was expected by {method}(), '
732 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700733
Yury Selivanovf23746a2018-01-22 19:11:18 -0500734 def _call_soon(self, callback, args, context):
735 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200736 if handle._source_traceback:
737 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 self._ready.append(handle)
739 return handle
740
Victor Stinner956de692014-12-26 21:07:52 +0100741 def _check_thread(self):
742 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100743
Victor Stinneracdb7822014-07-14 18:33:40 +0200744 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100745 likely behave incorrectly when the assumption is violated.
746
Victor Stinneracdb7822014-07-14 18:33:40 +0200747 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100748 responsible for checking this condition for performance reasons.
749 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100750 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200751 return
Victor Stinner956de692014-12-26 21:07:52 +0100752 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100753 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100754 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200755 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100756 "than the current one")
757
Yury Selivanovf23746a2018-01-22 19:11:18 -0500758 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200759 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700760 self._check_closed()
761 if self._debug:
762 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500763 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200764 if handle._source_traceback:
765 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700766 self._write_to_self()
767 return handle
768
Yury Selivanovbec23722018-01-28 14:09:40 -0500769 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100770 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700771 if self._debug:
772 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700773 if executor is None:
774 executor = self._default_executor
775 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400776 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700777 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500778 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500779 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700780
781 def set_default_executor(self, executor):
Elvis Pranskevichus22d25082018-07-30 11:42:43 +0100782 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
783 warnings.warn(
784 'Using the default executor that is not an instance of '
785 'ThreadPoolExecutor is deprecated and will be prohibited '
786 'in Python 3.9',
787 DeprecationWarning, 2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788 self._default_executor = executor
789
Victor Stinnere912e652014-07-12 03:11:53 +0200790 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500791 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200792 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500793 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200794 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500795 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200796 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500797 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200798 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500799 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200800 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200801 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200802
803 t0 = self.time()
804 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
805 dt = self.time() - t0
806
Yury Selivanov6370f342017-12-10 18:36:12 -0500807 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200808 if dt >= self.slow_callback_duration:
809 logger.info(msg)
810 else:
811 logger.debug(msg)
812 return addrinfo
813
Yury Selivanov19a44f62017-12-14 20:53:26 -0500814 async def getaddrinfo(self, host, port, *,
815 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400816 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500817 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200818 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500819 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700820
Yury Selivanov19a44f62017-12-14 20:53:26 -0500821 return await self.run_in_executor(
822 None, getaddr_func, host, port, family, type, proto, flags)
823
824 async def getnameinfo(self, sockaddr, flags=0):
825 return await self.run_in_executor(
826 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200828 async def sock_sendfile(self, sock, file, offset=0, count=None,
829 *, fallback=True):
830 if self._debug and sock.gettimeout() != 0:
831 raise ValueError("the socket must be non-blocking")
832 self._check_sendfile_params(sock, file, offset, count)
833 try:
834 return await self._sock_sendfile_native(sock, file,
835 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700836 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200837 if not fallback:
838 raise
839 return await self._sock_sendfile_fallback(sock, file,
840 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200841
842 async def _sock_sendfile_native(self, sock, file, offset, count):
843 # NB: sendfile syscall is not supported for SSL sockets and
844 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700845 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200846 f"syscall sendfile is not available for socket {sock!r} "
847 "and file {file!r} combination")
848
849 async def _sock_sendfile_fallback(self, sock, file, offset, count):
850 if offset:
851 file.seek(offset)
Yury Selivanov71657542018-05-28 18:31:55 -0400852 blocksize = (
853 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
854 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
855 )
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200856 buf = bytearray(blocksize)
857 total_sent = 0
858 try:
859 while True:
860 if count:
861 blocksize = min(count - total_sent, blocksize)
862 if blocksize <= 0:
863 break
864 view = memoryview(buf)[:blocksize]
Yury Selivanov71657542018-05-28 18:31:55 -0400865 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200866 if not read:
867 break # EOF
Miss Islington (bot)bb073212019-06-15 04:24:16 -0700868 await self.sock_sendall(sock, view[:read])
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200869 total_sent += read
870 return total_sent
871 finally:
872 if total_sent > 0 and hasattr(file, 'seek'):
873 file.seek(offset + total_sent)
874
875 def _check_sendfile_params(self, sock, file, offset, count):
876 if 'b' not in getattr(file, 'mode', 'b'):
877 raise ValueError("file should be opened in binary mode")
878 if not sock.type == socket.SOCK_STREAM:
879 raise ValueError("only SOCK_STREAM type sockets are supported")
880 if count is not None:
881 if not isinstance(count, int):
882 raise TypeError(
883 "count must be a positive integer (got {!r})".format(count))
884 if count <= 0:
885 raise ValueError(
886 "count must be a positive integer (got {!r})".format(count))
887 if not isinstance(offset, int):
888 raise TypeError(
889 "offset must be a non-negative integer (got {!r})".format(
890 offset))
891 if offset < 0:
892 raise ValueError(
893 "offset must be a non-negative integer (got {!r})".format(
894 offset))
895
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800896 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
897 """Create, bind and connect one socket."""
898 my_exceptions = []
899 exceptions.append(my_exceptions)
900 family, type_, proto, _, address = addr_info
901 sock = None
902 try:
903 sock = socket.socket(family=family, type=type_, proto=proto)
904 sock.setblocking(False)
905 if local_addr_infos is not None:
906 for _, _, _, _, laddr in local_addr_infos:
907 try:
908 sock.bind(laddr)
909 break
910 except OSError as exc:
911 msg = (
912 f'error while attempting to bind on '
913 f'address {laddr!r}: '
914 f'{exc.strerror.lower()}'
915 )
916 exc = OSError(exc.errno, msg)
917 my_exceptions.append(exc)
918 else: # all bind attempts failed
919 raise my_exceptions.pop()
920 await self.sock_connect(sock, address)
921 return sock
922 except OSError as exc:
923 my_exceptions.append(exc)
924 if sock is not None:
925 sock.close()
926 raise
927 except:
928 if sock is not None:
929 sock.close()
930 raise
931
Neil Aspinallf7686c12017-12-19 19:45:42 +0000932 async def create_connection(
933 self, protocol_factory, host=None, port=None,
934 *, ssl=None, family=0,
935 proto=0, flags=0, sock=None,
936 local_addr=None, server_hostname=None,
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800937 ssl_handshake_timeout=None,
938 happy_eyeballs_delay=None, interleave=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200939 """Connect to a TCP server.
940
941 Create a streaming transport connection to a given Internet host and
942 port: socket family AF_INET or socket.AF_INET6 depending on host (or
943 family if specified), socket type SOCK_STREAM. protocol_factory must be
944 a callable returning a protocol instance.
945
946 This method is a coroutine which will try to establish the connection
947 in the background. When successful, the coroutine returns a
948 (transport, protocol) pair.
949 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700950 if server_hostname is not None and not ssl:
951 raise ValueError('server_hostname is only meaningful with ssl')
952
953 if server_hostname is None and ssl:
954 # Use host as default for server_hostname. It is an error
955 # if host is empty or not set, e.g. when an
956 # already-connected socket was passed or when only a port
957 # is given. To avoid this error, you can pass
958 # server_hostname='' -- this will bypass the hostname
959 # check. (This also means that if host is a numeric
960 # IP/IPv6 address, we will attempt to verify that exact
961 # address; this will probably fail, but it is possible to
962 # create a certificate for a specific IP address, so we
963 # don't judge it here.)
964 if not host:
965 raise ValueError('You must set server_hostname '
966 'when using ssl without a host')
967 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700968
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200969 if ssl_handshake_timeout is not None and not ssl:
970 raise ValueError(
971 'ssl_handshake_timeout is only meaningful with ssl')
972
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800973 if happy_eyeballs_delay is not None and interleave is None:
974 # If using happy eyeballs, default to interleave addresses by family
975 interleave = 1
976
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700977 if host is not None or port is not None:
978 if sock is not None:
979 raise ValueError(
980 'host/port and sock can not be specified at the same time')
981
Yury Selivanov19a44f62017-12-14 20:53:26 -0500982 infos = await self._ensure_resolved(
983 (host, port), family=family,
984 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700985 if not infos:
986 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -0500987
988 if local_addr is not None:
989 laddr_infos = await self._ensure_resolved(
990 local_addr, family=family,
991 type=socket.SOCK_STREAM, proto=proto,
992 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700993 if not laddr_infos:
994 raise OSError('getaddrinfo() returned empty list')
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800995 else:
996 laddr_infos = None
997
998 if interleave:
999 infos = _interleave_addrinfos(infos, interleave)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001000
1001 exceptions = []
twisteroid ambassador88f07a82019-05-05 19:14:35 +08001002 if happy_eyeballs_delay is None:
1003 # not using happy eyeballs
1004 for addrinfo in infos:
1005 try:
1006 sock = await self._connect_sock(
1007 exceptions, addrinfo, laddr_infos)
1008 break
1009 except OSError:
1010 continue
1011 else: # using happy eyeballs
1012 sock, _, _ = await staggered.staggered_race(
1013 (functools.partial(self._connect_sock,
1014 exceptions, addrinfo, laddr_infos)
1015 for addrinfo in infos),
1016 happy_eyeballs_delay, loop=self)
1017
1018 if sock is None:
1019 exceptions = [exc for sub in exceptions for exc in sub]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001020 if len(exceptions) == 1:
1021 raise exceptions[0]
1022 else:
1023 # If they all have the same str(), raise one.
1024 model = str(exceptions[0])
1025 if all(str(exc) == model for exc in exceptions):
1026 raise exceptions[0]
1027 # Raise a combined exception so the user can see all
1028 # the various error messages.
1029 raise OSError('Multiple exceptions: {}'.format(
1030 ', '.join(str(exc) for exc in exceptions)))
1031
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001032 else:
1033 if sock is None:
1034 raise ValueError(
1035 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001036 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -05001037 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1038 # are SOCK_STREAM.
1039 # We support passing AF_UNIX sockets even though we have
1040 # a dedicated API for that: create_unix_connection.
1041 # Disallowing AF_UNIX in this method, breaks backwards
1042 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001043 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001044 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001045
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001046 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001047 sock, protocol_factory, ssl, server_hostname,
1048 ssl_handshake_timeout=ssl_handshake_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +02001049 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +02001050 # Get the socket from the transport because SSL transport closes
1051 # the old socket and creates a new SSL socket
1052 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +02001053 logger.debug("%r connected to %s:%r: (%r, %r)",
1054 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -05001055 return transport, protocol
1056
Neil Aspinallf7686c12017-12-19 19:45:42 +00001057 async def _create_connection_transport(
1058 self, sock, protocol_factory, ssl,
1059 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001060 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001061
1062 sock.setblocking(False)
1063
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001064 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001065 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001066 if ssl:
1067 sslcontext = None if isinstance(ssl, bool) else ssl
1068 transport = self._make_ssl_transport(
1069 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001070 server_side=server_side, server_hostname=server_hostname,
1071 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001072 else:
1073 transport = self._make_socket_transport(sock, protocol, waiter)
1074
Victor Stinner29ad0112015-01-15 00:04:21 +01001075 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001076 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001077 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001078 transport.close()
1079 raise
1080
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001081 return transport, protocol
1082
Andrew Svetlov7c684072018-01-27 21:22:47 +02001083 async def sendfile(self, transport, file, offset=0, count=None,
1084 *, fallback=True):
1085 """Send a file to transport.
1086
1087 Return the total number of bytes which were sent.
1088
1089 The method uses high-performance os.sendfile if available.
1090
1091 file must be a regular file object opened in binary mode.
1092
1093 offset tells from where to start reading the file. If specified,
1094 count is the total number of bytes to transmit as opposed to
1095 sending the file until EOF is reached. File position is updated on
1096 return or also in case of error in which case file.tell()
1097 can be used to figure out the number of bytes
1098 which were sent.
1099
1100 fallback set to True makes asyncio to manually read and send
1101 the file when the platform does not support the sendfile syscall
1102 (e.g. Windows or SSL socket on Unix).
1103
1104 Raise SendfileNotAvailableError if the system does not support
1105 sendfile syscall and fallback is False.
1106 """
1107 if transport.is_closing():
1108 raise RuntimeError("Transport is closing")
1109 mode = getattr(transport, '_sendfile_compatible',
1110 constants._SendfileMode.UNSUPPORTED)
1111 if mode is constants._SendfileMode.UNSUPPORTED:
1112 raise RuntimeError(
1113 f"sendfile is not supported for transport {transport!r}")
1114 if mode is constants._SendfileMode.TRY_NATIVE:
1115 try:
1116 return await self._sendfile_native(transport, file,
1117 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001118 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +02001119 if not fallback:
1120 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001121
1122 if not fallback:
1123 raise RuntimeError(
1124 f"fallback is disabled and native sendfile is not "
1125 f"supported for transport {transport!r}")
1126
Andrew Svetlov7c684072018-01-27 21:22:47 +02001127 return await self._sendfile_fallback(transport, file,
1128 offset, count)
1129
1130 async def _sendfile_native(self, transp, file, offset, count):
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001131 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov7c684072018-01-27 21:22:47 +02001132 "sendfile syscall is not supported")
1133
1134 async def _sendfile_fallback(self, transp, file, offset, count):
1135 if offset:
1136 file.seek(offset)
1137 blocksize = min(count, 16384) if count else 16384
1138 buf = bytearray(blocksize)
1139 total_sent = 0
1140 proto = _SendfileFallbackProtocol(transp)
1141 try:
1142 while True:
1143 if count:
1144 blocksize = min(count - total_sent, blocksize)
1145 if blocksize <= 0:
1146 return total_sent
1147 view = memoryview(buf)[:blocksize]
Andrew Svetlovb6ff2cd2019-06-15 14:55:52 +03001148 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov7c684072018-01-27 21:22:47 +02001149 if not read:
1150 return total_sent # EOF
1151 await proto.drain()
Miss Islington (bot)bb073212019-06-15 04:24:16 -07001152 transp.write(view[:read])
Andrew Svetlov7c684072018-01-27 21:22:47 +02001153 total_sent += read
1154 finally:
1155 if total_sent > 0 and hasattr(file, 'seek'):
1156 file.seek(offset + total_sent)
1157 await proto.restore()
1158
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001159 async def start_tls(self, transport, protocol, sslcontext, *,
1160 server_side=False,
1161 server_hostname=None,
1162 ssl_handshake_timeout=None):
1163 """Upgrade transport to TLS.
1164
1165 Return a new transport that *protocol* should start using
1166 immediately.
1167 """
1168 if ssl is None:
1169 raise RuntimeError('Python ssl module is not available')
1170
1171 if not isinstance(sslcontext, ssl.SSLContext):
1172 raise TypeError(
1173 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1174 f'got {sslcontext!r}')
1175
1176 if not getattr(transport, '_start_tls_compatible', False):
1177 raise TypeError(
Yury Selivanov415bc462018-06-05 08:59:58 -04001178 f'transport {transport!r} is not supported by start_tls()')
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001179
1180 waiter = self.create_future()
1181 ssl_protocol = sslproto.SSLProtocol(
1182 self, protocol, sslcontext, waiter,
1183 server_side, server_hostname,
1184 ssl_handshake_timeout=ssl_handshake_timeout,
1185 call_connection_made=False)
1186
Yury Selivanovf2955872018-05-29 01:00:12 -04001187 # Pause early so that "ssl_protocol.data_received()" doesn't
1188 # have a chance to get called before "ssl_protocol.connection_made()".
1189 transport.pause_reading()
1190
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001191 transport.set_protocol(ssl_protocol)
Yury Selivanov415bc462018-06-05 08:59:58 -04001192 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1193 resume_cb = self.call_soon(transport.resume_reading)
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001194
Yury Selivanov96026432018-06-04 11:32:35 -04001195 try:
1196 await waiter
Yury Selivanov431b5402019-05-27 14:45:12 +02001197 except BaseException:
Yury Selivanov96026432018-06-04 11:32:35 -04001198 transport.close()
Yury Selivanov415bc462018-06-05 08:59:58 -04001199 conmade_cb.cancel()
1200 resume_cb.cancel()
Yury Selivanov96026432018-06-04 11:32:35 -04001201 raise
1202
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001203 return ssl_protocol._app_transport
1204
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001205 async def create_datagram_endpoint(self, protocol_factory,
1206 local_addr=None, remote_addr=None, *,
1207 family=0, proto=0, flags=0,
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001208 reuse_address=_unset, reuse_port=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001209 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001210 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001211 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001212 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001213 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001214 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001215 if (local_addr or remote_addr or
1216 family or proto or flags or
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001217 reuse_port or allow_broadcast):
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001218 # show the problematic kwargs in exception msg
1219 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1220 family=family, proto=proto, flags=flags,
1221 reuse_address=reuse_address, reuse_port=reuse_port,
1222 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001223 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001224 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001225 f'socket modifier keyword arguments can not be used '
1226 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001227 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001228 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001229 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001230 if not (local_addr or remote_addr):
1231 if family == 0:
1232 raise ValueError('unexpected address family')
1233 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001234 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1235 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001236 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001237 raise TypeError('string is expected')
Quentin Dawans56065d42019-04-09 15:40:59 +02001238
1239 if local_addr and local_addr[0] not in (0, '\x00'):
1240 try:
1241 if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1242 os.remove(local_addr)
1243 except FileNotFoundError:
1244 pass
1245 except OSError as err:
1246 # Directory may have permissions only to create socket.
1247 logger.error('Unable to check or remove stale UNIX '
1248 'socket %r: %r',
1249 local_addr, err)
1250
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001251 addr_pairs_info = (((family, proto),
1252 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001253 else:
1254 # join address by (family, protocol)
Inada Naokif3451702019-02-05 17:04:40 +09001255 addr_infos = {} # Using order preserving dict
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001256 for idx, addr in ((0, local_addr), (1, remote_addr)):
1257 if addr is not None:
1258 assert isinstance(addr, tuple) and len(addr) == 2, (
1259 '2-tuple is expected')
1260
Yury Selivanov19a44f62017-12-14 20:53:26 -05001261 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001262 addr, family=family, type=socket.SOCK_DGRAM,
1263 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001264 if not infos:
1265 raise OSError('getaddrinfo() returned empty list')
1266
1267 for fam, _, pro, _, address in infos:
1268 key = (fam, pro)
1269 if key not in addr_infos:
1270 addr_infos[key] = [None, None]
1271 addr_infos[key][idx] = address
1272
1273 # each addr has to have info for each (family, proto) pair
1274 addr_pairs_info = [
1275 (key, addr_pair) for key, addr_pair in addr_infos.items()
1276 if not ((local_addr and addr_pair[0] is None) or
1277 (remote_addr and addr_pair[1] is None))]
1278
1279 if not addr_pairs_info:
1280 raise ValueError('can not get address information')
1281
1282 exceptions = []
1283
Miss Islington (bot)79c29742019-12-09 06:39:54 -08001284 # bpo-37228
1285 if reuse_address is not _unset:
1286 if reuse_address:
1287 raise ValueError("Passing `reuse_address=True` is no "
1288 "longer supported, as the usage of "
1289 "SO_REUSEPORT in UDP poses a significant "
1290 "security concern.")
1291 else:
1292 warnings.warn("The *reuse_address* parameter has been "
1293 "deprecated as of 3.5.10 and is scheduled "
1294 "for removal in 3.11.", DeprecationWarning,
1295 stacklevel=2)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001296
1297 for ((family, proto),
1298 (local_address, remote_address)) in addr_pairs_info:
1299 sock = None
1300 r_addr = None
1301 try:
1302 sock = socket.socket(
1303 family=family, type=socket.SOCK_DGRAM, proto=proto)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001304 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001305 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001306 if allow_broadcast:
1307 sock.setsockopt(
1308 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1309 sock.setblocking(False)
1310
1311 if local_addr:
1312 sock.bind(local_address)
1313 if remote_addr:
Vincent Michel63deaa52019-05-07 19:18:49 +02001314 if not allow_broadcast:
1315 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001316 r_addr = remote_address
1317 except OSError as exc:
1318 if sock is not None:
1319 sock.close()
1320 exceptions.append(exc)
1321 except:
1322 if sock is not None:
1323 sock.close()
1324 raise
1325 else:
1326 break
1327 else:
1328 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001329
1330 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001331 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001332 transport = self._make_datagram_transport(
1333 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001334 if self._debug:
1335 if local_addr:
1336 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1337 "created: (%r, %r)",
1338 local_addr, remote_addr, transport, protocol)
1339 else:
1340 logger.debug("Datagram endpoint remote_addr=%r created: "
1341 "(%r, %r)",
1342 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001343
1344 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001345 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001346 except:
1347 transport.close()
1348 raise
1349
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001350 return transport, protocol
1351
Yury Selivanov19a44f62017-12-14 20:53:26 -05001352 async def _ensure_resolved(self, address, *,
1353 family=0, type=socket.SOCK_STREAM,
1354 proto=0, flags=0, loop):
1355 host, port = address[:2]
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +02001356 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
Yury Selivanov19a44f62017-12-14 20:53:26 -05001357 if info is not None:
1358 # "host" is already a resolved IP.
1359 return [info]
1360 else:
1361 return await loop.getaddrinfo(host, port, family=family, type=type,
1362 proto=proto, flags=flags)
1363
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001364 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001365 infos = await self._ensure_resolved((host, port), family=family,
1366 type=socket.SOCK_STREAM,
1367 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001368 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001369 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001370 return infos
1371
Neil Aspinallf7686c12017-12-19 19:45:42 +00001372 async def create_server(
1373 self, protocol_factory, host=None, port=None,
1374 *,
1375 family=socket.AF_UNSPEC,
1376 flags=socket.AI_PASSIVE,
1377 sock=None,
1378 backlog=100,
1379 ssl=None,
1380 reuse_address=None,
1381 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001382 ssl_handshake_timeout=None,
1383 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001384 """Create a TCP server.
1385
Yury Selivanov6370f342017-12-10 18:36:12 -05001386 The host parameter can be a string, in that case the TCP server is
1387 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001388
1389 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001390 the TCP server is bound to all hosts of the sequence. If a host
1391 appears multiple times (possibly indirectly e.g. when hostnames
1392 resolve to the same IP address), the server is only bound once to that
1393 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001394
Victor Stinneracdb7822014-07-14 18:33:40 +02001395 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001396
1397 This method is a coroutine.
1398 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001399 if isinstance(ssl, bool):
1400 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001401
1402 if ssl_handshake_timeout is not None and ssl is None:
1403 raise ValueError(
1404 'ssl_handshake_timeout is only meaningful with ssl')
1405
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001406 if host is not None or port is not None:
1407 if sock is not None:
1408 raise ValueError(
1409 'host/port and sock can not be specified at the same time')
1410
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001411 if reuse_address is None:
1412 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1413 sockets = []
1414 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001415 hosts = [None]
1416 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001417 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001418 hosts = [host]
1419 else:
1420 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001421
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001422 fs = [self._create_server_getaddrinfo(host, port, family=family,
1423 flags=flags)
1424 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001425 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001426 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001427
1428 completed = False
1429 try:
1430 for res in infos:
1431 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001432 try:
1433 sock = socket.socket(af, socktype, proto)
1434 except socket.error:
1435 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001436 if self._debug:
1437 logger.warning('create_server() failed to create '
1438 'socket.socket(%r, %r, %r)',
1439 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001440 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001441 sockets.append(sock)
1442 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001443 sock.setsockopt(
1444 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1445 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001446 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001447 # Disable IPv4/IPv6 dual stack support (enabled by
1448 # default on Linux) which makes a single socket
1449 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001450 if (_HAS_IPv6 and
1451 af == socket.AF_INET6 and
1452 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001453 sock.setsockopt(socket.IPPROTO_IPV6,
1454 socket.IPV6_V6ONLY,
1455 True)
1456 try:
1457 sock.bind(sa)
1458 except OSError as err:
1459 raise OSError(err.errno, 'error while attempting '
1460 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001461 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001462 completed = True
1463 finally:
1464 if not completed:
1465 for sock in sockets:
1466 sock.close()
1467 else:
1468 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001469 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001470 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001471 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001472 sockets = [sock]
1473
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001474 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001475 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001476
1477 server = Server(self, sockets, protocol_factory,
1478 ssl, backlog, ssl_handshake_timeout)
1479 if start_serving:
1480 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001481 # Skip one loop iteration so that all 'loop.add_reader'
1482 # go through.
1483 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001484
Victor Stinnere912e652014-07-12 03:11:53 +02001485 if self._debug:
1486 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001487 return server
1488
Neil Aspinallf7686c12017-12-19 19:45:42 +00001489 async def connect_accepted_socket(
1490 self, protocol_factory, sock,
1491 *, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001492 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001493 """Handle an accepted connection.
1494
1495 This is used by servers that accept connections outside of
1496 asyncio but that use asyncio to handle connections.
1497
1498 This method is a coroutine. When completed, the coroutine
1499 returns a (transport, protocol) pair.
1500 """
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001501 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001502 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001503
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001504 if ssl_handshake_timeout is not None and not ssl:
1505 raise ValueError(
1506 'ssl_handshake_timeout is only meaningful with ssl')
1507
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001508 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001509 sock, protocol_factory, ssl, '', server_side=True,
1510 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001511 if self._debug:
1512 # Get the socket from the transport because SSL transport closes
1513 # the old socket and creates a new SSL socket
1514 sock = transport.get_extra_info('socket')
1515 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1516 return transport, protocol
1517
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001518 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001519 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001520 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001521 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001522
1523 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001524 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001525 except:
1526 transport.close()
1527 raise
1528
Victor Stinneracdb7822014-07-14 18:33:40 +02001529 if self._debug:
1530 logger.debug('Read pipe %r connected: (%r, %r)',
1531 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001532 return transport, protocol
1533
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001534 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001535 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001536 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001537 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001538
1539 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001540 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001541 except:
1542 transport.close()
1543 raise
1544
Victor Stinneracdb7822014-07-14 18:33:40 +02001545 if self._debug:
1546 logger.debug('Write pipe %r connected: (%r, %r)',
1547 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001548 return transport, protocol
1549
Victor Stinneracdb7822014-07-14 18:33:40 +02001550 def _log_subprocess(self, msg, stdin, stdout, stderr):
1551 info = [msg]
1552 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001553 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001554 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001555 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001556 else:
1557 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001558 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001559 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001560 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001561 logger.debug(' '.join(info))
1562
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001563 async def subprocess_shell(self, protocol_factory, cmd, *,
1564 stdin=subprocess.PIPE,
1565 stdout=subprocess.PIPE,
1566 stderr=subprocess.PIPE,
1567 universal_newlines=False,
1568 shell=True, bufsize=0,
sbstpf0d4c642019-05-27 19:51:19 -04001569 encoding=None, errors=None, text=None,
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001570 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001571 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001572 raise ValueError("cmd must be a string")
1573 if universal_newlines:
1574 raise ValueError("universal_newlines must be False")
1575 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001576 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001577 if bufsize != 0:
1578 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001579 if text:
1580 raise ValueError("text must be False")
1581 if encoding is not None:
1582 raise ValueError("encoding must be None")
1583 if errors is not None:
1584 raise ValueError("errors must be None")
1585
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001586 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001587 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001588 if self._debug:
1589 # don't log parameters: they may contain sensitive information
1590 # (password) and may be too long
1591 debug_log = 'run shell command %r' % cmd
1592 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001593 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001594 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001595 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001596 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001597 return transport, protocol
1598
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001599 async def subprocess_exec(self, protocol_factory, program, *args,
1600 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1601 stderr=subprocess.PIPE, universal_newlines=False,
sbstpf0d4c642019-05-27 19:51:19 -04001602 shell=False, bufsize=0,
1603 encoding=None, errors=None, text=None,
1604 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001605 if universal_newlines:
1606 raise ValueError("universal_newlines must be False")
1607 if shell:
1608 raise ValueError("shell must be False")
1609 if bufsize != 0:
1610 raise ValueError("bufsize must be 0")
sbstpf0d4c642019-05-27 19:51:19 -04001611 if text:
1612 raise ValueError("text must be False")
1613 if encoding is not None:
1614 raise ValueError("encoding must be None")
1615 if errors is not None:
1616 raise ValueError("errors must be None")
1617
Victor Stinner20e07432014-02-11 11:44:56 +01001618 popen_args = (program,) + args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001619 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001620 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001621 if self._debug:
1622 # don't log parameters: they may contain sensitive information
1623 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001624 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001625 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001626 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001627 protocol, popen_args, False, stdin, stdout, stderr,
1628 bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001629 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001630 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001631 return transport, protocol
1632
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001633 def get_exception_handler(self):
1634 """Return an exception handler, or None if the default one is in use.
1635 """
1636 return self._exception_handler
1637
Yury Selivanov569efa22014-02-18 18:02:19 -05001638 def set_exception_handler(self, handler):
1639 """Set handler as the new event loop exception handler.
1640
1641 If handler is None, the default exception handler will
1642 be set.
1643
1644 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001645 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001646 will be a reference to the active event loop, 'context'
1647 will be a dict object (see `call_exception_handler()`
1648 documentation for details about context).
1649 """
1650 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001651 raise TypeError(f'A callable object or None is expected, '
1652 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001653 self._exception_handler = handler
1654
1655 def default_exception_handler(self, context):
1656 """Default exception handler.
1657
1658 This is called when an exception occurs and no exception
1659 handler is set, and can be called by a custom exception
1660 handler that wants to defer to the default behavior.
1661
Antoine Pitrou921e9432017-11-07 17:23:29 +01001662 This default handler logs the error message and other
1663 context-dependent information. In debug mode, a truncated
1664 stack trace is also appended showing where the given object
1665 (e.g. a handle or future or task) was created, if any.
1666
Victor Stinneracdb7822014-07-14 18:33:40 +02001667 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001668 `call_exception_handler()`.
1669 """
1670 message = context.get('message')
1671 if not message:
1672 message = 'Unhandled exception in event loop'
1673
1674 exception = context.get('exception')
1675 if exception is not None:
1676 exc_info = (type(exception), exception, exception.__traceback__)
1677 else:
1678 exc_info = False
1679
Yury Selivanov6370f342017-12-10 18:36:12 -05001680 if ('source_traceback' not in context and
1681 self._current_handle is not None and
1682 self._current_handle._source_traceback):
1683 context['handle_traceback'] = \
1684 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001685
Yury Selivanov569efa22014-02-18 18:02:19 -05001686 log_lines = [message]
1687 for key in sorted(context):
1688 if key in {'message', 'exception'}:
1689 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001690 value = context[key]
1691 if key == 'source_traceback':
1692 tb = ''.join(traceback.format_list(value))
1693 value = 'Object created at (most recent call last):\n'
1694 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001695 elif key == 'handle_traceback':
1696 tb = ''.join(traceback.format_list(value))
1697 value = 'Handle created at (most recent call last):\n'
1698 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001699 else:
1700 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001701 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001702
1703 logger.error('\n'.join(log_lines), exc_info=exc_info)
1704
1705 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001706 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001707
Victor Stinneracdb7822014-07-14 18:33:40 +02001708 The context argument is a dict containing the following keys:
1709
Yury Selivanov569efa22014-02-18 18:02:19 -05001710 - 'message': Error message;
1711 - 'exception' (optional): Exception object;
1712 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001713 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001714 - 'handle' (optional): Handle instance;
1715 - 'protocol' (optional): Protocol instance;
1716 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001717 - 'socket' (optional): Socket instance;
1718 - 'asyncgen' (optional): Asynchronous generator that caused
1719 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001720
Victor Stinneracdb7822014-07-14 18:33:40 +02001721 New keys maybe introduced in the future.
1722
1723 Note: do not overload this method in an event loop subclass.
1724 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001725 `set_exception_handler()` method.
1726 """
1727 if self._exception_handler is None:
1728 try:
1729 self.default_exception_handler(context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001730 except (SystemExit, KeyboardInterrupt):
1731 raise
1732 except BaseException:
Yury Selivanov569efa22014-02-18 18:02:19 -05001733 # Second protection layer for unexpected errors
1734 # in the default implementation, as well as for subclassed
1735 # event loops with overloaded "default_exception_handler".
1736 logger.error('Exception in default exception handler',
1737 exc_info=True)
1738 else:
1739 try:
1740 self._exception_handler(self, context)
Yury Selivanov431b5402019-05-27 14:45:12 +02001741 except (SystemExit, KeyboardInterrupt):
1742 raise
1743 except BaseException as exc:
Yury Selivanov569efa22014-02-18 18:02:19 -05001744 # Exception in the user set custom exception handler.
1745 try:
1746 # Let's try default handler.
1747 self.default_exception_handler({
1748 'message': 'Unhandled error in exception handler',
1749 'exception': exc,
1750 'context': context,
1751 })
Yury Selivanov431b5402019-05-27 14:45:12 +02001752 except (SystemExit, KeyboardInterrupt):
1753 raise
1754 except BaseException:
Victor Stinneracdb7822014-07-14 18:33:40 +02001755 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001756 # overloaded.
1757 logger.error('Exception in default exception handler '
1758 'while handling an unexpected error '
1759 'in custom exception handler',
1760 exc_info=True)
1761
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001762 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001763 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001764 assert isinstance(handle, events.Handle), 'A Handle is required here'
1765 if handle._cancelled:
1766 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001767 assert not isinstance(handle, events.TimerHandle)
1768 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001769
1770 def _add_callback_signalsafe(self, handle):
1771 """Like _add_callback() but called from a signal handler."""
1772 self._add_callback(handle)
1773 self._write_to_self()
1774
Yury Selivanov592ada92014-09-25 12:07:56 -04001775 def _timer_handle_cancelled(self, handle):
1776 """Notification that a TimerHandle has been cancelled."""
1777 if handle._scheduled:
1778 self._timer_cancelled_count += 1
1779
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001780 def _run_once(self):
1781 """Run one full iteration of the event loop.
1782
1783 This calls all currently ready callbacks, polls for I/O,
1784 schedules the resulting callbacks, and finally schedules
1785 'call_later' callbacks.
1786 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001787
Yury Selivanov592ada92014-09-25 12:07:56 -04001788 sched_count = len(self._scheduled)
1789 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1790 self._timer_cancelled_count / sched_count >
1791 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001792 # Remove delayed calls that were cancelled if their number
1793 # is too high
1794 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001795 for handle in self._scheduled:
1796 if handle._cancelled:
1797 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001798 else:
1799 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001800
Victor Stinner68da8fc2014-09-30 18:08:36 +02001801 heapq.heapify(new_scheduled)
1802 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001803 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001804 else:
1805 # Remove delayed calls that were cancelled from head of queue.
1806 while self._scheduled and self._scheduled[0]._cancelled:
1807 self._timer_cancelled_count -= 1
1808 handle = heapq.heappop(self._scheduled)
1809 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001810
1811 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001812 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001813 timeout = 0
1814 elif self._scheduled:
1815 # Compute the desired timeout.
1816 when = self._scheduled[0]._when
MartinAltmayer944451c2018-07-31 15:06:12 +01001817 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001818
Andrew Svetlovd5bd0362018-09-30 08:28:40 +03001819 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001820 self._process_events(event_list)
1821
1822 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001823 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001824 while self._scheduled:
1825 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001826 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001827 break
1828 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001829 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001830 self._ready.append(handle)
1831
1832 # This is the only place where callbacks are actually *called*.
1833 # All other places just add them to ready.
1834 # Note: We run all currently scheduled callbacks, but not any
1835 # callbacks scheduled by callbacks run this time around --
1836 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001837 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001838 ntodo = len(self._ready)
1839 for i in range(ntodo):
1840 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001841 if handle._cancelled:
1842 continue
1843 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001844 try:
1845 self._current_handle = handle
1846 t0 = self.time()
1847 handle._run()
1848 dt = self.time() - t0
1849 if dt >= self.slow_callback_duration:
1850 logger.warning('Executing %s took %.3f seconds',
1851 _format_handle(handle), dt)
1852 finally:
1853 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001854 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001855 handle._run()
1856 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001857
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001858 def _set_coroutine_origin_tracking(self, enabled):
1859 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001860 return
1861
Yury Selivanove8944cb2015-05-12 11:43:04 -04001862 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001863 self._coroutine_origin_tracking_saved_depth = (
1864 sys.get_coroutine_origin_tracking_depth())
1865 sys.set_coroutine_origin_tracking_depth(
1866 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001867 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001868 sys.set_coroutine_origin_tracking_depth(
1869 self._coroutine_origin_tracking_saved_depth)
1870
1871 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001872
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001873 def get_debug(self):
1874 return self._debug
1875
1876 def set_debug(self, enabled):
1877 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001878
Yury Selivanove8944cb2015-05-12 11:43:04 -04001879 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001880 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)