blob: cb2499d26cf8b9b54338179a89a535460b15f611 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
48 if family not in (socket.AF_INET, socket.AF_INET6):
49 return
50
51 host, port = address
52 type_mask = 0
53 if hasattr(socket, 'SOCK_NONBLOCK'):
54 type_mask |= socket.SOCK_NONBLOCK
55 if hasattr(socket, 'SOCK_CLOEXEC'):
56 type_mask |= socket.SOCK_CLOEXEC
57 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
58 # already resolved.
59 try:
60 socket.getaddrinfo(host, port,
61 family=family,
62 type=(sock.type & ~type_mask),
63 proto=sock.proto,
64 flags=socket.AI_NUMERICHOST)
65 except socket.gaierror as err:
66 raise ValueError("address must be resolved (IP address), got %r: %s"
67 % (address, err))
68
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069def _raise_stop_error(*args):
70 raise _StopError
71
72
73class Server(events.AbstractServer):
74
75 def __init__(self, loop, sockets):
76 self.loop = loop
77 self.sockets = sockets
78 self.active_count = 0
79 self.waiters = []
80
81 def attach(self, transport):
82 assert self.sockets is not None
83 self.active_count += 1
84
85 def detach(self, transport):
86 assert self.active_count > 0
87 self.active_count -= 1
88 if self.active_count == 0 and self.sockets is None:
89 self._wakeup()
90
91 def close(self):
92 sockets = self.sockets
93 if sockets is not None:
94 self.sockets = None
95 for sock in sockets:
96 self.loop._stop_serving(sock)
97 if self.active_count == 0:
98 self._wakeup()
99
100 def _wakeup(self):
101 waiters = self.waiters
102 self.waiters = None
103 for waiter in waiters:
104 if not waiter.done():
105 waiter.set_result(waiter)
106
107 @tasks.coroutine
108 def wait_closed(self):
109 if self.sockets is None or self.waiters is None:
110 return
111 waiter = futures.Future(loop=self.loop)
112 self.waiters.append(waiter)
113 yield from waiter
114
115
116class BaseEventLoop(events.AbstractEventLoop):
117
118 def __init__(self):
119 self._ready = collections.deque()
120 self._scheduled = []
121 self._default_executor = None
122 self._internal_fds = 0
123 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100124 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500125 self._exception_handler = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126
127 def _make_socket_transport(self, sock, protocol, waiter=None, *,
128 extra=None, server=None):
129 """Create socket transport."""
130 raise NotImplementedError
131
132 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
133 server_side=False, server_hostname=None,
134 extra=None, server=None):
135 """Create SSL transport."""
136 raise NotImplementedError
137
138 def _make_datagram_transport(self, sock, protocol,
139 address=None, extra=None):
140 """Create datagram transport."""
141 raise NotImplementedError
142
143 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
144 extra=None):
145 """Create read pipe transport."""
146 raise NotImplementedError
147
148 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
149 extra=None):
150 """Create write pipe transport."""
151 raise NotImplementedError
152
153 @tasks.coroutine
154 def _make_subprocess_transport(self, protocol, args, shell,
155 stdin, stdout, stderr, bufsize,
156 extra=None, **kwargs):
157 """Create subprocess transport."""
158 raise NotImplementedError
159
160 def _read_from_self(self):
161 """XXX"""
162 raise NotImplementedError
163
164 def _write_to_self(self):
165 """XXX"""
166 raise NotImplementedError
167
168 def _process_events(self, event_list):
169 """Process selector events."""
170 raise NotImplementedError
171
172 def run_forever(self):
173 """Run until stop() is called."""
174 if self._running:
175 raise RuntimeError('Event loop is running.')
176 self._running = True
177 try:
178 while True:
179 try:
180 self._run_once()
181 except _StopError:
182 break
183 finally:
184 self._running = False
185
186 def run_until_complete(self, future):
187 """Run until the Future is done.
188
189 If the argument is a coroutine, it is wrapped in a Task.
190
191 XXX TBD: It would be disastrous to call run_until_complete()
192 with the same coroutine twice -- it would wrap it in two
193 different Tasks and that can't be good.
194
195 Return the Future's result, or raise its exception.
196 """
197 future = tasks.async(future, loop=self)
198 future.add_done_callback(_raise_stop_error)
199 self.run_forever()
200 future.remove_done_callback(_raise_stop_error)
201 if not future.done():
202 raise RuntimeError('Event loop stopped before Future completed.')
203
204 return future.result()
205
206 def stop(self):
207 """Stop running the event loop.
208
209 Every callback scheduled before stop() is called will run.
210 Callback scheduled after stop() is called won't. However,
211 those callbacks will run if run() is called again later.
212 """
213 self.call_soon(_raise_stop_error)
214
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200215 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700216 """Close the event loop.
217
218 This clears the queues and shuts down the executor,
219 but does not wait for the executor to finish.
220 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200221 self._ready.clear()
222 self._scheduled.clear()
223 executor = self._default_executor
224 if executor is not None:
225 self._default_executor = None
226 executor.shutdown(wait=False)
227
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228 def is_running(self):
229 """Returns running status of event loop."""
230 return self._running
231
232 def time(self):
233 """Return the time according to the event loop's clock."""
234 return time.monotonic()
235
236 def call_later(self, delay, callback, *args):
237 """Arrange for a callback to be called at a given time.
238
239 Return a Handle: an opaque object with a cancel() method that
240 can be used to cancel the call.
241
242 The delay can be an int or float, expressed in seconds. It is
243 always a relative time.
244
245 Each callback will be called exactly once. If two callbacks
246 are scheduled for exactly the same time, it undefined which
247 will be called first.
248
249 Any positional arguments after the callback will be passed to
250 the callback when it is called.
251 """
252 return self.call_at(self.time() + delay, callback, *args)
253
254 def call_at(self, when, callback, *args):
255 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100256 if tasks.iscoroutinefunction(callback):
257 raise TypeError("coroutines cannot be used with call_at()")
Yury Selivanov569efa22014-02-18 18:02:19 -0500258 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 heapq.heappush(self._scheduled, timer)
260 return timer
261
262 def call_soon(self, callback, *args):
263 """Arrange for a callback to be called as soon as possible.
264
265 This operates as a FIFO queue, callbacks are called in the
266 order in which they are registered. Each callback will be
267 called exactly once.
268
269 Any positional arguments after the callback will be passed to
270 the callback when it is called.
271 """
Victor Stinner9af4a242014-02-11 11:34:30 +0100272 if tasks.iscoroutinefunction(callback):
273 raise TypeError("coroutines cannot be used with call_soon()")
Yury Selivanov569efa22014-02-18 18:02:19 -0500274 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275 self._ready.append(handle)
276 return handle
277
278 def call_soon_threadsafe(self, callback, *args):
279 """XXX"""
280 handle = self.call_soon(callback, *args)
281 self._write_to_self()
282 return handle
283
284 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100285 if tasks.iscoroutinefunction(callback):
286 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 if isinstance(callback, events.Handle):
288 assert not args
289 assert not isinstance(callback, events.TimerHandle)
290 if callback._cancelled:
291 f = futures.Future(loop=self)
292 f.set_result(None)
293 return f
294 callback, args = callback._callback, callback._args
295 if executor is None:
296 executor = self._default_executor
297 if executor is None:
298 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
299 self._default_executor = executor
300 return futures.wrap_future(executor.submit(callback, *args), loop=self)
301
302 def set_default_executor(self, executor):
303 self._default_executor = executor
304
305 def getaddrinfo(self, host, port, *,
306 family=0, type=0, proto=0, flags=0):
307 return self.run_in_executor(None, socket.getaddrinfo,
308 host, port, family, type, proto, flags)
309
310 def getnameinfo(self, sockaddr, flags=0):
311 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
312
313 @tasks.coroutine
314 def create_connection(self, protocol_factory, host=None, port=None, *,
315 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700316 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700318 if server_hostname is not None and not ssl:
319 raise ValueError('server_hostname is only meaningful with ssl')
320
321 if server_hostname is None and ssl:
322 # Use host as default for server_hostname. It is an error
323 # if host is empty or not set, e.g. when an
324 # already-connected socket was passed or when only a port
325 # is given. To avoid this error, you can pass
326 # server_hostname='' -- this will bypass the hostname
327 # check. (This also means that if host is a numeric
328 # IP/IPv6 address, we will attempt to verify that exact
329 # address; this will probably fail, but it is possible to
330 # create a certificate for a specific IP address, so we
331 # don't judge it here.)
332 if not host:
333 raise ValueError('You must set server_hostname '
334 'when using ssl without a host')
335 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700336
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 if host is not None or port is not None:
338 if sock is not None:
339 raise ValueError(
340 'host/port and sock can not be specified at the same time')
341
342 f1 = self.getaddrinfo(
343 host, port, family=family,
344 type=socket.SOCK_STREAM, proto=proto, flags=flags)
345 fs = [f1]
346 if local_addr is not None:
347 f2 = self.getaddrinfo(
348 *local_addr, family=family,
349 type=socket.SOCK_STREAM, proto=proto, flags=flags)
350 fs.append(f2)
351 else:
352 f2 = None
353
354 yield from tasks.wait(fs, loop=self)
355
356 infos = f1.result()
357 if not infos:
358 raise OSError('getaddrinfo() returned empty list')
359 if f2 is not None:
360 laddr_infos = f2.result()
361 if not laddr_infos:
362 raise OSError('getaddrinfo() returned empty list')
363
364 exceptions = []
365 for family, type, proto, cname, address in infos:
366 try:
367 sock = socket.socket(family=family, type=type, proto=proto)
368 sock.setblocking(False)
369 if f2 is not None:
370 for _, _, _, _, laddr in laddr_infos:
371 try:
372 sock.bind(laddr)
373 break
374 except OSError as exc:
375 exc = OSError(
376 exc.errno, 'error while '
377 'attempting to bind on address '
378 '{!r}: {}'.format(
379 laddr, exc.strerror.lower()))
380 exceptions.append(exc)
381 else:
382 sock.close()
383 sock = None
384 continue
385 yield from self.sock_connect(sock, address)
386 except OSError as exc:
387 if sock is not None:
388 sock.close()
389 exceptions.append(exc)
390 else:
391 break
392 else:
393 if len(exceptions) == 1:
394 raise exceptions[0]
395 else:
396 # If they all have the same str(), raise one.
397 model = str(exceptions[0])
398 if all(str(exc) == model for exc in exceptions):
399 raise exceptions[0]
400 # Raise a combined exception so the user can see all
401 # the various error messages.
402 raise OSError('Multiple exceptions: {}'.format(
403 ', '.join(str(exc) for exc in exceptions)))
404
405 elif sock is None:
406 raise ValueError(
407 'host and port was not specified and no sock specified')
408
409 sock.setblocking(False)
410
Yury Selivanovb057c522014-02-18 12:15:06 -0500411 transport, protocol = yield from self._create_connection_transport(
412 sock, protocol_factory, ssl, server_hostname)
413 return transport, protocol
414
415 @tasks.coroutine
416 def _create_connection_transport(self, sock, protocol_factory, ssl,
417 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 protocol = protocol_factory()
419 waiter = futures.Future(loop=self)
420 if ssl:
421 sslcontext = None if isinstance(ssl, bool) else ssl
422 transport = self._make_ssl_transport(
423 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700424 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 else:
426 transport = self._make_socket_transport(sock, protocol, waiter)
427
428 yield from waiter
429 return transport, protocol
430
431 @tasks.coroutine
432 def create_datagram_endpoint(self, protocol_factory,
433 local_addr=None, remote_addr=None, *,
434 family=0, proto=0, flags=0):
435 """Create datagram connection."""
436 if not (local_addr or remote_addr):
437 if family == 0:
438 raise ValueError('unexpected address family')
439 addr_pairs_info = (((family, proto), (None, None)),)
440 else:
441 # join addresss by (family, protocol)
442 addr_infos = collections.OrderedDict()
443 for idx, addr in ((0, local_addr), (1, remote_addr)):
444 if addr is not None:
445 assert isinstance(addr, tuple) and len(addr) == 2, (
446 '2-tuple is expected')
447
448 infos = yield from self.getaddrinfo(
449 *addr, family=family, type=socket.SOCK_DGRAM,
450 proto=proto, flags=flags)
451 if not infos:
452 raise OSError('getaddrinfo() returned empty list')
453
454 for fam, _, pro, _, address in infos:
455 key = (fam, pro)
456 if key not in addr_infos:
457 addr_infos[key] = [None, None]
458 addr_infos[key][idx] = address
459
460 # each addr has to have info for each (family, proto) pair
461 addr_pairs_info = [
462 (key, addr_pair) for key, addr_pair in addr_infos.items()
463 if not ((local_addr and addr_pair[0] is None) or
464 (remote_addr and addr_pair[1] is None))]
465
466 if not addr_pairs_info:
467 raise ValueError('can not get address information')
468
469 exceptions = []
470
471 for ((family, proto),
472 (local_address, remote_address)) in addr_pairs_info:
473 sock = None
474 r_addr = None
475 try:
476 sock = socket.socket(
477 family=family, type=socket.SOCK_DGRAM, proto=proto)
478 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
479 sock.setblocking(False)
480
481 if local_addr:
482 sock.bind(local_address)
483 if remote_addr:
484 yield from self.sock_connect(sock, remote_address)
485 r_addr = remote_address
486 except OSError as exc:
487 if sock is not None:
488 sock.close()
489 exceptions.append(exc)
490 else:
491 break
492 else:
493 raise exceptions[0]
494
495 protocol = protocol_factory()
496 transport = self._make_datagram_transport(sock, protocol, r_addr)
497 return transport, protocol
498
499 @tasks.coroutine
500 def create_server(self, protocol_factory, host=None, port=None,
501 *,
502 family=socket.AF_UNSPEC,
503 flags=socket.AI_PASSIVE,
504 sock=None,
505 backlog=100,
506 ssl=None,
507 reuse_address=None):
508 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700509 if isinstance(ssl, bool):
510 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 if host is not None or port is not None:
512 if sock is not None:
513 raise ValueError(
514 'host/port and sock can not be specified at the same time')
515
516 AF_INET6 = getattr(socket, 'AF_INET6', 0)
517 if reuse_address is None:
518 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
519 sockets = []
520 if host == '':
521 host = None
522
523 infos = yield from self.getaddrinfo(
524 host, port, family=family,
525 type=socket.SOCK_STREAM, proto=0, flags=flags)
526 if not infos:
527 raise OSError('getaddrinfo() returned empty list')
528
529 completed = False
530 try:
531 for res in infos:
532 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700533 try:
534 sock = socket.socket(af, socktype, proto)
535 except socket.error:
536 # Assume it's a bad family/type/protocol combination.
537 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 sockets.append(sock)
539 if reuse_address:
540 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
541 True)
542 # Disable IPv4/IPv6 dual stack support (enabled by
543 # default on Linux) which makes a single socket
544 # listen on both address families.
545 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
546 sock.setsockopt(socket.IPPROTO_IPV6,
547 socket.IPV6_V6ONLY,
548 True)
549 try:
550 sock.bind(sa)
551 except OSError as err:
552 raise OSError(err.errno, 'error while attempting '
553 'to bind on address %r: %s'
554 % (sa, err.strerror.lower()))
555 completed = True
556 finally:
557 if not completed:
558 for sock in sockets:
559 sock.close()
560 else:
561 if sock is None:
562 raise ValueError(
563 'host and port was not specified and no sock specified')
564 sockets = [sock]
565
566 server = Server(self, sockets)
567 for sock in sockets:
568 sock.listen(backlog)
569 sock.setblocking(False)
570 self._start_serving(protocol_factory, sock, ssl, server)
571 return server
572
573 @tasks.coroutine
574 def connect_read_pipe(self, protocol_factory, pipe):
575 protocol = protocol_factory()
576 waiter = futures.Future(loop=self)
577 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
578 yield from waiter
579 return transport, protocol
580
581 @tasks.coroutine
582 def connect_write_pipe(self, protocol_factory, pipe):
583 protocol = protocol_factory()
584 waiter = futures.Future(loop=self)
585 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
586 yield from waiter
587 return transport, protocol
588
589 @tasks.coroutine
590 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
591 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
592 universal_newlines=False, shell=True, bufsize=0,
593 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100594 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800595 raise ValueError("cmd must be a string")
596 if universal_newlines:
597 raise ValueError("universal_newlines must be False")
598 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100599 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800600 if bufsize != 0:
601 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 protocol = protocol_factory()
603 transport = yield from self._make_subprocess_transport(
604 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
605 return transport, protocol
606
607 @tasks.coroutine
Victor Stinner20e07432014-02-11 11:44:56 +0100608 def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
610 universal_newlines=False, shell=False, bufsize=0,
611 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800612 if universal_newlines:
613 raise ValueError("universal_newlines must be False")
614 if shell:
615 raise ValueError("shell must be False")
616 if bufsize != 0:
617 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100618 popen_args = (program,) + args
619 for arg in popen_args:
620 if not isinstance(arg, (str, bytes)):
621 raise TypeError("program arguments must be "
622 "a bytes or text string, not %s"
623 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 protocol = protocol_factory()
625 transport = yield from self._make_subprocess_transport(
Victor Stinner20e07432014-02-11 11:44:56 +0100626 protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627 return transport, protocol
628
Yury Selivanov569efa22014-02-18 18:02:19 -0500629 def set_exception_handler(self, handler):
630 """Set handler as the new event loop exception handler.
631
632 If handler is None, the default exception handler will
633 be set.
634
635 If handler is a callable object, it should have a
636 matching signature to '(loop, context)', where 'loop'
637 will be a reference to the active event loop, 'context'
638 will be a dict object (see `call_exception_handler()`
639 documentation for details about context).
640 """
641 if handler is not None and not callable(handler):
642 raise TypeError('A callable object or None is expected, '
643 'got {!r}'.format(handler))
644 self._exception_handler = handler
645
646 def default_exception_handler(self, context):
647 """Default exception handler.
648
649 This is called when an exception occurs and no exception
650 handler is set, and can be called by a custom exception
651 handler that wants to defer to the default behavior.
652
653 context parameter has the same meaning as in
654 `call_exception_handler()`.
655 """
656 message = context.get('message')
657 if not message:
658 message = 'Unhandled exception in event loop'
659
660 exception = context.get('exception')
661 if exception is not None:
662 exc_info = (type(exception), exception, exception.__traceback__)
663 else:
664 exc_info = False
665
666 log_lines = [message]
667 for key in sorted(context):
668 if key in {'message', 'exception'}:
669 continue
670 log_lines.append('{}: {!r}'.format(key, context[key]))
671
672 logger.error('\n'.join(log_lines), exc_info=exc_info)
673
674 def call_exception_handler(self, context):
675 """Call the current event loop exception handler.
676
677 context is a dict object containing the following keys
678 (new keys maybe introduced later):
679 - 'message': Error message;
680 - 'exception' (optional): Exception object;
681 - 'future' (optional): Future instance;
682 - 'handle' (optional): Handle instance;
683 - 'protocol' (optional): Protocol instance;
684 - 'transport' (optional): Transport instance;
685 - 'socket' (optional): Socket instance.
686
687 Note: this method should not be overloaded in subclassed
688 event loops. For any custom exception handling, use
689 `set_exception_handler()` method.
690 """
691 if self._exception_handler is None:
692 try:
693 self.default_exception_handler(context)
694 except Exception:
695 # Second protection layer for unexpected errors
696 # in the default implementation, as well as for subclassed
697 # event loops with overloaded "default_exception_handler".
698 logger.error('Exception in default exception handler',
699 exc_info=True)
700 else:
701 try:
702 self._exception_handler(self, context)
703 except Exception as exc:
704 # Exception in the user set custom exception handler.
705 try:
706 # Let's try default handler.
707 self.default_exception_handler({
708 'message': 'Unhandled error in exception handler',
709 'exception': exc,
710 'context': context,
711 })
712 except Exception:
713 # Guard 'default_exception_handler' in case it's
714 # overloaded.
715 logger.error('Exception in default exception handler '
716 'while handling an unexpected error '
717 'in custom exception handler',
718 exc_info=True)
719
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 def _add_callback(self, handle):
721 """Add a Handle to ready or scheduled."""
722 assert isinstance(handle, events.Handle), 'A Handle is required here'
723 if handle._cancelled:
724 return
725 if isinstance(handle, events.TimerHandle):
726 heapq.heappush(self._scheduled, handle)
727 else:
728 self._ready.append(handle)
729
730 def _add_callback_signalsafe(self, handle):
731 """Like _add_callback() but called from a signal handler."""
732 self._add_callback(handle)
733 self._write_to_self()
734
735 def _run_once(self):
736 """Run one full iteration of the event loop.
737
738 This calls all currently ready callbacks, polls for I/O,
739 schedules the resulting callbacks, and finally schedules
740 'call_later' callbacks.
741 """
742 # Remove delayed calls that were cancelled from head of queue.
743 while self._scheduled and self._scheduled[0]._cancelled:
744 heapq.heappop(self._scheduled)
745
746 timeout = None
747 if self._ready:
748 timeout = 0
749 elif self._scheduled:
750 # Compute the desired timeout.
751 when = self._scheduled[0]._when
752 deadline = max(0, when - self.time())
753 if timeout is None:
754 timeout = deadline
755 else:
756 timeout = min(timeout, deadline)
757
758 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100759 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100760 t0 = self.time()
761 event_list = self._selector.select(timeout)
762 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100763 if t1-t0 >= 1:
764 level = logging.INFO
765 else:
766 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100767 if timeout is not None:
768 logger.log(level, 'poll %.3f took %.3f seconds',
769 timeout, t1-t0)
770 else:
771 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700772 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100773 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700774 self._process_events(event_list)
775
776 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100777 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778 while self._scheduled:
779 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100780 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700781 break
782 handle = heapq.heappop(self._scheduled)
783 self._ready.append(handle)
784
785 # This is the only place where callbacks are actually *called*.
786 # All other places just add them to ready.
787 # Note: We run all currently scheduled callbacks, but not any
788 # callbacks scheduled by callbacks run this time around --
789 # they will be run the next time (after another I/O poll).
790 # Use an idiom that is threadsafe without using locks.
791 ntodo = len(self._ready)
792 for i in range(ntodo):
793 handle = self._ready.popleft()
794 if not handle._cancelled:
795 handle._run()
796 handle = None # Needed to break cycles when an exception occurs.