blob: e5e9394835f7e0007804b2b7c9b9a5965fff27bb [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
Yury Selivanovd5c2a622015-12-16 19:31:17 -050019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020021import inspect
Yury Selivanovd5c2a622015-12-16 19:31:17 -050022import ipaddress
Victor Stinner5e4a7d82015-09-21 18:33:43 +020023import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020025import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import socket
27import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010028import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020030import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010032import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Yury Selivanov2a8911c2015-08-04 15:56:33 -040034from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020035from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036from . import events
37from . import futures
38from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020039from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070040from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42
Victor Stinner8c1a4a22015-01-06 01:03:58 +010043__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
45
46# Argument for default thread pool executor creation.
47_MAX_WORKERS = 5
48
Yury Selivanov592ada92014-09-25 12:07:56 -040049# Minimum number of _scheduled timer handles before cleanup of
50# cancelled handles is performed.
51_MIN_SCHEDULED_TIMER_HANDLES = 100
52
53# Minimum fraction of _scheduled timer handles that are cancelled
54# before cleanup of cancelled handles is performed.
55_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056
Victor Stinnerc94a93a2016-04-01 21:43:39 +020057# Exceptions which must not call the exception handler in fatal error
58# methods (_fatal_error())
59_FATAL_ERROR_IGNORE = (BrokenPipeError,
60 ConnectionResetError, ConnectionAbortedError)
61
62
Victor Stinner0e6f52a2014-06-20 17:34:15 +020063def _format_handle(handle):
64 cb = handle._callback
65 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
66 # format the task
67 return repr(cb.__self__)
68 else:
69 return str(handle)
70
71
Victor Stinneracdb7822014-07-14 18:33:40 +020072def _format_pipe(fd):
73 if fd == subprocess.PIPE:
74 return '<pipe>'
75 elif fd == subprocess.STDOUT:
76 return '<stdout>'
77 else:
78 return repr(fd)
79
80
Yury Selivanovd5c2a622015-12-16 19:31:17 -050081# Linux's sock.type is a bitmask that can include extra info about socket.
82_SOCKET_TYPE_MASK = 0
83if hasattr(socket, 'SOCK_NONBLOCK'):
84 _SOCKET_TYPE_MASK |= socket.SOCK_NONBLOCK
85if hasattr(socket, 'SOCK_CLOEXEC'):
86 _SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC
87
88
89@functools.lru_cache(maxsize=1024)
90def _ipaddr_info(host, port, family, type, proto):
91 # Try to skip getaddrinfo if "host" is already an IP. Since getaddrinfo
92 # blocks on an exclusive lock on some platforms, users might handle name
93 # resolution in their own code and pass in resolved IPs.
94 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or host is None:
95 return None
96
97 type &= ~_SOCKET_TYPE_MASK
98 if type == socket.SOCK_STREAM:
99 proto = socket.IPPROTO_TCP
100 elif type == socket.SOCK_DGRAM:
101 proto = socket.IPPROTO_UDP
102 else:
103 return None
104
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400105 if port in {None, ''}:
106 port = 0
107 elif isinstance(port, (bytes, str)):
108 port = int(port)
109
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500110 if hasattr(socket, 'inet_pton'):
111 if family == socket.AF_UNSPEC:
112 afs = [socket.AF_INET, socket.AF_INET6]
113 else:
114 afs = [family]
115
116 for af in afs:
117 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
118 # like '::1%lo0', so strip it. If we happen to make an invalid
119 # address look valid, we fail later in sock.connect or sock.bind.
120 try:
121 if af == socket.AF_INET6:
122 socket.inet_pton(af, host.partition('%')[0])
123 else:
124 socket.inet_pton(af, host)
125 return af, type, proto, '', (host, port)
126 except OSError:
127 pass
128
129 # "host" is not an IP address.
130 return None
131
132 # No inet_pton. (On Windows it's only available since Python 3.4.)
133 # Even though getaddrinfo with AI_NUMERICHOST would be non-blocking, it
134 # still requires a lock on some platforms, and waiting for that lock could
135 # block the event loop. Use ipaddress instead, it's just text parsing.
136 try:
137 addr = ipaddress.IPv4Address(host)
138 except ValueError:
139 try:
140 addr = ipaddress.IPv6Address(host.partition('%')[0])
141 except ValueError:
142 return None
143
144 af = socket.AF_INET if addr.version == 4 else socket.AF_INET6
145 if family not in (socket.AF_UNSPEC, af):
146 # "host" is wrong IP version for "family".
147 return None
148
149 return af, type, proto, '', (host, port)
150
151
Victor Stinner1b0580b2014-02-13 09:24:37 +0100152def _check_resolved_address(sock, address):
153 # Ensure that the address is already resolved to avoid the trap of hanging
154 # the entire event loop when the address requires doing a DNS lookup.
Victor Stinner2fc23132015-02-04 14:51:23 +0100155
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500156 if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
Victor Stinner1b0580b2014-02-13 09:24:37 +0100157 return
158
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500159 host, port = address[:2]
160 if _ipaddr_info(host, port, sock.family, sock.type, sock.proto) is None:
161 raise ValueError("address must be resolved (IP address),"
162 " got host %r" % host)
Victor Stinner1b0580b2014-02-13 09:24:37 +0100163
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100165def _run_until_complete_cb(fut):
166 exc = fut._exception
167 if (isinstance(exc, BaseException)
168 and not isinstance(exc, Exception)):
169 # Issue #22429: run_forever() already finished, no need to
170 # stop it.
171 return
Guido van Rossum41f69f42015-11-19 13:28:47 -0800172 fut._loop.stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100173
174
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175class Server(events.AbstractServer):
176
177 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200178 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200180 self._active_count = 0
181 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182
Victor Stinnere912e652014-07-12 03:11:53 +0200183 def __repr__(self):
184 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
185
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200186 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200188 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200190 def _detach(self):
191 assert self._active_count > 0
192 self._active_count -= 1
193 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 self._wakeup()
195
196 def close(self):
197 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200198 if sockets is None:
199 return
200 self.sockets = None
201 for sock in sockets:
202 self._loop._stop_serving(sock)
203 if self._active_count == 0:
204 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700205
206 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200207 waiters = self._waiters
208 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700209 for waiter in waiters:
210 if not waiter.done():
211 waiter.set_result(waiter)
212
Victor Stinnerf951d282014-06-29 00:46:45 +0200213 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200215 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700216 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400217 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200218 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 yield from waiter
220
221
222class BaseEventLoop(events.AbstractEventLoop):
223
224 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400225 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200226 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800227 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228 self._ready = collections.deque()
229 self._scheduled = []
230 self._default_executor = None
231 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100232 # Identifier of the thread running the event loop, or None if the
233 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100234 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100235 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500236 self._exception_handler = None
Yury Selivanov1af2bf72015-05-11 22:27:25 -0400237 self.set_debug((not sys.flags.ignore_environment
238 and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200239 # In debug mode, if the execution of a callback or a step of a task
240 # exceed this duration in seconds, the slow callback/task is logged.
241 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100242 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400243 self._task_factory = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400244 self._coroutine_wrapper_set = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200246 def __repr__(self):
247 return ('<%s running=%s closed=%s debug=%s>'
248 % (self.__class__.__name__, self.is_running(),
249 self.is_closed(), self.get_debug()))
250
Yury Selivanov7661db62016-05-16 15:38:39 -0400251 def create_future(self):
252 """Create a Future object attached to the loop."""
253 return futures.Future(loop=self)
254
Victor Stinner896a25a2014-07-08 11:29:25 +0200255 def create_task(self, coro):
256 """Schedule a coroutine object.
257
Victor Stinneracdb7822014-07-14 18:33:40 +0200258 Return a task object.
259 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100260 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400261 if self._task_factory is None:
262 task = tasks.Task(coro, loop=self)
263 if task._source_traceback:
264 del task._source_traceback[-1]
265 else:
266 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200267 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200268
Yury Selivanov740169c2015-05-11 14:23:38 -0400269 def set_task_factory(self, factory):
270 """Set a task factory that will be used by loop.create_task().
271
272 If factory is None the default task factory will be set.
273
274 If factory is a callable, it should have a signature matching
275 '(loop, coro)', where 'loop' will be a reference to the active
276 event loop, 'coro' will be a coroutine object. The callable
277 must return a Future.
278 """
279 if factory is not None and not callable(factory):
280 raise TypeError('task factory must be a callable or None')
281 self._task_factory = factory
282
283 def get_task_factory(self):
284 """Return a task factory, or None if the default one is in use."""
285 return self._task_factory
286
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 def _make_socket_transport(self, sock, protocol, waiter=None, *,
288 extra=None, server=None):
289 """Create socket transport."""
290 raise NotImplementedError
291
Victor Stinner15cc6782015-01-09 00:09:10 +0100292 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
293 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 extra=None, server=None):
295 """Create SSL transport."""
296 raise NotImplementedError
297
298 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200299 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 """Create datagram transport."""
301 raise NotImplementedError
302
303 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
304 extra=None):
305 """Create read pipe transport."""
306 raise NotImplementedError
307
308 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
309 extra=None):
310 """Create write pipe transport."""
311 raise NotImplementedError
312
Victor Stinnerf951d282014-06-29 00:46:45 +0200313 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 def _make_subprocess_transport(self, protocol, args, shell,
315 stdin, stdout, stderr, bufsize,
316 extra=None, **kwargs):
317 """Create subprocess transport."""
318 raise NotImplementedError
319
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200321 """Write a byte to self-pipe, to wake up the event loop.
322
323 This may be called from a different thread.
324
325 The subclass is responsible for implementing the self-pipe.
326 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 raise NotImplementedError
328
329 def _process_events(self, event_list):
330 """Process selector events."""
331 raise NotImplementedError
332
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200333 def _check_closed(self):
334 if self._closed:
335 raise RuntimeError('Event loop is closed')
336
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 def run_forever(self):
338 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200339 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100340 if self.is_running():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 raise RuntimeError('Event loop is running.')
Yury Selivanove8944cb2015-05-12 11:43:04 -0400342 self._set_coroutine_wrapper(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100343 self._thread_id = threading.get_ident()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 try:
345 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800346 self._run_once()
347 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 break
349 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800350 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100351 self._thread_id = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400352 self._set_coroutine_wrapper(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
354 def run_until_complete(self, future):
355 """Run until the Future is done.
356
357 If the argument is a coroutine, it is wrapped in a Task.
358
Victor Stinneracdb7822014-07-14 18:33:40 +0200359 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 with the same coroutine twice -- it would wrap it in two
361 different Tasks and that can't be good.
362
363 Return the Future's result, or raise its exception.
364 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200365 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200366
367 new_task = not isinstance(future, futures.Future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400368 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200369 if new_task:
370 # An exception is raised if the future didn't complete, so there
371 # is no need to log the "destroy pending task" message
372 future._log_destroy_pending = False
373
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100374 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200375 try:
376 self.run_forever()
377 except:
378 if new_task and future.done() and not future.cancelled():
379 # The coroutine raised a BaseException. Consume the exception
380 # to not log a warning, the caller doesn't have access to the
381 # local task.
382 future.exception()
383 raise
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100384 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 if not future.done():
386 raise RuntimeError('Event loop stopped before Future completed.')
387
388 return future.result()
389
390 def stop(self):
391 """Stop running the event loop.
392
Guido van Rossum41f69f42015-11-19 13:28:47 -0800393 Every callback already scheduled will still run. This simply informs
394 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800396 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200398 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700399 """Close the event loop.
400
401 This clears the queues and shuts down the executor,
402 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200403
404 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700405 """
Victor Stinner956de692014-12-26 21:07:52 +0100406 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200407 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200408 if self._closed:
409 return
Victor Stinnere912e652014-07-12 03:11:53 +0200410 if self._debug:
411 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400412 self._closed = True
413 self._ready.clear()
414 self._scheduled.clear()
415 executor = self._default_executor
416 if executor is not None:
417 self._default_executor = None
418 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200419
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200420 def is_closed(self):
421 """Returns True if the event loop was closed."""
422 return self._closed
423
Victor Stinner978a9af2015-01-29 17:50:58 +0100424 # On Python 3.3 and older, objects with a destructor part of a reference
425 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
426 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400427 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100428 def __del__(self):
429 if not self.is_closed():
430 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
431 if not self.is_running():
432 self.close()
433
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200435 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100436 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437
438 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200439 """Return the time according to the event loop's clock.
440
441 This is a float expressed in seconds since an epoch, but the
442 epoch, precision, accuracy and drift are unspecified and may
443 differ per event loop.
444 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 return time.monotonic()
446
447 def call_later(self, delay, callback, *args):
448 """Arrange for a callback to be called at a given time.
449
450 Return a Handle: an opaque object with a cancel() method that
451 can be used to cancel the call.
452
453 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200454 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
456 Each callback will be called exactly once. If two callbacks
457 are scheduled for exactly the same time, it undefined which
458 will be called first.
459
460 Any positional arguments after the callback will be passed to
461 the callback when it is called.
462 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200463 timer = self.call_at(self.time() + delay, callback, *args)
464 if timer._source_traceback:
465 del timer._source_traceback[-1]
466 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467
468 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200469 """Like call_later(), but uses an absolute time.
470
471 Absolute time corresponds to the event loop's time() method.
472 """
Victor Stinner2d99d932014-11-20 15:03:52 +0100473 if (coroutines.iscoroutine(callback)
474 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100475 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100476 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100477 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100478 self._check_thread()
Yury Selivanov569efa22014-02-18 18:02:19 -0500479 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200480 if timer._source_traceback:
481 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400483 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484 return timer
485
486 def call_soon(self, callback, *args):
487 """Arrange for a callback to be called as soon as possible.
488
Victor Stinneracdb7822014-07-14 18:33:40 +0200489 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 order in which they are registered. Each callback will be
491 called exactly once.
492
493 Any positional arguments after the callback will be passed to
494 the callback when it is called.
495 """
Victor Stinner956de692014-12-26 21:07:52 +0100496 if self._debug:
497 self._check_thread()
498 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200499 if handle._source_traceback:
500 del handle._source_traceback[-1]
501 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100502
Victor Stinner956de692014-12-26 21:07:52 +0100503 def _call_soon(self, callback, args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100504 if (coroutines.iscoroutine(callback)
505 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100506 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100507 self._check_closed()
Yury Selivanov569efa22014-02-18 18:02:19 -0500508 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200509 if handle._source_traceback:
510 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 self._ready.append(handle)
512 return handle
513
Victor Stinner956de692014-12-26 21:07:52 +0100514 def _check_thread(self):
515 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100516
Victor Stinneracdb7822014-07-14 18:33:40 +0200517 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100518 likely behave incorrectly when the assumption is violated.
519
Victor Stinneracdb7822014-07-14 18:33:40 +0200520 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100521 responsible for checking this condition for performance reasons.
522 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100523 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200524 return
Victor Stinner956de692014-12-26 21:07:52 +0100525 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100526 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100527 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200528 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100529 "than the current one")
530
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200532 """Like call_soon(), but thread-safe."""
Victor Stinner956de692014-12-26 21:07:52 +0100533 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200534 if handle._source_traceback:
535 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 self._write_to_self()
537 return handle
538
Yury Selivanov740169c2015-05-11 14:23:38 -0400539 def run_in_executor(self, executor, func, *args):
540 if (coroutines.iscoroutine(func)
541 or coroutines.iscoroutinefunction(func)):
Victor Stinner2d99d932014-11-20 15:03:52 +0100542 raise TypeError("coroutines cannot be used with run_in_executor()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100543 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400544 if isinstance(func, events.Handle):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700545 assert not args
Yury Selivanov740169c2015-05-11 14:23:38 -0400546 assert not isinstance(func, events.TimerHandle)
547 if func._cancelled:
Yury Selivanov7661db62016-05-16 15:38:39 -0400548 f = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 f.set_result(None)
550 return f
Yury Selivanov740169c2015-05-11 14:23:38 -0400551 func, args = func._callback, func._args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 if executor is None:
553 executor = self._default_executor
554 if executor is None:
555 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
556 self._default_executor = executor
Yury Selivanov740169c2015-05-11 14:23:38 -0400557 return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558
559 def set_default_executor(self, executor):
560 self._default_executor = executor
561
Victor Stinnere912e652014-07-12 03:11:53 +0200562 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
563 msg = ["%s:%r" % (host, port)]
564 if family:
565 msg.append('family=%r' % family)
566 if type:
567 msg.append('type=%r' % type)
568 if proto:
569 msg.append('proto=%r' % proto)
570 if flags:
571 msg.append('flags=%r' % flags)
572 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200573 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200574
575 t0 = self.time()
576 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
577 dt = self.time() - t0
578
Victor Stinneracdb7822014-07-14 18:33:40 +0200579 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200580 % (msg, dt * 1e3, addrinfo))
581 if dt >= self.slow_callback_duration:
582 logger.info(msg)
583 else:
584 logger.debug(msg)
585 return addrinfo
586
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 def getaddrinfo(self, host, port, *,
588 family=0, type=0, proto=0, flags=0):
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500589 info = _ipaddr_info(host, port, family, type, proto)
590 if info is not None:
Yury Selivanov7661db62016-05-16 15:38:39 -0400591 fut = self.create_future()
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500592 fut.set_result([info])
593 return fut
594 elif self._debug:
Victor Stinnere912e652014-07-12 03:11:53 +0200595 return self.run_in_executor(None, self._getaddrinfo_debug,
596 host, port, family, type, proto, flags)
597 else:
598 return self.run_in_executor(None, socket.getaddrinfo,
599 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600
601 def getnameinfo(self, sockaddr, flags=0):
602 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
603
Victor Stinnerf951d282014-06-29 00:46:45 +0200604 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 def create_connection(self, protocol_factory, host=None, port=None, *,
606 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700607 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200608 """Connect to a TCP server.
609
610 Create a streaming transport connection to a given Internet host and
611 port: socket family AF_INET or socket.AF_INET6 depending on host (or
612 family if specified), socket type SOCK_STREAM. protocol_factory must be
613 a callable returning a protocol instance.
614
615 This method is a coroutine which will try to establish the connection
616 in the background. When successful, the coroutine returns a
617 (transport, protocol) pair.
618 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700619 if server_hostname is not None and not ssl:
620 raise ValueError('server_hostname is only meaningful with ssl')
621
622 if server_hostname is None and ssl:
623 # Use host as default for server_hostname. It is an error
624 # if host is empty or not set, e.g. when an
625 # already-connected socket was passed or when only a port
626 # is given. To avoid this error, you can pass
627 # server_hostname='' -- this will bypass the hostname
628 # check. (This also means that if host is a numeric
629 # IP/IPv6 address, we will attempt to verify that exact
630 # address; this will probably fail, but it is possible to
631 # create a certificate for a specific IP address, so we
632 # don't judge it here.)
633 if not host:
634 raise ValueError('You must set server_hostname '
635 'when using ssl without a host')
636 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700637
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700638 if host is not None or port is not None:
639 if sock is not None:
640 raise ValueError(
641 'host/port and sock can not be specified at the same time')
642
643 f1 = self.getaddrinfo(
644 host, port, family=family,
645 type=socket.SOCK_STREAM, proto=proto, flags=flags)
646 fs = [f1]
647 if local_addr is not None:
648 f2 = self.getaddrinfo(
649 *local_addr, family=family,
650 type=socket.SOCK_STREAM, proto=proto, flags=flags)
651 fs.append(f2)
652 else:
653 f2 = None
654
655 yield from tasks.wait(fs, loop=self)
656
657 infos = f1.result()
658 if not infos:
659 raise OSError('getaddrinfo() returned empty list')
660 if f2 is not None:
661 laddr_infos = f2.result()
662 if not laddr_infos:
663 raise OSError('getaddrinfo() returned empty list')
664
665 exceptions = []
666 for family, type, proto, cname, address in infos:
667 try:
668 sock = socket.socket(family=family, type=type, proto=proto)
669 sock.setblocking(False)
670 if f2 is not None:
671 for _, _, _, _, laddr in laddr_infos:
672 try:
673 sock.bind(laddr)
674 break
675 except OSError as exc:
676 exc = OSError(
677 exc.errno, 'error while '
678 'attempting to bind on address '
679 '{!r}: {}'.format(
680 laddr, exc.strerror.lower()))
681 exceptions.append(exc)
682 else:
683 sock.close()
684 sock = None
685 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200686 if self._debug:
687 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 yield from self.sock_connect(sock, address)
689 except OSError as exc:
690 if sock is not None:
691 sock.close()
692 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200693 except:
694 if sock is not None:
695 sock.close()
696 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700697 else:
698 break
699 else:
700 if len(exceptions) == 1:
701 raise exceptions[0]
702 else:
703 # If they all have the same str(), raise one.
704 model = str(exceptions[0])
705 if all(str(exc) == model for exc in exceptions):
706 raise exceptions[0]
707 # Raise a combined exception so the user can see all
708 # the various error messages.
709 raise OSError('Multiple exceptions: {}'.format(
710 ', '.join(str(exc) for exc in exceptions)))
711
712 elif sock is None:
713 raise ValueError(
714 'host and port was not specified and no sock specified')
715
716 sock.setblocking(False)
717
Yury Selivanovb057c522014-02-18 12:15:06 -0500718 transport, protocol = yield from self._create_connection_transport(
719 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200720 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200721 # Get the socket from the transport because SSL transport closes
722 # the old socket and creates a new SSL socket
723 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200724 logger.debug("%r connected to %s:%r: (%r, %r)",
725 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500726 return transport, protocol
727
Victor Stinnerf951d282014-06-29 00:46:45 +0200728 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500729 def _create_connection_transport(self, sock, protocol_factory, ssl,
730 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400732 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700733 if ssl:
734 sslcontext = None if isinstance(ssl, bool) else ssl
735 transport = self._make_ssl_transport(
736 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700737 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 else:
739 transport = self._make_socket_transport(sock, protocol, waiter)
740
Victor Stinner29ad0112015-01-15 00:04:21 +0100741 try:
742 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100743 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100744 transport.close()
745 raise
746
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700747 return transport, protocol
748
Victor Stinnerf951d282014-06-29 00:46:45 +0200749 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750 def create_datagram_endpoint(self, protocol_factory,
751 local_addr=None, remote_addr=None, *,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700752 family=0, proto=0, flags=0,
753 reuse_address=None, reuse_port=None,
754 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700755 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700756 if sock is not None:
757 if (local_addr or remote_addr or
758 family or proto or flags or
759 reuse_address or reuse_port or allow_broadcast):
760 # show the problematic kwargs in exception msg
761 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
762 family=family, proto=proto, flags=flags,
763 reuse_address=reuse_address, reuse_port=reuse_port,
764 allow_broadcast=allow_broadcast)
765 problems = ', '.join(
766 '{}={}'.format(k, v) for k, v in opts.items() if v)
767 raise ValueError(
768 'socket modifier keyword arguments can not be used '
769 'when sock is specified. ({})'.format(problems))
770 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700771 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700772 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700773 if not (local_addr or remote_addr):
774 if family == 0:
775 raise ValueError('unexpected address family')
776 addr_pairs_info = (((family, proto), (None, None)),)
777 else:
778 # join address by (family, protocol)
779 addr_infos = collections.OrderedDict()
780 for idx, addr in ((0, local_addr), (1, remote_addr)):
781 if addr is not None:
782 assert isinstance(addr, tuple) and len(addr) == 2, (
783 '2-tuple is expected')
784
785 infos = yield from self.getaddrinfo(
786 *addr, family=family, type=socket.SOCK_DGRAM,
787 proto=proto, flags=flags)
788 if not infos:
789 raise OSError('getaddrinfo() returned empty list')
790
791 for fam, _, pro, _, address in infos:
792 key = (fam, pro)
793 if key not in addr_infos:
794 addr_infos[key] = [None, None]
795 addr_infos[key][idx] = address
796
797 # each addr has to have info for each (family, proto) pair
798 addr_pairs_info = [
799 (key, addr_pair) for key, addr_pair in addr_infos.items()
800 if not ((local_addr and addr_pair[0] is None) or
801 (remote_addr and addr_pair[1] is None))]
802
803 if not addr_pairs_info:
804 raise ValueError('can not get address information')
805
806 exceptions = []
807
808 if reuse_address is None:
809 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
810
811 for ((family, proto),
812 (local_address, remote_address)) in addr_pairs_info:
813 sock = None
814 r_addr = None
815 try:
816 sock = socket.socket(
817 family=family, type=socket.SOCK_DGRAM, proto=proto)
818 if reuse_address:
819 sock.setsockopt(
820 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
821 if reuse_port:
822 if not hasattr(socket, 'SO_REUSEPORT'):
823 raise ValueError(
824 'reuse_port not supported by socket module')
825 else:
826 sock.setsockopt(
827 socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
828 if allow_broadcast:
829 sock.setsockopt(
830 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
831 sock.setblocking(False)
832
833 if local_addr:
834 sock.bind(local_address)
835 if remote_addr:
836 yield from self.sock_connect(sock, remote_address)
837 r_addr = remote_address
838 except OSError as exc:
839 if sock is not None:
840 sock.close()
841 exceptions.append(exc)
842 except:
843 if sock is not None:
844 sock.close()
845 raise
846 else:
847 break
848 else:
849 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700850
851 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400852 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700853 transport = self._make_datagram_transport(
854 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200855 if self._debug:
856 if local_addr:
857 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
858 "created: (%r, %r)",
859 local_addr, remote_addr, transport, protocol)
860 else:
861 logger.debug("Datagram endpoint remote_addr=%r created: "
862 "(%r, %r)",
863 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100864
865 try:
866 yield from waiter
867 except:
868 transport.close()
869 raise
870
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700871 return transport, protocol
872
Victor Stinnerf951d282014-06-29 00:46:45 +0200873 @coroutine
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200874 def _create_server_getaddrinfo(self, host, port, family, flags):
875 infos = yield from self.getaddrinfo(host, port, family=family,
876 type=socket.SOCK_STREAM,
877 flags=flags)
878 if not infos:
879 raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
880 return infos
881
882 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700883 def create_server(self, protocol_factory, host=None, port=None,
884 *,
885 family=socket.AF_UNSPEC,
886 flags=socket.AI_PASSIVE,
887 sock=None,
888 backlog=100,
889 ssl=None,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700890 reuse_address=None,
891 reuse_port=None):
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200892 """Create a TCP server.
893
894 The host parameter can be a string, in that case the TCP server is bound
895 to host and port.
896
897 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -0500898 the TCP server is bound to all hosts of the sequence. If a host
899 appears multiple times (possibly indirectly e.g. when hostnames
900 resolve to the same IP address), the server is only bound once to that
901 host.
Victor Stinnerd1432092014-06-19 17:11:49 +0200902
Victor Stinneracdb7822014-07-14 18:33:40 +0200903 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200904
905 This method is a coroutine.
906 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700907 if isinstance(ssl, bool):
908 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700909 if host is not None or port is not None:
910 if sock is not None:
911 raise ValueError(
912 'host/port and sock can not be specified at the same time')
913
914 AF_INET6 = getattr(socket, 'AF_INET6', 0)
915 if reuse_address is None:
916 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
917 sockets = []
918 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200919 hosts = [None]
920 elif (isinstance(host, str) or
921 not isinstance(host, collections.Iterable)):
922 hosts = [host]
923 else:
924 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700925
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200926 fs = [self._create_server_getaddrinfo(host, port, family=family,
927 flags=flags)
928 for host in hosts]
929 infos = yield from tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -0500930 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700931
932 completed = False
933 try:
934 for res in infos:
935 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700936 try:
937 sock = socket.socket(af, socktype, proto)
938 except socket.error:
939 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200940 if self._debug:
941 logger.warning('create_server() failed to create '
942 'socket.socket(%r, %r, %r)',
943 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700944 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700945 sockets.append(sock)
946 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700947 sock.setsockopt(
948 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
949 if reuse_port:
950 if not hasattr(socket, 'SO_REUSEPORT'):
951 raise ValueError(
952 'reuse_port not supported by socket module')
953 else:
954 sock.setsockopt(
955 socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700956 # Disable IPv4/IPv6 dual stack support (enabled by
957 # default on Linux) which makes a single socket
958 # listen on both address families.
959 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
960 sock.setsockopt(socket.IPPROTO_IPV6,
961 socket.IPV6_V6ONLY,
962 True)
963 try:
964 sock.bind(sa)
965 except OSError as err:
966 raise OSError(err.errno, 'error while attempting '
967 'to bind on address %r: %s'
968 % (sa, err.strerror.lower()))
969 completed = True
970 finally:
971 if not completed:
972 for sock in sockets:
973 sock.close()
974 else:
975 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200976 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700977 sockets = [sock]
978
979 server = Server(self, sockets)
980 for sock in sockets:
981 sock.listen(backlog)
982 sock.setblocking(False)
983 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200984 if self._debug:
985 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700986 return server
987
Victor Stinnerf951d282014-06-29 00:46:45 +0200988 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700989 def connect_read_pipe(self, protocol_factory, pipe):
990 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400991 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700992 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100993
994 try:
995 yield from waiter
996 except:
997 transport.close()
998 raise
999
Victor Stinneracdb7822014-07-14 18:33:40 +02001000 if self._debug:
1001 logger.debug('Read pipe %r connected: (%r, %r)',
1002 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001003 return transport, protocol
1004
Victor Stinnerf951d282014-06-29 00:46:45 +02001005 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001006 def connect_write_pipe(self, protocol_factory, pipe):
1007 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001008 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001009 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001010
1011 try:
1012 yield from waiter
1013 except:
1014 transport.close()
1015 raise
1016
Victor Stinneracdb7822014-07-14 18:33:40 +02001017 if self._debug:
1018 logger.debug('Write pipe %r connected: (%r, %r)',
1019 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001020 return transport, protocol
1021
Victor Stinneracdb7822014-07-14 18:33:40 +02001022 def _log_subprocess(self, msg, stdin, stdout, stderr):
1023 info = [msg]
1024 if stdin is not None:
1025 info.append('stdin=%s' % _format_pipe(stdin))
1026 if stdout is not None and stderr == subprocess.STDOUT:
1027 info.append('stdout=stderr=%s' % _format_pipe(stdout))
1028 else:
1029 if stdout is not None:
1030 info.append('stdout=%s' % _format_pipe(stdout))
1031 if stderr is not None:
1032 info.append('stderr=%s' % _format_pipe(stderr))
1033 logger.debug(' '.join(info))
1034
Victor Stinnerf951d282014-06-29 00:46:45 +02001035 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001036 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
1037 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1038 universal_newlines=False, shell=True, bufsize=0,
1039 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001040 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001041 raise ValueError("cmd must be a string")
1042 if universal_newlines:
1043 raise ValueError("universal_newlines must be False")
1044 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001045 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001046 if bufsize != 0:
1047 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001048 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001049 if self._debug:
1050 # don't log parameters: they may contain sensitive information
1051 # (password) and may be too long
1052 debug_log = 'run shell command %r' % cmd
1053 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001054 transport = yield from self._make_subprocess_transport(
1055 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001056 if self._debug:
1057 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001058 return transport, protocol
1059
Victor Stinnerf951d282014-06-29 00:46:45 +02001060 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -05001061 def subprocess_exec(self, protocol_factory, program, *args,
1062 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1063 stderr=subprocess.PIPE, universal_newlines=False,
1064 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001065 if universal_newlines:
1066 raise ValueError("universal_newlines must be False")
1067 if shell:
1068 raise ValueError("shell must be False")
1069 if bufsize != 0:
1070 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001071 popen_args = (program,) + args
1072 for arg in popen_args:
1073 if not isinstance(arg, (str, bytes)):
1074 raise TypeError("program arguments must be "
1075 "a bytes or text string, not %s"
1076 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001077 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001078 if self._debug:
1079 # don't log parameters: they may contain sensitive information
1080 # (password) and may be too long
1081 debug_log = 'execute program %r' % program
1082 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001083 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001084 protocol, popen_args, False, stdin, stdout, stderr,
1085 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001086 if self._debug:
1087 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001088 return transport, protocol
1089
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001090 def get_exception_handler(self):
1091 """Return an exception handler, or None if the default one is in use.
1092 """
1093 return self._exception_handler
1094
Yury Selivanov569efa22014-02-18 18:02:19 -05001095 def set_exception_handler(self, handler):
1096 """Set handler as the new event loop exception handler.
1097
1098 If handler is None, the default exception handler will
1099 be set.
1100
1101 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001102 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001103 will be a reference to the active event loop, 'context'
1104 will be a dict object (see `call_exception_handler()`
1105 documentation for details about context).
1106 """
1107 if handler is not None and not callable(handler):
1108 raise TypeError('A callable object or None is expected, '
1109 'got {!r}'.format(handler))
1110 self._exception_handler = handler
1111
1112 def default_exception_handler(self, context):
1113 """Default exception handler.
1114
1115 This is called when an exception occurs and no exception
1116 handler is set, and can be called by a custom exception
1117 handler that wants to defer to the default behavior.
1118
Victor Stinneracdb7822014-07-14 18:33:40 +02001119 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001120 `call_exception_handler()`.
1121 """
1122 message = context.get('message')
1123 if not message:
1124 message = 'Unhandled exception in event loop'
1125
1126 exception = context.get('exception')
1127 if exception is not None:
1128 exc_info = (type(exception), exception, exception.__traceback__)
1129 else:
1130 exc_info = False
1131
Victor Stinnerff018e42015-01-28 00:30:40 +01001132 if ('source_traceback' not in context
1133 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +01001134 and self._current_handle._source_traceback):
1135 context['handle_traceback'] = self._current_handle._source_traceback
1136
Yury Selivanov569efa22014-02-18 18:02:19 -05001137 log_lines = [message]
1138 for key in sorted(context):
1139 if key in {'message', 'exception'}:
1140 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001141 value = context[key]
1142 if key == 'source_traceback':
1143 tb = ''.join(traceback.format_list(value))
1144 value = 'Object created at (most recent call last):\n'
1145 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001146 elif key == 'handle_traceback':
1147 tb = ''.join(traceback.format_list(value))
1148 value = 'Handle created at (most recent call last):\n'
1149 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001150 else:
1151 value = repr(value)
1152 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -05001153
1154 logger.error('\n'.join(log_lines), exc_info=exc_info)
1155
1156 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001157 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001158
Victor Stinneracdb7822014-07-14 18:33:40 +02001159 The context argument is a dict containing the following keys:
1160
Yury Selivanov569efa22014-02-18 18:02:19 -05001161 - 'message': Error message;
1162 - 'exception' (optional): Exception object;
1163 - 'future' (optional): Future instance;
1164 - 'handle' (optional): Handle instance;
1165 - 'protocol' (optional): Protocol instance;
1166 - 'transport' (optional): Transport instance;
1167 - 'socket' (optional): Socket instance.
1168
Victor Stinneracdb7822014-07-14 18:33:40 +02001169 New keys maybe introduced in the future.
1170
1171 Note: do not overload this method in an event loop subclass.
1172 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001173 `set_exception_handler()` method.
1174 """
1175 if self._exception_handler is None:
1176 try:
1177 self.default_exception_handler(context)
1178 except Exception:
1179 # Second protection layer for unexpected errors
1180 # in the default implementation, as well as for subclassed
1181 # event loops with overloaded "default_exception_handler".
1182 logger.error('Exception in default exception handler',
1183 exc_info=True)
1184 else:
1185 try:
1186 self._exception_handler(self, context)
1187 except Exception as exc:
1188 # Exception in the user set custom exception handler.
1189 try:
1190 # Let's try default handler.
1191 self.default_exception_handler({
1192 'message': 'Unhandled error in exception handler',
1193 'exception': exc,
1194 'context': context,
1195 })
1196 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001197 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001198 # overloaded.
1199 logger.error('Exception in default exception handler '
1200 'while handling an unexpected error '
1201 'in custom exception handler',
1202 exc_info=True)
1203
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001204 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001205 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001206 assert isinstance(handle, events.Handle), 'A Handle is required here'
1207 if handle._cancelled:
1208 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001209 assert not isinstance(handle, events.TimerHandle)
1210 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001211
1212 def _add_callback_signalsafe(self, handle):
1213 """Like _add_callback() but called from a signal handler."""
1214 self._add_callback(handle)
1215 self._write_to_self()
1216
Yury Selivanov592ada92014-09-25 12:07:56 -04001217 def _timer_handle_cancelled(self, handle):
1218 """Notification that a TimerHandle has been cancelled."""
1219 if handle._scheduled:
1220 self._timer_cancelled_count += 1
1221
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001222 def _run_once(self):
1223 """Run one full iteration of the event loop.
1224
1225 This calls all currently ready callbacks, polls for I/O,
1226 schedules the resulting callbacks, and finally schedules
1227 'call_later' callbacks.
1228 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001229
Yury Selivanov592ada92014-09-25 12:07:56 -04001230 sched_count = len(self._scheduled)
1231 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1232 self._timer_cancelled_count / sched_count >
1233 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001234 # Remove delayed calls that were cancelled if their number
1235 # is too high
1236 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001237 for handle in self._scheduled:
1238 if handle._cancelled:
1239 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001240 else:
1241 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001242
Victor Stinner68da8fc2014-09-30 18:08:36 +02001243 heapq.heapify(new_scheduled)
1244 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001245 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001246 else:
1247 # Remove delayed calls that were cancelled from head of queue.
1248 while self._scheduled and self._scheduled[0]._cancelled:
1249 self._timer_cancelled_count -= 1
1250 handle = heapq.heappop(self._scheduled)
1251 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001252
1253 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001254 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001255 timeout = 0
1256 elif self._scheduled:
1257 # Compute the desired timeout.
1258 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001259 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001260
Victor Stinner770e48d2014-07-11 11:58:33 +02001261 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001262 t0 = self.time()
1263 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001264 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001265 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001266 level = logging.INFO
1267 else:
1268 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001269 nevent = len(event_list)
1270 if timeout is None:
1271 logger.log(level, 'poll took %.3f ms: %s events',
1272 dt * 1e3, nevent)
1273 elif nevent:
1274 logger.log(level,
1275 'poll %.3f ms took %.3f ms: %s events',
1276 timeout * 1e3, dt * 1e3, nevent)
1277 elif dt >= 1.0:
1278 logger.log(level,
1279 'poll %.3f ms took %.3f ms: timeout',
1280 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001281 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001282 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001283 self._process_events(event_list)
1284
1285 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001286 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001287 while self._scheduled:
1288 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001289 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001290 break
1291 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001292 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001293 self._ready.append(handle)
1294
1295 # This is the only place where callbacks are actually *called*.
1296 # All other places just add them to ready.
1297 # Note: We run all currently scheduled callbacks, but not any
1298 # callbacks scheduled by callbacks run this time around --
1299 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001300 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001301 ntodo = len(self._ready)
1302 for i in range(ntodo):
1303 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001304 if handle._cancelled:
1305 continue
1306 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001307 try:
1308 self._current_handle = handle
1309 t0 = self.time()
1310 handle._run()
1311 dt = self.time() - t0
1312 if dt >= self.slow_callback_duration:
1313 logger.warning('Executing %s took %.3f seconds',
1314 _format_handle(handle), dt)
1315 finally:
1316 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001317 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001318 handle._run()
1319 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001320
Yury Selivanove8944cb2015-05-12 11:43:04 -04001321 def _set_coroutine_wrapper(self, enabled):
1322 try:
1323 set_wrapper = sys.set_coroutine_wrapper
1324 get_wrapper = sys.get_coroutine_wrapper
1325 except AttributeError:
1326 return
1327
1328 enabled = bool(enabled)
Yury Selivanov996083d2015-08-04 15:37:24 -04001329 if self._coroutine_wrapper_set == enabled:
Yury Selivanove8944cb2015-05-12 11:43:04 -04001330 return
1331
1332 wrapper = coroutines.debug_wrapper
1333 current_wrapper = get_wrapper()
1334
1335 if enabled:
1336 if current_wrapper not in (None, wrapper):
1337 warnings.warn(
1338 "loop.set_debug(True): cannot set debug coroutine "
1339 "wrapper; another wrapper is already set %r" %
1340 current_wrapper, RuntimeWarning)
1341 else:
1342 set_wrapper(wrapper)
1343 self._coroutine_wrapper_set = True
1344 else:
1345 if current_wrapper not in (None, wrapper):
1346 warnings.warn(
1347 "loop.set_debug(False): cannot unset debug coroutine "
1348 "wrapper; another wrapper was set %r" %
1349 current_wrapper, RuntimeWarning)
1350 else:
1351 set_wrapper(None)
1352 self._coroutine_wrapper_set = False
1353
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001354 def get_debug(self):
1355 return self._debug
1356
1357 def set_debug(self, enabled):
1358 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001359
Yury Selivanove8944cb2015-05-12 11:43:04 -04001360 if self.is_running():
1361 self._set_coroutine_wrapper(enabled)