blob: a736d01d6f37844911ace7001be588e76a220fce [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Yury Selivanovf111b3d2017-12-30 00:35:36 -050032try:
33 import ssl
34except ImportError: # pragma: no cover
35 ssl = None
36
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080037from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020038from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039from . import events
40from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020041from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050042from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020044from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070045from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046
47
Yury Selivanov6370f342017-12-10 18:36:12 -050048__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049
50
Yury Selivanov592ada92014-09-25 12:07:56 -040051# Minimum number of _scheduled timer handles before cleanup of
52# cancelled handles is performed.
53_MIN_SCHEDULED_TIMER_HANDLES = 100
54
55# Minimum fraction of _scheduled timer handles that are cancelled
56# before cleanup of cancelled handles is performed.
57_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058
Miss Islington (bot)3ed44142018-06-28 19:16:48 -070059_HAS_IPv6 = hasattr(socket, 'AF_INET6')
60
Miss Islington (bot)172a81e2018-07-31 08:29:07 -070061# Maximum timeout passed to select to avoid OS limitations
62MAXIMUM_SELECT_TIMEOUT = 24 * 3600
63
Victor Stinnerc94a93a2016-04-01 21:43:39 +020064
Victor Stinner0e6f52a2014-06-20 17:34:15 +020065def _format_handle(handle):
66 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040067 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020068 # format the task
69 return repr(cb.__self__)
70 else:
71 return str(handle)
72
73
Victor Stinneracdb7822014-07-14 18:33:40 +020074def _format_pipe(fd):
75 if fd == subprocess.PIPE:
76 return '<pipe>'
77 elif fd == subprocess.STDOUT:
78 return '<stdout>'
79 else:
80 return repr(fd)
81
82
Yury Selivanov5587d7c2016-09-15 15:45:07 -040083def _set_reuseport(sock):
84 if not hasattr(socket, 'SO_REUSEPORT'):
85 raise ValueError('reuse_port not supported by socket module')
86 else:
87 try:
88 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
89 except OSError:
90 raise ValueError('reuse_port not supported by socket module, '
91 'SO_REUSEPORT defined but not implemented.')
92
93
Miss Islington (bot)94704042019-05-17 02:05:19 -070094def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -040095 # Try to skip getaddrinfo if "host" is already an IP. Users might have
96 # handled name resolution in their own code and pass in resolved IPs.
97 if not hasattr(socket, 'inet_pton'):
98 return
99
100 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
101 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500102 return None
103
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500104 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500105 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500106 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500107 proto = socket.IPPROTO_UDP
108 else:
109 return None
110
Yury Selivanova7146162016-06-02 16:51:07 -0400111 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400112 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700113 elif isinstance(port, bytes) and port == b'':
114 port = 0
115 elif isinstance(port, str) and port == '':
116 port = 0
117 else:
118 # If port's a service name like "http", don't skip getaddrinfo.
119 try:
120 port = int(port)
121 except (TypeError, ValueError):
122 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400123
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400124 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500125 afs = [socket.AF_INET]
Miss Islington (bot)3ed44142018-06-28 19:16:48 -0700126 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500127 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400128 else:
129 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500130
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400131 if isinstance(host, bytes):
132 host = host.decode('idna')
133 if '%' in host:
134 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
135 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500136 return None
137
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400138 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 socket.inet_pton(af, host)
141 # The host has already been resolved.
Miss Islington (bot)3ed44142018-06-28 19:16:48 -0700142 if _HAS_IPv6 and af == socket.AF_INET6:
Miss Islington (bot)94704042019-05-17 02:05:19 -0700143 return af, type, proto, '', (host, port, flowinfo, scopeid)
Miss Islington (bot)3ed44142018-06-28 19:16:48 -0700144 else:
145 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400146 except OSError:
147 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 # "host" is not an IP address.
150 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500151
152
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100153def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500154 if not fut.cancelled():
155 exc = fut.exception()
156 if isinstance(exc, BaseException) and not isinstance(exc, Exception):
157 # Issue #22429: run_forever() already finished, no need to
158 # stop it.
159 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500160 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100161
162
Miss Islington (bot)fe91e9b2018-12-03 13:11:41 -0800163if hasattr(socket, 'TCP_NODELAY'):
164 def _set_nodelay(sock):
165 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
166 sock.type == socket.SOCK_STREAM and
167 sock.proto == socket.IPPROTO_TCP):
168 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
169else:
170 def _set_nodelay(sock):
171 pass
172
173
Andrew Svetlov7c684072018-01-27 21:22:47 +0200174class _SendfileFallbackProtocol(protocols.Protocol):
175 def __init__(self, transp):
176 if not isinstance(transp, transports._FlowControlMixin):
177 raise TypeError("transport should be _FlowControlMixin instance")
178 self._transport = transp
179 self._proto = transp.get_protocol()
180 self._should_resume_reading = transp.is_reading()
181 self._should_resume_writing = transp._protocol_paused
182 transp.pause_reading()
183 transp.set_protocol(self)
184 if self._should_resume_writing:
185 self._write_ready_fut = self._transport._loop.create_future()
186 else:
187 self._write_ready_fut = None
188
189 async def drain(self):
190 if self._transport.is_closing():
191 raise ConnectionError("Connection closed by peer")
192 fut = self._write_ready_fut
193 if fut is None:
194 return
195 await fut
196
197 def connection_made(self, transport):
198 raise RuntimeError("Invalid state: "
199 "connection should have been established already.")
200
201 def connection_lost(self, exc):
202 if self._write_ready_fut is not None:
203 # Never happens if peer disconnects after sending the whole content
204 # Thus disconnection is always an exception from user perspective
205 if exc is None:
206 self._write_ready_fut.set_exception(
207 ConnectionError("Connection is closed by peer"))
208 else:
209 self._write_ready_fut.set_exception(exc)
210 self._proto.connection_lost(exc)
211
212 def pause_writing(self):
213 if self._write_ready_fut is not None:
214 return
215 self._write_ready_fut = self._transport._loop.create_future()
216
217 def resume_writing(self):
218 if self._write_ready_fut is None:
219 return
220 self._write_ready_fut.set_result(False)
221 self._write_ready_fut = None
222
223 def data_received(self, data):
224 raise RuntimeError("Invalid state: reading should be paused")
225
226 def eof_received(self):
227 raise RuntimeError("Invalid state: reading should be paused")
228
229 async def restore(self):
230 self._transport.set_protocol(self._proto)
231 if self._should_resume_reading:
232 self._transport.resume_reading()
233 if self._write_ready_fut is not None:
234 # Cancel the future.
235 # Basically it has no effect because protocol is switched back,
236 # no code should wait for it anymore.
237 self._write_ready_fut.cancel()
238 if self._should_resume_writing:
239 self._proto.resume_writing()
240
241
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242class Server(events.AbstractServer):
243
Yury Selivanovc9070d02018-01-25 18:08:09 -0500244 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
245 ssl_handshake_timeout):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200246 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500247 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200248 self._active_count = 0
249 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500250 self._protocol_factory = protocol_factory
251 self._backlog = backlog
252 self._ssl_context = ssl_context
253 self._ssl_handshake_timeout = ssl_handshake_timeout
254 self._serving = False
255 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700256
Victor Stinnere912e652014-07-12 03:11:53 +0200257 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500258 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200259
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200260 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500261 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200262 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200264 def _detach(self):
265 assert self._active_count > 0
266 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500267 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700268 self._wakeup()
269
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200271 waiters = self._waiters
272 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273 for waiter in waiters:
274 if not waiter.done():
275 waiter.set_result(waiter)
276
Yury Selivanovc9070d02018-01-25 18:08:09 -0500277 def _start_serving(self):
278 if self._serving:
279 return
280 self._serving = True
281 for sock in self._sockets:
282 sock.listen(self._backlog)
283 self._loop._start_serving(
284 self._protocol_factory, sock, self._ssl_context,
285 self, self._backlog, self._ssl_handshake_timeout)
286
287 def get_loop(self):
288 return self._loop
289
290 def is_serving(self):
291 return self._serving
292
293 @property
294 def sockets(self):
295 if self._sockets is None:
296 return []
297 return list(self._sockets)
298
299 def close(self):
300 sockets = self._sockets
301 if sockets is None:
302 return
303 self._sockets = None
304
305 for sock in sockets:
306 self._loop._stop_serving(sock)
307
308 self._serving = False
309
310 if (self._serving_forever_fut is not None and
311 not self._serving_forever_fut.done()):
312 self._serving_forever_fut.cancel()
313 self._serving_forever_fut = None
314
315 if self._active_count == 0:
316 self._wakeup()
317
318 async def start_serving(self):
319 self._start_serving()
Miss Islington (bot)bc3a0022018-05-28 11:50:45 -0700320 # Skip one loop iteration so that all 'loop.add_reader'
321 # go through.
322 await tasks.sleep(0, loop=self._loop)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500323
324 async def serve_forever(self):
325 if self._serving_forever_fut is not None:
326 raise RuntimeError(
327 f'server {self!r} is already being awaited on serve_forever()')
328 if self._sockets is None:
329 raise RuntimeError(f'server {self!r} is closed')
330
331 self._start_serving()
332 self._serving_forever_fut = self._loop.create_future()
333
334 try:
335 await self._serving_forever_fut
336 except futures.CancelledError:
337 try:
338 self.close()
339 await self.wait_closed()
340 finally:
341 raise
342 finally:
343 self._serving_forever_fut = None
344
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200345 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500346 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400348 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200349 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200350 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351
352
353class BaseEventLoop(events.AbstractEventLoop):
354
355 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400356 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200357 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800358 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 self._ready = collections.deque()
360 self._scheduled = []
361 self._default_executor = None
362 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100363 # Identifier of the thread running the event loop, or None if the
364 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100365 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100366 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500367 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800368 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200369 # In debug mode, if the execution of a callback or a step of a task
370 # exceed this duration in seconds, the slow callback/task is logged.
371 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100372 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400373 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800374 self._coroutine_origin_tracking_enabled = False
375 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500377 # A weak set of all asynchronous generators that are
378 # being iterated by the loop.
379 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700380 # Set to True when `loop.shutdown_asyncgens` is called.
381 self._asyncgens_shutdown_called = False
382
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200383 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500384 return (
385 f'<{self.__class__.__name__} running={self.is_running()} '
386 f'closed={self.is_closed()} debug={self.get_debug()}>'
387 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200388
Yury Selivanov7661db62016-05-16 15:38:39 -0400389 def create_future(self):
390 """Create a Future object attached to the loop."""
391 return futures.Future(loop=self)
392
Victor Stinner896a25a2014-07-08 11:29:25 +0200393 def create_task(self, coro):
394 """Schedule a coroutine object.
395
Victor Stinneracdb7822014-07-14 18:33:40 +0200396 Return a task object.
397 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100398 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400399 if self._task_factory is None:
400 task = tasks.Task(coro, loop=self)
401 if task._source_traceback:
402 del task._source_traceback[-1]
403 else:
404 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200405 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200406
Yury Selivanov740169c2015-05-11 14:23:38 -0400407 def set_task_factory(self, factory):
408 """Set a task factory that will be used by loop.create_task().
409
410 If factory is None the default task factory will be set.
411
412 If factory is a callable, it should have a signature matching
413 '(loop, coro)', where 'loop' will be a reference to the active
414 event loop, 'coro' will be a coroutine object. The callable
415 must return a Future.
416 """
417 if factory is not None and not callable(factory):
418 raise TypeError('task factory must be a callable or None')
419 self._task_factory = factory
420
421 def get_task_factory(self):
422 """Return a task factory, or None if the default one is in use."""
423 return self._task_factory
424
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 def _make_socket_transport(self, sock, protocol, waiter=None, *,
426 extra=None, server=None):
427 """Create socket transport."""
428 raise NotImplementedError
429
Neil Aspinallf7686c12017-12-19 19:45:42 +0000430 def _make_ssl_transport(
431 self, rawsock, protocol, sslcontext, waiter=None,
432 *, server_side=False, server_hostname=None,
433 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500434 ssl_handshake_timeout=None,
435 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 """Create SSL transport."""
437 raise NotImplementedError
438
439 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200440 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 """Create datagram transport."""
442 raise NotImplementedError
443
444 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
445 extra=None):
446 """Create read pipe transport."""
447 raise NotImplementedError
448
449 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
450 extra=None):
451 """Create write pipe transport."""
452 raise NotImplementedError
453
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200454 async def _make_subprocess_transport(self, protocol, args, shell,
455 stdin, stdout, stderr, bufsize,
456 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 """Create subprocess transport."""
458 raise NotImplementedError
459
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200461 """Write a byte to self-pipe, to wake up the event loop.
462
463 This may be called from a different thread.
464
465 The subclass is responsible for implementing the self-pipe.
466 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 raise NotImplementedError
468
469 def _process_events(self, event_list):
470 """Process selector events."""
471 raise NotImplementedError
472
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200473 def _check_closed(self):
474 if self._closed:
475 raise RuntimeError('Event loop is closed')
476
Yury Selivanoveb636452016-09-08 22:01:51 -0700477 def _asyncgen_finalizer_hook(self, agen):
478 self._asyncgens.discard(agen)
479 if not self.is_closed():
Miss Islington (bot)41e5ec32018-10-09 09:03:35 -0700480 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700481
482 def _asyncgen_firstiter_hook(self, agen):
483 if self._asyncgens_shutdown_called:
484 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500485 f"asynchronous generator {agen!r} was scheduled after "
486 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700487 ResourceWarning, source=self)
488
489 self._asyncgens.add(agen)
490
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200491 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700492 """Shutdown all active asynchronous generators."""
493 self._asyncgens_shutdown_called = True
494
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500495 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400496 # If Python version is <3.6 or we don't have any asynchronous
497 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700498 return
499
500 closing_agens = list(self._asyncgens)
501 self._asyncgens.clear()
502
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200503 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700504 *[ag.aclose() for ag in closing_agens],
505 return_exceptions=True,
506 loop=self)
507
Yury Selivanoveb636452016-09-08 22:01:51 -0700508 for result, agen in zip(results, closing_agens):
509 if isinstance(result, Exception):
510 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500511 'message': f'an error occurred during closing of '
512 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700513 'exception': result,
514 'asyncgen': agen
515 })
516
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517 def run_forever(self):
518 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200519 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100520 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400521 raise RuntimeError('This event loop is already running')
522 if events._get_running_loop() is not None:
523 raise RuntimeError(
524 'Cannot run the event loop while another loop is running')
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800525 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100526 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500527
528 old_agen_hooks = sys.get_asyncgen_hooks()
529 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
530 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400532 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800534 self._run_once()
535 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 break
537 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800538 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100539 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400540 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800541 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500542 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
544 def run_until_complete(self, future):
545 """Run until the Future is done.
546
547 If the argument is a coroutine, it is wrapped in a Task.
548
Victor Stinneracdb7822014-07-14 18:33:40 +0200549 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 with the same coroutine twice -- it would wrap it in two
551 different Tasks and that can't be good.
552
553 Return the Future's result, or raise its exception.
554 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200555 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200556
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700557 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400558 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200559 if new_task:
560 # An exception is raised if the future didn't complete, so there
561 # is no need to log the "destroy pending task" message
562 future._log_destroy_pending = False
563
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100564 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200565 try:
566 self.run_forever()
567 except:
568 if new_task and future.done() and not future.cancelled():
569 # The coroutine raised a BaseException. Consume the exception
570 # to not log a warning, the caller doesn't have access to the
571 # local task.
572 future.exception()
573 raise
jimmylai21b3e042017-05-22 22:32:46 -0700574 finally:
575 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 if not future.done():
577 raise RuntimeError('Event loop stopped before Future completed.')
578
579 return future.result()
580
581 def stop(self):
582 """Stop running the event loop.
583
Guido van Rossum41f69f42015-11-19 13:28:47 -0800584 Every callback already scheduled will still run. This simply informs
585 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800587 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200589 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700590 """Close the event loop.
591
592 This clears the queues and shuts down the executor,
593 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200594
595 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700596 """
Victor Stinner956de692014-12-26 21:07:52 +0100597 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200598 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200599 if self._closed:
600 return
Victor Stinnere912e652014-07-12 03:11:53 +0200601 if self._debug:
602 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400603 self._closed = True
604 self._ready.clear()
605 self._scheduled.clear()
606 executor = self._default_executor
607 if executor is not None:
608 self._default_executor = None
609 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200610
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200611 def is_closed(self):
612 """Returns True if the event loop was closed."""
613 return self._closed
614
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900615 def __del__(self):
616 if not self.is_closed():
Yury Selivanov6370f342017-12-10 18:36:12 -0500617 warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900618 source=self)
619 if not self.is_running():
620 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100621
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700622 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200623 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100624 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625
626 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200627 """Return the time according to the event loop's clock.
628
629 This is a float expressed in seconds since an epoch, but the
630 epoch, precision, accuracy and drift are unspecified and may
631 differ per event loop.
632 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633 return time.monotonic()
634
Yury Selivanovf23746a2018-01-22 19:11:18 -0500635 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700636 """Arrange for a callback to be called at a given time.
637
638 Return a Handle: an opaque object with a cancel() method that
639 can be used to cancel the call.
640
641 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200642 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
644 Each callback will be called exactly once. If two callbacks
645 are scheduled for exactly the same time, it undefined which
646 will be called first.
647
648 Any positional arguments after the callback will be passed to
649 the callback when it is called.
650 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500651 timer = self.call_at(self.time() + delay, callback, *args,
652 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200653 if timer._source_traceback:
654 del timer._source_traceback[-1]
655 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
Yury Selivanovf23746a2018-01-22 19:11:18 -0500657 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200658 """Like call_later(), but uses an absolute time.
659
660 Absolute time corresponds to the event loop's time() method.
661 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100662 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100663 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100664 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700665 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500666 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200667 if timer._source_traceback:
668 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700669 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400670 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 return timer
672
Yury Selivanovf23746a2018-01-22 19:11:18 -0500673 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674 """Arrange for a callback to be called as soon as possible.
675
Victor Stinneracdb7822014-07-14 18:33:40 +0200676 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 order in which they are registered. Each callback will be
678 called exactly once.
679
680 Any positional arguments after the callback will be passed to
681 the callback when it is called.
682 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700683 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100684 if self._debug:
685 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700686 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500687 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200688 if handle._source_traceback:
689 del handle._source_traceback[-1]
690 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100691
Yury Selivanov491a9122016-11-03 15:09:24 -0700692 def _check_callback(self, callback, method):
693 if (coroutines.iscoroutine(callback) or
694 coroutines.iscoroutinefunction(callback)):
695 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500696 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700697 if not callable(callback):
698 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500699 f'a callable object was expected by {method}(), '
700 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700701
Yury Selivanovf23746a2018-01-22 19:11:18 -0500702 def _call_soon(self, callback, args, context):
703 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200704 if handle._source_traceback:
705 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 self._ready.append(handle)
707 return handle
708
Victor Stinner956de692014-12-26 21:07:52 +0100709 def _check_thread(self):
710 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100711
Victor Stinneracdb7822014-07-14 18:33:40 +0200712 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100713 likely behave incorrectly when the assumption is violated.
714
Victor Stinneracdb7822014-07-14 18:33:40 +0200715 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100716 responsible for checking this condition for performance reasons.
717 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100718 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200719 return
Victor Stinner956de692014-12-26 21:07:52 +0100720 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100721 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100722 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200723 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100724 "than the current one")
725
Yury Selivanovf23746a2018-01-22 19:11:18 -0500726 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200727 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700728 self._check_closed()
729 if self._debug:
730 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500731 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200732 if handle._source_traceback:
733 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 self._write_to_self()
735 return handle
736
Yury Selivanovbec23722018-01-28 14:09:40 -0500737 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100738 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700739 if self._debug:
740 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700741 if executor is None:
742 executor = self._default_executor
743 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400744 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500746 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500747 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748
749 def set_default_executor(self, executor):
750 self._default_executor = executor
751
Victor Stinnere912e652014-07-12 03:11:53 +0200752 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500753 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200754 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500755 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200756 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500757 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200758 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500759 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200760 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500761 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200762 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200763 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200764
765 t0 = self.time()
766 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
767 dt = self.time() - t0
768
Yury Selivanov6370f342017-12-10 18:36:12 -0500769 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200770 if dt >= self.slow_callback_duration:
771 logger.info(msg)
772 else:
773 logger.debug(msg)
774 return addrinfo
775
Yury Selivanov19a44f62017-12-14 20:53:26 -0500776 async def getaddrinfo(self, host, port, *,
777 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400778 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500779 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200780 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500781 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700782
Yury Selivanov19a44f62017-12-14 20:53:26 -0500783 return await self.run_in_executor(
784 None, getaddr_func, host, port, family, type, proto, flags)
785
786 async def getnameinfo(self, sockaddr, flags=0):
787 return await self.run_in_executor(
788 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700789
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200790 async def sock_sendfile(self, sock, file, offset=0, count=None,
791 *, fallback=True):
792 if self._debug and sock.gettimeout() != 0:
793 raise ValueError("the socket must be non-blocking")
794 self._check_sendfile_params(sock, file, offset, count)
795 try:
796 return await self._sock_sendfile_native(sock, file,
797 offset, count)
Andrew Svetlov7464e872018-01-19 20:04:29 +0200798 except events.SendfileNotAvailableError as exc:
799 if not fallback:
800 raise
801 return await self._sock_sendfile_fallback(sock, file,
802 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200803
804 async def _sock_sendfile_native(self, sock, file, offset, count):
805 # NB: sendfile syscall is not supported for SSL sockets and
806 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov7464e872018-01-19 20:04:29 +0200807 raise events.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200808 f"syscall sendfile is not available for socket {sock!r} "
809 "and file {file!r} combination")
810
811 async def _sock_sendfile_fallback(self, sock, file, offset, count):
812 if offset:
813 file.seek(offset)
Miss Islington (bot)420092e2018-05-28 18:42:45 -0700814 blocksize = (
815 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
816 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
817 )
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200818 buf = bytearray(blocksize)
819 total_sent = 0
820 try:
821 while True:
822 if count:
823 blocksize = min(count - total_sent, blocksize)
824 if blocksize <= 0:
825 break
826 view = memoryview(buf)[:blocksize]
Miss Islington (bot)420092e2018-05-28 18:42:45 -0700827 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200828 if not read:
829 break # EOF
830 await self.sock_sendall(sock, view)
831 total_sent += read
832 return total_sent
833 finally:
834 if total_sent > 0 and hasattr(file, 'seek'):
835 file.seek(offset + total_sent)
836
837 def _check_sendfile_params(self, sock, file, offset, count):
838 if 'b' not in getattr(file, 'mode', 'b'):
839 raise ValueError("file should be opened in binary mode")
840 if not sock.type == socket.SOCK_STREAM:
841 raise ValueError("only SOCK_STREAM type sockets are supported")
842 if count is not None:
843 if not isinstance(count, int):
844 raise TypeError(
845 "count must be a positive integer (got {!r})".format(count))
846 if count <= 0:
847 raise ValueError(
848 "count must be a positive integer (got {!r})".format(count))
849 if not isinstance(offset, int):
850 raise TypeError(
851 "offset must be a non-negative integer (got {!r})".format(
852 offset))
853 if offset < 0:
854 raise ValueError(
855 "offset must be a non-negative integer (got {!r})".format(
856 offset))
857
Neil Aspinallf7686c12017-12-19 19:45:42 +0000858 async def create_connection(
859 self, protocol_factory, host=None, port=None,
860 *, ssl=None, family=0,
861 proto=0, flags=0, sock=None,
862 local_addr=None, server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200863 ssl_handshake_timeout=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200864 """Connect to a TCP server.
865
866 Create a streaming transport connection to a given Internet host and
867 port: socket family AF_INET or socket.AF_INET6 depending on host (or
868 family if specified), socket type SOCK_STREAM. protocol_factory must be
869 a callable returning a protocol instance.
870
871 This method is a coroutine which will try to establish the connection
872 in the background. When successful, the coroutine returns a
873 (transport, protocol) pair.
874 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700875 if server_hostname is not None and not ssl:
876 raise ValueError('server_hostname is only meaningful with ssl')
877
878 if server_hostname is None and ssl:
879 # Use host as default for server_hostname. It is an error
880 # if host is empty or not set, e.g. when an
881 # already-connected socket was passed or when only a port
882 # is given. To avoid this error, you can pass
883 # server_hostname='' -- this will bypass the hostname
884 # check. (This also means that if host is a numeric
885 # IP/IPv6 address, we will attempt to verify that exact
886 # address; this will probably fail, but it is possible to
887 # create a certificate for a specific IP address, so we
888 # don't judge it here.)
889 if not host:
890 raise ValueError('You must set server_hostname '
891 'when using ssl without a host')
892 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700893
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200894 if ssl_handshake_timeout is not None and not ssl:
895 raise ValueError(
896 'ssl_handshake_timeout is only meaningful with ssl')
897
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700898 if host is not None or port is not None:
899 if sock is not None:
900 raise ValueError(
901 'host/port and sock can not be specified at the same time')
902
Yury Selivanov19a44f62017-12-14 20:53:26 -0500903 infos = await self._ensure_resolved(
904 (host, port), family=family,
905 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700906 if not infos:
907 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -0500908
909 if local_addr is not None:
910 laddr_infos = await self._ensure_resolved(
911 local_addr, family=family,
912 type=socket.SOCK_STREAM, proto=proto,
913 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700914 if not laddr_infos:
915 raise OSError('getaddrinfo() returned empty list')
916
917 exceptions = []
918 for family, type, proto, cname, address in infos:
919 try:
920 sock = socket.socket(family=family, type=type, proto=proto)
921 sock.setblocking(False)
Yury Selivanov19a44f62017-12-14 20:53:26 -0500922 if local_addr is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700923 for _, _, _, _, laddr in laddr_infos:
924 try:
925 sock.bind(laddr)
926 break
927 except OSError as exc:
Yury Selivanov6370f342017-12-10 18:36:12 -0500928 msg = (
929 f'error while attempting to bind on '
930 f'address {laddr!r}: '
931 f'{exc.strerror.lower()}'
932 )
933 exc = OSError(exc.errno, msg)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700934 exceptions.append(exc)
935 else:
936 sock.close()
937 sock = None
938 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200939 if self._debug:
940 logger.debug("connect %r to %r", sock, address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200941 await self.sock_connect(sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700942 except OSError as exc:
943 if sock is not None:
944 sock.close()
945 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200946 except:
947 if sock is not None:
948 sock.close()
949 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700950 else:
951 break
952 else:
953 if len(exceptions) == 1:
954 raise exceptions[0]
955 else:
956 # If they all have the same str(), raise one.
957 model = str(exceptions[0])
958 if all(str(exc) == model for exc in exceptions):
959 raise exceptions[0]
960 # Raise a combined exception so the user can see all
961 # the various error messages.
962 raise OSError('Multiple exceptions: {}'.format(
963 ', '.join(str(exc) for exc in exceptions)))
964
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500965 else:
966 if sock is None:
967 raise ValueError(
968 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500969 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -0500970 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
971 # are SOCK_STREAM.
972 # We support passing AF_UNIX sockets even though we have
973 # a dedicated API for that: create_unix_connection.
974 # Disallowing AF_UNIX in this method, breaks backwards
975 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500976 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500977 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700978
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200979 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000980 sock, protocol_factory, ssl, server_hostname,
981 ssl_handshake_timeout=ssl_handshake_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +0200982 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200983 # Get the socket from the transport because SSL transport closes
984 # the old socket and creates a new SSL socket
985 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200986 logger.debug("%r connected to %s:%r: (%r, %r)",
987 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500988 return transport, protocol
989
Neil Aspinallf7686c12017-12-19 19:45:42 +0000990 async def _create_connection_transport(
991 self, sock, protocol_factory, ssl,
992 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200993 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -0400994
995 sock.setblocking(False)
996
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400998 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700999 if ssl:
1000 sslcontext = None if isinstance(ssl, bool) else ssl
1001 transport = self._make_ssl_transport(
1002 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001003 server_side=server_side, server_hostname=server_hostname,
1004 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001005 else:
1006 transport = self._make_socket_transport(sock, protocol, waiter)
1007
Victor Stinner29ad0112015-01-15 00:04:21 +01001008 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001009 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001010 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001011 transport.close()
1012 raise
1013
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001014 return transport, protocol
1015
Andrew Svetlov7c684072018-01-27 21:22:47 +02001016 async def sendfile(self, transport, file, offset=0, count=None,
1017 *, fallback=True):
1018 """Send a file to transport.
1019
1020 Return the total number of bytes which were sent.
1021
1022 The method uses high-performance os.sendfile if available.
1023
1024 file must be a regular file object opened in binary mode.
1025
1026 offset tells from where to start reading the file. If specified,
1027 count is the total number of bytes to transmit as opposed to
1028 sending the file until EOF is reached. File position is updated on
1029 return or also in case of error in which case file.tell()
1030 can be used to figure out the number of bytes
1031 which were sent.
1032
1033 fallback set to True makes asyncio to manually read and send
1034 the file when the platform does not support the sendfile syscall
1035 (e.g. Windows or SSL socket on Unix).
1036
1037 Raise SendfileNotAvailableError if the system does not support
1038 sendfile syscall and fallback is False.
1039 """
1040 if transport.is_closing():
1041 raise RuntimeError("Transport is closing")
1042 mode = getattr(transport, '_sendfile_compatible',
1043 constants._SendfileMode.UNSUPPORTED)
1044 if mode is constants._SendfileMode.UNSUPPORTED:
1045 raise RuntimeError(
1046 f"sendfile is not supported for transport {transport!r}")
1047 if mode is constants._SendfileMode.TRY_NATIVE:
1048 try:
1049 return await self._sendfile_native(transport, file,
1050 offset, count)
1051 except events.SendfileNotAvailableError as exc:
1052 if not fallback:
1053 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001054
1055 if not fallback:
1056 raise RuntimeError(
1057 f"fallback is disabled and native sendfile is not "
1058 f"supported for transport {transport!r}")
1059
Andrew Svetlov7c684072018-01-27 21:22:47 +02001060 return await self._sendfile_fallback(transport, file,
1061 offset, count)
1062
1063 async def _sendfile_native(self, transp, file, offset, count):
1064 raise events.SendfileNotAvailableError(
1065 "sendfile syscall is not supported")
1066
1067 async def _sendfile_fallback(self, transp, file, offset, count):
1068 if offset:
1069 file.seek(offset)
1070 blocksize = min(count, 16384) if count else 16384
1071 buf = bytearray(blocksize)
1072 total_sent = 0
1073 proto = _SendfileFallbackProtocol(transp)
1074 try:
1075 while True:
1076 if count:
1077 blocksize = min(count - total_sent, blocksize)
1078 if blocksize <= 0:
1079 return total_sent
1080 view = memoryview(buf)[:blocksize]
1081 read = file.readinto(view)
1082 if not read:
1083 return total_sent # EOF
1084 await proto.drain()
1085 transp.write(view)
1086 total_sent += read
1087 finally:
1088 if total_sent > 0 and hasattr(file, 'seek'):
1089 file.seek(offset + total_sent)
1090 await proto.restore()
1091
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001092 async def start_tls(self, transport, protocol, sslcontext, *,
1093 server_side=False,
1094 server_hostname=None,
1095 ssl_handshake_timeout=None):
1096 """Upgrade transport to TLS.
1097
1098 Return a new transport that *protocol* should start using
1099 immediately.
1100 """
1101 if ssl is None:
1102 raise RuntimeError('Python ssl module is not available')
1103
1104 if not isinstance(sslcontext, ssl.SSLContext):
1105 raise TypeError(
1106 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1107 f'got {sslcontext!r}')
1108
1109 if not getattr(transport, '_start_tls_compatible', False):
1110 raise TypeError(
Miss Islington (bot)79c7e572018-06-05 07:18:20 -07001111 f'transport {transport!r} is not supported by start_tls()')
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001112
1113 waiter = self.create_future()
1114 ssl_protocol = sslproto.SSLProtocol(
1115 self, protocol, sslcontext, waiter,
1116 server_side, server_hostname,
1117 ssl_handshake_timeout=ssl_handshake_timeout,
1118 call_connection_made=False)
1119
Miss Islington (bot)eca08592018-05-28 22:59:03 -07001120 # Pause early so that "ssl_protocol.data_received()" doesn't
1121 # have a chance to get called before "ssl_protocol.connection_made()".
1122 transport.pause_reading()
1123
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001124 transport.set_protocol(ssl_protocol)
Miss Islington (bot)79c7e572018-06-05 07:18:20 -07001125 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1126 resume_cb = self.call_soon(transport.resume_reading)
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001127
Miss Islington (bot)87936d02018-06-04 09:05:46 -07001128 try:
1129 await waiter
1130 except Exception:
1131 transport.close()
Miss Islington (bot)79c7e572018-06-05 07:18:20 -07001132 conmade_cb.cancel()
1133 resume_cb.cancel()
Miss Islington (bot)87936d02018-06-04 09:05:46 -07001134 raise
1135
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001136 return ssl_protocol._app_transport
1137
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001138 async def create_datagram_endpoint(self, protocol_factory,
1139 local_addr=None, remote_addr=None, *,
1140 family=0, proto=0, flags=0,
1141 reuse_address=None, reuse_port=None,
1142 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001143 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001144 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001145 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001146 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001147 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001148 if (local_addr or remote_addr or
1149 family or proto or flags or
1150 reuse_address or reuse_port or allow_broadcast):
1151 # show the problematic kwargs in exception msg
1152 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1153 family=family, proto=proto, flags=flags,
1154 reuse_address=reuse_address, reuse_port=reuse_port,
1155 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001156 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001157 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001158 f'socket modifier keyword arguments can not be used '
1159 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001160 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001161 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001162 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001163 if not (local_addr or remote_addr):
1164 if family == 0:
1165 raise ValueError('unexpected address family')
1166 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001167 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1168 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001169 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001170 raise TypeError('string is expected')
1171 addr_pairs_info = (((family, proto),
1172 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001173 else:
1174 # join address by (family, protocol)
1175 addr_infos = collections.OrderedDict()
1176 for idx, addr in ((0, local_addr), (1, remote_addr)):
1177 if addr is not None:
1178 assert isinstance(addr, tuple) and len(addr) == 2, (
1179 '2-tuple is expected')
1180
Yury Selivanov19a44f62017-12-14 20:53:26 -05001181 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001182 addr, family=family, type=socket.SOCK_DGRAM,
1183 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001184 if not infos:
1185 raise OSError('getaddrinfo() returned empty list')
1186
1187 for fam, _, pro, _, address in infos:
1188 key = (fam, pro)
1189 if key not in addr_infos:
1190 addr_infos[key] = [None, None]
1191 addr_infos[key][idx] = address
1192
1193 # each addr has to have info for each (family, proto) pair
1194 addr_pairs_info = [
1195 (key, addr_pair) for key, addr_pair in addr_infos.items()
1196 if not ((local_addr and addr_pair[0] is None) or
1197 (remote_addr and addr_pair[1] is None))]
1198
1199 if not addr_pairs_info:
1200 raise ValueError('can not get address information')
1201
1202 exceptions = []
1203
1204 if reuse_address is None:
1205 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1206
1207 for ((family, proto),
1208 (local_address, remote_address)) in addr_pairs_info:
1209 sock = None
1210 r_addr = None
1211 try:
1212 sock = socket.socket(
1213 family=family, type=socket.SOCK_DGRAM, proto=proto)
1214 if reuse_address:
1215 sock.setsockopt(
1216 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1217 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001218 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001219 if allow_broadcast:
1220 sock.setsockopt(
1221 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1222 sock.setblocking(False)
1223
1224 if local_addr:
1225 sock.bind(local_address)
1226 if remote_addr:
Miss Islington (bot)19ca5b52019-05-07 10:45:53 -07001227 if not allow_broadcast:
1228 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001229 r_addr = remote_address
1230 except OSError as exc:
1231 if sock is not None:
1232 sock.close()
1233 exceptions.append(exc)
1234 except:
1235 if sock is not None:
1236 sock.close()
1237 raise
1238 else:
1239 break
1240 else:
1241 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001242
1243 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001244 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001245 transport = self._make_datagram_transport(
1246 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001247 if self._debug:
1248 if local_addr:
1249 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1250 "created: (%r, %r)",
1251 local_addr, remote_addr, transport, protocol)
1252 else:
1253 logger.debug("Datagram endpoint remote_addr=%r created: "
1254 "(%r, %r)",
1255 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001256
1257 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001258 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001259 except:
1260 transport.close()
1261 raise
1262
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001263 return transport, protocol
1264
Yury Selivanov19a44f62017-12-14 20:53:26 -05001265 async def _ensure_resolved(self, address, *,
1266 family=0, type=socket.SOCK_STREAM,
1267 proto=0, flags=0, loop):
1268 host, port = address[:2]
Miss Islington (bot)94704042019-05-17 02:05:19 -07001269 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
Yury Selivanov19a44f62017-12-14 20:53:26 -05001270 if info is not None:
1271 # "host" is already a resolved IP.
1272 return [info]
1273 else:
1274 return await loop.getaddrinfo(host, port, family=family, type=type,
1275 proto=proto, flags=flags)
1276
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001277 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001278 infos = await self._ensure_resolved((host, port), family=family,
1279 type=socket.SOCK_STREAM,
1280 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001281 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001282 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001283 return infos
1284
Neil Aspinallf7686c12017-12-19 19:45:42 +00001285 async def create_server(
1286 self, protocol_factory, host=None, port=None,
1287 *,
1288 family=socket.AF_UNSPEC,
1289 flags=socket.AI_PASSIVE,
1290 sock=None,
1291 backlog=100,
1292 ssl=None,
1293 reuse_address=None,
1294 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001295 ssl_handshake_timeout=None,
1296 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001297 """Create a TCP server.
1298
Yury Selivanov6370f342017-12-10 18:36:12 -05001299 The host parameter can be a string, in that case the TCP server is
1300 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001301
1302 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001303 the TCP server is bound to all hosts of the sequence. If a host
1304 appears multiple times (possibly indirectly e.g. when hostnames
1305 resolve to the same IP address), the server is only bound once to that
1306 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001307
Victor Stinneracdb7822014-07-14 18:33:40 +02001308 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001309
1310 This method is a coroutine.
1311 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001312 if isinstance(ssl, bool):
1313 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001314
1315 if ssl_handshake_timeout is not None and ssl is None:
1316 raise ValueError(
1317 'ssl_handshake_timeout is only meaningful with ssl')
1318
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001319 if host is not None or port is not None:
1320 if sock is not None:
1321 raise ValueError(
1322 'host/port and sock can not be specified at the same time')
1323
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001324 if reuse_address is None:
1325 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1326 sockets = []
1327 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001328 hosts = [None]
1329 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001330 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001331 hosts = [host]
1332 else:
1333 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001334
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001335 fs = [self._create_server_getaddrinfo(host, port, family=family,
1336 flags=flags)
1337 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001338 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001339 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001340
1341 completed = False
1342 try:
1343 for res in infos:
1344 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001345 try:
1346 sock = socket.socket(af, socktype, proto)
1347 except socket.error:
1348 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001349 if self._debug:
1350 logger.warning('create_server() failed to create '
1351 'socket.socket(%r, %r, %r)',
1352 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001353 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001354 sockets.append(sock)
1355 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001356 sock.setsockopt(
1357 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1358 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001359 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001360 # Disable IPv4/IPv6 dual stack support (enabled by
1361 # default on Linux) which makes a single socket
1362 # listen on both address families.
Miss Islington (bot)3ed44142018-06-28 19:16:48 -07001363 if (_HAS_IPv6 and
1364 af == socket.AF_INET6 and
1365 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001366 sock.setsockopt(socket.IPPROTO_IPV6,
1367 socket.IPV6_V6ONLY,
1368 True)
1369 try:
1370 sock.bind(sa)
1371 except OSError as err:
1372 raise OSError(err.errno, 'error while attempting '
1373 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001374 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001375 completed = True
1376 finally:
1377 if not completed:
1378 for sock in sockets:
1379 sock.close()
1380 else:
1381 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001382 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001383 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001384 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001385 sockets = [sock]
1386
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001387 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001388 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001389
1390 server = Server(self, sockets, protocol_factory,
1391 ssl, backlog, ssl_handshake_timeout)
1392 if start_serving:
1393 server._start_serving()
Miss Islington (bot)bc3a0022018-05-28 11:50:45 -07001394 # Skip one loop iteration so that all 'loop.add_reader'
1395 # go through.
1396 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001397
Victor Stinnere912e652014-07-12 03:11:53 +02001398 if self._debug:
1399 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001400 return server
1401
Neil Aspinallf7686c12017-12-19 19:45:42 +00001402 async def connect_accepted_socket(
1403 self, protocol_factory, sock,
1404 *, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001405 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001406 """Handle an accepted connection.
1407
1408 This is used by servers that accept connections outside of
1409 asyncio but that use asyncio to handle connections.
1410
1411 This method is a coroutine. When completed, the coroutine
1412 returns a (transport, protocol) pair.
1413 """
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001414 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001415 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001416
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001417 if ssl_handshake_timeout is not None and not ssl:
1418 raise ValueError(
1419 'ssl_handshake_timeout is only meaningful with ssl')
1420
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001421 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001422 sock, protocol_factory, ssl, '', server_side=True,
1423 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001424 if self._debug:
1425 # Get the socket from the transport because SSL transport closes
1426 # the old socket and creates a new SSL socket
1427 sock = transport.get_extra_info('socket')
1428 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1429 return transport, protocol
1430
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001431 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001432 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001433 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001434 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001435
1436 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001437 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001438 except:
1439 transport.close()
1440 raise
1441
Victor Stinneracdb7822014-07-14 18:33:40 +02001442 if self._debug:
1443 logger.debug('Read pipe %r connected: (%r, %r)',
1444 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001445 return transport, protocol
1446
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001447 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001448 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001449 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001450 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001451
1452 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001453 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001454 except:
1455 transport.close()
1456 raise
1457
Victor Stinneracdb7822014-07-14 18:33:40 +02001458 if self._debug:
1459 logger.debug('Write pipe %r connected: (%r, %r)',
1460 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001461 return transport, protocol
1462
Victor Stinneracdb7822014-07-14 18:33:40 +02001463 def _log_subprocess(self, msg, stdin, stdout, stderr):
1464 info = [msg]
1465 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001466 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001467 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001468 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001469 else:
1470 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001471 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001472 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001473 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001474 logger.debug(' '.join(info))
1475
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001476 async def subprocess_shell(self, protocol_factory, cmd, *,
1477 stdin=subprocess.PIPE,
1478 stdout=subprocess.PIPE,
1479 stderr=subprocess.PIPE,
1480 universal_newlines=False,
1481 shell=True, bufsize=0,
1482 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001483 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001484 raise ValueError("cmd must be a string")
1485 if universal_newlines:
1486 raise ValueError("universal_newlines must be False")
1487 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001488 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001489 if bufsize != 0:
1490 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001491 protocol = protocol_factory()
Miss Islington (bot)21f4c782018-06-08 15:42:07 -07001492 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001493 if self._debug:
1494 # don't log parameters: they may contain sensitive information
1495 # (password) and may be too long
1496 debug_log = 'run shell command %r' % cmd
1497 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001498 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001499 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Miss Islington (bot)21f4c782018-06-08 15:42:07 -07001500 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001501 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001502 return transport, protocol
1503
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001504 async def subprocess_exec(self, protocol_factory, program, *args,
1505 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1506 stderr=subprocess.PIPE, universal_newlines=False,
1507 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001508 if universal_newlines:
1509 raise ValueError("universal_newlines must be False")
1510 if shell:
1511 raise ValueError("shell must be False")
1512 if bufsize != 0:
1513 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001514 popen_args = (program,) + args
1515 for arg in popen_args:
1516 if not isinstance(arg, (str, bytes)):
Yury Selivanov6370f342017-12-10 18:36:12 -05001517 raise TypeError(
1518 f"program arguments must be a bytes or text string, "
1519 f"not {type(arg).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001520 protocol = protocol_factory()
Miss Islington (bot)21f4c782018-06-08 15:42:07 -07001521 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001522 if self._debug:
1523 # don't log parameters: they may contain sensitive information
1524 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001525 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001526 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001527 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001528 protocol, popen_args, False, stdin, stdout, stderr,
1529 bufsize, **kwargs)
Miss Islington (bot)21f4c782018-06-08 15:42:07 -07001530 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001531 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001532 return transport, protocol
1533
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001534 def get_exception_handler(self):
1535 """Return an exception handler, or None if the default one is in use.
1536 """
1537 return self._exception_handler
1538
Yury Selivanov569efa22014-02-18 18:02:19 -05001539 def set_exception_handler(self, handler):
1540 """Set handler as the new event loop exception handler.
1541
1542 If handler is None, the default exception handler will
1543 be set.
1544
1545 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001546 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001547 will be a reference to the active event loop, 'context'
1548 will be a dict object (see `call_exception_handler()`
1549 documentation for details about context).
1550 """
1551 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001552 raise TypeError(f'A callable object or None is expected, '
1553 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001554 self._exception_handler = handler
1555
1556 def default_exception_handler(self, context):
1557 """Default exception handler.
1558
1559 This is called when an exception occurs and no exception
1560 handler is set, and can be called by a custom exception
1561 handler that wants to defer to the default behavior.
1562
Antoine Pitrou921e9432017-11-07 17:23:29 +01001563 This default handler logs the error message and other
1564 context-dependent information. In debug mode, a truncated
1565 stack trace is also appended showing where the given object
1566 (e.g. a handle or future or task) was created, if any.
1567
Victor Stinneracdb7822014-07-14 18:33:40 +02001568 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001569 `call_exception_handler()`.
1570 """
1571 message = context.get('message')
1572 if not message:
1573 message = 'Unhandled exception in event loop'
1574
1575 exception = context.get('exception')
1576 if exception is not None:
1577 exc_info = (type(exception), exception, exception.__traceback__)
1578 else:
1579 exc_info = False
1580
Yury Selivanov6370f342017-12-10 18:36:12 -05001581 if ('source_traceback' not in context and
1582 self._current_handle is not None and
1583 self._current_handle._source_traceback):
1584 context['handle_traceback'] = \
1585 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001586
Yury Selivanov569efa22014-02-18 18:02:19 -05001587 log_lines = [message]
1588 for key in sorted(context):
1589 if key in {'message', 'exception'}:
1590 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001591 value = context[key]
1592 if key == 'source_traceback':
1593 tb = ''.join(traceback.format_list(value))
1594 value = 'Object created at (most recent call last):\n'
1595 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001596 elif key == 'handle_traceback':
1597 tb = ''.join(traceback.format_list(value))
1598 value = 'Handle created at (most recent call last):\n'
1599 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001600 else:
1601 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001602 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001603
1604 logger.error('\n'.join(log_lines), exc_info=exc_info)
1605
1606 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001607 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001608
Victor Stinneracdb7822014-07-14 18:33:40 +02001609 The context argument is a dict containing the following keys:
1610
Yury Selivanov569efa22014-02-18 18:02:19 -05001611 - 'message': Error message;
1612 - 'exception' (optional): Exception object;
1613 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001614 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001615 - 'handle' (optional): Handle instance;
1616 - 'protocol' (optional): Protocol instance;
1617 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001618 - 'socket' (optional): Socket instance;
1619 - 'asyncgen' (optional): Asynchronous generator that caused
1620 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001621
Victor Stinneracdb7822014-07-14 18:33:40 +02001622 New keys maybe introduced in the future.
1623
1624 Note: do not overload this method in an event loop subclass.
1625 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001626 `set_exception_handler()` method.
1627 """
1628 if self._exception_handler is None:
1629 try:
1630 self.default_exception_handler(context)
1631 except Exception:
1632 # Second protection layer for unexpected errors
1633 # in the default implementation, as well as for subclassed
1634 # event loops with overloaded "default_exception_handler".
1635 logger.error('Exception in default exception handler',
1636 exc_info=True)
1637 else:
1638 try:
1639 self._exception_handler(self, context)
1640 except Exception as exc:
1641 # Exception in the user set custom exception handler.
1642 try:
1643 # Let's try default handler.
1644 self.default_exception_handler({
1645 'message': 'Unhandled error in exception handler',
1646 'exception': exc,
1647 'context': context,
1648 })
1649 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001650 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001651 # overloaded.
1652 logger.error('Exception in default exception handler '
1653 'while handling an unexpected error '
1654 'in custom exception handler',
1655 exc_info=True)
1656
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001657 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001658 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001659 assert isinstance(handle, events.Handle), 'A Handle is required here'
1660 if handle._cancelled:
1661 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001662 assert not isinstance(handle, events.TimerHandle)
1663 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001664
1665 def _add_callback_signalsafe(self, handle):
1666 """Like _add_callback() but called from a signal handler."""
1667 self._add_callback(handle)
1668 self._write_to_self()
1669
Yury Selivanov592ada92014-09-25 12:07:56 -04001670 def _timer_handle_cancelled(self, handle):
1671 """Notification that a TimerHandle has been cancelled."""
1672 if handle._scheduled:
1673 self._timer_cancelled_count += 1
1674
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001675 def _run_once(self):
1676 """Run one full iteration of the event loop.
1677
1678 This calls all currently ready callbacks, polls for I/O,
1679 schedules the resulting callbacks, and finally schedules
1680 'call_later' callbacks.
1681 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001682
Yury Selivanov592ada92014-09-25 12:07:56 -04001683 sched_count = len(self._scheduled)
1684 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1685 self._timer_cancelled_count / sched_count >
1686 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001687 # Remove delayed calls that were cancelled if their number
1688 # is too high
1689 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001690 for handle in self._scheduled:
1691 if handle._cancelled:
1692 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001693 else:
1694 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001695
Victor Stinner68da8fc2014-09-30 18:08:36 +02001696 heapq.heapify(new_scheduled)
1697 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001698 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001699 else:
1700 # Remove delayed calls that were cancelled from head of queue.
1701 while self._scheduled and self._scheduled[0]._cancelled:
1702 self._timer_cancelled_count -= 1
1703 handle = heapq.heappop(self._scheduled)
1704 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001705
1706 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001707 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001708 timeout = 0
1709 elif self._scheduled:
1710 # Compute the desired timeout.
1711 when = self._scheduled[0]._when
Miss Islington (bot)172a81e2018-07-31 08:29:07 -07001712 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001713
Victor Stinner770e48d2014-07-11 11:58:33 +02001714 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001715 t0 = self.time()
1716 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001717 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001718 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001719 level = logging.INFO
1720 else:
1721 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001722 nevent = len(event_list)
1723 if timeout is None:
1724 logger.log(level, 'poll took %.3f ms: %s events',
1725 dt * 1e3, nevent)
1726 elif nevent:
1727 logger.log(level,
1728 'poll %.3f ms took %.3f ms: %s events',
1729 timeout * 1e3, dt * 1e3, nevent)
1730 elif dt >= 1.0:
1731 logger.log(level,
1732 'poll %.3f ms took %.3f ms: timeout',
1733 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001734 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001735 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001736 self._process_events(event_list)
1737
1738 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001739 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001740 while self._scheduled:
1741 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001742 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001743 break
1744 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001745 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001746 self._ready.append(handle)
1747
1748 # This is the only place where callbacks are actually *called*.
1749 # All other places just add them to ready.
1750 # Note: We run all currently scheduled callbacks, but not any
1751 # callbacks scheduled by callbacks run this time around --
1752 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001753 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001754 ntodo = len(self._ready)
1755 for i in range(ntodo):
1756 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001757 if handle._cancelled:
1758 continue
1759 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001760 try:
1761 self._current_handle = handle
1762 t0 = self.time()
1763 handle._run()
1764 dt = self.time() - t0
1765 if dt >= self.slow_callback_duration:
1766 logger.warning('Executing %s took %.3f seconds',
1767 _format_handle(handle), dt)
1768 finally:
1769 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001770 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001771 handle._run()
1772 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001773
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001774 def _set_coroutine_origin_tracking(self, enabled):
1775 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001776 return
1777
Yury Selivanove8944cb2015-05-12 11:43:04 -04001778 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001779 self._coroutine_origin_tracking_saved_depth = (
1780 sys.get_coroutine_origin_tracking_depth())
1781 sys.set_coroutine_origin_tracking_depth(
1782 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001783 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001784 sys.set_coroutine_origin_tracking_depth(
1785 self._coroutine_origin_tracking_saved_depth)
1786
1787 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001788
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001789 def get_debug(self):
1790 return self._debug
1791
1792 def set_debug(self, enabled):
1793 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001794
Yury Selivanove8944cb2015-05-12 11:43:04 -04001795 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001796 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)