blob: 1c7073c3703c76996beb07ecbface26f0e485c46 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010048 if family == socket.AF_INET:
49 host, port = address
50 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010051 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010052 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010053 return
54
Victor Stinner1b0580b2014-02-13 09:24:37 +010055 type_mask = 0
56 if hasattr(socket, 'SOCK_NONBLOCK'):
57 type_mask |= socket.SOCK_NONBLOCK
58 if hasattr(socket, 'SOCK_CLOEXEC'):
59 type_mask |= socket.SOCK_CLOEXEC
60 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
61 # already resolved.
62 try:
63 socket.getaddrinfo(host, port,
64 family=family,
65 type=(sock.type & ~type_mask),
66 proto=sock.proto,
67 flags=socket.AI_NUMERICHOST)
68 except socket.gaierror as err:
69 raise ValueError("address must be resolved (IP address), got %r: %s"
70 % (address, err))
71
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072def _raise_stop_error(*args):
73 raise _StopError
74
75
76class Server(events.AbstractServer):
77
78 def __init__(self, loop, sockets):
79 self.loop = loop
80 self.sockets = sockets
81 self.active_count = 0
82 self.waiters = []
83
84 def attach(self, transport):
85 assert self.sockets is not None
86 self.active_count += 1
87
88 def detach(self, transport):
89 assert self.active_count > 0
90 self.active_count -= 1
91 if self.active_count == 0 and self.sockets is None:
92 self._wakeup()
93
94 def close(self):
95 sockets = self.sockets
96 if sockets is not None:
97 self.sockets = None
98 for sock in sockets:
99 self.loop._stop_serving(sock)
100 if self.active_count == 0:
101 self._wakeup()
102
103 def _wakeup(self):
104 waiters = self.waiters
105 self.waiters = None
106 for waiter in waiters:
107 if not waiter.done():
108 waiter.set_result(waiter)
109
110 @tasks.coroutine
111 def wait_closed(self):
112 if self.sockets is None or self.waiters is None:
113 return
114 waiter = futures.Future(loop=self.loop)
115 self.waiters.append(waiter)
116 yield from waiter
117
118
119class BaseEventLoop(events.AbstractEventLoop):
120
121 def __init__(self):
122 self._ready = collections.deque()
123 self._scheduled = []
124 self._default_executor = None
125 self._internal_fds = 0
126 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100127 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500128 self._exception_handler = None
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100129 self._debug = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
131 def _make_socket_transport(self, sock, protocol, waiter=None, *,
132 extra=None, server=None):
133 """Create socket transport."""
134 raise NotImplementedError
135
136 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
137 server_side=False, server_hostname=None,
138 extra=None, server=None):
139 """Create SSL transport."""
140 raise NotImplementedError
141
142 def _make_datagram_transport(self, sock, protocol,
143 address=None, extra=None):
144 """Create datagram transport."""
145 raise NotImplementedError
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 """Create read pipe transport."""
150 raise NotImplementedError
151
152 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
153 extra=None):
154 """Create write pipe transport."""
155 raise NotImplementedError
156
157 @tasks.coroutine
158 def _make_subprocess_transport(self, protocol, args, shell,
159 stdin, stdout, stderr, bufsize,
160 extra=None, **kwargs):
161 """Create subprocess transport."""
162 raise NotImplementedError
163
164 def _read_from_self(self):
165 """XXX"""
166 raise NotImplementedError
167
168 def _write_to_self(self):
169 """XXX"""
170 raise NotImplementedError
171
172 def _process_events(self, event_list):
173 """Process selector events."""
174 raise NotImplementedError
175
176 def run_forever(self):
177 """Run until stop() is called."""
178 if self._running:
179 raise RuntimeError('Event loop is running.')
180 self._running = True
181 try:
182 while True:
183 try:
184 self._run_once()
185 except _StopError:
186 break
187 finally:
188 self._running = False
189
190 def run_until_complete(self, future):
191 """Run until the Future is done.
192
193 If the argument is a coroutine, it is wrapped in a Task.
194
195 XXX TBD: It would be disastrous to call run_until_complete()
196 with the same coroutine twice -- it would wrap it in two
197 different Tasks and that can't be good.
198
199 Return the Future's result, or raise its exception.
200 """
201 future = tasks.async(future, loop=self)
202 future.add_done_callback(_raise_stop_error)
203 self.run_forever()
204 future.remove_done_callback(_raise_stop_error)
205 if not future.done():
206 raise RuntimeError('Event loop stopped before Future completed.')
207
208 return future.result()
209
210 def stop(self):
211 """Stop running the event loop.
212
213 Every callback scheduled before stop() is called will run.
214 Callback scheduled after stop() is called won't. However,
215 those callbacks will run if run() is called again later.
216 """
217 self.call_soon(_raise_stop_error)
218
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200219 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700220 """Close the event loop.
221
222 This clears the queues and shuts down the executor,
223 but does not wait for the executor to finish.
224 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200225 self._ready.clear()
226 self._scheduled.clear()
227 executor = self._default_executor
228 if executor is not None:
229 self._default_executor = None
230 executor.shutdown(wait=False)
231
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 def is_running(self):
233 """Returns running status of event loop."""
234 return self._running
235
236 def time(self):
237 """Return the time according to the event loop's clock."""
238 return time.monotonic()
239
240 def call_later(self, delay, callback, *args):
241 """Arrange for a callback to be called at a given time.
242
243 Return a Handle: an opaque object with a cancel() method that
244 can be used to cancel the call.
245
246 The delay can be an int or float, expressed in seconds. It is
247 always a relative time.
248
249 Each callback will be called exactly once. If two callbacks
250 are scheduled for exactly the same time, it undefined which
251 will be called first.
252
253 Any positional arguments after the callback will be passed to
254 the callback when it is called.
255 """
256 return self.call_at(self.time() + delay, callback, *args)
257
258 def call_at(self, when, callback, *args):
259 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100260 if tasks.iscoroutinefunction(callback):
261 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100262 if self._debug:
263 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500264 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 heapq.heappush(self._scheduled, timer)
266 return timer
267
268 def call_soon(self, callback, *args):
269 """Arrange for a callback to be called as soon as possible.
270
271 This operates as a FIFO queue, callbacks are called in the
272 order in which they are registered. Each callback will be
273 called exactly once.
274
275 Any positional arguments after the callback will be passed to
276 the callback when it is called.
277 """
Victor Stinner93569c22014-03-21 10:00:52 +0100278 return self._call_soon(callback, args, check_loop=True)
279
280 def _call_soon(self, callback, args, check_loop):
Victor Stinner9af4a242014-02-11 11:34:30 +0100281 if tasks.iscoroutinefunction(callback):
282 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100283 if self._debug and check_loop:
284 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500285 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 self._ready.append(handle)
287 return handle
288
Victor Stinner93569c22014-03-21 10:00:52 +0100289 def _assert_is_current_event_loop(self):
290 """Asserts that this event loop is the current event loop.
291
292 Non-threadsafe methods of this class make this assumption and will
293 likely behave incorrectly when the assumption is violated.
294
295 Should only be called when (self._debug == True). The caller is
296 responsible for checking this condition for performance reasons.
297 """
298 if events.get_event_loop() is not self:
299 raise RuntimeError(
300 "non-threadsafe operation invoked on an event loop other "
301 "than the current one")
302
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 def call_soon_threadsafe(self, callback, *args):
304 """XXX"""
Victor Stinner93569c22014-03-21 10:00:52 +0100305 handle = self._call_soon(callback, args, check_loop=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 self._write_to_self()
307 return handle
308
309 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100310 if tasks.iscoroutinefunction(callback):
311 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 if isinstance(callback, events.Handle):
313 assert not args
314 assert not isinstance(callback, events.TimerHandle)
315 if callback._cancelled:
316 f = futures.Future(loop=self)
317 f.set_result(None)
318 return f
319 callback, args = callback._callback, callback._args
320 if executor is None:
321 executor = self._default_executor
322 if executor is None:
323 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
324 self._default_executor = executor
325 return futures.wrap_future(executor.submit(callback, *args), loop=self)
326
327 def set_default_executor(self, executor):
328 self._default_executor = executor
329
330 def getaddrinfo(self, host, port, *,
331 family=0, type=0, proto=0, flags=0):
332 return self.run_in_executor(None, socket.getaddrinfo,
333 host, port, family, type, proto, flags)
334
335 def getnameinfo(self, sockaddr, flags=0):
336 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
337
338 @tasks.coroutine
339 def create_connection(self, protocol_factory, host=None, port=None, *,
340 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700341 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700343 if server_hostname is not None and not ssl:
344 raise ValueError('server_hostname is only meaningful with ssl')
345
346 if server_hostname is None and ssl:
347 # Use host as default for server_hostname. It is an error
348 # if host is empty or not set, e.g. when an
349 # already-connected socket was passed or when only a port
350 # is given. To avoid this error, you can pass
351 # server_hostname='' -- this will bypass the hostname
352 # check. (This also means that if host is a numeric
353 # IP/IPv6 address, we will attempt to verify that exact
354 # address; this will probably fail, but it is possible to
355 # create a certificate for a specific IP address, so we
356 # don't judge it here.)
357 if not host:
358 raise ValueError('You must set server_hostname '
359 'when using ssl without a host')
360 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700361
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 if host is not None or port is not None:
363 if sock is not None:
364 raise ValueError(
365 'host/port and sock can not be specified at the same time')
366
367 f1 = self.getaddrinfo(
368 host, port, family=family,
369 type=socket.SOCK_STREAM, proto=proto, flags=flags)
370 fs = [f1]
371 if local_addr is not None:
372 f2 = self.getaddrinfo(
373 *local_addr, family=family,
374 type=socket.SOCK_STREAM, proto=proto, flags=flags)
375 fs.append(f2)
376 else:
377 f2 = None
378
379 yield from tasks.wait(fs, loop=self)
380
381 infos = f1.result()
382 if not infos:
383 raise OSError('getaddrinfo() returned empty list')
384 if f2 is not None:
385 laddr_infos = f2.result()
386 if not laddr_infos:
387 raise OSError('getaddrinfo() returned empty list')
388
389 exceptions = []
390 for family, type, proto, cname, address in infos:
391 try:
392 sock = socket.socket(family=family, type=type, proto=proto)
393 sock.setblocking(False)
394 if f2 is not None:
395 for _, _, _, _, laddr in laddr_infos:
396 try:
397 sock.bind(laddr)
398 break
399 except OSError as exc:
400 exc = OSError(
401 exc.errno, 'error while '
402 'attempting to bind on address '
403 '{!r}: {}'.format(
404 laddr, exc.strerror.lower()))
405 exceptions.append(exc)
406 else:
407 sock.close()
408 sock = None
409 continue
410 yield from self.sock_connect(sock, address)
411 except OSError as exc:
412 if sock is not None:
413 sock.close()
414 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200415 except:
416 if sock is not None:
417 sock.close()
418 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 else:
420 break
421 else:
422 if len(exceptions) == 1:
423 raise exceptions[0]
424 else:
425 # If they all have the same str(), raise one.
426 model = str(exceptions[0])
427 if all(str(exc) == model for exc in exceptions):
428 raise exceptions[0]
429 # Raise a combined exception so the user can see all
430 # the various error messages.
431 raise OSError('Multiple exceptions: {}'.format(
432 ', '.join(str(exc) for exc in exceptions)))
433
434 elif sock is None:
435 raise ValueError(
436 'host and port was not specified and no sock specified')
437
438 sock.setblocking(False)
439
Yury Selivanovb057c522014-02-18 12:15:06 -0500440 transport, protocol = yield from self._create_connection_transport(
441 sock, protocol_factory, ssl, server_hostname)
442 return transport, protocol
443
444 @tasks.coroutine
445 def _create_connection_transport(self, sock, protocol_factory, ssl,
446 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447 protocol = protocol_factory()
448 waiter = futures.Future(loop=self)
449 if ssl:
450 sslcontext = None if isinstance(ssl, bool) else ssl
451 transport = self._make_ssl_transport(
452 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700453 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 else:
455 transport = self._make_socket_transport(sock, protocol, waiter)
456
457 yield from waiter
458 return transport, protocol
459
460 @tasks.coroutine
461 def create_datagram_endpoint(self, protocol_factory,
462 local_addr=None, remote_addr=None, *,
463 family=0, proto=0, flags=0):
464 """Create datagram connection."""
465 if not (local_addr or remote_addr):
466 if family == 0:
467 raise ValueError('unexpected address family')
468 addr_pairs_info = (((family, proto), (None, None)),)
469 else:
470 # join addresss by (family, protocol)
471 addr_infos = collections.OrderedDict()
472 for idx, addr in ((0, local_addr), (1, remote_addr)):
473 if addr is not None:
474 assert isinstance(addr, tuple) and len(addr) == 2, (
475 '2-tuple is expected')
476
477 infos = yield from self.getaddrinfo(
478 *addr, family=family, type=socket.SOCK_DGRAM,
479 proto=proto, flags=flags)
480 if not infos:
481 raise OSError('getaddrinfo() returned empty list')
482
483 for fam, _, pro, _, address in infos:
484 key = (fam, pro)
485 if key not in addr_infos:
486 addr_infos[key] = [None, None]
487 addr_infos[key][idx] = address
488
489 # each addr has to have info for each (family, proto) pair
490 addr_pairs_info = [
491 (key, addr_pair) for key, addr_pair in addr_infos.items()
492 if not ((local_addr and addr_pair[0] is None) or
493 (remote_addr and addr_pair[1] is None))]
494
495 if not addr_pairs_info:
496 raise ValueError('can not get address information')
497
498 exceptions = []
499
500 for ((family, proto),
501 (local_address, remote_address)) in addr_pairs_info:
502 sock = None
503 r_addr = None
504 try:
505 sock = socket.socket(
506 family=family, type=socket.SOCK_DGRAM, proto=proto)
507 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
508 sock.setblocking(False)
509
510 if local_addr:
511 sock.bind(local_address)
512 if remote_addr:
513 yield from self.sock_connect(sock, remote_address)
514 r_addr = remote_address
515 except OSError as exc:
516 if sock is not None:
517 sock.close()
518 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200519 except:
520 if sock is not None:
521 sock.close()
522 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 else:
524 break
525 else:
526 raise exceptions[0]
527
528 protocol = protocol_factory()
529 transport = self._make_datagram_transport(sock, protocol, r_addr)
530 return transport, protocol
531
532 @tasks.coroutine
533 def create_server(self, protocol_factory, host=None, port=None,
534 *,
535 family=socket.AF_UNSPEC,
536 flags=socket.AI_PASSIVE,
537 sock=None,
538 backlog=100,
539 ssl=None,
540 reuse_address=None):
541 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700542 if isinstance(ssl, bool):
543 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700544 if host is not None or port is not None:
545 if sock is not None:
546 raise ValueError(
547 'host/port and sock can not be specified at the same time')
548
549 AF_INET6 = getattr(socket, 'AF_INET6', 0)
550 if reuse_address is None:
551 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
552 sockets = []
553 if host == '':
554 host = None
555
556 infos = yield from self.getaddrinfo(
557 host, port, family=family,
558 type=socket.SOCK_STREAM, proto=0, flags=flags)
559 if not infos:
560 raise OSError('getaddrinfo() returned empty list')
561
562 completed = False
563 try:
564 for res in infos:
565 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700566 try:
567 sock = socket.socket(af, socktype, proto)
568 except socket.error:
569 # Assume it's a bad family/type/protocol combination.
570 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 sockets.append(sock)
572 if reuse_address:
573 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
574 True)
575 # Disable IPv4/IPv6 dual stack support (enabled by
576 # default on Linux) which makes a single socket
577 # listen on both address families.
578 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
579 sock.setsockopt(socket.IPPROTO_IPV6,
580 socket.IPV6_V6ONLY,
581 True)
582 try:
583 sock.bind(sa)
584 except OSError as err:
585 raise OSError(err.errno, 'error while attempting '
586 'to bind on address %r: %s'
587 % (sa, err.strerror.lower()))
588 completed = True
589 finally:
590 if not completed:
591 for sock in sockets:
592 sock.close()
593 else:
594 if sock is None:
595 raise ValueError(
596 'host and port was not specified and no sock specified')
597 sockets = [sock]
598
599 server = Server(self, sockets)
600 for sock in sockets:
601 sock.listen(backlog)
602 sock.setblocking(False)
603 self._start_serving(protocol_factory, sock, ssl, server)
604 return server
605
606 @tasks.coroutine
607 def connect_read_pipe(self, protocol_factory, pipe):
608 protocol = protocol_factory()
609 waiter = futures.Future(loop=self)
610 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
611 yield from waiter
612 return transport, protocol
613
614 @tasks.coroutine
615 def connect_write_pipe(self, protocol_factory, pipe):
616 protocol = protocol_factory()
617 waiter = futures.Future(loop=self)
618 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
619 yield from waiter
620 return transport, protocol
621
622 @tasks.coroutine
623 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
624 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
625 universal_newlines=False, shell=True, bufsize=0,
626 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100627 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800628 raise ValueError("cmd must be a string")
629 if universal_newlines:
630 raise ValueError("universal_newlines must be False")
631 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100632 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800633 if bufsize != 0:
634 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635 protocol = protocol_factory()
636 transport = yield from self._make_subprocess_transport(
637 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
638 return transport, protocol
639
640 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500641 def subprocess_exec(self, protocol_factory, program, *args,
642 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
643 stderr=subprocess.PIPE, universal_newlines=False,
644 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800645 if universal_newlines:
646 raise ValueError("universal_newlines must be False")
647 if shell:
648 raise ValueError("shell must be False")
649 if bufsize != 0:
650 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100651 popen_args = (program,) + args
652 for arg in popen_args:
653 if not isinstance(arg, (str, bytes)):
654 raise TypeError("program arguments must be "
655 "a bytes or text string, not %s"
656 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 protocol = protocol_factory()
658 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500659 protocol, popen_args, False, stdin, stdout, stderr,
660 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661 return transport, protocol
662
Yury Selivanov569efa22014-02-18 18:02:19 -0500663 def set_exception_handler(self, handler):
664 """Set handler as the new event loop exception handler.
665
666 If handler is None, the default exception handler will
667 be set.
668
669 If handler is a callable object, it should have a
670 matching signature to '(loop, context)', where 'loop'
671 will be a reference to the active event loop, 'context'
672 will be a dict object (see `call_exception_handler()`
673 documentation for details about context).
674 """
675 if handler is not None and not callable(handler):
676 raise TypeError('A callable object or None is expected, '
677 'got {!r}'.format(handler))
678 self._exception_handler = handler
679
680 def default_exception_handler(self, context):
681 """Default exception handler.
682
683 This is called when an exception occurs and no exception
684 handler is set, and can be called by a custom exception
685 handler that wants to defer to the default behavior.
686
687 context parameter has the same meaning as in
688 `call_exception_handler()`.
689 """
690 message = context.get('message')
691 if not message:
692 message = 'Unhandled exception in event loop'
693
694 exception = context.get('exception')
695 if exception is not None:
696 exc_info = (type(exception), exception, exception.__traceback__)
697 else:
698 exc_info = False
699
700 log_lines = [message]
701 for key in sorted(context):
702 if key in {'message', 'exception'}:
703 continue
704 log_lines.append('{}: {!r}'.format(key, context[key]))
705
706 logger.error('\n'.join(log_lines), exc_info=exc_info)
707
708 def call_exception_handler(self, context):
709 """Call the current event loop exception handler.
710
711 context is a dict object containing the following keys
712 (new keys maybe introduced later):
713 - 'message': Error message;
714 - 'exception' (optional): Exception object;
715 - 'future' (optional): Future instance;
716 - 'handle' (optional): Handle instance;
717 - 'protocol' (optional): Protocol instance;
718 - 'transport' (optional): Transport instance;
719 - 'socket' (optional): Socket instance.
720
721 Note: this method should not be overloaded in subclassed
722 event loops. For any custom exception handling, use
723 `set_exception_handler()` method.
724 """
725 if self._exception_handler is None:
726 try:
727 self.default_exception_handler(context)
728 except Exception:
729 # Second protection layer for unexpected errors
730 # in the default implementation, as well as for subclassed
731 # event loops with overloaded "default_exception_handler".
732 logger.error('Exception in default exception handler',
733 exc_info=True)
734 else:
735 try:
736 self._exception_handler(self, context)
737 except Exception as exc:
738 # Exception in the user set custom exception handler.
739 try:
740 # Let's try default handler.
741 self.default_exception_handler({
742 'message': 'Unhandled error in exception handler',
743 'exception': exc,
744 'context': context,
745 })
746 except Exception:
747 # Guard 'default_exception_handler' in case it's
748 # overloaded.
749 logger.error('Exception in default exception handler '
750 'while handling an unexpected error '
751 'in custom exception handler',
752 exc_info=True)
753
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700754 def _add_callback(self, handle):
755 """Add a Handle to ready or scheduled."""
756 assert isinstance(handle, events.Handle), 'A Handle is required here'
757 if handle._cancelled:
758 return
759 if isinstance(handle, events.TimerHandle):
760 heapq.heappush(self._scheduled, handle)
761 else:
762 self._ready.append(handle)
763
764 def _add_callback_signalsafe(self, handle):
765 """Like _add_callback() but called from a signal handler."""
766 self._add_callback(handle)
767 self._write_to_self()
768
769 def _run_once(self):
770 """Run one full iteration of the event loop.
771
772 This calls all currently ready callbacks, polls for I/O,
773 schedules the resulting callbacks, and finally schedules
774 'call_later' callbacks.
775 """
776 # Remove delayed calls that were cancelled from head of queue.
777 while self._scheduled and self._scheduled[0]._cancelled:
778 heapq.heappop(self._scheduled)
779
780 timeout = None
781 if self._ready:
782 timeout = 0
783 elif self._scheduled:
784 # Compute the desired timeout.
785 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700786 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787
788 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100789 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100790 t0 = self.time()
791 event_list = self._selector.select(timeout)
792 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100793 if t1-t0 >= 1:
794 level = logging.INFO
795 else:
796 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100797 if timeout is not None:
798 logger.log(level, 'poll %.3f took %.3f seconds',
799 timeout, t1-t0)
800 else:
801 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700802 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100803 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804 self._process_events(event_list)
805
806 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100807 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700808 while self._scheduled:
809 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100810 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700811 break
812 handle = heapq.heappop(self._scheduled)
813 self._ready.append(handle)
814
815 # This is the only place where callbacks are actually *called*.
816 # All other places just add them to ready.
817 # Note: We run all currently scheduled callbacks, but not any
818 # callbacks scheduled by callbacks run this time around --
819 # they will be run the next time (after another I/O poll).
820 # Use an idiom that is threadsafe without using locks.
821 ntodo = len(self._ready)
822 for i in range(ntodo):
823 handle = self._ready.popleft()
824 if not handle._cancelled:
825 handle._run()
826 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100827
828 def get_debug(self):
829 return self._debug
830
831 def set_debug(self, enabled):
832 self._debug = enabled