blob: 9b4b846131de10e5299e231aecdab475cf2a8672 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020021import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022import socket
Quentin Dawans56065d42019-04-09 15:40:59 +020023import stat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Yury Selivanovf111b3d2017-12-30 00:35:36 -050032try:
33 import ssl
34except ImportError: # pragma: no cover
35 ssl = None
36
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080037from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020038from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070040from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020042from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050043from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020045from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070046from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48
# Public names exported by ``from <module> import *``.
__all__ = ('BaseEventLoop',)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Exceptions which must not call the exception handler in fatal error
# methods (_fatal_error())
_FATAL_ERROR_IGNORE = (BrokenPipeError,
                       ConnectionResetError, ConnectionAbortedError)

# ``ssl`` is None when the optional ssl module could not be imported.
if ssl is not None:
    _FATAL_ERROR_IGNORE = _FATAL_ERROR_IGNORE + (ssl.SSLCertVerificationError,)

# True when the socket module exposes the IPv6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a ``tasks.Task``,
    the task's repr is returned; otherwise ``str(handle)`` is returned.
    """
    cb = handle._callback
    # Plain functions have no __self__; getattr() keeps that case safe.
    if isinstance(getattr(cb, '__self__', None), tasks.Task):
        # format the task
        return repr(cb.__self__)
    return str(handle)
81
82
def _format_pipe(fd):
    """Return a short printable form of a subprocess stream argument.

    ``subprocess.PIPE`` and ``subprocess.STDOUT`` are rendered as
    ``'<pipe>'`` and ``'<stdout>'``; any other value is rendered with
    ``repr()``.
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)
90
91
def _set_reuseport(sock):
    """Enable SO_REUSEPORT on *sock*.

    Raises:
        ValueError: if the socket module does not define SO_REUSEPORT,
            or if setting the option fails with OSError.
    """
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise ValueError('reuse_port not supported by socket module')
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        # The constant exists but the platform rejected the option.
        raise ValueError('reuse_port not supported by socket module, '
                         'SO_REUSEPORT defined but not implemented.')
101
102
def _ipaddr_info(host, port, family, type, proto):
    """Build an addrinfo-style tuple when *host* is already an IP literal.

    Returns ``(family, type, proto, '', sockaddr)`` if *host* parses as
    an address of one of the candidate families, where *sockaddr* is
    ``(host, port)`` for IPv4 and ``(host, port, 0, 0)`` for IPv6.
    Returns None whenever the shortcut does not apply (no inet_pton,
    unsupported *proto* or *type*, *host* is None, *port* is not numeric,
    *host* contains a zone index, or *host* is not an IP address).
    """
    # Try to skip getaddrinfo if "host" is already an IP. Users might have
    # handled name resolution in their own code and pass in resolved IPs.
    if not hasattr(socket, 'inet_pton'):
        return None

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # The protocol is derived from the socket type.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    # None and empty str/bytes all mean port 0.  The isinstance() checks
    # keep bytes and str from being compared with each other.
    if port is None:
        port = 0
    elif isinstance(port, bytes) and port == b'':
        port = 0
    elif isinstance(port, str) and port == '':
        port = 0
    else:
        # If port's a service name like "http", don't skip getaddrinfo.
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
        except OSError:
            # Not a valid literal for this family; try the next one.
            continue
        # The host has already been resolved.
        if _HAS_IPv6 and af == socket.AF_INET6:
            return af, type, proto, '', (host, port, 0, 0)
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
def _run_until_complete_cb(fut):
    """Done-callback that stops the loop *fut* belongs to.

    The loop is left alone when the future finished with an exception
    that is a BaseException but not an Exception.
    """
    if not fut.cancelled():
        exc = fut.exception()
        if isinstance(exc, BaseException) and not isinstance(exc, Exception):
            # Issue #22429: run_forever() already finished, no need to
            # stop it.
            return
    futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100170
171
if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        """Set TCP_NODELAY on *sock* if it is a TCP stream socket.

        Sockets of any other family, type or protocol are left untouched.
        """
        is_tcp_stream = (
            sock.family in {socket.AF_INET, socket.AF_INET6} and
            sock.type == socket.SOCK_STREAM and
            sock.proto == socket.IPPROTO_TCP)
        if is_tcp_stream:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        """No-op: the socket module does not define TCP_NODELAY."""
        pass
181
182
class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol installed on a transport for a sendfile fallback.

    On construction it remembers the transport's current protocol and
    its reading/writing state, pauses reading and installs itself as the
    protocol.  drain() lets a writer wait while writing is paused, and
    restore() puts the original protocol and state back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        # Snapshot the state so restore() can put it back.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            # Writing is already paused: drain() must wait for
            # resume_writing().
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        """Wait until writing is not paused.

        Raises ConnectionError if the transport is closing.
        """
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        """Fail a pending drain() and forward the event to the original
        protocol."""
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        """Create the future drain() waits on, unless one already exists."""
        if self._write_ready_fut is not None:
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        """Release a pending drain(), if any."""
        if self._write_ready_fut is None:
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
249
250
class Server(events.AbstractServer):
    """Server object owning a set of listening sockets.

    ``_sockets`` holds the listening sockets until close() sets it to
    None.  ``_active_count`` is incremented by _attach() and decremented
    by _detach().  ``_waiters`` holds the futures of pending
    wait_closed() calls and becomes None once _wakeup() has run.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Count one more active user; only valid while not closed."""
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        """Count one less active user; wake waiters on the last one after
        close()."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future."""
        waiters = self._waiters
        # None marks "already woken up" for wait_closed().
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never used; None avoids making each future
                # reference itself.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; does nothing if already
        serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """A new list of the listening sockets (empty once closed)."""
        if self._sockets is None:
            return []
        return list(self._sockets)

    def close(self):
        """Stop serving on all sockets; calling it again is a no-op."""
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        """Start serving and block until cancelled.

        Raises RuntimeError if serve_forever() is already being awaited
        or if the server is closed.  On cancellation the server is
        closed and waited for before CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until close() has been called and nothing is attached.

        Returns immediately if the waiters have already been woken up.
        """
        # _wakeup() runs only once close() has been called *and*
        # _active_count has dropped to zero, and it marks that by setting
        # _waiters to None.  Also returning when _sockets is None would
        # make this return right after close() while users are still
        # attached.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361
362class BaseEventLoop(events.AbstractEventLoop):
363
def __init__(self):
    """Initialise the bookkeeping state of a new, not-running loop."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
391
def __repr__(self):
    """Return ``<ClassName running=... closed=... debug=...>``."""
    return (
        f'<{self.__class__.__name__} running={self.is_running()} '
        f'closed={self.is_closed()} debug={self.get_debug()}>'
    )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200397
def create_future(self):
    """Create a Future object attached to the loop."""
    return futures.Future(loop=self)
401
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object.

    Uses the task factory set with set_task_factory() when there is
    one, otherwise ``tasks.Task``.  *name* is applied to the task in
    both cases.  Raises RuntimeError if the loop is closed.

    Return a task object.
    """
    self._check_closed()
    if self._task_factory is None:
        task = tasks.Task(coro, loop=self, name=name)
        if task._source_traceback:
            # Drop the last recorded frame (presumably this method's own
            # frame -- confirm against Task).
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)
        tasks._set_task_name(task, name)

    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200417
def set_task_factory(self, factory):
    """Set a task factory that will be used by loop.create_task().

    If factory is None the default task factory will be set.

    If factory is a callable, it should have a signature matching
    '(loop, coro)', where 'loop' will be a reference to the active
    event loop, 'coro' will be a coroutine object.  The callable
    must return a Future.

    Raises TypeError if factory is neither None nor callable.
    """
    if factory is not None and not callable(factory):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
431
def get_task_factory(self):
    """Return a task factory, or None if the default one is in use."""
    return self._task_factory
435
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
440
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=None,
        call_connection_made=True):
    """Create SSL transport.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
449
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
454
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
459
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
464
async def _make_subprocess_transport(self, protocol, args, shell,
                                     stdin, stdout, stderr, bufsize,
                                     extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented in this base class; the coroutine always raises
    NotImplementedError when run.
    """
    raise NotImplementedError
470
def _write_to_self(self):
    """Write a byte to self-pipe, to wake up the event loop.

    This may be called from a different thread.

    The subclass is responsible for implementing the self-pipe.
    """
    raise NotImplementedError
479
def _process_events(self, event_list):
    """Process selector events.

    Not implemented in this base class; always raises
    NotImplementedError.
    """
    raise NotImplementedError
483
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if self._closed:
        raise RuntimeError('Event loop is closed')
487
def _asyncgen_finalizer_hook(self, agen):
    """Forget *agen* and, unless the loop is closed, schedule its aclose().

    The task is created through call_soon_threadsafe() rather than
    directly.
    """
    self._asyncgens.discard(agen)
    if not self.is_closed():
        self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700492
def _asyncgen_firstiter_hook(self, agen):
    """Start tracking *agen* in the loop's set of asynchronous generators.

    Emits a ResourceWarning when shutdown_asyncgens() has already been
    called; the generator is tracked either way.
    """
    if self._asyncgens_shutdown_called:
        warnings.warn(
            f"asynchronous generator {agen!r} was scheduled after "
            f"loop.shutdown_asyncgens() call",
            ResourceWarning, source=self)

    self._asyncgens.add(agen)
501
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Every tracked generator is closed with aclose().  An Exception
    raised while closing one is reported through
    call_exception_handler() instead of being propagated.
    """
    self._asyncgens_shutdown_called = True

    if not len(self._asyncgens):
        # If Python version is <3.6 or we don't have any asynchronous
        # generators alive.
        return

    # Snapshot the set before clearing it.
    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    results = await tasks.gather(
        *[ag.aclose() for ag in closing_agens],
        return_exceptions=True,
        loop=self)

    for result, agen in zip(results, closing_agens):
        if isinstance(result, Exception):
            self.call_exception_handler({
                'message': f'an error occurred during closing of '
                           f'asynchronous generator {agen!r}',
                'exception': result,
                'asyncgen': agen
            })
527
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if it is already
    running, or if another loop is running.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    # Install this loop's async generator hooks; the previous ones are
    # put back when the loop stops.
    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        # Undo everything set up above, whether the loop was stopped or
        # an exception escaped.
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop is closed, or if the loop was stopped
    before the Future completed.
    """
    self._check_closed()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
591
def stop(self):
    """Stop running the event loop.

    Every callback already scheduled will still run.  This simply informs
    run_forever to stop looping after a complete iteration.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running.  Closing an already closed
    loop is a no-op.

    Raises RuntimeError if the loop is running.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200621
def is_closed(self):
    """Returns True if the event loop was closed."""
    return self._closed
625
def __del__(self, _warn=warnings.warn):
    """Warn about a loop that was never closed, and close it if idle.

    ``_warn`` is bound as a default argument so the function stays
    reachable without a global lookup when this runs.
    """
    if not self.is_closed():
        _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
        if not self.is_running():
            self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100631
def is_running(self):
    """Returns True if the event loop is running."""
    # _thread_id is only set while the loop is running.
    return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635
def time(self):
    """Return the time according to the event loop's clock.

    This is a float expressed in seconds since an epoch, but the
    epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.  This implementation returns
    ``time.monotonic()``.
    """
    return time.monotonic()
644
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        # Drop the last recorded frame (presumably this wrapper's own
        # frame -- confirm against TimerHandle).
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises RuntimeError if the loop is closed.  In debug mode the
    calling thread and the callback are checked as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    # _scheduled is kept as a heap.
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
682
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.

    Raises RuntimeError if the loop is closed.  In debug mode the
    calling thread and the callback are checked as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100701
def _check_callback(self, callback, method):
    """Validate that *callback* may be scheduled by *method*.

    Raise TypeError when *callback* is a coroutine object or coroutine
    function, or when it is not callable at all.  *method* is the name
    of the calling API and is only used in the error message.
    """
    is_coroutine = (coroutines.iscoroutine(callback) or
                    coroutines.iscoroutinefunction(callback))
    if is_coroutine:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700711
def _call_soon(self, callback, args, context):
    """Wrap *callback* in a Handle and append it to the ready queue.

    Performs no validation; the public call_soon*() methods do that
    before delegating here.  Return the new Handle.
    """
    entry = events.Handle(callback, args, self, context)
    frames = entry._source_traceback
    if frames:
        # Drop this helper's own frame from the creation traceback.
        del frames[-1]
    self._ready.append(entry)
    return entry
718
def _check_thread(self):
    """Verify that the caller runs in the thread that owns the loop.

    Methods of this class that are not thread-safe rely on being
    called from the loop's own thread and are likely to misbehave
    otherwise.

    Only meant to be called when (self._debug == True); for
    performance reasons the caller is responsible for checking that
    condition first.
    """
    owner = self._thread_id
    if owner is None:
        # No owning thread has been recorded, so there is nothing
        # to compare against.
        return
    current = threading.get_ident()
    if current != owner:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
735
def call_soon_threadsafe(self, callback, *args, context=None):
    """Thread-safe variant of call_soon().

    After queueing the callback, self._write_to_self() is called so
    the loop is notified about the new work.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    scheduled = self._call_soon(callback, args, context)
    frames = scheduled._source_traceback
    if frames:
        # Hide this wrapper's own frame from the creation traceback.
        del frames[-1]
    self._write_to_self()
    return scheduled
746
def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to *executor* and return a loop-bound future.

    When *executor* is None the loop's default executor is used; it is
    created as a ThreadPoolExecutor on first use and cached on the
    loop.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    pool = executor
    if pool is None:
        pool = self._default_executor
    if pool is None:
        # Lazily create the default executor and remember it.
        pool = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = pool
    pending = pool.submit(func, *args)
    return futures.wrap_future(pending, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700758
def set_default_executor(self, executor):
    """Install *executor* as the default for run_in_executor().

    A DeprecationWarning is emitted when *executor* is not a
    concurrent.futures.ThreadPoolExecutor instance; the executor is
    stored regardless.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        message = ('Using the default executor that is not an instance of '
                   'ThreadPoolExecutor is deprecated and will be prohibited '
                   'in Python 3.9')
        # stacklevel=2 attributes the warning to the caller.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
    self._default_executor = executor
767
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    self.slow_callback_duration seconds, and at DEBUG level otherwise.
    Return whatever socket.getaddrinfo() returned.
    """
    # Only mention the options that were actually set (truthy).
    options = (('family', family), ('type', type),
               ('proto', proto), ('flags', flags))
    details = [f"{host}:{port!r}"]
    details.extend(f'{label}={value!r}' for label, value in options if value)
    summary = ', '.join(details)
    logger.debug('Get address info %s', summary)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {summary} took {elapsed * 1e3:.3f}ms: '
              f'{addrinfo!r}')
    emit = (logger.info if elapsed >= self.slow_callback_duration
            else logger.debug)
    emit(report)
    return addrinfo
791
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* in the default executor.

    Runs socket.getaddrinfo(), or the logging wrapper
    self._getaddrinfo_debug when the loop is in debug mode, and
    returns its result.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    lookup = self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
    return await lookup
801
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
    return await lookup
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700805
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock* and return the number of bytes sent.

    The native implementation is tried first.  If it raises
    SendfileNotAvailableError and *fallback* is true, the file is sent
    by the read-and-send fallback instead; otherwise the error
    propagates.

    In debug mode a ValueError is raised when *sock* is not
    non-blocking.
    """
    if self._debug:
        if sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)
    try:
        return await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if fallback:
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
        raise
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200819
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation: native sendfile is never available.

    Always raises SendfileNotAvailableError, which sock_sendfile()
    catches in order to switch to the fallback implementation.
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        # Both fragments need the f prefix, otherwise "{file!r}" is
        # emitted literally instead of being interpolated.
        f"and file {file!r} combination")
826
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Starts at *offset* and sends at most *count* bytes, or until EOF
    when *count* is falsy.  Reads are done in the default executor.

    Return the total number of bytes sent.  When anything was sent and
    *file* has a seek() method, the file is repositioned to
    offset + total_sent, even if an error interrupts the transfer.
    """
    if offset:
        file.seek(offset)
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the requested byte count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            # readinto() may fill only part of the buffer; send just the
            # bytes that were actually read so stale buffer contents are
            # never transmitted and total_sent matches what went out.
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
852
def _check_sendfile_params(self, sock, file, offset, count):
    """Validate the arguments shared by the sendfile implementations.

    Raise ValueError when *file* reports a non-binary mode, when *sock*
    is not a SOCK_STREAM socket, when *count* is not positive or when
    *offset* is negative.  Raise TypeError when *count* (if given) or
    *offset* is not an int.
    """
    count_error = "count must be a positive integer (got {!r})"
    offset_error = "offset must be a non-negative integer (got {!r})"

    # Objects without a "mode" attribute are assumed to be binary.
    file_mode = getattr(file, 'mode', 'b')
    if 'b' not in file_mode:
        raise ValueError("file should be opened in binary mode")
    if sock.type != socket.SOCK_STREAM:
        raise ValueError("only SOCK_STREAM type sockets are supported")

    if count is not None:
        if not isinstance(count, int):
            raise TypeError(count_error.format(count))
        if count <= 0:
            raise ValueError(count_error.format(count))

    if not isinstance(offset, int):
        raise TypeError(offset_error.format(offset))
    if offset < 0:
        raise ValueError(offset_error.format(offset))
873
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    Either host/port or an existing sock must be given, never both.
    When host/port is used, every resolved address is tried in turn
    (optionally binding to local_addr first) and the first socket that
    connects is used.  If none connects, the single collected OSError
    is re-raised, or an OSError combining all distinct messages.

    Raises ValueError for inconsistent arguments: server_hostname or
    ssl_handshake_timeout without ssl, ssl without a host and without
    server_hostname, host/port together with sock, neither of them, or
    a sock that is not SOCK_STREAM.

    This method is a coroutine which will try to establish the connection
    in the background. When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname. It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given. To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check. (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote endpoint (and the local one, if requested)
        # before creating any socket.
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Collects every OSError seen while trying the candidates.
        # NOTE: this local name shadows any module-level "exceptions"
        # for the rest of this function.
        exceptions = []
        # Note that family, type and proto are rebound by this loop.
        for family, type, proto, cname, address in infos:
            try:
                sock = socket.socket(family=family, type=type, proto=proto)
                sock.setblocking(False)
                if local_addr is not None:
                    # Bind to the first local address that works.
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        # No local address could be bound: discard this
                        # socket and move on to the next remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)
            except OSError as exc:
                # Remember the failure and try the next address.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is not recoverable here: release the
                # socket and propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                # Connected successfully; stop trying further addresses.
                break
        else:
            # Every candidate address failed.
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
1005
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None):
    """Build a transport/protocol pair on top of a connected *sock*.

    An SSL transport is created when *ssl* is truthy (a bool value
    means "no explicit context"), a plain socket transport otherwise.
    The coroutine then waits on the waiter future it handed to the
    transport factory; if that wait fails for any reason the
    transport is closed and the exception re-raised.

    Return a (transport, protocol) pair.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        context = ssl if not isinstance(ssl, bool) else None
        transport = self._make_ssl_transport(
            sock, protocol, context, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)

    try:
        await waiter
    except BaseException:
        # Same reach as a bare "except:" -- the transport must not
        # leak whatever interrupted the wait.
        transport.close()
        raise

    return transport, protocol
1031
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file through *transport*.

    Return the total number of bytes sent.

    A native (os.sendfile based) implementation is used when the
    transport allows it.

    *file* must be a regular file object opened in binary mode.

    *offset* is the position to start reading from.  *count*, when
    given, caps the number of bytes transmitted; otherwise the file is
    sent until EOF.  The file position is updated on return and also
    when an error occurs, in which case file.tell() reveals how many
    bytes were sent.

    With *fallback* set to True, asyncio reads and sends the file
    manually when the sendfile syscall is not usable (e.g. Windows or
    an SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    the sendfile syscall and *fallback* is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    modes = constants._SendfileMode
    capability = getattr(transport, '_sendfile_compatible',
                         modes.UNSUPPORTED)
    if capability is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if capability is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Swallow the error only if falling back is permitted.
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1078
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation: always report native sendfile as unavailable."""
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
1082
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through transport *transp* by reading and writing chunks.

    Starts at *offset* and sends at most *count* bytes, or until EOF
    when *count* is falsy.  The transport is wrapped in a
    _SendfileFallbackProtocol for the duration of the transfer; it is
    drained before each write and restored in the ``finally`` clause.

    Return the total number of bytes written.  When anything was
    written and *file* has a seek() method, the file is repositioned
    to offset + total_sent, even if an error interrupts the transfer.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested byte count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            # Read in the executor, like _sock_sendfile_fallback(), so a
            # slow file does not block the event loop.
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            await proto.drain()
            # readinto() may fill only part of the buffer; write just
            # the bytes actually read so stale data is never sent.
            transp.write(view[:read])
            total_sent += read
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1107
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError if the ssl module is unavailable, and
    TypeError if *sslcontext* is not an ssl.SSLContext or *transport*
    does not declare itself compatible with start_tls().  If waiting
    for the handshake fails or is cancelled, *transport* is closed and
    the pending setup callbacks are cancelled before re-raising.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        # BaseException rather than Exception: cancellation of the
        # awaiting task must also release the transport and the
        # pending callbacks, matching _create_connection_transport().
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport
1153
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing SOCK_DGRAM *sock* (no other socket option
    may then be given), or let the method create one from
    *local_addr* / *remote_addr* / *family*.  For non-UNIX families the
    addresses must be 2-tuples; they are resolved and each matching
    (family, proto) candidate is tried until a socket can be bound
    and/or connected.

    Raises ValueError for inconsistent arguments, TypeError for
    addresses of the wrong type, and the first collected OSError when
    no candidate socket could be set up.

    Return a (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Remove a stale socket file left at the local path, unless
            # the address starts with a NUL character.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Explicit check instead of "assert": assertions are
                    # stripped under -O and must not be used to validate
                    # caller input.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        # Collects the OSError raised by each failed candidate.
        exceptions = []

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # Note that family and proto are rebound by this loop.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                # Remember the failure and try the next candidate.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                # Anything else is not recoverable here: release the
                # socket and propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed; report the first error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except BaseException:
        # Close the transport whatever interrupted the wait.
        transport.close()
        raise

    return transport, protocol
1292
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address infos for *address*.

    Only the first two items of *address* (host and port) are used.
    When _ipaddr_info() recognises the host, its single result is
    returned in a list without a lookup; otherwise the lookup is
    delegated to loop.getaddrinfo().
    """
    host, port = address[:2]
    resolved = _ipaddr_info(host, port, family, type, proto)
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]
1304
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a SOCK_STREAM server socket.

    Return the non-empty list of address infos, or raise OSError when
    resolution yields nothing.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family,
        type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1312
Neil Aspinallf7686c12017-12-19 19:45:42 +00001313 async def create_server(
1314 self, protocol_factory, host=None, port=None,
1315 *,
1316 family=socket.AF_UNSPEC,
1317 flags=socket.AI_PASSIVE,
1318 sock=None,
1319 backlog=100,
1320 ssl=None,
1321 reuse_address=None,
1322 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001323 ssl_handshake_timeout=None,
1324 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001325 """Create a TCP server.
1326
Yury Selivanov6370f342017-12-10 18:36:12 -05001327 The host parameter can be a string, in that case the TCP server is
1328 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001329
1330 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001331 the TCP server is bound to all hosts of the sequence. If a host
1332 appears multiple times (possibly indirectly e.g. when hostnames
1333 resolve to the same IP address), the server is only bound once to that
1334 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001335
Victor Stinneracdb7822014-07-14 18:33:40 +02001336 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001337
1338 This method is a coroutine.
1339 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001340 if isinstance(ssl, bool):
1341 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001342
1343 if ssl_handshake_timeout is not None and ssl is None:
1344 raise ValueError(
1345 'ssl_handshake_timeout is only meaningful with ssl')
1346
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001347 if host is not None or port is not None:
1348 if sock is not None:
1349 raise ValueError(
1350 'host/port and sock can not be specified at the same time')
1351
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001352 if reuse_address is None:
1353 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1354 sockets = []
1355 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001356 hosts = [None]
1357 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001358 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001359 hosts = [host]
1360 else:
1361 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001362
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001363 fs = [self._create_server_getaddrinfo(host, port, family=family,
1364 flags=flags)
1365 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001366 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001367 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001368
1369 completed = False
1370 try:
1371 for res in infos:
1372 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001373 try:
1374 sock = socket.socket(af, socktype, proto)
1375 except socket.error:
1376 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001377 if self._debug:
1378 logger.warning('create_server() failed to create '
1379 'socket.socket(%r, %r, %r)',
1380 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001381 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001382 sockets.append(sock)
1383 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001384 sock.setsockopt(
1385 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1386 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001387 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001388 # Disable IPv4/IPv6 dual stack support (enabled by
1389 # default on Linux) which makes a single socket
1390 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001391 if (_HAS_IPv6 and
1392 af == socket.AF_INET6 and
1393 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001394 sock.setsockopt(socket.IPPROTO_IPV6,
1395 socket.IPV6_V6ONLY,
1396 True)
1397 try:
1398 sock.bind(sa)
1399 except OSError as err:
1400 raise OSError(err.errno, 'error while attempting '
1401 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001402 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001403 completed = True
1404 finally:
1405 if not completed:
1406 for sock in sockets:
1407 sock.close()
1408 else:
1409 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001410 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001411 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001412 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001413 sockets = [sock]
1414
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001415 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001416 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001417
1418 server = Server(self, sockets, protocol_factory,
1419 ssl, backlog, ssl_handshake_timeout)
1420 if start_serving:
1421 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001422 # Skip one loop iteration so that all 'loop.add_reader'
1423 # go through.
1424 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001425
Victor Stinnere912e652014-07-12 03:11:53 +02001426 if self._debug:
1427 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001428 return server
1429
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Wrap an already accepted socket into a transport/protocol pair.

    Meant for servers that accept connections outside of asyncio
    but still want asyncio to handle the accepted connections.

    Raises ValueError if sock is not a stream socket, or if
    ssl_handshake_timeout is given while ssl is falsy.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    timeout_given = ssl_handshake_timeout is not None
    if timeout_given and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = pair

    if self._debug:
        # The SSL transport closes the old socket and creates a new SSL
        # socket, so log the socket the transport reports.
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
1458
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach the read end of a pipe to a freshly created protocol.

    The protocol is built by calling protocol_factory(), then a read
    pipe transport is created for pipe and the coroutine waits on the
    future handed to that transport.

    If waiting fails for any reason the transport is closed and the
    exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)

    try:
        await waiter
    except BaseException:
        # Do not leave a half-initialized transport behind.
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1474
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach the write end of a pipe to a freshly created protocol.

    The protocol is built by calling protocol_factory(), then a write
    pipe transport is created for pipe and the coroutine waits on the
    future handed to that transport.

    If waiting fails for any reason the transport is closed and the
    exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, waiter)

    try:
        await waiter
    except BaseException:
        # Do not leave a half-initialized transport behind.
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1490
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log msg at debug level followed by the configured stdio pipes.

    Streams that are None are omitted.  When stdout is set and stderr
    is subprocess.STDOUT, both are reported as a single
    'stdout=stderr=...' entry.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        for label, pipe in (('stdout', stdout), ('stderr', stderr)):
            if pipe is not None:
                parts.append(f'{label}={_format_pipe(pipe)}')

    logger.debug(' '.join(parts))
1503
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           **kwargs):
    """Start a subprocess from a shell command string.

    cmd must be a str or bytes object.  universal_newlines, shell and
    bufsize are accepted only with their default values (False, True
    and 0 respectively); any other value raises ValueError.  Remaining
    keyword arguments are forwarded to the subprocess transport.

    Returns a (transport, protocol) pair.
    """
    # The checks run in a fixed order so the first offending argument
    # determines the error that is reported.
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    protocol = protocol_factory()

    debug_log = None
    if self._debug:
        # Extra keyword parameters are deliberately left out of the log:
        # they may contain sensitive information (password) and may be
        # too long.
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1531
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0, **kwargs):
    """Start a subprocess from a program and its arguments.

    program and every item of args must be str or bytes, otherwise
    TypeError is raised.  universal_newlines, shell and bufsize are
    accepted only with their default values (False, False and 0
    respectively); any other value raises ValueError.  Remaining
    keyword arguments are forwarded to the subprocess transport.

    Returns a (transport, protocol) pair.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    popen_args = (program, *args)
    for candidate in popen_args:
        if isinstance(candidate, (str, bytes)):
            continue
        raise TypeError(
            f"program arguments must be a bytes or text string, "
            f"not {type(candidate).__name__}")

    protocol = protocol_factory()

    debug_log = None
    if self._debug:
        # Only the program is logged; the arguments may contain
        # sensitive information (password) and may be too long.
        debug_log = f'execute program {program!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1561
def get_exception_handler(self):
    """Return the custom exception handler.

    The result is None when no custom handler has been set, i.e. when
    the default exception handler is in use.
    """
    handler = self._exception_handler
    return handler
1566
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None restores the default exception handler.

    A callable handler should have a signature matching
    '(loop, context)', where 'loop' will be a reference to the
    active event loop and 'context' will be a dict object (see
    `call_exception_handler()` documentation for details about
    context).

    Raises TypeError if handler is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
1583
def default_exception_handler(self, context):
    """Log an error described by context; the default exception handler.

    Called when an exception occurs and no exception handler is set.
    A custom exception handler may also call it to defer to the
    default behavior.

    The error message and the remaining context entries are logged,
    one entry per line.  'source_traceback' and 'handle_traceback'
    entries are rendered as formatted stack traces; every other value
    is logged using its repr().  When context has no
    'source_traceback' and the currently running handle has a source
    traceback, that traceback is stored into context under
    'handle_traceback' before logging.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False)

    if 'source_traceback' not in context:
        current = self._current_handle
        if current is not None and current._source_traceback:
            context['handle_traceback'] = current._source_traceback

    # Context keys holding a stack summary, mapped to the heading
    # printed in front of the formatted traceback.
    traceback_headers = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        if key in {'message', 'exception'}:
            # Already reported through the message line and exc_info.
            continue
        value = context[key]
        header = traceback_headers.get(key)
        if header is None:
            rendered = repr(value)
        else:
            formatted = ''.join(traceback.format_list(value))
            rendered = header + formatted.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1633
def call_exception_handler(self, context):
    """Dispatch context to the event loop's current exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    An Exception raised by the handler itself is never propagated:
    a failing custom handler is reported through the default handler,
    and a failing default handler is logged.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    handler = self._exception_handler

    if handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors in the
            # default implementation, as well as for subclassed event
            # loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        handler(self, context)
    except Exception as exc:
        # The user supplied handler failed; report that failure through
        # the default handler, keeping the original context attached.
        failure = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(failure)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1684
def _add_callback(self, handle):
    """Append a Handle to _ready unless it has been cancelled.

    handle must be an events.Handle and must not be a TimerHandle;
    both conditions are enforced with assertions.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001692
1693 def _add_callback_signalsafe(self, handle):
1694 """Like _add_callback() but called from a signal handler."""
1695 self._add_callback(handle)
1696 self._write_to_self()
1697
Yury Selivanov592ada92014-09-25 12:07:56 -04001698 def _timer_handle_cancelled(self, handle):
1699 """Notification that a TimerHandle has been cancelled."""
1700 if handle._scheduled:
1701 self._timer_cancelled_count += 1
1702
def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.

    The steps, in order:

    1. Drop cancelled timer handles from the _scheduled heap (either
       a full rebuild or just from the head of the heap).
    2. Compute the timeout for the selector.
    3. Poll the selector and process the returned events.
    4. Move timer handles that are due from _scheduled to _ready.
    5. Run the handles that were in _ready at that point.
    """

    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # The filtered list is not guaranteed to satisfy the heap
        # invariant, so restore it before using it as the new heap.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # timeout stays None when there is nothing ready, the loop is not
    # stopping and no timer is scheduled.
    timeout = None
    if self._ready or self._stopping:
        # Work is already pending (or the loop is stopping): poll
        # with a zero timeout.
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        # Never negative, and capped at MAXIMUM_SELECT_TIMEOUT.
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # A timer counts as due when its deadline is earlier than the
    # current time plus the clock resolution.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                # Expose the running handle while it executes; it is
                # reset to None afterwards, even if _run() raises.
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                # Warn about callbacks that ran for at least
                # slow_callback_duration seconds.
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001780
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001781 def _set_coroutine_origin_tracking(self, enabled):
1782 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001783 return
1784
Yury Selivanove8944cb2015-05-12 11:43:04 -04001785 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001786 self._coroutine_origin_tracking_saved_depth = (
1787 sys.get_coroutine_origin_tracking_depth())
1788 sys.set_coroutine_origin_tracking_depth(
1789 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001790 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001791 sys.set_coroutine_origin_tracking_depth(
1792 self._coroutine_origin_tracking_saved_depth)
1793
1794 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001795
def get_debug(self):
    """Return the value of the event loop's debug flag."""
    debug_flag = self._debug
    return debug_flag
1798
1799 def set_debug(self, enabled):
1800 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001801
Yury Selivanove8944cb2015-05-12 11:43:04 -04001802 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001803 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)