blob: ce4f1904f95056b8ad0441b1d9cfa40ce40271a7 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
twisteroid ambassador88f07a82019-05-05 19:14:35 +080019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020021import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020024import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010026import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020028import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010030import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070031import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Yury Selivanovf111b3d2017-12-30 00:35:36 -050033try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
37
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080038from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020039from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070041from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020043from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050044from . import sslproto
twisteroid ambassador88f07a82019-05-05 19:14:35 +080045from . import staggered
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020047from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070048from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049
50
Yury Selivanov6370f342017-12-10 18:36:12 -050051__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052
53
# Cleanup of cancelled timer handles is only considered once the
# _scheduled list holds this minimum number of timer handles...
_MIN_SCHEDULED_TIMER_HANDLES = 100

# ...and once this minimum fraction of the _scheduled timer handles
# has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module exposes the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Upper bound (one day, in seconds) for the timeout passed to select,
# to avoid OS limitations.
MAXIMUM_SELECT_TIMEOUT = 24 * 60 * 60
67
Victor Stinnerc94a93a2016-04-01 21:43:39 +020068
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a ``tasks.Task``,
    the repr of that task is returned; otherwise ``str(handle)``.
    """
    owner = getattr(handle._callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # The callback belongs to a task: describe the task itself.
        return repr(owner)
    return str(handle)
76
77
Victor Stinneracdb7822014-07-14 18:33:40 +020078def _format_pipe(fd):
79 if fd == subprocess.PIPE:
80 return '<pipe>'
81 elif fd == subprocess.STDOUT:
82 return '<stdout>'
83 else:
84 return repr(fd)
85
86
Yury Selivanov5587d7c2016-09-15 15:45:07 -040087def _set_reuseport(sock):
88 if not hasattr(socket, 'SO_REUSEPORT'):
89 raise ValueError('reuse_port not supported by socket module')
90 else:
91 try:
92 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
93 except OSError:
94 raise ValueError('reuse_port not supported by socket module, '
95 'SO_REUSEPORT defined but not implemented.')
96
97
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +020098def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -040099 # Try to skip getaddrinfo if "host" is already an IP. Users might have
100 # handled name resolution in their own code and pass in resolved IPs.
101 if not hasattr(socket, 'inet_pton'):
102 return
103
104 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
105 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500106 return None
107
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500108 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500109 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500110 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500111 proto = socket.IPPROTO_UDP
112 else:
113 return None
114
Yury Selivanova7146162016-06-02 16:51:07 -0400115 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400116 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700117 elif isinstance(port, bytes) and port == b'':
118 port = 0
119 elif isinstance(port, str) and port == '':
120 port = 0
121 else:
122 # If port's a service name like "http", don't skip getaddrinfo.
123 try:
124 port = int(port)
125 except (TypeError, ValueError):
126 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400127
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400128 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500129 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400130 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500131 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400132 else:
133 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500134
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400135 if isinstance(host, bytes):
136 host = host.decode('idna')
137 if '%' in host:
138 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
139 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500140 return None
141
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400142 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500143 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400144 socket.inet_pton(af, host)
145 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400146 if _HAS_IPv6 and af == socket.AF_INET6:
Erwan Le Papeac8eb8f2019-05-17 10:28:39 +0200147 return af, type, proto, '', (host, port, flowinfo, scopeid)
Yury Selivanovd904c232018-06-28 21:59:32 -0400148 else:
149 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400150 except OSError:
151 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500152
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400153 # "host" is not an IP address.
154 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500155
156
twisteroid ambassador88f07a82019-05-05 19:14:35 +0800157def _interleave_addrinfos(addrinfos, first_address_family_count=1):
158 """Interleave list of addrinfo tuples by family."""
159 # Group addresses by family
160 addrinfos_by_family = collections.OrderedDict()
161 for addr in addrinfos:
162 family = addr[0]
163 if family not in addrinfos_by_family:
164 addrinfos_by_family[family] = []
165 addrinfos_by_family[family].append(addr)
166 addrinfos_lists = list(addrinfos_by_family.values())
167
168 reordered = []
169 if first_address_family_count > 1:
170 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
171 del addrinfos_lists[0][:first_address_family_count - 1]
172 reordered.extend(
173 a for a in itertools.chain.from_iterable(
174 itertools.zip_longest(*addrinfos_lists)
175 ) if a is not None)
176 return reordered
177
178
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100179def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500180 if not fut.cancelled():
181 exc = fut.exception()
Yury Selivanov431b5402019-05-27 14:45:12 +0200182 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500183 # Issue #22429: run_forever() already finished, no need to
184 # stop it.
185 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500186 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100187
188
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200189if hasattr(socket, 'TCP_NODELAY'):
190 def _set_nodelay(sock):
191 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
192 sock.type == socket.SOCK_STREAM and
193 sock.proto == socket.IPPROTO_TCP):
194 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
195else:
196 def _set_nodelay(sock):
197 pass
198
199
class _SendfileFallbackProtocol(protocols.Protocol):
    """Stand-in protocol that takes over a transport temporarily.

    On construction it remembers the transport's current protocol and
    its reading / write-paused state, pauses reading and installs itself
    as the transport's protocol.  While installed it tracks write flow
    control through a future that drain() awaits.  restore() puts the
    original protocol and state back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # Remember the state to hand back in restore().
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # If writing was already paused, start with a pending waiter so
        # that drain() blocks until resume_writing() is called.
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None
        )

    async def drain(self):
        """Wait until writing is not paused.

        Raises ConnectionError if the transport is closing.
        """
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        waiter = self._write_ready_fut
        if waiter is not None:
            await waiter

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        waiter = self._write_ready_fut
        if waiter is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            error = exc
            if error is None:
                error = ConnectionError("Connection is closed by peer")
            waiter.set_exception(error)
        # Always forward the event to the original protocol.
        self._proto.connection_lost(exc)

    def pause_writing(self):
        # Create the waiter only if there is not one already.
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        waiter = self._write_ready_fut
        if waiter is None:
            return
        waiter.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its saved state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        waiter = self._write_ready_fut
        if waiter is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            waiter.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
266
267
class Server(events.AbstractServer):
    """Server object wrapping a set of listening sockets.

    State kept on the instance:

    * ``_sockets``: the listening sockets, or None once close() was called.
    * ``_active_count``: counter maintained by _attach() / _detach().
    * ``_waiters``: futures created by wait_closed(); set to None after
      _wakeup() has resolved them.
    * ``_serving`` / ``_serving_forever_fut``: whether accepting has been
      started, and the future awaited by serve_forever().
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Increment the active counter; the server must not be closed."""
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        """Decrement the active counter.

        When it drops to zero on an already closed server, the
        wait_closed() waiters are woken up.
        """
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and drop the list."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never read by wait_closed(); use None
                # rather than the future itself so that no reference
                # cycle (future -> result -> future) is created.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; does nothing if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        """Return the event loop this server was created with."""
        return self._loop

    def is_serving(self):
        """Return True if the server has started serving and is not closed."""
        return self._serving

    @property
    def sockets(self):
        """A new list of the listening sockets (empty once closed)."""
        if self._sockets is None:
            return []
        return list(self._sockets)

    def close(self):
        """Stop serving on all sockets; calling it again is a no-op.

        A pending serve_forever() future is cancelled.  wait_closed()
        waiters are woken up right away when the active counter is zero,
        otherwise by the final _detach().
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        """Start accepting connections."""
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        """Start serving and block until this coroutine is cancelled.

        Raises RuntimeError if serve_forever() is already being awaited
        or if the server is closed.  On cancellation the server is closed
        and the CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                # Re-raise the CancelledError even if closing failed.
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until _wakeup() resolves the waiter registered here.

        Returns immediately if the server is already closed or the
        waiters were already woken up.
        """
        # NOTE(review): because close() sets _sockets to None before the
        # active counter necessarily reaches zero, this returns at once
        # after close() even while _active_count > 0 -- confirm whether
        # waiting for the final _detach() is the intended contract.
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377
378
379class BaseEventLoop(events.AbstractEventLoop):
380
def __init__(self):
    """Initialise the loop state: not running, not closed, nothing queued."""
    self._closed = False
    self._stopping = False
    self._timer_cancelled_count = 0
    # Queue of callbacks ready to run, and the list of scheduled timers.
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
408
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200409 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500410 return (
411 f'<{self.__class__.__name__} running={self.is_running()} '
412 f'closed={self.is_closed()} debug={self.get_debug()}>'
413 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200414
def create_future(self):
    """Return a new Future bound to this event loop."""
    new_future = futures.Future(loop=self)
    return new_future
418
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object and return the resulting task.

    Uses the task factory set through set_task_factory() when there is
    one, otherwise creates a ``tasks.Task``.  Raises RuntimeError (from
    _check_closed()) if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        # The factory is called with (loop, coro) only; the name is
        # applied to its result afterwards.
        task = factory(self, coro)
        tasks._set_task_name(task, name)
        return task

    task = tasks.Task(coro, loop=self, name=name)
    if task._source_traceback:
        # Drop the last entry of the recorded source traceback.
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200434
def set_task_factory(self, factory):
    """Set a task factory that will be used by loop.create_task().

    Passing None selects the default task factory.

    Otherwise *factory* must be a callable with a signature matching
    '(loop, coro)': 'loop' is a reference to the active event loop and
    'coro' is a coroutine object.  The callable must return a Future.

    Raises TypeError if *factory* is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
448
def get_task_factory(self):
    """Return the current task factory (None means the default one)."""
    factory = self._task_factory
    return factory
452
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 def _make_socket_transport(self, sock, protocol, waiter=None, *,
454 extra=None, server=None):
455 """Create socket transport."""
456 raise NotImplementedError
457
Neil Aspinallf7686c12017-12-19 19:45:42 +0000458 def _make_ssl_transport(
459 self, rawsock, protocol, sslcontext, waiter=None,
460 *, server_side=False, server_hostname=None,
461 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500462 ssl_handshake_timeout=None,
463 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700464 """Create SSL transport."""
465 raise NotImplementedError
466
467 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200468 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 """Create datagram transport."""
470 raise NotImplementedError
471
472 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
473 extra=None):
474 """Create read pipe transport."""
475 raise NotImplementedError
476
477 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
478 extra=None):
479 """Create write pipe transport."""
480 raise NotImplementedError
481
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200482 async def _make_subprocess_transport(self, protocol, args, shell,
483 stdin, stdout, stderr, bufsize,
484 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 """Create subprocess transport."""
486 raise NotImplementedError
487
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200489 """Write a byte to self-pipe, to wake up the event loop.
490
491 This may be called from a different thread.
492
493 The subclass is responsible for implementing the self-pipe.
494 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 raise NotImplementedError
496
497 def _process_events(self, event_list):
498 """Process selector events."""
499 raise NotImplementedError
500
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200501 def _check_closed(self):
502 if self._closed:
503 raise RuntimeError('Event loop is closed')
504
Yury Selivanoveb636452016-09-08 22:01:51 -0700505 def _asyncgen_finalizer_hook(self, agen):
506 self._asyncgens.discard(agen)
507 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800508 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700509
510 def _asyncgen_firstiter_hook(self, agen):
511 if self._asyncgens_shutdown_called:
512 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500513 f"asynchronous generator {agen!r} was scheduled after "
514 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700515 ResourceWarning, source=self)
516
517 self._asyncgens.add(agen)
518
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Marks the loop so that generators iterated afterwards trigger a
    warning, then closes every tracked generator concurrently.  Each
    failure that is an Exception instance is reported through
    call_exception_handler() instead of being raised.
    """
    self._asyncgens_shutdown_called = True

    if len(self._asyncgens) == 0:
        # No asynchronous generators are being tracked.
        return

    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    results = await tasks.gather(
        *(agen.aclose() for agen in closing_agens),
        return_exceptions=True,
        loop=self)

    for agen, result in zip(closing_agens, results):
        if not isinstance(result, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': result,
            'asyncgen': agen
        })
544
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if this loop is already
    running, or if another event loop is already running.  While
    running, this loop is registered as the running loop and its
    asynchronous-generator hooks are installed; the previous hooks and
    the "not running" state are restored on exit.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    saved_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        # Always run at least one iteration, then keep iterating until
        # stop() has been requested.
        self._run_once()
        while not self._stopping:
            self._run_once()
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*saved_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop was stopped before the Future completed.
    """
    self._check_closed()

    wrapped_here = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if wrapped_here and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')
608
def stop(self):
    """Stop running the event loop.

    This only sets a flag: every callback already scheduled will still
    run, and run_forever() stops looping once the current iteration is
    complete.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running; RuntimeError is raised if it
    is.  Closing an already closed loop does nothing.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200638
def is_closed(self):
    """Returns True if the event loop was closed."""
    closed = self._closed
    return closed
642
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100643 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900644 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100645 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900646 if not self.is_running():
647 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100648
def is_running(self):
    """Returns True if the event loop is running."""
    # A thread identifier is recorded only while run_forever() executes.
    running = self._thread_id is not None
    return running
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700652
def time(self):
    """Return the time according to the event loop's clock.

    The value is a float expressed in seconds since an epoch; the
    epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.  This implementation reads the monotonic
    clock.
    """
    now = time.monotonic()
    return now
661
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args, context=context)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last entry of the recorded source traceback.
        del source_tb[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700683
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Returns the TimerHandle that was pushed onto the scheduled heap.
    Raises RuntimeError (from _check_closed()) if the loop is closed.
    """
    self._check_closed()
    if self._debug:
        # Extra sanity checks are only performed in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last entry of the recorded source traceback.
        del source_tb[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
699
def call_soon(self, callback, *args, context=None):
    """Schedule callback to be called as soon as possible.

    Callbacks are queued first-in, first-out: they are called in the
    order in which they were registered, and each one is called
    exactly once.

    Any positional arguments after the callback are passed to it when
    it is called.  Return the handle created by _call_soon().
    """
    self._check_closed()
    if self._debug:
        # Debug-only sanity checks: right thread, sensible callback.
        self._check_thread()
        self._check_callback(callback, 'call_soon')

    queued = self._call_soon(callback, args, context)
    frames = queued._source_traceback
    if frames:
        # Remove the last recorded frame of the creation traceback.
        del frames[-1]
    return queued
Victor Stinner93569c22014-03-21 10:00:52 +0100718
def _check_callback(self, callback, method):
    """Raise TypeError unless callback is a plain callable.

    Coroutine objects and coroutine functions are rejected, as is
    anything that is not callable.  method is the name of the public
    API being validated; it only appears in the error message.
    """
    is_coroutine = (coroutines.iscoroutine(callback) or
                    coroutines.iscoroutinefunction(callback))
    if is_coroutine:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700728
def _call_soon(self, callback, args, context):
    """Wrap callback in a Handle, append it to self._ready and return it.

    No validation is performed here; the public call_soon*() methods
    do their checks before delegating to this helper.
    """
    queued = events.Handle(callback, args, self, context)
    frames = queued._source_traceback
    if frames:
        # Remove the last recorded frame of the creation traceback.
        del frames[-1]
    self._ready.append(queued)
    return queued
735
def _check_thread(self):
    """Check that the current thread is the thread running the event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.  Nothing
    is checked while self._thread_id is None.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.
    """
    owner = self._thread_id
    if owner is None:
        return
    if threading.get_ident() != owner:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
752
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    After queueing the handle, self._write_to_self() is called.
    Unlike call_soon(), no thread check is made in debug mode.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')

    queued = self._call_soon(callback, args, context)
    frames = queued._source_traceback
    if frames:
        # Remove the last recorded frame of the creation traceback.
        del frames[-1]
    self._write_to_self()
    return queued
763
def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to an executor and return a loop-bound future.

    When executor is None the loop's default executor is used; if none
    has been set yet, a concurrent.futures.ThreadPoolExecutor is
    created and remembered as the default.  The concurrent future
    returned by submit() is wrapped with futures.wrap_future() so that
    it can be awaited on this loop.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')

    pool = executor
    if pool is None:
        pool = self._default_executor
    if pool is None:
        # Lazily create the default executor on first use.
        pool = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = pool

    pending = pool.submit(func, *args)
    return futures.wrap_future(pending, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700775
def set_default_executor(self, executor):
    """Store executor as the default used by run_in_executor().

    A DeprecationWarning is issued when executor is not an instance of
    concurrent.futures.ThreadPoolExecutor; the executor is stored
    regardless.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        warnings.warn(
            'Using the default executor that is not an instance of '
            'ThreadPoolExecutor is deprecated and will be prohibited '
            'in Python 3.9',
            DeprecationWarning, 2)
    self._default_executor = executor
784
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Resolve an address with socket.getaddrinfo() and log the timing.

    The request is logged at debug level before the lookup.  Afterwards
    the result and the elapsed time are logged at info level when the
    lookup took at least self.slow_callback_duration seconds, and at
    debug level otherwise.  Return whatever socket.getaddrinfo()
    returned.
    """
    # Only options carrying a truthy value are mentioned in the log line.
    optional = (('family', family), ('type', type),
                ('proto', proto), ('flags', flags))
    parts = [f"{host}:{port!r}"]
    parts.extend(f'{label}={value!r}' for label, value in optional if value)
    query = ', '.join(parts)
    logger.debug('Get address info %s', query)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {query} took '
              f'{elapsed * 1e3:.3f}ms: {addrinfo!r}')
    emit = (logger.info if elapsed >= self.slow_callback_duration
            else logger.debug)
    emit(report)
    return addrinfo
808
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve host/port in the default executor.

    The lookup runs through run_in_executor(None, ...).  In debug mode
    the logging wrapper self._getaddrinfo_debug is used, otherwise
    socket.getaddrinfo is called directly.  Return the resolver's
    result.
    """
    resolver = (self._getaddrinfo_debug if self._debug
                else socket.getaddrinfo)
    result = await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
    return result
818
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor.

    Return the value produced by socket.getnameinfo().
    """
    result = await self.run_in_executor(
        None, socket.getnameinfo, sockaddr, flags)
    return result
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700822
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send file over sock, preferring the native sendfile path.

    The arguments are validated with _check_sendfile_params(); in
    debug mode a socket whose timeout is not 0 is rejected with
    ValueError.  _sock_sendfile_native() is tried first.  If it raises
    SendfileNotAvailableError, the chunked _sock_sendfile_fallback()
    is used when fallback is true; otherwise the error propagates.

    Return the value returned by whichever implementation ran.
    """
    if self._debug:
        if sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)

    try:
        sent = await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if fallback:
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
        raise
    return sent
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200836
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation of native sendfile: always unavailable.

    This version unconditionally raises SendfileNotAvailableError
    naming the socket and the file.  sock_sendfile() catches that error
    to decide whether to use the fallback implementation.
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        # Both fragments need the f prefix, otherwise "{file!r}" is
        # emitted literally instead of the file's repr.
        f"and file {file!r} combination")
843
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send file over sock by reading chunks and calling sock_sendall().

    Reading starts at offset (the file is seeked there when offset is
    non-zero).  At most count bytes are sent when count is truthy,
    otherwise data is sent until readinto() reports end of file.  Each
    read is performed through run_in_executor(None, ...).

    Return the total number of bytes sent.  On exit, including on
    error, the file position is set to offset + total_sent if at least
    one byte was sent and the file object has a seek attribute.
    """
    if offset:
        file.seek(offset)
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the number of bytes still requested.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            # readinto() may fill only part of the view; send just the
            # bytes that were read so stale buffer content from an
            # earlier iteration is never transmitted.
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
869
def _check_sendfile_params(self, sock, file, offset, count):
    """Validate the arguments shared by the sendfile entry points.

    Raise ValueError if the file exposes a mode without 'b', if sock
    is not a SOCK_STREAM socket, if count is not positive or if offset
    is negative.  Raise TypeError if count (when not None) or offset is
    not an int.  A file object without a mode attribute is accepted.
    """
    mode = getattr(file, 'mode', 'b')
    if 'b' not in mode:
        raise ValueError("file should be opened in binary mode")
    if sock.type != socket.SOCK_STREAM:
        raise ValueError("only SOCK_STREAM type sockets are supported")

    if count is not None:
        count_error = "count must be a positive integer (got {!r})"
        if not isinstance(count, int):
            raise TypeError(count_error.format(count))
        if count <= 0:
            raise ValueError(count_error.format(count))

    offset_error = "offset must be a non-negative integer (got {!r})"
    if not isinstance(offset, int):
        raise TypeError(offset_error.format(offset))
    if offset < 0:
        raise ValueError(offset_error.format(offset))
890
async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
    """Create, bind and connect one socket.

    addr_info is a 5-tuple whose family, type, protocol and address
    fields are used to create and connect the socket.  When
    local_addr_infos is given, the socket is bound to the first entry
    whose address can be bound.

    A fresh list is appended to exceptions as soon as the call starts;
    every OSError hit by this attempt is recorded in that list.

    Return the connected, non-blocking socket.  On any failure the
    socket is closed and the exception is re-raised.
    """
    my_exceptions = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        sock = socket.socket(family=family, type=type_, proto=proto)
        sock.setblocking(False)
        if local_addr_infos is not None:
            for _, _, _, _, laddr in local_addr_infos:
                try:
                    sock.bind(laddr)
                    break
                except OSError as exc:
                    # strerror is None when the OSError was not built
                    # from an (errno, message) pair; fall back to str()
                    # instead of failing on None.lower().
                    reason = exc.strerror
                    if reason is None:
                        reason = str(exc)
                    msg = (
                        f'error while attempting to bind on '
                        f'address {laddr!r}: '
                        f'{reason.lower()}'
                    )
                    my_exceptions.append(OSError(exc.errno, msg))
            else:  # all bind attempts failed
                if my_exceptions:
                    raise my_exceptions.pop()
                # An empty local_addr_infos leaves nothing to pop;
                # report it as an OSError like any other bind failure.
                raise OSError('no local address available to bind on')
        await self.sock_connect(sock, address)
        return sock
    except OSError as exc:
        my_exceptions.append(exc)
        if sock is not None:
            sock.close()
        raise
    except BaseException:
        # Cancellation and other non-OSError failures: do not leak the
        # socket, but do not record the exception either.
        if sock is not None:
            sock.close()
        raise
926
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        happy_eyeballs_delay=None, interleave=None):
    """Connect to a TCP server.

    Either resolve host/port and connect a new SOCK_STREAM socket, or
    adopt the existing socket passed as sock; then wrap the socket in a
    transport for the protocol returned by protocol_factory().

    When happy_eyeballs_delay is given, the candidate addresses are
    raced through staggered.staggered_race() with that delay and
    interleave defaults to 1.  Otherwise the addresses are tried one
    after another.  A truthy interleave reorders the resolved
    addresses with _interleave_addrinfos().

    This method is a coroutine.  On success it returns a
    (transport, protocol) pair.  ValueError is raised for inconsistent
    arguments, OSError when no connection could be established.
    """
    if ssl:
        if server_hostname is None:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host
    else:
        if server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')
        if ssl_handshake_timeout is not None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

    if interleave is None and happy_eyeballs_delay is not None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is None and port is None:
        # The caller must supply an existing stream socket.
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')
    else:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        laddr_infos = None
        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        # One sub-list of errors per connection attempt, filled in by
        # _connect_sock().
        attempt_errors = []
        if happy_eyeballs_delay is not None:
            # using happy eyeballs
            connectors = (
                functools.partial(self._connect_sock,
                                  attempt_errors, addrinfo, laddr_infos)
                for addrinfo in infos)
            sock, _, _ = await staggered.staggered_race(
                connectors, happy_eyeballs_delay, loop=self)
        else:
            # not using happy eyeballs: first address that connects wins
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        attempt_errors, addrinfo, laddr_infos)
                except OSError:
                    continue
                break

        if sock is None:
            failures = [exc for sub in attempt_errors for exc in sub]
            if len(failures) == 1:
                raise failures[0]
            # If they all have the same str(), raise one.
            model = str(failures[0])
            if all(str(exc) == model for exc in failures):
                raise failures[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in failures)))

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
1051
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None):
    """Wrap sock in a transport and wait until the transport is ready.

    sock is switched to non-blocking mode.  A truthy ssl selects
    _make_ssl_transport(); a bool value passes None as the SSL context,
    anything else is passed through as the context.  Otherwise
    _make_socket_transport() is used.

    Return a (transport, protocol) pair.  If waiting on the transport's
    waiter future fails for any reason, the transport is closed and the
    exception is re-raised.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        sslcontext = ssl
        if isinstance(ssl, bool):
            sslcontext = None
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)

    try:
        await waiter
    except BaseException:
        transport.close()
        raise

    return transport, protocol
1077
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file.  If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached.  File position is updated
    on return or also in case of error, in which case file.tell() can
    be used to figure out the number of bytes which were sent.

    fallback set to True makes asyncio manually read and send the file
    when the platform does not support the sendfile syscall (e.g.
    Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support the
    sendfile syscall and fallback is False.  Raise RuntimeError if the
    transport is closing, does not support sendfile at all, or only
    supports the fallback path while fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    modes = constants._SendfileMode
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Only swallow the error when the caller allows a fallback.
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1124
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation of native transport sendfile: unavailable.

    Always raises SendfileNotAvailableError; sendfile() catches that
    error to decide whether to use the fallback implementation.
    """
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
1128
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send file through transp by reading chunks and calling write().

    Reading starts at offset (the file is seeked there when offset is
    non-zero).  At most count bytes are sent when count is truthy,
    otherwise data is sent until readinto() reports end of file.  The
    transport is wrapped in a _SendfileFallbackProtocol for the
    duration of the transfer; its drain() is awaited before every
    write and its restore() is awaited on exit.

    Return the total number of bytes handed to the transport.  On
    exit, including on error, the file position is set to
    offset + total_sent if at least one byte was sent and the file
    object has a seek attribute.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the number of bytes still requested.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            # Read in the executor, like _sock_sendfile_fallback(), so
            # that file I/O does not run on the event loop thread.
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            await proto.drain()
            # readinto() may fill only part of the view; write just the
            # bytes that were read so stale buffer content from an
            # earlier iteration is never transmitted.
            transp.write(view[:read])
            total_sent += read
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1153
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    An sslproto.SSLProtocol is installed as the protocol of the
    existing transport and this coroutine waits on its waiter future.

    Return a new transport that *protocol* should start using
    immediately.  Raise RuntimeError if the ssl module is unavailable
    and TypeError if sslcontext is not an ssl.SSLContext or the
    transport is not marked as start_tls compatible.  If waiting
    fails, the transport is closed, the two scheduled callbacks are
    cancelled and the exception is re-raised.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    context_ok = isinstance(sslcontext, ssl.SSLContext)
    if not context_ok:
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    transport_ok = getattr(transport, '_start_tls_compatible', False)
    if not transport_ok:
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(tls_protocol)
    made_handle = self.call_soon(tls_protocol.connection_made, transport)
    resume_handle = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        transport.close()
        made_handle.cancel()
        resume_handle.cancel()
        raise

    return tls_protocol._app_transport
1199
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either adopt the existing SOCK_DGRAM socket passed as sock (no
    other socket-modifying argument may then be given), or create a
    new socket from local_addr / remote_addr.  For AF_UNIX the
    addresses are strings; otherwise each address must be a
    (host, port) 2-tuple which is resolved with _ensure_resolved().
    Candidate (family, proto) pairs are tried in order until one
    socket can be set up.

    This method is a coroutine.  Return a (transport, protocol) pair.

    Raise ValueError for inconsistent arguments, TypeError for
    addresses of the wrong shape, and OSError when resolution yields
    nothing or every candidate socket fails (the first recorded error
    is raised).
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Paths starting with a NUL are skipped; for any other
            # local path a leftover socket file is removed first.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Validate explicitly instead of with assert, which
                    # is stripped when Python runs with -O.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # NOTE(review): defaulting SO_REUSEADDR to on for UDP sockets on
        # POSIX looks risky (it may let another process bind the same
        # port) -- confirm against the asyncio documentation before
        # relying on this default.
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # A broadcast socket is left unconnected; the remote
                    # address is only handed to the transport.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                # Remember the failure and try the next candidate.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except BaseException:
        transport.close()
        raise

    return transport, protocol
1339
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address infos for address.

    The first two items of address are the host and port; any further
    items are forwarded to _ipaddr_info().  When _ipaddr_info() returns
    an entry, a one-element list holding it is returned without
    calling the resolver; otherwise the lookup is delegated to
    loop.getaddrinfo().
    """
    host, port = address[:2]
    extra = address[2:]
    info = _ipaddr_info(host, port, family, type, proto, *extra)
    if info is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [info]
1351
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a SOCK_STREAM server socket.

    Return the non-empty list produced by _ensure_resolved(); raise
    OSError when that list is empty.
    """
    resolved = await self._ensure_resolved((host, port), family=family,
                                           type=socket.SOCK_STREAM,
                                           flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1359
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Instead of host/port, an already created stream socket can be passed
    as sock; the two ways of specifying the listening address are
    mutually exclusive.

    Return a Server object which can be used to stop the service.

    Raise TypeError if ssl is a bool.  Raise ValueError if
    ssl_handshake_timeout is given without ssl, if both host/port and
    sock are given, if neither is given, or if sock is not a stream
    socket.  Raise OSError if binding to a resolved address fails; every
    socket created so far is closed in that case.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs, loop=self)
        # A set drops duplicate infos so each address is bound only once.
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    new_sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                # Registered before any further setup so the 'finally'
                # clause below closes it if a later step fails.
                sockets.append(new_sock)
                if reuse_address:
                    new_sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(new_sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    new_sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                try:
                    new_sock.bind(sa)
                except OSError as err:
                    # 'strerror' is None when the OSError was created from
                    # a single argument; fall back to str(err) so that the
                    # real bind failure is not masked by an AttributeError
                    # raised from None.lower().
                    reason = err.strerror or str(err)
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, reason.lower())) from None
            completed = True
        finally:
            if not completed:
                for opened in sockets:
                    opened.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for listener in sockets:
        listener.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self)

    if self._debug:
        logger.info("%r is serving", server)
    return server
1476
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Wrap an already accepted stream socket into a transport.

    Intended for servers that accept connections outside of asyncio
    but want asyncio to handle them.

    Raise ValueError if sock is not a stream socket, or if
    ssl_handshake_timeout is given while ssl is false.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    timeout_given = ssl_handshake_timeout is not None
    if timeout_given and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, '',
        server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)

    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        current_sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)",
                     current_sock, transport, protocol)
    return transport, protocol
1505
async def connect_read_pipe(self, protocol_factory, pipe):
    """Create a read pipe transport for *pipe*.

    The protocol is built by calling protocol_factory(), then the
    transport is created with _make_read_pipe_transport() and the
    coroutine waits on the future handed to it.  If that wait fails for
    any reason, the transport is closed and the exception is re-raised.

    Return a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        # Catch everything (cancellation included) so the transport is
        # never left open, then let the error propagate unchanged.
        transport.close()
        raise

    if self._debug:
        fd = pipe.fileno()
        logger.debug('Read pipe %r connected: (%r, %r)',
                     fd, transport, protocol)
    return transport, protocol
1521
async def connect_write_pipe(self, protocol_factory, pipe):
    """Create a write pipe transport for *pipe*.

    The protocol is built by calling protocol_factory(), then the
    transport is created with _make_write_pipe_transport() and the
    coroutine waits on the future handed to it.  If that wait fails for
    any reason, the transport is closed and the exception is re-raised.

    Return a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        # Catch everything (cancellation included) so the transport is
        # never left open, then let the error propagate unchanged.
        transport.close()
        raise

    if self._debug:
        fd = pipe.fileno()
        logger.debug('Write pipe %r connected: (%r, %r)',
                     fd, transport, protocol)
    return transport, protocol
1537
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log *msg* at debug level, followed by the configured pipes.

    Pipes that are None are left out.  When stdout is set and stderr is
    subprocess.STDOUT, both are reported as a single 'stdout=stderr='
    item.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    if not merged and stdout is not None:
        parts.append(f'stdout={_format_pipe(stdout)}')
    if not merged and stderr is not None:
        parts.append(f'stderr={_format_pipe(stderr)}')

    logger.debug(' '.join(parts))
1550
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           **kwargs):
    """Run the shell command *cmd* in a subprocess.

    cmd must be a str or bytes object.  universal_newlines, shell and
    bufsize are accepted only with the values False, True and 0
    respectively; anything else raises ValueError.  Extra keyword
    arguments are passed on to _make_subprocess_transport().

    Return a (transport, protocol) pair.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    protocol = protocol_factory()

    # Built only in debug mode and reused for the completion message.
    debug_log = None
    if self._debug:
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # The debug flag is read again here; the message exists only if the
    # flag was also set before the transport was created.
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1578
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0, **kwargs):
    """Run *program* with *args* in a subprocess, without a shell.

    universal_newlines, shell and bufsize are accepted only with the
    values False, False and 0 respectively; anything else raises
    ValueError.  The program and every argument must be str or bytes,
    otherwise TypeError is raised.  Extra keyword arguments are passed
    on to _make_subprocess_transport().

    Return a (transport, protocol) pair.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    popen_args = (program, *args)
    for item in popen_args:
        if isinstance(item, (str, bytes)):
            continue
        raise TypeError(
            f"program arguments must be a bytes or text string, "
            f"not {type(item).__name__}")

    protocol = protocol_factory()

    debug_log = None
    if self._debug:
        # Only the program is logged, not its arguments: they may
        # contain sensitive information (password) and may be too long.
        debug_log = f'execute program {program!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1608
def get_exception_handler(self):
    """Return an exception handler, or None if the default one is in use.

    The value returned is whatever is currently stored in
    self._exception_handler, i.e. the handler installed with
    set_exception_handler().
    """
    return self._exception_handler
1613
def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None selects the default exception handler.

    A callable handler should accept '(loop, context)': 'loop' is a
    reference to the active event loop and 'context' is a dict object
    (see `call_exception_handler()` documentation for details about
    context).

    Raise TypeError if handler is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
1630
def default_exception_handler(self, context):
    """Default exception handler.

    Called when an exception occurs and no exception handler is set;
    a custom exception handler may also call it to defer to the
    default behavior.

    The error message and the other items of *context* are logged with
    logger.error().  Source and handle tracebacks found in the context
    are rendered as text showing where the given object (e.g. a handle
    or future or task) was created.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (False if exception is None else
                (type(exception), exception, exception.__traceback__))

    current = self._current_handle
    if ('source_traceback' not in context
            and current is not None
            and current._source_traceback):
        # Note: the entry is added to the dict passed in by the caller.
        context['handle_traceback'] = current._source_traceback

    # Heading printed in front of each kind of formatted traceback.
    headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        # Already covered by the first line and by exc_info.
        if key in ('message', 'exception'):
            continue
        value = context[key]
        heading = headings.get(key)
        if heading is None:
            rendered = repr(value)
        else:
            frames = ''.join(traceback.format_list(value))
            rendered = heading + frames.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1680
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    SystemExit and KeyboardInterrupt raised by a handler propagate to
    the caller; any other exception raised by a handler is reported
    instead of being propagated.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    handler = self._exception_handler

    if handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        handler(self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # The custom handler itself failed: report that failure through
        # the default handler, keeping the original context attached.
        try:
            self.default_exception_handler({
                'message': 'Unhandled error in exception handler',
                'exception': exc,
                'context': context,
            })
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1737
def _add_callback(self, handle):
    """Append a non-cancelled Handle to the _ready queue.

    A cancelled handle is silently dropped.  TimerHandle instances are
    not accepted here (enforced by an assertion).
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001745
def _add_callback_signalsafe(self, handle):
    """Like _add_callback() but called from a signal handler."""
    self._add_callback(handle)
    # NOTE(review): _write_to_self() is defined outside this chunk; it
    # presumably wakes up the loop so the new callback is noticed
    # promptly -- confirm against its implementation.
    self._write_to_self()
1750
def _timer_handle_cancelled(self, handle):
    """Notification that a TimerHandle has been cancelled."""
    # Only handles whose _scheduled flag is still set are counted.
    # _run_once() compares this counter with len(self._scheduled) to
    # decide when to rebuild the heap without the cancelled entries.
    if handle._scheduled:
        self._timer_cancelled_count += 1
1755
def _run_once(self):
    """Run one full iteration of the event loop.

    In order, this drops cancelled timer handles, polls the selector
    for I/O and processes the resulting events, moves the 'call_later'
    handles that are due to the ready queue, and finally runs the
    callbacks that were in the ready queue at that point.
    """

    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # The filtered list is turned back into a heap before it
        # replaces the old one; the cancelled counter restarts at zero
        # because no cancelled handle is left in it.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # timeout stays None when nothing is ready and no timer is pending.
    # NOTE(review): self._selector is created outside this chunk; with
    # the standard selectors API None means "block until an event" and
    # 0 means "poll without blocking" -- confirm.
    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # Handles due before now + clock resolution are treated as ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # Debug mode: expose the running handle through
            # _current_handle and warn when the callback takes at
            # least slow_callback_duration seconds.
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001833
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001834 def _set_coroutine_origin_tracking(self, enabled):
1835 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001836 return
1837
Yury Selivanove8944cb2015-05-12 11:43:04 -04001838 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001839 self._coroutine_origin_tracking_saved_depth = (
1840 sys.get_coroutine_origin_tracking_depth())
1841 sys.set_coroutine_origin_tracking_depth(
1842 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001843 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001844 sys.set_coroutine_origin_tracking_depth(
1845 self._coroutine_origin_tracking_saved_depth)
1846
1847 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001848
def get_debug(self):
    """Return the current value of the loop's debug flag (self._debug)."""
    return self._debug
1851
def set_debug(self, enabled):
    """Set the loop's debug flag to *enabled*.

    When the loop is running, the matching change of coroutine origin
    tracking is scheduled with call_soon_threadsafe(); when it is not
    running, only the flag is stored by this method.
    """
    self._debug = enabled

    if self.is_running():
        self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)