blob: 5ee21d1cd117484b4b1acd6ff322460ff308b541 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010048 if family == socket.AF_INET:
49 host, port = address
50 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010051 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010052 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010053 return
54
Victor Stinner1b0580b2014-02-13 09:24:37 +010055 type_mask = 0
56 if hasattr(socket, 'SOCK_NONBLOCK'):
57 type_mask |= socket.SOCK_NONBLOCK
58 if hasattr(socket, 'SOCK_CLOEXEC'):
59 type_mask |= socket.SOCK_CLOEXEC
60 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
61 # already resolved.
62 try:
63 socket.getaddrinfo(host, port,
64 family=family,
65 type=(sock.type & ~type_mask),
66 proto=sock.proto,
67 flags=socket.AI_NUMERICHOST)
68 except socket.gaierror as err:
69 raise ValueError("address must be resolved (IP address), got %r: %s"
70 % (address, err))
71
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072def _raise_stop_error(*args):
73 raise _StopError
74
75
76class Server(events.AbstractServer):
77
78 def __init__(self, loop, sockets):
79 self.loop = loop
80 self.sockets = sockets
81 self.active_count = 0
82 self.waiters = []
83
84 def attach(self, transport):
85 assert self.sockets is not None
86 self.active_count += 1
87
88 def detach(self, transport):
89 assert self.active_count > 0
90 self.active_count -= 1
91 if self.active_count == 0 and self.sockets is None:
92 self._wakeup()
93
94 def close(self):
95 sockets = self.sockets
96 if sockets is not None:
97 self.sockets = None
98 for sock in sockets:
99 self.loop._stop_serving(sock)
100 if self.active_count == 0:
101 self._wakeup()
102
103 def _wakeup(self):
104 waiters = self.waiters
105 self.waiters = None
106 for waiter in waiters:
107 if not waiter.done():
108 waiter.set_result(waiter)
109
110 @tasks.coroutine
111 def wait_closed(self):
112 if self.sockets is None or self.waiters is None:
113 return
114 waiter = futures.Future(loop=self.loop)
115 self.waiters.append(waiter)
116 yield from waiter
117
118
119class BaseEventLoop(events.AbstractEventLoop):
120
121 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200122 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 self._ready = collections.deque()
124 self._scheduled = []
125 self._default_executor = None
126 self._internal_fds = 0
127 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100128 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500129 self._exception_handler = None
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100130 self._debug = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200132 def __repr__(self):
133 return ('<%s running=%s closed=%s debug=%s>'
134 % (self.__class__.__name__, self.is_running(),
135 self.is_closed(), self.get_debug()))
136
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137 def _make_socket_transport(self, sock, protocol, waiter=None, *,
138 extra=None, server=None):
139 """Create socket transport."""
140 raise NotImplementedError
141
142 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
143 server_side=False, server_hostname=None,
144 extra=None, server=None):
145 """Create SSL transport."""
146 raise NotImplementedError
147
148 def _make_datagram_transport(self, sock, protocol,
149 address=None, extra=None):
150 """Create datagram transport."""
151 raise NotImplementedError
152
153 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
154 extra=None):
155 """Create read pipe transport."""
156 raise NotImplementedError
157
158 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
159 extra=None):
160 """Create write pipe transport."""
161 raise NotImplementedError
162
163 @tasks.coroutine
164 def _make_subprocess_transport(self, protocol, args, shell,
165 stdin, stdout, stderr, bufsize,
166 extra=None, **kwargs):
167 """Create subprocess transport."""
168 raise NotImplementedError
169
170 def _read_from_self(self):
171 """XXX"""
172 raise NotImplementedError
173
174 def _write_to_self(self):
175 """XXX"""
176 raise NotImplementedError
177
178 def _process_events(self, event_list):
179 """Process selector events."""
180 raise NotImplementedError
181
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200182 def _check_closed(self):
183 if self._closed:
184 raise RuntimeError('Event loop is closed')
185
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186 def run_forever(self):
187 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200188 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 if self._running:
190 raise RuntimeError('Event loop is running.')
191 self._running = True
192 try:
193 while True:
194 try:
195 self._run_once()
196 except _StopError:
197 break
198 finally:
199 self._running = False
200
201 def run_until_complete(self, future):
202 """Run until the Future is done.
203
204 If the argument is a coroutine, it is wrapped in a Task.
205
206 XXX TBD: It would be disastrous to call run_until_complete()
207 with the same coroutine twice -- it would wrap it in two
208 different Tasks and that can't be good.
209
210 Return the Future's result, or raise its exception.
211 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200212 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 future = tasks.async(future, loop=self)
214 future.add_done_callback(_raise_stop_error)
215 self.run_forever()
216 future.remove_done_callback(_raise_stop_error)
217 if not future.done():
218 raise RuntimeError('Event loop stopped before Future completed.')
219
220 return future.result()
221
222 def stop(self):
223 """Stop running the event loop.
224
225 Every callback scheduled before stop() is called will run.
226 Callback scheduled after stop() is called won't. However,
227 those callbacks will run if run() is called again later.
228 """
229 self.call_soon(_raise_stop_error)
230
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200231 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700232 """Close the event loop.
233
234 This clears the queues and shuts down the executor,
235 but does not wait for the executor to finish.
236 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200237 if self._closed:
238 return
239 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200240 self._ready.clear()
241 self._scheduled.clear()
242 executor = self._default_executor
243 if executor is not None:
244 self._default_executor = None
245 executor.shutdown(wait=False)
246
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200247 def is_closed(self):
248 """Returns True if the event loop was closed."""
249 return self._closed
250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def is_running(self):
252 """Returns running status of event loop."""
253 return self._running
254
255 def time(self):
256 """Return the time according to the event loop's clock."""
257 return time.monotonic()
258
259 def call_later(self, delay, callback, *args):
260 """Arrange for a callback to be called at a given time.
261
262 Return a Handle: an opaque object with a cancel() method that
263 can be used to cancel the call.
264
265 The delay can be an int or float, expressed in seconds. It is
266 always a relative time.
267
268 Each callback will be called exactly once. If two callbacks
269 are scheduled for exactly the same time, it undefined which
270 will be called first.
271
272 Any positional arguments after the callback will be passed to
273 the callback when it is called.
274 """
275 return self.call_at(self.time() + delay, callback, *args)
276
277 def call_at(self, when, callback, *args):
278 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100279 if tasks.iscoroutinefunction(callback):
280 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100281 if self._debug:
282 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500283 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 heapq.heappush(self._scheduled, timer)
285 return timer
286
287 def call_soon(self, callback, *args):
288 """Arrange for a callback to be called as soon as possible.
289
290 This operates as a FIFO queue, callbacks are called in the
291 order in which they are registered. Each callback will be
292 called exactly once.
293
294 Any positional arguments after the callback will be passed to
295 the callback when it is called.
296 """
Victor Stinner93569c22014-03-21 10:00:52 +0100297 return self._call_soon(callback, args, check_loop=True)
298
299 def _call_soon(self, callback, args, check_loop):
Victor Stinner9af4a242014-02-11 11:34:30 +0100300 if tasks.iscoroutinefunction(callback):
301 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100302 if self._debug and check_loop:
303 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500304 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 self._ready.append(handle)
306 return handle
307
Victor Stinner93569c22014-03-21 10:00:52 +0100308 def _assert_is_current_event_loop(self):
309 """Asserts that this event loop is the current event loop.
310
311 Non-threadsafe methods of this class make this assumption and will
312 likely behave incorrectly when the assumption is violated.
313
314 Should only be called when (self._debug == True). The caller is
315 responsible for checking this condition for performance reasons.
316 """
317 if events.get_event_loop() is not self:
318 raise RuntimeError(
319 "non-threadsafe operation invoked on an event loop other "
320 "than the current one")
321
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 def call_soon_threadsafe(self, callback, *args):
323 """XXX"""
Victor Stinner93569c22014-03-21 10:00:52 +0100324 handle = self._call_soon(callback, args, check_loop=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 self._write_to_self()
326 return handle
327
328 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100329 if tasks.iscoroutinefunction(callback):
330 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 if isinstance(callback, events.Handle):
332 assert not args
333 assert not isinstance(callback, events.TimerHandle)
334 if callback._cancelled:
335 f = futures.Future(loop=self)
336 f.set_result(None)
337 return f
338 callback, args = callback._callback, callback._args
339 if executor is None:
340 executor = self._default_executor
341 if executor is None:
342 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
343 self._default_executor = executor
344 return futures.wrap_future(executor.submit(callback, *args), loop=self)
345
346 def set_default_executor(self, executor):
347 self._default_executor = executor
348
349 def getaddrinfo(self, host, port, *,
350 family=0, type=0, proto=0, flags=0):
351 return self.run_in_executor(None, socket.getaddrinfo,
352 host, port, family, type, proto, flags)
353
354 def getnameinfo(self, sockaddr, flags=0):
355 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
356
357 @tasks.coroutine
358 def create_connection(self, protocol_factory, host=None, port=None, *,
359 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700360 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700362 if server_hostname is not None and not ssl:
363 raise ValueError('server_hostname is only meaningful with ssl')
364
365 if server_hostname is None and ssl:
366 # Use host as default for server_hostname. It is an error
367 # if host is empty or not set, e.g. when an
368 # already-connected socket was passed or when only a port
369 # is given. To avoid this error, you can pass
370 # server_hostname='' -- this will bypass the hostname
371 # check. (This also means that if host is a numeric
372 # IP/IPv6 address, we will attempt to verify that exact
373 # address; this will probably fail, but it is possible to
374 # create a certificate for a specific IP address, so we
375 # don't judge it here.)
376 if not host:
377 raise ValueError('You must set server_hostname '
378 'when using ssl without a host')
379 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700380
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 if host is not None or port is not None:
382 if sock is not None:
383 raise ValueError(
384 'host/port and sock can not be specified at the same time')
385
386 f1 = self.getaddrinfo(
387 host, port, family=family,
388 type=socket.SOCK_STREAM, proto=proto, flags=flags)
389 fs = [f1]
390 if local_addr is not None:
391 f2 = self.getaddrinfo(
392 *local_addr, family=family,
393 type=socket.SOCK_STREAM, proto=proto, flags=flags)
394 fs.append(f2)
395 else:
396 f2 = None
397
398 yield from tasks.wait(fs, loop=self)
399
400 infos = f1.result()
401 if not infos:
402 raise OSError('getaddrinfo() returned empty list')
403 if f2 is not None:
404 laddr_infos = f2.result()
405 if not laddr_infos:
406 raise OSError('getaddrinfo() returned empty list')
407
408 exceptions = []
409 for family, type, proto, cname, address in infos:
410 try:
411 sock = socket.socket(family=family, type=type, proto=proto)
412 sock.setblocking(False)
413 if f2 is not None:
414 for _, _, _, _, laddr in laddr_infos:
415 try:
416 sock.bind(laddr)
417 break
418 except OSError as exc:
419 exc = OSError(
420 exc.errno, 'error while '
421 'attempting to bind on address '
422 '{!r}: {}'.format(
423 laddr, exc.strerror.lower()))
424 exceptions.append(exc)
425 else:
426 sock.close()
427 sock = None
428 continue
429 yield from self.sock_connect(sock, address)
430 except OSError as exc:
431 if sock is not None:
432 sock.close()
433 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200434 except:
435 if sock is not None:
436 sock.close()
437 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 else:
439 break
440 else:
441 if len(exceptions) == 1:
442 raise exceptions[0]
443 else:
444 # If they all have the same str(), raise one.
445 model = str(exceptions[0])
446 if all(str(exc) == model for exc in exceptions):
447 raise exceptions[0]
448 # Raise a combined exception so the user can see all
449 # the various error messages.
450 raise OSError('Multiple exceptions: {}'.format(
451 ', '.join(str(exc) for exc in exceptions)))
452
453 elif sock is None:
454 raise ValueError(
455 'host and port was not specified and no sock specified')
456
457 sock.setblocking(False)
458
Yury Selivanovb057c522014-02-18 12:15:06 -0500459 transport, protocol = yield from self._create_connection_transport(
460 sock, protocol_factory, ssl, server_hostname)
461 return transport, protocol
462
463 @tasks.coroutine
464 def _create_connection_transport(self, sock, protocol_factory, ssl,
465 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 protocol = protocol_factory()
467 waiter = futures.Future(loop=self)
468 if ssl:
469 sslcontext = None if isinstance(ssl, bool) else ssl
470 transport = self._make_ssl_transport(
471 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700472 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 else:
474 transport = self._make_socket_transport(sock, protocol, waiter)
475
476 yield from waiter
477 return transport, protocol
478
479 @tasks.coroutine
480 def create_datagram_endpoint(self, protocol_factory,
481 local_addr=None, remote_addr=None, *,
482 family=0, proto=0, flags=0):
483 """Create datagram connection."""
484 if not (local_addr or remote_addr):
485 if family == 0:
486 raise ValueError('unexpected address family')
487 addr_pairs_info = (((family, proto), (None, None)),)
488 else:
489 # join addresss by (family, protocol)
490 addr_infos = collections.OrderedDict()
491 for idx, addr in ((0, local_addr), (1, remote_addr)):
492 if addr is not None:
493 assert isinstance(addr, tuple) and len(addr) == 2, (
494 '2-tuple is expected')
495
496 infos = yield from self.getaddrinfo(
497 *addr, family=family, type=socket.SOCK_DGRAM,
498 proto=proto, flags=flags)
499 if not infos:
500 raise OSError('getaddrinfo() returned empty list')
501
502 for fam, _, pro, _, address in infos:
503 key = (fam, pro)
504 if key not in addr_infos:
505 addr_infos[key] = [None, None]
506 addr_infos[key][idx] = address
507
508 # each addr has to have info for each (family, proto) pair
509 addr_pairs_info = [
510 (key, addr_pair) for key, addr_pair in addr_infos.items()
511 if not ((local_addr and addr_pair[0] is None) or
512 (remote_addr and addr_pair[1] is None))]
513
514 if not addr_pairs_info:
515 raise ValueError('can not get address information')
516
517 exceptions = []
518
519 for ((family, proto),
520 (local_address, remote_address)) in addr_pairs_info:
521 sock = None
522 r_addr = None
523 try:
524 sock = socket.socket(
525 family=family, type=socket.SOCK_DGRAM, proto=proto)
526 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
527 sock.setblocking(False)
528
529 if local_addr:
530 sock.bind(local_address)
531 if remote_addr:
532 yield from self.sock_connect(sock, remote_address)
533 r_addr = remote_address
534 except OSError as exc:
535 if sock is not None:
536 sock.close()
537 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200538 except:
539 if sock is not None:
540 sock.close()
541 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 else:
543 break
544 else:
545 raise exceptions[0]
546
547 protocol = protocol_factory()
548 transport = self._make_datagram_transport(sock, protocol, r_addr)
549 return transport, protocol
550
551 @tasks.coroutine
552 def create_server(self, protocol_factory, host=None, port=None,
553 *,
554 family=socket.AF_UNSPEC,
555 flags=socket.AI_PASSIVE,
556 sock=None,
557 backlog=100,
558 ssl=None,
559 reuse_address=None):
560 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700561 if isinstance(ssl, bool):
562 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 if host is not None or port is not None:
564 if sock is not None:
565 raise ValueError(
566 'host/port and sock can not be specified at the same time')
567
568 AF_INET6 = getattr(socket, 'AF_INET6', 0)
569 if reuse_address is None:
570 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
571 sockets = []
572 if host == '':
573 host = None
574
575 infos = yield from self.getaddrinfo(
576 host, port, family=family,
577 type=socket.SOCK_STREAM, proto=0, flags=flags)
578 if not infos:
579 raise OSError('getaddrinfo() returned empty list')
580
581 completed = False
582 try:
583 for res in infos:
584 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700585 try:
586 sock = socket.socket(af, socktype, proto)
587 except socket.error:
588 # Assume it's a bad family/type/protocol combination.
589 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590 sockets.append(sock)
591 if reuse_address:
592 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
593 True)
594 # Disable IPv4/IPv6 dual stack support (enabled by
595 # default on Linux) which makes a single socket
596 # listen on both address families.
597 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
598 sock.setsockopt(socket.IPPROTO_IPV6,
599 socket.IPV6_V6ONLY,
600 True)
601 try:
602 sock.bind(sa)
603 except OSError as err:
604 raise OSError(err.errno, 'error while attempting '
605 'to bind on address %r: %s'
606 % (sa, err.strerror.lower()))
607 completed = True
608 finally:
609 if not completed:
610 for sock in sockets:
611 sock.close()
612 else:
613 if sock is None:
614 raise ValueError(
615 'host and port was not specified and no sock specified')
616 sockets = [sock]
617
618 server = Server(self, sockets)
619 for sock in sockets:
620 sock.listen(backlog)
621 sock.setblocking(False)
622 self._start_serving(protocol_factory, sock, ssl, server)
623 return server
624
625 @tasks.coroutine
626 def connect_read_pipe(self, protocol_factory, pipe):
627 protocol = protocol_factory()
628 waiter = futures.Future(loop=self)
629 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
630 yield from waiter
631 return transport, protocol
632
633 @tasks.coroutine
634 def connect_write_pipe(self, protocol_factory, pipe):
635 protocol = protocol_factory()
636 waiter = futures.Future(loop=self)
637 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
638 yield from waiter
639 return transport, protocol
640
641 @tasks.coroutine
642 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
643 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
644 universal_newlines=False, shell=True, bufsize=0,
645 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100646 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800647 raise ValueError("cmd must be a string")
648 if universal_newlines:
649 raise ValueError("universal_newlines must be False")
650 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100651 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800652 if bufsize != 0:
653 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700654 protocol = protocol_factory()
655 transport = yield from self._make_subprocess_transport(
656 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
657 return transport, protocol
658
659 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500660 def subprocess_exec(self, protocol_factory, program, *args,
661 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
662 stderr=subprocess.PIPE, universal_newlines=False,
663 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800664 if universal_newlines:
665 raise ValueError("universal_newlines must be False")
666 if shell:
667 raise ValueError("shell must be False")
668 if bufsize != 0:
669 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100670 popen_args = (program,) + args
671 for arg in popen_args:
672 if not isinstance(arg, (str, bytes)):
673 raise TypeError("program arguments must be "
674 "a bytes or text string, not %s"
675 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676 protocol = protocol_factory()
677 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500678 protocol, popen_args, False, stdin, stdout, stderr,
679 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680 return transport, protocol
681
Yury Selivanov569efa22014-02-18 18:02:19 -0500682 def set_exception_handler(self, handler):
683 """Set handler as the new event loop exception handler.
684
685 If handler is None, the default exception handler will
686 be set.
687
688 If handler is a callable object, it should have a
689 matching signature to '(loop, context)', where 'loop'
690 will be a reference to the active event loop, 'context'
691 will be a dict object (see `call_exception_handler()`
692 documentation for details about context).
693 """
694 if handler is not None and not callable(handler):
695 raise TypeError('A callable object or None is expected, '
696 'got {!r}'.format(handler))
697 self._exception_handler = handler
698
699 def default_exception_handler(self, context):
700 """Default exception handler.
701
702 This is called when an exception occurs and no exception
703 handler is set, and can be called by a custom exception
704 handler that wants to defer to the default behavior.
705
706 context parameter has the same meaning as in
707 `call_exception_handler()`.
708 """
709 message = context.get('message')
710 if not message:
711 message = 'Unhandled exception in event loop'
712
713 exception = context.get('exception')
714 if exception is not None:
715 exc_info = (type(exception), exception, exception.__traceback__)
716 else:
717 exc_info = False
718
719 log_lines = [message]
720 for key in sorted(context):
721 if key in {'message', 'exception'}:
722 continue
723 log_lines.append('{}: {!r}'.format(key, context[key]))
724
725 logger.error('\n'.join(log_lines), exc_info=exc_info)
726
727 def call_exception_handler(self, context):
728 """Call the current event loop exception handler.
729
730 context is a dict object containing the following keys
731 (new keys maybe introduced later):
732 - 'message': Error message;
733 - 'exception' (optional): Exception object;
734 - 'future' (optional): Future instance;
735 - 'handle' (optional): Handle instance;
736 - 'protocol' (optional): Protocol instance;
737 - 'transport' (optional): Transport instance;
738 - 'socket' (optional): Socket instance.
739
740 Note: this method should not be overloaded in subclassed
741 event loops. For any custom exception handling, use
742 `set_exception_handler()` method.
743 """
744 if self._exception_handler is None:
745 try:
746 self.default_exception_handler(context)
747 except Exception:
748 # Second protection layer for unexpected errors
749 # in the default implementation, as well as for subclassed
750 # event loops with overloaded "default_exception_handler".
751 logger.error('Exception in default exception handler',
752 exc_info=True)
753 else:
754 try:
755 self._exception_handler(self, context)
756 except Exception as exc:
757 # Exception in the user set custom exception handler.
758 try:
759 # Let's try default handler.
760 self.default_exception_handler({
761 'message': 'Unhandled error in exception handler',
762 'exception': exc,
763 'context': context,
764 })
765 except Exception:
766 # Guard 'default_exception_handler' in case it's
767 # overloaded.
768 logger.error('Exception in default exception handler '
769 'while handling an unexpected error '
770 'in custom exception handler',
771 exc_info=True)
772
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700773 def _add_callback(self, handle):
774 """Add a Handle to ready or scheduled."""
775 assert isinstance(handle, events.Handle), 'A Handle is required here'
776 if handle._cancelled:
777 return
778 if isinstance(handle, events.TimerHandle):
779 heapq.heappush(self._scheduled, handle)
780 else:
781 self._ready.append(handle)
782
783 def _add_callback_signalsafe(self, handle):
784 """Like _add_callback() but called from a signal handler."""
785 self._add_callback(handle)
786 self._write_to_self()
787
788 def _run_once(self):
789 """Run one full iteration of the event loop.
790
791 This calls all currently ready callbacks, polls for I/O,
792 schedules the resulting callbacks, and finally schedules
793 'call_later' callbacks.
794 """
795 # Remove delayed calls that were cancelled from head of queue.
796 while self._scheduled and self._scheduled[0]._cancelled:
797 heapq.heappop(self._scheduled)
798
799 timeout = None
800 if self._ready:
801 timeout = 0
802 elif self._scheduled:
803 # Compute the desired timeout.
804 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700805 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806
807 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100808 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100809 t0 = self.time()
810 event_list = self._selector.select(timeout)
811 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100812 if t1-t0 >= 1:
813 level = logging.INFO
814 else:
815 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100816 if timeout is not None:
817 logger.log(level, 'poll %.3f took %.3f seconds',
818 timeout, t1-t0)
819 else:
820 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700821 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100822 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700823 self._process_events(event_list)
824
825 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100826 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827 while self._scheduled:
828 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100829 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700830 break
831 handle = heapq.heappop(self._scheduled)
832 self._ready.append(handle)
833
834 # This is the only place where callbacks are actually *called*.
835 # All other places just add them to ready.
836 # Note: We run all currently scheduled callbacks, but not any
837 # callbacks scheduled by callbacks run this time around --
838 # they will be run the next time (after another I/O poll).
839 # Use an idiom that is threadsafe without using locks.
840 ntodo = len(self._ready)
841 for i in range(ntodo):
842 handle = self._ready.popleft()
843 if not handle._cancelled:
844 handle._run()
845 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100846
847 def get_debug(self):
848 return self._debug
849
850 def set_debug(self, enabled):
851 self._debug = enabled