blob: 935098924dc6b947edf8e113cc652e8c03426ecd [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Names exported by ``from <this module> import *``.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation: run_in_executor()
# passes it to ThreadPoolExecutor when no executor has been supplied or set.
_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010048 if family == socket.AF_INET:
49 host, port = address
50 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010051 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010052 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010053 return
54
Victor Stinner1b0580b2014-02-13 09:24:37 +010055 type_mask = 0
56 if hasattr(socket, 'SOCK_NONBLOCK'):
57 type_mask |= socket.SOCK_NONBLOCK
58 if hasattr(socket, 'SOCK_CLOEXEC'):
59 type_mask |= socket.SOCK_CLOEXEC
60 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
61 # already resolved.
62 try:
63 socket.getaddrinfo(host, port,
64 family=family,
65 type=(sock.type & ~type_mask),
66 proto=sock.proto,
67 flags=socket.AI_NUMERICHOST)
68 except socket.gaierror as err:
69 raise ValueError("address must be resolved (IP address), got %r: %s"
70 % (address, err))
71
def _raise_stop_error(*args):
    """Raise _StopError, ignoring any arguments.

    Used as a callback: BaseEventLoop.stop() schedules it with
    call_soon(), and run_until_complete() attaches it to the future as a
    done callback (which is why arbitrary positional arguments are
    accepted).
    """
    raise _StopError
74
75
class Server(events.AbstractServer):
    """Server object returned by BaseEventLoop.create_server().

    Tracks the listening sockets, the number of attached transports
    (active_count) and the futures of callers blocked in wait_closed().
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        # Listening sockets; becomes None once close() has been called.
        self.sockets = sockets
        # Number of transports currently attached to this server.
        self.active_count = 0
        # Futures created by wait_closed(); becomes None after _wakeup().
        self.waiters = []

    def attach(self, transport):
        """Count one more attached transport (server must not be closed)."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one attached transport less; wake waiters if that was the
        last one and the server has already been closed."""
        assert self.active_count > 0
        self.active_count -= 1
        if self.sockets is None and self.active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on every socket.  Calling it again is a no-op."""
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self.loop._stop_serving(sock)
        # With no transports left there is nothing more to wait for.
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and drop the list."""
        pending, self.waiters = self.waiters, None
        for fut in pending:
            if not fut.done():
                fut.set_result(fut)

    @tasks.coroutine
    def wait_closed(self):
        """Coroutine: wait until _wakeup() has run.

        Returns immediately if the server is already closed (sockets is
        None) or the waiters have already been woken up.
        """
        if self.sockets is None or self.waiters is None:
            return
        fut = futures.Future(loop=self.loop)
        self.waiters.append(fut)
        yield from fut
117
118
119class BaseEventLoop(events.AbstractEventLoop):
120
121 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200122 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 self._ready = collections.deque()
124 self._scheduled = []
125 self._default_executor = None
126 self._internal_fds = 0
127 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100128 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500129 self._exception_handler = None
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100130 self._debug = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200132 def __repr__(self):
133 return ('<%s running=%s closed=%s debug=%s>'
134 % (self.__class__.__name__, self.is_running(),
135 self.is_closed(), self.get_debug()))
136
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137 def _make_socket_transport(self, sock, protocol, waiter=None, *,
138 extra=None, server=None):
139 """Create socket transport."""
140 raise NotImplementedError
141
142 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
143 server_side=False, server_hostname=None,
144 extra=None, server=None):
145 """Create SSL transport."""
146 raise NotImplementedError
147
148 def _make_datagram_transport(self, sock, protocol,
149 address=None, extra=None):
150 """Create datagram transport."""
151 raise NotImplementedError
152
153 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
154 extra=None):
155 """Create read pipe transport."""
156 raise NotImplementedError
157
158 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
159 extra=None):
160 """Create write pipe transport."""
161 raise NotImplementedError
162
163 @tasks.coroutine
164 def _make_subprocess_transport(self, protocol, args, shell,
165 stdin, stdout, stderr, bufsize,
166 extra=None, **kwargs):
167 """Create subprocess transport."""
168 raise NotImplementedError
169
170 def _read_from_self(self):
171 """XXX"""
172 raise NotImplementedError
173
174 def _write_to_self(self):
175 """XXX"""
176 raise NotImplementedError
177
178 def _process_events(self, event_list):
179 """Process selector events."""
180 raise NotImplementedError
181
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200182 def _check_closed(self):
183 if self._closed:
184 raise RuntimeError('Event loop is closed')
185
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186 def run_forever(self):
187 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200188 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 if self._running:
190 raise RuntimeError('Event loop is running.')
191 self._running = True
192 try:
193 while True:
194 try:
195 self._run_once()
196 except _StopError:
197 break
198 finally:
199 self._running = False
200
201 def run_until_complete(self, future):
202 """Run until the Future is done.
203
204 If the argument is a coroutine, it is wrapped in a Task.
205
206 XXX TBD: It would be disastrous to call run_until_complete()
207 with the same coroutine twice -- it would wrap it in two
208 different Tasks and that can't be good.
209
210 Return the Future's result, or raise its exception.
211 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200212 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 future = tasks.async(future, loop=self)
214 future.add_done_callback(_raise_stop_error)
215 self.run_forever()
216 future.remove_done_callback(_raise_stop_error)
217 if not future.done():
218 raise RuntimeError('Event loop stopped before Future completed.')
219
220 return future.result()
221
222 def stop(self):
223 """Stop running the event loop.
224
225 Every callback scheduled before stop() is called will run.
226 Callback scheduled after stop() is called won't. However,
227 those callbacks will run if run() is called again later.
228 """
229 self.call_soon(_raise_stop_error)
230
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200231 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700232 """Close the event loop.
233
234 This clears the queues and shuts down the executor,
235 but does not wait for the executor to finish.
236 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200237 if self._closed:
238 return
239 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200240 self._ready.clear()
241 self._scheduled.clear()
242 executor = self._default_executor
243 if executor is not None:
244 self._default_executor = None
245 executor.shutdown(wait=False)
246
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200247 def is_closed(self):
248 """Returns True if the event loop was closed."""
249 return self._closed
250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def is_running(self):
252 """Returns running status of event loop."""
253 return self._running
254
255 def time(self):
256 """Return the time according to the event loop's clock."""
257 return time.monotonic()
258
259 def call_later(self, delay, callback, *args):
260 """Arrange for a callback to be called at a given time.
261
262 Return a Handle: an opaque object with a cancel() method that
263 can be used to cancel the call.
264
265 The delay can be an int or float, expressed in seconds. It is
266 always a relative time.
267
268 Each callback will be called exactly once. If two callbacks
269 are scheduled for exactly the same time, it undefined which
270 will be called first.
271
272 Any positional arguments after the callback will be passed to
273 the callback when it is called.
274 """
275 return self.call_at(self.time() + delay, callback, *args)
276
277 def call_at(self, when, callback, *args):
278 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100279 if tasks.iscoroutinefunction(callback):
280 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100281 if self._debug:
282 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500283 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 heapq.heappush(self._scheduled, timer)
285 return timer
286
287 def call_soon(self, callback, *args):
288 """Arrange for a callback to be called as soon as possible.
289
290 This operates as a FIFO queue, callbacks are called in the
291 order in which they are registered. Each callback will be
292 called exactly once.
293
294 Any positional arguments after the callback will be passed to
295 the callback when it is called.
296 """
Victor Stinner93569c22014-03-21 10:00:52 +0100297 return self._call_soon(callback, args, check_loop=True)
298
299 def _call_soon(self, callback, args, check_loop):
Victor Stinner9af4a242014-02-11 11:34:30 +0100300 if tasks.iscoroutinefunction(callback):
301 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100302 if self._debug and check_loop:
303 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500304 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 self._ready.append(handle)
306 return handle
307
Victor Stinner93569c22014-03-21 10:00:52 +0100308 def _assert_is_current_event_loop(self):
309 """Asserts that this event loop is the current event loop.
310
311 Non-threadsafe methods of this class make this assumption and will
312 likely behave incorrectly when the assumption is violated.
313
314 Should only be called when (self._debug == True). The caller is
315 responsible for checking this condition for performance reasons.
316 """
317 if events.get_event_loop() is not self:
318 raise RuntimeError(
319 "non-threadsafe operation invoked on an event loop other "
320 "than the current one")
321
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200323 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100324 handle = self._call_soon(callback, args, check_loop=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 self._write_to_self()
326 return handle
327
328 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100329 if tasks.iscoroutinefunction(callback):
330 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 if isinstance(callback, events.Handle):
332 assert not args
333 assert not isinstance(callback, events.TimerHandle)
334 if callback._cancelled:
335 f = futures.Future(loop=self)
336 f.set_result(None)
337 return f
338 callback, args = callback._callback, callback._args
339 if executor is None:
340 executor = self._default_executor
341 if executor is None:
342 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
343 self._default_executor = executor
344 return futures.wrap_future(executor.submit(callback, *args), loop=self)
345
346 def set_default_executor(self, executor):
347 self._default_executor = executor
348
349 def getaddrinfo(self, host, port, *,
350 family=0, type=0, proto=0, flags=0):
351 return self.run_in_executor(None, socket.getaddrinfo,
352 host, port, family, type, proto, flags)
353
354 def getnameinfo(self, sockaddr, flags=0):
355 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
356
    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given Internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        Either host/port or an existing socket (sock) must be given, but
        not both.  family, proto and flags are passed to getaddrinfo().
        If local_addr is given it is resolved with getaddrinfo() too and
        each new socket is bound to it before connecting.  A true ssl
        value selects an SSL transport; server_hostname then defaults to
        host.

        This method is a coroutine which will try to establish the connection
        in the background. When successful, the coroutine returns a
        (transport, protocol) pair.

        Raises ValueError for inconsistent arguments and OSError when
        address resolution returns nothing or every connection attempt
        fails.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname. It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given. To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check. (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Start resolving the remote address and, if requested, the
            # local one; both lookups are awaited together below.
            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            # Try each resolved remote address in turn; the first one that
            # connects wins.  OSErrors are collected and reported only if
            # every attempt fails.
            exceptions = []
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False)
                    if f2 is not None:
                        # Bind to the first local address that works.
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(exc)
                        else:
                            # No local address could be bound: give up on
                            # this socket and try the next remote address.
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    # Any other exception: release the socket, then let it
                    # propagate unchanged.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    # Connected.
                    break
            else:
                # The loop ended without a successful connection.
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        # Applies to a caller-supplied socket as well as to one created
        # above.
        sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol
472
473 @tasks.coroutine
474 def _create_connection_transport(self, sock, protocol_factory, ssl,
475 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 protocol = protocol_factory()
477 waiter = futures.Future(loop=self)
478 if ssl:
479 sslcontext = None if isinstance(ssl, bool) else ssl
480 transport = self._make_ssl_transport(
481 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700482 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 else:
484 transport = self._make_socket_transport(sock, protocol, waiter)
485
486 yield from waiter
487 return transport, protocol
488
    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Create datagram connection.

        local_addr and remote_addr, when given, must be 2-tuples; they
        are resolved with getaddrinfo() using family, proto and flags.
        The socket is bound to the local address and/or connected to the
        remote address.  With neither address, family must be non-zero and
        an unbound, unconnected socket of that family is created.

        This method is a coroutine returning a (transport, protocol) pair.

        Raises ValueError when no usable address information is available
        and OSError when resolution returns nothing or every candidate
        socket fails (the first collected error is raised).
        """
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # join addresses by (family, protocol)
            # Maps (family, proto) -> [local address, remote address].
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # Try each candidate (family, proto) pair; the first socket that
        # can be set up is used.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Any other exception: release the socket, then let it
                # propagate unchanged.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed; report the first error.
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol
560
    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Create a TCP server bound to host and port.

        Either host/port or an existing socket (sock) must be given, but
        not both.  With host/port, one socket is created and bound for
        each address returned by getaddrinfo().  backlog is passed to
        listen().  ssl must be an SSLContext or None (a bool raises
        TypeError).  reuse_address, when None, defaults to true on POSIX
        platforms other than Cygwin.

        Return an AbstractServer object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # 0 when the platform has no AF_INET6 constant.
            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            # 'completed' stays False if anything below raises, in which
            # case the finally clause closes every socket created so far.
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        # Re-raise with the address included in the message.
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        # Start listening and serving on every socket.
        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server
639
640 @tasks.coroutine
641 def connect_read_pipe(self, protocol_factory, pipe):
642 protocol = protocol_factory()
643 waiter = futures.Future(loop=self)
644 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
645 yield from waiter
646 return transport, protocol
647
648 @tasks.coroutine
649 def connect_write_pipe(self, protocol_factory, pipe):
650 protocol = protocol_factory()
651 waiter = futures.Future(loop=self)
652 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
653 yield from waiter
654 return transport, protocol
655
656 @tasks.coroutine
657 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
658 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
659 universal_newlines=False, shell=True, bufsize=0,
660 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100661 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800662 raise ValueError("cmd must be a string")
663 if universal_newlines:
664 raise ValueError("universal_newlines must be False")
665 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100666 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800667 if bufsize != 0:
668 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700669 protocol = protocol_factory()
670 transport = yield from self._make_subprocess_transport(
671 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
672 return transport, protocol
673
674 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500675 def subprocess_exec(self, protocol_factory, program, *args,
676 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
677 stderr=subprocess.PIPE, universal_newlines=False,
678 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800679 if universal_newlines:
680 raise ValueError("universal_newlines must be False")
681 if shell:
682 raise ValueError("shell must be False")
683 if bufsize != 0:
684 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100685 popen_args = (program,) + args
686 for arg in popen_args:
687 if not isinstance(arg, (str, bytes)):
688 raise TypeError("program arguments must be "
689 "a bytes or text string, not %s"
690 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700691 protocol = protocol_factory()
692 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500693 protocol, popen_args, False, stdin, stdout, stderr,
694 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 return transport, protocol
696
Yury Selivanov569efa22014-02-18 18:02:19 -0500697 def set_exception_handler(self, handler):
698 """Set handler as the new event loop exception handler.
699
700 If handler is None, the default exception handler will
701 be set.
702
703 If handler is a callable object, it should have a
704 matching signature to '(loop, context)', where 'loop'
705 will be a reference to the active event loop, 'context'
706 will be a dict object (see `call_exception_handler()`
707 documentation for details about context).
708 """
709 if handler is not None and not callable(handler):
710 raise TypeError('A callable object or None is expected, '
711 'got {!r}'.format(handler))
712 self._exception_handler = handler
713
714 def default_exception_handler(self, context):
715 """Default exception handler.
716
717 This is called when an exception occurs and no exception
718 handler is set, and can be called by a custom exception
719 handler that wants to defer to the default behavior.
720
721 context parameter has the same meaning as in
722 `call_exception_handler()`.
723 """
724 message = context.get('message')
725 if not message:
726 message = 'Unhandled exception in event loop'
727
728 exception = context.get('exception')
729 if exception is not None:
730 exc_info = (type(exception), exception, exception.__traceback__)
731 else:
732 exc_info = False
733
734 log_lines = [message]
735 for key in sorted(context):
736 if key in {'message', 'exception'}:
737 continue
738 log_lines.append('{}: {!r}'.format(key, context[key]))
739
740 logger.error('\n'.join(log_lines), exc_info=exc_info)
741
742 def call_exception_handler(self, context):
743 """Call the current event loop exception handler.
744
745 context is a dict object containing the following keys
746 (new keys maybe introduced later):
747 - 'message': Error message;
748 - 'exception' (optional): Exception object;
749 - 'future' (optional): Future instance;
750 - 'handle' (optional): Handle instance;
751 - 'protocol' (optional): Protocol instance;
752 - 'transport' (optional): Transport instance;
753 - 'socket' (optional): Socket instance.
754
755 Note: this method should not be overloaded in subclassed
756 event loops. For any custom exception handling, use
757 `set_exception_handler()` method.
758 """
759 if self._exception_handler is None:
760 try:
761 self.default_exception_handler(context)
762 except Exception:
763 # Second protection layer for unexpected errors
764 # in the default implementation, as well as for subclassed
765 # event loops with overloaded "default_exception_handler".
766 logger.error('Exception in default exception handler',
767 exc_info=True)
768 else:
769 try:
770 self._exception_handler(self, context)
771 except Exception as exc:
772 # Exception in the user set custom exception handler.
773 try:
774 # Let's try default handler.
775 self.default_exception_handler({
776 'message': 'Unhandled error in exception handler',
777 'exception': exc,
778 'context': context,
779 })
780 except Exception:
781 # Guard 'default_exception_handler' in case it's
782 # overloaded.
783 logger.error('Exception in default exception handler '
784 'while handling an unexpected error '
785 'in custom exception handler',
786 exc_info=True)
787
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788 def _add_callback(self, handle):
789 """Add a Handle to ready or scheduled."""
790 assert isinstance(handle, events.Handle), 'A Handle is required here'
791 if handle._cancelled:
792 return
793 if isinstance(handle, events.TimerHandle):
794 heapq.heappush(self._scheduled, handle)
795 else:
796 self._ready.append(handle)
797
798 def _add_callback_signalsafe(self, handle):
799 """Like _add_callback() but called from a signal handler."""
800 self._add_callback(handle)
801 self._write_to_self()
802
803 def _run_once(self):
804 """Run one full iteration of the event loop.
805
806 This calls all currently ready callbacks, polls for I/O,
807 schedules the resulting callbacks, and finally schedules
808 'call_later' callbacks.
809 """
810 # Remove delayed calls that were cancelled from head of queue.
811 while self._scheduled and self._scheduled[0]._cancelled:
812 heapq.heappop(self._scheduled)
813
814 timeout = None
815 if self._ready:
816 timeout = 0
817 elif self._scheduled:
818 # Compute the desired timeout.
819 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700820 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700821
822 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100823 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100824 t0 = self.time()
825 event_list = self._selector.select(timeout)
826 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100827 if t1-t0 >= 1:
828 level = logging.INFO
829 else:
830 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100831 if timeout is not None:
832 logger.log(level, 'poll %.3f took %.3f seconds',
833 timeout, t1-t0)
834 else:
835 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700836 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100837 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700838 self._process_events(event_list)
839
840 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100841 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700842 while self._scheduled:
843 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100844 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700845 break
846 handle = heapq.heappop(self._scheduled)
847 self._ready.append(handle)
848
849 # This is the only place where callbacks are actually *called*.
850 # All other places just add them to ready.
851 # Note: We run all currently scheduled callbacks, but not any
852 # callbacks scheduled by callbacks run this time around --
853 # they will be run the next time (after another I/O poll).
854 # Use an idiom that is threadsafe without using locks.
855 ntodo = len(self._ready)
856 for i in range(ntodo):
857 handle = self._ready.popleft()
858 if not handle._cancelled:
859 handle._run()
860 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100861
862 def get_debug(self):
863 return self._debug
864
865 def set_debug(self, enabled):
866 self._debug = enabled