blob: b1a7f88f41165a2e6c412e5a4ed81ac90d8c539c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Yury Selivanov8cd51652019-05-27 15:57:20 +020048from . import trsock
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
# Names exported by a star-import of this module.
__all__ = 'BaseEventLoop',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module exposes the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
# (24 hours, expressed in seconds).
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
Victor Stinnerc94a93a2016-04-01 21:43:39 +020069
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a Task, the repr of
    that task is returned; otherwise str(handle) is returned.
    """
    owner = getattr(handle._callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # The callback belongs to a task: format the task itself.
        return repr(owner)
    return str(handle)
77
78
Victor Stinneracdb7822014-07-14 18:33:40 +020079def _format_pipe(fd):
80 if fd == subprocess.PIPE:
81 return '<pipe>'
82 elif fd == subprocess.STDOUT:
83 return '<stdout>'
84 else:
85 return repr(fd)
86
87
Yury Selivanov5587d7c2016-09-15 15:45:07 -040088def _set_reuseport(sock):
89 if not hasattr(socket, 'SO_REUSEPORT'):
90 raise ValueError('reuse_port not supported by socket module')
91 else:
92 try:
93 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94 except OSError:
95 raise ValueError('reuse_port not supported by socket module, '
96 'SO_REUSEPORT defined but not implemented.')
97
98
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +020099def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400100 # Try to skip getaddrinfo if "host" is already an IP. Users might have
101 # handled name resolution in their own code and pass in resolved IPs.
102 if not hasattr(socket, 'inet_pton'):
103 return
104
105 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
106 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500107 return None
108
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500109 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500110 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500111 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500112 proto = socket.IPPROTO_UDP
113 else:
114 return None
115
Yury Selivanova7146162016-06-02 16:51:07 -0400116 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400117 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700118 elif isinstance(port, bytes) and port == b'':
119 port = 0
120 elif isinstance(port, str) and port == '':
121 port = 0
122 else:
123 # If port's a service name like "http", don't skip getaddrinfo.
124 try:
125 port = int(port)
126 except (TypeError, ValueError):
127 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400128
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400129 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500130 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400131 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500132 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 else:
134 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500135
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400136 if isinstance(host, bytes):
137 host = host.decode('idna')
138 if '%' in host:
139 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
140 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500141 return None
142
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400143 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500144 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400145 socket.inet_pton(af, host)
146 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400147 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200148 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400149 else:
150 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400151 except OSError:
152 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500153
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400154 # "host" is not an IP address.
155 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500156
157
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800158def _interleave_addrinfos(addrinfos, first_address_family_count=1):
159 """Interleave list of addrinfo tuples by family."""
160 # Group addresses by family
161 addrinfos_by_family = collections.OrderedDict()
162 for addr in addrinfos:
163 family = addr[0]
164 if family not in addrinfos_by_family:
165 addrinfos_by_family[family] = []
166 addrinfos_by_family[family].append(addr)
167 addrinfos_lists = list(addrinfos_by_family.values())
168
169 reordered = []
170 if first_address_family_count > 1:
171 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
172 del addrinfos_lists[0][:first_address_family_count - 1]
173 reordered.extend(
174 a for a in itertools.chain.from_iterable(
175 itertools.zip_longest(*addrinfos_lists)
176 ) if a is not None)
177 return reordered
178
179
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100180def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500181 if not fut.cancelled():
182 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200183 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500184 # Issue #22429: run_forever() already finished, no need to
185 # stop it.
186 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500187 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100188
189
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200190if hasattr(socket, 'TCP_NODELAY'):
191 def _set_nodelay(sock):
192 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
193 sock.type == socket.SOCK_STREAM and
194 sock.proto == socket.IPPROTO_TCP):
195 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
196else:
197 def _set_nodelay(sock):
198 pass
199
200
class _SendfileFallbackProtocol(protocols.Protocol):
    """Stand-in protocol that takes over a transport temporarily.

    On construction it pauses reading, installs itself as the
    transport's protocol and remembers the previous protocol together
    with the read/write-pause state, so that restore() can put
    everything back.  While installed, drain() lets a writer wait for
    the transport to leave the write-paused state.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        # Snapshot the flow-control state before it is changed below.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A pending future here means "writing is currently paused".
        self._write_ready_fut = (
            transp._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is resumed; raise if the transport closes."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        waiter = self._write_ready_fut
        if waiter is not None:
            await waiter

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        waiter = self._write_ready_fut
        if waiter is not None:
            # Never happens if peer disconnects after sending the whole
            # content.  Thus disconnection is always an exception from
            # user perspective.
            error = exc
            if error is None:
                error = ConnectionError("Connection is closed by peer")
            waiter.set_exception(error)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        # Keep the existing future if writing is already paused.
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        waiter = self._write_ready_fut
        if waiter is not None:
            waiter.set_result(False)
            self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its flow-control state."""
        transport = self._transport
        transport.set_protocol(self._proto)
        if self._should_resume_reading:
            transport.resume_reading()
        pending = self._write_ready_fut
        if pending is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            pending.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
267
268
class Server(events.AbstractServer):
    """Server object tying a set of listening sockets to an event loop.

    The server keeps a count of attached users (see _attach/_detach)
    and a list of futures for wait_closed() callers; those futures are
    resolved once the server has been closed and the count is zero.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        # Set to None by close().
        self._sockets = sockets
        self._active_count = 0
        # Futures awaited by wait_closed(); set to None by _wakeup().
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        # Future awaited by serve_forever(), or None.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Register one more active user of this server."""
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        """Unregister an active user; wake waiters if closed and idle."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never read by wait_closed(); use None
                # rather than the future itself so that the future does
                # not hold a reference to itself.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; no-op if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        """Return the event loop this server is bound to."""
        return self._loop

    def is_serving(self):
        """Return True if the server is accepting connections."""
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving on all sockets; no-op if already closed.

        A pending serve_forever() future is cancelled, and wait_closed()
        callers are woken immediately when the active count is zero.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        """Start accepting connections."""
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        """Start serving and block until this coroutine is cancelled.

        Raises RuntimeError if serve_forever() is already being awaited
        or if the server is closed.  On cancellation the server is
        closed before CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                # Always propagate the cancellation, even if closing
                # raised.
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until the server's waiters are woken.

        NOTE(review): this returns immediately once close() has cleared
        _sockets, even if _active_count is still non-zero.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378
379
380class BaseEventLoop(events.AbstractEventLoop):
381
def __init__(self):
    """Set up the bookkeeping state of a new, not-yet-running loop."""
    # If true, close() waits for the default executor to finish
    self.wait_executor_on_close = True
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    # Callbacks ready to run.
    self._ready = collections.deque()
    # Timer handles, maintained as a heap by call_at().
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    # None means create_task() uses tasks.Task directly.
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
411
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200412 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500413 return (
414 f'<{self.__class__.__name__} running={self.is_running()} '
415 f'closed={self.is_closed()} debug={self.get_debug()}>'
416 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200417
def create_future(self):
    """Return a new Future bound to this loop."""
    fut = futures.Future(loop=self)
    return fut
421
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object and return the resulting task.

    The task is built by the factory installed with set_task_factory()
    when there is one, and by tasks.Task otherwise.  Raises
    RuntimeError if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        task = factory(self, coro)
        tasks._set_task_name(task, name)
        return task

    task = tasks.Task(coro, loop=self, name=name)
    if task._source_traceback:
        # Drop the last recorded traceback entry.
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200437
def set_task_factory(self, factory):
    """Install the factory used by loop.create_task().

    Passing None restores the default task factory.

    Otherwise *factory* must be a callable with a signature matching
    '(loop, coro)': 'loop' is a reference to the active event loop and
    'coro' is a coroutine object.  It must return a Future.

    Raises TypeError if *factory* is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
451
def get_task_factory(self):
    """Return the installed task factory.

    None means the default factory is in use.
    """
    factory = self._task_factory
    return factory
455
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 def _make_socket_transport(self, sock, protocol, waiter=None, *,
457 extra=None, server=None):
458 """Create socket transport."""
459 raise NotImplementedError
460
Neil Aspinallf7686c12017-12-19 19:45:42 +0000461 def _make_ssl_transport(
462 self, rawsock, protocol, sslcontext, waiter=None,
463 *, server_side=False, server_hostname=None,
464 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500465 ssl_handshake_timeout=None,
466 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 """Create SSL transport."""
468 raise NotImplementedError
469
470 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200471 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 """Create datagram transport."""
473 raise NotImplementedError
474
475 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
476 extra=None):
477 """Create read pipe transport."""
478 raise NotImplementedError
479
480 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
481 extra=None):
482 """Create write pipe transport."""
483 raise NotImplementedError
484
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200485 async def _make_subprocess_transport(self, protocol, args, shell,
486 stdin, stdout, stderr, bufsize,
487 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 """Create subprocess transport."""
489 raise NotImplementedError
490
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200492 """Write a byte to self-pipe, to wake up the event loop.
493
494 This may be called from a different thread.
495
496 The subclass is responsible for implementing the self-pipe.
497 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 raise NotImplementedError
499
500 def _process_events(self, event_list):
501 """Process selector events."""
502 raise NotImplementedError
503
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200504 def _check_closed(self):
505 if self._closed:
506 raise RuntimeError('Event loop is closed')
507
Yury Selivanoveb636452016-09-08 22:01:51 -0700508 def _asyncgen_finalizer_hook(self, agen):
509 self._asyncgens.discard(agen)
510 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800511 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700512
513 def _asyncgen_firstiter_hook(self, agen):
514 if self._asyncgens_shutdown_called:
515 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500516 f"asynchronous generator {agen!r} was scheduled after "
517 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700518 ResourceWarning, source=self)
519
520 self._asyncgens.add(agen)
521
async def shutdown_asyncgens(self):
    """Close every asynchronous generator tracked by this loop.

    Marks the loop so that generators first iterated afterwards trigger
    a warning, then runs aclose() on all tracked generators.  Exceptions
    raised while closing are reported through call_exception_handler()
    rather than propagated.
    """
    self._asyncgens_shutdown_called = True

    if len(self._asyncgens) == 0:
        # No asynchronous generators are alive.
        return

    # Snapshot the tracked generators before clearing the set.
    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    results = await tasks.gather(
        *[ag.aclose() for ag in closing_agens],
        return_exceptions=True,
        loop=self)

    for outcome, agen in zip(results, closing_agens):
        if not isinstance(outcome, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': outcome,
            'asyncgen': agen
        })
547
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if this loop is already
    running, or if another loop is running.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    # Install this loop's asyncgen hooks; the previous ones are put
    # back in the finally block below.
    saved_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        keep_running = True
        while keep_running:
            self._run_once()
            keep_running = not self._stopping
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*saved_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574
def run_until_complete(self, future):
    """Run the loop until *future* is done.

    A coroutine argument is wrapped in a Task.

    WARNING: never pass the same coroutine to run_until_complete()
    twice -- it would be wrapped in two different Tasks and that
    can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop stopped before the future completed.
    """
    self._check_closed()

    wrapped_here = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if wrapped_here and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')
611
def stop(self):
    """Ask the running loop to stop.

    This only sets a flag: run_forever() checks it after each complete
    iteration and then exits, so callbacks that are already scheduled
    still run.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the default executor, if
    any.  Whether the call waits for the executor to finish is
    controlled by the wait_executor_on_close attribute.

    The event loop must not be running; RuntimeError is raised
    otherwise.  Closing an already closed loop does nothing.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()

    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=self.wait_executor_on_close)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200641
def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed
645
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100646 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900647 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100648 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900649 if not self.is_running():
650 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100651
def is_running(self):
    """Return True if the event loop is running.

    The loop counts as running while a thread identifier is recorded
    in _thread_id.
    """
    return self._thread_id is not None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655
def time(self):
    """Return the current time on the event loop's clock.

    The value is a float number of seconds since an epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads time.monotonic().
    """
    now = time.monotonic()
    return now
664
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called after *delay* seconds.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    handle = self.call_at(when, callback, *args, context=context)
    frames = handle._source_traceback
    if frames:
        # Drop the last recorded traceback entry.
        del frames[-1]
    return handle
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but takes an absolute time.

    The absolute time is expressed on the same clock as the event
    loop's time() method.  Returns the TimerHandle that was pushed
    onto the loop's heap of scheduled timers.
    """
    self._check_closed()
    if self._debug:
        # Debug-only sanity checks: correct thread, sensible callback.
        self._check_thread()
        self._check_callback(callback, 'call_at')
    handle = events.TimerHandle(when, callback, args, self, context)
    recorded_tb = handle._source_traceback
    if recorded_tb:
        del recorded_tb[-1]
    heapq.heappush(self._scheduled, handle)
    handle._scheduled = True
    return handle
702
def call_soon(self, callback, *args, context=None):
    """Schedule a callback to be called as soon as possible.

    Callbacks are handled as a FIFO queue: they are called in the
    order in which they were registered, and each one is called
    exactly once.

    Positional arguments following the callback are passed on to the
    callback when it is called.
    """
    self._check_closed()
    if self._debug:
        # Debug-only sanity checks: correct thread, sensible callback.
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    scheduled = self._call_soon(callback, args, context)
    recorded_tb = scheduled._source_traceback
    if recorded_tb:
        del recorded_tb[-1]
    return scheduled
Victor Stinner93569c22014-03-21 10:00:52 +0100721
def _check_callback(self, callback, method):
    """Validate *callback* on behalf of the scheduling API *method*.

    Raise TypeError if *callback* is a coroutine object or a coroutine
    function, or if it is not callable at all.  *method* is only used
    to name the calling API in the error message.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines.iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700731
def _call_soon(self, callback, args, context):
    """Wrap *callback* in a Handle and append it to the ready queue.

    Unlike call_soon(), this performs no closed/thread/callback
    validation.  Returns the new Handle.
    """
    new_handle = events.Handle(callback, args, self, context)
    recorded_tb = new_handle._source_traceback
    if recorded_tb:
        del recorded_tb[-1]
    self._ready.append(new_handle)
    return new_handle
738
Victor Stinner956de692014-12-26 21:07:52 +0100739 def _check_thread(self):
740 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100741
Victor Stinneracdb7822014-07-14 18:33:40 +0200742 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100743 likely behave incorrectly when the assumption is violated.
744
Victor Stinneracdb7822014-07-14 18:33:40 +0200745 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100746 responsible for checking this condition for performance reasons.
747 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100748 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200749 return
Victor Stinner956de692014-12-26 21:07:52 +0100750 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100751 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100752 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200753 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100754 "than the current one")
755
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    After queueing the callback, self._write_to_self() is called
    (presumably to wake the loop up -- its implementation is not
    visible here).
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    queued = self._call_soon(callback, args, context)
    recorded_tb = queued._source_traceback
    if recorded_tb:
        del recorded_tb[-1]
    self._write_to_self()
    return queued
766
def run_in_executor(self, executor, func, *args):
    """Submit ``func(*args)`` to an executor and return a loop-bound future.

    When *executor* is None the loop's default executor is used; a
    concurrent.futures.ThreadPoolExecutor is created and remembered as
    the default the first time one is needed.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    pool = executor
    if pool is None:
        pool = self._default_executor
    if pool is None:
        # Lazily create the default executor on first use.
        pool = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = pool
    pending = pool.submit(func, *args)
    return futures.wrap_future(pending, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778
def set_default_executor(self, executor):
    """Set the executor that run_in_executor() uses for ``executor=None``.

    Passing anything that is not a ThreadPoolExecutor instance still
    works but emits a DeprecationWarning attributed to the caller.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        warnings.warn(
            'Using the default executor that is not an instance of '
            'ThreadPoolExecutor is deprecated and will be prohibited '
            'in Python 3.9',
            DeprecationWarning, stacklevel=2)
    self._default_executor = executor
787
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and how long it took.

    The result is logged at INFO level when the lookup took at least
    self.slow_callback_duration seconds, and at DEBUG level otherwise.
    Returns whatever socket.getaddrinfo() returned.
    """
    parts = [f"{host}:{port!r}"]
    # Only mention the options that were actually set (truthy).
    optional = (('family', family), ('type', type),
                ('proto', proto), ('flags', flags))
    for label, value in optional:
        if value:
            parts.append(f'{label}={value!r}')
    request = ', '.join(parts)
    logger.debug('Get address info %s', request)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    dt = self.time() - started

    report = (f'Getting address info {request} took {dt * 1e3:.3f}ms: '
              f'{addrinfo!r}')
    log = logger.info if dt >= self.slow_callback_duration else logger.debug
    log(report)
    return addrinfo
811
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* by running getaddrinfo in the default executor.

    In debug mode the logging wrapper self._getaddrinfo_debug is used
    in place of socket.getaddrinfo.
    """
    resolver = (self._getaddrinfo_debug if self._debug
                else socket.getaddrinfo)
    return await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
821
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
    return await lookup
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700825
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, preferring the native sendfile path.

    The arguments are validated by _check_sendfile_params() first; in
    debug mode the socket must additionally be non-blocking.  When the
    native path raises SendfileNotAvailableError, the read-and-send
    fallback is used unless *fallback* is false, in which case the
    error propagates.  Returns whatever the path that ran returns.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)
    try:
        native_result = await self._sock_sendfile_native(
            sock, file, offset, count)
        return native_result
    except exceptions.SendfileNotAvailableError:
        if not fallback:
            raise
        return await self._sock_sendfile_fallback(
            sock, file, offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200839
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation of native sendfile: always unavailable.

    Always raises SendfileNotAvailableError, which sock_sendfile()
    catches to switch to the fallback path.
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    #
    # Both fragments need the f-prefix: adjacent literals are
    # concatenated, but only f-prefixed ones are interpolated.
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        f"and file {file!r} combination")
846
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Reading starts at *offset* (when non-zero) and stops at EOF or
    after *count* bytes when *count* is truthy.  Each chunk is read in
    the default executor.  Returns the total number of bytes sent; on
    exit (including on error) the file position is moved to
    ``offset + total_sent`` if anything was sent.
    """
    if offset:
        file.seek(offset)
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            # readinto() may fill only part of the view (typically the
            # last chunk before EOF); send just the bytes actually read
            # so stale buffer contents never reach the peer.
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
872
873 def _check_sendfile_params(self, sock, file, offset, count):
874 if 'b' not in getattr(file, 'mode', 'b'):
875 raise ValueError("file should be opened in binary mode")
876 if not sock.type == socket.SOCK_STREAM:
877 raise ValueError("only SOCK_STREAM type sockets are supported")
878 if count is not None:
879 if not isinstance(count, int):
880 raise TypeError(
881 "count must be a positive integer (got {!r})".format(count))
882 if count <= 0:
883 raise ValueError(
884 "count must be a positive integer (got {!r})".format(count))
885 if not isinstance(offset, int):
886 raise TypeError(
887 "offset must be a non-negative integer (got {!r})".format(
888 offset))
889 if offset < 0:
890 raise ValueError(
891 "offset must be a non-negative integer (got {!r})".format(
892 offset))
893
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800894 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
895 """Create, bind and connect one socket."""
896 my_exceptions = []
897 exceptions.append(my_exceptions)
898 family, type_, proto, _, address = addr_info
899 sock = None
900 try:
901 sock = socket.socket(family=family, type=type_, proto=proto)
902 sock.setblocking(False)
903 if local_addr_infos is not None:
904 for _, _, _, _, laddr in local_addr_infos:
905 try:
906 sock.bind(laddr)
907 break
908 except OSError as exc:
909 msg = (
910 f'error while attempting to bind on '
911 f'address {laddr!r}: '
912 f'{exc.strerror.lower()}'
913 )
914 exc = OSError(exc.errno, msg)
915 my_exceptions.append(exc)
916 else: # all bind attempts failed
917 raise my_exceptions.pop()
918 await self.sock_connect(sock, address)
919 return sock
920 except OSError as exc:
921 my_exceptions.append(exc)
922 if sock is not None:
923 sock.close()
924 raise
925 except:
926 if sock is not None:
927 sock.close()
928 raise
929
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        happy_eyeballs_delay=None, interleave=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host
    and port: socket family AF_INET or socket.AF_INET6 depending on
    host (or family if specified), socket type SOCK_STREAM.
    protocol_factory must be a callable returning a protocol instance.

    Either host/port or an existing SOCK_STREAM *sock* must be given,
    never both.  With host/port, the resolved addresses are tried one
    after another, or raced with staggered starts when
    happy_eyeballs_delay is set.

    This method is a coroutine which will try to establish the
    connection in the background.  When successful, the coroutine
    returns a (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if ssl and server_hostname is None:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is None and port is None:
        # The caller must supply a ready-made stream socket.
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')
    else:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        remote_infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not remote_infos:
            raise OSError('getaddrinfo() returned empty list')

        local_infos = None
        if local_addr is not None:
            local_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not local_infos:
                raise OSError('getaddrinfo() returned empty list')

        if interleave:
            remote_infos = _interleave_addrinfos(remote_infos, interleave)

        # One sub-list of errors per connection attempt.
        attempt_errors = []
        if happy_eyeballs_delay is not None:
            # using happy eyeballs
            attempts = (
                functools.partial(self._connect_sock,
                                  attempt_errors, candidate, local_infos)
                for candidate in remote_infos)
            sock, _, _ = await staggered.staggered_race(
                attempts, happy_eyeballs_delay, loop=self)
        else:
            # not using happy eyeballs
            for candidate in remote_infos:
                try:
                    sock = await self._connect_sock(
                        attempt_errors, candidate, local_infos)
                except OSError:
                    continue
                break

        if sock is None:
            failures = [exc for sub in attempt_errors for exc in sub]
            if len(failures) == 1:
                raise failures[0]
            # If they all have the same str(), raise one.
            model = str(failures[0])
            if all(str(exc) == model for exc in failures):
                raise failures[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in failures)))

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
1054
Neil Aspinallf7686c12017-12-19 19:45:42 +00001055 async def _create_connection_transport(
1056 self, sock, protocol_factory, ssl,
1057 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001058 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001059
1060 sock.setblocking(False)
1061
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001062 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001063 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001064 if ssl:
1065 sslcontext = None if isinstance(ssl, bool) else ssl
1066 transport = self._make_ssl_transport(
1067 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001068 server_side=server_side, server_hostname=server_hostname,
1069 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001070 else:
1071 transport = self._make_socket_transport(sock, protocol, waiter)
1072
Victor Stinner29ad0112015-01-15 00:04:21 +01001073 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001074 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001075 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001076 transport.close()
1077 raise
1078
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001079 return transport, protocol
1080
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file through a transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file.  If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached.  File position is updated
    on return or also in case of error, in which case file.tell() can
    be used to figure out the number of bytes which were sent.

    fallback set to True makes asyncio manually read and send the
    file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    the sendfile syscall and fallback is False.  Raise RuntimeError
    if the transport is closing, does not support sendfile at all, or
    supports only the fallback while fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    modes = constants._SendfileMode
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Native path unavailable: fall through to the fallback
            # unless the caller disabled it.
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1127
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation of native transport sendfile: always unavailable.

    Always raises SendfileNotAvailableError, which sendfile() catches
    to switch to the fallback path.
    """
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
1131
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through *transp* by reading chunks and writing them.

    Reading starts at *offset* (when non-zero) and stops at EOF or
    after *count* bytes when *count* is truthy.  Each chunk is read in
    the default executor so a slow file does not block the event loop.
    Returns the total number of bytes written to the transport.  On
    exit (including on error) the file position is moved to
    ``offset + total_sent`` if anything was sent, and the helper
    protocol's restore() is awaited.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            await proto.drain()
            # readinto() may fill only part of the view (typically the
            # last chunk before EOF); write just the bytes actually
            # read so stale buffer contents never reach the peer.
            transp.write(view[:read])
            total_sent += read
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1156
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade an existing transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError when the ssl module is unavailable, and
    TypeError when *sslcontext* is not an ssl.SSLContext or the
    transport is not marked as start_tls-compatible.  If waiting for
    the TLS protocol's waiter future fails, the transport is closed
    and the pending callbacks are cancelled before re-raising.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    tls_capable = getattr(transport, '_start_tls_compatible', False)
    if not tls_capable:
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    handshake_done = self.create_future()
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, handshake_done,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(tls_protocol)
    made_handle = self.call_soon(tls_protocol.connection_made, transport)
    resume_handle = self.call_soon(transport.resume_reading)

    try:
        await handshake_done
    except BaseException:
        transport.close()
        made_handle.cancel()
        resume_handle.cancel()
        raise

    return tls_protocol._app_transport
1202
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing SOCK_DGRAM *sock* (no other socket option
    may then be given), or let the method create one from *local_addr*
    / *remote_addr* / *family*.  For AF_UNIX the addresses must be
    strings; otherwise they must be (host, port) 2-tuples.

    Raises ValueError for inconsistent arguments, TypeError for
    addresses of the wrong type, and the first OSError encountered if
    no candidate socket could be set up.  Returns a
    (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Skip the cleanup for addresses whose first character is
            # NUL (0 or '\x00').
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Validate with a real exception rather than
                    # ``assert``: asserts are stripped under -O and
                    # would let malformed addresses through.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # Try each candidate in turn; the first socket that can be
        # fully set up wins.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # With broadcast enabled the socket is left
                    # unconnected; the remote address is only handed
                    # to the transport.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol
1342
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of addrinfo entries for *address*.

    The first two items of *address* are the host and port; any extra
    items are forwarded to _ipaddr_info().  When _ipaddr_info()
    returns an entry (the host is already a resolved IP), it is
    returned as a one-element list; otherwise the lookup is delegated
    to ``loop.getaddrinfo()``.
    """
    host, port = address[:2]
    extra = address[2:]
    resolved = _ipaddr_info(host, port, family, type, proto, *extra)
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]
1354
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve a server bind address to SOCK_STREAM addrinfo entries.

    Raises OSError when resolution yields no entries.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family,
        type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1362
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """Create a TCP server.

    When host is a string, the server is bound to that host and port.

    When host is a sequence of strings, the server is bound to every
    host of the sequence.  Address entries that come out identical
    (for instance when several hostnames resolve to the same IP
    address) are bound only once.

    Instead of host/port, an existing stream socket may be supplied
    through sock; combining both raises ValueError.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is None and port is None:
        # Caller-provided listening socket.
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]
    else:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # Normalize host into an iterable of host names.
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
                not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        lookups = [
            self._create_server_getaddrinfo(name, port, family=family,
                                            flags=flags)
            for name in hosts
        ]
        resolved = await tasks.gather(*lookups, loop=self)
        # A set drops duplicate address entries.
        unique_infos = set(itertools.chain.from_iterable(resolved))

        sockets = []
        try:
            for af, socktype, proto, _canonname, sa in unique_infos:
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                wants_v6only = (_HAS_IPv6 and
                                af == socket.AF_INET6 and
                                hasattr(socket, 'IPPROTO_IPV6'))
                if wants_v6only:
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower())) from None
        except BaseException:
            # Any failure: release every socket opened so far.
            for sock in sockets:
                sock.close()
            raise

    for listener in sockets:
        listener.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self)

    if self._debug:
        logger.info("%r is serving", server)
    return server
1479
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Wrap an already accepted connection in a transport/protocol pair.

    Intended for servers that accept connections outside of asyncio
    but want asyncio to handle them afterwards.

    Raise ValueError if sock is not a stream socket, or if
    ssl_handshake_timeout is given without ssl.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = pair

    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
1508
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach *pipe* to a new read pipe transport.

    A protocol is built with protocol_factory(), a transport is made
    by _make_read_pipe_transport(), and the coroutine then waits on
    the waiter future handed to that transport.  If waiting fails for
    any reason the transport is closed and the exception re-raised.

    Return a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        # Includes cancellation: never leave the transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1524
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach *pipe* to a new write pipe transport.

    A protocol is built with protocol_factory(), a transport is made
    by _make_write_pipe_transport(), and the coroutine then waits on
    the waiter future handed to that transport.  If waiting fails for
    any reason the transport is closed and the exception re-raised.

    Return a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        # Includes cancellation: never leave the transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1540
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log *msg* at debug level followed by a summary of the std pipes.

    Pipes that are None are omitted.  When stdout is set and stderr is
    subprocess.STDOUT, the two are reported as one 'stdout=stderr='
    entry.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        if stdout is not None:
            parts.append(f'stdout={_format_pipe(stdout)}')
        if stderr is not None:
            parts.append(f'stderr={_format_pipe(stderr)}')

    logger.debug(' '.join(parts))
1553
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Start *cmd* through the shell and return (transport, protocol).

    cmd must be a str or bytes object.  universal_newlines, shell,
    bufsize, text, encoding and errors are accepted only with their
    default meaning (False, True, 0, falsy, None, None); any other
    value raises ValueError.  Remaining keyword arguments are passed
    on to _make_subprocess_transport().
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    protocol = protocol_factory()

    summary = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        summary = 'run shell command %r' % cmd
        self._log_subprocess(summary, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # summary stays None if debug was switched on while awaiting.
    if self._debug and summary is not None:
        logger.info('%s: %r', summary, transport)
    return transport, protocol
1589
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Start *program* with *args* and return (transport, protocol).

    universal_newlines, shell, bufsize, text, encoding and errors are
    accepted only with their default meaning (False, False, 0, falsy,
    None, None); any other value raises ValueError.  Remaining keyword
    arguments are passed on to _make_subprocess_transport().
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    argv = (program,) + args
    protocol = protocol_factory()

    summary = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        summary = f'execute program {program!r}'
        self._log_subprocess(summary, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, argv, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # summary stays None if debug was switched on while awaiting.
    if self._debug and summary is not None:
        logger.info('%s: %r', summary, transport)
    return transport, protocol
1623
def get_exception_handler(self):
    """Return the custom exception handler, or None if none is set
    (meaning the default handler is in use).
    """
    handler = self._exception_handler
    return handler
1628
def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None restores the default exception handler.

    A callable handler should accept '(loop, context)': 'loop' is
    a reference to the active event loop and 'context' is a dict
    object (see the `call_exception_handler()` documentation for
    details about context).

    Raise TypeError if handler is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
1645
def default_exception_handler(self, context):
    """Default exception handler.

    Invoked when an exception occurs and no exception handler is
    set; a custom handler may also call it to fall back on the
    default behavior.

    The error message and the remaining context entries are written
    to the log as one record.  'source_traceback' and
    'handle_traceback' entries are rendered as formatted stack
    listings; every other entry is rendered with repr().  If the
    context has no 'source_traceback' and the handle currently being
    run carries a source traceback, that traceback is stored in the
    context under 'handle_traceback' before logging.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is None:
        exc_info = False
    else:
        exc_info = (type(exception), exception, exception.__traceback__)

    if ('source_traceback' not in context and
            self._current_handle is not None and
            self._current_handle._source_traceback):
        context['handle_traceback'] = \
            self._current_handle._source_traceback

    # Heading printed above each kind of stored traceback.
    traceback_headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        if key in ('message', 'exception'):
            # Already represented by the first line and by exc_info.
            continue
        value = context[key]
        heading = traceback_headings.get(key)
        if heading is None:
            rendered = repr(value)
        else:
            stack = ''.join(traceback.format_list(value))
            rendered = heading + stack.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1695
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    SystemExit and KeyboardInterrupt raised by a handler propagate;
    any other exception raised by a handler is reported rather than
    propagated.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom_handler = self._exception_handler

    if custom_handler is not None:
        try:
            custom_handler(self, context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Exception in the user set custom exception handler.
            try:
                # Let's try default handler.
                self.default_exception_handler({
                    'message': 'Unhandled error in exception handler',
                    'exception': exc,
                    'context': context,
                })
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Guard 'default_exception_handler' in case it is
                # overloaded.
                logger.error('Exception in default exception handler '
                             'while handling an unexpected error '
                             'in custom exception handler',
                             exc_info=True)
        return

    try:
        self.default_exception_handler(context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException:
        # Second protection layer for unexpected errors
        # in the default implementation, as well as for subclassed
        # event loops with overloaded "default_exception_handler".
        logger.error('Exception in default exception handler',
                     exc_info=True)
1752
def _add_callback(self, handle):
    """Append a non-timer Handle to _ready unless it is cancelled."""
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        # Timer handles are not accepted here.
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001760
def _add_callback_signalsafe(self, handle):
    """Variant of _add_callback() meant to be called from a signal
    handler: queue the handle, then call _write_to_self().
    """
    self._add_callback(handle)
    self._write_to_self()
1765
def _timer_handle_cancelled(self, handle):
    """Record the cancellation of a TimerHandle.

    The cancelled-timer counter is bumped only when the handle is
    flagged as scheduled.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
1770
def _run_once(self):
    """Run one full iteration of the event loop.

    Cancelled timers are pruned first, then I/O is polled and the
    resulting events processed, timers that have come due are moved
    to the ready queue, and finally every callback that was ready at
    that point is run.
    """
    sched_count = len(self._scheduled)
    too_many_cancelled = (
        sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
        _MIN_CANCELLED_TIMER_HANDLES_FRACTION)

    if too_many_cancelled:
        # Remove delayed calls that were cancelled if their number
        # is too high
        survivors = []
        for timer in self._scheduled:
            if timer._cancelled:
                timer._scheduled = False
            else:
                survivors.append(timer)

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            dropped = heapq.heappop(self._scheduled)
            dropped._scheduled = False

    # Decide how long the selector may block.
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        delay = self._scheduled[0]._when - self.time()
        timeout = min(max(0, delay), MAXIMUM_SELECT_TIMEOUT)
    else:
        timeout = None

    self._process_events(self._selector.select(timeout))

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        if self._scheduled[0]._when >= end_time:
            break
        due = heapq.heappop(self._scheduled)
        due._scheduled = False
        self._ready.append(due)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    for _ in range(len(self._ready)):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
            continue
        try:
            self._current_handle = handle
            started = self.time()
            handle._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), elapsed)
        finally:
            self._current_handle = None
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001848
def _set_coroutine_origin_tracking(self, enabled):
    """Switch coroutine origin tracking on or off for this loop.

    Nothing happens when the requested state already matches the
    recorded one.  Enabling saves the interpreter's current tracking
    depth and sets it to constants.DEBUG_STACK_DEPTH; disabling puts
    the saved depth back.
    """
    if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
        return

    if not enabled:
        sys.set_coroutine_origin_tracking_depth(
            self._coroutine_origin_tracking_saved_depth)
    else:
        self._coroutine_origin_tracking_saved_depth = (
            sys.get_coroutine_origin_tracking_depth())
        sys.set_coroutine_origin_tracking_depth(
            constants.DEBUG_STACK_DEPTH)

    self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001863
def get_debug(self):
    """Return the value last stored as the loop's debug flag."""
    debug = self._debug
    return debug
1866
def set_debug(self, enabled):
    """Store *enabled* as the loop's debug flag.

    If the loop is running, _set_coroutine_origin_tracking(enabled)
    is additionally scheduled through call_soon_threadsafe().
    """
    self._debug = enabled

    if not self.is_running():
        return
    self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)