blob: d2bdc07d36c23b1d963f088a8e8d44ec75481055 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Public names of this module.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation: the worker count
# passed to ThreadPoolExecutor when run_in_executor() has to build the
# default executor itself.
_MAX_WORKERS = 5
38
39
class _StopError(BaseException):
    """Internal signal raised by a callback to make the event loop stop.

    It derives from BaseException, so ``except Exception`` clauses (for
    example the ones in call_exception_handler()) do not catch it.
    """
42
43
def _check_resolved_address(sock, address):
    """Raise ValueError unless *address* is already a numeric IP address.

    This guards against the trap of hanging the entire event loop when
    the address would require a DNS lookup.  Only AF_INET and AF_INET6
    sockets are checked; for any other family the function returns
    without doing anything.
    """
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry flowinfo/scope_id; only host and port
        # are needed here.
        host, port = address[:2]
    else:
        return

    # Flag bits that may be OR-ed into sock.type; strip them before
    # handing the type to getaddrinfo().
    ignored_bits = 0
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        ignored_bits |= getattr(socket, flag_name, 0)
    sock_type = sock.type & ~ignored_bits

    # AI_NUMERICHOST makes getaddrinfo() fail instead of resolving a
    # host name, which is exactly the condition to detect.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=sock_type,
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        message = ("address must be resolved (IP address), got %r: %s"
                   % (address, err))
        raise ValueError(message)
71
def _raise_stop_error(*args):
    """Callback that raises _StopError; any arguments are ignored."""
    raise _StopError()
74
75
class Server(events.AbstractServer):
    """A set of listening sockets plus a count of attached transports.

    Attributes:
        loop: the event loop whose _stop_serving() is called on close().
        sockets: the listening sockets, or None once close() was called.
        active_count: number of transports currently attached.
        waiters: futures created by wait_closed(); set to None once the
            waiters have been woken up.
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        """Count one more transport; only valid while not closed."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one transport less; wake waiters when the last one of a
        closed server goes away."""
        assert self.active_count > 0
        self.active_count -= 1
        if self.active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket.  Calling it again is a no-op."""
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self.loop._stop_serving(sock)
        # With transports still attached, detach() performs the wakeup
        # once the count drops to zero.
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and mark the server
        as fully closed by setting ``waiters`` to None."""
        waiters = self.waiters
        self.waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    @tasks.coroutine
    def wait_closed(self):
        """Wait until the server is closed and all transports detached.

        ``waiters`` is None only after _wakeup() ran, i.e. after close()
        was called and no transport is attached any more.  Testing
        ``sockets is None`` as well would return right after close()
        even while transports are still attached, unlike waiters that
        registered before close().
        """
        if self.waiters is None:
            return
        waiter = futures.Future(loop=self.loop)
        self.waiters.append(waiter)
        yield from waiter
117
118
119class BaseEventLoop(events.AbstractEventLoop):
120
121 def __init__(self):
122 self._ready = collections.deque()
123 self._scheduled = []
124 self._default_executor = None
125 self._internal_fds = 0
126 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100127 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500128 self._exception_handler = None
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100129 self._debug = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
131 def _make_socket_transport(self, sock, protocol, waiter=None, *,
132 extra=None, server=None):
133 """Create socket transport."""
134 raise NotImplementedError
135
136 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
137 server_side=False, server_hostname=None,
138 extra=None, server=None):
139 """Create SSL transport."""
140 raise NotImplementedError
141
142 def _make_datagram_transport(self, sock, protocol,
143 address=None, extra=None):
144 """Create datagram transport."""
145 raise NotImplementedError
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 """Create read pipe transport."""
150 raise NotImplementedError
151
152 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
153 extra=None):
154 """Create write pipe transport."""
155 raise NotImplementedError
156
157 @tasks.coroutine
158 def _make_subprocess_transport(self, protocol, args, shell,
159 stdin, stdout, stderr, bufsize,
160 extra=None, **kwargs):
161 """Create subprocess transport."""
162 raise NotImplementedError
163
164 def _read_from_self(self):
165 """XXX"""
166 raise NotImplementedError
167
168 def _write_to_self(self):
169 """XXX"""
170 raise NotImplementedError
171
172 def _process_events(self, event_list):
173 """Process selector events."""
174 raise NotImplementedError
175
176 def run_forever(self):
177 """Run until stop() is called."""
178 if self._running:
179 raise RuntimeError('Event loop is running.')
180 self._running = True
181 try:
182 while True:
183 try:
184 self._run_once()
185 except _StopError:
186 break
187 finally:
188 self._running = False
189
190 def run_until_complete(self, future):
191 """Run until the Future is done.
192
193 If the argument is a coroutine, it is wrapped in a Task.
194
195 XXX TBD: It would be disastrous to call run_until_complete()
196 with the same coroutine twice -- it would wrap it in two
197 different Tasks and that can't be good.
198
199 Return the Future's result, or raise its exception.
200 """
201 future = tasks.async(future, loop=self)
202 future.add_done_callback(_raise_stop_error)
203 self.run_forever()
204 future.remove_done_callback(_raise_stop_error)
205 if not future.done():
206 raise RuntimeError('Event loop stopped before Future completed.')
207
208 return future.result()
209
210 def stop(self):
211 """Stop running the event loop.
212
213 Every callback scheduled before stop() is called will run.
214 Callback scheduled after stop() is called won't. However,
215 those callbacks will run if run() is called again later.
216 """
217 self.call_soon(_raise_stop_error)
218
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200219 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700220 """Close the event loop.
221
222 This clears the queues and shuts down the executor,
223 but does not wait for the executor to finish.
224 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200225 self._ready.clear()
226 self._scheduled.clear()
227 executor = self._default_executor
228 if executor is not None:
229 self._default_executor = None
230 executor.shutdown(wait=False)
231
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 def is_running(self):
233 """Returns running status of event loop."""
234 return self._running
235
236 def time(self):
237 """Return the time according to the event loop's clock."""
238 return time.monotonic()
239
240 def call_later(self, delay, callback, *args):
241 """Arrange for a callback to be called at a given time.
242
243 Return a Handle: an opaque object with a cancel() method that
244 can be used to cancel the call.
245
246 The delay can be an int or float, expressed in seconds. It is
247 always a relative time.
248
249 Each callback will be called exactly once. If two callbacks
250 are scheduled for exactly the same time, it undefined which
251 will be called first.
252
253 Any positional arguments after the callback will be passed to
254 the callback when it is called.
255 """
256 return self.call_at(self.time() + delay, callback, *args)
257
258 def call_at(self, when, callback, *args):
259 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100260 if tasks.iscoroutinefunction(callback):
261 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100262 if self._debug:
263 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500264 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 heapq.heappush(self._scheduled, timer)
266 return timer
267
268 def call_soon(self, callback, *args):
269 """Arrange for a callback to be called as soon as possible.
270
271 This operates as a FIFO queue, callbacks are called in the
272 order in which they are registered. Each callback will be
273 called exactly once.
274
275 Any positional arguments after the callback will be passed to
276 the callback when it is called.
277 """
Victor Stinner93569c22014-03-21 10:00:52 +0100278 return self._call_soon(callback, args, check_loop=True)
279
280 def _call_soon(self, callback, args, check_loop):
Victor Stinner9af4a242014-02-11 11:34:30 +0100281 if tasks.iscoroutinefunction(callback):
282 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100283 if self._debug and check_loop:
284 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500285 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 self._ready.append(handle)
287 return handle
288
Victor Stinner93569c22014-03-21 10:00:52 +0100289 def _assert_is_current_event_loop(self):
290 """Asserts that this event loop is the current event loop.
291
292 Non-threadsafe methods of this class make this assumption and will
293 likely behave incorrectly when the assumption is violated.
294
295 Should only be called when (self._debug == True). The caller is
296 responsible for checking this condition for performance reasons.
297 """
298 if events.get_event_loop() is not self:
299 raise RuntimeError(
300 "non-threadsafe operation invoked on an event loop other "
301 "than the current one")
302
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 def call_soon_threadsafe(self, callback, *args):
304 """XXX"""
Victor Stinner93569c22014-03-21 10:00:52 +0100305 handle = self._call_soon(callback, args, check_loop=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 self._write_to_self()
307 return handle
308
309 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100310 if tasks.iscoroutinefunction(callback):
311 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 if isinstance(callback, events.Handle):
313 assert not args
314 assert not isinstance(callback, events.TimerHandle)
315 if callback._cancelled:
316 f = futures.Future(loop=self)
317 f.set_result(None)
318 return f
319 callback, args = callback._callback, callback._args
320 if executor is None:
321 executor = self._default_executor
322 if executor is None:
323 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
324 self._default_executor = executor
325 return futures.wrap_future(executor.submit(callback, *args), loop=self)
326
327 def set_default_executor(self, executor):
328 self._default_executor = executor
329
330 def getaddrinfo(self, host, port, *,
331 family=0, type=0, proto=0, flags=0):
332 return self.run_in_executor(None, socket.getaddrinfo,
333 host, port, family, type, proto, flags)
334
335 def getnameinfo(self, sockaddr, flags=0):
336 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
337
338 @tasks.coroutine
339 def create_connection(self, protocol_factory, host=None, port=None, *,
340 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700341 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700343 if server_hostname is not None and not ssl:
344 raise ValueError('server_hostname is only meaningful with ssl')
345
346 if server_hostname is None and ssl:
347 # Use host as default for server_hostname. It is an error
348 # if host is empty or not set, e.g. when an
349 # already-connected socket was passed or when only a port
350 # is given. To avoid this error, you can pass
351 # server_hostname='' -- this will bypass the hostname
352 # check. (This also means that if host is a numeric
353 # IP/IPv6 address, we will attempt to verify that exact
354 # address; this will probably fail, but it is possible to
355 # create a certificate for a specific IP address, so we
356 # don't judge it here.)
357 if not host:
358 raise ValueError('You must set server_hostname '
359 'when using ssl without a host')
360 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700361
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 if host is not None or port is not None:
363 if sock is not None:
364 raise ValueError(
365 'host/port and sock can not be specified at the same time')
366
367 f1 = self.getaddrinfo(
368 host, port, family=family,
369 type=socket.SOCK_STREAM, proto=proto, flags=flags)
370 fs = [f1]
371 if local_addr is not None:
372 f2 = self.getaddrinfo(
373 *local_addr, family=family,
374 type=socket.SOCK_STREAM, proto=proto, flags=flags)
375 fs.append(f2)
376 else:
377 f2 = None
378
379 yield from tasks.wait(fs, loop=self)
380
381 infos = f1.result()
382 if not infos:
383 raise OSError('getaddrinfo() returned empty list')
384 if f2 is not None:
385 laddr_infos = f2.result()
386 if not laddr_infos:
387 raise OSError('getaddrinfo() returned empty list')
388
389 exceptions = []
390 for family, type, proto, cname, address in infos:
391 try:
392 sock = socket.socket(family=family, type=type, proto=proto)
393 sock.setblocking(False)
394 if f2 is not None:
395 for _, _, _, _, laddr in laddr_infos:
396 try:
397 sock.bind(laddr)
398 break
399 except OSError as exc:
400 exc = OSError(
401 exc.errno, 'error while '
402 'attempting to bind on address '
403 '{!r}: {}'.format(
404 laddr, exc.strerror.lower()))
405 exceptions.append(exc)
406 else:
407 sock.close()
408 sock = None
409 continue
410 yield from self.sock_connect(sock, address)
411 except OSError as exc:
412 if sock is not None:
413 sock.close()
414 exceptions.append(exc)
415 else:
416 break
417 else:
418 if len(exceptions) == 1:
419 raise exceptions[0]
420 else:
421 # If they all have the same str(), raise one.
422 model = str(exceptions[0])
423 if all(str(exc) == model for exc in exceptions):
424 raise exceptions[0]
425 # Raise a combined exception so the user can see all
426 # the various error messages.
427 raise OSError('Multiple exceptions: {}'.format(
428 ', '.join(str(exc) for exc in exceptions)))
429
430 elif sock is None:
431 raise ValueError(
432 'host and port was not specified and no sock specified')
433
434 sock.setblocking(False)
435
Yury Selivanovb057c522014-02-18 12:15:06 -0500436 transport, protocol = yield from self._create_connection_transport(
437 sock, protocol_factory, ssl, server_hostname)
438 return transport, protocol
439
440 @tasks.coroutine
441 def _create_connection_transport(self, sock, protocol_factory, ssl,
442 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 protocol = protocol_factory()
444 waiter = futures.Future(loop=self)
445 if ssl:
446 sslcontext = None if isinstance(ssl, bool) else ssl
447 transport = self._make_ssl_transport(
448 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700449 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450 else:
451 transport = self._make_socket_transport(sock, protocol, waiter)
452
453 yield from waiter
454 return transport, protocol
455
456 @tasks.coroutine
457 def create_datagram_endpoint(self, protocol_factory,
458 local_addr=None, remote_addr=None, *,
459 family=0, proto=0, flags=0):
460 """Create datagram connection."""
461 if not (local_addr or remote_addr):
462 if family == 0:
463 raise ValueError('unexpected address family')
464 addr_pairs_info = (((family, proto), (None, None)),)
465 else:
466 # join addresss by (family, protocol)
467 addr_infos = collections.OrderedDict()
468 for idx, addr in ((0, local_addr), (1, remote_addr)):
469 if addr is not None:
470 assert isinstance(addr, tuple) and len(addr) == 2, (
471 '2-tuple is expected')
472
473 infos = yield from self.getaddrinfo(
474 *addr, family=family, type=socket.SOCK_DGRAM,
475 proto=proto, flags=flags)
476 if not infos:
477 raise OSError('getaddrinfo() returned empty list')
478
479 for fam, _, pro, _, address in infos:
480 key = (fam, pro)
481 if key not in addr_infos:
482 addr_infos[key] = [None, None]
483 addr_infos[key][idx] = address
484
485 # each addr has to have info for each (family, proto) pair
486 addr_pairs_info = [
487 (key, addr_pair) for key, addr_pair in addr_infos.items()
488 if not ((local_addr and addr_pair[0] is None) or
489 (remote_addr and addr_pair[1] is None))]
490
491 if not addr_pairs_info:
492 raise ValueError('can not get address information')
493
494 exceptions = []
495
496 for ((family, proto),
497 (local_address, remote_address)) in addr_pairs_info:
498 sock = None
499 r_addr = None
500 try:
501 sock = socket.socket(
502 family=family, type=socket.SOCK_DGRAM, proto=proto)
503 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
504 sock.setblocking(False)
505
506 if local_addr:
507 sock.bind(local_address)
508 if remote_addr:
509 yield from self.sock_connect(sock, remote_address)
510 r_addr = remote_address
511 except OSError as exc:
512 if sock is not None:
513 sock.close()
514 exceptions.append(exc)
515 else:
516 break
517 else:
518 raise exceptions[0]
519
520 protocol = protocol_factory()
521 transport = self._make_datagram_transport(sock, protocol, r_addr)
522 return transport, protocol
523
524 @tasks.coroutine
525 def create_server(self, protocol_factory, host=None, port=None,
526 *,
527 family=socket.AF_UNSPEC,
528 flags=socket.AI_PASSIVE,
529 sock=None,
530 backlog=100,
531 ssl=None,
532 reuse_address=None):
533 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700534 if isinstance(ssl, bool):
535 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 if host is not None or port is not None:
537 if sock is not None:
538 raise ValueError(
539 'host/port and sock can not be specified at the same time')
540
541 AF_INET6 = getattr(socket, 'AF_INET6', 0)
542 if reuse_address is None:
543 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
544 sockets = []
545 if host == '':
546 host = None
547
548 infos = yield from self.getaddrinfo(
549 host, port, family=family,
550 type=socket.SOCK_STREAM, proto=0, flags=flags)
551 if not infos:
552 raise OSError('getaddrinfo() returned empty list')
553
554 completed = False
555 try:
556 for res in infos:
557 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700558 try:
559 sock = socket.socket(af, socktype, proto)
560 except socket.error:
561 # Assume it's a bad family/type/protocol combination.
562 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 sockets.append(sock)
564 if reuse_address:
565 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
566 True)
567 # Disable IPv4/IPv6 dual stack support (enabled by
568 # default on Linux) which makes a single socket
569 # listen on both address families.
570 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
571 sock.setsockopt(socket.IPPROTO_IPV6,
572 socket.IPV6_V6ONLY,
573 True)
574 try:
575 sock.bind(sa)
576 except OSError as err:
577 raise OSError(err.errno, 'error while attempting '
578 'to bind on address %r: %s'
579 % (sa, err.strerror.lower()))
580 completed = True
581 finally:
582 if not completed:
583 for sock in sockets:
584 sock.close()
585 else:
586 if sock is None:
587 raise ValueError(
588 'host and port was not specified and no sock specified')
589 sockets = [sock]
590
591 server = Server(self, sockets)
592 for sock in sockets:
593 sock.listen(backlog)
594 sock.setblocking(False)
595 self._start_serving(protocol_factory, sock, ssl, server)
596 return server
597
598 @tasks.coroutine
599 def connect_read_pipe(self, protocol_factory, pipe):
600 protocol = protocol_factory()
601 waiter = futures.Future(loop=self)
602 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
603 yield from waiter
604 return transport, protocol
605
606 @tasks.coroutine
607 def connect_write_pipe(self, protocol_factory, pipe):
608 protocol = protocol_factory()
609 waiter = futures.Future(loop=self)
610 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
611 yield from waiter
612 return transport, protocol
613
614 @tasks.coroutine
615 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
616 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
617 universal_newlines=False, shell=True, bufsize=0,
618 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100619 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800620 raise ValueError("cmd must be a string")
621 if universal_newlines:
622 raise ValueError("universal_newlines must be False")
623 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100624 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800625 if bufsize != 0:
626 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627 protocol = protocol_factory()
628 transport = yield from self._make_subprocess_transport(
629 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
630 return transport, protocol
631
632 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500633 def subprocess_exec(self, protocol_factory, program, *args,
634 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
635 stderr=subprocess.PIPE, universal_newlines=False,
636 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800637 if universal_newlines:
638 raise ValueError("universal_newlines must be False")
639 if shell:
640 raise ValueError("shell must be False")
641 if bufsize != 0:
642 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100643 popen_args = (program,) + args
644 for arg in popen_args:
645 if not isinstance(arg, (str, bytes)):
646 raise TypeError("program arguments must be "
647 "a bytes or text string, not %s"
648 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649 protocol = protocol_factory()
650 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500651 protocol, popen_args, False, stdin, stdout, stderr,
652 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653 return transport, protocol
654
Yury Selivanov569efa22014-02-18 18:02:19 -0500655 def set_exception_handler(self, handler):
656 """Set handler as the new event loop exception handler.
657
658 If handler is None, the default exception handler will
659 be set.
660
661 If handler is a callable object, it should have a
662 matching signature to '(loop, context)', where 'loop'
663 will be a reference to the active event loop, 'context'
664 will be a dict object (see `call_exception_handler()`
665 documentation for details about context).
666 """
667 if handler is not None and not callable(handler):
668 raise TypeError('A callable object or None is expected, '
669 'got {!r}'.format(handler))
670 self._exception_handler = handler
671
672 def default_exception_handler(self, context):
673 """Default exception handler.
674
675 This is called when an exception occurs and no exception
676 handler is set, and can be called by a custom exception
677 handler that wants to defer to the default behavior.
678
679 context parameter has the same meaning as in
680 `call_exception_handler()`.
681 """
682 message = context.get('message')
683 if not message:
684 message = 'Unhandled exception in event loop'
685
686 exception = context.get('exception')
687 if exception is not None:
688 exc_info = (type(exception), exception, exception.__traceback__)
689 else:
690 exc_info = False
691
692 log_lines = [message]
693 for key in sorted(context):
694 if key in {'message', 'exception'}:
695 continue
696 log_lines.append('{}: {!r}'.format(key, context[key]))
697
698 logger.error('\n'.join(log_lines), exc_info=exc_info)
699
700 def call_exception_handler(self, context):
701 """Call the current event loop exception handler.
702
703 context is a dict object containing the following keys
704 (new keys maybe introduced later):
705 - 'message': Error message;
706 - 'exception' (optional): Exception object;
707 - 'future' (optional): Future instance;
708 - 'handle' (optional): Handle instance;
709 - 'protocol' (optional): Protocol instance;
710 - 'transport' (optional): Transport instance;
711 - 'socket' (optional): Socket instance.
712
713 Note: this method should not be overloaded in subclassed
714 event loops. For any custom exception handling, use
715 `set_exception_handler()` method.
716 """
717 if self._exception_handler is None:
718 try:
719 self.default_exception_handler(context)
720 except Exception:
721 # Second protection layer for unexpected errors
722 # in the default implementation, as well as for subclassed
723 # event loops with overloaded "default_exception_handler".
724 logger.error('Exception in default exception handler',
725 exc_info=True)
726 else:
727 try:
728 self._exception_handler(self, context)
729 except Exception as exc:
730 # Exception in the user set custom exception handler.
731 try:
732 # Let's try default handler.
733 self.default_exception_handler({
734 'message': 'Unhandled error in exception handler',
735 'exception': exc,
736 'context': context,
737 })
738 except Exception:
739 # Guard 'default_exception_handler' in case it's
740 # overloaded.
741 logger.error('Exception in default exception handler '
742 'while handling an unexpected error '
743 'in custom exception handler',
744 exc_info=True)
745
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700746 def _add_callback(self, handle):
747 """Add a Handle to ready or scheduled."""
748 assert isinstance(handle, events.Handle), 'A Handle is required here'
749 if handle._cancelled:
750 return
751 if isinstance(handle, events.TimerHandle):
752 heapq.heappush(self._scheduled, handle)
753 else:
754 self._ready.append(handle)
755
756 def _add_callback_signalsafe(self, handle):
757 """Like _add_callback() but called from a signal handler."""
758 self._add_callback(handle)
759 self._write_to_self()
760
761 def _run_once(self):
762 """Run one full iteration of the event loop.
763
764 This calls all currently ready callbacks, polls for I/O,
765 schedules the resulting callbacks, and finally schedules
766 'call_later' callbacks.
767 """
768 # Remove delayed calls that were cancelled from head of queue.
769 while self._scheduled and self._scheduled[0]._cancelled:
770 heapq.heappop(self._scheduled)
771
772 timeout = None
773 if self._ready:
774 timeout = 0
775 elif self._scheduled:
776 # Compute the desired timeout.
777 when = self._scheduled[0]._when
778 deadline = max(0, when - self.time())
779 if timeout is None:
780 timeout = deadline
781 else:
782 timeout = min(timeout, deadline)
783
784 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100785 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100786 t0 = self.time()
787 event_list = self._selector.select(timeout)
788 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100789 if t1-t0 >= 1:
790 level = logging.INFO
791 else:
792 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100793 if timeout is not None:
794 logger.log(level, 'poll %.3f took %.3f seconds',
795 timeout, t1-t0)
796 else:
797 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100799 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700800 self._process_events(event_list)
801
802 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100803 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804 while self._scheduled:
805 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100806 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700807 break
808 handle = heapq.heappop(self._scheduled)
809 self._ready.append(handle)
810
811 # This is the only place where callbacks are actually *called*.
812 # All other places just add them to ready.
813 # Note: We run all currently scheduled callbacks, but not any
814 # callbacks scheduled by callbacks run this time around --
815 # they will be run the next time (after another I/O poll).
816 # Use an idiom that is threadsafe without using locks.
817 ntodo = len(self._ready)
818 for i in range(ntodo):
819 handle = self._ready.popleft()
820 if not handle._cancelled:
821 handle._run()
822 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100823
824 def get_debug(self):
825 return self._debug
826
827 def set_debug(self, enabled):
828 self._debug = enabled