blob: 3726c556d4f09df424fcb1566cc8a1219d09ab7f [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Yury Selivanovf111b3d2017-12-30 00:35:36 -050032try:
33 import ssl
34except ImportError: # pragma: no cover
35 ssl = None
36
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080037from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020038from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070040from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020042from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050043from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020045from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070046from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48
Yury Selivanov6370f342017-12-10 18:36:12 -050049__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Exceptions which must not call the exception handler in fatal error
# methods (_fatal_error())
_FATAL_ERROR_IGNORE = (BrokenPipeError,
                       ConnectionResetError, ConnectionAbortedError)

# `ssl` is None when the ssl module could not be imported; only extend the
# tuple when the SSL exception type actually exists.
if ssl is not None:
    _FATAL_ERROR_IGNORE = _FATAL_ERROR_IGNORE + (ssl.SSLCertVerificationError,)

# True when the socket module exposes the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
# (24 hours, expressed in seconds).
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
def _format_handle(handle):
    """Return a display string for *handle*.

    When the handle's callback is a bound method whose ``__self__`` is a
    ``tasks.Task``, the task's ``repr()`` is returned; otherwise the result
    is ``str(handle)``.
    """
    cb = handle._callback
    if isinstance(getattr(cb, '__self__', None), tasks.Task):
        # format the task
        return repr(cb.__self__)
    else:
        return str(handle)
81
82
def _format_pipe(fd):
    """Return a printable name for a subprocess stdio argument.

    ``subprocess.PIPE`` and ``subprocess.STDOUT`` are rendered as the
    symbolic names ``'<pipe>'`` and ``'<stdout>'``; any other value is
    rendered with ``repr()``.
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    elif fd == subprocess.STDOUT:
        return '<stdout>'
    else:
        return repr(fd)
90
91
def _set_reuseport(sock):
    """Enable ``SO_REUSEPORT`` on *sock*.

    Raises:
        ValueError: if the socket module does not define ``SO_REUSEPORT``,
            or if it is defined but ``setsockopt()`` fails with ``OSError``.
    """
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise ValueError('reuse_port not supported by socket module')
    else:
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except OSError as exc:
            # Keep the underlying OSError attached as the explicit cause.
            raise ValueError('reuse_port not supported by socket module, '
                             'SO_REUSEPORT defined but not implemented.'
                             ) from exc
101
102
def _ipaddr_info(host, port, family, type, proto):
    """Build a getaddrinfo()-style entry when *host* is already an IP.

    Try to skip getaddrinfo if "host" is already an IP. Users might have
    handled name resolution in their own code and pass in resolved IPs.

    Returns:
        A tuple ``(family, type, proto, '', sockaddr)`` where *sockaddr* is
        ``(host, port)`` for IPv4 or ``(host, port, 0, 0)`` for IPv6, or
        ``None`` whenever the shortcut cannot be taken (no ``inet_pton``,
        unsupported *proto*/*type*, *host* is ``None`` or not an IP literal,
        *port* is a service name, or *host* carries a ``%`` zone index).
    """
    if not hasattr(socket, 'inet_pton'):
        return None

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # The protocol is derived from the socket type; other types are not
    # handled by this shortcut.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    if port is None:
        port = 0
    elif isinstance(port, bytes) and port == b'':
        port = 0
    elif isinstance(port, str) and port == '':
        port = 0
    else:
        # If port's a service name like "http", don't skip getaddrinfo.
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
            # The host has already been resolved.
            if _HAS_IPv6 and af == socket.AF_INET6:
                return af, type, proto, '', (host, port, 0, 0)
            else:
                return af, type, proto, '', (host, port)
        except OSError:
            # Not a valid literal for this family; try the next one.
            pass

    # "host" is not an IP address.
    return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
def _run_until_complete_cb(fut):
    """Done-callback that stops the loop owning *fut*.

    If the future finished with an exception that is a ``BaseException``
    but not an ``Exception``, nothing is done; otherwise the future's loop
    is stopped.
    """
    if not fut.cancelled():
        exc = fut.exception()
        if isinstance(exc, BaseException) and not isinstance(exc, Exception):
            # Issue #22429: run_forever() already finished, no need to
            # stop it.
            return
    futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100170
171
class _SendfileFallbackProtocol(protocols.Protocol):
    """Protocol temporarily installed on a transport.

    On construction it remembers the transport's current protocol and its
    reading/writing state, pauses reading and installs itself as the
    transport's protocol.  Write flow control is tracked through a future
    that drain() awaits.  restore() puts the original protocol back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        # Remember the state to reinstate in restore().
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            # Writing is already paused: drain() must wait for resume.
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        """Wait until writing is resumed; return at once if not paused.

        Raises ConnectionError if the transport is closing.
        """
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        """Fail a pending drain() and forward the event to the original
        protocol."""
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        """Create the future drain() waits on, unless one already exists."""
        if self._write_ready_fut is not None:
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        """Complete and discard the write-ready future, if there is one."""
        if self._write_ready_fut is None:
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
238
239
class Server(events.AbstractServer):
    """Server object owning a collection of listening sockets.

    State kept per instance:
      * ``_sockets``: the listening sockets, or None once close() ran;
      * ``_active_count``: counter raised by _attach() and lowered by
        _detach();
      * ``_waiters``: futures created by wait_closed(), or None after
        _wakeup() has run;
      * ``_serving`` / ``_serving_forever_fut``: serving state.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        # Last user gone after close(): release everyone in wait_closed().
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve all pending wait_closed() futures and drop the list."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def _start_serving(self):
        """Start listening on every socket; a no-op if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """A fresh list of the listening sockets (empty once closed)."""
        if self._sockets is None:
            return []
        return list(self._sockets)

    def close(self):
        """Stop serving on all sockets; calling it again is a no-op."""
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        """Start serving and wait until cancelled.

        Raises RuntimeError if serve_forever() is already being awaited or
        if the server is closed.  On cancellation the server is closed and
        the CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                # Always propagate the cancellation, even if closing failed.
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until _wakeup() resolves the waiter created here.

        NOTE(review): this returns immediately when ``_sockets`` is None,
        i.e. once close() has been called, even if ``_active_count`` is
        still non-zero -- confirm this is the intended contract.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350
351class BaseEventLoop(events.AbstractEventLoop):
352
def __init__(self):
    """Initialise the loop's scheduling, debug and async-generator state."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceeds this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
380
def __repr__(self):
    """Return ``<ClassName running=... closed=... debug=...>``."""
    return (
        f'<{self.__class__.__name__} running={self.is_running()} '
        f'closed={self.is_closed()} debug={self.get_debug()}>'
    )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200386
def create_future(self):
    """Create a Future object attached to the loop."""
    return futures.Future(loop=self)
390
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object.

    Return a task object.  The task is built by the configured task
    factory when one is set, otherwise by ``tasks.Task``; *name* is
    applied in both cases.

    Raises RuntimeError if the loop is closed.
    """
    self._check_closed()
    if self._task_factory is None:
        task = tasks.Task(coro, loop=self, name=name)
        if task._source_traceback:
            # Drop the frame of this method from the recorded traceback.
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)
        tasks._set_task_name(task, name)

    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200406
def set_task_factory(self, factory):
    """Set a task factory that will be used by loop.create_task().

    If factory is None the default task factory will be set.

    If factory is a callable, it should have a signature matching
    '(loop, coro)', where 'loop' will be a reference to the active
    event loop, 'coro' will be a coroutine object.  The callable
    must return a Future.

    Raises TypeError if factory is neither None nor callable.
    """
    if factory is not None and not callable(factory):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
420
def get_task_factory(self):
    """Return a task factory, or None if the default one is in use."""
    return self._task_factory
424
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
429
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=None,
        call_connection_made=True):
    """Create SSL transport.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
438
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
443
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
448
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
453
async def _make_subprocess_transport(self, protocol, args, shell,
                                     stdin, stdout, stderr, bufsize,
                                     extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented here; awaiting the coroutine raises
    NotImplementedError.
    """
    raise NotImplementedError
459
def _write_to_self(self):
    """Write a byte to self-pipe, to wake up the event loop.

    This may be called from a different thread.

    The subclass is responsible for implementing the self-pipe; this
    base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
468
def _process_events(self, event_list):
    """Process selector events.

    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError
472
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if self._closed:
        raise RuntimeError('Event loop is closed')
476
def _asyncgen_finalizer_hook(self, agen):
    """Finalizer hook for asynchronous generators.

    Stops tracking *agen* and, unless the loop is closed, schedules a
    task running ``agen.aclose()`` via call_soon_threadsafe().
    """
    self._asyncgens.discard(agen)
    if not self.is_closed():
        self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700481
def _asyncgen_firstiter_hook(self, agen):
    """First-iteration hook for asynchronous generators.

    Starts tracking *agen*.  A ResourceWarning is emitted first if
    shutdown_asyncgens() has already been called on this loop.
    """
    if self._asyncgens_shutdown_called:
        warnings.warn(
            f"asynchronous generator {agen!r} was scheduled after "
            f"loop.shutdown_asyncgens() call",
            ResourceWarning, source=self)

    self._asyncgens.add(agen)
490
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Marks the loop as having started async-generator shutdown, closes
    every tracked generator concurrently, and reports each failure that
    is an ``Exception`` through call_exception_handler().
    """
    self._asyncgens_shutdown_called = True

    if not len(self._asyncgens):
        # If Python version is <3.6 or we don't have any asynchronous
        # generators alive.
        return

    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    # return_exceptions=True: failures come back as results, in the same
    # order as closing_agens.
    results = await tasks.gather(
        *[ag.aclose() for ag in closing_agens],
        return_exceptions=True,
        loop=self)

    for result, agen in zip(results, closing_agens):
        if isinstance(result, Exception):
            self.call_exception_handler({
                'message': f'an error occurred during closing of '
                           f'asynchronous generator {agen!r}',
                'exception': result,
                'asyncgen': agen
            })
516
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, is already running, or if
    another loop is running in the current context.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    # Install this loop's async-generator hooks; the previous ones are
    # put back when the loop stops.
    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.

    Raises RuntimeError if the loop is closed, or if the loop stopped
    before the future completed.
    """
    self._check_closed()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        # Deliberately bare: the exception is always re-raised below.
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
580
def stop(self):
    """Stop running the event loop.

    Every callback already scheduled will still run.  This simply informs
    run_forever to stop looping after a complete iteration.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running.  Closing an already closed loop
    is a no-op.

    Raises RuntimeError if the loop is running.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200610
def is_closed(self):
    """Returns True if the event loop was closed."""
    return self._closed
614
def __del__(self):
    """Warn about an unclosed loop and close it if it is not running."""
    if not self.is_closed():
        warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
                      source=self)
        if not self.is_running():
            self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100621
def is_running(self):
    """Returns True if the event loop is running."""
    # _thread_id is only set while the loop runs.
    return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625
def time(self):
    """Return the time according to the event loop's clock.

    This is a float expressed in seconds since an epoch, but the
    epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.  This implementation reads the monotonic
    clock.
    """
    return time.monotonic()
634
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        # Drop the frame of this method from the recorded traceback.
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises RuntimeError if the loop is closed.  In debug mode the calling
    thread and the callback are validated as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        # Drop the frame of this method from the recorded traceback.
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
672
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.

    Raises RuntimeError if the loop is closed.  In debug mode the calling
    thread and the callback are validated as well.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        # Drop the frame of this method from the recorded traceback.
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100691
def _check_callback(self, callback, method):
    """Validate that *callback* is acceptable for the API named *method*.

    Raises TypeError when *callback* is a coroutine object or a
    coroutine function, or when it is not callable at all.  *method*
    is only used to build the error message.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines.iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700701
def _call_soon(self, callback, args, context):
    """Create a Handle for *callback* and append it to the ready queue.

    Unlike call_soon(), no closed/thread/callback checks are made
    here.  Returns the new handle.
    """
    new_handle = events.Handle(callback, args, self, context)
    source_tb = new_handle._source_traceback
    if source_tb:
        # Trim the last recorded entry of the handle's source traceback.
        del source_tb[-1]
    self._ready.append(new_handle)
    return new_handle
708
Victor Stinner956de692014-12-26 21:07:52 +0100709 def _check_thread(self):
710 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100711
Victor Stinneracdb7822014-07-14 18:33:40 +0200712 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100713 likely behave incorrectly when the assumption is violated.
714
Victor Stinneracdb7822014-07-14 18:33:40 +0200715 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100716 responsible for checking this condition for performance reasons.
717 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100718 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200719 return
Victor Stinner956de692014-12-26 21:07:52 +0100720 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100721 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100722 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200723 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100724 "than the current one")
725
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    The handle is queued through _call_soon() and then
    _write_to_self() is invoked (presumably to wake up the loop if it
    is blocked waiting for I/O).  Returns the handle.
    """
    self._check_closed()
    if self._debug:
        # No thread check here: being called from another thread is
        # the whole point of this method.
        self._check_callback(callback, 'call_soon_threadsafe')

    scheduled = self._call_soon(callback, args, context)
    source_tb = scheduled._source_traceback
    if source_tb:
        # Trim the last recorded entry of the handle's source traceback.
        del source_tb[-1]
    self._write_to_self()
    return scheduled
736
def run_in_executor(self, executor, func, *args):
    """Submit ``func(*args)`` to *executor* and return a loop-bound future.

    When *executor* is None the loop's default executor is used; if
    none has been set yet, a ThreadPoolExecutor is created and stored
    as the default.  The concurrent future returned by submit() is
    wrapped with futures.wrap_future() for this loop.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')

    pool = executor
    if pool is None:
        pool = self._default_executor
    if pool is None:
        # Lazily create the default executor on first use.
        pool = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = pool

    pending = pool.submit(func, *args)
    return futures.wrap_future(pending, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748
def set_default_executor(self, executor):
    """Set the executor used by run_in_executor() when none is given.

    Anything that is not a ThreadPoolExecutor instance is still
    accepted, but triggers a DeprecationWarning attributed to the
    caller.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        message = ('Using the default executor that is not an instance of '
                   'ThreadPoolExecutor is deprecated and will be prohibited '
                   'in Python 3.9')
        warnings.warn(message, DeprecationWarning, stacklevel=2)
    self._default_executor = executor
757
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The request is logged at DEBUG level before the lookup.  The
    result is logged at INFO level when the lookup took at least
    self.slow_callback_duration (measured with self.time()), and at
    DEBUG level otherwise.  Returns the socket.getaddrinfo() result.
    """
    # Only truthy (non-default) lookup options are mentioned in the log.
    described = [f"{host}:{port!r}"]
    optional = (('family', family), ('type', type),
                ('proto', proto), ('flags', flags))
    for label, value in optional:
        if value:
            described.append(f'{label}={value!r}')
    msg = ', '.join(described)
    logger.debug('Get address info %s', msg)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    dt = self.time() - started

    msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
    log = logger.info if dt >= self.slow_callback_duration else logger.debug
    log(msg)
    return addrinfo
781
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* in the default executor.

    In debug mode the lookup goes through _getaddrinfo_debug();
    otherwise socket.getaddrinfo() is called directly.  Returns
    whatever the chosen function returns.
    """
    resolver = (self._getaddrinfo_debug if self._debug
                else socket.getaddrinfo)
    lookup = self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
    return await lookup
791
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
    return await lookup
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700795
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, preferring the native sendfile path.

    In debug mode a socket whose timeout is not 0 is rejected with
    ValueError.  Arguments are validated by _check_sendfile_params().
    If the native implementation raises SendfileNotAvailableError, the
    error is re-raised when *fallback* is false; otherwise the
    read-and-send fallback is used.  Returns the value of whichever
    implementation ran.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)

    try:
        sent = await self._sock_sendfile_native(sock, file, offset, count)
    except exceptions.SendfileNotAvailableError:
        if not fallback:
            raise
        return await self._sock_sendfile_fallback(sock, file,
                                                  offset, count)
    return sent
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200809
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation of the native sendfile path: always unavailable.

    Always raises SendfileNotAvailableError; sock_sendfile() catches it
    and decides whether to fall back.  (Presumably loop subclasses that
    support the sendfile syscall override this method.)
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        # Both literals need the f prefix, otherwise "{file!r}" is
        # emitted verbatim instead of the file's repr.
        f"and file {file!r} combination")
816
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Reading starts at *offset* (the file is seeked there when *offset*
    is non-zero).  At most *count* bytes are sent when *count* is
    truthy, otherwise data is sent until readinto() reports EOF.  File
    reads are performed in the default executor.

    Returns the total number of bytes sent.  On exit, including on
    error, the file position is moved to ``offset + total_sent`` if
    anything was sent and the file object has a ``seek`` attribute.
    """
    if offset:
        file.seek(offset)
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read more than what is left of the requested count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            # readinto() may fill only part of the view; send just the
            # bytes that were actually read, not stale buffer contents.
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
842
843 def _check_sendfile_params(self, sock, file, offset, count):
844 if 'b' not in getattr(file, 'mode', 'b'):
845 raise ValueError("file should be opened in binary mode")
846 if not sock.type == socket.SOCK_STREAM:
847 raise ValueError("only SOCK_STREAM type sockets are supported")
848 if count is not None:
849 if not isinstance(count, int):
850 raise TypeError(
851 "count must be a positive integer (got {!r})".format(count))
852 if count <= 0:
853 raise ValueError(
854 "count must be a positive integer (got {!r})".format(count))
855 if not isinstance(offset, int):
856 raise TypeError(
857 "offset must be a non-negative integer (got {!r})".format(
858 offset))
859 if offset < 0:
860 raise ValueError(
861 "offset must be a non-negative integer (got {!r})".format(
862 offset))
863
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must
    be a callable returning a protocol instance.

    Either host/port or an existing *sock* must be given, never both.
    When *local_addr* is given the new socket is bound to it before
    connecting.  Each resolved remote address is tried in turn; if all
    of them fail, the single (or common) OSError is re-raised, or a
    combined OSError listing every message.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        remote_infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not remote_infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            local_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not local_infos:
                raise OSError('getaddrinfo() returned empty list')

        # One entry per failed bind/connect attempt.
        errors = []
        for af, socktype, protonum, _cname, address in remote_infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=protonum)
                sock.setblocking(False)
                if local_addr is not None:
                    for _, _, _, _, laddr in local_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            errors.append(OSError(exc.errno, msg))
                    else:
                        # No local address could be bound: give up on
                        # this remote address and try the next one.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                errors.append(exc)
            except:
                # Anything else is fatal: release the socket and propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate address failed.
            if len(errors) == 1:
                raise errors[0]
            # If they all have the same str(), raise one.
            model = str(errors[0])
            if all(str(exc) == model for exc in errors):
                raise errors[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            combined = ', '.join(str(exc) for exc in errors)
            raise OSError('Multiple exceptions: {}'.format(combined))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
995
Neil Aspinallf7686c12017-12-19 19:45:42 +0000996 async def _create_connection_transport(
997 self, sock, protocol_factory, ssl,
998 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200999 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001000
1001 sock.setblocking(False)
1002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001003 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001004 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001005 if ssl:
1006 sslcontext = None if isinstance(ssl, bool) else ssl
1007 transport = self._make_ssl_transport(
1008 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001009 server_side=server_side, server_hostname=server_hostname,
1010 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001011 else:
1012 transport = self._make_socket_transport(sock, protocol, waiter)
1013
Victor Stinner29ad0112015-01-15 00:04:21 +01001014 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001015 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001016 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001017 transport.close()
1018 raise
1019
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001020 return transport, protocol
1021
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file.  If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached.  File position is updated on
    return or also in case of error in which case file.tell()
    can be used to figure out the number of bytes
    which were sent.

    fallback set to True makes asyncio to manually read and send
    the file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    sendfile syscall and fallback is False.

    Raise RuntimeError if the transport is closing, if the transport
    does not support sendfile at all, or if only the fallback path is
    possible for this transport and fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    # Transports advertise their capability through the
    # _sendfile_compatible attribute; a missing attribute counts as
    # unsupported.
    capability = getattr(transport, '_sendfile_compatible',
                         constants._SendfileMode.UNSUPPORTED)
    if capability is constants._SendfileMode.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if capability is constants._SendfileMode.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise
            # Otherwise continue with the fallback path below.

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1068
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation of native transport sendfile: always unavailable.

    Always raises SendfileNotAvailableError, which sendfile() catches
    to decide whether to fall back.
    """
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
1072
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through transport *transp* by reading and writing chunks.

    Reading starts at *offset* (the file is seeked there when *offset*
    is non-zero).  At most *count* bytes are written when *count* is
    truthy, otherwise data is written until readinto() reports EOF.
    A _SendfileFallbackProtocol is created around the transport; its
    drain() is awaited before each write and its restore() is awaited
    on exit.

    Returns the total number of bytes written.  On exit, including on
    error, the file position is moved to ``offset + total_sent`` if
    anything was sent and the file object has a ``seek`` attribute.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read more than what is left of the requested count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            read = file.readinto(view)
            if not read:
                return total_sent  # EOF
            await proto.drain()
            # readinto() may fill only part of the view; write just the
            # bytes that were actually read, not stale buffer contents.
            transp.write(view[:read])
            total_sent += read
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1097
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError when the ssl module is unavailable and
    TypeError when *sslcontext* is not an ssl.SSLContext or when
    *transport* does not declare itself start_tls()-compatible.  If
    waiting for the handshake fails or is interrupted, the transport is
    closed, the pending callbacks are cancelled and the error is
    re-raised.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        # Catch BaseException, like the other transport-creation paths
        # in this file do, so that cancellation or interruption while
        # waiting also releases the transport and the queued callbacks.
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport
1143
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing SOCK_DGRAM *sock* (then no other socket
    modifier argument may be used), or describe the endpoint with
    *local_addr* / *remote_addr* (2-tuples, or strings for AF_UNIX)
    and/or *family*.  Candidate (family, proto) pairs are tried in
    order until a socket can be created, bound and connected.

    Raises ValueError for inconsistent arguments, TypeError for
    malformed addresses and OSError when resolution returns nothing or
    every candidate fails (the first failure is re-raised).

    This method is a coroutine.  When successful, it returns a
    (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')
            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # Validate explicitly instead of with assert, which
                    # is stripped when Python runs with -O.
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        # One entry per candidate that failed with OSError.
        exceptions = []

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is fatal: release the socket and propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed; report the first failure.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        # Do not leave a half-initialized transport behind.
        transport.close()
        raise

    return transport, protocol
1269
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return address info for *address*, resolving only when needed.

    Only the first two items of *address* (host, port) are used.  When
    _ipaddr_info() recognizes the host, its single info entry is
    returned in a list without any lookup; otherwise the result of
    ``loop.getaddrinfo()`` is returned.
    """
    host, port = address[:2]
    info = _ipaddr_info(host, port, family, type, proto)
    if info is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [info]
1281
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a stream server socket.

    Returns the address infos from _ensure_resolved(); raises OSError
    when the result is empty.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family,
        type=socket.SOCK_STREAM, flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1289
Neil Aspinallf7686c12017-12-19 19:45:42 +00001290 async def create_server(
1291 self, protocol_factory, host=None, port=None,
1292 *,
1293 family=socket.AF_UNSPEC,
1294 flags=socket.AI_PASSIVE,
1295 sock=None,
1296 backlog=100,
1297 ssl=None,
1298 reuse_address=None,
1299 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001300 ssl_handshake_timeout=None,
1301 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001302 """Create a TCP server.
1303
Yury Selivanov6370f342017-12-10 18:36:12 -05001304 The host parameter can be a string, in that case the TCP server is
1305 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001306
1307 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001308 the TCP server is bound to all hosts of the sequence. If a host
1309 appears multiple times (possibly indirectly e.g. when hostnames
1310 resolve to the same IP address), the server is only bound once to that
1311 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001312
Victor Stinneracdb7822014-07-14 18:33:40 +02001313 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001314
1315 This method is a coroutine.
1316 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001317 if isinstance(ssl, bool):
1318 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001319
1320 if ssl_handshake_timeout is not None and ssl is None:
1321 raise ValueError(
1322 'ssl_handshake_timeout is only meaningful with ssl')
1323
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001324 if host is not None or port is not None:
1325 if sock is not None:
1326 raise ValueError(
1327 'host/port and sock can not be specified at the same time')
1328
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001329 if reuse_address is None:
1330 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1331 sockets = []
1332 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001333 hosts = [None]
1334 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001335 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001336 hosts = [host]
1337 else:
1338 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001339
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001340 fs = [self._create_server_getaddrinfo(host, port, family=family,
1341 flags=flags)
1342 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001343 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001344 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001345
1346 completed = False
1347 try:
1348 for res in infos:
1349 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001350 try:
1351 sock = socket.socket(af, socktype, proto)
1352 except socket.error:
1353 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001354 if self._debug:
1355 logger.warning('create_server() failed to create '
1356 'socket.socket(%r, %r, %r)',
1357 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001358 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001359 sockets.append(sock)
1360 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001361 sock.setsockopt(
1362 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1363 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001364 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001365 # Disable IPv4/IPv6 dual stack support (enabled by
1366 # default on Linux) which makes a single socket
1367 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001368 if (_HAS_IPv6 and
1369 af == socket.AF_INET6 and
1370 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001371 sock.setsockopt(socket.IPPROTO_IPV6,
1372 socket.IPV6_V6ONLY,
1373 True)
1374 try:
1375 sock.bind(sa)
1376 except OSError as err:
1377 raise OSError(err.errno, 'error while attempting '
1378 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001379 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001380 completed = True
1381 finally:
1382 if not completed:
1383 for sock in sockets:
1384 sock.close()
1385 else:
1386 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001387 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001388 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001389 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001390 sockets = [sock]
1391
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001392 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001393 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001394
1395 server = Server(self, sockets, protocol_factory,
1396 ssl, backlog, ssl_handshake_timeout)
1397 if start_serving:
1398 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001399 # Skip one loop iteration so that all 'loop.add_reader'
1400 # go through.
1401 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001402
Victor Stinnere912e652014-07-12 03:11:53 +02001403 if self._debug:
1404 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001405 return server
1406
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Wrap an already accepted socket in a transport/protocol pair.

    Meant for servers that accept connections outside of asyncio
    but want asyncio to handle those connections afterwards.

    This method is a coroutine.  When it completes, it returns a
    (transport, protocol) pair.

    Raises ValueError if sock is not a stream socket, or if
    ssl_handshake_timeout is given while ssl is not.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    handshake_timeout_given = ssl_handshake_timeout is not None
    if handshake_timeout_given and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    created = await self._create_connection_transport(
        sock, protocol_factory, ssl, '',
        server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = created

    if self._debug:
        # Ask the transport for its socket: an SSL transport closes
        # the old socket and creates a new SSL socket.
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
1435
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the read end of a pipe.

    Calls protocol_factory() to build the protocol, creates a read
    pipe transport for pipe and awaits the waiter future handed to
    that transport.

    This method is a coroutine.  When it completes, it returns a
    (transport, protocol) pair.  If awaiting the waiter raises
    anything at all, the transport is closed and the error is
    re-raised.
    """
    proto = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Never leave a half-initialised transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, proto)
    return transport, proto
1451
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the write end of a pipe.

    Calls protocol_factory() to build the protocol, creates a write
    pipe transport for pipe and awaits the waiter future handed to
    that transport.

    This method is a coroutine.  When it completes, it returns a
    (transport, protocol) pair.  If awaiting the waiter raises
    anything at all, the transport is closed and the error is
    re-raised.
    """
    proto = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Never leave a half-initialised transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, proto)
    return transport, proto
1467
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit one debug line describing a subprocess and its stdio setup.

    msg opens the line; each stream that is not None contributes a
    'name=<formatted pipe>' fragment.  When stdout is set and stderr
    is subprocess.STDOUT, a single 'stdout=stderr=...' fragment is
    written for both.
    """
    fragments = [msg]
    if stdin is not None:
        fragments.append(f'stdin={_format_pipe(stdin)}')

    stderr_joins_stdout = (stdout is not None
                           and stderr == subprocess.STDOUT)
    if stderr_joins_stdout:
        fragments.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        if stdout is not None:
            fragments.append(f'stdout={_format_pipe(stdout)}')
        if stderr is not None:
            fragments.append(f'stderr={_format_pipe(stderr)}')

    logger.debug(' '.join(fragments))
1480
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           **kwargs):
    """Start a subprocess from a shell command line.

    cmd must be a str or bytes object.  universal_newlines, shell
    and bufsize are accepted only with the values False, True and 0
    respectively; anything else raises ValueError, as does a cmd of
    the wrong type.  Remaining keyword arguments are forwarded to
    _make_subprocess_transport().

    This method is a coroutine.  When it completes, it returns a
    (transport, protocol) pair.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    protocol = protocol_factory()

    if self._debug:
        # don't log the remaining parameters: they may contain
        # sensitive information (password) and may be too long
        debug_log = f'run shell command {cmd!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    else:
        debug_log = None

    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # Debug mode is re-read here; the prefix only exists if it was
    # already enabled before the transport was created.
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1508
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0, **kwargs):
    """Start a subprocess from a program and its arguments.

    program and every item of args must be a str or bytes object,
    otherwise TypeError is raised.  universal_newlines, shell and
    bufsize are accepted only with the values False, False and 0
    respectively; anything else raises ValueError.  Remaining
    keyword arguments are forwarded to
    _make_subprocess_transport().

    This method is a coroutine.  When it completes, it returns a
    (transport, protocol) pair.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    popen_args = (program, *args)
    for candidate in popen_args:
        if isinstance(candidate, (str, bytes)):
            continue
        raise TypeError(
            f"program arguments must be a bytes or text string, "
            f"not {type(candidate).__name__}")

    protocol = protocol_factory()

    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = f'execute program {program!r}'
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    else:
        debug_log = None

    transport = await self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # Debug mode is re-read here; the prefix only exists if it was
    # already enabled before the transport was created.
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1538
def get_exception_handler(self):
    """Return the custom exception handler.

    The result is None when no custom handler has been set, i.e.
    the default one is in use.
    """
    current = self._exception_handler
    return current
1543
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None selects the default exception handler.

    Otherwise handler must be callable and should have a signature
    matching '(loop, context)': 'loop' is a reference to the
    active event loop and 'context' is a dict object (see the
    `call_exception_handler()` documentation for details about
    context).

    Raises TypeError if handler is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
1560
def default_exception_handler(self, context):
    """Default exception handler.

    Used when an exception occurs and no custom exception handler
    is set; a custom handler may also call it to defer to the
    default behavior.

    Logs the error message together with the remaining context
    entries.  A 'source_traceback' or 'handle_traceback' entry is
    rendered as a formatted stack showing where the object or
    handle was created; every other entry is logged as its repr.

    The context parameter has the same meaning as in
    `call_exception_handler()`.  Note that a 'handle_traceback'
    entry may be added to the dict that was passed in.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False)

    if 'source_traceback' not in context:
        # Fall back to the creation traceback of the handle that is
        # currently running, when there is one.
        running = self._current_handle
        if running is not None and running._source_traceback:
            context['handle_traceback'] = running._source_traceback

    traceback_headers = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        if key in {'message', 'exception'}:
            # Already reported through the message and exc_info.
            continue
        value = context[key]
        header = traceback_headers.get(key)
        if header is None:
            rendered = repr(value)
        else:
            stack = ''.join(traceback.format_list(value))
            rendered = header + stack.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
1610
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom_handler = self._exception_handler

    if custom_handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        custom_handler(self, context)
    except Exception as exc:
        # The user supplied handler failed; report that failure
        # through the default handler instead.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1661
def _add_callback(self, handle):
    """Append a Handle to _ready unless it has been cancelled.

    The handle must be an events.Handle; an events.TimerHandle is
    not accepted here.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001669
def _add_callback_signalsafe(self, handle):
    """Variant of _add_callback() for use from a signal handler.

    Queues the handle through _add_callback() and then calls
    _write_to_self().
    """
    # Order matters: queue the handle first, then poke the loop.
    self._add_callback(handle)
    self._write_to_self()
1674
def _timer_handle_cancelled(self, handle):
    """Notification that a TimerHandle has been cancelled.

    Only handles whose _scheduled flag is set are counted in
    _timer_cancelled_count.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
1679
def _run_once(self):
    """Run one full iteration of the event loop.

    The steps are, in order: drop cancelled timers from _scheduled,
    poll the selector for I/O and process the resulting events,
    move timers that are due onto _ready, and finally run every
    callback that was on _ready at that point.
    """
    scheduled_total = len(self._scheduled)
    too_many_cancelled = (
        scheduled_total > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / scheduled_total >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION)

    if too_many_cancelled:
        # Too large a share of the delayed calls was cancelled:
        # rebuild the heap from the ones that are still live.
        live_timers = []
        for handle in self._scheduled:
            if not handle._cancelled:
                live_timers.append(handle)
                continue
            handle._scheduled = False

        heapq.heapify(live_timers)
        self._scheduled = live_timers
        self._timer_cancelled_count = 0
    else:
        # Only strip cancelled delayed calls from the head of the queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    if self._ready or self._stopping:
        # Work is already waiting (or the loop is stopping): just poll.
        timeout = 0
    elif self._scheduled:
        # Wait until the earliest timer is due, within the allowed cap.
        when = self._scheduled[0]._when
        delay = max(0, when - self.time())
        timeout = min(delay, MAXIMUM_SELECT_TIMEOUT)
    else:
        timeout = None

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Move 'later' callbacks that are due onto the ready queue.
    deadline = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= deadline:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: only the callbacks queued right now are run; anything
    # they schedule is left on the queue for the next call
    # (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    pending = len(self._ready)
    for _ in range(pending):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
            continue
        try:
            self._current_handle = handle
            started = self.time()
            handle._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), elapsed)
        finally:
            self._current_handle = None
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001757
def _set_coroutine_origin_tracking(self, enabled):
    """Switch coroutine origin tracking on or off for this loop.

    Does nothing when the requested state, taken as a bool, equals
    the recorded one.  When enabling, the interpreter's current
    tracking depth is saved and replaced by
    constants.DEBUG_STACK_DEPTH; when disabling, the saved depth is
    restored.
    """
    requested = bool(enabled)
    if requested == bool(self._coroutine_origin_tracking_enabled):
        return

    if not enabled:
        sys.set_coroutine_origin_tracking_depth(
            self._coroutine_origin_tracking_saved_depth)
    else:
        # Keep the previous depth so that it can be put back later.
        self._coroutine_origin_tracking_saved_depth = (
            sys.get_coroutine_origin_tracking_depth())
        sys.set_coroutine_origin_tracking_depth(
            constants.DEBUG_STACK_DEPTH)

    self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001772
def get_debug(self):
    """Return the debug flag last stored on the loop."""
    debug_flag = self._debug
    return debug_flag
1775
def set_debug(self, enabled):
    """Store the debug flag of the event loop.

    While the loop is running, the matching change of coroutine
    origin tracking is additionally scheduled through
    call_soon_threadsafe().
    """
    self._debug = enabled

    if not self.is_running():
        return
    self.call_soon_threadsafe(
        self._set_coroutine_origin_tracking, enabled)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001780 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)