blob: 1615ecbf42add2a52206330ae1671aaa40a2d72c [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Names exported by ``from <this module> import *``.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation: the worker count
# handed to concurrent.futures.ThreadPoolExecutor when run_in_executor()
# has to create the loop's default executor itself.
_MAX_WORKERS = 5
38
39
class _StopError(BaseException):
    """Internal signal used to break out of the event loop's run cycle.

    It derives from BaseException rather than Exception, so handlers
    written as ``except Exception`` do not intercept it.
    """
42
43
def _check_resolved_address(sock, address):
    """Ensure *address* is an already-resolved (numeric) IP address.

    The check guards against the trap of hanging the entire event loop
    when the address would require a DNS lookup.

    sock: object exposing ``family``, ``type`` and ``proto`` attributes
        (normally a socket).
    address: for AF_INET a ``(host, port)`` pair; for AF_INET6 either
        ``(host, port)`` or ``(host, port, flowinfo, scope_id)``.

    Sockets of any other family are not checked and the function
    returns None immediately.

    Raises ValueError if the host part is not a numeric address (or if
    *address* cannot be unpacked into host and port).
    """
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may legitimately omit flowinfo and scope_id;
        # only host and port are needed for the check below.
        host, port = address[:2]
    else:
        return

    # Strip the flag bits that some platforms fold into sock.type, so
    # that getaddrinfo() receives a plain socket type.
    type_mask = 0
    if hasattr(socket, 'SOCK_NONBLOCK'):
        type_mask |= socket.SOCK_NONBLOCK
    if hasattr(socket, 'SOCK_CLOEXEC'):
        type_mask |= socket.SOCK_CLOEXEC
    # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
    # already resolved.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=(sock.type & ~type_mask),
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
71
def _raise_stop_error(*args):
    """Raise _StopError, ignoring whatever arguments were supplied.

    Accepting ``*args`` lets the function serve both as a plain
    callback and as a future done-callback (which receives the future).
    """
    raise _StopError()
74
75
class Server(events.AbstractServer):
    """Tracks a set of listening sockets and the transports attached to them.

    Attributes:
        loop: the event loop whose ``_stop_serving()`` is called on close.
        sockets: the listening sockets, or None once close() has run.
        active_count: number of transports attached and not yet detached.
        waiters: futures created by wait_closed(), or None after wake-up.
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        """Count one more active transport; the server must still be open."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one less active transport.

        When the last transport leaves a server that is already closed,
        the pending wait_closed() futures are released.
        """
        assert self.active_count > 0
        self.active_count -= 1
        if self.sockets is None and self.active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; a second call is a no-op.

        Waiters are released immediately if no transport is attached.
        """
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self.loop._stop_serving(sock)
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every still-pending waiter and drop the waiter list."""
        pending, self.waiters = self.waiters, None
        for fut in pending:
            if fut.done():
                continue
            fut.set_result(fut)

    @tasks.coroutine
    def wait_closed(self):
        """Coroutine: wait until _wakeup() releases this server's waiters.

        Returns immediately when the sockets are already gone or the
        waiter list has already been dropped.
        """
        if self.sockets is None or self.waiters is None:
            return
        fut = futures.Future(loop=self.loop)
        self.waiters.append(fut)
        yield from fut
117
118
class BaseEventLoop(events.AbstractEventLoop):
    """I/O-mechanism-independent core of an event loop.

    This class implements callback scheduling (call_soon(), call_later(),
    call_at()), the run/stop cycle, executor integration, exception
    handling and the high-level connection, server, pipe and subprocess
    factories.

    Subclasses provide the actual I/O machinery: the ``_make_*_transport``
    factories, ``_process_events()``, ``_read_from_self()`` and
    ``_write_to_self()``, which raise NotImplementedError here, plus the
    ``_selector``, ``sock_connect()`` and ``_start_serving()`` members
    that this class uses but does not define.
    """

    def __init__(self):
        self._ready = collections.deque()   # Handles to run on next iteration.
        self._scheduled = []                # Heap of TimerHandles.
        self._default_executor = None       # Created lazily by run_in_executor().
        self._internal_fds = 0
        self._running = False
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None      # None means "use the default".
        self._debug = False

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport."""
        raise NotImplementedError

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport."""
        raise NotImplementedError

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Create datagram transport."""
        raise NotImplementedError

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport."""
        raise NotImplementedError

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport."""
        raise NotImplementedError

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport."""
        raise NotImplementedError

    def _read_from_self(self):
        """Subclass hook; counterpart of _write_to_self().

        It is not called from within this class.
        NOTE(review): presumably drains the wake-up channel written by
        _write_to_self() -- confirm against the concrete subclasses.
        """
        raise NotImplementedError

    def _write_to_self(self):
        """Subclass hook invoked after a callback is queued from outside.

        call_soon_threadsafe() and _add_callback_signalsafe() call it
        right after adding a handle.
        NOTE(review): presumably wakes a loop blocked in select() --
        confirm against the concrete subclasses.
        """
        raise NotImplementedError

    def _process_events(self, event_list):
        """Process selector events."""
        raise NotImplementedError

    def run_forever(self):
        """Run until stop() is called.

        Raises RuntimeError if the loop is already running.
        """
        if self._running:
            raise RuntimeError('Event loop is running.')
        self._running = True
        try:
            while True:
                try:
                    self._run_once()
                except _StopError:
                    break
        finally:
            self._running = False

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        XXX TBD: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.  Raises
        RuntimeError if the loop was stopped before the Future completed.
        """
        # 'async' is a reserved word from Python 3.7 on, so the attribute
        # is fetched by name to keep this module parseable there.
        future = getattr(tasks, 'async')(future, loop=self)
        future.add_done_callback(_raise_stop_error)
        try:
            self.run_forever()
        finally:
            # Always detach the callback: if run_forever() raised (e.g.
            # because the loop is already running), a leftover callback
            # would stop some later, unrelated run of the loop.
            future.remove_done_callback(_raise_stop_error)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def stop(self):
        """Stop running the event loop.

        Every callback scheduled before stop() is called will run.
        Callbacks scheduled after stop() is called won't.  However,
        those callbacks will run if run_forever() is called again later.
        """
        self.call_soon(_raise_stop_error)

    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.
        """
        self._ready.clear()
        self._scheduled.clear()
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            executor.shutdown(wait=False)

    def is_running(self):
        """Returns running status of event loop."""
        return self._running

    def time(self):
        """Return the time according to the event loop's clock."""
        return time.monotonic()

    def call_later(self, delay, callback, *args):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds.  It is
        always a relative time.

        Each callback will be called exactly once.  If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        return self.call_at(self.time() + delay, callback, *args)

    def call_at(self, when, callback, *args):
        """Like call_later(), but uses an absolute time.

        Raises TypeError if callback is a coroutine function.
        """
        if tasks.iscoroutinefunction(callback):
            raise TypeError("coroutines cannot be used with call_at()")
        timer = events.TimerHandle(when, callback, args, self)
        heapq.heappush(self._scheduled, timer)
        return timer

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue, callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.

        Raises TypeError if callback is a coroutine function.
        """
        if tasks.iscoroutinefunction(callback):
            raise TypeError("coroutines cannot be used with call_soon()")
        handle = events.Handle(callback, args, self)
        self._ready.append(handle)
        return handle

    def call_soon_threadsafe(self, callback, *args):
        """Like call_soon(), then notify the loop via _write_to_self().

        Return the Handle created by call_soon().
        """
        handle = self.call_soon(callback, *args)
        self._write_to_self()
        return handle

    def run_in_executor(self, executor, callback, *args):
        """Submit callback(*args) to an executor and return a Future.

        If executor is None the loop's default executor is used; it is
        created on first use as a ThreadPoolExecutor with _MAX_WORKERS
        workers.  The returned Future wraps the executor's future and
        is bound to this loop.

        callback may also be a (non-timer) Handle, in which case no
        extra args may be given; a cancelled Handle yields a Future
        already completed with None.

        Raises TypeError if callback is a coroutine function.
        """
        if tasks.iscoroutinefunction(callback):
            raise TypeError("coroutines cannot be used with run_in_executor()")
        if isinstance(callback, events.Handle):
            assert not args
            assert not isinstance(callback, events.TimerHandle)
            if callback._cancelled:
                f = futures.Future(loop=self)
                f.set_result(None)
                return f
            callback, args = callback._callback, callback._args
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
                self._default_executor = executor
        return futures.wrap_future(executor.submit(callback, *args), loop=self)

    def set_default_executor(self, executor):
        """Set the executor used by run_in_executor(None, ...)."""
        self._default_executor = executor

    def getaddrinfo(self, host, port, *,
                    family=0, type=0, proto=0, flags=0):
        """Run socket.getaddrinfo() in the default executor.

        Return the Future produced by run_in_executor().
        """
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)

    def getnameinfo(self, sockaddr, flags=0):
        """Run socket.getnameinfo() in the default executor.

        Return the Future produced by run_in_executor().
        """
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Coroutine: open a stream connection and return (transport, protocol).

        Either host/port or an existing sock must be given, not both.
        With host/port, the address is resolved with getaddrinfo() and
        each result is tried in turn until one connects; if local_addr
        is given, the new socket is first bound to one of its resolved
        addresses.

        ssl: a false value for a plain socket transport; True or a
            context object for an SSL transport (True passes no
            context to _make_ssl_transport()).
        server_hostname: only meaningful with ssl; defaults to host.

        Raises ValueError for inconsistent arguments and OSError when
        resolution returns nothing or every connection attempt fails.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Resolve the remote and (optional) local address concurrently.
            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            exceptions = []
            for af, socktype, pro, _cname, address in infos:
                try:
                    sock = socket.socket(family=af, type=socktype, proto=pro)
                    sock.setblocking(False)
                    if f2 is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                bind_exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(bind_exc)
                        else:
                            # No local address could be bound.
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except BaseException:
                    # Anything else (e.g. cancellation while connecting)
                    # propagates, but must not leak the socket.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @tasks.coroutine
    def _create_connection_transport(self, sock, protocol_factory, ssl,
                                     server_hostname):
        """Coroutine: wrap a connected socket in a transport.

        Creates the protocol, builds an SSL or plain socket transport
        depending on ssl, waits for the transport's waiter future and
        returns (transport, protocol).
        """
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        if ssl:
            # ssl=True means "no explicit context".
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=False, server_hostname=server_hostname)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Coroutine: create a datagram endpoint; return (transport, protocol).

        local_addr and remote_addr, when given, are (host, port)
        2-tuples resolved with getaddrinfo().  The first (family, proto)
        combination for which a socket can be created, bound to the
        local address and connected to the remote address is used.
        When neither address is given, family must be non-zero.

        Raises ValueError if no usable address information is available
        and OSError if resolution returns nothing or every candidate
        fails (the first failure is raised).
        """
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # Join addresses by (family, protocol).
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # Each addr has to have info for each (family, proto) pair.
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        for ((fam, pro),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=fam, type=socket.SOCK_DGRAM, proto=pro)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                # Anything else (e.g. cancellation while connecting)
                # propagates, but must not leak the socket.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol

    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Coroutine: start listening and return a Server object.

        Either host/port or an existing sock must be given, not both.
        With host/port, one listening socket is created and bound for
        every address returned by getaddrinfo() (an empty host is
        treated as None); address families for which no socket can be
        created are skipped.

        ssl: an SSLContext or None; a bool raises TypeError.
        reuse_address: whether to set SO_REUSEADDR; None selects True
            on POSIX platforms other than Cygwin.
        backlog: value passed to listen() on each socket.

        Raises ValueError for inconsistent arguments and OSError if
        resolution returns nothing or a bind fails (in which case all
        sockets created so far are closed).
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower())) from err
                completed = True
            finally:
                # On any failure, release every socket opened so far.
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server

    @tasks.coroutine
    def connect_read_pipe(self, protocol_factory, pipe):
        """Coroutine: attach a protocol to the read end of a pipe.

        Waits for the transport's waiter future, then returns
        (transport, protocol).
        """
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def connect_write_pipe(self, protocol_factory, pipe):
        """Coroutine: attach a protocol to the write end of a pipe.

        Waits for the transport's waiter future, then returns
        (transport, protocol).
        """
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=False, shell=True, bufsize=0,
                         **kwargs):
        """Coroutine: run a shell command; return (transport, protocol).

        cmd must be a bytes or str object.  universal_newlines, shell
        and bufsize exist for signature compatibility only and must
        keep their defaults (False, True, 0); anything else raises
        ValueError.  Remaining keyword arguments are forwarded to
        _make_subprocess_transport().
        """
        if not isinstance(cmd, (bytes, str)):
            raise ValueError("cmd must be a string")
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if not shell:
            raise ValueError("shell must be True")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    @tasks.coroutine
    def subprocess_exec(self, protocol_factory, program, *args,
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, universal_newlines=False,
                        shell=False, bufsize=0, **kwargs):
        """Coroutine: run a program directly; return (transport, protocol).

        program and every item of args must be bytes or str, otherwise
        TypeError is raised.  universal_newlines, shell and bufsize
        exist for signature compatibility only and must keep their
        defaults (False, False, 0); anything else raises ValueError.
        Remaining keyword arguments are forwarded to
        _make_subprocess_transport().
        """
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if shell:
            raise ValueError("shell must be False")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        popen_args = (program,) + args
        for arg in popen_args:
            if not isinstance(arg, (str, bytes)):
                raise TypeError("program arguments must be "
                                "a bytes or text string, not %s"
                                % type(arg).__name__)
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, popen_args, False, stdin, stdout, stderr,
            bufsize, **kwargs)
        return transport, protocol

    def set_exception_handler(self, handler):
        """Set handler as the new event loop exception handler.

        If handler is None, the default exception handler will
        be set.

        If handler is a callable object, it should have a
        matching signature to '(loop, context)', where 'loop'
        will be a reference to the active event loop, 'context'
        will be a dict object (see `call_exception_handler()`
        documentation for details about context).

        Raises TypeError if handler is neither None nor callable.
        """
        if handler is not None and not callable(handler):
            raise TypeError('A callable object or None is expected, '
                            'got {!r}'.format(handler))
        self._exception_handler = handler

    def default_exception_handler(self, context):
        """Default exception handler.

        This is called when an exception occurs and no exception
        handler is set, and can be called by a custom exception
        handler that wants to defer to the default behavior.

        context parameter has the same meaning as in
        `call_exception_handler()`.

        The message, every other context entry (sorted by key) and,
        if present, the exception's traceback are logged as an error.
        """
        message = context.get('message')
        if not message:
            message = 'Unhandled exception in event loop'

        exception = context.get('exception')
        if exception is not None:
            exc_info = (type(exception), exception, exception.__traceback__)
        else:
            exc_info = False

        log_lines = [message]
        for key in sorted(context):
            if key in {'message', 'exception'}:
                continue
            log_lines.append('{}: {!r}'.format(key, context[key]))

        logger.error('\n'.join(log_lines), exc_info=exc_info)

    def call_exception_handler(self, context):
        """Call the current event loop exception handler.

        context is a dict object containing the following keys
        (new keys may be introduced later):
        - 'message': Error message;
        - 'exception' (optional): Exception object;
        - 'future' (optional): Future instance;
        - 'handle' (optional): Handle instance;
        - 'protocol' (optional): Protocol instance;
        - 'transport' (optional): Transport instance;
        - 'socket' (optional): Socket instance.

        Note: this method should not be overloaded in subclassed
        event loops.  For any custom exception handling, use
        `set_exception_handler()` method.
        """
        if self._exception_handler is None:
            try:
                self.default_exception_handler(context)
            except Exception:
                # Second protection layer for unexpected errors
                # in the default implementation, as well as for subclassed
                # event loops with overloaded "default_exception_handler".
                logger.error('Exception in default exception handler',
                             exc_info=True)
        else:
            try:
                self._exception_handler(self, context)
            except Exception as exc:
                # Exception in the user set custom exception handler.
                try:
                    # Let's try default handler.
                    self.default_exception_handler({
                        'message': 'Unhandled error in exception handler',
                        'exception': exc,
                        'context': context,
                    })
                except Exception:
                    # Guard 'default_exception_handler' in case it's
                    # overloaded.
                    logger.error('Exception in default exception handler '
                                 'while handling an unexpected error '
                                 'in custom exception handler',
                                 exc_info=True)

    def _add_callback(self, handle):
        """Add a Handle to ready or scheduled.

        Cancelled handles are dropped; TimerHandles go onto the
        scheduled heap, all other handles onto the ready queue.
        """
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        if isinstance(handle, events.TimerHandle):
            heapq.heappush(self._scheduled, handle)
        else:
            self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            heapq.heappop(self._scheduled)

        # Poll timeout: don't block if callbacks are ready; otherwise
        # block until the earliest timer is due, or indefinitely (None)
        # when nothing is scheduled.
        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        # TODO: Instrumentation only in debug mode?
        if logger.isEnabledFor(logging.INFO):
            t0 = self.time()
            event_list = self._selector.select(timeout)
            t1 = self.time()
            # Polls of one second or more are reported at INFO level.
            if t1-t0 >= 1:
                level = logging.INFO
            else:
                level = logging.DEBUG
            if timeout is not None:
                logger.log(level, 'poll %.3f took %.3f seconds',
                           timeout, t1-t0)
            else:
                logger.log(level, 'poll took %.3f seconds', t1-t0)
        else:
            event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.  Timers due within
        # one clock resolution of now are treated as ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is threadsafe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if not handle._cancelled:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

    def get_debug(self):
        """Return the debug flag stored by set_debug() (False initially)."""
        return self._debug

    def set_debug(self, enabled):
        """Store the loop's debug flag."""
        self._debug = enabled