blob: ffdb50f4beea3ea07ed309cdead7251f7a906be5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Victor Stinnerf951d282014-06-29 00:46:45 +020032from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033from . import events
34from . import futures
35from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020036from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070037from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39
Victor Stinner8c1a4a22015-01-06 01:03:58 +010040__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42
Yury Selivanov592ada92014-09-25 12:07:56 -040043# Minimum number of _scheduled timer handles before cleanup of
44# cancelled handles is performed.
45_MIN_SCHEDULED_TIMER_HANDLES = 100
46
47# Minimum fraction of _scheduled timer handles that are cancelled
48# before cleanup of cancelled handles is performed.
49_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Victor Stinnerc94a93a2016-04-01 21:43:39 +020051# Exceptions which must not call the exception handler in fatal error
52# methods (_fatal_error())
53_FATAL_ERROR_IGNORE = (BrokenPipeError,
54 ConnectionResetError, ConnectionAbortedError)
55
56
Victor Stinner0e6f52a2014-06-20 17:34:15 +020057def _format_handle(handle):
58 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040059 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020060 # format the task
61 return repr(cb.__self__)
62 else:
63 return str(handle)
64
65
Victor Stinneracdb7822014-07-14 18:33:40 +020066def _format_pipe(fd):
67 if fd == subprocess.PIPE:
68 return '<pipe>'
69 elif fd == subprocess.STDOUT:
70 return '<stdout>'
71 else:
72 return repr(fd)
73
74
Yury Selivanov5587d7c2016-09-15 15:45:07 -040075def _set_reuseport(sock):
76 if not hasattr(socket, 'SO_REUSEPORT'):
77 raise ValueError('reuse_port not supported by socket module')
78 else:
79 try:
80 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
81 except OSError:
82 raise ValueError('reuse_port not supported by socket module, '
83 'SO_REUSEPORT defined but not implemented.')
84
85
Yury Selivanova1a8b7d2016-11-09 15:47:00 -050086def _is_stream_socket(sock):
87 # Linux's socket.type is a bitmask that can include extra info
88 # about socket, therefore we can't do simple
89 # `sock_type == socket.SOCK_STREAM`.
90 return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM
91
92
93def _is_dgram_socket(sock):
94 # Linux's socket.type is a bitmask that can include extra info
95 # about socket, therefore we can't do simple
96 # `sock_type == socket.SOCK_DGRAM`.
97 return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM
98
99
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500100def _ipaddr_info(host, port, family, type, proto):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400101 # Try to skip getaddrinfo if "host" is already an IP. Users might have
102 # handled name resolution in their own code and pass in resolved IPs.
103 if not hasattr(socket, 'inet_pton'):
104 return
105
106 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
107 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500108 return None
109
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500110 if type == socket.SOCK_STREAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500111 # Linux only:
112 # getaddrinfo() can raise when socket.type is a bit mask.
113 # So if socket.type is a bit mask of SOCK_STREAM, and say
114 # SOCK_NONBLOCK, we simply return None, which will trigger
115 # a call to getaddrinfo() letting it process this request.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500116 proto = socket.IPPROTO_TCP
117 elif type == socket.SOCK_DGRAM:
118 proto = socket.IPPROTO_UDP
119 else:
120 return None
121
Yury Selivanova7146162016-06-02 16:51:07 -0400122 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400123 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700124 elif isinstance(port, bytes) and port == b'':
125 port = 0
126 elif isinstance(port, str) and port == '':
127 port = 0
128 else:
129 # If port's a service name like "http", don't skip getaddrinfo.
130 try:
131 port = int(port)
132 except (TypeError, ValueError):
133 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400134
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400135 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500136 afs = [socket.AF_INET]
137 if hasattr(socket, 'AF_INET6'):
138 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400139 else:
140 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500141
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400142 if isinstance(host, bytes):
143 host = host.decode('idna')
144 if '%' in host:
145 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
146 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500147 return None
148
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400149 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500150 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400151 socket.inet_pton(af, host)
152 # The host has already been resolved.
153 return af, type, proto, '', (host, port)
154 except OSError:
155 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500156
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400157 # "host" is not an IP address.
158 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500159
160
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400161def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
162 flags=0, loop):
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500163 host, port = address[:2]
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400164 info = _ipaddr_info(host, port, family, type, proto)
165 if info is not None:
166 # "host" is already a resolved IP.
167 fut = loop.create_future()
168 fut.set_result([info])
169 return fut
170 else:
171 return loop.getaddrinfo(host, port, family=family, type=type,
172 proto=proto, flags=flags)
Victor Stinner1b0580b2014-02-13 09:24:37 +0100173
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100175def _run_until_complete_cb(fut):
176 exc = fut._exception
177 if (isinstance(exc, BaseException)
178 and not isinstance(exc, Exception)):
179 # Issue #22429: run_forever() already finished, no need to
180 # stop it.
181 return
Guido van Rossum41f69f42015-11-19 13:28:47 -0800182 fut._loop.stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100183
184
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185class Server(events.AbstractServer):
186
187 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200188 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200190 self._active_count = 0
191 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700192
Victor Stinnere912e652014-07-12 03:11:53 +0200193 def __repr__(self):
194 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
195
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200196 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700197 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200198 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200200 def _detach(self):
201 assert self._active_count > 0
202 self._active_count -= 1
203 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 self._wakeup()
205
206 def close(self):
207 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200208 if sockets is None:
209 return
210 self.sockets = None
211 for sock in sockets:
212 self._loop._stop_serving(sock)
213 if self._active_count == 0:
214 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215
216 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200217 waiters = self._waiters
218 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 for waiter in waiters:
220 if not waiter.done():
221 waiter.set_result(waiter)
222
Victor Stinnerf951d282014-06-29 00:46:45 +0200223 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200225 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400227 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200228 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 yield from waiter
230
231
232class BaseEventLoop(events.AbstractEventLoop):
233
234 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400235 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200236 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800237 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 self._ready = collections.deque()
239 self._scheduled = []
240 self._default_executor = None
241 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100242 # Identifier of the thread running the event loop, or None if the
243 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100244 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100245 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500246 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800247 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200248 # In debug mode, if the execution of a callback or a step of a task
249 # exceed this duration in seconds, the slow callback/task is logged.
250 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100251 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400252 self._task_factory = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400253 self._coroutine_wrapper_set = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254
Yury Selivanov0a91d482016-09-15 13:24:03 -0400255 if hasattr(sys, 'get_asyncgen_hooks'):
256 # Python >= 3.6
257 # A weak set of all asynchronous generators that are
258 # being iterated by the loop.
259 self._asyncgens = weakref.WeakSet()
260 else:
261 self._asyncgens = None
Yury Selivanoveb636452016-09-08 22:01:51 -0700262
263 # Set to True when `loop.shutdown_asyncgens` is called.
264 self._asyncgens_shutdown_called = False
265
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200266 def __repr__(self):
267 return ('<%s running=%s closed=%s debug=%s>'
268 % (self.__class__.__name__, self.is_running(),
269 self.is_closed(), self.get_debug()))
270
Yury Selivanov7661db62016-05-16 15:38:39 -0400271 def create_future(self):
272 """Create a Future object attached to the loop."""
273 return futures.Future(loop=self)
274
Victor Stinner896a25a2014-07-08 11:29:25 +0200275 def create_task(self, coro):
276 """Schedule a coroutine object.
277
Victor Stinneracdb7822014-07-14 18:33:40 +0200278 Return a task object.
279 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100280 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400281 if self._task_factory is None:
282 task = tasks.Task(coro, loop=self)
283 if task._source_traceback:
284 del task._source_traceback[-1]
285 else:
286 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200287 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200288
Yury Selivanov740169c2015-05-11 14:23:38 -0400289 def set_task_factory(self, factory):
290 """Set a task factory that will be used by loop.create_task().
291
292 If factory is None the default task factory will be set.
293
294 If factory is a callable, it should have a signature matching
295 '(loop, coro)', where 'loop' will be a reference to the active
296 event loop, 'coro' will be a coroutine object. The callable
297 must return a Future.
298 """
299 if factory is not None and not callable(factory):
300 raise TypeError('task factory must be a callable or None')
301 self._task_factory = factory
302
303 def get_task_factory(self):
304 """Return a task factory, or None if the default one is in use."""
305 return self._task_factory
306
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 def _make_socket_transport(self, sock, protocol, waiter=None, *,
308 extra=None, server=None):
309 """Create socket transport."""
310 raise NotImplementedError
311
Victor Stinner15cc6782015-01-09 00:09:10 +0100312 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
313 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 extra=None, server=None):
315 """Create SSL transport."""
316 raise NotImplementedError
317
318 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200319 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 """Create datagram transport."""
321 raise NotImplementedError
322
323 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
324 extra=None):
325 """Create read pipe transport."""
326 raise NotImplementedError
327
328 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
329 extra=None):
330 """Create write pipe transport."""
331 raise NotImplementedError
332
Victor Stinnerf951d282014-06-29 00:46:45 +0200333 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 def _make_subprocess_transport(self, protocol, args, shell,
335 stdin, stdout, stderr, bufsize,
336 extra=None, **kwargs):
337 """Create subprocess transport."""
338 raise NotImplementedError
339
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200341 """Write a byte to self-pipe, to wake up the event loop.
342
343 This may be called from a different thread.
344
345 The subclass is responsible for implementing the self-pipe.
346 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 raise NotImplementedError
348
349 def _process_events(self, event_list):
350 """Process selector events."""
351 raise NotImplementedError
352
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200353 def _check_closed(self):
354 if self._closed:
355 raise RuntimeError('Event loop is closed')
356
Yury Selivanoveb636452016-09-08 22:01:51 -0700357 def _asyncgen_finalizer_hook(self, agen):
358 self._asyncgens.discard(agen)
359 if not self.is_closed():
360 self.create_task(agen.aclose())
Yury Selivanoved054062016-10-21 17:13:40 -0400361 # Wake up the loop if the finalizer was called from
362 # a different thread.
363 self._write_to_self()
Yury Selivanoveb636452016-09-08 22:01:51 -0700364
365 def _asyncgen_firstiter_hook(self, agen):
366 if self._asyncgens_shutdown_called:
367 warnings.warn(
368 "asynchronous generator {!r} was scheduled after "
369 "loop.shutdown_asyncgens() call".format(agen),
370 ResourceWarning, source=self)
371
372 self._asyncgens.add(agen)
373
374 @coroutine
375 def shutdown_asyncgens(self):
376 """Shutdown all active asynchronous generators."""
377 self._asyncgens_shutdown_called = True
378
Yury Selivanov0a91d482016-09-15 13:24:03 -0400379 if self._asyncgens is None or not len(self._asyncgens):
380 # If Python version is <3.6 or we don't have any asynchronous
381 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700382 return
383
384 closing_agens = list(self._asyncgens)
385 self._asyncgens.clear()
386
387 shutdown_coro = tasks.gather(
388 *[ag.aclose() for ag in closing_agens],
389 return_exceptions=True,
390 loop=self)
391
392 results = yield from shutdown_coro
393 for result, agen in zip(results, closing_agens):
394 if isinstance(result, Exception):
395 self.call_exception_handler({
396 'message': 'an error occurred during closing of '
397 'asynchronous generator {!r}'.format(agen),
398 'exception': result,
399 'asyncgen': agen
400 })
401
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 def run_forever(self):
403 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200404 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100405 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400406 raise RuntimeError('This event loop is already running')
407 if events._get_running_loop() is not None:
408 raise RuntimeError(
409 'Cannot run the event loop while another loop is running')
Yury Selivanove8944cb2015-05-12 11:43:04 -0400410 self._set_coroutine_wrapper(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100411 self._thread_id = threading.get_ident()
Yury Selivanov0a91d482016-09-15 13:24:03 -0400412 if self._asyncgens is not None:
413 old_agen_hooks = sys.get_asyncgen_hooks()
414 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
415 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400417 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800419 self._run_once()
420 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 break
422 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800423 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100424 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400425 events._set_running_loop(None)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400426 self._set_coroutine_wrapper(False)
Yury Selivanov0a91d482016-09-15 13:24:03 -0400427 if self._asyncgens is not None:
428 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
430 def run_until_complete(self, future):
431 """Run until the Future is done.
432
433 If the argument is a coroutine, it is wrapped in a Task.
434
Victor Stinneracdb7822014-07-14 18:33:40 +0200435 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 with the same coroutine twice -- it would wrap it in two
437 different Tasks and that can't be good.
438
439 Return the Future's result, or raise its exception.
440 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200441 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200442
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700443 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400444 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200445 if new_task:
446 # An exception is raised if the future didn't complete, so there
447 # is no need to log the "destroy pending task" message
448 future._log_destroy_pending = False
449
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100450 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200451 try:
452 self.run_forever()
453 except:
454 if new_task and future.done() and not future.cancelled():
455 # The coroutine raised a BaseException. Consume the exception
456 # to not log a warning, the caller doesn't have access to the
457 # local task.
458 future.exception()
459 raise
jimmylai21b3e042017-05-22 22:32:46 -0700460 finally:
461 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 if not future.done():
463 raise RuntimeError('Event loop stopped before Future completed.')
464
465 return future.result()
466
467 def stop(self):
468 """Stop running the event loop.
469
Guido van Rossum41f69f42015-11-19 13:28:47 -0800470 Every callback already scheduled will still run. This simply informs
471 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800473 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200475 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700476 """Close the event loop.
477
478 This clears the queues and shuts down the executor,
479 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200480
481 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700482 """
Victor Stinner956de692014-12-26 21:07:52 +0100483 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200484 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200485 if self._closed:
486 return
Victor Stinnere912e652014-07-12 03:11:53 +0200487 if self._debug:
488 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400489 self._closed = True
490 self._ready.clear()
491 self._scheduled.clear()
492 executor = self._default_executor
493 if executor is not None:
494 self._default_executor = None
495 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200496
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200497 def is_closed(self):
498 """Returns True if the event loop was closed."""
499 return self._closed
500
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900501 def __del__(self):
502 if not self.is_closed():
503 warnings.warn("unclosed event loop %r" % self, ResourceWarning,
504 source=self)
505 if not self.is_running():
506 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100507
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200509 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100510 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511
512 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200513 """Return the time according to the event loop's clock.
514
515 This is a float expressed in seconds since an epoch, but the
516 epoch, precision, accuracy and drift are unspecified and may
517 differ per event loop.
518 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519 return time.monotonic()
520
521 def call_later(self, delay, callback, *args):
522 """Arrange for a callback to be called at a given time.
523
524 Return a Handle: an opaque object with a cancel() method that
525 can be used to cancel the call.
526
527 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200528 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529
530 Each callback will be called exactly once. If two callbacks
531 are scheduled for exactly the same time, it undefined which
532 will be called first.
533
534 Any positional arguments after the callback will be passed to
535 the callback when it is called.
536 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200537 timer = self.call_at(self.time() + delay, callback, *args)
538 if timer._source_traceback:
539 del timer._source_traceback[-1]
540 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541
542 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200543 """Like call_later(), but uses an absolute time.
544
545 Absolute time corresponds to the event loop's time() method.
546 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100547 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100548 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100549 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700550 self._check_callback(callback, 'call_at')
Yury Selivanov569efa22014-02-18 18:02:19 -0500551 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200552 if timer._source_traceback:
553 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400555 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 return timer
557
558 def call_soon(self, callback, *args):
559 """Arrange for a callback to be called as soon as possible.
560
Victor Stinneracdb7822014-07-14 18:33:40 +0200561 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562 order in which they are registered. Each callback will be
563 called exactly once.
564
565 Any positional arguments after the callback will be passed to
566 the callback when it is called.
567 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700568 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100569 if self._debug:
570 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700571 self._check_callback(callback, 'call_soon')
Victor Stinner956de692014-12-26 21:07:52 +0100572 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200573 if handle._source_traceback:
574 del handle._source_traceback[-1]
575 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100576
Yury Selivanov491a9122016-11-03 15:09:24 -0700577 def _check_callback(self, callback, method):
578 if (coroutines.iscoroutine(callback) or
579 coroutines.iscoroutinefunction(callback)):
580 raise TypeError(
581 "coroutines cannot be used with {}()".format(method))
582 if not callable(callback):
583 raise TypeError(
584 'a callable object was expected by {}(), got {!r}'.format(
585 method, callback))
586
587
Victor Stinner956de692014-12-26 21:07:52 +0100588 def _call_soon(self, callback, args):
Yury Selivanov569efa22014-02-18 18:02:19 -0500589 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200590 if handle._source_traceback:
591 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700592 self._ready.append(handle)
593 return handle
594
Victor Stinner956de692014-12-26 21:07:52 +0100595 def _check_thread(self):
596 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100597
Victor Stinneracdb7822014-07-14 18:33:40 +0200598 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100599 likely behave incorrectly when the assumption is violated.
600
Victor Stinneracdb7822014-07-14 18:33:40 +0200601 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100602 responsible for checking this condition for performance reasons.
603 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100604 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200605 return
Victor Stinner956de692014-12-26 21:07:52 +0100606 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100607 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100608 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200609 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100610 "than the current one")
611
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200613 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700614 self._check_closed()
615 if self._debug:
616 self._check_callback(callback, 'call_soon_threadsafe')
Victor Stinner956de692014-12-26 21:07:52 +0100617 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200618 if handle._source_traceback:
619 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 self._write_to_self()
621 return handle
622
Yury Selivanov740169c2015-05-11 14:23:38 -0400623 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100624 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700625 if self._debug:
626 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627 if executor is None:
628 executor = self._default_executor
629 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400630 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 self._default_executor = executor
Yury Selivanov740169c2015-05-11 14:23:38 -0400632 return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633
634 def set_default_executor(self, executor):
635 self._default_executor = executor
636
Victor Stinnere912e652014-07-12 03:11:53 +0200637 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
638 msg = ["%s:%r" % (host, port)]
639 if family:
640 msg.append('family=%r' % family)
641 if type:
642 msg.append('type=%r' % type)
643 if proto:
644 msg.append('proto=%r' % proto)
645 if flags:
646 msg.append('flags=%r' % flags)
647 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200648 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200649
650 t0 = self.time()
651 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
652 dt = self.time() - t0
653
Victor Stinneracdb7822014-07-14 18:33:40 +0200654 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200655 % (msg, dt * 1e3, addrinfo))
656 if dt >= self.slow_callback_duration:
657 logger.info(msg)
658 else:
659 logger.debug(msg)
660 return addrinfo
661
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 def getaddrinfo(self, host, port, *,
663 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400664 if self._debug:
Victor Stinnere912e652014-07-12 03:11:53 +0200665 return self.run_in_executor(None, self._getaddrinfo_debug,
666 host, port, family, type, proto, flags)
667 else:
668 return self.run_in_executor(None, socket.getaddrinfo,
669 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700670
671 def getnameinfo(self, sockaddr, flags=0):
672 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
673
Victor Stinnerf951d282014-06-29 00:46:45 +0200674 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675 def create_connection(self, protocol_factory, host=None, port=None, *,
676 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700677 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200678 """Connect to a TCP server.
679
680 Create a streaming transport connection to a given Internet host and
681 port: socket family AF_INET or socket.AF_INET6 depending on host (or
682 family if specified), socket type SOCK_STREAM. protocol_factory must be
683 a callable returning a protocol instance.
684
685 This method is a coroutine which will try to establish the connection
686 in the background. When successful, the coroutine returns a
687 (transport, protocol) pair.
688 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700689 if server_hostname is not None and not ssl:
690 raise ValueError('server_hostname is only meaningful with ssl')
691
692 if server_hostname is None and ssl:
693 # Use host as default for server_hostname. It is an error
694 # if host is empty or not set, e.g. when an
695 # already-connected socket was passed or when only a port
696 # is given. To avoid this error, you can pass
697 # server_hostname='' -- this will bypass the hostname
698 # check. (This also means that if host is a numeric
699 # IP/IPv6 address, we will attempt to verify that exact
700 # address; this will probably fail, but it is possible to
701 # create a certificate for a specific IP address, so we
702 # don't judge it here.)
703 if not host:
704 raise ValueError('You must set server_hostname '
705 'when using ssl without a host')
706 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700707
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708 if host is not None or port is not None:
709 if sock is not None:
710 raise ValueError(
711 'host/port and sock can not be specified at the same time')
712
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400713 f1 = _ensure_resolved((host, port), family=family,
714 type=socket.SOCK_STREAM, proto=proto,
715 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700716 fs = [f1]
717 if local_addr is not None:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400718 f2 = _ensure_resolved(local_addr, family=family,
719 type=socket.SOCK_STREAM, proto=proto,
720 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 fs.append(f2)
722 else:
723 f2 = None
724
725 yield from tasks.wait(fs, loop=self)
726
727 infos = f1.result()
728 if not infos:
729 raise OSError('getaddrinfo() returned empty list')
730 if f2 is not None:
731 laddr_infos = f2.result()
732 if not laddr_infos:
733 raise OSError('getaddrinfo() returned empty list')
734
735 exceptions = []
736 for family, type, proto, cname, address in infos:
737 try:
738 sock = socket.socket(family=family, type=type, proto=proto)
739 sock.setblocking(False)
740 if f2 is not None:
741 for _, _, _, _, laddr in laddr_infos:
742 try:
743 sock.bind(laddr)
744 break
745 except OSError as exc:
746 exc = OSError(
747 exc.errno, 'error while '
748 'attempting to bind on address '
749 '{!r}: {}'.format(
750 laddr, exc.strerror.lower()))
751 exceptions.append(exc)
752 else:
753 sock.close()
754 sock = None
755 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200756 if self._debug:
757 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700758 yield from self.sock_connect(sock, address)
759 except OSError as exc:
760 if sock is not None:
761 sock.close()
762 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200763 except:
764 if sock is not None:
765 sock.close()
766 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700767 else:
768 break
769 else:
770 if len(exceptions) == 1:
771 raise exceptions[0]
772 else:
773 # If they all have the same str(), raise one.
774 model = str(exceptions[0])
775 if all(str(exc) == model for exc in exceptions):
776 raise exceptions[0]
777 # Raise a combined exception so the user can see all
778 # the various error messages.
779 raise OSError('Multiple exceptions: {}'.format(
780 ', '.join(str(exc) for exc in exceptions)))
781
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500782 else:
783 if sock is None:
784 raise ValueError(
785 'host and port was not specified and no sock specified')
Yury Selivanovdab05842016-11-21 17:47:27 -0500786 if not _is_stream_socket(sock):
787 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
788 # are SOCK_STREAM.
789 # We support passing AF_UNIX sockets even though we have
790 # a dedicated API for that: create_unix_connection.
791 # Disallowing AF_UNIX in this method, breaks backwards
792 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500793 raise ValueError(
Yury Selivanovdab05842016-11-21 17:47:27 -0500794 'A Stream Socket was expected, got {!r}'.format(sock))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700795
Yury Selivanovb057c522014-02-18 12:15:06 -0500796 transport, protocol = yield from self._create_connection_transport(
797 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200798 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200799 # Get the socket from the transport because SSL transport closes
800 # the old socket and creates a new SSL socket
801 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200802 logger.debug("%r connected to %s:%r: (%r, %r)",
803 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500804 return transport, protocol
805
Victor Stinnerf951d282014-06-29 00:46:45 +0200806 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500807 def _create_connection_transport(self, sock, protocol_factory, ssl,
Yury Selivanov252e9ed2016-07-12 18:23:10 -0400808 server_hostname, server_side=False):
809
810 sock.setblocking(False)
811
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700812 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400813 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700814 if ssl:
815 sslcontext = None if isinstance(ssl, bool) else ssl
816 transport = self._make_ssl_transport(
817 sock, protocol, sslcontext, waiter,
Yury Selivanov252e9ed2016-07-12 18:23:10 -0400818 server_side=server_side, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700819 else:
820 transport = self._make_socket_transport(sock, protocol, waiter)
821
Victor Stinner29ad0112015-01-15 00:04:21 +0100822 try:
823 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100824 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100825 transport.close()
826 raise
827
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700828 return transport, protocol
829
Victor Stinnerf951d282014-06-29 00:46:45 +0200830 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700831 def create_datagram_endpoint(self, protocol_factory,
832 local_addr=None, remote_addr=None, *,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700833 family=0, proto=0, flags=0,
834 reuse_address=None, reuse_port=None,
835 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700836 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700837 if sock is not None:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500838 if not _is_dgram_socket(sock):
839 raise ValueError(
840 'A UDP Socket was expected, got {!r}'.format(sock))
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700841 if (local_addr or remote_addr or
842 family or proto or flags or
843 reuse_address or reuse_port or allow_broadcast):
844 # show the problematic kwargs in exception msg
845 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
846 family=family, proto=proto, flags=flags,
847 reuse_address=reuse_address, reuse_port=reuse_port,
848 allow_broadcast=allow_broadcast)
849 problems = ', '.join(
850 '{}={}'.format(k, v) for k, v in opts.items() if v)
851 raise ValueError(
852 'socket modifier keyword arguments can not be used '
853 'when sock is specified. ({})'.format(problems))
854 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700855 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700856 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700857 if not (local_addr or remote_addr):
858 if family == 0:
859 raise ValueError('unexpected address family')
860 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +0100861 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
862 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +0100863 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +0100864 raise TypeError('string is expected')
865 addr_pairs_info = (((family, proto),
866 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700867 else:
868 # join address by (family, protocol)
869 addr_infos = collections.OrderedDict()
870 for idx, addr in ((0, local_addr), (1, remote_addr)):
871 if addr is not None:
872 assert isinstance(addr, tuple) and len(addr) == 2, (
873 '2-tuple is expected')
874
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400875 infos = yield from _ensure_resolved(
876 addr, family=family, type=socket.SOCK_DGRAM,
877 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700878 if not infos:
879 raise OSError('getaddrinfo() returned empty list')
880
881 for fam, _, pro, _, address in infos:
882 key = (fam, pro)
883 if key not in addr_infos:
884 addr_infos[key] = [None, None]
885 addr_infos[key][idx] = address
886
887 # each addr has to have info for each (family, proto) pair
888 addr_pairs_info = [
889 (key, addr_pair) for key, addr_pair in addr_infos.items()
890 if not ((local_addr and addr_pair[0] is None) or
891 (remote_addr and addr_pair[1] is None))]
892
893 if not addr_pairs_info:
894 raise ValueError('can not get address information')
895
896 exceptions = []
897
898 if reuse_address is None:
899 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
900
901 for ((family, proto),
902 (local_address, remote_address)) in addr_pairs_info:
903 sock = None
904 r_addr = None
905 try:
906 sock = socket.socket(
907 family=family, type=socket.SOCK_DGRAM, proto=proto)
908 if reuse_address:
909 sock.setsockopt(
910 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
911 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -0400912 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700913 if allow_broadcast:
914 sock.setsockopt(
915 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
916 sock.setblocking(False)
917
918 if local_addr:
919 sock.bind(local_address)
920 if remote_addr:
921 yield from self.sock_connect(sock, remote_address)
922 r_addr = remote_address
923 except OSError as exc:
924 if sock is not None:
925 sock.close()
926 exceptions.append(exc)
927 except:
928 if sock is not None:
929 sock.close()
930 raise
931 else:
932 break
933 else:
934 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700935
936 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400937 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700938 transport = self._make_datagram_transport(
939 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200940 if self._debug:
941 if local_addr:
942 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
943 "created: (%r, %r)",
944 local_addr, remote_addr, transport, protocol)
945 else:
946 logger.debug("Datagram endpoint remote_addr=%r created: "
947 "(%r, %r)",
948 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100949
950 try:
951 yield from waiter
952 except:
953 transport.close()
954 raise
955
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700956 return transport, protocol
957
Victor Stinnerf951d282014-06-29 00:46:45 +0200958 @coroutine
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200959 def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400960 infos = yield from _ensure_resolved((host, port), family=family,
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200961 type=socket.SOCK_STREAM,
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400962 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200963 if not infos:
964 raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
965 return infos
966
967 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700968 def create_server(self, protocol_factory, host=None, port=None,
969 *,
970 family=socket.AF_UNSPEC,
971 flags=socket.AI_PASSIVE,
972 sock=None,
973 backlog=100,
974 ssl=None,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700975 reuse_address=None,
976 reuse_port=None):
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200977 """Create a TCP server.
978
979 The host parameter can be a string, in that case the TCP server is bound
980 to host and port.
981
982 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -0500983 the TCP server is bound to all hosts of the sequence. If a host
984 appears multiple times (possibly indirectly e.g. when hostnames
985 resolve to the same IP address), the server is only bound once to that
986 host.
Victor Stinnerd1432092014-06-19 17:11:49 +0200987
Victor Stinneracdb7822014-07-14 18:33:40 +0200988 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200989
990 This method is a coroutine.
991 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700992 if isinstance(ssl, bool):
993 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700994 if host is not None or port is not None:
995 if sock is not None:
996 raise ValueError(
997 'host/port and sock can not be specified at the same time')
998
999 AF_INET6 = getattr(socket, 'AF_INET6', 0)
1000 if reuse_address is None:
1001 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1002 sockets = []
1003 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001004 hosts = [None]
1005 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001006 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001007 hosts = [host]
1008 else:
1009 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001010
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001011 fs = [self._create_server_getaddrinfo(host, port, family=family,
1012 flags=flags)
1013 for host in hosts]
1014 infos = yield from tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001015 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001016
1017 completed = False
1018 try:
1019 for res in infos:
1020 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001021 try:
1022 sock = socket.socket(af, socktype, proto)
1023 except socket.error:
1024 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001025 if self._debug:
1026 logger.warning('create_server() failed to create '
1027 'socket.socket(%r, %r, %r)',
1028 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001029 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001030 sockets.append(sock)
1031 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001032 sock.setsockopt(
1033 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1034 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001035 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001036 # Disable IPv4/IPv6 dual stack support (enabled by
1037 # default on Linux) which makes a single socket
1038 # listen on both address families.
1039 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
1040 sock.setsockopt(socket.IPPROTO_IPV6,
1041 socket.IPV6_V6ONLY,
1042 True)
1043 try:
1044 sock.bind(sa)
1045 except OSError as err:
1046 raise OSError(err.errno, 'error while attempting '
1047 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001048 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001049 completed = True
1050 finally:
1051 if not completed:
1052 for sock in sockets:
1053 sock.close()
1054 else:
1055 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001056 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanovdab05842016-11-21 17:47:27 -05001057 if not _is_stream_socket(sock):
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001058 raise ValueError(
Yury Selivanovdab05842016-11-21 17:47:27 -05001059 'A Stream Socket was expected, got {!r}'.format(sock))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001060 sockets = [sock]
1061
1062 server = Server(self, sockets)
1063 for sock in sockets:
1064 sock.listen(backlog)
1065 sock.setblocking(False)
Yury Selivanova1b0e7d2016-09-15 14:13:15 -04001066 self._start_serving(protocol_factory, sock, ssl, server, backlog)
Victor Stinnere912e652014-07-12 03:11:53 +02001067 if self._debug:
1068 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001069 return server
1070
Victor Stinnerf951d282014-06-29 00:46:45 +02001071 @coroutine
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001072 def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
1073 """Handle an accepted connection.
1074
1075 This is used by servers that accept connections outside of
1076 asyncio but that use asyncio to handle connections.
1077
1078 This method is a coroutine. When completed, the coroutine
1079 returns a (transport, protocol) pair.
1080 """
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001081 if not _is_stream_socket(sock):
1082 raise ValueError(
1083 'A Stream Socket was expected, got {!r}'.format(sock))
1084
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001085 transport, protocol = yield from self._create_connection_transport(
1086 sock, protocol_factory, ssl, '', server_side=True)
1087 if self._debug:
1088 # Get the socket from the transport because SSL transport closes
1089 # the old socket and creates a new SSL socket
1090 sock = transport.get_extra_info('socket')
1091 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1092 return transport, protocol
1093
1094 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001095 def connect_read_pipe(self, protocol_factory, pipe):
1096 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001097 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001098 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001099
1100 try:
1101 yield from waiter
1102 except:
1103 transport.close()
1104 raise
1105
Victor Stinneracdb7822014-07-14 18:33:40 +02001106 if self._debug:
1107 logger.debug('Read pipe %r connected: (%r, %r)',
1108 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001109 return transport, protocol
1110
Victor Stinnerf951d282014-06-29 00:46:45 +02001111 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001112 def connect_write_pipe(self, protocol_factory, pipe):
1113 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001114 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001115 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001116
1117 try:
1118 yield from waiter
1119 except:
1120 transport.close()
1121 raise
1122
Victor Stinneracdb7822014-07-14 18:33:40 +02001123 if self._debug:
1124 logger.debug('Write pipe %r connected: (%r, %r)',
1125 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001126 return transport, protocol
1127
Victor Stinneracdb7822014-07-14 18:33:40 +02001128 def _log_subprocess(self, msg, stdin, stdout, stderr):
1129 info = [msg]
1130 if stdin is not None:
1131 info.append('stdin=%s' % _format_pipe(stdin))
1132 if stdout is not None and stderr == subprocess.STDOUT:
1133 info.append('stdout=stderr=%s' % _format_pipe(stdout))
1134 else:
1135 if stdout is not None:
1136 info.append('stdout=%s' % _format_pipe(stdout))
1137 if stderr is not None:
1138 info.append('stderr=%s' % _format_pipe(stderr))
1139 logger.debug(' '.join(info))
1140
Victor Stinnerf951d282014-06-29 00:46:45 +02001141 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001142 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
1143 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1144 universal_newlines=False, shell=True, bufsize=0,
1145 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001146 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001147 raise ValueError("cmd must be a string")
1148 if universal_newlines:
1149 raise ValueError("universal_newlines must be False")
1150 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001151 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001152 if bufsize != 0:
1153 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001154 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001155 if self._debug:
1156 # don't log parameters: they may contain sensitive information
1157 # (password) and may be too long
1158 debug_log = 'run shell command %r' % cmd
1159 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001160 transport = yield from self._make_subprocess_transport(
1161 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001162 if self._debug:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001163 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001164 return transport, protocol
1165
Victor Stinnerf951d282014-06-29 00:46:45 +02001166 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -05001167 def subprocess_exec(self, protocol_factory, program, *args,
1168 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1169 stderr=subprocess.PIPE, universal_newlines=False,
1170 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001171 if universal_newlines:
1172 raise ValueError("universal_newlines must be False")
1173 if shell:
1174 raise ValueError("shell must be False")
1175 if bufsize != 0:
1176 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001177 popen_args = (program,) + args
1178 for arg in popen_args:
1179 if not isinstance(arg, (str, bytes)):
1180 raise TypeError("program arguments must be "
1181 "a bytes or text string, not %s"
1182 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001183 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001184 if self._debug:
1185 # don't log parameters: they may contain sensitive information
1186 # (password) and may be too long
1187 debug_log = 'execute program %r' % program
1188 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001189 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001190 protocol, popen_args, False, stdin, stdout, stderr,
1191 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001192 if self._debug:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001193 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001194 return transport, protocol
1195
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001196 def get_exception_handler(self):
1197 """Return an exception handler, or None if the default one is in use.
1198 """
1199 return self._exception_handler
1200
Yury Selivanov569efa22014-02-18 18:02:19 -05001201 def set_exception_handler(self, handler):
1202 """Set handler as the new event loop exception handler.
1203
1204 If handler is None, the default exception handler will
1205 be set.
1206
1207 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001208 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001209 will be a reference to the active event loop, 'context'
1210 will be a dict object (see `call_exception_handler()`
1211 documentation for details about context).
1212 """
1213 if handler is not None and not callable(handler):
1214 raise TypeError('A callable object or None is expected, '
1215 'got {!r}'.format(handler))
1216 self._exception_handler = handler
1217
1218 def default_exception_handler(self, context):
1219 """Default exception handler.
1220
1221 This is called when an exception occurs and no exception
1222 handler is set, and can be called by a custom exception
1223 handler that wants to defer to the default behavior.
1224
Antoine Pitrou921e9432017-11-07 17:23:29 +01001225 This default handler logs the error message and other
1226 context-dependent information. In debug mode, a truncated
1227 stack trace is also appended showing where the given object
1228 (e.g. a handle or future or task) was created, if any.
1229
Victor Stinneracdb7822014-07-14 18:33:40 +02001230 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001231 `call_exception_handler()`.
1232 """
1233 message = context.get('message')
1234 if not message:
1235 message = 'Unhandled exception in event loop'
1236
1237 exception = context.get('exception')
1238 if exception is not None:
1239 exc_info = (type(exception), exception, exception.__traceback__)
1240 else:
1241 exc_info = False
1242
Victor Stinnerff018e42015-01-28 00:30:40 +01001243 if ('source_traceback' not in context
1244 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +01001245 and self._current_handle._source_traceback):
1246 context['handle_traceback'] = self._current_handle._source_traceback
1247
Yury Selivanov569efa22014-02-18 18:02:19 -05001248 log_lines = [message]
1249 for key in sorted(context):
1250 if key in {'message', 'exception'}:
1251 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001252 value = context[key]
1253 if key == 'source_traceback':
1254 tb = ''.join(traceback.format_list(value))
1255 value = 'Object created at (most recent call last):\n'
1256 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001257 elif key == 'handle_traceback':
1258 tb = ''.join(traceback.format_list(value))
1259 value = 'Handle created at (most recent call last):\n'
1260 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001261 else:
1262 value = repr(value)
1263 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -05001264
1265 logger.error('\n'.join(log_lines), exc_info=exc_info)
1266
1267 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001268 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001269
Victor Stinneracdb7822014-07-14 18:33:40 +02001270 The context argument is a dict containing the following keys:
1271
Yury Selivanov569efa22014-02-18 18:02:19 -05001272 - 'message': Error message;
1273 - 'exception' (optional): Exception object;
1274 - 'future' (optional): Future instance;
1275 - 'handle' (optional): Handle instance;
1276 - 'protocol' (optional): Protocol instance;
1277 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001278 - 'socket' (optional): Socket instance;
1279 - 'asyncgen' (optional): Asynchronous generator that caused
1280 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001281
Victor Stinneracdb7822014-07-14 18:33:40 +02001282 New keys maybe introduced in the future.
1283
1284 Note: do not overload this method in an event loop subclass.
1285 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001286 `set_exception_handler()` method.
1287 """
1288 if self._exception_handler is None:
1289 try:
1290 self.default_exception_handler(context)
1291 except Exception:
1292 # Second protection layer for unexpected errors
1293 # in the default implementation, as well as for subclassed
1294 # event loops with overloaded "default_exception_handler".
1295 logger.error('Exception in default exception handler',
1296 exc_info=True)
1297 else:
1298 try:
1299 self._exception_handler(self, context)
1300 except Exception as exc:
1301 # Exception in the user set custom exception handler.
1302 try:
1303 # Let's try default handler.
1304 self.default_exception_handler({
1305 'message': 'Unhandled error in exception handler',
1306 'exception': exc,
1307 'context': context,
1308 })
1309 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001310 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001311 # overloaded.
1312 logger.error('Exception in default exception handler '
1313 'while handling an unexpected error '
1314 'in custom exception handler',
1315 exc_info=True)
1316
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001317 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001318 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001319 assert isinstance(handle, events.Handle), 'A Handle is required here'
1320 if handle._cancelled:
1321 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001322 assert not isinstance(handle, events.TimerHandle)
1323 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001324
1325 def _add_callback_signalsafe(self, handle):
1326 """Like _add_callback() but called from a signal handler."""
1327 self._add_callback(handle)
1328 self._write_to_self()
1329
Yury Selivanov592ada92014-09-25 12:07:56 -04001330 def _timer_handle_cancelled(self, handle):
1331 """Notification that a TimerHandle has been cancelled."""
1332 if handle._scheduled:
1333 self._timer_cancelled_count += 1
1334
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001335 def _run_once(self):
1336 """Run one full iteration of the event loop.
1337
1338 This calls all currently ready callbacks, polls for I/O,
1339 schedules the resulting callbacks, and finally schedules
1340 'call_later' callbacks.
1341 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001342
Yury Selivanov592ada92014-09-25 12:07:56 -04001343 sched_count = len(self._scheduled)
1344 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1345 self._timer_cancelled_count / sched_count >
1346 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001347 # Remove delayed calls that were cancelled if their number
1348 # is too high
1349 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001350 for handle in self._scheduled:
1351 if handle._cancelled:
1352 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001353 else:
1354 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001355
Victor Stinner68da8fc2014-09-30 18:08:36 +02001356 heapq.heapify(new_scheduled)
1357 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001358 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001359 else:
1360 # Remove delayed calls that were cancelled from head of queue.
1361 while self._scheduled and self._scheduled[0]._cancelled:
1362 self._timer_cancelled_count -= 1
1363 handle = heapq.heappop(self._scheduled)
1364 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001365
1366 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001367 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001368 timeout = 0
1369 elif self._scheduled:
1370 # Compute the desired timeout.
1371 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001372 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001373
Victor Stinner770e48d2014-07-11 11:58:33 +02001374 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001375 t0 = self.time()
1376 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001377 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001378 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001379 level = logging.INFO
1380 else:
1381 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001382 nevent = len(event_list)
1383 if timeout is None:
1384 logger.log(level, 'poll took %.3f ms: %s events',
1385 dt * 1e3, nevent)
1386 elif nevent:
1387 logger.log(level,
1388 'poll %.3f ms took %.3f ms: %s events',
1389 timeout * 1e3, dt * 1e3, nevent)
1390 elif dt >= 1.0:
1391 logger.log(level,
1392 'poll %.3f ms took %.3f ms: timeout',
1393 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001394 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001395 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001396 self._process_events(event_list)
1397
1398 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001399 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001400 while self._scheduled:
1401 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001402 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001403 break
1404 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001405 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001406 self._ready.append(handle)
1407
1408 # This is the only place where callbacks are actually *called*.
1409 # All other places just add them to ready.
1410 # Note: We run all currently scheduled callbacks, but not any
1411 # callbacks scheduled by callbacks run this time around --
1412 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001413 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001414 ntodo = len(self._ready)
1415 for i in range(ntodo):
1416 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001417 if handle._cancelled:
1418 continue
1419 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001420 try:
1421 self._current_handle = handle
1422 t0 = self.time()
1423 handle._run()
1424 dt = self.time() - t0
1425 if dt >= self.slow_callback_duration:
1426 logger.warning('Executing %s took %.3f seconds',
1427 _format_handle(handle), dt)
1428 finally:
1429 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001430 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001431 handle._run()
1432 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001433
Yury Selivanove8944cb2015-05-12 11:43:04 -04001434 def _set_coroutine_wrapper(self, enabled):
1435 try:
1436 set_wrapper = sys.set_coroutine_wrapper
1437 get_wrapper = sys.get_coroutine_wrapper
1438 except AttributeError:
1439 return
1440
1441 enabled = bool(enabled)
Yury Selivanov996083d2015-08-04 15:37:24 -04001442 if self._coroutine_wrapper_set == enabled:
Yury Selivanove8944cb2015-05-12 11:43:04 -04001443 return
1444
1445 wrapper = coroutines.debug_wrapper
1446 current_wrapper = get_wrapper()
1447
1448 if enabled:
1449 if current_wrapper not in (None, wrapper):
1450 warnings.warn(
1451 "loop.set_debug(True): cannot set debug coroutine "
1452 "wrapper; another wrapper is already set %r" %
1453 current_wrapper, RuntimeWarning)
1454 else:
1455 set_wrapper(wrapper)
1456 self._coroutine_wrapper_set = True
1457 else:
1458 if current_wrapper not in (None, wrapper):
1459 warnings.warn(
1460 "loop.set_debug(False): cannot unset debug coroutine "
1461 "wrapper; another wrapper was set %r" %
1462 current_wrapper, RuntimeWarning)
1463 else:
1464 set_wrapper(None)
1465 self._coroutine_wrapper_set = False
1466
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001467 def get_debug(self):
1468 return self._debug
1469
1470 def set_debug(self, enabled):
1471 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001472
Yury Selivanove8944cb2015-05-12 11:43:04 -04001473 if self.is_running():
1474 self._set_coroutine_wrapper(enabled)