blob: 69caa4d723775558cd8bff9eed5233ad5bf0680d [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
48 if family not in (socket.AF_INET, socket.AF_INET6):
49 return
50
51 host, port = address
52 type_mask = 0
53 if hasattr(socket, 'SOCK_NONBLOCK'):
54 type_mask |= socket.SOCK_NONBLOCK
55 if hasattr(socket, 'SOCK_CLOEXEC'):
56 type_mask |= socket.SOCK_CLOEXEC
57 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
58 # already resolved.
59 try:
60 socket.getaddrinfo(host, port,
61 family=family,
62 type=(sock.type & ~type_mask),
63 proto=sock.proto,
64 flags=socket.AI_NUMERICHOST)
65 except socket.gaierror as err:
66 raise ValueError("address must be resolved (IP address), got %r: %s"
67 % (address, err))
68
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069def _raise_stop_error(*args):
70 raise _StopError
71
72
73class Server(events.AbstractServer):
74
75 def __init__(self, loop, sockets):
76 self.loop = loop
77 self.sockets = sockets
78 self.active_count = 0
79 self.waiters = []
80
81 def attach(self, transport):
82 assert self.sockets is not None
83 self.active_count += 1
84
85 def detach(self, transport):
86 assert self.active_count > 0
87 self.active_count -= 1
88 if self.active_count == 0 and self.sockets is None:
89 self._wakeup()
90
91 def close(self):
92 sockets = self.sockets
93 if sockets is not None:
94 self.sockets = None
95 for sock in sockets:
96 self.loop._stop_serving(sock)
97 if self.active_count == 0:
98 self._wakeup()
99
100 def _wakeup(self):
101 waiters = self.waiters
102 self.waiters = None
103 for waiter in waiters:
104 if not waiter.done():
105 waiter.set_result(waiter)
106
107 @tasks.coroutine
108 def wait_closed(self):
109 if self.sockets is None or self.waiters is None:
110 return
111 waiter = futures.Future(loop=self.loop)
112 self.waiters.append(waiter)
113 yield from waiter
114
115
116class BaseEventLoop(events.AbstractEventLoop):
117
118 def __init__(self):
119 self._ready = collections.deque()
120 self._scheduled = []
121 self._default_executor = None
122 self._internal_fds = 0
123 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100124 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500125 self._exception_handler = None
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100126 self._debug = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 def _make_socket_transport(self, sock, protocol, waiter=None, *,
129 extra=None, server=None):
130 """Create socket transport."""
131 raise NotImplementedError
132
133 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
134 server_side=False, server_hostname=None,
135 extra=None, server=None):
136 """Create SSL transport."""
137 raise NotImplementedError
138
139 def _make_datagram_transport(self, sock, protocol,
140 address=None, extra=None):
141 """Create datagram transport."""
142 raise NotImplementedError
143
144 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
145 extra=None):
146 """Create read pipe transport."""
147 raise NotImplementedError
148
149 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
150 extra=None):
151 """Create write pipe transport."""
152 raise NotImplementedError
153
154 @tasks.coroutine
155 def _make_subprocess_transport(self, protocol, args, shell,
156 stdin, stdout, stderr, bufsize,
157 extra=None, **kwargs):
158 """Create subprocess transport."""
159 raise NotImplementedError
160
161 def _read_from_self(self):
162 """XXX"""
163 raise NotImplementedError
164
165 def _write_to_self(self):
166 """XXX"""
167 raise NotImplementedError
168
169 def _process_events(self, event_list):
170 """Process selector events."""
171 raise NotImplementedError
172
173 def run_forever(self):
174 """Run until stop() is called."""
175 if self._running:
176 raise RuntimeError('Event loop is running.')
177 self._running = True
178 try:
179 while True:
180 try:
181 self._run_once()
182 except _StopError:
183 break
184 finally:
185 self._running = False
186
187 def run_until_complete(self, future):
188 """Run until the Future is done.
189
190 If the argument is a coroutine, it is wrapped in a Task.
191
192 XXX TBD: It would be disastrous to call run_until_complete()
193 with the same coroutine twice -- it would wrap it in two
194 different Tasks and that can't be good.
195
196 Return the Future's result, or raise its exception.
197 """
198 future = tasks.async(future, loop=self)
199 future.add_done_callback(_raise_stop_error)
200 self.run_forever()
201 future.remove_done_callback(_raise_stop_error)
202 if not future.done():
203 raise RuntimeError('Event loop stopped before Future completed.')
204
205 return future.result()
206
207 def stop(self):
208 """Stop running the event loop.
209
210 Every callback scheduled before stop() is called will run.
211 Callback scheduled after stop() is called won't. However,
212 those callbacks will run if run() is called again later.
213 """
214 self.call_soon(_raise_stop_error)
215
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200216 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700217 """Close the event loop.
218
219 This clears the queues and shuts down the executor,
220 but does not wait for the executor to finish.
221 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200222 self._ready.clear()
223 self._scheduled.clear()
224 executor = self._default_executor
225 if executor is not None:
226 self._default_executor = None
227 executor.shutdown(wait=False)
228
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 def is_running(self):
230 """Returns running status of event loop."""
231 return self._running
232
233 def time(self):
234 """Return the time according to the event loop's clock."""
235 return time.monotonic()
236
237 def call_later(self, delay, callback, *args):
238 """Arrange for a callback to be called at a given time.
239
240 Return a Handle: an opaque object with a cancel() method that
241 can be used to cancel the call.
242
243 The delay can be an int or float, expressed in seconds. It is
244 always a relative time.
245
246 Each callback will be called exactly once. If two callbacks
247 are scheduled for exactly the same time, it undefined which
248 will be called first.
249
250 Any positional arguments after the callback will be passed to
251 the callback when it is called.
252 """
253 return self.call_at(self.time() + delay, callback, *args)
254
255 def call_at(self, when, callback, *args):
256 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100257 if tasks.iscoroutinefunction(callback):
258 raise TypeError("coroutines cannot be used with call_at()")
Yury Selivanov569efa22014-02-18 18:02:19 -0500259 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 heapq.heappush(self._scheduled, timer)
261 return timer
262
263 def call_soon(self, callback, *args):
264 """Arrange for a callback to be called as soon as possible.
265
266 This operates as a FIFO queue, callbacks are called in the
267 order in which they are registered. Each callback will be
268 called exactly once.
269
270 Any positional arguments after the callback will be passed to
271 the callback when it is called.
272 """
Victor Stinner9af4a242014-02-11 11:34:30 +0100273 if tasks.iscoroutinefunction(callback):
274 raise TypeError("coroutines cannot be used with call_soon()")
Yury Selivanov569efa22014-02-18 18:02:19 -0500275 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276 self._ready.append(handle)
277 return handle
278
279 def call_soon_threadsafe(self, callback, *args):
280 """XXX"""
281 handle = self.call_soon(callback, *args)
282 self._write_to_self()
283 return handle
284
285 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100286 if tasks.iscoroutinefunction(callback):
287 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 if isinstance(callback, events.Handle):
289 assert not args
290 assert not isinstance(callback, events.TimerHandle)
291 if callback._cancelled:
292 f = futures.Future(loop=self)
293 f.set_result(None)
294 return f
295 callback, args = callback._callback, callback._args
296 if executor is None:
297 executor = self._default_executor
298 if executor is None:
299 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
300 self._default_executor = executor
301 return futures.wrap_future(executor.submit(callback, *args), loop=self)
302
303 def set_default_executor(self, executor):
304 self._default_executor = executor
305
306 def getaddrinfo(self, host, port, *,
307 family=0, type=0, proto=0, flags=0):
308 return self.run_in_executor(None, socket.getaddrinfo,
309 host, port, family, type, proto, flags)
310
311 def getnameinfo(self, sockaddr, flags=0):
312 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
313
314 @tasks.coroutine
315 def create_connection(self, protocol_factory, host=None, port=None, *,
316 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700317 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700319 if server_hostname is not None and not ssl:
320 raise ValueError('server_hostname is only meaningful with ssl')
321
322 if server_hostname is None and ssl:
323 # Use host as default for server_hostname. It is an error
324 # if host is empty or not set, e.g. when an
325 # already-connected socket was passed or when only a port
326 # is given. To avoid this error, you can pass
327 # server_hostname='' -- this will bypass the hostname
328 # check. (This also means that if host is a numeric
329 # IP/IPv6 address, we will attempt to verify that exact
330 # address; this will probably fail, but it is possible to
331 # create a certificate for a specific IP address, so we
332 # don't judge it here.)
333 if not host:
334 raise ValueError('You must set server_hostname '
335 'when using ssl without a host')
336 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700337
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 if host is not None or port is not None:
339 if sock is not None:
340 raise ValueError(
341 'host/port and sock can not be specified at the same time')
342
343 f1 = self.getaddrinfo(
344 host, port, family=family,
345 type=socket.SOCK_STREAM, proto=proto, flags=flags)
346 fs = [f1]
347 if local_addr is not None:
348 f2 = self.getaddrinfo(
349 *local_addr, family=family,
350 type=socket.SOCK_STREAM, proto=proto, flags=flags)
351 fs.append(f2)
352 else:
353 f2 = None
354
355 yield from tasks.wait(fs, loop=self)
356
357 infos = f1.result()
358 if not infos:
359 raise OSError('getaddrinfo() returned empty list')
360 if f2 is not None:
361 laddr_infos = f2.result()
362 if not laddr_infos:
363 raise OSError('getaddrinfo() returned empty list')
364
365 exceptions = []
366 for family, type, proto, cname, address in infos:
367 try:
368 sock = socket.socket(family=family, type=type, proto=proto)
369 sock.setblocking(False)
370 if f2 is not None:
371 for _, _, _, _, laddr in laddr_infos:
372 try:
373 sock.bind(laddr)
374 break
375 except OSError as exc:
376 exc = OSError(
377 exc.errno, 'error while '
378 'attempting to bind on address '
379 '{!r}: {}'.format(
380 laddr, exc.strerror.lower()))
381 exceptions.append(exc)
382 else:
383 sock.close()
384 sock = None
385 continue
386 yield from self.sock_connect(sock, address)
387 except OSError as exc:
388 if sock is not None:
389 sock.close()
390 exceptions.append(exc)
391 else:
392 break
393 else:
394 if len(exceptions) == 1:
395 raise exceptions[0]
396 else:
397 # If they all have the same str(), raise one.
398 model = str(exceptions[0])
399 if all(str(exc) == model for exc in exceptions):
400 raise exceptions[0]
401 # Raise a combined exception so the user can see all
402 # the various error messages.
403 raise OSError('Multiple exceptions: {}'.format(
404 ', '.join(str(exc) for exc in exceptions)))
405
406 elif sock is None:
407 raise ValueError(
408 'host and port was not specified and no sock specified')
409
410 sock.setblocking(False)
411
Yury Selivanovb057c522014-02-18 12:15:06 -0500412 transport, protocol = yield from self._create_connection_transport(
413 sock, protocol_factory, ssl, server_hostname)
414 return transport, protocol
415
416 @tasks.coroutine
417 def _create_connection_transport(self, sock, protocol_factory, ssl,
418 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 protocol = protocol_factory()
420 waiter = futures.Future(loop=self)
421 if ssl:
422 sslcontext = None if isinstance(ssl, bool) else ssl
423 transport = self._make_ssl_transport(
424 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700425 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 else:
427 transport = self._make_socket_transport(sock, protocol, waiter)
428
429 yield from waiter
430 return transport, protocol
431
432 @tasks.coroutine
433 def create_datagram_endpoint(self, protocol_factory,
434 local_addr=None, remote_addr=None, *,
435 family=0, proto=0, flags=0):
436 """Create datagram connection."""
437 if not (local_addr or remote_addr):
438 if family == 0:
439 raise ValueError('unexpected address family')
440 addr_pairs_info = (((family, proto), (None, None)),)
441 else:
442 # join addresss by (family, protocol)
443 addr_infos = collections.OrderedDict()
444 for idx, addr in ((0, local_addr), (1, remote_addr)):
445 if addr is not None:
446 assert isinstance(addr, tuple) and len(addr) == 2, (
447 '2-tuple is expected')
448
449 infos = yield from self.getaddrinfo(
450 *addr, family=family, type=socket.SOCK_DGRAM,
451 proto=proto, flags=flags)
452 if not infos:
453 raise OSError('getaddrinfo() returned empty list')
454
455 for fam, _, pro, _, address in infos:
456 key = (fam, pro)
457 if key not in addr_infos:
458 addr_infos[key] = [None, None]
459 addr_infos[key][idx] = address
460
461 # each addr has to have info for each (family, proto) pair
462 addr_pairs_info = [
463 (key, addr_pair) for key, addr_pair in addr_infos.items()
464 if not ((local_addr and addr_pair[0] is None) or
465 (remote_addr and addr_pair[1] is None))]
466
467 if not addr_pairs_info:
468 raise ValueError('can not get address information')
469
470 exceptions = []
471
472 for ((family, proto),
473 (local_address, remote_address)) in addr_pairs_info:
474 sock = None
475 r_addr = None
476 try:
477 sock = socket.socket(
478 family=family, type=socket.SOCK_DGRAM, proto=proto)
479 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
480 sock.setblocking(False)
481
482 if local_addr:
483 sock.bind(local_address)
484 if remote_addr:
485 yield from self.sock_connect(sock, remote_address)
486 r_addr = remote_address
487 except OSError as exc:
488 if sock is not None:
489 sock.close()
490 exceptions.append(exc)
491 else:
492 break
493 else:
494 raise exceptions[0]
495
496 protocol = protocol_factory()
497 transport = self._make_datagram_transport(sock, protocol, r_addr)
498 return transport, protocol
499
500 @tasks.coroutine
501 def create_server(self, protocol_factory, host=None, port=None,
502 *,
503 family=socket.AF_UNSPEC,
504 flags=socket.AI_PASSIVE,
505 sock=None,
506 backlog=100,
507 ssl=None,
508 reuse_address=None):
509 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700510 if isinstance(ssl, bool):
511 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700512 if host is not None or port is not None:
513 if sock is not None:
514 raise ValueError(
515 'host/port and sock can not be specified at the same time')
516
517 AF_INET6 = getattr(socket, 'AF_INET6', 0)
518 if reuse_address is None:
519 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
520 sockets = []
521 if host == '':
522 host = None
523
524 infos = yield from self.getaddrinfo(
525 host, port, family=family,
526 type=socket.SOCK_STREAM, proto=0, flags=flags)
527 if not infos:
528 raise OSError('getaddrinfo() returned empty list')
529
530 completed = False
531 try:
532 for res in infos:
533 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700534 try:
535 sock = socket.socket(af, socktype, proto)
536 except socket.error:
537 # Assume it's a bad family/type/protocol combination.
538 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 sockets.append(sock)
540 if reuse_address:
541 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
542 True)
543 # Disable IPv4/IPv6 dual stack support (enabled by
544 # default on Linux) which makes a single socket
545 # listen on both address families.
546 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
547 sock.setsockopt(socket.IPPROTO_IPV6,
548 socket.IPV6_V6ONLY,
549 True)
550 try:
551 sock.bind(sa)
552 except OSError as err:
553 raise OSError(err.errno, 'error while attempting '
554 'to bind on address %r: %s'
555 % (sa, err.strerror.lower()))
556 completed = True
557 finally:
558 if not completed:
559 for sock in sockets:
560 sock.close()
561 else:
562 if sock is None:
563 raise ValueError(
564 'host and port was not specified and no sock specified')
565 sockets = [sock]
566
567 server = Server(self, sockets)
568 for sock in sockets:
569 sock.listen(backlog)
570 sock.setblocking(False)
571 self._start_serving(protocol_factory, sock, ssl, server)
572 return server
573
574 @tasks.coroutine
575 def connect_read_pipe(self, protocol_factory, pipe):
576 protocol = protocol_factory()
577 waiter = futures.Future(loop=self)
578 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
579 yield from waiter
580 return transport, protocol
581
582 @tasks.coroutine
583 def connect_write_pipe(self, protocol_factory, pipe):
584 protocol = protocol_factory()
585 waiter = futures.Future(loop=self)
586 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
587 yield from waiter
588 return transport, protocol
589
590 @tasks.coroutine
591 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
592 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
593 universal_newlines=False, shell=True, bufsize=0,
594 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100595 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800596 raise ValueError("cmd must be a string")
597 if universal_newlines:
598 raise ValueError("universal_newlines must be False")
599 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100600 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800601 if bufsize != 0:
602 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603 protocol = protocol_factory()
604 transport = yield from self._make_subprocess_transport(
605 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
606 return transport, protocol
607
608 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500609 def subprocess_exec(self, protocol_factory, program, *args,
610 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
611 stderr=subprocess.PIPE, universal_newlines=False,
612 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800613 if universal_newlines:
614 raise ValueError("universal_newlines must be False")
615 if shell:
616 raise ValueError("shell must be False")
617 if bufsize != 0:
618 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100619 popen_args = (program,) + args
620 for arg in popen_args:
621 if not isinstance(arg, (str, bytes)):
622 raise TypeError("program arguments must be "
623 "a bytes or text string, not %s"
624 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625 protocol = protocol_factory()
626 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500627 protocol, popen_args, False, stdin, stdout, stderr,
628 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 return transport, protocol
630
Yury Selivanov569efa22014-02-18 18:02:19 -0500631 def set_exception_handler(self, handler):
632 """Set handler as the new event loop exception handler.
633
634 If handler is None, the default exception handler will
635 be set.
636
637 If handler is a callable object, it should have a
638 matching signature to '(loop, context)', where 'loop'
639 will be a reference to the active event loop, 'context'
640 will be a dict object (see `call_exception_handler()`
641 documentation for details about context).
642 """
643 if handler is not None and not callable(handler):
644 raise TypeError('A callable object or None is expected, '
645 'got {!r}'.format(handler))
646 self._exception_handler = handler
647
648 def default_exception_handler(self, context):
649 """Default exception handler.
650
651 This is called when an exception occurs and no exception
652 handler is set, and can be called by a custom exception
653 handler that wants to defer to the default behavior.
654
655 context parameter has the same meaning as in
656 `call_exception_handler()`.
657 """
658 message = context.get('message')
659 if not message:
660 message = 'Unhandled exception in event loop'
661
662 exception = context.get('exception')
663 if exception is not None:
664 exc_info = (type(exception), exception, exception.__traceback__)
665 else:
666 exc_info = False
667
668 log_lines = [message]
669 for key in sorted(context):
670 if key in {'message', 'exception'}:
671 continue
672 log_lines.append('{}: {!r}'.format(key, context[key]))
673
674 logger.error('\n'.join(log_lines), exc_info=exc_info)
675
676 def call_exception_handler(self, context):
677 """Call the current event loop exception handler.
678
679 context is a dict object containing the following keys
680 (new keys maybe introduced later):
681 - 'message': Error message;
682 - 'exception' (optional): Exception object;
683 - 'future' (optional): Future instance;
684 - 'handle' (optional): Handle instance;
685 - 'protocol' (optional): Protocol instance;
686 - 'transport' (optional): Transport instance;
687 - 'socket' (optional): Socket instance.
688
689 Note: this method should not be overloaded in subclassed
690 event loops. For any custom exception handling, use
691 `set_exception_handler()` method.
692 """
693 if self._exception_handler is None:
694 try:
695 self.default_exception_handler(context)
696 except Exception:
697 # Second protection layer for unexpected errors
698 # in the default implementation, as well as for subclassed
699 # event loops with overloaded "default_exception_handler".
700 logger.error('Exception in default exception handler',
701 exc_info=True)
702 else:
703 try:
704 self._exception_handler(self, context)
705 except Exception as exc:
706 # Exception in the user set custom exception handler.
707 try:
708 # Let's try default handler.
709 self.default_exception_handler({
710 'message': 'Unhandled error in exception handler',
711 'exception': exc,
712 'context': context,
713 })
714 except Exception:
715 # Guard 'default_exception_handler' in case it's
716 # overloaded.
717 logger.error('Exception in default exception handler '
718 'while handling an unexpected error '
719 'in custom exception handler',
720 exc_info=True)
721
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 def _add_callback(self, handle):
723 """Add a Handle to ready or scheduled."""
724 assert isinstance(handle, events.Handle), 'A Handle is required here'
725 if handle._cancelled:
726 return
727 if isinstance(handle, events.TimerHandle):
728 heapq.heappush(self._scheduled, handle)
729 else:
730 self._ready.append(handle)
731
732 def _add_callback_signalsafe(self, handle):
733 """Like _add_callback() but called from a signal handler."""
734 self._add_callback(handle)
735 self._write_to_self()
736
737 def _run_once(self):
738 """Run one full iteration of the event loop.
739
740 This calls all currently ready callbacks, polls for I/O,
741 schedules the resulting callbacks, and finally schedules
742 'call_later' callbacks.
743 """
744 # Remove delayed calls that were cancelled from head of queue.
745 while self._scheduled and self._scheduled[0]._cancelled:
746 heapq.heappop(self._scheduled)
747
748 timeout = None
749 if self._ready:
750 timeout = 0
751 elif self._scheduled:
752 # Compute the desired timeout.
753 when = self._scheduled[0]._when
754 deadline = max(0, when - self.time())
755 if timeout is None:
756 timeout = deadline
757 else:
758 timeout = min(timeout, deadline)
759
760 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100761 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100762 t0 = self.time()
763 event_list = self._selector.select(timeout)
764 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100765 if t1-t0 >= 1:
766 level = logging.INFO
767 else:
768 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100769 if timeout is not None:
770 logger.log(level, 'poll %.3f took %.3f seconds',
771 timeout, t1-t0)
772 else:
773 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700774 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100775 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700776 self._process_events(event_list)
777
778 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100779 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700780 while self._scheduled:
781 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100782 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700783 break
784 handle = heapq.heappop(self._scheduled)
785 self._ready.append(handle)
786
787 # This is the only place where callbacks are actually *called*.
788 # All other places just add them to ready.
789 # Note: We run all currently scheduled callbacks, but not any
790 # callbacks scheduled by callbacks run this time around --
791 # they will be run the next time (after another I/O poll).
792 # Use an idiom that is threadsafe without using locks.
793 ntodo = len(self._ready)
794 for i in range(ntodo):
795 handle = self._ready.popleft()
796 if not handle._cancelled:
797 handle._run()
798 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100799
800 def get_debug(self):
801 return self._debug
802
803 def set_debug(self, enabled):
804 self._debug = enabled