blob: 36fe7e0076c969b0ab8ed7663b8c534e06406d98 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Victor Stinnerb75380f2014-06-30 14:39:11 +020021import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022import socket
23import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010024import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010028import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070029import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
Yury Selivanovf111b3d2017-12-30 00:35:36 -050031try:
32 import ssl
33except ImportError: # pragma: no cover
34 ssl = None
35
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080036from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020037from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070039from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020041from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050042from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020044from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070045from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046
47
Yury Selivanov6370f342017-12-10 18:36:12 -050048__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049
50
Yury Selivanov592ada92014-09-25 12:07:56 -040051# Minimum number of _scheduled timer handles before cleanup of
52# cancelled handles is performed.
53_MIN_SCHEDULED_TIMER_HANDLES = 100
54
55# Minimum fraction of _scheduled timer handles that are cancelled
56# before cleanup of cancelled handles is performed.
57_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058
Victor Stinnerc94a93a2016-04-01 21:43:39 +020059# Exceptions which must not call the exception handler in fatal error
60# methods (_fatal_error())
61_FATAL_ERROR_IGNORE = (BrokenPipeError,
62 ConnectionResetError, ConnectionAbortedError)
63
Andrew Svetlov0dd71802018-09-12 14:03:54 -070064if ssl is not None:
65 _FATAL_ERROR_IGNORE = _FATAL_ERROR_IGNORE + (ssl.SSLCertVerificationError,)
66
Yury Selivanovd904c232018-06-28 21:59:32 -040067_HAS_IPv6 = hasattr(socket, 'AF_INET6')
68
MartinAltmayer944451c2018-07-31 15:06:12 +010069# Maximum timeout passed to select to avoid OS limitations
70MAXIMUM_SELECT_TIMEOUT = 24 * 3600
71
Victor Stinnerc94a93a2016-04-01 21:43:39 +020072
Victor Stinner0e6f52a2014-06-20 17:34:15 +020073def _format_handle(handle):
74 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040075 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020076 # format the task
77 return repr(cb.__self__)
78 else:
79 return str(handle)
80
81
Victor Stinneracdb7822014-07-14 18:33:40 +020082def _format_pipe(fd):
83 if fd == subprocess.PIPE:
84 return '<pipe>'
85 elif fd == subprocess.STDOUT:
86 return '<stdout>'
87 else:
88 return repr(fd)
89
90
Yury Selivanov5587d7c2016-09-15 15:45:07 -040091def _set_reuseport(sock):
92 if not hasattr(socket, 'SO_REUSEPORT'):
93 raise ValueError('reuse_port not supported by socket module')
94 else:
95 try:
96 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
97 except OSError:
98 raise ValueError('reuse_port not supported by socket module, '
99 'SO_REUSEPORT defined but not implemented.')
100
101
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500102def _ipaddr_info(host, port, family, type, proto):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400103 # Try to skip getaddrinfo if "host" is already an IP. Users might have
104 # handled name resolution in their own code and pass in resolved IPs.
105 if not hasattr(socket, 'inet_pton'):
106 return
107
108 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
109 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500110 return None
111
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500112 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500113 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500114 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500115 proto = socket.IPPROTO_UDP
116 else:
117 return None
118
Yury Selivanova7146162016-06-02 16:51:07 -0400119 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400120 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700121 elif isinstance(port, bytes) and port == b'':
122 port = 0
123 elif isinstance(port, str) and port == '':
124 port = 0
125 else:
126 # If port's a service name like "http", don't skip getaddrinfo.
127 try:
128 port = int(port)
129 except (TypeError, ValueError):
130 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400131
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400132 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500133 afs = [socket.AF_INET]
Yury Selivanovd904c232018-06-28 21:59:32 -0400134 if _HAS_IPv6:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500135 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400136 else:
137 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500138
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400139 if isinstance(host, bytes):
140 host = host.decode('idna')
141 if '%' in host:
142 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
143 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500144 return None
145
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400146 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500147 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400148 socket.inet_pton(af, host)
149 # The host has already been resolved.
Yury Selivanovd904c232018-06-28 21:59:32 -0400150 if _HAS_IPv6 and af == socket.AF_INET6:
151 return af, type, proto, '', (host, port, 0, 0)
152 else:
153 return af, type, proto, '', (host, port)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400154 except OSError:
155 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500156
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400157 # "host" is not an IP address.
158 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500159
160
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100161def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500162 if not fut.cancelled():
163 exc = fut.exception()
164 if isinstance(exc, BaseException) and not isinstance(exc, Exception):
165 # Issue #22429: run_forever() already finished, no need to
166 # stop it.
167 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500168 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100169
170
Andrew Svetlov3bc0eba2018-12-03 21:08:13 +0200171if hasattr(socket, 'TCP_NODELAY'):
172 def _set_nodelay(sock):
173 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
174 sock.type == socket.SOCK_STREAM and
175 sock.proto == socket.IPPROTO_TCP):
176 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
177else:
178 def _set_nodelay(sock):
179 pass
180
181
Andrew Svetlov7c684072018-01-27 21:22:47 +0200182class _SendfileFallbackProtocol(protocols.Protocol):
183 def __init__(self, transp):
184 if not isinstance(transp, transports._FlowControlMixin):
185 raise TypeError("transport should be _FlowControlMixin instance")
186 self._transport = transp
187 self._proto = transp.get_protocol()
188 self._should_resume_reading = transp.is_reading()
189 self._should_resume_writing = transp._protocol_paused
190 transp.pause_reading()
191 transp.set_protocol(self)
192 if self._should_resume_writing:
193 self._write_ready_fut = self._transport._loop.create_future()
194 else:
195 self._write_ready_fut = None
196
197 async def drain(self):
198 if self._transport.is_closing():
199 raise ConnectionError("Connection closed by peer")
200 fut = self._write_ready_fut
201 if fut is None:
202 return
203 await fut
204
205 def connection_made(self, transport):
206 raise RuntimeError("Invalid state: "
207 "connection should have been established already.")
208
209 def connection_lost(self, exc):
210 if self._write_ready_fut is not None:
211 # Never happens if peer disconnects after sending the whole content
212 # Thus disconnection is always an exception from user perspective
213 if exc is None:
214 self._write_ready_fut.set_exception(
215 ConnectionError("Connection is closed by peer"))
216 else:
217 self._write_ready_fut.set_exception(exc)
218 self._proto.connection_lost(exc)
219
220 def pause_writing(self):
221 if self._write_ready_fut is not None:
222 return
223 self._write_ready_fut = self._transport._loop.create_future()
224
225 def resume_writing(self):
226 if self._write_ready_fut is None:
227 return
228 self._write_ready_fut.set_result(False)
229 self._write_ready_fut = None
230
231 def data_received(self, data):
232 raise RuntimeError("Invalid state: reading should be paused")
233
234 def eof_received(self):
235 raise RuntimeError("Invalid state: reading should be paused")
236
237 async def restore(self):
238 self._transport.set_protocol(self._proto)
239 if self._should_resume_reading:
240 self._transport.resume_reading()
241 if self._write_ready_fut is not None:
242 # Cancel the future.
243 # Basically it has no effect because protocol is switched back,
244 # no code should wait for it anymore.
245 self._write_ready_fut.cancel()
246 if self._should_resume_writing:
247 self._proto.resume_writing()
248
249
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250class Server(events.AbstractServer):
251
Yury Selivanovc9070d02018-01-25 18:08:09 -0500252 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
253 ssl_handshake_timeout):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200254 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500255 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200256 self._active_count = 0
257 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500258 self._protocol_factory = protocol_factory
259 self._backlog = backlog
260 self._ssl_context = ssl_context
261 self._ssl_handshake_timeout = ssl_handshake_timeout
262 self._serving = False
263 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264
Victor Stinnere912e652014-07-12 03:11:53 +0200265 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500266 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200267
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200268 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500269 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200270 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200272 def _detach(self):
273 assert self._active_count > 0
274 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500275 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276 self._wakeup()
277
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200279 waiters = self._waiters
280 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 for waiter in waiters:
282 if not waiter.done():
283 waiter.set_result(waiter)
284
Yury Selivanovc9070d02018-01-25 18:08:09 -0500285 def _start_serving(self):
286 if self._serving:
287 return
288 self._serving = True
289 for sock in self._sockets:
290 sock.listen(self._backlog)
291 self._loop._start_serving(
292 self._protocol_factory, sock, self._ssl_context,
293 self, self._backlog, self._ssl_handshake_timeout)
294
295 def get_loop(self):
296 return self._loop
297
298 def is_serving(self):
299 return self._serving
300
301 @property
302 def sockets(self):
303 if self._sockets is None:
304 return []
305 return list(self._sockets)
306
307 def close(self):
308 sockets = self._sockets
309 if sockets is None:
310 return
311 self._sockets = None
312
313 for sock in sockets:
314 self._loop._stop_serving(sock)
315
316 self._serving = False
317
318 if (self._serving_forever_fut is not None and
319 not self._serving_forever_fut.done()):
320 self._serving_forever_fut.cancel()
321 self._serving_forever_fut = None
322
323 if self._active_count == 0:
324 self._wakeup()
325
326 async def start_serving(self):
327 self._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400328 # Skip one loop iteration so that all 'loop.add_reader'
329 # go through.
330 await tasks.sleep(0, loop=self._loop)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500331
332 async def serve_forever(self):
333 if self._serving_forever_fut is not None:
334 raise RuntimeError(
335 f'server {self!r} is already being awaited on serve_forever()')
336 if self._sockets is None:
337 raise RuntimeError(f'server {self!r} is closed')
338
339 self._start_serving()
340 self._serving_forever_fut = self._loop.create_future()
341
342 try:
343 await self._serving_forever_fut
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700344 except exceptions.CancelledError:
Yury Selivanovc9070d02018-01-25 18:08:09 -0500345 try:
346 self.close()
347 await self.wait_closed()
348 finally:
349 raise
350 finally:
351 self._serving_forever_fut = None
352
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200353 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500354 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400356 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200357 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200358 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359
360
361class BaseEventLoop(events.AbstractEventLoop):
362
363 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400364 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200365 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800366 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 self._ready = collections.deque()
368 self._scheduled = []
369 self._default_executor = None
370 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100371 # Identifier of the thread running the event loop, or None if the
372 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100373 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100374 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500375 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800376 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200377 # In debug mode, if the execution of a callback or a step of a task
378 # exceed this duration in seconds, the slow callback/task is logged.
379 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100380 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400381 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800382 self._coroutine_origin_tracking_enabled = False
383 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500385 # A weak set of all asynchronous generators that are
386 # being iterated by the loop.
387 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700388 # Set to True when `loop.shutdown_asyncgens` is called.
389 self._asyncgens_shutdown_called = False
390
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200391 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500392 return (
393 f'<{self.__class__.__name__} running={self.is_running()} '
394 f'closed={self.is_closed()} debug={self.get_debug()}>'
395 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200396
Yury Selivanov7661db62016-05-16 15:38:39 -0400397 def create_future(self):
398 """Create a Future object attached to the loop."""
399 return futures.Future(loop=self)
400
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300401 def create_task(self, coro, *, name=None):
Victor Stinner896a25a2014-07-08 11:29:25 +0200402 """Schedule a coroutine object.
403
Victor Stinneracdb7822014-07-14 18:33:40 +0200404 Return a task object.
405 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100406 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400407 if self._task_factory is None:
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300408 task = tasks.Task(coro, loop=self, name=name)
Yury Selivanov740169c2015-05-11 14:23:38 -0400409 if task._source_traceback:
410 del task._source_traceback[-1]
411 else:
412 task = self._task_factory(self, coro)
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300413 tasks._set_task_name(task, name)
414
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200415 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200416
Yury Selivanov740169c2015-05-11 14:23:38 -0400417 def set_task_factory(self, factory):
418 """Set a task factory that will be used by loop.create_task().
419
420 If factory is None the default task factory will be set.
421
422 If factory is a callable, it should have a signature matching
423 '(loop, coro)', where 'loop' will be a reference to the active
424 event loop, 'coro' will be a coroutine object. The callable
425 must return a Future.
426 """
427 if factory is not None and not callable(factory):
428 raise TypeError('task factory must be a callable or None')
429 self._task_factory = factory
430
431 def get_task_factory(self):
432 """Return a task factory, or None if the default one is in use."""
433 return self._task_factory
434
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 def _make_socket_transport(self, sock, protocol, waiter=None, *,
436 extra=None, server=None):
437 """Create socket transport."""
438 raise NotImplementedError
439
Neil Aspinallf7686c12017-12-19 19:45:42 +0000440 def _make_ssl_transport(
441 self, rawsock, protocol, sslcontext, waiter=None,
442 *, server_side=False, server_hostname=None,
443 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500444 ssl_handshake_timeout=None,
445 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 """Create SSL transport."""
447 raise NotImplementedError
448
449 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200450 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 """Create datagram transport."""
452 raise NotImplementedError
453
454 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
455 extra=None):
456 """Create read pipe transport."""
457 raise NotImplementedError
458
459 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
460 extra=None):
461 """Create write pipe transport."""
462 raise NotImplementedError
463
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200464 async def _make_subprocess_transport(self, protocol, args, shell,
465 stdin, stdout, stderr, bufsize,
466 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 """Create subprocess transport."""
468 raise NotImplementedError
469
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700470 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200471 """Write a byte to self-pipe, to wake up the event loop.
472
473 This may be called from a different thread.
474
475 The subclass is responsible for implementing the self-pipe.
476 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 raise NotImplementedError
478
479 def _process_events(self, event_list):
480 """Process selector events."""
481 raise NotImplementedError
482
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200483 def _check_closed(self):
484 if self._closed:
485 raise RuntimeError('Event loop is closed')
486
Yury Selivanoveb636452016-09-08 22:01:51 -0700487 def _asyncgen_finalizer_hook(self, agen):
488 self._asyncgens.discard(agen)
489 if not self.is_closed():
twisteroid ambassadorc880ffe2018-10-09 23:30:21 +0800490 self.call_soon_threadsafe(self.create_task, agen.aclose())
Yury Selivanoveb636452016-09-08 22:01:51 -0700491
492 def _asyncgen_firstiter_hook(self, agen):
493 if self._asyncgens_shutdown_called:
494 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500495 f"asynchronous generator {agen!r} was scheduled after "
496 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700497 ResourceWarning, source=self)
498
499 self._asyncgens.add(agen)
500
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200501 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700502 """Shutdown all active asynchronous generators."""
503 self._asyncgens_shutdown_called = True
504
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500505 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400506 # If Python version is <3.6 or we don't have any asynchronous
507 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700508 return
509
510 closing_agens = list(self._asyncgens)
511 self._asyncgens.clear()
512
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200513 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700514 *[ag.aclose() for ag in closing_agens],
515 return_exceptions=True,
516 loop=self)
517
Yury Selivanoveb636452016-09-08 22:01:51 -0700518 for result, agen in zip(results, closing_agens):
519 if isinstance(result, Exception):
520 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500521 'message': f'an error occurred during closing of '
522 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700523 'exception': result,
524 'asyncgen': agen
525 })
526
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527 def run_forever(self):
528 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200529 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100530 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400531 raise RuntimeError('This event loop is already running')
532 if events._get_running_loop() is not None:
533 raise RuntimeError(
534 'Cannot run the event loop while another loop is running')
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800535 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100536 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500537
538 old_agen_hooks = sys.get_asyncgen_hooks()
539 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
540 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400542 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800544 self._run_once()
545 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 break
547 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800548 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100549 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400550 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800551 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500552 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553
554 def run_until_complete(self, future):
555 """Run until the Future is done.
556
557 If the argument is a coroutine, it is wrapped in a Task.
558
Victor Stinneracdb7822014-07-14 18:33:40 +0200559 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 with the same coroutine twice -- it would wrap it in two
561 different Tasks and that can't be good.
562
563 Return the Future's result, or raise its exception.
564 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200565 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200566
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700567 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400568 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200569 if new_task:
570 # An exception is raised if the future didn't complete, so there
571 # is no need to log the "destroy pending task" message
572 future._log_destroy_pending = False
573
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100574 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200575 try:
576 self.run_forever()
577 except:
578 if new_task and future.done() and not future.cancelled():
579 # The coroutine raised a BaseException. Consume the exception
580 # to not log a warning, the caller doesn't have access to the
581 # local task.
582 future.exception()
583 raise
jimmylai21b3e042017-05-22 22:32:46 -0700584 finally:
585 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 if not future.done():
587 raise RuntimeError('Event loop stopped before Future completed.')
588
589 return future.result()
590
591 def stop(self):
592 """Stop running the event loop.
593
Guido van Rossum41f69f42015-11-19 13:28:47 -0800594 Every callback already scheduled will still run. This simply informs
595 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800597 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200599 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700600 """Close the event loop.
601
602 This clears the queues and shuts down the executor,
603 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200604
605 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700606 """
Victor Stinner956de692014-12-26 21:07:52 +0100607 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200608 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200609 if self._closed:
610 return
Victor Stinnere912e652014-07-12 03:11:53 +0200611 if self._debug:
612 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400613 self._closed = True
614 self._ready.clear()
615 self._scheduled.clear()
616 executor = self._default_executor
617 if executor is not None:
618 self._default_executor = None
619 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200620
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200621 def is_closed(self):
622 """Returns True if the event loop was closed."""
623 return self._closed
624
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100625 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900626 if not self.is_closed():
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100627 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900628 if not self.is_running():
629 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100630
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200632 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100633 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634
635 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200636 """Return the time according to the event loop's clock.
637
638 This is a float expressed in seconds since an epoch, but the
639 epoch, precision, accuracy and drift are unspecified and may
640 differ per event loop.
641 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642 return time.monotonic()
643
Yury Selivanovf23746a2018-01-22 19:11:18 -0500644 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645 """Arrange for a callback to be called at a given time.
646
647 Return a Handle: an opaque object with a cancel() method that
648 can be used to cancel the call.
649
650 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200651 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700652
653 Each callback will be called exactly once. If two callbacks
654 are scheduled for exactly the same time, it undefined which
655 will be called first.
656
657 Any positional arguments after the callback will be passed to
658 the callback when it is called.
659 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500660 timer = self.call_at(self.time() + delay, callback, *args,
661 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200662 if timer._source_traceback:
663 del timer._source_traceback[-1]
664 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
Yury Selivanovf23746a2018-01-22 19:11:18 -0500666 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200667 """Like call_later(), but uses an absolute time.
668
669 Absolute time corresponds to the event loop's time() method.
670 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100671 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100672 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100673 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700674 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500675 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200676 if timer._source_traceback:
677 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400679 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680 return timer
681
Yury Selivanovf23746a2018-01-22 19:11:18 -0500682 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700683 """Arrange for a callback to be called as soon as possible.
684
Victor Stinneracdb7822014-07-14 18:33:40 +0200685 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 order in which they are registered. Each callback will be
687 called exactly once.
688
689 Any positional arguments after the callback will be passed to
690 the callback when it is called.
691 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700692 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100693 if self._debug:
694 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700695 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500696 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200697 if handle._source_traceback:
698 del handle._source_traceback[-1]
699 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100700
Yury Selivanov491a9122016-11-03 15:09:24 -0700701 def _check_callback(self, callback, method):
702 if (coroutines.iscoroutine(callback) or
703 coroutines.iscoroutinefunction(callback)):
704 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500705 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700706 if not callable(callback):
707 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500708 f'a callable object was expected by {method}(), '
709 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700710
Yury Selivanovf23746a2018-01-22 19:11:18 -0500711 def _call_soon(self, callback, args, context):
712 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200713 if handle._source_traceback:
714 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700715 self._ready.append(handle)
716 return handle
717
Victor Stinner956de692014-12-26 21:07:52 +0100718 def _check_thread(self):
719 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100720
Victor Stinneracdb7822014-07-14 18:33:40 +0200721 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100722 likely behave incorrectly when the assumption is violated.
723
Victor Stinneracdb7822014-07-14 18:33:40 +0200724 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100725 responsible for checking this condition for performance reasons.
726 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100727 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200728 return
Victor Stinner956de692014-12-26 21:07:52 +0100729 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100730 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100731 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200732 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100733 "than the current one")
734
Yury Selivanovf23746a2018-01-22 19:11:18 -0500735 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200736 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700737 self._check_closed()
738 if self._debug:
739 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500740 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200741 if handle._source_traceback:
742 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700743 self._write_to_self()
744 return handle
745
Yury Selivanovbec23722018-01-28 14:09:40 -0500746 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100747 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700748 if self._debug:
749 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750 if executor is None:
751 executor = self._default_executor
752 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400753 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700754 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500755 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500756 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700757
758 def set_default_executor(self, executor):
Elvis Pranskevichus22d25082018-07-30 11:42:43 +0100759 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
760 warnings.warn(
761 'Using the default executor that is not an instance of '
762 'ThreadPoolExecutor is deprecated and will be prohibited '
763 'in Python 3.9',
764 DeprecationWarning, 2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700765 self._default_executor = executor
766
Victor Stinnere912e652014-07-12 03:11:53 +0200767 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500768 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200769 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500770 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200771 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500772 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200773 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500774 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200775 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500776 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200777 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200778 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200779
780 t0 = self.time()
781 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
782 dt = self.time() - t0
783
Yury Selivanov6370f342017-12-10 18:36:12 -0500784 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200785 if dt >= self.slow_callback_duration:
786 logger.info(msg)
787 else:
788 logger.debug(msg)
789 return addrinfo
790
Yury Selivanov19a44f62017-12-14 20:53:26 -0500791 async def getaddrinfo(self, host, port, *,
792 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400793 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500794 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200795 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500796 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700797
Yury Selivanov19a44f62017-12-14 20:53:26 -0500798 return await self.run_in_executor(
799 None, getaddr_func, host, port, family, type, proto, flags)
800
801 async def getnameinfo(self, sockaddr, flags=0):
802 return await self.run_in_executor(
803 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200805 async def sock_sendfile(self, sock, file, offset=0, count=None,
806 *, fallback=True):
807 if self._debug and sock.gettimeout() != 0:
808 raise ValueError("the socket must be non-blocking")
809 self._check_sendfile_params(sock, file, offset, count)
810 try:
811 return await self._sock_sendfile_native(sock, file,
812 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700813 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7464e872018-01-19 20:04:29 +0200814 if not fallback:
815 raise
816 return await self._sock_sendfile_fallback(sock, file,
817 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200818
819 async def _sock_sendfile_native(self, sock, file, offset, count):
820 # NB: sendfile syscall is not supported for SSL sockets and
821 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700822 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200823 f"syscall sendfile is not available for socket {sock!r} "
824 "and file {file!r} combination")
825
826 async def _sock_sendfile_fallback(self, sock, file, offset, count):
827 if offset:
828 file.seek(offset)
Yury Selivanov71657542018-05-28 18:31:55 -0400829 blocksize = (
830 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
831 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
832 )
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200833 buf = bytearray(blocksize)
834 total_sent = 0
835 try:
836 while True:
837 if count:
838 blocksize = min(count - total_sent, blocksize)
839 if blocksize <= 0:
840 break
841 view = memoryview(buf)[:blocksize]
Yury Selivanov71657542018-05-28 18:31:55 -0400842 read = await self.run_in_executor(None, file.readinto, view)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200843 if not read:
844 break # EOF
845 await self.sock_sendall(sock, view)
846 total_sent += read
847 return total_sent
848 finally:
849 if total_sent > 0 and hasattr(file, 'seek'):
850 file.seek(offset + total_sent)
851
852 def _check_sendfile_params(self, sock, file, offset, count):
853 if 'b' not in getattr(file, 'mode', 'b'):
854 raise ValueError("file should be opened in binary mode")
855 if not sock.type == socket.SOCK_STREAM:
856 raise ValueError("only SOCK_STREAM type sockets are supported")
857 if count is not None:
858 if not isinstance(count, int):
859 raise TypeError(
860 "count must be a positive integer (got {!r})".format(count))
861 if count <= 0:
862 raise ValueError(
863 "count must be a positive integer (got {!r})".format(count))
864 if not isinstance(offset, int):
865 raise TypeError(
866 "offset must be a non-negative integer (got {!r})".format(
867 offset))
868 if offset < 0:
869 raise ValueError(
870 "offset must be a non-negative integer (got {!r})".format(
871 offset))
872
Neil Aspinallf7686c12017-12-19 19:45:42 +0000873 async def create_connection(
874 self, protocol_factory, host=None, port=None,
875 *, ssl=None, family=0,
876 proto=0, flags=0, sock=None,
877 local_addr=None, server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200878 ssl_handshake_timeout=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200879 """Connect to a TCP server.
880
881 Create a streaming transport connection to a given Internet host and
882 port: socket family AF_INET or socket.AF_INET6 depending on host (or
883 family if specified), socket type SOCK_STREAM. protocol_factory must be
884 a callable returning a protocol instance.
885
886 This method is a coroutine which will try to establish the connection
887 in the background. When successful, the coroutine returns a
888 (transport, protocol) pair.
889 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700890 if server_hostname is not None and not ssl:
891 raise ValueError('server_hostname is only meaningful with ssl')
892
893 if server_hostname is None and ssl:
894 # Use host as default for server_hostname. It is an error
895 # if host is empty or not set, e.g. when an
896 # already-connected socket was passed or when only a port
897 # is given. To avoid this error, you can pass
898 # server_hostname='' -- this will bypass the hostname
899 # check. (This also means that if host is a numeric
900 # IP/IPv6 address, we will attempt to verify that exact
901 # address; this will probably fail, but it is possible to
902 # create a certificate for a specific IP address, so we
903 # don't judge it here.)
904 if not host:
905 raise ValueError('You must set server_hostname '
906 'when using ssl without a host')
907 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700908
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200909 if ssl_handshake_timeout is not None and not ssl:
910 raise ValueError(
911 'ssl_handshake_timeout is only meaningful with ssl')
912
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700913 if host is not None or port is not None:
914 if sock is not None:
915 raise ValueError(
916 'host/port and sock can not be specified at the same time')
917
Yury Selivanov19a44f62017-12-14 20:53:26 -0500918 infos = await self._ensure_resolved(
919 (host, port), family=family,
920 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700921 if not infos:
922 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -0500923
924 if local_addr is not None:
925 laddr_infos = await self._ensure_resolved(
926 local_addr, family=family,
927 type=socket.SOCK_STREAM, proto=proto,
928 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700929 if not laddr_infos:
930 raise OSError('getaddrinfo() returned empty list')
931
932 exceptions = []
933 for family, type, proto, cname, address in infos:
934 try:
935 sock = socket.socket(family=family, type=type, proto=proto)
936 sock.setblocking(False)
Yury Selivanov19a44f62017-12-14 20:53:26 -0500937 if local_addr is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700938 for _, _, _, _, laddr in laddr_infos:
939 try:
940 sock.bind(laddr)
941 break
942 except OSError as exc:
Yury Selivanov6370f342017-12-10 18:36:12 -0500943 msg = (
944 f'error while attempting to bind on '
945 f'address {laddr!r}: '
946 f'{exc.strerror.lower()}'
947 )
948 exc = OSError(exc.errno, msg)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700949 exceptions.append(exc)
950 else:
951 sock.close()
952 sock = None
953 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200954 if self._debug:
955 logger.debug("connect %r to %r", sock, address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200956 await self.sock_connect(sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700957 except OSError as exc:
958 if sock is not None:
959 sock.close()
960 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200961 except:
962 if sock is not None:
963 sock.close()
964 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700965 else:
966 break
967 else:
968 if len(exceptions) == 1:
969 raise exceptions[0]
970 else:
971 # If they all have the same str(), raise one.
972 model = str(exceptions[0])
973 if all(str(exc) == model for exc in exceptions):
974 raise exceptions[0]
975 # Raise a combined exception so the user can see all
976 # the various error messages.
977 raise OSError('Multiple exceptions: {}'.format(
978 ', '.join(str(exc) for exc in exceptions)))
979
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500980 else:
981 if sock is None:
982 raise ValueError(
983 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500984 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -0500985 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
986 # are SOCK_STREAM.
987 # We support passing AF_UNIX sockets even though we have
988 # a dedicated API for that: create_unix_connection.
989 # Disallowing AF_UNIX in this method, breaks backwards
990 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500991 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500992 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700993
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200994 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000995 sock, protocol_factory, ssl, server_hostname,
996 ssl_handshake_timeout=ssl_handshake_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +0200997 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200998 # Get the socket from the transport because SSL transport closes
999 # the old socket and creates a new SSL socket
1000 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +02001001 logger.debug("%r connected to %s:%r: (%r, %r)",
1002 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -05001003 return transport, protocol
1004
Neil Aspinallf7686c12017-12-19 19:45:42 +00001005 async def _create_connection_transport(
1006 self, sock, protocol_factory, ssl,
1007 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001008 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001009
1010 sock.setblocking(False)
1011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001012 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001013 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001014 if ssl:
1015 sslcontext = None if isinstance(ssl, bool) else ssl
1016 transport = self._make_ssl_transport(
1017 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +00001018 server_side=server_side, server_hostname=server_hostname,
1019 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001020 else:
1021 transport = self._make_socket_transport(sock, protocol, waiter)
1022
Victor Stinner29ad0112015-01-15 00:04:21 +01001023 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001024 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +01001025 except:
Victor Stinner29ad0112015-01-15 00:04:21 +01001026 transport.close()
1027 raise
1028
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001029 return transport, protocol
1030
Andrew Svetlov7c684072018-01-27 21:22:47 +02001031 async def sendfile(self, transport, file, offset=0, count=None,
1032 *, fallback=True):
1033 """Send a file to transport.
1034
1035 Return the total number of bytes which were sent.
1036
1037 The method uses high-performance os.sendfile if available.
1038
1039 file must be a regular file object opened in binary mode.
1040
1041 offset tells from where to start reading the file. If specified,
1042 count is the total number of bytes to transmit as opposed to
1043 sending the file until EOF is reached. File position is updated on
1044 return or also in case of error in which case file.tell()
1045 can be used to figure out the number of bytes
1046 which were sent.
1047
1048 fallback set to True makes asyncio to manually read and send
1049 the file when the platform does not support the sendfile syscall
1050 (e.g. Windows or SSL socket on Unix).
1051
1052 Raise SendfileNotAvailableError if the system does not support
1053 sendfile syscall and fallback is False.
1054 """
1055 if transport.is_closing():
1056 raise RuntimeError("Transport is closing")
1057 mode = getattr(transport, '_sendfile_compatible',
1058 constants._SendfileMode.UNSUPPORTED)
1059 if mode is constants._SendfileMode.UNSUPPORTED:
1060 raise RuntimeError(
1061 f"sendfile is not supported for transport {transport!r}")
1062 if mode is constants._SendfileMode.TRY_NATIVE:
1063 try:
1064 return await self._sendfile_native(transport, file,
1065 offset, count)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001066 except exceptions.SendfileNotAvailableError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +02001067 if not fallback:
1068 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001069
1070 if not fallback:
1071 raise RuntimeError(
1072 f"fallback is disabled and native sendfile is not "
1073 f"supported for transport {transport!r}")
1074
Andrew Svetlov7c684072018-01-27 21:22:47 +02001075 return await self._sendfile_fallback(transport, file,
1076 offset, count)
1077
1078 async def _sendfile_native(self, transp, file, offset, count):
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001079 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov7c684072018-01-27 21:22:47 +02001080 "sendfile syscall is not supported")
1081
1082 async def _sendfile_fallback(self, transp, file, offset, count):
1083 if offset:
1084 file.seek(offset)
1085 blocksize = min(count, 16384) if count else 16384
1086 buf = bytearray(blocksize)
1087 total_sent = 0
1088 proto = _SendfileFallbackProtocol(transp)
1089 try:
1090 while True:
1091 if count:
1092 blocksize = min(count - total_sent, blocksize)
1093 if blocksize <= 0:
1094 return total_sent
1095 view = memoryview(buf)[:blocksize]
1096 read = file.readinto(view)
1097 if not read:
1098 return total_sent # EOF
1099 await proto.drain()
1100 transp.write(view)
1101 total_sent += read
1102 finally:
1103 if total_sent > 0 and hasattr(file, 'seek'):
1104 file.seek(offset + total_sent)
1105 await proto.restore()
1106
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001107 async def start_tls(self, transport, protocol, sslcontext, *,
1108 server_side=False,
1109 server_hostname=None,
1110 ssl_handshake_timeout=None):
1111 """Upgrade transport to TLS.
1112
1113 Return a new transport that *protocol* should start using
1114 immediately.
1115 """
1116 if ssl is None:
1117 raise RuntimeError('Python ssl module is not available')
1118
1119 if not isinstance(sslcontext, ssl.SSLContext):
1120 raise TypeError(
1121 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1122 f'got {sslcontext!r}')
1123
1124 if not getattr(transport, '_start_tls_compatible', False):
1125 raise TypeError(
Yury Selivanov415bc462018-06-05 08:59:58 -04001126 f'transport {transport!r} is not supported by start_tls()')
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001127
1128 waiter = self.create_future()
1129 ssl_protocol = sslproto.SSLProtocol(
1130 self, protocol, sslcontext, waiter,
1131 server_side, server_hostname,
1132 ssl_handshake_timeout=ssl_handshake_timeout,
1133 call_connection_made=False)
1134
Yury Selivanovf2955872018-05-29 01:00:12 -04001135 # Pause early so that "ssl_protocol.data_received()" doesn't
1136 # have a chance to get called before "ssl_protocol.connection_made()".
1137 transport.pause_reading()
1138
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001139 transport.set_protocol(ssl_protocol)
Yury Selivanov415bc462018-06-05 08:59:58 -04001140 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1141 resume_cb = self.call_soon(transport.resume_reading)
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001142
Yury Selivanov96026432018-06-04 11:32:35 -04001143 try:
1144 await waiter
1145 except Exception:
1146 transport.close()
Yury Selivanov415bc462018-06-05 08:59:58 -04001147 conmade_cb.cancel()
1148 resume_cb.cancel()
Yury Selivanov96026432018-06-04 11:32:35 -04001149 raise
1150
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001151 return ssl_protocol._app_transport
1152
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001153 async def create_datagram_endpoint(self, protocol_factory,
1154 local_addr=None, remote_addr=None, *,
1155 family=0, proto=0, flags=0,
1156 reuse_address=None, reuse_port=None,
1157 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001158 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001159 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001160 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001161 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001162 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001163 if (local_addr or remote_addr or
1164 family or proto or flags or
1165 reuse_address or reuse_port or allow_broadcast):
1166 # show the problematic kwargs in exception msg
1167 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1168 family=family, proto=proto, flags=flags,
1169 reuse_address=reuse_address, reuse_port=reuse_port,
1170 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001171 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001172 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001173 f'socket modifier keyword arguments can not be used '
1174 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001175 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001176 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001177 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001178 if not (local_addr or remote_addr):
1179 if family == 0:
1180 raise ValueError('unexpected address family')
1181 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001182 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1183 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001184 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001185 raise TypeError('string is expected')
1186 addr_pairs_info = (((family, proto),
1187 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001188 else:
1189 # join address by (family, protocol)
Inada Naokif3451702019-02-05 17:04:40 +09001190 addr_infos = {} # Using order preserving dict
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001191 for idx, addr in ((0, local_addr), (1, remote_addr)):
1192 if addr is not None:
1193 assert isinstance(addr, tuple) and len(addr) == 2, (
1194 '2-tuple is expected')
1195
Yury Selivanov19a44f62017-12-14 20:53:26 -05001196 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001197 addr, family=family, type=socket.SOCK_DGRAM,
1198 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001199 if not infos:
1200 raise OSError('getaddrinfo() returned empty list')
1201
1202 for fam, _, pro, _, address in infos:
1203 key = (fam, pro)
1204 if key not in addr_infos:
1205 addr_infos[key] = [None, None]
1206 addr_infos[key][idx] = address
1207
1208 # each addr has to have info for each (family, proto) pair
1209 addr_pairs_info = [
1210 (key, addr_pair) for key, addr_pair in addr_infos.items()
1211 if not ((local_addr and addr_pair[0] is None) or
1212 (remote_addr and addr_pair[1] is None))]
1213
1214 if not addr_pairs_info:
1215 raise ValueError('can not get address information')
1216
1217 exceptions = []
1218
1219 if reuse_address is None:
1220 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1221
1222 for ((family, proto),
1223 (local_address, remote_address)) in addr_pairs_info:
1224 sock = None
1225 r_addr = None
1226 try:
1227 sock = socket.socket(
1228 family=family, type=socket.SOCK_DGRAM, proto=proto)
1229 if reuse_address:
1230 sock.setsockopt(
1231 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1232 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001233 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001234 if allow_broadcast:
1235 sock.setsockopt(
1236 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1237 sock.setblocking(False)
1238
1239 if local_addr:
1240 sock.bind(local_address)
1241 if remote_addr:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001242 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001243 r_addr = remote_address
1244 except OSError as exc:
1245 if sock is not None:
1246 sock.close()
1247 exceptions.append(exc)
1248 except:
1249 if sock is not None:
1250 sock.close()
1251 raise
1252 else:
1253 break
1254 else:
1255 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001256
1257 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001258 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001259 transport = self._make_datagram_transport(
1260 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001261 if self._debug:
1262 if local_addr:
1263 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1264 "created: (%r, %r)",
1265 local_addr, remote_addr, transport, protocol)
1266 else:
1267 logger.debug("Datagram endpoint remote_addr=%r created: "
1268 "(%r, %r)",
1269 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001270
1271 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001272 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001273 except:
1274 transport.close()
1275 raise
1276
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001277 return transport, protocol
1278
Yury Selivanov19a44f62017-12-14 20:53:26 -05001279 async def _ensure_resolved(self, address, *,
1280 family=0, type=socket.SOCK_STREAM,
1281 proto=0, flags=0, loop):
1282 host, port = address[:2]
1283 info = _ipaddr_info(host, port, family, type, proto)
1284 if info is not None:
1285 # "host" is already a resolved IP.
1286 return [info]
1287 else:
1288 return await loop.getaddrinfo(host, port, family=family, type=type,
1289 proto=proto, flags=flags)
1290
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001291 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001292 infos = await self._ensure_resolved((host, port), family=family,
1293 type=socket.SOCK_STREAM,
1294 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001295 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001296 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001297 return infos
1298
Neil Aspinallf7686c12017-12-19 19:45:42 +00001299 async def create_server(
1300 self, protocol_factory, host=None, port=None,
1301 *,
1302 family=socket.AF_UNSPEC,
1303 flags=socket.AI_PASSIVE,
1304 sock=None,
1305 backlog=100,
1306 ssl=None,
1307 reuse_address=None,
1308 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001309 ssl_handshake_timeout=None,
1310 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001311 """Create a TCP server.
1312
Yury Selivanov6370f342017-12-10 18:36:12 -05001313 The host parameter can be a string, in that case the TCP server is
1314 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001315
1316 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001317 the TCP server is bound to all hosts of the sequence. If a host
1318 appears multiple times (possibly indirectly e.g. when hostnames
1319 resolve to the same IP address), the server is only bound once to that
1320 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001321
Victor Stinneracdb7822014-07-14 18:33:40 +02001322 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001323
1324 This method is a coroutine.
1325 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001326 if isinstance(ssl, bool):
1327 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001328
1329 if ssl_handshake_timeout is not None and ssl is None:
1330 raise ValueError(
1331 'ssl_handshake_timeout is only meaningful with ssl')
1332
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001333 if host is not None or port is not None:
1334 if sock is not None:
1335 raise ValueError(
1336 'host/port and sock can not be specified at the same time')
1337
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001338 if reuse_address is None:
1339 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1340 sockets = []
1341 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001342 hosts = [None]
1343 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001344 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001345 hosts = [host]
1346 else:
1347 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001348
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001349 fs = [self._create_server_getaddrinfo(host, port, family=family,
1350 flags=flags)
1351 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001352 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001353 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001354
1355 completed = False
1356 try:
1357 for res in infos:
1358 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001359 try:
1360 sock = socket.socket(af, socktype, proto)
1361 except socket.error:
1362 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001363 if self._debug:
1364 logger.warning('create_server() failed to create '
1365 'socket.socket(%r, %r, %r)',
1366 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001367 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001368 sockets.append(sock)
1369 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001370 sock.setsockopt(
1371 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1372 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001373 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001374 # Disable IPv4/IPv6 dual stack support (enabled by
1375 # default on Linux) which makes a single socket
1376 # listen on both address families.
Yury Selivanovd904c232018-06-28 21:59:32 -04001377 if (_HAS_IPv6 and
1378 af == socket.AF_INET6 and
1379 hasattr(socket, 'IPPROTO_IPV6')):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001380 sock.setsockopt(socket.IPPROTO_IPV6,
1381 socket.IPV6_V6ONLY,
1382 True)
1383 try:
1384 sock.bind(sa)
1385 except OSError as err:
1386 raise OSError(err.errno, 'error while attempting '
1387 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001388 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001389 completed = True
1390 finally:
1391 if not completed:
1392 for sock in sockets:
1393 sock.close()
1394 else:
1395 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001396 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001397 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001398 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001399 sockets = [sock]
1400
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001401 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001402 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001403
1404 server = Server(self, sockets, protocol_factory,
1405 ssl, backlog, ssl_handshake_timeout)
1406 if start_serving:
1407 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -04001408 # Skip one loop iteration so that all 'loop.add_reader'
1409 # go through.
1410 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001411
Victor Stinnere912e652014-07-12 03:11:53 +02001412 if self._debug:
1413 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001414 return server
1415
Neil Aspinallf7686c12017-12-19 19:45:42 +00001416 async def connect_accepted_socket(
1417 self, protocol_factory, sock,
1418 *, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001419 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001420 """Handle an accepted connection.
1421
1422 This is used by servers that accept connections outside of
1423 asyncio but that use asyncio to handle connections.
1424
1425 This method is a coroutine. When completed, the coroutine
1426 returns a (transport, protocol) pair.
1427 """
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001428 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001429 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001430
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001431 if ssl_handshake_timeout is not None and not ssl:
1432 raise ValueError(
1433 'ssl_handshake_timeout is only meaningful with ssl')
1434
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001435 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001436 sock, protocol_factory, ssl, '', server_side=True,
1437 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001438 if self._debug:
1439 # Get the socket from the transport because SSL transport closes
1440 # the old socket and creates a new SSL socket
1441 sock = transport.get_extra_info('socket')
1442 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1443 return transport, protocol
1444
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001445 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001446 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001447 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001448 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001449
1450 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001451 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001452 except:
1453 transport.close()
1454 raise
1455
Victor Stinneracdb7822014-07-14 18:33:40 +02001456 if self._debug:
1457 logger.debug('Read pipe %r connected: (%r, %r)',
1458 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001459 return transport, protocol
1460
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001461 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001462 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001463 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001464 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001465
1466 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001467 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001468 except:
1469 transport.close()
1470 raise
1471
Victor Stinneracdb7822014-07-14 18:33:40 +02001472 if self._debug:
1473 logger.debug('Write pipe %r connected: (%r, %r)',
1474 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001475 return transport, protocol
1476
Victor Stinneracdb7822014-07-14 18:33:40 +02001477 def _log_subprocess(self, msg, stdin, stdout, stderr):
1478 info = [msg]
1479 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001480 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001481 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001482 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001483 else:
1484 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001485 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001486 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001487 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001488 logger.debug(' '.join(info))
1489
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001490 async def subprocess_shell(self, protocol_factory, cmd, *,
1491 stdin=subprocess.PIPE,
1492 stdout=subprocess.PIPE,
1493 stderr=subprocess.PIPE,
1494 universal_newlines=False,
1495 shell=True, bufsize=0,
1496 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001497 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001498 raise ValueError("cmd must be a string")
1499 if universal_newlines:
1500 raise ValueError("universal_newlines must be False")
1501 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001502 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001503 if bufsize != 0:
1504 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001505 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001506 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001507 if self._debug:
1508 # don't log parameters: they may contain sensitive information
1509 # (password) and may be too long
1510 debug_log = 'run shell command %r' % cmd
1511 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001512 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001513 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001514 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001515 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001516 return transport, protocol
1517
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001518 async def subprocess_exec(self, protocol_factory, program, *args,
1519 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1520 stderr=subprocess.PIPE, universal_newlines=False,
1521 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001522 if universal_newlines:
1523 raise ValueError("universal_newlines must be False")
1524 if shell:
1525 raise ValueError("shell must be False")
1526 if bufsize != 0:
1527 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001528 popen_args = (program,) + args
1529 for arg in popen_args:
1530 if not isinstance(arg, (str, bytes)):
Yury Selivanov6370f342017-12-10 18:36:12 -05001531 raise TypeError(
1532 f"program arguments must be a bytes or text string, "
1533 f"not {type(arg).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001534 protocol = protocol_factory()
Yury Selivanov12f482e2018-06-08 18:24:37 -04001535 debug_log = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001536 if self._debug:
1537 # don't log parameters: they may contain sensitive information
1538 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001539 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001540 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001541 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001542 protocol, popen_args, False, stdin, stdout, stderr,
1543 bufsize, **kwargs)
Yury Selivanov12f482e2018-06-08 18:24:37 -04001544 if self._debug and debug_log is not None:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001545 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001546 return transport, protocol
1547
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001548 def get_exception_handler(self):
1549 """Return an exception handler, or None if the default one is in use.
1550 """
1551 return self._exception_handler
1552
Yury Selivanov569efa22014-02-18 18:02:19 -05001553 def set_exception_handler(self, handler):
1554 """Set handler as the new event loop exception handler.
1555
1556 If handler is None, the default exception handler will
1557 be set.
1558
1559 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001560 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001561 will be a reference to the active event loop, 'context'
1562 will be a dict object (see `call_exception_handler()`
1563 documentation for details about context).
1564 """
1565 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001566 raise TypeError(f'A callable object or None is expected, '
1567 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001568 self._exception_handler = handler
1569
1570 def default_exception_handler(self, context):
1571 """Default exception handler.
1572
1573 This is called when an exception occurs and no exception
1574 handler is set, and can be called by a custom exception
1575 handler that wants to defer to the default behavior.
1576
Antoine Pitrou921e9432017-11-07 17:23:29 +01001577 This default handler logs the error message and other
1578 context-dependent information. In debug mode, a truncated
1579 stack trace is also appended showing where the given object
1580 (e.g. a handle or future or task) was created, if any.
1581
Victor Stinneracdb7822014-07-14 18:33:40 +02001582 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001583 `call_exception_handler()`.
1584 """
1585 message = context.get('message')
1586 if not message:
1587 message = 'Unhandled exception in event loop'
1588
1589 exception = context.get('exception')
1590 if exception is not None:
1591 exc_info = (type(exception), exception, exception.__traceback__)
1592 else:
1593 exc_info = False
1594
Yury Selivanov6370f342017-12-10 18:36:12 -05001595 if ('source_traceback' not in context and
1596 self._current_handle is not None and
1597 self._current_handle._source_traceback):
1598 context['handle_traceback'] = \
1599 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001600
Yury Selivanov569efa22014-02-18 18:02:19 -05001601 log_lines = [message]
1602 for key in sorted(context):
1603 if key in {'message', 'exception'}:
1604 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001605 value = context[key]
1606 if key == 'source_traceback':
1607 tb = ''.join(traceback.format_list(value))
1608 value = 'Object created at (most recent call last):\n'
1609 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001610 elif key == 'handle_traceback':
1611 tb = ''.join(traceback.format_list(value))
1612 value = 'Handle created at (most recent call last):\n'
1613 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001614 else:
1615 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001616 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001617
1618 logger.error('\n'.join(log_lines), exc_info=exc_info)
1619
1620 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001621 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001622
Victor Stinneracdb7822014-07-14 18:33:40 +02001623 The context argument is a dict containing the following keys:
1624
Yury Selivanov569efa22014-02-18 18:02:19 -05001625 - 'message': Error message;
1626 - 'exception' (optional): Exception object;
1627 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001628 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001629 - 'handle' (optional): Handle instance;
1630 - 'protocol' (optional): Protocol instance;
1631 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001632 - 'socket' (optional): Socket instance;
1633 - 'asyncgen' (optional): Asynchronous generator that caused
1634 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001635
Victor Stinneracdb7822014-07-14 18:33:40 +02001636 New keys maybe introduced in the future.
1637
1638 Note: do not overload this method in an event loop subclass.
1639 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001640 `set_exception_handler()` method.
1641 """
1642 if self._exception_handler is None:
1643 try:
1644 self.default_exception_handler(context)
1645 except Exception:
1646 # Second protection layer for unexpected errors
1647 # in the default implementation, as well as for subclassed
1648 # event loops with overloaded "default_exception_handler".
1649 logger.error('Exception in default exception handler',
1650 exc_info=True)
1651 else:
1652 try:
1653 self._exception_handler(self, context)
1654 except Exception as exc:
1655 # Exception in the user set custom exception handler.
1656 try:
1657 # Let's try default handler.
1658 self.default_exception_handler({
1659 'message': 'Unhandled error in exception handler',
1660 'exception': exc,
1661 'context': context,
1662 })
1663 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001664 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001665 # overloaded.
1666 logger.error('Exception in default exception handler '
1667 'while handling an unexpected error '
1668 'in custom exception handler',
1669 exc_info=True)
1670
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001671 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001672 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001673 assert isinstance(handle, events.Handle), 'A Handle is required here'
1674 if handle._cancelled:
1675 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001676 assert not isinstance(handle, events.TimerHandle)
1677 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001678
1679 def _add_callback_signalsafe(self, handle):
1680 """Like _add_callback() but called from a signal handler."""
1681 self._add_callback(handle)
1682 self._write_to_self()
1683
Yury Selivanov592ada92014-09-25 12:07:56 -04001684 def _timer_handle_cancelled(self, handle):
1685 """Notification that a TimerHandle has been cancelled."""
1686 if handle._scheduled:
1687 self._timer_cancelled_count += 1
1688
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001689 def _run_once(self):
1690 """Run one full iteration of the event loop.
1691
1692 This calls all currently ready callbacks, polls for I/O,
1693 schedules the resulting callbacks, and finally schedules
1694 'call_later' callbacks.
1695 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001696
Yury Selivanov592ada92014-09-25 12:07:56 -04001697 sched_count = len(self._scheduled)
1698 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1699 self._timer_cancelled_count / sched_count >
1700 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001701 # Remove delayed calls that were cancelled if their number
1702 # is too high
1703 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001704 for handle in self._scheduled:
1705 if handle._cancelled:
1706 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001707 else:
1708 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001709
Victor Stinner68da8fc2014-09-30 18:08:36 +02001710 heapq.heapify(new_scheduled)
1711 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001712 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001713 else:
1714 # Remove delayed calls that were cancelled from head of queue.
1715 while self._scheduled and self._scheduled[0]._cancelled:
1716 self._timer_cancelled_count -= 1
1717 handle = heapq.heappop(self._scheduled)
1718 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001719
1720 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001721 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001722 timeout = 0
1723 elif self._scheduled:
1724 # Compute the desired timeout.
1725 when = self._scheduled[0]._when
MartinAltmayer944451c2018-07-31 15:06:12 +01001726 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001727
Andrew Svetlovd5bd0362018-09-30 08:28:40 +03001728 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001729 self._process_events(event_list)
1730
1731 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001732 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001733 while self._scheduled:
1734 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001735 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001736 break
1737 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001738 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001739 self._ready.append(handle)
1740
1741 # This is the only place where callbacks are actually *called*.
1742 # All other places just add them to ready.
1743 # Note: We run all currently scheduled callbacks, but not any
1744 # callbacks scheduled by callbacks run this time around --
1745 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001746 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001747 ntodo = len(self._ready)
1748 for i in range(ntodo):
1749 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001750 if handle._cancelled:
1751 continue
1752 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001753 try:
1754 self._current_handle = handle
1755 t0 = self.time()
1756 handle._run()
1757 dt = self.time() - t0
1758 if dt >= self.slow_callback_duration:
1759 logger.warning('Executing %s took %.3f seconds',
1760 _format_handle(handle), dt)
1761 finally:
1762 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001763 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001764 handle._run()
1765 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001766
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001767 def _set_coroutine_origin_tracking(self, enabled):
1768 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001769 return
1770
Yury Selivanove8944cb2015-05-12 11:43:04 -04001771 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001772 self._coroutine_origin_tracking_saved_depth = (
1773 sys.get_coroutine_origin_tracking_depth())
1774 sys.set_coroutine_origin_tracking_depth(
1775 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001776 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001777 sys.set_coroutine_origin_tracking_depth(
1778 self._coroutine_origin_tracking_saved_depth)
1779
1780 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001781
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001782 def get_debug(self):
1783 return self._debug
1784
1785 def set_debug(self, enabled):
1786 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001787
Yury Selivanove8944cb2015-05-12 11:43:04 -04001788 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001789 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)