blob: c20544545b5bbf1b1313ef3899a95b56a7916955 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
Yury Selivanov2a8911c2015-08-04 15:56:33 -040031from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020032from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033from . import events
34from . import futures
35from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020036from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070037from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39
Victor Stinner8c1a4a22015-01-06 01:03:58 +010040__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42
43# Argument for default thread pool executor creation.
44_MAX_WORKERS = 5
45
Yury Selivanov592ada92014-09-25 12:07:56 -040046# Minimum number of _scheduled timer handles before cleanup of
47# cancelled handles is performed.
48_MIN_SCHEDULED_TIMER_HANDLES = 100
49
50# Minimum fraction of _scheduled timer handles that are cancelled
51# before cleanup of cancelled handles is performed.
52_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
Victor Stinner0e6f52a2014-06-20 17:34:15 +020054def _format_handle(handle):
55 cb = handle._callback
56 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
57 # format the task
58 return repr(cb.__self__)
59 else:
60 return str(handle)
61
62
Victor Stinneracdb7822014-07-14 18:33:40 +020063def _format_pipe(fd):
64 if fd == subprocess.PIPE:
65 return '<pipe>'
66 elif fd == subprocess.STDOUT:
67 return '<stdout>'
68 else:
69 return repr(fd)
70
71
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072class _StopError(BaseException):
73 """Raised to stop the event loop."""
74
75
Victor Stinner1b0580b2014-02-13 09:24:37 +010076def _check_resolved_address(sock, address):
77 # Ensure that the address is already resolved to avoid the trap of hanging
78 # the entire event loop when the address requires doing a DNS lookup.
Victor Stinner2fc23132015-02-04 14:51:23 +010079 #
80 # getaddrinfo() is slow (around 10 us per call): this function should only
81 # be called in debug mode
Victor Stinner1b0580b2014-02-13 09:24:37 +010082 family = sock.family
Victor Stinner2fc23132015-02-04 14:51:23 +010083
Victor Stinnerd1a727a2014-02-20 16:43:09 +010084 if family == socket.AF_INET:
85 host, port = address
86 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010087 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010088 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010089 return
90
Victor Stinner2fc23132015-02-04 14:51:23 +010091 # On Windows, socket.inet_pton() is only available since Python 3.4
92 if hasattr(socket, 'inet_pton'):
93 # getaddrinfo() is slow and has known issue: prefer inet_pton()
94 # if available
95 try:
96 socket.inet_pton(family, host)
97 except OSError as exc:
98 raise ValueError("address must be resolved (IP address), "
99 "got host %r: %s"
100 % (host, exc))
101 else:
102 # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
103 # already resolved.
104 type_mask = 0
105 if hasattr(socket, 'SOCK_NONBLOCK'):
106 type_mask |= socket.SOCK_NONBLOCK
107 if hasattr(socket, 'SOCK_CLOEXEC'):
108 type_mask |= socket.SOCK_CLOEXEC
109 try:
110 socket.getaddrinfo(host, port,
111 family=family,
112 type=(sock.type & ~type_mask),
113 proto=sock.proto,
114 flags=socket.AI_NUMERICHOST)
115 except socket.gaierror as err:
116 raise ValueError("address must be resolved (IP address), "
117 "got host %r: %s"
118 % (host, err))
Victor Stinner1b0580b2014-02-13 09:24:37 +0100119
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120def _raise_stop_error(*args):
121 raise _StopError
122
123
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100124def _run_until_complete_cb(fut):
125 exc = fut._exception
126 if (isinstance(exc, BaseException)
127 and not isinstance(exc, Exception)):
128 # Issue #22429: run_forever() already finished, no need to
129 # stop it.
130 return
131 _raise_stop_error()
132
133
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700134class Server(events.AbstractServer):
135
136 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200137 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200139 self._active_count = 0
140 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700141
Victor Stinnere912e652014-07-12 03:11:53 +0200142 def __repr__(self):
143 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
144
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200145 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200147 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200149 def _detach(self):
150 assert self._active_count > 0
151 self._active_count -= 1
152 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700153 self._wakeup()
154
155 def close(self):
156 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200157 if sockets is None:
158 return
159 self.sockets = None
160 for sock in sockets:
161 self._loop._stop_serving(sock)
162 if self._active_count == 0:
163 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164
165 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200166 waiters = self._waiters
167 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 for waiter in waiters:
169 if not waiter.done():
170 waiter.set_result(waiter)
171
Victor Stinnerf951d282014-06-29 00:46:45 +0200172 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700173 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200174 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200176 waiter = futures.Future(loop=self._loop)
177 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 yield from waiter
179
180
181class BaseEventLoop(events.AbstractEventLoop):
182
183 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400184 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200185 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186 self._ready = collections.deque()
187 self._scheduled = []
188 self._default_executor = None
189 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100190 # Identifier of the thread running the event loop, or None if the
191 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100192 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100193 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500194 self._exception_handler = None
Yury Selivanov1af2bf72015-05-11 22:27:25 -0400195 self.set_debug((not sys.flags.ignore_environment
196 and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200197 # In debug mode, if the execution of a callback or a step of a task
198 # exceed this duration in seconds, the slow callback/task is logged.
199 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100200 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400201 self._task_factory = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400202 self._coroutine_wrapper_set = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200204 def __repr__(self):
205 return ('<%s running=%s closed=%s debug=%s>'
206 % (self.__class__.__name__, self.is_running(),
207 self.is_closed(), self.get_debug()))
208
Victor Stinner896a25a2014-07-08 11:29:25 +0200209 def create_task(self, coro):
210 """Schedule a coroutine object.
211
Victor Stinneracdb7822014-07-14 18:33:40 +0200212 Return a task object.
213 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100214 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400215 if self._task_factory is None:
216 task = tasks.Task(coro, loop=self)
217 if task._source_traceback:
218 del task._source_traceback[-1]
219 else:
220 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200221 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200222
Yury Selivanov740169c2015-05-11 14:23:38 -0400223 def set_task_factory(self, factory):
224 """Set a task factory that will be used by loop.create_task().
225
226 If factory is None the default task factory will be set.
227
228 If factory is a callable, it should have a signature matching
229 '(loop, coro)', where 'loop' will be a reference to the active
230 event loop, 'coro' will be a coroutine object. The callable
231 must return a Future.
232 """
233 if factory is not None and not callable(factory):
234 raise TypeError('task factory must be a callable or None')
235 self._task_factory = factory
236
237 def get_task_factory(self):
238 """Return a task factory, or None if the default one is in use."""
239 return self._task_factory
240
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 def _make_socket_transport(self, sock, protocol, waiter=None, *,
242 extra=None, server=None):
243 """Create socket transport."""
244 raise NotImplementedError
245
Victor Stinner15cc6782015-01-09 00:09:10 +0100246 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
247 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 extra=None, server=None):
249 """Create SSL transport."""
250 raise NotImplementedError
251
252 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200253 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 """Create datagram transport."""
255 raise NotImplementedError
256
257 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
258 extra=None):
259 """Create read pipe transport."""
260 raise NotImplementedError
261
262 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
263 extra=None):
264 """Create write pipe transport."""
265 raise NotImplementedError
266
Victor Stinnerf951d282014-06-29 00:46:45 +0200267 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700268 def _make_subprocess_transport(self, protocol, args, shell,
269 stdin, stdout, stderr, bufsize,
270 extra=None, **kwargs):
271 """Create subprocess transport."""
272 raise NotImplementedError
273
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200275 """Write a byte to self-pipe, to wake up the event loop.
276
277 This may be called from a different thread.
278
279 The subclass is responsible for implementing the self-pipe.
280 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 raise NotImplementedError
282
283 def _process_events(self, event_list):
284 """Process selector events."""
285 raise NotImplementedError
286
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200287 def _check_closed(self):
288 if self._closed:
289 raise RuntimeError('Event loop is closed')
290
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 def run_forever(self):
292 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200293 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100294 if self.is_running():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 raise RuntimeError('Event loop is running.')
Yury Selivanove8944cb2015-05-12 11:43:04 -0400296 self._set_coroutine_wrapper(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100297 self._thread_id = threading.get_ident()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 try:
299 while True:
300 try:
301 self._run_once()
302 except _StopError:
303 break
304 finally:
Victor Stinnera87501f2015-02-05 11:45:33 +0100305 self._thread_id = None
Yury Selivanove8944cb2015-05-12 11:43:04 -0400306 self._set_coroutine_wrapper(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307
308 def run_until_complete(self, future):
309 """Run until the Future is done.
310
311 If the argument is a coroutine, it is wrapped in a Task.
312
Victor Stinneracdb7822014-07-14 18:33:40 +0200313 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 with the same coroutine twice -- it would wrap it in two
315 different Tasks and that can't be good.
316
317 Return the Future's result, or raise its exception.
318 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200319 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200320
321 new_task = not isinstance(future, futures.Future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400322 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200323 if new_task:
324 # An exception is raised if the future didn't complete, so there
325 # is no need to log the "destroy pending task" message
326 future._log_destroy_pending = False
327
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100328 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200329 try:
330 self.run_forever()
331 except:
332 if new_task and future.done() and not future.cancelled():
333 # The coroutine raised a BaseException. Consume the exception
334 # to not log a warning, the caller doesn't have access to the
335 # local task.
336 future.exception()
337 raise
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100338 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 if not future.done():
340 raise RuntimeError('Event loop stopped before Future completed.')
341
342 return future.result()
343
344 def stop(self):
345 """Stop running the event loop.
346
Victor Stinner5006b1f2014-07-24 11:34:11 +0200347 Every callback scheduled before stop() is called will run. Callbacks
348 scheduled after stop() is called will not run. However, those callbacks
349 will run if run_forever is called again later.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 """
351 self.call_soon(_raise_stop_error)
352
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200353 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700354 """Close the event loop.
355
356 This clears the queues and shuts down the executor,
357 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200358
359 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700360 """
Victor Stinner956de692014-12-26 21:07:52 +0100361 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200362 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200363 if self._closed:
364 return
Victor Stinnere912e652014-07-12 03:11:53 +0200365 if self._debug:
366 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400367 self._closed = True
368 self._ready.clear()
369 self._scheduled.clear()
370 executor = self._default_executor
371 if executor is not None:
372 self._default_executor = None
373 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200374
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200375 def is_closed(self):
376 """Returns True if the event loop was closed."""
377 return self._closed
378
Victor Stinner978a9af2015-01-29 17:50:58 +0100379 # On Python 3.3 and older, objects with a destructor part of a reference
380 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
381 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400382 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100383 def __del__(self):
384 if not self.is_closed():
385 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
386 if not self.is_running():
387 self.close()
388
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200390 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100391 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392
393 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200394 """Return the time according to the event loop's clock.
395
396 This is a float expressed in seconds since an epoch, but the
397 epoch, precision, accuracy and drift are unspecified and may
398 differ per event loop.
399 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 return time.monotonic()
401
402 def call_later(self, delay, callback, *args):
403 """Arrange for a callback to be called at a given time.
404
405 Return a Handle: an opaque object with a cancel() method that
406 can be used to cancel the call.
407
408 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200409 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
411 Each callback will be called exactly once. If two callbacks
412 are scheduled for exactly the same time, it undefined which
413 will be called first.
414
415 Any positional arguments after the callback will be passed to
416 the callback when it is called.
417 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200418 timer = self.call_at(self.time() + delay, callback, *args)
419 if timer._source_traceback:
420 del timer._source_traceback[-1]
421 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
423 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200424 """Like call_later(), but uses an absolute time.
425
426 Absolute time corresponds to the event loop's time() method.
427 """
Victor Stinner2d99d932014-11-20 15:03:52 +0100428 if (coroutines.iscoroutine(callback)
429 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100430 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100431 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100432 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100433 self._check_thread()
Yury Selivanov569efa22014-02-18 18:02:19 -0500434 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200435 if timer._source_traceback:
436 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400438 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 return timer
440
441 def call_soon(self, callback, *args):
442 """Arrange for a callback to be called as soon as possible.
443
Victor Stinneracdb7822014-07-14 18:33:40 +0200444 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 order in which they are registered. Each callback will be
446 called exactly once.
447
448 Any positional arguments after the callback will be passed to
449 the callback when it is called.
450 """
Victor Stinner956de692014-12-26 21:07:52 +0100451 if self._debug:
452 self._check_thread()
453 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200454 if handle._source_traceback:
455 del handle._source_traceback[-1]
456 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100457
Victor Stinner956de692014-12-26 21:07:52 +0100458 def _call_soon(self, callback, args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100459 if (coroutines.iscoroutine(callback)
460 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100461 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100462 self._check_closed()
Yury Selivanov569efa22014-02-18 18:02:19 -0500463 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200464 if handle._source_traceback:
465 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 self._ready.append(handle)
467 return handle
468
Victor Stinner956de692014-12-26 21:07:52 +0100469 def _check_thread(self):
470 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100471
Victor Stinneracdb7822014-07-14 18:33:40 +0200472 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100473 likely behave incorrectly when the assumption is violated.
474
Victor Stinneracdb7822014-07-14 18:33:40 +0200475 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100476 responsible for checking this condition for performance reasons.
477 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100478 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200479 return
Victor Stinner956de692014-12-26 21:07:52 +0100480 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100481 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100482 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200483 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100484 "than the current one")
485
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200487 """Like call_soon(), but thread-safe."""
Victor Stinner956de692014-12-26 21:07:52 +0100488 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200489 if handle._source_traceback:
490 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 self._write_to_self()
492 return handle
493
Yury Selivanov740169c2015-05-11 14:23:38 -0400494 def run_in_executor(self, executor, func, *args):
495 if (coroutines.iscoroutine(func)
496 or coroutines.iscoroutinefunction(func)):
Victor Stinner2d99d932014-11-20 15:03:52 +0100497 raise TypeError("coroutines cannot be used with run_in_executor()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100498 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400499 if isinstance(func, events.Handle):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 assert not args
Yury Selivanov740169c2015-05-11 14:23:38 -0400501 assert not isinstance(func, events.TimerHandle)
502 if func._cancelled:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503 f = futures.Future(loop=self)
504 f.set_result(None)
505 return f
Yury Selivanov740169c2015-05-11 14:23:38 -0400506 func, args = func._callback, func._args
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507 if executor is None:
508 executor = self._default_executor
509 if executor is None:
510 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
511 self._default_executor = executor
Yury Selivanov740169c2015-05-11 14:23:38 -0400512 return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513
514 def set_default_executor(self, executor):
515 self._default_executor = executor
516
Victor Stinnere912e652014-07-12 03:11:53 +0200517 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
518 msg = ["%s:%r" % (host, port)]
519 if family:
520 msg.append('family=%r' % family)
521 if type:
522 msg.append('type=%r' % type)
523 if proto:
524 msg.append('proto=%r' % proto)
525 if flags:
526 msg.append('flags=%r' % flags)
527 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200528 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200529
530 t0 = self.time()
531 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
532 dt = self.time() - t0
533
Victor Stinneracdb7822014-07-14 18:33:40 +0200534 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200535 % (msg, dt * 1e3, addrinfo))
536 if dt >= self.slow_callback_duration:
537 logger.info(msg)
538 else:
539 logger.debug(msg)
540 return addrinfo
541
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 def getaddrinfo(self, host, port, *,
543 family=0, type=0, proto=0, flags=0):
Victor Stinnere912e652014-07-12 03:11:53 +0200544 if self._debug:
545 return self.run_in_executor(None, self._getaddrinfo_debug,
546 host, port, family, type, proto, flags)
547 else:
548 return self.run_in_executor(None, socket.getaddrinfo,
549 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551 def getnameinfo(self, sockaddr, flags=0):
552 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
553
Victor Stinnerf951d282014-06-29 00:46:45 +0200554 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 def create_connection(self, protocol_factory, host=None, port=None, *,
556 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700557 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200558 """Connect to a TCP server.
559
560 Create a streaming transport connection to a given Internet host and
561 port: socket family AF_INET or socket.AF_INET6 depending on host (or
562 family if specified), socket type SOCK_STREAM. protocol_factory must be
563 a callable returning a protocol instance.
564
565 This method is a coroutine which will try to establish the connection
566 in the background. When successful, the coroutine returns a
567 (transport, protocol) pair.
568 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700569 if server_hostname is not None and not ssl:
570 raise ValueError('server_hostname is only meaningful with ssl')
571
572 if server_hostname is None and ssl:
573 # Use host as default for server_hostname. It is an error
574 # if host is empty or not set, e.g. when an
575 # already-connected socket was passed or when only a port
576 # is given. To avoid this error, you can pass
577 # server_hostname='' -- this will bypass the hostname
578 # check. (This also means that if host is a numeric
579 # IP/IPv6 address, we will attempt to verify that exact
580 # address; this will probably fail, but it is possible to
581 # create a certificate for a specific IP address, so we
582 # don't judge it here.)
583 if not host:
584 raise ValueError('You must set server_hostname '
585 'when using ssl without a host')
586 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700587
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588 if host is not None or port is not None:
589 if sock is not None:
590 raise ValueError(
591 'host/port and sock can not be specified at the same time')
592
593 f1 = self.getaddrinfo(
594 host, port, family=family,
595 type=socket.SOCK_STREAM, proto=proto, flags=flags)
596 fs = [f1]
597 if local_addr is not None:
598 f2 = self.getaddrinfo(
599 *local_addr, family=family,
600 type=socket.SOCK_STREAM, proto=proto, flags=flags)
601 fs.append(f2)
602 else:
603 f2 = None
604
605 yield from tasks.wait(fs, loop=self)
606
607 infos = f1.result()
608 if not infos:
609 raise OSError('getaddrinfo() returned empty list')
610 if f2 is not None:
611 laddr_infos = f2.result()
612 if not laddr_infos:
613 raise OSError('getaddrinfo() returned empty list')
614
615 exceptions = []
616 for family, type, proto, cname, address in infos:
617 try:
618 sock = socket.socket(family=family, type=type, proto=proto)
619 sock.setblocking(False)
620 if f2 is not None:
621 for _, _, _, _, laddr in laddr_infos:
622 try:
623 sock.bind(laddr)
624 break
625 except OSError as exc:
626 exc = OSError(
627 exc.errno, 'error while '
628 'attempting to bind on address '
629 '{!r}: {}'.format(
630 laddr, exc.strerror.lower()))
631 exceptions.append(exc)
632 else:
633 sock.close()
634 sock = None
635 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200636 if self._debug:
637 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700638 yield from self.sock_connect(sock, address)
639 except OSError as exc:
640 if sock is not None:
641 sock.close()
642 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200643 except:
644 if sock is not None:
645 sock.close()
646 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 else:
648 break
649 else:
650 if len(exceptions) == 1:
651 raise exceptions[0]
652 else:
653 # If they all have the same str(), raise one.
654 model = str(exceptions[0])
655 if all(str(exc) == model for exc in exceptions):
656 raise exceptions[0]
657 # Raise a combined exception so the user can see all
658 # the various error messages.
659 raise OSError('Multiple exceptions: {}'.format(
660 ', '.join(str(exc) for exc in exceptions)))
661
662 elif sock is None:
663 raise ValueError(
664 'host and port was not specified and no sock specified')
665
666 sock.setblocking(False)
667
Yury Selivanovb057c522014-02-18 12:15:06 -0500668 transport, protocol = yield from self._create_connection_transport(
669 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200670 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200671 # Get the socket from the transport because SSL transport closes
672 # the old socket and creates a new SSL socket
673 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200674 logger.debug("%r connected to %s:%r: (%r, %r)",
675 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500676 return transport, protocol
677
Victor Stinnerf951d282014-06-29 00:46:45 +0200678 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500679 def _create_connection_transport(self, sock, protocol_factory, ssl,
680 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700681 protocol = protocol_factory()
682 waiter = futures.Future(loop=self)
683 if ssl:
684 sslcontext = None if isinstance(ssl, bool) else ssl
685 transport = self._make_ssl_transport(
686 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700687 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 else:
689 transport = self._make_socket_transport(sock, protocol, waiter)
690
Victor Stinner29ad0112015-01-15 00:04:21 +0100691 try:
692 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100693 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100694 transport.close()
695 raise
696
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700697 return transport, protocol
698
Victor Stinnerf951d282014-06-29 00:46:45 +0200699 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700 def create_datagram_endpoint(self, protocol_factory,
701 local_addr=None, remote_addr=None, *,
702 family=0, proto=0, flags=0):
703 """Create datagram connection."""
704 if not (local_addr or remote_addr):
705 if family == 0:
706 raise ValueError('unexpected address family')
707 addr_pairs_info = (((family, proto), (None, None)),)
708 else:
Victor Stinneracdb7822014-07-14 18:33:40 +0200709 # join address by (family, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 addr_infos = collections.OrderedDict()
711 for idx, addr in ((0, local_addr), (1, remote_addr)):
712 if addr is not None:
713 assert isinstance(addr, tuple) and len(addr) == 2, (
714 '2-tuple is expected')
715
716 infos = yield from self.getaddrinfo(
717 *addr, family=family, type=socket.SOCK_DGRAM,
718 proto=proto, flags=flags)
719 if not infos:
720 raise OSError('getaddrinfo() returned empty list')
721
722 for fam, _, pro, _, address in infos:
723 key = (fam, pro)
724 if key not in addr_infos:
725 addr_infos[key] = [None, None]
726 addr_infos[key][idx] = address
727
728 # each addr has to have info for each (family, proto) pair
729 addr_pairs_info = [
730 (key, addr_pair) for key, addr_pair in addr_infos.items()
731 if not ((local_addr and addr_pair[0] is None) or
732 (remote_addr and addr_pair[1] is None))]
733
734 if not addr_pairs_info:
735 raise ValueError('can not get address information')
736
737 exceptions = []
738
739 for ((family, proto),
740 (local_address, remote_address)) in addr_pairs_info:
741 sock = None
742 r_addr = None
743 try:
744 sock = socket.socket(
745 family=family, type=socket.SOCK_DGRAM, proto=proto)
746 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
747 sock.setblocking(False)
748
749 if local_addr:
750 sock.bind(local_address)
751 if remote_addr:
752 yield from self.sock_connect(sock, remote_address)
753 r_addr = remote_address
754 except OSError as exc:
755 if sock is not None:
756 sock.close()
757 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200758 except:
759 if sock is not None:
760 sock.close()
761 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700762 else:
763 break
764 else:
765 raise exceptions[0]
766
767 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200768 waiter = futures.Future(loop=self)
769 transport = self._make_datagram_transport(sock, protocol, r_addr,
770 waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200771 if self._debug:
772 if local_addr:
773 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
774 "created: (%r, %r)",
775 local_addr, remote_addr, transport, protocol)
776 else:
777 logger.debug("Datagram endpoint remote_addr=%r created: "
778 "(%r, %r)",
779 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100780
781 try:
782 yield from waiter
783 except:
784 transport.close()
785 raise
786
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787 return transport, protocol
788
Victor Stinnerf951d282014-06-29 00:46:45 +0200789 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700790 def create_server(self, protocol_factory, host=None, port=None,
791 *,
792 family=socket.AF_UNSPEC,
793 flags=socket.AI_PASSIVE,
794 sock=None,
795 backlog=100,
796 ssl=None,
797 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200798 """Create a TCP server bound to host and port.
799
Victor Stinneracdb7822014-07-14 18:33:40 +0200800 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200801
802 This method is a coroutine.
803 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700804 if isinstance(ssl, bool):
805 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806 if host is not None or port is not None:
807 if sock is not None:
808 raise ValueError(
809 'host/port and sock can not be specified at the same time')
810
811 AF_INET6 = getattr(socket, 'AF_INET6', 0)
812 if reuse_address is None:
813 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
814 sockets = []
815 if host == '':
816 host = None
817
818 infos = yield from self.getaddrinfo(
819 host, port, family=family,
820 type=socket.SOCK_STREAM, proto=0, flags=flags)
821 if not infos:
822 raise OSError('getaddrinfo() returned empty list')
823
824 completed = False
825 try:
826 for res in infos:
827 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700828 try:
829 sock = socket.socket(af, socktype, proto)
830 except socket.error:
831 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200832 if self._debug:
833 logger.warning('create_server() failed to create '
834 'socket.socket(%r, %r, %r)',
835 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700836 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700837 sockets.append(sock)
838 if reuse_address:
839 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
840 True)
841 # Disable IPv4/IPv6 dual stack support (enabled by
842 # default on Linux) which makes a single socket
843 # listen on both address families.
844 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
845 sock.setsockopt(socket.IPPROTO_IPV6,
846 socket.IPV6_V6ONLY,
847 True)
848 try:
849 sock.bind(sa)
850 except OSError as err:
851 raise OSError(err.errno, 'error while attempting '
852 'to bind on address %r: %s'
853 % (sa, err.strerror.lower()))
854 completed = True
855 finally:
856 if not completed:
857 for sock in sockets:
858 sock.close()
859 else:
860 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200861 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700862 sockets = [sock]
863
864 server = Server(self, sockets)
865 for sock in sockets:
866 sock.listen(backlog)
867 sock.setblocking(False)
868 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200869 if self._debug:
870 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700871 return server
872
Victor Stinnerf951d282014-06-29 00:46:45 +0200873 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700874 def connect_read_pipe(self, protocol_factory, pipe):
875 protocol = protocol_factory()
876 waiter = futures.Future(loop=self)
877 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100878
879 try:
880 yield from waiter
881 except:
882 transport.close()
883 raise
884
Victor Stinneracdb7822014-07-14 18:33:40 +0200885 if self._debug:
886 logger.debug('Read pipe %r connected: (%r, %r)',
887 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700888 return transport, protocol
889
Victor Stinnerf951d282014-06-29 00:46:45 +0200890 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700891 def connect_write_pipe(self, protocol_factory, pipe):
892 protocol = protocol_factory()
893 waiter = futures.Future(loop=self)
894 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100895
896 try:
897 yield from waiter
898 except:
899 transport.close()
900 raise
901
Victor Stinneracdb7822014-07-14 18:33:40 +0200902 if self._debug:
903 logger.debug('Write pipe %r connected: (%r, %r)',
904 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700905 return transport, protocol
906
Victor Stinneracdb7822014-07-14 18:33:40 +0200907 def _log_subprocess(self, msg, stdin, stdout, stderr):
908 info = [msg]
909 if stdin is not None:
910 info.append('stdin=%s' % _format_pipe(stdin))
911 if stdout is not None and stderr == subprocess.STDOUT:
912 info.append('stdout=stderr=%s' % _format_pipe(stdout))
913 else:
914 if stdout is not None:
915 info.append('stdout=%s' % _format_pipe(stdout))
916 if stderr is not None:
917 info.append('stderr=%s' % _format_pipe(stderr))
918 logger.debug(' '.join(info))
919
Victor Stinnerf951d282014-06-29 00:46:45 +0200920 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700921 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
922 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
923 universal_newlines=False, shell=True, bufsize=0,
924 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100925 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800926 raise ValueError("cmd must be a string")
927 if universal_newlines:
928 raise ValueError("universal_newlines must be False")
929 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100930 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800931 if bufsize != 0:
932 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700933 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200934 if self._debug:
935 # don't log parameters: they may contain sensitive information
936 # (password) and may be too long
937 debug_log = 'run shell command %r' % cmd
938 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700939 transport = yield from self._make_subprocess_transport(
940 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200941 if self._debug:
942 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700943 return transport, protocol
944
Victor Stinnerf951d282014-06-29 00:46:45 +0200945 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500946 def subprocess_exec(self, protocol_factory, program, *args,
947 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
948 stderr=subprocess.PIPE, universal_newlines=False,
949 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800950 if universal_newlines:
951 raise ValueError("universal_newlines must be False")
952 if shell:
953 raise ValueError("shell must be False")
954 if bufsize != 0:
955 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100956 popen_args = (program,) + args
957 for arg in popen_args:
958 if not isinstance(arg, (str, bytes)):
959 raise TypeError("program arguments must be "
960 "a bytes or text string, not %s"
961 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700962 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200963 if self._debug:
964 # don't log parameters: they may contain sensitive information
965 # (password) and may be too long
966 debug_log = 'execute program %r' % program
967 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700968 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500969 protocol, popen_args, False, stdin, stdout, stderr,
970 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200971 if self._debug:
972 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700973 return transport, protocol
974
Yury Selivanov569efa22014-02-18 18:02:19 -0500975 def set_exception_handler(self, handler):
976 """Set handler as the new event loop exception handler.
977
978 If handler is None, the default exception handler will
979 be set.
980
981 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +0200982 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -0500983 will be a reference to the active event loop, 'context'
984 will be a dict object (see `call_exception_handler()`
985 documentation for details about context).
986 """
987 if handler is not None and not callable(handler):
988 raise TypeError('A callable object or None is expected, '
989 'got {!r}'.format(handler))
990 self._exception_handler = handler
991
992 def default_exception_handler(self, context):
993 """Default exception handler.
994
995 This is called when an exception occurs and no exception
996 handler is set, and can be called by a custom exception
997 handler that wants to defer to the default behavior.
998
Victor Stinneracdb7822014-07-14 18:33:40 +0200999 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001000 `call_exception_handler()`.
1001 """
1002 message = context.get('message')
1003 if not message:
1004 message = 'Unhandled exception in event loop'
1005
1006 exception = context.get('exception')
1007 if exception is not None:
1008 exc_info = (type(exception), exception, exception.__traceback__)
1009 else:
1010 exc_info = False
1011
Victor Stinnerff018e42015-01-28 00:30:40 +01001012 if ('source_traceback' not in context
1013 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +01001014 and self._current_handle._source_traceback):
1015 context['handle_traceback'] = self._current_handle._source_traceback
1016
Yury Selivanov569efa22014-02-18 18:02:19 -05001017 log_lines = [message]
1018 for key in sorted(context):
1019 if key in {'message', 'exception'}:
1020 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001021 value = context[key]
1022 if key == 'source_traceback':
1023 tb = ''.join(traceback.format_list(value))
1024 value = 'Object created at (most recent call last):\n'
1025 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001026 elif key == 'handle_traceback':
1027 tb = ''.join(traceback.format_list(value))
1028 value = 'Handle created at (most recent call last):\n'
1029 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001030 else:
1031 value = repr(value)
1032 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -05001033
1034 logger.error('\n'.join(log_lines), exc_info=exc_info)
1035
1036 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001037 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001038
Victor Stinneracdb7822014-07-14 18:33:40 +02001039 The context argument is a dict containing the following keys:
1040
Yury Selivanov569efa22014-02-18 18:02:19 -05001041 - 'message': Error message;
1042 - 'exception' (optional): Exception object;
1043 - 'future' (optional): Future instance;
1044 - 'handle' (optional): Handle instance;
1045 - 'protocol' (optional): Protocol instance;
1046 - 'transport' (optional): Transport instance;
1047 - 'socket' (optional): Socket instance.
1048
Victor Stinneracdb7822014-07-14 18:33:40 +02001049 New keys maybe introduced in the future.
1050
1051 Note: do not overload this method in an event loop subclass.
1052 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001053 `set_exception_handler()` method.
1054 """
1055 if self._exception_handler is None:
1056 try:
1057 self.default_exception_handler(context)
1058 except Exception:
1059 # Second protection layer for unexpected errors
1060 # in the default implementation, as well as for subclassed
1061 # event loops with overloaded "default_exception_handler".
1062 logger.error('Exception in default exception handler',
1063 exc_info=True)
1064 else:
1065 try:
1066 self._exception_handler(self, context)
1067 except Exception as exc:
1068 # Exception in the user set custom exception handler.
1069 try:
1070 # Let's try default handler.
1071 self.default_exception_handler({
1072 'message': 'Unhandled error in exception handler',
1073 'exception': exc,
1074 'context': context,
1075 })
1076 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001077 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001078 # overloaded.
1079 logger.error('Exception in default exception handler '
1080 'while handling an unexpected error '
1081 'in custom exception handler',
1082 exc_info=True)
1083
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001084 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001085 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001086 assert isinstance(handle, events.Handle), 'A Handle is required here'
1087 if handle._cancelled:
1088 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001089 assert not isinstance(handle, events.TimerHandle)
1090 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001091
1092 def _add_callback_signalsafe(self, handle):
1093 """Like _add_callback() but called from a signal handler."""
1094 self._add_callback(handle)
1095 self._write_to_self()
1096
Yury Selivanov592ada92014-09-25 12:07:56 -04001097 def _timer_handle_cancelled(self, handle):
1098 """Notification that a TimerHandle has been cancelled."""
1099 if handle._scheduled:
1100 self._timer_cancelled_count += 1
1101
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001102 def _run_once(self):
1103 """Run one full iteration of the event loop.
1104
1105 This calls all currently ready callbacks, polls for I/O,
1106 schedules the resulting callbacks, and finally schedules
1107 'call_later' callbacks.
1108 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001109
Yury Selivanov592ada92014-09-25 12:07:56 -04001110 sched_count = len(self._scheduled)
1111 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1112 self._timer_cancelled_count / sched_count >
1113 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001114 # Remove delayed calls that were cancelled if their number
1115 # is too high
1116 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001117 for handle in self._scheduled:
1118 if handle._cancelled:
1119 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001120 else:
1121 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001122
Victor Stinner68da8fc2014-09-30 18:08:36 +02001123 heapq.heapify(new_scheduled)
1124 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001125 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001126 else:
1127 # Remove delayed calls that were cancelled from head of queue.
1128 while self._scheduled and self._scheduled[0]._cancelled:
1129 self._timer_cancelled_count -= 1
1130 handle = heapq.heappop(self._scheduled)
1131 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001132
1133 timeout = None
1134 if self._ready:
1135 timeout = 0
1136 elif self._scheduled:
1137 # Compute the desired timeout.
1138 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001139 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001140
Victor Stinner770e48d2014-07-11 11:58:33 +02001141 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001142 t0 = self.time()
1143 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001144 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001145 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001146 level = logging.INFO
1147 else:
1148 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001149 nevent = len(event_list)
1150 if timeout is None:
1151 logger.log(level, 'poll took %.3f ms: %s events',
1152 dt * 1e3, nevent)
1153 elif nevent:
1154 logger.log(level,
1155 'poll %.3f ms took %.3f ms: %s events',
1156 timeout * 1e3, dt * 1e3, nevent)
1157 elif dt >= 1.0:
1158 logger.log(level,
1159 'poll %.3f ms took %.3f ms: timeout',
1160 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001161 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001162 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001163 self._process_events(event_list)
1164
1165 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001166 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001167 while self._scheduled:
1168 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001169 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001170 break
1171 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001172 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001173 self._ready.append(handle)
1174
1175 # This is the only place where callbacks are actually *called*.
1176 # All other places just add them to ready.
1177 # Note: We run all currently scheduled callbacks, but not any
1178 # callbacks scheduled by callbacks run this time around --
1179 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001180 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001181 ntodo = len(self._ready)
1182 for i in range(ntodo):
1183 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001184 if handle._cancelled:
1185 continue
1186 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001187 try:
1188 self._current_handle = handle
1189 t0 = self.time()
1190 handle._run()
1191 dt = self.time() - t0
1192 if dt >= self.slow_callback_duration:
1193 logger.warning('Executing %s took %.3f seconds',
1194 _format_handle(handle), dt)
1195 finally:
1196 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001197 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001198 handle._run()
1199 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001200
Yury Selivanove8944cb2015-05-12 11:43:04 -04001201 def _set_coroutine_wrapper(self, enabled):
1202 try:
1203 set_wrapper = sys.set_coroutine_wrapper
1204 get_wrapper = sys.get_coroutine_wrapper
1205 except AttributeError:
1206 return
1207
1208 enabled = bool(enabled)
Yury Selivanov996083d2015-08-04 15:37:24 -04001209 if self._coroutine_wrapper_set == enabled:
Yury Selivanove8944cb2015-05-12 11:43:04 -04001210 return
1211
1212 wrapper = coroutines.debug_wrapper
1213 current_wrapper = get_wrapper()
1214
1215 if enabled:
1216 if current_wrapper not in (None, wrapper):
1217 warnings.warn(
1218 "loop.set_debug(True): cannot set debug coroutine "
1219 "wrapper; another wrapper is already set %r" %
1220 current_wrapper, RuntimeWarning)
1221 else:
1222 set_wrapper(wrapper)
1223 self._coroutine_wrapper_set = True
1224 else:
1225 if current_wrapper not in (None, wrapper):
1226 warnings.warn(
1227 "loop.set_debug(False): cannot unset debug coroutine "
1228 "wrapper; another wrapper was set %r" %
1229 current_wrapper, RuntimeWarning)
1230 else:
1231 set_wrapper(None)
1232 self._coroutine_wrapper_set = False
1233
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001234 def get_debug(self):
1235 return self._debug
1236
1237 def set_debug(self, enabled):
1238 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001239
Yury Selivanove8944cb2015-05-12 11:43:04 -04001240 if self.is_running():
1241 self._set_coroutine_wrapper(enabled)