blob: 031071281b38f72c2d179ab85e7e95addee4f1e2 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Yury Selivanov8cd51652019-05-27 15:57:20 +020048from . import trsock
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
# Public names exported by this module (a one-element tuple).
__all__ = 'BaseEventLoop',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module exposes the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
# (24 hours expressed in seconds).
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
Victor Stinnerc94a93a2016-04-01 21:43:39 +020069
Victor Stinner0e6f52a2014-06-20 17:34:15 +020070def _format_handle(handle):
71 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040072 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020073 # format the task
74 return repr(cb.__self__)
75 else:
76 return str(handle)
77
78
Victor Stinneracdb7822014-07-14 18:33:40 +020079def _format_pipe(fd):
80 if fd == subprocess.PIPE:
81 return '<pipe>'
82 elif fd == subprocess.STDOUT:
83 return '<stdout>'
84 else:
85 return repr(fd)
86
87
Yury Selivanov5587d7c2016-09-15 15:45:07 -040088def _set_reuseport(sock):
89 if not hasattr(socket, 'SO_REUSEPORT'):
90 raise ValueError('reuse_port not supported by socket module')
91 else:
92 try:
93 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94 except OSError:
95 raise ValueError('reuse_port not supported by socket module, '
96 'SO_REUSEPORT defined but not implemented.')
97
98
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +020099def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400100 # Try to skip getaddrinfo if "host" is already an IP. Users might have
101 # handled name resolution in their own code and pass in resolved IPs.
102 if not hasattr(socket, 'inet_pton'):
103 return
104
105 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
106 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500107 return None
108
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500109 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500110 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500111 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500112 proto = socket.IPPROTO_UDP
113 else:
114 return None
115
Yury Selivanova7146162016-06-02 16:51:07 -0400116 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400117 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700118 elif isinstance(port, bytes) and port == b'':
119 port = 0
120 elif isinstance(port, str) and port == '':
121 port = 0
122 else:
123 # If port's a service name like "http", don't skip getaddrinfo.
124 try:
125 port = int(port)
126 except (TypeError, ValueError):
127 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400128
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400129 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500130 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400131 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500132 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 else:
134 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500135
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400136 if isinstance(host, bytes):
137 host = host.decode('idna')
138 if '%' in host:
139 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
140 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500141 return None
142
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400143 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500144 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400145 socket.inet_pton(af, host)
146 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400147 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200148 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400149 else:
150 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400151 except OSError:
152 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500153
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400154 # "host" is not an IP address.
155 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500156
157
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800158def _interleave_addrinfos(addrinfos, first_address_family_count=1):
159 """Interleave list of addrinfo tuples by family."""
160 # Group addresses by family
161 addrinfos_by_family = collections.OrderedDict()
162 for addr in addrinfos:
163 family = addr[0]
164 if family not in addrinfos_by_family:
165 addrinfos_by_family[family] = []
166 addrinfos_by_family[family].append(addr)
167 addrinfos_lists = list(addrinfos_by_family.values())
168
169 reordered = []
170 if first_address_family_count > 1:
171 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
172 del addrinfos_lists[0][:first_address_family_count - 1]
173 reordered.extend(
174 a for a in itertools.chain.from_iterable(
175 itertools.zip_longest(*addrinfos_lists)
176 ) if a is not None)
177 return reordered
178
179
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100180def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500181 if not fut.cancelled():
182 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200183 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500184 # Issue #22429: run_forever() already finished, no need to
185 # stop it.
186 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500187 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100188
189
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200190if hasattr(socket, 'TCP_NODELAY'):
191 def _set_nodelay(sock):
192 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
193 sock.type == socket.SOCK_STREAM and
194 sock.proto == socket.IPPROTO_TCP):
195 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
196else:
197 def _set_nodelay(sock):
198 pass
199
200
Andrew Svetlov7c684072018-01-27 21:22:47 +0200201class _SendfileFallbackProtocol(protocols.Protocol):
202 def __init__(self, transp):
203 if not isinstance(transp, transports._FlowControlMixin):
204 raise TypeError("transport should be _FlowControlMixin instance")
205 self._transport = transp
206 self._proto = transp.get_protocol()
207 self._should_resume_reading = transp.is_reading()
208 self._should_resume_writing = transp._protocol_paused
209 transp.pause_reading()
210 transp.set_protocol(self)
211 if self._should_resume_writing:
212 self._write_ready_fut = self._transport._loop.create_future()
213 else:
214 self._write_ready_fut = None
215
216 async def drain(self):
217 if self._transport.is_closing():
218 raise ConnectionError("Connection closed by peer")
219 fut = self._write_ready_fut
220 if fut is None:
221 return
222 await fut
223
224 def connection_made(self, transport):
225 raise RuntimeError("Invalid state: "
226 "connection should have been established already.")
227
228 def connection_lost(self, exc):
229 if self._write_ready_fut is not None:
230 # Never happens if peer disconnects after sending the whole content
231 # Thus disconnection is always an exception from user perspective
232 if exc is None:
233 self._write_ready_fut.set_exception(
234 ConnectionError("Connection is closed by peer"))
235 else:
236 self._write_ready_fut.set_exception(exc)
237 self._proto.connection_lost(exc)
238
239 def pause_writing(self):
240 if self._write_ready_fut is not None:
241 return
242 self._write_ready_fut = self._transport._loop.create_future()
243
244 def resume_writing(self):
245 if self._write_ready_fut is None:
246 return
247 self._write_ready_fut.set_result(False)
248 self._write_ready_fut = None
249
250 def data_received(self, data):
251 raise RuntimeError("Invalid state: reading should be paused")
252
253 def eof_received(self):
254 raise RuntimeError("Invalid state: reading should be paused")
255
256 async def restore(self):
257 self._transport.set_protocol(self._proto)
258 if self._should_resume_reading:
259 self._transport.resume_reading()
260 if self._write_ready_fut is not None:
261 # Cancel the future.
262 # Basically it has no effect because protocol is switched back,
263 # no code should wait for it anymore.
264 self._write_ready_fut.cancel()
265 if self._should_resume_writing:
266 self._proto.resume_writing()
267
268
class Server(events.AbstractServer):
    """Server object wrapping a collection of sockets tied to one loop.

    The loop, sockets, protocol factory, SSL settings and backlog are
    stored at construction time.  Listening starts only when
    _start_serving() runs, which start_serving() and serve_forever()
    both trigger.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        # Set to None by close().
        self._sockets = sockets
        # Adjusted by _attach()/_detach(); presumably one unit per live
        # connection -- the callers are outside this class.
        self._active_count = 0
        # Futures awaited by wait_closed(); None once _wakeup() has run.
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        # Future awaited inside serve_forever(), if one is in progress.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        # Last user gone after close(): release wait_closed() callers.
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and retire the list."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never inspected; None avoids making the
                # future reference itself.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; a no-op if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once the server is closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving on all sockets; calling it again is a no-op.

        A pending serve_forever() future is cancelled.  wait_closed()
        callers are released right away only when nothing is attached;
        otherwise the final _detach() releases them.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Start serving and block until this coroutine is cancelled.

        Raises RuntimeError if another serve_forever() call is already
        pending or if the server is closed.  On cancellation the server
        is closed and waited for before the CancelledError propagates.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until the server is closed and nothing is attached to it.

        Returns immediately if the waiters were already released by
        _wakeup().  Otherwise a new future is queued and awaited; it is
        resolved by close() when nothing is attached, or by the final
        _detach() after close().
        """
        # Only the waiter list decides: after close() with users still
        # attached, _sockets is already None but the wait must go on.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378
379
380class BaseEventLoop(events.AbstractEventLoop):
381
382 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400383 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200384 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800385 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 self._ready = collections.deque()
387 self._scheduled = []
388 self._default_executor = None
389 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100390 # Identifier of the thread running the event loop, or None if the
391 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100392 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100393 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500394 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800395 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200396 # In debug mode, if the execution of a callback or a step of a task
397 # exceed this duration in seconds, the slow callback/task is logged.
398 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100399 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400400 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800401 self._coroutine_origin_tracking_enabled = False
402 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500404 # A weak set of all asynchronous generators that are
405 # being iterated by the loop.
406 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700407 # Set to True when `loop.shutdown_asyncgens` is called.
408 self._asyncgens_shutdown_called = False
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400409 # Set to True when `loop.shutdown_default_executor` is called.
410 self._executor_shutdown_called = False
Yury Selivanoveb636452016-09-08 22:01:51 -0700411
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200412 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500413 return (
414 f'<{self.__class__.__name__} running={self.is_running()} '
415 f'closed={self.is_closed()} debug={self.get_debug()}>'
416 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200417
def create_future(self):
    """Return a new Future bound to this loop."""
    fut = futures.Future(loop=self)
    return fut
421
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object and return the task wrapping it.

    Without a task factory a tasks.Task is created with the given name.
    With a factory, it is called as ``factory(self, coro)`` and the name
    is applied to its result afterwards.  Raises RuntimeError (through
    _check_closed()) when the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        task = factory(self, coro)
        tasks._set_task_name(task, name)
        return task

    task = tasks.Task(coro, loop=self, name=name)
    if task._source_traceback:
        # Drop the last recorded frame (the one added by this wrapper).
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200437
def set_task_factory(self, factory):
    """Install the factory used by loop.create_task().

    Passing None selects the default task factory.

    Otherwise *factory* must be a callable with a signature matching
    '(loop, coro)', where 'loop' will be a reference to the active
    event loop and 'coro' will be a coroutine object; it must return
    a Future.  Anything else raises TypeError.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
451
def get_task_factory(self):
    """Return the installed task factory; None means the default is used."""
    factory = self._task_factory
    return factory
455
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 def _make_socket_transport(self, sock, protocol, waiter=None, *,
457 extra=None, server=None):
458 """Create socket transport."""
459 raise NotImplementedError
460
Neil Aspinallf7686c12017-12-19 19:45:42 +0000461 def _make_ssl_transport(
462 self, rawsock, protocol, sslcontext, waiter=None,
463 *, server_side=False, server_hostname=None,
464 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500465 ssl_handshake_timeout=None,
466 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 """Create SSL transport."""
468 raise NotImplementedError
469
470 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200471 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 """Create datagram transport."""
473 raise NotImplementedError
474
475 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
476 extra=None):
477 """Create read pipe transport."""
478 raise NotImplementedError
479
480 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
481 extra=None):
482 """Create write pipe transport."""
483 raise NotImplementedError
484
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200485 async def _make_subprocess_transport(self, protocol, args, shell,
486 stdin, stdout, stderr, bufsize,
487 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 """Create subprocess transport."""
489 raise NotImplementedError
490
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200492 """Write a byte to self-pipe, to wake up the event loop.
493
494 This may be called from a different thread.
495
496 The subclass is responsible for implementing the self-pipe.
497 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 raise NotImplementedError
499
500 def _process_events(self, event_list):
501 """Process selector events."""
502 raise NotImplementedError
503
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200504 def _check_closed(self):
505 if self._closed:
506 raise RuntimeError('Event loop is closed')
507
Kyle Stanley9fdc64c2019-09-19 08:47:22 -0400508 def _check_default_executor(self):
509 if self._executor_shutdown_called:
510 raise RuntimeError('Executor shutdown has been called')
511
Yury Selivanoveb636452016-09-08 22:01:51 -0700512 def _asyncgen_finalizer_hook(self, agen):
513 self._asyncgens.discard(agen)
514 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800515 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700516
517 def _asyncgen_firstiter_hook(self, agen):
518 if self._asyncgens_shutdown_called:
519 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500520 f"asynchronous generator {agen!r} was scheduled after "
521 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700522 ResourceWarning, source=self)
523
524 self._asyncgens.add(agen)
525
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Marks the loop so that generators first iterated afterwards trigger
    a warning, then calls aclose() on every tracked generator
    concurrently.  An aclose() that fails with an Exception does not
    abort the others; each failure is reported through
    call_exception_handler().
    """
    self._asyncgens_shutdown_called = True

    if not len(self._asyncgens):
        # If Python version is <3.6 or we don't have any asynchronous
        # generators alive.
        return

    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    # No explicit ``loop=``: gather() resolves the running loop itself,
    # and the parameter was deprecated in 3.8 and removed in 3.10.
    results = await tasks.gather(
        *[ag.aclose() for ag in closing_agens],
        return_exceptions=True)

    for result, agen in zip(results, closing_agens):
        if isinstance(result, Exception):
            self.call_exception_handler({
                'message': f'an error occurred during closing of '
                           f'asynchronous generator {agen!r}',
                'exception': result,
                'asyncgen': agen
            })
551
async def shutdown_default_executor(self):
    """Schedule the shutdown of the default executor.

    Records that shutdown was requested.  If a default executor exists,
    _do_shutdown() runs in a separate thread and this coroutine waits
    for the future it completes; the thread is always joined
    afterwards.
    """
    self._executor_shutdown_called = True
    if self._default_executor is None:
        return
    done = self.create_future()
    worker = threading.Thread(target=self._do_shutdown, args=(done,))
    worker.start()
    try:
        await done
    finally:
        worker.join()
564
565 def _do_shutdown(self, future):
566 try:
567 self._default_executor.shutdown(wait=True)
568 self.call_soon_threadsafe(future.set_result, None)
569 except Exception as ex:
570 self.call_soon_threadsafe(future.set_exception, ex)
571
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if it is already
    running, or if another loop is currently running.  While running,
    the loop's async generator hooks are installed and the loop is
    registered as the running loop; all of this is undone on exit,
    even when an iteration raises.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    saved_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        # Always run at least one iteration, then honour stop().
        keep_going = True
        while keep_going:
            self._run_once()
            keep_going = not self._stopping
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*saved_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598
def run_until_complete(self, future):
    """Run until the Future is done.

    A non-future argument (such as a coroutine) is wrapped in a Task
    first.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  RuntimeError
    is raised if the loop stops before the future is done.
    """
    self._check_closed()

    wrapped_here = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if wrapped_here and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')
635
def stop(self):
    """Ask the running loop to stop.

    This only sets a flag: run_forever() finishes the iteration it is
    in, so every callback already scheduled will still run, and then
    stops looping.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
def close(self):
    """Close the event loop.

    The ready and scheduled queues are cleared and the default
    executor, if any, is shut down without waiting for it to finish.
    Closing an already closed loop does nothing.

    The event loop must not be running; RuntimeError is raised if it is.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    self._executor_shutdown_called = True
    executor = self._default_executor
    if executor is None:
        return
    # Detach the executor before shutting it down.
    self._default_executor = None
    executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200666
def is_closed(self):
    """Return True once the event loop has been closed."""
    closed = self._closed
    return closed
670
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100671 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900672 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100673 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900674 if not self.is_running():
675 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100676
def is_running(self):
    """Return True while the event loop is running.

    The loop counts as running whenever a thread id is recorded for it.
    """
    running = self._thread_id is not None
    return running
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680
def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float in seconds read from the monotonic clock.
    Callers should not rely on its epoch, precision, accuracy or
    drift, which are unspecified and may differ per event loop.
    """
    now = time.monotonic()
    return now
689
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called after *delay* seconds.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args, context=context)
    if timer._source_traceback:
        # Drop the last recorded frame (the one added by this wrapper).
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700711
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Return the TimerHandle that was pushed onto the loop's heap of
    scheduled timers.  Raises TypeError if when is None.
    """
    if when is None:
        # Pushing onto the heap would accept None without complaint, and
        # the failure would only surface later, far from the caller.
        raise TypeError("when cannot be None")
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    # Drop the frame of this method from the recorded traceback, if any.
    if timer._source_traceback:
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
727
def call_soon(self, callback, *args, context=None):
    """Schedule a callback to run as soon as possible.

    Callbacks are queued first-in, first-out: they are called in the
    order in which they are registered, and each one is called exactly
    once.

    Positional arguments following the callback are handed to it when
    it is called.  Return the handle created for the callback.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    source_tb = handle._source_traceback
    if source_tb:
        # Remove the entry for this wrapper frame.
        del source_tb[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100746
def _check_callback(self, callback, method):
    """Validate a callback passed to the scheduling API named *method*.

    Raise TypeError if *callback* is a coroutine object or coroutine
    function, or if it is not callable.  *method* is only used to build
    the error message.
    """
    is_coroutine = (coroutines.iscoroutine(callback) or
                    coroutines.iscoroutinefunction(callback))
    if is_coroutine:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700756
def _call_soon(self, callback, args, context):
    """Wrap *callback* in a Handle, append it to the ready queue, return it."""
    handle = events.Handle(callback, args, self, context)
    source_tb = handle._source_traceback
    if source_tb:
        # Remove the entry for this helper frame.
        del source_tb[-1]
    self._ready.append(handle)
    return handle
763
def _check_thread(self):
    """Verify that the caller runs in the thread that runs the loop.

    Methods that are not thread-safe rely on this and will likely
    misbehave otherwise.  Nothing is checked while no thread id is
    recorded on the loop.

    Only meant to be called when (self._debug == True); for performance
    reasons the caller is responsible for testing that condition.

    Raises RuntimeError when called from a different thread.
    """
    owner = self._thread_id
    if owner is None or owner == threading.get_ident():
        return
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
780
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    After queueing the handle, self._write_to_self() is called.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    handle = self._call_soon(callback, args, context)
    source_tb = handle._source_traceback
    if source_tb:
        # Remove the entry for this wrapper frame.
        del source_tb[-1]
    self._write_to_self()
    return handle
791
def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to an executor and return an asyncio future.

    When *executor* is None the loop's default executor is used; a
    concurrent.futures.ThreadPoolExecutor is created and stored as the
    default the first time one is needed.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    if executor is None:
        executor = self._default_executor
        # Only check when the default executor is being used
        self._check_default_executor()
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor()
            self._default_executor = executor
    pending = executor.submit(func, *args)
    return futures.wrap_future(pending, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700805
def set_default_executor(self, executor):
    """Store *executor* as the loop's default executor.

    A DeprecationWarning is issued when *executor* is not an instance
    of concurrent.futures.ThreadPoolExecutor; it is stored regardless.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        warnings.warn(
            'Using the default executor that is not an instance of '
            'ThreadPoolExecutor is deprecated and will be prohibited '
            'in Python 3.9',
            DeprecationWarning, 2)
    self._default_executor = executor
814
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and how long it took.

    The result is logged at INFO level when the call took at least
    self.slow_callback_duration seconds, otherwise at DEBUG level.
    Return the list produced by socket.getaddrinfo().
    """
    parts = [f"{host}:{port!r}"]
    # Only mention the options that were actually given (truthy values).
    for label, value in (('family', family), ('type', type),
                         ('proto', proto), ('flags', flags)):
        if value:
            parts.append(f'{label}={value!r}')
    summary = ', '.join(parts)
    logger.debug('Get address info %s', summary)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {summary} took {elapsed * 1e3:.3f}ms: '
              f'{addrinfo!r}')
    log = logger.info if elapsed >= self.slow_callback_duration else logger.debug
    log(report)
    return addrinfo
838
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve host/port via run_in_executor() with a None executor.

    In debug mode the logging wrapper self._getaddrinfo_debug is used
    in place of socket.getaddrinfo.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    return await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
848
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) via run_in_executor()."""
    lookup = self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
    return await lookup
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700852
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, preferring the native implementation.

    The arguments are validated first.  If the native implementation
    raises SendfileNotAvailableError, the read-and-send fallback is used
    when *fallback* is true; otherwise the error is re-raised.

    In debug mode a ValueError is raised if the socket is not
    non-blocking.  Return whatever the implementation that ran returns.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)
    try:
        sent = await self._sock_sendfile_native(sock, file, offset, count)
    except exceptions.SendfileNotAvailableError:
        if not fallback:
            raise
        return await self._sock_sendfile_fallback(
            sock, file, offset, count)
    else:
        return sent
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200866
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation of native sendfile: always unavailable.

    Always raises SendfileNotAvailableError, naming the socket and file
    in the message.
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        # Both fragments need the f prefix, otherwise "{file!r}" is
        # emitted literally instead of the file's repr.
        f"and file {file!r} combination")
873
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Chunks are read with file.readinto() through run_in_executor() into a
    reusable buffer of at most
    constants.SENDFILE_FALLBACK_READBUFFER_SIZE bytes.  Sending stops at
    EOF or once *count* bytes were sent (when *count* is truthy).

    Return the total number of bytes sent.  If anything was sent, the
    file position is moved to offset + total on exit, including on error.
    """
    if offset:
        file.seek(offset)
    max_chunk = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    blocksize = min(count, max_chunk) if count else max_chunk
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            window = memoryview(buf)[:blocksize]
            nread = await self.run_in_executor(None, file.readinto, window)
            if not nread:
                return total_sent  # EOF
            await self.sock_sendall(sock, window[:nread])
            total_sent += nread
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
899
def _check_sendfile_params(self, sock, file, offset, count):
    """Validate the arguments of a sendfile request.

    Raises ValueError if *file* reports a non-binary mode, if *sock* is
    not a SOCK_STREAM socket, if *count* is given and not positive, or
    if *offset* is negative.  Raises TypeError if *count* (when not
    None) or *offset* is not an int.
    """
    # Objects without a "mode" attribute are treated as binary.
    file_mode = getattr(file, 'mode', 'b')
    if 'b' not in file_mode:
        raise ValueError("file should be opened in binary mode")
    if sock.type != socket.SOCK_STREAM:
        raise ValueError("only SOCK_STREAM type sockets are supported")
    if count is not None:
        if not isinstance(count, int):
            raise TypeError(
                f"count must be a positive integer (got {count!r})")
        if count <= 0:
            raise ValueError(
                f"count must be a positive integer (got {count!r})")
    if not isinstance(offset, int):
        raise TypeError(
            f"offset must be a non-negative integer (got {offset!r})")
    if offset < 0:
        raise ValueError(
            f"offset must be a non-negative integer (got {offset!r})")
920
async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
    """Create, bind and connect one socket.

    *exceptions* is a list owned by the caller; a fresh sub-list is
    appended to it, and every OSError seen during this attempt is
    recorded in that sub-list.  *addr_info* is a getaddrinfo()-style
    5-tuple describing the remote end.  *local_addr_infos*, when not
    None, is an iterable of such 5-tuples; the socket is bound to the
    first local address for which bind() succeeds.

    Return the connected non-blocking socket.  On any failure the
    socket is closed and the exception is re-raised.
    """
    my_exceptions = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        sock = socket.socket(family=family, type=type_, proto=proto)
        sock.setblocking(False)
        if local_addr_infos is not None:
            for _, _, _, _, laddr in local_addr_infos:
                try:
                    sock.bind(laddr)
                    break
                except OSError as exc:
                    # strerror is None for an OSError built without an
                    # errno/message pair; fall back to str(exc) so that
                    # reporting the failure cannot itself raise.
                    reason = exc.strerror or str(exc)
                    msg = (
                        f'error while attempting to bind on '
                        f'address {laddr!r}: '
                        f'{reason.lower()}'
                    )
                    my_exceptions.append(OSError(exc.errno, msg))
            else:  # all bind attempts failed
                if not my_exceptions:
                    # Nothing was tried (empty local_addr_infos): raise
                    # an OSError rather than IndexError from pop().
                    raise OSError('no local address available to bind to')
                raise my_exceptions.pop()
        await self.sock_connect(sock, address)
        return sock
    except OSError as exc:
        my_exceptions.append(exc)
        if sock is not None:
            sock.close()
        raise
    except:
        # Deliberately catches everything (the exception is re-raised)
        # so the socket is closed on any other failure.
        if sock is not None:
            sock.close()
        raise
956
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        happy_eyeballs_delay=None, interleave=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Either host/port or an existing sock must be given, never both.
    server_hostname and ssl_handshake_timeout are only accepted together
    with ssl.  If happy_eyeballs_delay is not None, the candidate
    addresses are tried through staggered.staggered_race() with that
    delay, and interleave defaults to 1.

    Raises ValueError for inconsistent arguments and OSError when name
    resolution returns nothing or no connection attempt succeeds.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote address (and the local one, if requested).
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')
        else:
            laddr_infos = None

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        # One sub-list per connection attempt; each is filled in by
        # _connect_sock().  Note that this local name shadows the
        # "exceptions" module for the rest of this function.
        exceptions = []
        if happy_eyeballs_delay is None:
            # not using happy eyeballs
            # Try the addresses one at a time; sock stays None if every
            # attempt raises OSError.
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        exceptions, addrinfo, laddr_infos)
                    break
                except OSError:
                    continue
        else:  # using happy eyeballs
            sock, _, _ = await staggered.staggered_race(
                (functools.partial(self._connect_sock,
                                   exceptions, addrinfo, laddr_infos)
                 for addrinfo in infos),
                happy_eyeballs_delay, loop=self)

        if sock is None:
            # Flatten the per-attempt lists into one list of errors.
            exceptions = [exc for sub in exceptions for exc in sub]
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
1081
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None):
    """Build a transport/protocol pair on top of a socket.

    The socket is switched to non-blocking mode, a protocol is created
    with protocol_factory(), and either an SSL transport (when *ssl* is
    truthy) or a plain socket transport is created.  When *ssl* is a
    bool, None is passed on as the SSL context.

    The waiter future handed to the transport is awaited before
    returning (transport, protocol); if awaiting it raises, the
    transport is closed and the exception propagates.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        sslcontext = ssl if not isinstance(ssl, bool) else None
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)

    try:
        await waiter
    except BaseException:
        # Close on any failure, then let it propagate.
        transport.close()
        raise

    return transport, protocol
1107
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file.  If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached.  File position is updated on
    return or also in case of error, in which case file.tell()
    can be used to figure out the number of bytes
    which were sent.

    fallback set to True makes asyncio manually read and send
    the file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    the sendfile syscall and fallback is False.  Raise RuntimeError if
    the transport is closing or does not support sendfile at all.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")
    modes = constants._SendfileMode
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")
    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise
            # Otherwise fall through to the fallback implementation.

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1154
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation: always raises SendfileNotAvailableError."""
    message = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(message)
1158
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through *transp* by reading chunks and writing them.

    Chunks of at most 16384 bytes are read with file.readinto() through
    run_in_executor().  Before each write, proto.drain() is awaited on a
    _SendfileFallbackProtocol wrapped around the transport.  Sending
    stops at EOF or once *count* bytes were sent (when *count* is
    truthy).

    Return the total number of bytes sent.  On exit, including on error,
    the file position is moved to offset + total if anything was sent,
    and proto.restore() is awaited.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            window = memoryview(buf)[:blocksize]
            nread = await self.run_in_executor(None, file.readinto, window)
            if not nread:
                break  # EOF
            await proto.drain()
            transp.write(window[:nread])
            total_sent += nread
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1183
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError if the ssl module is not available, and
    TypeError if *sslcontext* is not an ssl.SSLContext or the transport
    is not marked as compatible with start_tls().  If waiting for the
    upgrade fails, the transport is closed, the two scheduled callbacks
    are cancelled and the exception propagates.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    tls_capable = getattr(transport, '_start_tls_compatible', False)
    if not tls_capable:
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(tls_protocol)
    connection_made_handle = self.call_soon(
        tls_protocol.connection_made, transport)
    resume_handle = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        transport.close()
        connection_made_handle.cancel()
        resume_handle.cancel()
        raise

    return tls_protocol._app_transport
1229
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing SOCK_DGRAM socket as *sock* (no other socket
    option may then be given), or describe the endpoint with
    *local_addr* / *remote_addr* plus the optional family, proto, flags,
    reuse_address, reuse_port and allow_broadcast settings.

    For AF_UNIX the addresses must be strings; otherwise each address
    must be a 2-tuple, which is resolved before sockets are created.
    One socket is attempted per (family, proto) pair for which every
    requested address resolved; the first that can be set up is used.

    Return a (transport, protocol) pair, where protocol is the object
    returned by protocol_factory().

    Raises ValueError for inconsistent arguments, TypeError for
    addresses of the wrong type, and OSError when resolution returns
    nothing or no socket could be set up (the first recorded error is
    raised).
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Remove a stale socket file left at the local path, unless
            # the address starts with a NUL character.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Validate with an explicit exception: an assert
                    # would be stripped when Python runs with -O.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # NOTE(review): defaulting SO_REUSEADDR to on for UDP sockets on
        # POSIX may let another process bind the same port; newer asyncio
        # releases reportedly dropped this default -- confirm before
        # relying on it.
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # With allow_broadcast the socket is left
                    # unconnected; the remote address is only handed to
                    # the transport.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Deliberately catches everything (the exception is
                # re-raised) so the socket is always closed.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        # Close the transport on any failure, then let it propagate.
        transport.close()
        raise

    return transport, protocol
1369
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address-info entries for *address*.

    The first two items of *address* are taken as host and port; any
    remaining items are forwarded to _ipaddr_info() unchanged.  When
    _ipaddr_info() yields an entry, it is returned as a one-element
    list; otherwise the lookup is delegated to loop.getaddrinfo().

    This method is a coroutine.
    """
    host, port = address[:2]
    extra = address[2:]
    resolved = _ipaddr_info(host, port, family, type, proto, *extra)
    if resolved is None:
        # Not a literal IP address: ask the loop to resolve it.
        return await loop.getaddrinfo(
            host, port,
            family=family, type=type, proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]
1381
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a stream server.

    Delegates to self._ensure_resolved() with type=SOCK_STREAM and this
    loop, and returns its result.  Raises OSError when the result is
    empty.

    This method is a coroutine.
    """
    resolved = await self._ensure_resolved(
        (host, port),
        family=family,
        type=socket.SOCK_STREAM,
        flags=flags,
        loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1389
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Either host/port or an already created stream socket (sock) must be
    given, but not both.  When reuse_address is None it defaults to true
    on POSIX platforms other than Cygwin.  backlog, ssl and
    ssl_handshake_timeout are handed to the Server object unchanged.
    When start_serving is true the server is started before returning.

    Raises TypeError if ssl is a bool, ValueError for inconsistent
    arguments, and OSError if an address cannot be resolved or bound.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        # Resolve every host concurrently; the set() drops duplicate
        # address-info entries so each address is bound only once.
        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs, loop=self)
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    # strerror is None when the OSError was not built
                    # from an (errno, strerror) pair; fall back to
                    # str(err) so the real bind failure is reported
                    # instead of an AttributeError.
                    reason = err.strerror or str(err)
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, reason.lower())) from None
            completed = True
        finally:
            # On any failure close every socket opened so far.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self)

    if self._debug:
        logger.info("%r is serving", server)
    return server
1506
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Wrap an already accepted stream socket in a transport.

    Intended for servers that accept connections outside of asyncio
    but want asyncio to handle them afterwards.

    Raises ValueError if sock is not a SOCK_STREAM socket, or if
    ssl_handshake_timeout is given without ssl.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    if ssl_handshake_timeout is not None:
        if not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '',
        server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = pair

    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        logger.debug("%r handled: (%r, %r)",
                     transport.get_extra_info('socket'),
                     transport, protocol)
    return transport, protocol
1535
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach a read-pipe transport to *pipe*.

    A protocol is built with protocol_factory() and a transport is
    created through self._make_read_pipe_transport().  The coroutine
    then waits on the waiter future handed to that transport; if the
    wait fails for any reason the transport is closed and the error
    re-raised.

    Returns a (transport, protocol) pair.
    """
    proto = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Includes cancellation: never leave a half-set-up transport open.
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, proto)
    return transport, proto
1551
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach a write-pipe transport to *pipe*.

    A protocol is built with protocol_factory() and a transport is
    created through self._make_write_pipe_transport().  The coroutine
    then waits on the waiter future handed to that transport; if the
    wait fails for any reason the transport is closed and the error
    re-raised.

    Returns a (transport, protocol) pair.
    """
    proto = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Includes cancellation: never leave a half-set-up transport open.
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, proto)
    return transport, proto
1567
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log *msg* at debug level, followed by the configured pipes.

    Each of stdin/stdout/stderr that is not None is appended as a
    'name=<formatted pipe>' item.  When stdout is set and stderr is
    subprocess.STDOUT, the two are reported as one 'stdout=stderr=...'
    item.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        if stdout is not None:
            parts.append(f'stdout={_format_pipe(stdout)}')
        if stderr is not None:
            parts.append(f'stderr={_format_pipe(stderr)}')

    logger.debug(' '.join(parts))
1580
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Start a subprocess for the shell command *cmd*.

    cmd must be a str or bytes object.  The universal_newlines, shell,
    bufsize, text, encoding and errors arguments are accepted only with
    their default-equivalent values; anything else raises ValueError.
    Remaining keyword arguments are forwarded to
    self._make_subprocess_transport().

    This method is a coroutine returning a (transport, protocol) pair.
    """
    # Checks run in order; the first violated constraint is reported.
    problem = None
    if not isinstance(cmd, (bytes, str)):
        problem = "cmd must be a string"
    elif universal_newlines:
        problem = "universal_newlines must be False"
    elif not shell:
        problem = "shell must be True"
    elif bufsize != 0:
        problem = "bufsize must be 0"
    elif text:
        problem = "text must be False"
    elif encoding is not None:
        problem = "encoding must be None"
    elif errors is not None:
        problem = "errors must be None"
    if problem is not None:
        raise ValueError(problem)

    proto = protocol_factory()
    log_prefix = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        log_prefix = 'run shell command %r' % cmd
        self._log_subprocess(log_prefix, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        proto, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # Debug mode may have been toggled while awaiting; only log the
    # result when the prefix was actually prepared above.
    if self._debug and log_prefix is not None:
        logger.info('%s: %r', log_prefix, transport)
    return transport, proto
1616
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Start a subprocess running *program* with arguments *args*.

    The universal_newlines, shell, bufsize, text, encoding and errors
    arguments are accepted only with their default-equivalent values;
    anything else raises ValueError.  Remaining keyword arguments are
    forwarded to self._make_subprocess_transport().

    This method is a coroutine returning a (transport, protocol) pair.
    """
    # Checks run in order; the first violated constraint is reported.
    problem = None
    if universal_newlines:
        problem = "universal_newlines must be False"
    elif shell:
        problem = "shell must be False"
    elif bufsize != 0:
        problem = "bufsize must be 0"
    elif text:
        problem = "text must be False"
    elif encoding is not None:
        problem = "encoding must be None"
    elif errors is not None:
        problem = "errors must be None"
    if problem is not None:
        raise ValueError(problem)

    command = (program,) + args
    proto = protocol_factory()
    log_prefix = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        log_prefix = f'execute program {program!r}'
        self._log_subprocess(log_prefix, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        proto, command, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # Debug mode may have been toggled while awaiting; only log the
    # result when the prefix was actually prepared above.
    if self._debug and log_prefix is not None:
        logger.info('%s: %r', log_prefix, transport)
    return transport, proto
1650
def get_exception_handler(self):
    """Return the custom exception handler currently installed.

    None means no custom handler is set, i.e. the default one is in use.
    """
    handler = self._exception_handler
    return handler
1655
def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None restores the default exception handler.

    Otherwise handler must be callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event
    loop and 'context' is a dict object (see the
    `call_exception_handler()` documentation for its contents).

    Raises TypeError if handler is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError(f'A callable object or None is expected, '
                        f'got {handler!r}')
    self._exception_handler = handler
1672
def default_exception_handler(self, context):
    """Default exception handler.

    Used when an exception occurs and no custom handler is set; a
    custom handler may also call it to defer to the default behavior.

    Logs, at error level, the context message (or a generic one when
    it is missing or empty) followed by one 'key: value' line per
    remaining context entry, with the exception (if any) attached as
    exc_info.  'source_traceback' and 'handle_traceback' entries are
    rendered as formatted stack listings; every other value is logged
    via repr().  If the context has no 'source_traceback' and the
    handle currently being run recorded a creation traceback, that
    traceback is stored into *context* as 'handle_traceback'.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False)

    if 'source_traceback' not in context:
        current = self._current_handle
        if current is not None and current._source_traceback:
            # Note: this writes into the caller-supplied dict.
            context['handle_traceback'] = current._source_traceback

    traceback_headers = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        if key in {'message', 'exception'}:
            # Already covered by the first line and by exc_info.
            continue
        value = context[key]
        if key in traceback_headers:
            stack = ''.join(traceback.format_list(value))
            value = traceback_headers[key] + stack.rstrip()
        else:
            value = repr(value)
        log_lines.append(f'{key}: {value}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1722
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    SystemExit and KeyboardInterrupt raised by a handler propagate;
    any other exception is reported instead of being raised.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        self._exception_handler(self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # Exception in the user set custom exception handler:
        # report it through the default handler, keeping the
        # original context for reference.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1779
def _add_callback(self, handle):
    """Append a non-timer Handle to the _ready queue.

    Cancelled handles are dropped silently.  TimerHandle instances
    are not accepted here.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001787
1788 def _add_callback_signalsafe(self, handle):
1789 """Like _add_callback() but called from a signal handler."""
1790 self._add_callback(handle)
1791 self._write_to_self()
1792
Yury Selivanov592ada92014-09-25 12:07:56 -04001793 def _timer_handle_cancelled(self, handle):
1794 """Notification that a TimerHandle has been cancelled."""
1795 if handle._scheduled:
1796 self._timer_cancelled_count += 1
1797
def _run_once(self):
    """Run one full iteration of the event loop.

    In order, this: discards cancelled timer handles, polls the
    selector for I/O and processes the resulting events, moves timer
    handles that are due into the ready queue, and finally runs the
    callbacks that are in the ready queue at that point.
    """

    # Step 1: get rid of cancelled timers.
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # The filtered list is rebuilt into a heap in one pass and the
        # cancelled counter starts over.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Step 2: choose the select() timeout.  It stays None when nothing
    # is ready, the loop is not stopping and no timer is scheduled.
    timeout = None
    if self._ready or self._stopping:
        # There is work to do right away (or a stop was requested).
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        # Time until the earliest timer, clamped to
        # [0, MAXIMUM_SELECT_TIMEOUT].
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    # Step 3: poll for I/O and let _process_events() handle the result.
    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # A timer counts as due when its _when is earlier than the current
    # time plus the clock resolution.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    # The index is unused; the loop simply pops exactly ntodo handles.
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # Debug mode: expose the running handle through
            # _current_handle and warn about slow callbacks.
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001875
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001876 def _set_coroutine_origin_tracking(self, enabled):
1877 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001878 return
1879
Yury Selivanove8944cb2015-05-12 11:43:04 -04001880 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001881 self._coroutine_origin_tracking_saved_depth = (
1882 sys.get_coroutine_origin_tracking_depth())
1883 sys.set_coroutine_origin_tracking_depth(
1884 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001885 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001886 sys.set_coroutine_origin_tracking_depth(
1887 self._coroutine_origin_tracking_saved_depth)
1888
1889 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001890
def get_debug(self):
    """Return the loop's current debug flag."""
    debug_flag = self._debug
    return debug_flag
1893
def set_debug(self, enabled):
    """Set the loop's debug flag.

    If the loop is running, a call to _set_coroutine_origin_tracking()
    with the same value is also scheduled via call_soon_threadsafe().
    """
    self._debug = enabled

    if not self.is_running():
        return
    self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)