blob: 780a06192dcf8c743f72eb577df3ad1d3cadd657 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called. This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Yury Selivanovf111b3d2017-12-30 00:35:36 -050032try:
33 import ssl
34except ImportError: # pragma: no cover
35 ssl = None
36
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080037from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020038from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070040from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020042from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050043from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020045from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070046from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48
# Names exported by ``from <module> import *``.
__all__ = ('BaseEventLoop',)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
51
# Cleanup of cancelled timer handles is only performed once at least this
# many timer handles are scheduled ...
_MIN_SCHEDULED_TIMER_HANDLES = 100

# ... and at least this fraction of the scheduled timer handles has been
# cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Exceptions which must not call the exception handler in fatal error
# methods (_fatal_error()).
_FATAL_ERROR_IGNORE = (
    BrokenPipeError,
    ConnectionResetError,
    ConnectionAbortedError,
)

if ssl is not None:
    # Certificate verification failures are only known when ssl is available.
    _FATAL_ERROR_IGNORE += (ssl.SSLCertVerificationError,)

# True when the socket module exposes the IPv6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations.
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
72
Victor Stinnerc94a93a2016-04-01 21:43:39 +020073
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a Task, the repr of
    that task is returned; otherwise the handle is converted with str().
    """
    owner = getattr(handle._callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task itself.
    return repr(owner)
81
82
Victor Stinneracdb7822014-07-14 18:33:40 +020083def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
90
91
Yury Selivanov5587d7c2016-09-15 15:45:07 -040092def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
101
102
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500103def _ipaddr_info(host, port, family, type, proto):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
108
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500111 return None
112
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500113 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500114 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500115 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500116 proto = socket.IPPROTO_UDP
117 else:
118 return None
119
Yury Selivanova7146162016-06-02 16:51:07 -0400120 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400121 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400132
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400133 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500134 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400135 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500136 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400137 else:
138 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500145 return None
146
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400147 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 socket.inet_pton(af, host)
150 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400151 if _HAS_IPv6 and af == socket.AF_INET6:
152 return af, type, proto, '', (host, port, 0, 0)
153 else:
154 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400155 except OSError:
156 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500157
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400158 # "host" is not an IP address.
159 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500160
161
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100162def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500163 if not fut.cancelled():
164 exc = fut.exception()
165 if isinstance(exc, BaseException) and not isinstance(exc, Exception):
166 # Issue #22429: run_forever() already finished, no need to
167 # stop it.
168 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500169 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100170
171
class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol installed on a transport, undone by restore().

    On construction it pauses reading on the transport and replaces the
    transport's protocol with itself, remembering the previous protocol
    and whether reading/writing have to be resumed afterwards.  drain()
    lets a writer wait while the transport has paused writing.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        # Snapshot the flow-control state before it is altered below.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A pending future means "writing is currently paused".
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is resumed; fail if the transport is closing."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        waiter = self._write_ready_fut
        if waiter is not None:
            await waiter

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        waiter = self._write_ready_fut
        if waiter is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is not None:
                waiter.set_exception(exc)
            else:
                waiter.set_exception(
                    ConnectionError("Connection is closed by peer"))
        self._proto.connection_lost(exc)

    def pause_writing(self):
        # Create the waiter only once per pause.
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        waiter = self._write_ready_fut
        if waiter is None:
            return
        waiter.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and flow-control state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        waiter = self._write_ready_fut
        if waiter is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            waiter.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
238
239
class Server(events.AbstractServer):
    """Server wrapping a collection of listening sockets.

    The object keeps a counter of active attachments (see _attach() and
    _detach()) and a list of futures created by wait_closed(); those
    futures are resolved by _wakeup() once the server has been closed and
    the counter has dropped to zero.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        # Set to None by close().
        self._sockets = sockets
        self._active_count = 0
        # Futures of pending wait_closed() calls; None after _wakeup().
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        # Future awaited by serve_forever(), or None.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Register one more active user of the (still open) server."""
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        """Drop one active user; wake waiters if closed and now idle."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future.

        ``_waiters`` is replaced by None so that later wait_closed() calls
        return immediately.
        """
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # wait_closed() discards the result, so use None instead of
                # the future itself: storing the future as its own result
                # would create a needless reference cycle.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; no-op if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """A new list of the server's sockets; empty once closed."""
        if self._sockets is None:
            return []
        return list(self._sockets)

    def close(self):
        """Stop serving on all sockets and cancel serve_forever().

        Calling close() on an already closed server does nothing.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        """Start serving and block until this call is cancelled.

        Raises RuntimeError if another serve_forever() call is pending or
        the server is closed.  On cancellation the server is closed and
        the CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                # Always propagate the cancellation, even if closing failed.
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until _wakeup() runs.

        Returns immediately when the sockets have already been released
        (``_sockets`` is None) or the waiters have already been woken up.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350
351class BaseEventLoop(events.AbstractEventLoop):
352
def __init__(self):
    """Initialise the loop's bookkeeping state."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    # Handles ready to run (FIFO) and timer handles kept as a heap.
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
380
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200381 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500382 return (
383 f'<{self.__class__.__name__} running={self.is_running()} '
384 f'closed={self.is_closed()} debug={self.get_debug()}>'
385 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200386
def create_future(self):
    """Return a new futures.Future bound to this loop."""
    new_future = futures.Future(loop=self)
    return new_future
390
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object and return the task wrapping it.

    With a task factory installed the task is built by the factory and
    then named; otherwise a tasks.Task is created directly.  Raises
    RuntimeError if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        task = factory(self, coro)
        tasks._set_task_name(task, name)
        return task

    task = tasks.Task(coro, loop=self, name=name)
    if task._source_traceback:
        # Drop the last recorded frame (this method).
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200406
def set_task_factory(self, factory):
    """Install the factory used by loop.create_task().

    Passing None selects the default task factory.

    A callable factory is invoked as ``factory(loop, coro)`` where
    'loop' is the event loop and 'coro' is a coroutine object; it must
    return a Future.

    Raises TypeError when factory is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
420
def get_task_factory(self):
    """Return the installed task factory (None means the default one)."""
    factory = self._task_factory
    return factory
424
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 def _make_socket_transport(self, sock, protocol, waiter=None, *,
426 extra=None, server=None):
427 """Create socket transport."""
428 raise NotImplementedError
429
Neil Aspinallf7686c12017-12-19 19:45:42 +0000430 def _make_ssl_transport(
431 self, rawsock, protocol, sslcontext, waiter=None,
432 *, server_side=False, server_hostname=None,
433 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500434 ssl_handshake_timeout=None,
435 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 """Create SSL transport."""
437 raise NotImplementedError
438
439 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200440 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 """Create datagram transport."""
442 raise NotImplementedError
443
444 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
445 extra=None):
446 """Create read pipe transport."""
447 raise NotImplementedError
448
449 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
450 extra=None):
451 """Create write pipe transport."""
452 raise NotImplementedError
453
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200454 async def _make_subprocess_transport(self, protocol, args, shell,
455 stdin, stdout, stderr, bufsize,
456 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 """Create subprocess transport."""
458 raise NotImplementedError
459
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200461 """Write a byte to self-pipe, to wake up the event loop.
462
463 This may be called from a different thread.
464
465 The subclass is responsible for implementing the self-pipe.
466 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 raise NotImplementedError
468
469 def _process_events(self, event_list):
470 """Process selector events."""
471 raise NotImplementedError
472
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200473 def _check_closed(self):
474 if self._closed:
475 raise RuntimeError('Event loop is closed')
476
Yury Selivanoveb636452016-09-08 22:01:51 -0700477 def _asyncgen_finalizer_hook(self, agen):
478 self._asyncgens.discard(agen)
479 if not self.is_closed():
480 self.create_task(agen.aclose())
Yury Selivanoved054062016-10-21 17:13:40 -0400481 # Wake up the loop if the finalizer was called from
482 # a different thread.
483 self._write_to_self()
Yury Selivanoveb636452016-09-08 22:01:51 -0700484
485 def _asyncgen_firstiter_hook(self, agen):
486 if self._asyncgens_shutdown_called:
487 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500488 f"asynchronous generator {agen!r} was scheduled after "
489 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700490 ResourceWarning, source=self)
491
492 self._asyncgens.add(agen)
493
async def shutdown_asyncgens(self):
    """Close every asynchronous generator tracked by the loop.

    Exceptions raised while closing a generator are reported through
    call_exception_handler() instead of being propagated.
    """
    self._asyncgens_shutdown_called = True

    if len(self._asyncgens) == 0:
        # If Python version is <3.6 or we don't have any asynchronous
        # generators alive.
        return

    # Snapshot the tracked generators, then forget them.
    pending = list(self._asyncgens)
    self._asyncgens.clear()

    outcomes = await tasks.gather(
        *[agen.aclose() for agen in pending],
        return_exceptions=True,
        loop=self)

    for outcome, agen in zip(outcomes, pending):
        if not isinstance(outcome, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': outcome,
            'asyncgen': agen
        })
519
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, is already running, or if
    another event loop is running.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    # Install this loop's asynchronous generator hooks, remembering the
    # previous ones so they can be put back afterwards.
    saved_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        # Always run at least one iteration, then keep iterating until
        # stop() has been requested.
        self._run_once()
        while not self._stopping:
            self._run_once()
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*saved_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
def run_until_complete(self, future):
    """Run the loop until *future* is done.

    An argument that is not already a future (e.g. a coroutine) is
    wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop stopped before the future completed.
    """
    self._check_closed()

    wrapped = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if wrapped and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')
583
def stop(self):
    """Ask the running loop to stop.

    Callbacks that are already scheduled still run: this only sets the
    flag that makes run_forever() leave its loop after a complete
    iteration.
    """
    self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591
def close(self):
    """Close the event loop.

    The ready and scheduled queues are emptied and the default executor
    is shut down without waiting for it to finish.

    The loop must not be running (RuntimeError otherwise).  Closing an
    already closed loop does nothing.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)

    self._closed = True
    self._ready.clear()
    self._scheduled.clear()

    executor = self._default_executor
    if executor is None:
        return
    # Detach the executor before shutting it down.
    self._default_executor = None
    executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200613
def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed
617
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900618 def __del__(self):
619 if not self.is_closed():
Yury Selivanov6370f342017-12-10 18:36:12 -0500620 warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900621 source=self)
622 if not self.is_running():
623 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100624
def is_running(self):
    """Return True if the event loop is running.

    The loop counts as running while a thread identifier is recorded.
    """
    return self._thread_id is not None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628
def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float number of seconds taken from time.monotonic();
    the epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.
    """
    now = time.monotonic()
    return now
637
def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called after *delay* seconds.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds, and is
    always relative to the current loop time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args, context=context)
    if timer._source_traceback:
        # Drop the last recorded frame (this method).
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but *when* is an absolute time.

    Absolute time corresponds to the event loop's time() method.
    Raises RuntimeError if the loop is closed.
    """
    self._check_closed()
    if self._debug:
        # The extra validation is only performed in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_at')

    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]

    # Timers are kept in a heap.
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
675
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    Callbacks are queued FIFO: they are called in the order in which
    they are registered, and each one is called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.

    Raises RuntimeError if the loop is closed.
    """
    self._check_closed()
    if self._debug:
        # The extra validation is only performed in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_soon')

    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        # Drop the last recorded frame (this method).
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100694
def _check_callback(self, callback, method):
    """Validate *callback* on behalf of the scheduling API named *method*.

    Raise TypeError when *callback* is a coroutine object or coroutine
    function, or when it is not callable at all.  *method* is only used
    to build the error message.
    """
    looks_like_coroutine = (
        coroutines.iscoroutine(callback)
        or coroutines.iscoroutinefunction(callback))
    if looks_like_coroutine:
        raise TypeError(f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700704
def _call_soon(self, callback, args, context):
    """Wrap *callback* in a Handle, append it to the ready queue and
    return the handle.

    No closed-loop, thread or callback validation happens here.
    """
    new_handle = events.Handle(callback, args, self, context)
    frames = new_handle._source_traceback
    if frames:
        # Trim the newest recorded frame from the handle's traceback.
        del frames[-1]
    self._ready.append(new_handle)
    return new_handle
711
Victor Stinner956de692014-12-26 21:07:52 +0100712 def _check_thread(self):
713 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100714
Victor Stinneracdb7822014-07-14 18:33:40 +0200715 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100716 likely behave incorrectly when the assumption is violated.
717
Victor Stinneracdb7822014-07-14 18:33:40 +0200718 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100719 responsible for checking this condition for performance reasons.
720 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100721 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200722 return
Victor Stinner956de692014-12-26 21:07:52 +0100723 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100724 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100725 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200726 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100727 "than the current one")
728
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    Unlike call_soon(), the calling thread is never checked; in debug
    mode only the callback is validated.  After queueing the handle,
    _write_to_self() is invoked and the handle is returned.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    queued = self._call_soon(callback, args, context)
    frames = queued._source_traceback
    if frames:
        # Trim the newest recorded frame from the handle's traceback.
        del frames[-1]
    # Presumably wakes the loop so it notices the new handle (TODO confirm).
    self._write_to_self()
    return queued
739
def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to *executor* and return the submitted future
    wrapped by futures.wrap_future() for this loop.

    When *executor* is None the loop's default executor is used; if none
    has been set yet, a concurrent.futures.ThreadPoolExecutor is created
    and remembered as the default.  In debug mode *func* is validated
    with _check_callback() first.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    if executor is None:
        executor = self._default_executor
        if executor is None:
            # Lazily create the default pool on first use.
            executor = concurrent.futures.ThreadPoolExecutor()
            self._default_executor = executor
    return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700751
def set_default_executor(self, executor):
    """Remember *executor* as the loop's default executor.

    A DeprecationWarning (attributed to the caller via stacklevel 2) is
    emitted when *executor* is not a ThreadPoolExecutor instance; the
    executor is stored either way.
    """
    is_thread_pool = isinstance(
        executor, concurrent.futures.ThreadPoolExecutor)
    if not is_thread_pool:
        warnings.warn(
            'Using the default executor that is not an instance of '
            'ThreadPoolExecutor is deprecated and will be prohibited '
            'in Python 3.9',
            category=DeprecationWarning, stacklevel=2)
    self._default_executor = executor
760
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result of socket.getaddrinfo() is returned unchanged.  The
    completion message is logged at INFO level when the elapsed time
    reaches self.slow_callback_duration, and at DEBUG level otherwise.
    """
    parts = [f"{host}:{port!r}"]
    # Only parameters with a truthy value are mentioned in the log line.
    if family:
        parts.append(f'family={family!r}')
    if type:
        parts.append(f'type={type!r}')
    if proto:
        parts.append(f'proto={proto!r}')
    if flags:
        parts.append(f'flags={flags!r}')
    msg = ', '.join(parts)
    logger.debug('Get address info %s', msg)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    dt = self.time() - started

    msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
    emit = logger.info if dt >= self.slow_callback_duration else logger.debug
    emit(msg)
    return addrinfo
784
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* through run_in_executor() with executor None.

    In debug mode the logging wrapper self._getaddrinfo_debug is used as
    the resolver; otherwise socket.getaddrinfo is called directly.  The
    awaited result of the executor call is returned.
    """
    resolver = (self._getaddrinfo_debug if self._debug
                else socket.getaddrinfo)
    pending = self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
    return await pending
794
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) through run_in_executor()
    with executor None and return the awaited result."""
    pending = self.run_in_executor(
        None, socket.getnameinfo, sockaddr, flags)
    return await pending
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, returning what the chosen sender returns.

    The arguments are validated with _check_sendfile_params(); in debug
    mode a socket whose timeout is not 0 is additionally rejected with
    ValueError.  _sock_sendfile_native() is tried first.  When it raises
    SendfileNotAvailableError the error is re-raised if *fallback* is
    false; otherwise _sock_sendfile_fallback() does the transfer.
    """
    if self._debug:
        if sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
    self._check_sendfile_params(sock, file, offset, count)
    try:
        return await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if fallback:
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
        raise
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200812
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation of the native sendfile path: always unavailable.

    Unconditionally raises SendfileNotAvailableError naming the socket
    and the file; sock_sendfile() catches it to switch to the fallback.
    """
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    raise exceptions.SendfileNotAvailableError(
        f"syscall sendfile is not available for socket {sock!r} "
        # This fragment needs its own f-prefix, otherwise "{file!r}" is
        # emitted literally instead of the file's repr.
        f"and file {file!r} combination")
819
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Reading starts at *offset* (a seek is issued only when it is
    non-zero) and stops at EOF or, when *count* is truthy, once *count*
    bytes were sent.  Every read goes through run_in_executor() with
    executor None.

    Return the total number of bytes sent.  On exit, including on error,
    the file position is moved to offset + total_sent when anything was
    sent and the file object has a seek attribute.
    """
    if offset:
        file.seek(offset)
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the requested byte count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            # readinto() may fill only part of the view; send just the
            # bytes that were read, not leftovers from an earlier chunk.
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
845
846 def _check_sendfile_params(self, sock, file, offset, count):
847 if 'b' not in getattr(file, 'mode', 'b'):
848 raise ValueError("file should be opened in binary mode")
849 if not sock.type == socket.SOCK_STREAM:
850 raise ValueError("only SOCK_STREAM type sockets are supported")
851 if count is not None:
852 if not isinstance(count, int):
853 raise TypeError(
854 "count must be a positive integer (got {!r})".format(count))
855 if count <= 0:
856 raise ValueError(
857 "count must be a positive integer (got {!r})".format(count))
858 if not isinstance(offset, int):
859 raise TypeError(
860 "offset must be a non-negative integer (got {!r})".format(
861 offset))
862 if offset < 0:
863 raise ValueError(
864 "offset must be a non-negative integer (got {!r})".format(
865 offset))
866
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background. When successful, the coroutine returns a
    (transport, protocol) pair.

    Either host/port or an existing *sock* must be given, never both.
    When host/port is used, every resolved address is tried in order
    until one connects; if *local_addr* is given the new socket is first
    bound to one of its resolved addresses.

    Raises ValueError for inconsistent arguments (ssl-only options used
    without ssl, ssl without a host or server_hostname, host/port combined
    with sock, neither given, or a sock that is not SOCK_STREAM) and
    OSError when resolution yields nothing or no address can be connected.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Candidate remote addresses, tried in the order returned.
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            # Candidate local addresses to bind each new socket to.
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Errors collected across all attempts.  NOTE: for the rest of
        # this function the local list shadows the module-level
        # `exceptions` name that other methods of this class use.
        exceptions = []
        # NOTE: the loop variables rebind the family/proto parameters.
        for family, type, proto, cname, address in infos:
            try:
                sock = socket.socket(family=family, type=type, proto=proto)
                sock.setblocking(False)
                if local_addr is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            # Re-wrap so the message names the address
                            # that could not be bound.
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        # No local address could be bound: discard this
                        # socket and move on to the next remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)
            except OSError as exc:
                # Remember the failure and try the next address.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is not retried; close the socket so it
                # does not leak, then propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                # Connected: stop trying further addresses.
                break
        else:
            # Every address failed.
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
998
Neil Aspinallf7686c12017-12-19 19:45:42 +0000999 async def _create_connection_transport(
1000 self, sock, protocol_factory, ssl,
1001 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001002 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001003
1004 sock.setblocking(False)
1005
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001006 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001007 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001008 if ssl:
1009 sslcontext = None if isinstance(ssl, bool) else ssl
1010 transport = self._make_ssl_transport(
1011 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001012 server_side=server_side, server_hostname=server_hostname,
1013 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001014 else:
1015 transport = self._make_socket_transport(sock, protocol, waiter)
1016
Victor Stinner29ad0112015-01-15 00:04:21 +01001017 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001018 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001019 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001020 transport.close()
1021 raise
1022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001023 return transport, protocol
1024
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file over a transport.

    Return the total number of bytes that were sent.

    The high-performance os.sendfile is used when available.

    *file* must be a regular file object opened in binary mode.

    *offset* says where in the file to start reading.  When *count* is
    given it is the total number of bytes to transmit; otherwise the
    file is sent until EOF.  The file position is updated on return, and
    also on error, in which case file.tell() can be used to find out how
    many bytes were sent.

    With *fallback* set to True, asyncio reads and sends the file
    manually when the platform does not support the sendfile syscall
    (e.g. Windows or an SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support the
    sendfile syscall and *fallback* is False.  Raise RuntimeError if the
    transport is closing, does not support sendfile at all, or only
    supports the fallback while *fallback* is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    unsupported = constants._SendfileMode.UNSUPPORTED
    # Transports that do not advertise a mode count as unsupported.
    mode = getattr(transport, '_sendfile_compatible', unsupported)
    if mode is unsupported:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if mode is constants._SendfileMode.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise
            # Otherwise fall through to the manual implementation below.

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
1071
async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation of the native transport sendfile path.

    Always raises SendfileNotAvailableError; sendfile() catches it to
    decide whether to use the fallback.
    """
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
1075
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through *transp* by reading chunks and writing them.

    Reading starts at *offset* (a seek is issued only when it is
    non-zero) and stops at EOF or, when *count* is truthy, once *count*
    bytes were written.  Chunks are at most 16384 bytes.  Before each
    write the helper protocol's drain() is awaited.

    Return the total number of bytes written.  On exit, including on
    error, the file position is moved to offset + total_sent when
    anything was sent and the file has a seek attribute, and the helper
    protocol's restore() is awaited.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested byte count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            # Read through the executor, as _sock_sendfile_fallback()
            # does, so a slow file read does not block the event loop.
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            await proto.drain()
            # readinto() may fill only part of the view; write just the
            # bytes that were read, not leftovers from an earlier chunk.
            transp.write(view[:read])
            total_sent += read
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
1100
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError when the ssl module is not available, and
    TypeError when *sslcontext* is not an ssl.SSLContext or *transport*
    does not have a true ``_start_tls_compatible`` attribute.  If waiting
    for the handshake waiter fails, the transport is closed, the two
    scheduled callbacks are cancelled and the error is re-raised.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        # Clean up on every kind of failure, including exceptions that
        # do not derive from Exception, matching the bare "except:" used
        # by the other transport-creating methods of this class.
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport
1146
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing SOCK_DGRAM *sock* (no other socket-modifying
    keyword may then be truthy), or let a socket be created from
    *local_addr* / *remote_addr* / *family*.  For address families other
    than AF_UNIX the addresses must be 2-tuples and are resolved; each
    (family, proto) combination known for all the given addresses is
    tried in turn until a socket can be set up.

    protocol_factory is called with no arguments to build the protocol.
    Returns a (transport, protocol) pair once the transport's waiter has
    completed.

    Raises ValueError for inconsistent arguments or when no usable
    address information is found, TypeError for non-string AF_UNIX
    addresses, and OSError when resolution yields nothing or every
    socket attempt fails (the first collected error is raised).
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_address or reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            # No addresses at all: an explicit family is required.
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            # AF_UNIX addresses are used as given, without resolution.
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')
            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = collections.OrderedDict()
            # idx 0 is the local slot, idx 1 the remote slot of each pair.
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    # NOTE(review): this validation is an assert, so it
                    # is skipped when Python runs with -O.
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        # Errors collected across all attempts.  NOTE: for the rest of
        # this function the local list shadows the module-level
        # `exceptions` name that other methods of this class use.
        exceptions = []

        if reuse_address is None:
            # Default: enabled on POSIX platforms other than Cygwin.
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # NOTE: the loop variables rebind the family/proto parameters.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                # Remember the failure and try the next candidate pair.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is not retried; close the socket so it
                # does not leak, then propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                # Socket is set up: stop trying further candidates.
                break
        else:
            # Every candidate failed; report the first error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        # Close the transport on any failure, then propagate.
        transport.close()
        raise

    return transport, protocol
1272
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address-info entries for *address*.

    Only the first two items of *address* (host and port) are used.
    When _ipaddr_info() yields an entry, a one-element list holding it
    is returned without any lookup; otherwise the result of
    loop.getaddrinfo() is awaited and returned.
    """
    host, port = address[:2]
    resolved = _ipaddr_info(host, port, family, type, proto)
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]
1284
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a SOCK_STREAM socket via _ensure_resolved().

    Returns the resolved entries; raises OSError naming *host* when the
    resolution result is empty.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family,
        type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
1292
Neil Aspinallf7686c12017-12-19 19:45:42 +00001293 async def create_server(
1294 self, protocol_factory, host=None, port=None,
1295 *,
1296 family=socket.AF_UNSPEC,
1297 flags=socket.AI_PASSIVE,
1298 sock=None,
1299 backlog=100,
1300 ssl=None,
1301 reuse_address=None,
1302 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001303 ssl_handshake_timeout=None,
1304 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001305 """Create a TCP server.
1306
Yury Selivanov6370f342017-12-10 18:36:12 -05001307 The host parameter can be a string, in that case the TCP server is
1308 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001309
1310 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001311 the TCP server is bound to all hosts of the sequence. If a host
1312 appears multiple times (possibly indirectly e.g. when hostnames
1313 resolve to the same IP address), the server is only bound once to that
1314 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001315
Victor Stinneracdb7822014-07-14 18:33:40 +02001316 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001317
1318 This method is a coroutine.
1319 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001320 if isinstance(ssl, bool):
1321 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001322
1323 if ssl_handshake_timeout is not None and ssl is None:
1324 raise ValueError(
1325 'ssl_handshake_timeout is only meaningful with ssl')
1326
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001327 if host is not None or port is not None:
1328 if sock is not None:
1329 raise ValueError(
1330 'host/port and sock can not be specified at the same time')
1331
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001332 if reuse_address is None:
1333 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1334 sockets = []
1335 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001336 hosts = [None]
1337 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001338 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001339 hosts = [host]
1340 else:
1341 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001342
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001343 fs = [self._create_server_getaddrinfo(host, port, family=family,
1344 flags=flags)
1345 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001346 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001347 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001348
1349 completed = False
1350 try:
1351 for res in infos:
1352 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001353 try:
1354 sock = socket.socket(af, socktype, proto)
1355 except socket.error:
1356 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001357 if self._debug:
1358 logger.warning('create_server() failed to create '
1359 'socket.socket(%r, %r, %r)',
1360 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001361 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001362 sockets.append(sock)
1363 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001364 sock.setsockopt(
1365 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1366 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001367 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001368 # Disable IPv4/IPv6 dual stack support (enabled by
1369 # default on Linux) which makes a single socket
1370 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001371 if (_HAS_IPv6 and
1372 af == socket.AF_INET6 and
1373 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001374 sock.setsockopt(socket.IPPROTO_IPV6,
1375 socket.IPV6_V6ONLY,
1376 True)
1377 try:
1378 sock.bind(sa)
1379 except OSError as err:
1380 raise OSError(err.errno, 'error while attempting '
1381 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001382 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001383 completed = True
1384 finally:
1385 if not completed:
1386 for sock in sockets:
1387 sock.close()
1388 else:
1389 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001390 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001391 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001392 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001393 sockets = [sock]
1394
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001395 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001396 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001397
1398 server = Server(self, sockets, protocol_factory,
1399 ssl, backlog, ssl_handshake_timeout)
1400 if start_serving:
1401 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001402 # Skip one loop iteration so that all 'loop.add_reader'
1403 # go through.
1404 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001405
Victor Stinnere912e652014-07-12 03:11:53 +02001406 if self._debug:
1407 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001408 return server
1409
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Attach an already-accepted socket to a new transport/protocol.

    Intended for servers that accept connections outside of asyncio
    but want asyncio to drive the resulting connection.

    Raises ValueError if *sock* is not a stream socket, or if
    *ssl_handshake_timeout* is given while *ssl* is falsy.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    timeout_without_ssl = ssl_handshake_timeout is not None and not ssl
    if timeout_without_ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)
    transport, protocol = pair

    if self._debug:
        # Ask the transport for its socket: an SSL transport closes the
        # socket it was given and works on a new SSL socket instead.
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
1438
async def connect_read_pipe(self, protocol_factory, pipe):
    """Connect *pipe* to a read-pipe transport and a fresh protocol.

    The protocol is obtained by calling *protocol_factory* with no
    arguments.  The coroutine then waits on the future handed to
    _make_read_pipe_transport(); if that wait raises anything at all
    the transport is closed and the exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    proto = protocol_factory()
    ready = self.create_future()
    pipe_transport = self._make_read_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Do not leave a half-initialized transport open.
        pipe_transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), pipe_transport, proto)
    return pipe_transport, proto
1454
async def connect_write_pipe(self, protocol_factory, pipe):
    """Connect *pipe* to a write-pipe transport and a fresh protocol.

    The protocol is obtained by calling *protocol_factory* with no
    arguments.  The coroutine then waits on the future handed to
    _make_write_pipe_transport(); if that wait raises anything at all
    the transport is closed and the exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    proto = protocol_factory()
    ready = self.create_future()
    pipe_transport = self._make_write_pipe_transport(pipe, proto, ready)

    try:
        await ready
    except BaseException:
        # Do not leave a half-initialized transport open.
        pipe_transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), pipe_transport, proto)
    return pipe_transport, proto
1470
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit a debug log line describing a subprocess and its pipes.

    The line starts with *msg* and is followed by one space-separated
    "name=value" item for every stream argument that is not None.  When
    *stdout* is set and *stderr* equals subprocess.STDOUT, a single
    "stdout=stderr=..." item is emitted for both.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    if stdout is not None and stderr == subprocess.STDOUT:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        for label, pipe in (('stdout', stdout), ('stderr', stderr)):
            if pipe is not None:
                parts.append(f'{label}={_format_pipe(pipe)}')

    logger.debug(' '.join(parts))
1483
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           **kwargs):
    """Run the shell command *cmd* in a subprocess.

    *cmd* must be a str or bytes object.  The *universal_newlines*,
    *shell* and *bufsize* arguments only accept their default values
    (False, True and 0 respectively); anything else raises ValueError.
    Remaining keyword arguments are forwarded to
    _make_subprocess_transport().

    This method is a coroutine returning a (transport, protocol) pair,
    where the protocol is the result of calling *protocol_factory*.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    proto = protocol_factory()

    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = f'run shell command {cmd!r}'
        self._log_subprocess(description, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        proto, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # Debug mode may have been switched on while awaiting; only log the
    # result if the description was prepared beforehand.
    if self._debug and description is not None:
        logger.info('%s: %r', description, transport)
    return transport, proto
1511
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0, **kwargs):
    """Run *program* with arguments *args* in a subprocess.

    *program* and every item of *args* must be a str or bytes object,
    otherwise TypeError is raised.  The *universal_newlines*, *shell*
    and *bufsize* arguments only accept their default values (False,
    False and 0 respectively); anything else raises ValueError.
    Remaining keyword arguments are forwarded to
    _make_subprocess_transport().

    This method is a coroutine returning a (transport, protocol) pair,
    where the protocol is the result of calling *protocol_factory*.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")

    popen_args = (program, *args)
    for candidate in popen_args:
        if isinstance(candidate, (str, bytes)):
            continue
        raise TypeError(
            f"program arguments must be a bytes or text string, "
            f"not {type(candidate).__name__}")

    proto = protocol_factory()

    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = f'execute program {program!r}'
        self._log_subprocess(description, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        proto, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # Debug mode may have been switched on while awaiting; only log the
    # result if the description was prepared beforehand.
    if self._debug and description is not None:
        logger.info('%s: %r', description, transport)
    return transport, proto
1541
def get_exception_handler(self):
    """Return the custom exception handler currently installed.

    The result is None when no custom handler has been set, i.e. the
    default exception handler is in use.
    """
    current = self._exception_handler
    return current
1546
def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None restores the default exception handler.

    Otherwise *handler* must be callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event
    loop and 'context' is a dict object (see the
    `call_exception_handler()` documentation for details about
    context).

    Raises TypeError if *handler* is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
1563
def default_exception_handler(self, context):
    """Default exception handler.

    Used when an exception occurs and no custom exception handler is
    set; a custom handler may also call it to defer to the default
    behavior.

    The handler logs, at error level, the context's 'message' (or a
    generic message when it is missing or empty) followed by one
    "key: value" line for every other context key except 'exception',
    in sorted key order.  If the context carries an 'exception', its
    type, value and traceback are passed to the logger as exc_info.
    The 'source_traceback' and 'handle_traceback' entries are rendered
    as formatted stack listings; everything else is rendered with
    repr().

    Note that when the context has no 'source_traceback' and the
    currently running handle recorded one, it is stored into *context*
    under the 'handle_traceback' key (the dict is modified in place).

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False)

    if ('source_traceback' not in context and
            self._current_handle is not None and
            self._current_handle._source_traceback):
        context['handle_traceback'] = \
            self._current_handle._source_traceback

    # Heading printed in front of each kind of formatted traceback.
    traceback_headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    report = [message]
    for key in sorted(context):
        if key in {'message', 'exception'}:
            continue
        value = context[key]
        heading = traceback_headings.get(key)
        if heading is None:
            rendered = repr(value)
        else:
            stack = ''.join(traceback.format_list(value))
            rendered = heading + stack.rstrip()
        report.append(f'{key}: {rendered}')

    logger.error('\n'.join(report), exc_info=exc_info)
1613
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    If no custom handler is installed, default_exception_handler()
    is called.  If the custom handler raises an Exception, the
    default handler is called with a new context describing that
    failure.  An Exception raised by the default handler itself is
    logged and not propagated.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom_handler = self._exception_handler

    if custom_handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        custom_handler(self, context)
    except Exception as exc:
        # The user-supplied handler failed; report that failure
        # through the default handler instead.
        failure_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(failure_context)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1664
def _add_callback(self, handle):
    """Append a Handle to the _ready queue unless it was cancelled.

    *handle* must be an events.Handle and must not be an
    events.TimerHandle; both conditions are checked with assertions.
    A cancelled handle is silently dropped.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001672
def _add_callback_signalsafe(self, handle):
    """Like _add_callback() but called from a signal handler.

    After queueing *handle*, _write_to_self() is called.
    """
    queue_handle = self._add_callback
    queue_handle(handle)
    self._write_to_self()
1677
def _timer_handle_cancelled(self, handle):
    """Notification that a TimerHandle has been cancelled.

    The cancelled-timer counter is only incremented for handles whose
    _scheduled flag is set.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
1682
def _run_once(self):
    """Run one full iteration of the event loop.

    In order, this prunes cancelled timers from the _scheduled heap,
    polls the selector for I/O and processes the resulting events,
    moves timers that are due onto the _ready queue, and finally runs
    the handles that were in _ready at that point.
    """
    # --- Step 1: drop cancelled timers -------------------------------
    scheduled_total = len(self._scheduled)
    too_many_cancelled = (
        scheduled_total > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / scheduled_total >
        _MIN_CANCELLED_TIMER_HANDLES_FRACTION)

    if too_many_cancelled:
        # Cancelled timers make up too large a share of the heap:
        # rebuild it from the timers that are still live.
        survivors = []
        for entry in self._scheduled:
            if entry._cancelled:
                entry._scheduled = False
            else:
                survivors.append(entry)

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Only strip cancelled timers sitting at the head of the heap.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            entry = heapq.heappop(self._scheduled)
            entry._scheduled = False

    # --- Step 2: decide how long the selector may block ---------------
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Wait until the earliest timer is due, clamped to
        # [0, MAXIMUM_SELECT_TIMEOUT].
        next_deadline = self._scheduled[0]._when
        timeout = min(max(0, next_deadline - self.time()),
                      MAXIMUM_SELECT_TIMEOUT)
    else:
        timeout = None

    # --- Step 3: poll for I/O ------------------------------------------
    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # --- Step 4: queue 'later' callbacks that are ready ----------------
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        if self._scheduled[0]._when >= end_time:
            break
        entry = heapq.heappop(self._scheduled)
        entry._scheduled = False
        self._ready.append(entry)

    # --- Step 5: run the ready callbacks --------------------------------
    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    pending = len(self._ready)
    for _ in range(pending):
        entry = self._ready.popleft()
        if entry._cancelled:
            continue
        if not self._debug:
            entry._run()
            continue
        # Debug mode: expose the running handle and time the callback.
        try:
            self._current_handle = entry
            started = self.time()
            entry._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(entry), elapsed)
        finally:
            self._current_handle = None
    entry = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001760
def _set_coroutine_origin_tracking(self, enabled):
    """Switch interpreter-level coroutine origin tracking on or off.

    Nothing happens when the truth value of *enabled* already matches
    the recorded state.  When enabling, the interpreter's current
    tracking depth is saved and the depth is set to
    constants.DEBUG_STACK_DEPTH; when disabling, the saved depth is
    restored.  The new state is then recorded on the loop.
    """
    already_set = (
        bool(enabled) == bool(self._coroutine_origin_tracking_enabled))
    if already_set:
        return

    if not enabled:
        sys.set_coroutine_origin_tracking_depth(
            self._coroutine_origin_tracking_saved_depth)
    else:
        self._coroutine_origin_tracking_saved_depth = (
            sys.get_coroutine_origin_tracking_depth())
        sys.set_coroutine_origin_tracking_depth(
            constants.DEBUG_STACK_DEPTH)

    self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001775
def get_debug(self):
    """Return the loop's current debug setting (the _debug attribute)."""
    debug_flag = self._debug
    return debug_flag
1778
1779 def set_debug(self, enabled):
1780 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001781
Yury Selivanove8944cb2015-05-12 11:43:04 -04001782 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001783 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)