blob: 3703480eba0e98d831c8827ecc62f2c9f8d8fc69 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
Yury Selivanovd5c2a622015-12-16 19:31:17 -050019import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020021import inspect
Yury Selivanovd5c2a622015-12-16 19:31:17 -050022import ipaddress
Victor Stinner5e4a7d82015-09-21 18:33:43 +020023import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020025import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import socket
27import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010028import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020030import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010032import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Yury Selivanov2a8911c2015-08-04 15:56:33 -040034from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020035from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036from . import events
37from . import futures
38from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020039from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070040from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42
Victor Stinner8c1a4a22015-01-06 01:03:58 +010043__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
45
46# Argument for default thread pool executor creation.
47_MAX_WORKERS = 5
48
Yury Selivanov592ada92014-09-25 12:07:56 -040049# Minimum number of _scheduled timer handles before cleanup of
50# cancelled handles is performed.
51_MIN_SCHEDULED_TIMER_HANDLES = 100
52
53# Minimum fraction of _scheduled timer handles that are cancelled
54# before cleanup of cancelled handles is performed.
55_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056
Victor Stinnerc94a93a2016-04-01 21:43:39 +020057# Exceptions which must not call the exception handler in fatal error
58# methods (_fatal_error())
59_FATAL_ERROR_IGNORE = (BrokenPipeError,
60 ConnectionResetError, ConnectionAbortedError)
61
62
Victor Stinner0e6f52a2014-06-20 17:34:15 +020063def _format_handle(handle):
64 cb = handle._callback
65 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
66 # format the task
67 return repr(cb.__self__)
68 else:
69 return str(handle)
70
71
Victor Stinneracdb7822014-07-14 18:33:40 +020072def _format_pipe(fd):
73 if fd == subprocess.PIPE:
74 return '<pipe>'
75 elif fd == subprocess.STDOUT:
76 return '<stdout>'
77 else:
78 return repr(fd)
79
80
Yury Selivanovd5c2a622015-12-16 19:31:17 -050081# Linux's sock.type is a bitmask that can include extra info about socket.
82_SOCKET_TYPE_MASK = 0
83if hasattr(socket, 'SOCK_NONBLOCK'):
84 _SOCKET_TYPE_MASK |= socket.SOCK_NONBLOCK
85if hasattr(socket, 'SOCK_CLOEXEC'):
86 _SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC
87
88
89@functools.lru_cache(maxsize=1024)
90def _ipaddr_info(host, port, family, type, proto):
91 # Try to skip getaddrinfo if "host" is already an IP. Since getaddrinfo
92 # blocks on an exclusive lock on some platforms, users might handle name
93 # resolution in their own code and pass in resolved IPs.
94 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or host is None:
95 return None
96
97 type &= ~_SOCKET_TYPE_MASK
98 if type == socket.SOCK_STREAM:
99 proto = socket.IPPROTO_TCP
100 elif type == socket.SOCK_DGRAM:
101 proto = socket.IPPROTO_UDP
102 else:
103 return None
104
105 if hasattr(socket, 'inet_pton'):
106 if family == socket.AF_UNSPEC:
107 afs = [socket.AF_INET, socket.AF_INET6]
108 else:
109 afs = [family]
110
111 for af in afs:
112 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
113 # like '::1%lo0', so strip it. If we happen to make an invalid
114 # address look valid, we fail later in sock.connect or sock.bind.
115 try:
116 if af == socket.AF_INET6:
117 socket.inet_pton(af, host.partition('%')[0])
118 else:
119 socket.inet_pton(af, host)
120 return af, type, proto, '', (host, port)
121 except OSError:
122 pass
123
124 # "host" is not an IP address.
125 return None
126
127 # No inet_pton. (On Windows it's only available since Python 3.4.)
128 # Even though getaddrinfo with AI_NUMERICHOST would be non-blocking, it
129 # still requires a lock on some platforms, and waiting for that lock could
130 # block the event loop. Use ipaddress instead, it's just text parsing.
131 try:
132 addr = ipaddress.IPv4Address(host)
133 except ValueError:
134 try:
135 addr = ipaddress.IPv6Address(host.partition('%')[0])
136 except ValueError:
137 return None
138
139 af = socket.AF_INET if addr.version == 4 else socket.AF_INET6
140 if family not in (socket.AF_UNSPEC, af):
141 # "host" is wrong IP version for "family".
142 return None
143
144 return af, type, proto, '', (host, port)
145
146
Victor Stinner1b0580b2014-02-13 09:24:37 +0100147def _check_resolved_address(sock, address):
148 # Ensure that the address is already resolved to avoid the trap of hanging
149 # the entire event loop when the address requires doing a DNS lookup.
Victor Stinner2fc23132015-02-04 14:51:23 +0100150
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500151 if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
Victor Stinner1b0580b2014-02-13 09:24:37 +0100152 return
153
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500154 host, port = address[:2]
155 if _ipaddr_info(host, port, sock.family, sock.type, sock.proto) is None:
156 raise ValueError("address must be resolved (IP address),"
157 " got host %r" % host)
Victor Stinner1b0580b2014-02-13 09:24:37 +0100158
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100160def _run_until_complete_cb(fut):
161 exc = fut._exception
162 if (isinstance(exc, BaseException)
163 and not isinstance(exc, Exception)):
164 # Issue #22429: run_forever() already finished, no need to
165 # stop it.
166 return
Guido van Rossum41f69f42015-11-19 13:28:47 -0800167 fut._loop.stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100168
169
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170class Server(events.AbstractServer):
171
172 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200173 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200175 self._active_count = 0
176 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700177
Victor Stinnere912e652014-07-12 03:11:53 +0200178 def __repr__(self):
179 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
180
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200181 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200183 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200185 def _detach(self):
186 assert self._active_count > 0
187 self._active_count -= 1
188 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 self._wakeup()
190
191 def close(self):
192 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200193 if sockets is None:
194 return
195 self.sockets = None
196 for sock in sockets:
197 self._loop._stop_serving(sock)
198 if self._active_count == 0:
199 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200
201 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200202 waiters = self._waiters
203 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 for waiter in waiters:
205 if not waiter.done():
206 waiter.set_result(waiter)
207
Victor Stinnerf951d282014-06-29 00:46:45 +0200208 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700209 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200210 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200212 waiter = futures.Future(loop=self._loop)
213 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 yield from waiter
215
216
217class BaseEventLoop(events.AbstractEventLoop):
218
219 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400220 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200221 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800222 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700223 self._ready = collections.deque()
224 self._scheduled = []
225 self._default_executor = None
226 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100227 # Identifier of the thread running the event loop, or None if the
228 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100229 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100230 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500231 self._exception_handler = None
Yury Selivanov1af2bf72015-05-11 22:27:25 -0400232 self.set_debug((not sys.flags.ignore_environment
233 and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200234 # In debug mode, if the execution of a callback or a step of a task
235 # exceed this duration in seconds, the slow callback/task is logged.
236 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100237 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400238 self._task_factory = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400239 self._coroutine_wrapper_set = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200241 def __repr__(self):
242 return ('<%s running=%s closed=%s debug=%s>'
243 % (self.__class__.__name__, self.is_running(),
244 self.is_closed(), self.get_debug()))
245
Victor Stinner896a25a2014-07-08 11:29:25 +0200246 def create_task(self, coro):
247 """Schedule a coroutine object.
248
Victor Stinneracdb7822014-07-14 18:33:40 +0200249 Return a task object.
250 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100251 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400252 if self._task_factory is None:
253 task = tasks.Task(coro, loop=self)
254 if task._source_traceback:
255 del task._source_traceback[-1]
256 else:
257 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200258 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200259
Yury Selivanov740169c2015-05-11 14:23:38 -0400260 def set_task_factory(self, factory):
261 """Set a task factory that will be used by loop.create_task().
262
263 If factory is None the default task factory will be set.
264
265 If factory is a callable, it should have a signature matching
266 '(loop, coro)', where 'loop' will be a reference to the active
267 event loop, 'coro' will be a coroutine object. The callable
268 must return a Future.
269 """
270 if factory is not None and not callable(factory):
271 raise TypeError('task factory must be a callable or None')
272 self._task_factory = factory
273
274 def get_task_factory(self):
275 """Return a task factory, or None if the default one is in use."""
276 return self._task_factory
277
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 def _make_socket_transport(self, sock, protocol, waiter=None, *,
279 extra=None, server=None):
280 """Create socket transport."""
281 raise NotImplementedError
282
Victor Stinner15cc6782015-01-09 00:09:10 +0100283 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
284 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 extra=None, server=None):
286 """Create SSL transport."""
287 raise NotImplementedError
288
289 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200290 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 """Create datagram transport."""
292 raise NotImplementedError
293
294 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
295 extra=None):
296 """Create read pipe transport."""
297 raise NotImplementedError
298
299 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
300 extra=None):
301 """Create write pipe transport."""
302 raise NotImplementedError
303
Victor Stinnerf951d282014-06-29 00:46:45 +0200304 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 def _make_subprocess_transport(self, protocol, args, shell,
306 stdin, stdout, stderr, bufsize,
307 extra=None, **kwargs):
308 """Create subprocess transport."""
309 raise NotImplementedError
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200312 """Write a byte to self-pipe, to wake up the event loop.
313
314 This may be called from a different thread.
315
316 The subclass is responsible for implementing the self-pipe.
317 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 raise NotImplementedError
319
320 def _process_events(self, event_list):
321 """Process selector events."""
322 raise NotImplementedError
323
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200324 def _check_closed(self):
325 if self._closed:
326 raise RuntimeError('Event loop is closed')
327
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 def run_forever(self):
329 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200330 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100331 if self.is_running():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 raise RuntimeError('Event loop is running.')
Yury Selivanove8944cb2015-05-12 11:43:04 -0400333 self._set_coroutine_wrapper(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100334 self._thread_id = threading.get_ident()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 try:
336 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800337 self._run_once()
338 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 break
340 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800341 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100342 self._thread_id = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400343 self._set_coroutine_wrapper(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344
345 def run_until_complete(self, future):
346 """Run until the Future is done.
347
348 If the argument is a coroutine, it is wrapped in a Task.
349
Victor Stinneracdb7822014-07-14 18:33:40 +0200350 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 with the same coroutine twice -- it would wrap it in two
352 different Tasks and that can't be good.
353
354 Return the Future's result, or raise its exception.
355 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200356 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200357
358 new_task = not isinstance(future, futures.Future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400359 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200360 if new_task:
361 # An exception is raised if the future didn't complete, so there
362 # is no need to log the "destroy pending task" message
363 future._log_destroy_pending = False
364
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100365 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200366 try:
367 self.run_forever()
368 except:
369 if new_task and future.done() and not future.cancelled():
370 # The coroutine raised a BaseException. Consume the exception
371 # to not log a warning, the caller doesn't have access to the
372 # local task.
373 future.exception()
374 raise
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100375 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 if not future.done():
377 raise RuntimeError('Event loop stopped before Future completed.')
378
379 return future.result()
380
381 def stop(self):
382 """Stop running the event loop.
383
Guido van Rossum41f69f42015-11-19 13:28:47 -0800384 Every callback already scheduled will still run. This simply informs
385 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800387 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200389 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700390 """Close the event loop.
391
392 This clears the queues and shuts down the executor,
393 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200394
395 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700396 """
Victor Stinner956de692014-12-26 21:07:52 +0100397 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200398 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200399 if self._closed:
400 return
Victor Stinnere912e652014-07-12 03:11:53 +0200401 if self._debug:
402 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400403 self._closed = True
404 self._ready.clear()
405 self._scheduled.clear()
406 executor = self._default_executor
407 if executor is not None:
408 self._default_executor = None
409 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200410
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200411 def is_closed(self):
412 """Returns True if the event loop was closed."""
413 return self._closed
414
Victor Stinner978a9af2015-01-29 17:50:58 +0100415 # On Python 3.3 and older, objects with a destructor part of a reference
416 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
417 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400418 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100419 def __del__(self):
420 if not self.is_closed():
421 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
422 if not self.is_running():
423 self.close()
424
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200426 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100427 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428
429 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200430 """Return the time according to the event loop's clock.
431
432 This is a float expressed in seconds since an epoch, but the
433 epoch, precision, accuracy and drift are unspecified and may
434 differ per event loop.
435 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 return time.monotonic()
437
438 def call_later(self, delay, callback, *args):
439 """Arrange for a callback to be called at a given time.
440
441 Return a Handle: an opaque object with a cancel() method that
442 can be used to cancel the call.
443
444 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200445 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
447 Each callback will be called exactly once. If two callbacks
448 are scheduled for exactly the same time, it undefined which
449 will be called first.
450
451 Any positional arguments after the callback will be passed to
452 the callback when it is called.
453 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200454 timer = self.call_at(self.time() + delay, callback, *args)
455 if timer._source_traceback:
456 del timer._source_traceback[-1]
457 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458
459 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200460 """Like call_later(), but uses an absolute time.
461
462 Absolute time corresponds to the event loop's time() method.
463 """
Victor Stinner2d99d932014-11-20 15:03:52 +0100464 if (coroutines.iscoroutine(callback)
465 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100466 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100467 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100468 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100469 self._check_thread()
Yury Selivanov569efa22014-02-18 18:02:19 -0500470 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200471 if timer._source_traceback:
472 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400474 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 return timer
476
477 def call_soon(self, callback, *args):
478 """Arrange for a callback to be called as soon as possible.
479
Victor Stinneracdb7822014-07-14 18:33:40 +0200480 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481 order in which they are registered. Each callback will be
482 called exactly once.
483
484 Any positional arguments after the callback will be passed to
485 the callback when it is called.
486 """
Victor Stinner956de692014-12-26 21:07:52 +0100487 if self._debug:
488 self._check_thread()
489 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200490 if handle._source_traceback:
491 del handle._source_traceback[-1]
492 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100493
Victor Stinner956de692014-12-26 21:07:52 +0100494 def _call_soon(self, callback, args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100495 if (coroutines.iscoroutine(callback)
496 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100497 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100498 self._check_closed()
Yury Selivanov569efa22014-02-18 18:02:19 -0500499 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200500 if handle._source_traceback:
501 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700502 self._ready.append(handle)
503 return handle
504
Victor Stinner956de692014-12-26 21:07:52 +0100505 def _check_thread(self):
506 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100507
Victor Stinneracdb7822014-07-14 18:33:40 +0200508 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100509 likely behave incorrectly when the assumption is violated.
510
Victor Stinneracdb7822014-07-14 18:33:40 +0200511 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100512 responsible for checking this condition for performance reasons.
513 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100514 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200515 return
Victor Stinner956de692014-12-26 21:07:52 +0100516 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100517 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100518 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200519 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100520 "than the current one")
521
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700522 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200523 """Like call_soon(), but thread-safe."""
Victor Stinner956de692014-12-26 21:07:52 +0100524 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200525 if handle._source_traceback:
526 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527 self._write_to_self()
528 return handle
529
Yury Selivanov740169c2015-05-11 14:23:38 -0400530 def run_in_executor(self, executor, func, *args):
531 if (coroutines.iscoroutine(func)
532 or coroutines.iscoroutinefunction(func)):
Victor Stinner2d99d932014-11-20 15:03:52 +0100533 raise TypeError("coroutines cannot be used with run_in_executor()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100534 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400535 if isinstance(func, events.Handle):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 assert not args
Yury Selivanov740169c2015-05-11 14:23:38 -0400537 assert not isinstance(func, events.TimerHandle)
538 if func._cancelled:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 f = futures.Future(loop=self)
540 f.set_result(None)
541 return f
Yury Selivanov740169c2015-05-11 14:23:38 -0400542 func, args = func._callback, func._args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 if executor is None:
544 executor = self._default_executor
545 if executor is None:
546 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
547 self._default_executor = executor
Yury Selivanov740169c2015-05-11 14:23:38 -0400548 return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549
550 def set_default_executor(self, executor):
551 self._default_executor = executor
552
Victor Stinnere912e652014-07-12 03:11:53 +0200553 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
554 msg = ["%s:%r" % (host, port)]
555 if family:
556 msg.append('family=%r' % family)
557 if type:
558 msg.append('type=%r' % type)
559 if proto:
560 msg.append('proto=%r' % proto)
561 if flags:
562 msg.append('flags=%r' % flags)
563 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200564 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200565
566 t0 = self.time()
567 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
568 dt = self.time() - t0
569
Victor Stinneracdb7822014-07-14 18:33:40 +0200570 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200571 % (msg, dt * 1e3, addrinfo))
572 if dt >= self.slow_callback_duration:
573 logger.info(msg)
574 else:
575 logger.debug(msg)
576 return addrinfo
577
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 def getaddrinfo(self, host, port, *,
579 family=0, type=0, proto=0, flags=0):
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500580 info = _ipaddr_info(host, port, family, type, proto)
581 if info is not None:
582 fut = futures.Future(loop=self)
583 fut.set_result([info])
584 return fut
585 elif self._debug:
Victor Stinnere912e652014-07-12 03:11:53 +0200586 return self.run_in_executor(None, self._getaddrinfo_debug,
587 host, port, family, type, proto, flags)
588 else:
589 return self.run_in_executor(None, socket.getaddrinfo,
590 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591
592 def getnameinfo(self, sockaddr, flags=0):
593 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
594
Victor Stinnerf951d282014-06-29 00:46:45 +0200595 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 def create_connection(self, protocol_factory, host=None, port=None, *,
597 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700598 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200599 """Connect to a TCP server.
600
601 Create a streaming transport connection to a given Internet host and
602 port: socket family AF_INET or socket.AF_INET6 depending on host (or
603 family if specified), socket type SOCK_STREAM. protocol_factory must be
604 a callable returning a protocol instance.
605
606 This method is a coroutine which will try to establish the connection
607 in the background. When successful, the coroutine returns a
608 (transport, protocol) pair.
609 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700610 if server_hostname is not None and not ssl:
611 raise ValueError('server_hostname is only meaningful with ssl')
612
613 if server_hostname is None and ssl:
614 # Use host as default for server_hostname. It is an error
615 # if host is empty or not set, e.g. when an
616 # already-connected socket was passed or when only a port
617 # is given. To avoid this error, you can pass
618 # server_hostname='' -- this will bypass the hostname
619 # check. (This also means that if host is a numeric
620 # IP/IPv6 address, we will attempt to verify that exact
621 # address; this will probably fail, but it is possible to
622 # create a certificate for a specific IP address, so we
623 # don't judge it here.)
624 if not host:
625 raise ValueError('You must set server_hostname '
626 'when using ssl without a host')
627 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700628
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 if host is not None or port is not None:
630 if sock is not None:
631 raise ValueError(
632 'host/port and sock can not be specified at the same time')
633
634 f1 = self.getaddrinfo(
635 host, port, family=family,
636 type=socket.SOCK_STREAM, proto=proto, flags=flags)
637 fs = [f1]
638 if local_addr is not None:
639 f2 = self.getaddrinfo(
640 *local_addr, family=family,
641 type=socket.SOCK_STREAM, proto=proto, flags=flags)
642 fs.append(f2)
643 else:
644 f2 = None
645
646 yield from tasks.wait(fs, loop=self)
647
648 infos = f1.result()
649 if not infos:
650 raise OSError('getaddrinfo() returned empty list')
651 if f2 is not None:
652 laddr_infos = f2.result()
653 if not laddr_infos:
654 raise OSError('getaddrinfo() returned empty list')
655
656 exceptions = []
657 for family, type, proto, cname, address in infos:
658 try:
659 sock = socket.socket(family=family, type=type, proto=proto)
660 sock.setblocking(False)
661 if f2 is not None:
662 for _, _, _, _, laddr in laddr_infos:
663 try:
664 sock.bind(laddr)
665 break
666 except OSError as exc:
667 exc = OSError(
668 exc.errno, 'error while '
669 'attempting to bind on address '
670 '{!r}: {}'.format(
671 laddr, exc.strerror.lower()))
672 exceptions.append(exc)
673 else:
674 sock.close()
675 sock = None
676 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200677 if self._debug:
678 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 yield from self.sock_connect(sock, address)
680 except OSError as exc:
681 if sock is not None:
682 sock.close()
683 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200684 except:
685 if sock is not None:
686 sock.close()
687 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 else:
689 break
690 else:
691 if len(exceptions) == 1:
692 raise exceptions[0]
693 else:
694 # If they all have the same str(), raise one.
695 model = str(exceptions[0])
696 if all(str(exc) == model for exc in exceptions):
697 raise exceptions[0]
698 # Raise a combined exception so the user can see all
699 # the various error messages.
700 raise OSError('Multiple exceptions: {}'.format(
701 ', '.join(str(exc) for exc in exceptions)))
702
703 elif sock is None:
704 raise ValueError(
705 'host and port was not specified and no sock specified')
706
707 sock.setblocking(False)
708
Yury Selivanovb057c522014-02-18 12:15:06 -0500709 transport, protocol = yield from self._create_connection_transport(
710 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200711 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200712 # Get the socket from the transport because SSL transport closes
713 # the old socket and creates a new SSL socket
714 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200715 logger.debug("%r connected to %s:%r: (%r, %r)",
716 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500717 return transport, protocol
718
Victor Stinnerf951d282014-06-29 00:46:45 +0200719 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500720 def _create_connection_transport(self, sock, protocol_factory, ssl,
721 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 protocol = protocol_factory()
723 waiter = futures.Future(loop=self)
724 if ssl:
725 sslcontext = None if isinstance(ssl, bool) else ssl
726 transport = self._make_ssl_transport(
727 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700728 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700729 else:
730 transport = self._make_socket_transport(sock, protocol, waiter)
731
Victor Stinner29ad0112015-01-15 00:04:21 +0100732 try:
733 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100734 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100735 transport.close()
736 raise
737
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 return transport, protocol
739
Victor Stinnerf951d282014-06-29 00:46:45 +0200740 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700741 def create_datagram_endpoint(self, protocol_factory,
742 local_addr=None, remote_addr=None, *,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700743 family=0, proto=0, flags=0,
744 reuse_address=None, reuse_port=None,
745 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700746 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700747 if sock is not None:
748 if (local_addr or remote_addr or
749 family or proto or flags or
750 reuse_address or reuse_port or allow_broadcast):
751 # show the problematic kwargs in exception msg
752 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
753 family=family, proto=proto, flags=flags,
754 reuse_address=reuse_address, reuse_port=reuse_port,
755 allow_broadcast=allow_broadcast)
756 problems = ', '.join(
757 '{}={}'.format(k, v) for k, v in opts.items() if v)
758 raise ValueError(
759 'socket modifier keyword arguments can not be used '
760 'when sock is specified. ({})'.format(problems))
761 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700762 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700763 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700764 if not (local_addr or remote_addr):
765 if family == 0:
766 raise ValueError('unexpected address family')
767 addr_pairs_info = (((family, proto), (None, None)),)
768 else:
769 # join address by (family, protocol)
770 addr_infos = collections.OrderedDict()
771 for idx, addr in ((0, local_addr), (1, remote_addr)):
772 if addr is not None:
773 assert isinstance(addr, tuple) and len(addr) == 2, (
774 '2-tuple is expected')
775
776 infos = yield from self.getaddrinfo(
777 *addr, family=family, type=socket.SOCK_DGRAM,
778 proto=proto, flags=flags)
779 if not infos:
780 raise OSError('getaddrinfo() returned empty list')
781
782 for fam, _, pro, _, address in infos:
783 key = (fam, pro)
784 if key not in addr_infos:
785 addr_infos[key] = [None, None]
786 addr_infos[key][idx] = address
787
788 # each addr has to have info for each (family, proto) pair
789 addr_pairs_info = [
790 (key, addr_pair) for key, addr_pair in addr_infos.items()
791 if not ((local_addr and addr_pair[0] is None) or
792 (remote_addr and addr_pair[1] is None))]
793
794 if not addr_pairs_info:
795 raise ValueError('can not get address information')
796
797 exceptions = []
798
799 if reuse_address is None:
800 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
801
802 for ((family, proto),
803 (local_address, remote_address)) in addr_pairs_info:
804 sock = None
805 r_addr = None
806 try:
807 sock = socket.socket(
808 family=family, type=socket.SOCK_DGRAM, proto=proto)
809 if reuse_address:
810 sock.setsockopt(
811 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
812 if reuse_port:
813 if not hasattr(socket, 'SO_REUSEPORT'):
814 raise ValueError(
815 'reuse_port not supported by socket module')
816 else:
817 sock.setsockopt(
818 socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
819 if allow_broadcast:
820 sock.setsockopt(
821 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
822 sock.setblocking(False)
823
824 if local_addr:
825 sock.bind(local_address)
826 if remote_addr:
827 yield from self.sock_connect(sock, remote_address)
828 r_addr = remote_address
829 except OSError as exc:
830 if sock is not None:
831 sock.close()
832 exceptions.append(exc)
833 except:
834 if sock is not None:
835 sock.close()
836 raise
837 else:
838 break
839 else:
840 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700841
842 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200843 waiter = futures.Future(loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700844 transport = self._make_datagram_transport(
845 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200846 if self._debug:
847 if local_addr:
848 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
849 "created: (%r, %r)",
850 local_addr, remote_addr, transport, protocol)
851 else:
852 logger.debug("Datagram endpoint remote_addr=%r created: "
853 "(%r, %r)",
854 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100855
856 try:
857 yield from waiter
858 except:
859 transport.close()
860 raise
861
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700862 return transport, protocol
863
Victor Stinnerf951d282014-06-29 00:46:45 +0200864 @coroutine
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200865 def _create_server_getaddrinfo(self, host, port, family, flags):
866 infos = yield from self.getaddrinfo(host, port, family=family,
867 type=socket.SOCK_STREAM,
868 flags=flags)
869 if not infos:
870 raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
871 return infos
872
873 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700874 def create_server(self, protocol_factory, host=None, port=None,
875 *,
876 family=socket.AF_UNSPEC,
877 flags=socket.AI_PASSIVE,
878 sock=None,
879 backlog=100,
880 ssl=None,
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700881 reuse_address=None,
882 reuse_port=None):
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200883 """Create a TCP server.
884
885 The host parameter can be a string, in that case the TCP server is bound
886 to host and port.
887
888 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -0500889 the TCP server is bound to all hosts of the sequence. If a host
890 appears multiple times (possibly indirectly e.g. when hostnames
891 resolve to the same IP address), the server is only bound once to that
892 host.
Victor Stinnerd1432092014-06-19 17:11:49 +0200893
Victor Stinneracdb7822014-07-14 18:33:40 +0200894 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200895
896 This method is a coroutine.
897 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700898 if isinstance(ssl, bool):
899 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700900 if host is not None or port is not None:
901 if sock is not None:
902 raise ValueError(
903 'host/port and sock can not be specified at the same time')
904
905 AF_INET6 = getattr(socket, 'AF_INET6', 0)
906 if reuse_address is None:
907 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
908 sockets = []
909 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200910 hosts = [None]
911 elif (isinstance(host, str) or
912 not isinstance(host, collections.Iterable)):
913 hosts = [host]
914 else:
915 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700916
Victor Stinner5e4a7d82015-09-21 18:33:43 +0200917 fs = [self._create_server_getaddrinfo(host, port, family=family,
918 flags=flags)
919 for host in hosts]
920 infos = yield from tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -0500921 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700922
923 completed = False
924 try:
925 for res in infos:
926 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700927 try:
928 sock = socket.socket(af, socktype, proto)
929 except socket.error:
930 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200931 if self._debug:
932 logger.warning('create_server() failed to create '
933 'socket.socket(%r, %r, %r)',
934 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700935 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700936 sockets.append(sock)
937 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -0700938 sock.setsockopt(
939 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
940 if reuse_port:
941 if not hasattr(socket, 'SO_REUSEPORT'):
942 raise ValueError(
943 'reuse_port not supported by socket module')
944 else:
945 sock.setsockopt(
946 socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700947 # Disable IPv4/IPv6 dual stack support (enabled by
948 # default on Linux) which makes a single socket
949 # listen on both address families.
950 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
951 sock.setsockopt(socket.IPPROTO_IPV6,
952 socket.IPV6_V6ONLY,
953 True)
954 try:
955 sock.bind(sa)
956 except OSError as err:
957 raise OSError(err.errno, 'error while attempting '
958 'to bind on address %r: %s'
959 % (sa, err.strerror.lower()))
960 completed = True
961 finally:
962 if not completed:
963 for sock in sockets:
964 sock.close()
965 else:
966 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200967 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700968 sockets = [sock]
969
970 server = Server(self, sockets)
971 for sock in sockets:
972 sock.listen(backlog)
973 sock.setblocking(False)
974 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200975 if self._debug:
976 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700977 return server
978
Victor Stinnerf951d282014-06-29 00:46:45 +0200979 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700980 def connect_read_pipe(self, protocol_factory, pipe):
981 protocol = protocol_factory()
982 waiter = futures.Future(loop=self)
983 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100984
985 try:
986 yield from waiter
987 except:
988 transport.close()
989 raise
990
Victor Stinneracdb7822014-07-14 18:33:40 +0200991 if self._debug:
992 logger.debug('Read pipe %r connected: (%r, %r)',
993 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700994 return transport, protocol
995
Victor Stinnerf951d282014-06-29 00:46:45 +0200996 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 def connect_write_pipe(self, protocol_factory, pipe):
998 protocol = protocol_factory()
999 waiter = futures.Future(loop=self)
1000 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001001
1002 try:
1003 yield from waiter
1004 except:
1005 transport.close()
1006 raise
1007
Victor Stinneracdb7822014-07-14 18:33:40 +02001008 if self._debug:
1009 logger.debug('Write pipe %r connected: (%r, %r)',
1010 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001011 return transport, protocol
1012
Victor Stinneracdb7822014-07-14 18:33:40 +02001013 def _log_subprocess(self, msg, stdin, stdout, stderr):
1014 info = [msg]
1015 if stdin is not None:
1016 info.append('stdin=%s' % _format_pipe(stdin))
1017 if stdout is not None and stderr == subprocess.STDOUT:
1018 info.append('stdout=stderr=%s' % _format_pipe(stdout))
1019 else:
1020 if stdout is not None:
1021 info.append('stdout=%s' % _format_pipe(stdout))
1022 if stderr is not None:
1023 info.append('stderr=%s' % _format_pipe(stderr))
1024 logger.debug(' '.join(info))
1025
Victor Stinnerf951d282014-06-29 00:46:45 +02001026 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001027 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
1028 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1029 universal_newlines=False, shell=True, bufsize=0,
1030 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001031 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001032 raise ValueError("cmd must be a string")
1033 if universal_newlines:
1034 raise ValueError("universal_newlines must be False")
1035 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001036 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001037 if bufsize != 0:
1038 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001039 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001040 if self._debug:
1041 # don't log parameters: they may contain sensitive information
1042 # (password) and may be too long
1043 debug_log = 'run shell command %r' % cmd
1044 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001045 transport = yield from self._make_subprocess_transport(
1046 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001047 if self._debug:
1048 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001049 return transport, protocol
1050
Victor Stinnerf951d282014-06-29 00:46:45 +02001051 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -05001052 def subprocess_exec(self, protocol_factory, program, *args,
1053 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1054 stderr=subprocess.PIPE, universal_newlines=False,
1055 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001056 if universal_newlines:
1057 raise ValueError("universal_newlines must be False")
1058 if shell:
1059 raise ValueError("shell must be False")
1060 if bufsize != 0:
1061 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001062 popen_args = (program,) + args
1063 for arg in popen_args:
1064 if not isinstance(arg, (str, bytes)):
1065 raise TypeError("program arguments must be "
1066 "a bytes or text string, not %s"
1067 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001068 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001069 if self._debug:
1070 # don't log parameters: they may contain sensitive information
1071 # (password) and may be too long
1072 debug_log = 'execute program %r' % program
1073 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001074 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001075 protocol, popen_args, False, stdin, stdout, stderr,
1076 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001077 if self._debug:
1078 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001079 return transport, protocol
1080
Yury Selivanov569efa22014-02-18 18:02:19 -05001081 def set_exception_handler(self, handler):
1082 """Set handler as the new event loop exception handler.
1083
1084 If handler is None, the default exception handler will
1085 be set.
1086
1087 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001088 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001089 will be a reference to the active event loop, 'context'
1090 will be a dict object (see `call_exception_handler()`
1091 documentation for details about context).
1092 """
1093 if handler is not None and not callable(handler):
1094 raise TypeError('A callable object or None is expected, '
1095 'got {!r}'.format(handler))
1096 self._exception_handler = handler
1097
1098 def default_exception_handler(self, context):
1099 """Default exception handler.
1100
1101 This is called when an exception occurs and no exception
1102 handler is set, and can be called by a custom exception
1103 handler that wants to defer to the default behavior.
1104
Victor Stinneracdb7822014-07-14 18:33:40 +02001105 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001106 `call_exception_handler()`.
1107 """
1108 message = context.get('message')
1109 if not message:
1110 message = 'Unhandled exception in event loop'
1111
1112 exception = context.get('exception')
1113 if exception is not None:
1114 exc_info = (type(exception), exception, exception.__traceback__)
1115 else:
1116 exc_info = False
1117
Victor Stinnerff018e42015-01-28 00:30:40 +01001118 if ('source_traceback' not in context
1119 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +01001120 and self._current_handle._source_traceback):
1121 context['handle_traceback'] = self._current_handle._source_traceback
1122
Yury Selivanov569efa22014-02-18 18:02:19 -05001123 log_lines = [message]
1124 for key in sorted(context):
1125 if key in {'message', 'exception'}:
1126 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001127 value = context[key]
1128 if key == 'source_traceback':
1129 tb = ''.join(traceback.format_list(value))
1130 value = 'Object created at (most recent call last):\n'
1131 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001132 elif key == 'handle_traceback':
1133 tb = ''.join(traceback.format_list(value))
1134 value = 'Handle created at (most recent call last):\n'
1135 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001136 else:
1137 value = repr(value)
1138 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -05001139
1140 logger.error('\n'.join(log_lines), exc_info=exc_info)
1141
1142 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001143 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001144
Victor Stinneracdb7822014-07-14 18:33:40 +02001145 The context argument is a dict containing the following keys:
1146
Yury Selivanov569efa22014-02-18 18:02:19 -05001147 - 'message': Error message;
1148 - 'exception' (optional): Exception object;
1149 - 'future' (optional): Future instance;
1150 - 'handle' (optional): Handle instance;
1151 - 'protocol' (optional): Protocol instance;
1152 - 'transport' (optional): Transport instance;
1153 - 'socket' (optional): Socket instance.
1154
Victor Stinneracdb7822014-07-14 18:33:40 +02001155 New keys maybe introduced in the future.
1156
1157 Note: do not overload this method in an event loop subclass.
1158 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001159 `set_exception_handler()` method.
1160 """
1161 if self._exception_handler is None:
1162 try:
1163 self.default_exception_handler(context)
1164 except Exception:
1165 # Second protection layer for unexpected errors
1166 # in the default implementation, as well as for subclassed
1167 # event loops with overloaded "default_exception_handler".
1168 logger.error('Exception in default exception handler',
1169 exc_info=True)
1170 else:
1171 try:
1172 self._exception_handler(self, context)
1173 except Exception as exc:
1174 # Exception in the user set custom exception handler.
1175 try:
1176 # Let's try default handler.
1177 self.default_exception_handler({
1178 'message': 'Unhandled error in exception handler',
1179 'exception': exc,
1180 'context': context,
1181 })
1182 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001183 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001184 # overloaded.
1185 logger.error('Exception in default exception handler '
1186 'while handling an unexpected error '
1187 'in custom exception handler',
1188 exc_info=True)
1189
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001190 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001191 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001192 assert isinstance(handle, events.Handle), 'A Handle is required here'
1193 if handle._cancelled:
1194 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001195 assert not isinstance(handle, events.TimerHandle)
1196 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001197
1198 def _add_callback_signalsafe(self, handle):
1199 """Like _add_callback() but called from a signal handler."""
1200 self._add_callback(handle)
1201 self._write_to_self()
1202
Yury Selivanov592ada92014-09-25 12:07:56 -04001203 def _timer_handle_cancelled(self, handle):
1204 """Notification that a TimerHandle has been cancelled."""
1205 if handle._scheduled:
1206 self._timer_cancelled_count += 1
1207
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001208 def _run_once(self):
1209 """Run one full iteration of the event loop.
1210
1211 This calls all currently ready callbacks, polls for I/O,
1212 schedules the resulting callbacks, and finally schedules
1213 'call_later' callbacks.
1214 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001215
Yury Selivanov592ada92014-09-25 12:07:56 -04001216 sched_count = len(self._scheduled)
1217 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1218 self._timer_cancelled_count / sched_count >
1219 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001220 # Remove delayed calls that were cancelled if their number
1221 # is too high
1222 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001223 for handle in self._scheduled:
1224 if handle._cancelled:
1225 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001226 else:
1227 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001228
Victor Stinner68da8fc2014-09-30 18:08:36 +02001229 heapq.heapify(new_scheduled)
1230 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001231 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001232 else:
1233 # Remove delayed calls that were cancelled from head of queue.
1234 while self._scheduled and self._scheduled[0]._cancelled:
1235 self._timer_cancelled_count -= 1
1236 handle = heapq.heappop(self._scheduled)
1237 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001238
1239 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001240 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001241 timeout = 0
1242 elif self._scheduled:
1243 # Compute the desired timeout.
1244 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001245 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001246
Victor Stinner770e48d2014-07-11 11:58:33 +02001247 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001248 t0 = self.time()
1249 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001250 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001251 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001252 level = logging.INFO
1253 else:
1254 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001255 nevent = len(event_list)
1256 if timeout is None:
1257 logger.log(level, 'poll took %.3f ms: %s events',
1258 dt * 1e3, nevent)
1259 elif nevent:
1260 logger.log(level,
1261 'poll %.3f ms took %.3f ms: %s events',
1262 timeout * 1e3, dt * 1e3, nevent)
1263 elif dt >= 1.0:
1264 logger.log(level,
1265 'poll %.3f ms took %.3f ms: timeout',
1266 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001267 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001268 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001269 self._process_events(event_list)
1270
1271 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001272 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001273 while self._scheduled:
1274 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001275 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001276 break
1277 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001278 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001279 self._ready.append(handle)
1280
1281 # This is the only place where callbacks are actually *called*.
1282 # All other places just add them to ready.
1283 # Note: We run all currently scheduled callbacks, but not any
1284 # callbacks scheduled by callbacks run this time around --
1285 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001286 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001287 ntodo = len(self._ready)
1288 for i in range(ntodo):
1289 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001290 if handle._cancelled:
1291 continue
1292 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001293 try:
1294 self._current_handle = handle
1295 t0 = self.time()
1296 handle._run()
1297 dt = self.time() - t0
1298 if dt >= self.slow_callback_duration:
1299 logger.warning('Executing %s took %.3f seconds',
1300 _format_handle(handle), dt)
1301 finally:
1302 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001303 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001304 handle._run()
1305 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001306
Yury Selivanove8944cb2015-05-12 11:43:04 -04001307 def _set_coroutine_wrapper(self, enabled):
1308 try:
1309 set_wrapper = sys.set_coroutine_wrapper
1310 get_wrapper = sys.get_coroutine_wrapper
1311 except AttributeError:
1312 return
1313
1314 enabled = bool(enabled)
Yury Selivanov996083d2015-08-04 15:37:24 -04001315 if self._coroutine_wrapper_set == enabled:
Yury Selivanove8944cb2015-05-12 11:43:04 -04001316 return
1317
1318 wrapper = coroutines.debug_wrapper
1319 current_wrapper = get_wrapper()
1320
1321 if enabled:
1322 if current_wrapper not in (None, wrapper):
1323 warnings.warn(
1324 "loop.set_debug(True): cannot set debug coroutine "
1325 "wrapper; another wrapper is already set %r" %
1326 current_wrapper, RuntimeWarning)
1327 else:
1328 set_wrapper(wrapper)
1329 self._coroutine_wrapper_set = True
1330 else:
1331 if current_wrapper not in (None, wrapper):
1332 warnings.warn(
1333 "loop.set_debug(False): cannot unset debug coroutine "
1334 "wrapper; another wrapper was set %r" %
1335 current_wrapper, RuntimeWarning)
1336 else:
1337 set_wrapper(None)
1338 self._coroutine_wrapper_set = False
1339
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001340 def get_debug(self):
1341 return self._debug
1342
1343 def set_debug(self, enabled):
1344 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001345
Yury Selivanove8944cb2015-05-12 11:43:04 -04001346 if self.is_running():
1347 self._set_coroutine_wrapper(enabled)