blob: 0975bcb6fc3d2caf068eddd85b6381205289e677 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
22import socket
23import subprocess
24import time
25import os
26import sys
27
28from . import events
29from . import futures
30from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070031from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
33
# Names exported by a star-import of this module.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation; passed as the
# worker count when run_in_executor() has to build a ThreadPoolExecutor.
_MAX_WORKERS = 5
39
40
Victor Stinner0e6f52a2014-06-20 17:34:15 +020041def _format_handle(handle):
42 cb = handle._callback
43 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
44 # format the task
45 return repr(cb.__self__)
46 else:
47 return str(handle)
48
49
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050class _StopError(BaseException):
51 """Raised to stop the event loop."""
52
53
Victor Stinner1b0580b2014-02-13 09:24:37 +010054def _check_resolved_address(sock, address):
55 # Ensure that the address is already resolved to avoid the trap of hanging
56 # the entire event loop when the address requires doing a DNS lookup.
57 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010058 if family == socket.AF_INET:
59 host, port = address
60 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010061 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010062 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010063 return
64
Victor Stinner1b0580b2014-02-13 09:24:37 +010065 type_mask = 0
66 if hasattr(socket, 'SOCK_NONBLOCK'):
67 type_mask |= socket.SOCK_NONBLOCK
68 if hasattr(socket, 'SOCK_CLOEXEC'):
69 type_mask |= socket.SOCK_CLOEXEC
70 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
71 # already resolved.
72 try:
73 socket.getaddrinfo(host, port,
74 family=family,
75 type=(sock.type & ~type_mask),
76 proto=sock.proto,
77 flags=socket.AI_NUMERICHOST)
78 except socket.gaierror as err:
79 raise ValueError("address must be resolved (IP address), got %r: %s"
80 % (address, err))
81
def _raise_stop_error(*args):
    """Raise _StopError unconditionally; any arguments are ignored."""
    raise _StopError()
84
85
class Server(events.AbstractServer):
    """Bookkeeping object for a set of listening sockets.

    It counts the transports attached to it and wakes the futures
    created by wait_closed() once the sockets have been released and no
    transport is attached any more.
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        # Listening sockets; replaced by None when close() is called.
        self.sockets = sockets
        # Number of transports currently attached.
        self.active_count = 0
        # Futures handed out by wait_closed(); None after _wakeup().
        self.waiters = []

    def attach(self, transport):
        """Count one more attached transport (server must not be closed)."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one attached transport less, waking waiters if done."""
        assert self.active_count > 0
        self.active_count -= 1
        if self.sockets is None and self.active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; a second call does nothing."""
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self.loop._stop_serving(sock)
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending waiter and drop the waiter list."""
        pending, self.waiters = self.waiters, None
        for fut in pending:
            if fut.done():
                continue
            fut.set_result(fut)

    @tasks.coroutine
    def wait_closed(self):
        """Coroutine that waits until _wakeup() completes its future.

        Returns immediately when the sockets were already released or
        the waiters were already woken up.
        """
        if self.sockets is None or self.waiters is None:
            return
        fut = futures.Future(loop=self.loop)
        self.waiters.append(fut)
        yield from fut
127
128
129class BaseEventLoop(events.AbstractEventLoop):
130
131 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200132 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 self._ready = collections.deque()
134 self._scheduled = []
135 self._default_executor = None
136 self._internal_fds = 0
137 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100138 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500139 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200140 self._debug = (not sys.flags.ignore_environment
141 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200142 # In debug mode, if the execution of a callback or a step of a task
143 # exceed this duration in seconds, the slow callback/task is logged.
144 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200146 def __repr__(self):
147 return ('<%s running=%s closed=%s debug=%s>'
148 % (self.__class__.__name__, self.is_running(),
149 self.is_closed(), self.get_debug()))
150
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 def _make_socket_transport(self, sock, protocol, waiter=None, *,
152 extra=None, server=None):
153 """Create socket transport."""
154 raise NotImplementedError
155
156 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
157 server_side=False, server_hostname=None,
158 extra=None, server=None):
159 """Create SSL transport."""
160 raise NotImplementedError
161
162 def _make_datagram_transport(self, sock, protocol,
163 address=None, extra=None):
164 """Create datagram transport."""
165 raise NotImplementedError
166
167 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 """Create read pipe transport."""
170 raise NotImplementedError
171
172 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
173 extra=None):
174 """Create write pipe transport."""
175 raise NotImplementedError
176
177 @tasks.coroutine
178 def _make_subprocess_transport(self, protocol, args, shell,
179 stdin, stdout, stderr, bufsize,
180 extra=None, **kwargs):
181 """Create subprocess transport."""
182 raise NotImplementedError
183
184 def _read_from_self(self):
185 """XXX"""
186 raise NotImplementedError
187
188 def _write_to_self(self):
189 """XXX"""
190 raise NotImplementedError
191
192 def _process_events(self, event_list):
193 """Process selector events."""
194 raise NotImplementedError
195
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200196 def _check_closed(self):
197 if self._closed:
198 raise RuntimeError('Event loop is closed')
199
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 def run_forever(self):
201 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200202 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203 if self._running:
204 raise RuntimeError('Event loop is running.')
205 self._running = True
206 try:
207 while True:
208 try:
209 self._run_once()
210 except _StopError:
211 break
212 finally:
213 self._running = False
214
215 def run_until_complete(self, future):
216 """Run until the Future is done.
217
218 If the argument is a coroutine, it is wrapped in a Task.
219
220 XXX TBD: It would be disastrous to call run_until_complete()
221 with the same coroutine twice -- it would wrap it in two
222 different Tasks and that can't be good.
223
224 Return the Future's result, or raise its exception.
225 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200226 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 future = tasks.async(future, loop=self)
228 future.add_done_callback(_raise_stop_error)
229 self.run_forever()
230 future.remove_done_callback(_raise_stop_error)
231 if not future.done():
232 raise RuntimeError('Event loop stopped before Future completed.')
233
234 return future.result()
235
236 def stop(self):
237 """Stop running the event loop.
238
239 Every callback scheduled before stop() is called will run.
240 Callback scheduled after stop() is called won't. However,
241 those callbacks will run if run() is called again later.
242 """
243 self.call_soon(_raise_stop_error)
244
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200245 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700246 """Close the event loop.
247
248 This clears the queues and shuts down the executor,
249 but does not wait for the executor to finish.
250 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200251 if self._closed:
252 return
253 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200254 self._ready.clear()
255 self._scheduled.clear()
256 executor = self._default_executor
257 if executor is not None:
258 self._default_executor = None
259 executor.shutdown(wait=False)
260
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200261 def is_closed(self):
262 """Returns True if the event loop was closed."""
263 return self._closed
264
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 def is_running(self):
266 """Returns running status of event loop."""
267 return self._running
268
269 def time(self):
270 """Return the time according to the event loop's clock."""
271 return time.monotonic()
272
273 def call_later(self, delay, callback, *args):
274 """Arrange for a callback to be called at a given time.
275
276 Return a Handle: an opaque object with a cancel() method that
277 can be used to cancel the call.
278
279 The delay can be an int or float, expressed in seconds. It is
280 always a relative time.
281
282 Each callback will be called exactly once. If two callbacks
283 are scheduled for exactly the same time, it undefined which
284 will be called first.
285
286 Any positional arguments after the callback will be passed to
287 the callback when it is called.
288 """
289 return self.call_at(self.time() + delay, callback, *args)
290
291 def call_at(self, when, callback, *args):
292 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100293 if tasks.iscoroutinefunction(callback):
294 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100295 if self._debug:
296 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500297 timer = events.TimerHandle(when, callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 heapq.heappush(self._scheduled, timer)
299 return timer
300
301 def call_soon(self, callback, *args):
302 """Arrange for a callback to be called as soon as possible.
303
304 This operates as a FIFO queue, callbacks are called in the
305 order in which they are registered. Each callback will be
306 called exactly once.
307
308 Any positional arguments after the callback will be passed to
309 the callback when it is called.
310 """
Victor Stinner93569c22014-03-21 10:00:52 +0100311 return self._call_soon(callback, args, check_loop=True)
312
313 def _call_soon(self, callback, args, check_loop):
Victor Stinner9af4a242014-02-11 11:34:30 +0100314 if tasks.iscoroutinefunction(callback):
315 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100316 if self._debug and check_loop:
317 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500318 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 self._ready.append(handle)
320 return handle
321
Victor Stinner93569c22014-03-21 10:00:52 +0100322 def _assert_is_current_event_loop(self):
323 """Asserts that this event loop is the current event loop.
324
325 Non-threadsafe methods of this class make this assumption and will
326 likely behave incorrectly when the assumption is violated.
327
328 Should only be called when (self._debug == True). The caller is
329 responsible for checking this condition for performance reasons.
330 """
Victor Stinnerd6de5d82014-06-23 00:03:43 +0200331 current = events.get_event_loop()
332 if current is not None and current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100333 raise RuntimeError(
334 "non-threadsafe operation invoked on an event loop other "
335 "than the current one")
336
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200338 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100339 handle = self._call_soon(callback, args, check_loop=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 self._write_to_self()
341 return handle
342
343 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100344 if tasks.iscoroutinefunction(callback):
345 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346 if isinstance(callback, events.Handle):
347 assert not args
348 assert not isinstance(callback, events.TimerHandle)
349 if callback._cancelled:
350 f = futures.Future(loop=self)
351 f.set_result(None)
352 return f
353 callback, args = callback._callback, callback._args
354 if executor is None:
355 executor = self._default_executor
356 if executor is None:
357 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
358 self._default_executor = executor
359 return futures.wrap_future(executor.submit(callback, *args), loop=self)
360
361 def set_default_executor(self, executor):
362 self._default_executor = executor
363
364 def getaddrinfo(self, host, port, *,
365 family=0, type=0, proto=0, flags=0):
366 return self.run_in_executor(None, socket.getaddrinfo,
367 host, port, family, type, proto, flags)
368
369 def getnameinfo(self, sockaddr, flags=0):
370 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
371
    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given Internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.

        Either host/port or an existing *sock* must be given, not both.
        If *local_addr* is given, it is unpacked into a getaddrinfo()
        call and the new socket is bound to one of the results before
        connecting.

        Raises ValueError for inconsistent arguments (server_hostname
        without ssl, ssl without a host or server_hostname, host/port
        together with sock, or none of them).  Raises OSError when
        getaddrinfo() returns nothing or when every candidate address
        fails.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Start the remote (f1) and optional local (f2) lookups
            # before waiting, so both are in flight together.
            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            # Try each remote address in turn; the first one that
            # connects wins, errors from the others are collected.
            exceptions = []
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False)
                    if f2 is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                # Re-wrap so the message names the local
                                # address that could not be bound.
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(exc)
                        else:
                            # No local address could be bound: give up on
                            # this socket and move to the next remote one.
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    # Anything else is not retried: release the socket
                    # and let the exception propagate unchanged.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                # The loop ended without a successful connect.
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        # Also covers a caller-supplied socket, which may still be blocking.
        sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol
487
488 @tasks.coroutine
489 def _create_connection_transport(self, sock, protocol_factory, ssl,
490 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 protocol = protocol_factory()
492 waiter = futures.Future(loop=self)
493 if ssl:
494 sslcontext = None if isinstance(ssl, bool) else ssl
495 transport = self._make_ssl_transport(
496 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700497 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 else:
499 transport = self._make_socket_transport(sock, protocol, waiter)
500
501 yield from waiter
502 return transport, protocol
503
    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Create datagram connection.

        This method is a coroutine.  *local_addr* and *remote_addr*, when
        given, must be 2-tuples that are resolved with getaddrinfo(); the
        socket is bound to the local address and connected to the remote
        one.  Without any address an explicit *family* is required.

        Returns a (transport, protocol) pair.

        Raises ValueError when neither an address nor a family is given,
        or when no (family, proto) combination covers every requested
        address.  Raises OSError when getaddrinfo() returns nothing or
        when every candidate fails (the first error is re-raised).
        """
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # join addresses by (family, protocol)
            addr_infos = collections.OrderedDict()
            # idx 0 is the local address slot, idx 1 the remote one.
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # Try each candidate until one socket is fully set up.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Not retried: release the socket and propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed; report the first error.
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol
575
    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Create a TCP server bound to host and port.

        Return an AbstractServer object which can be used to stop the service.

        This method is a coroutine.

        Either host/port or an already created *sock* must be given, not
        both.  With host/port, one listening socket is created for every
        usable getaddrinfo() result.  *reuse_address* defaults to True on
        POSIX platforms other than Cygwin.

        Raises TypeError if *ssl* is a bool, ValueError for inconsistent
        host/port/sock arguments, and OSError when getaddrinfo() returns
        nothing or a socket cannot be bound.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Falls back to 0 when the socket module has no AF_INET6.
            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            # 'completed' stays False if anything below raises, so the
            # finally clause can close every socket opened so far.
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        # Re-raise with the address in the message.
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server
654
655 @tasks.coroutine
656 def connect_read_pipe(self, protocol_factory, pipe):
657 protocol = protocol_factory()
658 waiter = futures.Future(loop=self)
659 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
660 yield from waiter
661 return transport, protocol
662
663 @tasks.coroutine
664 def connect_write_pipe(self, protocol_factory, pipe):
665 protocol = protocol_factory()
666 waiter = futures.Future(loop=self)
667 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
668 yield from waiter
669 return transport, protocol
670
671 @tasks.coroutine
672 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
673 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
674 universal_newlines=False, shell=True, bufsize=0,
675 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100676 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800677 raise ValueError("cmd must be a string")
678 if universal_newlines:
679 raise ValueError("universal_newlines must be False")
680 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100681 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800682 if bufsize != 0:
683 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700684 protocol = protocol_factory()
685 transport = yield from self._make_subprocess_transport(
686 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
687 return transport, protocol
688
689 @tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500690 def subprocess_exec(self, protocol_factory, program, *args,
691 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
692 stderr=subprocess.PIPE, universal_newlines=False,
693 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800694 if universal_newlines:
695 raise ValueError("universal_newlines must be False")
696 if shell:
697 raise ValueError("shell must be False")
698 if bufsize != 0:
699 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100700 popen_args = (program,) + args
701 for arg in popen_args:
702 if not isinstance(arg, (str, bytes)):
703 raise TypeError("program arguments must be "
704 "a bytes or text string, not %s"
705 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 protocol = protocol_factory()
707 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500708 protocol, popen_args, False, stdin, stdout, stderr,
709 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 return transport, protocol
711
Yury Selivanov569efa22014-02-18 18:02:19 -0500712 def set_exception_handler(self, handler):
713 """Set handler as the new event loop exception handler.
714
715 If handler is None, the default exception handler will
716 be set.
717
718 If handler is a callable object, it should have a
719 matching signature to '(loop, context)', where 'loop'
720 will be a reference to the active event loop, 'context'
721 will be a dict object (see `call_exception_handler()`
722 documentation for details about context).
723 """
724 if handler is not None and not callable(handler):
725 raise TypeError('A callable object or None is expected, '
726 'got {!r}'.format(handler))
727 self._exception_handler = handler
728
729 def default_exception_handler(self, context):
730 """Default exception handler.
731
732 This is called when an exception occurs and no exception
733 handler is set, and can be called by a custom exception
734 handler that wants to defer to the default behavior.
735
736 context parameter has the same meaning as in
737 `call_exception_handler()`.
738 """
739 message = context.get('message')
740 if not message:
741 message = 'Unhandled exception in event loop'
742
743 exception = context.get('exception')
744 if exception is not None:
745 exc_info = (type(exception), exception, exception.__traceback__)
746 else:
747 exc_info = False
748
749 log_lines = [message]
750 for key in sorted(context):
751 if key in {'message', 'exception'}:
752 continue
753 log_lines.append('{}: {!r}'.format(key, context[key]))
754
755 logger.error('\n'.join(log_lines), exc_info=exc_info)
756
757 def call_exception_handler(self, context):
758 """Call the current event loop exception handler.
759
760 context is a dict object containing the following keys
761 (new keys maybe introduced later):
762 - 'message': Error message;
763 - 'exception' (optional): Exception object;
764 - 'future' (optional): Future instance;
765 - 'handle' (optional): Handle instance;
766 - 'protocol' (optional): Protocol instance;
767 - 'transport' (optional): Transport instance;
768 - 'socket' (optional): Socket instance.
769
770 Note: this method should not be overloaded in subclassed
771 event loops. For any custom exception handling, use
772 `set_exception_handler()` method.
773 """
774 if self._exception_handler is None:
775 try:
776 self.default_exception_handler(context)
777 except Exception:
778 # Second protection layer for unexpected errors
779 # in the default implementation, as well as for subclassed
780 # event loops with overloaded "default_exception_handler".
781 logger.error('Exception in default exception handler',
782 exc_info=True)
783 else:
784 try:
785 self._exception_handler(self, context)
786 except Exception as exc:
787 # Exception in the user set custom exception handler.
788 try:
789 # Let's try default handler.
790 self.default_exception_handler({
791 'message': 'Unhandled error in exception handler',
792 'exception': exc,
793 'context': context,
794 })
795 except Exception:
796 # Guard 'default_exception_handler' in case it's
797 # overloaded.
798 logger.error('Exception in default exception handler '
799 'while handling an unexpected error '
800 'in custom exception handler',
801 exc_info=True)
802
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700803 def _add_callback(self, handle):
804 """Add a Handle to ready or scheduled."""
805 assert isinstance(handle, events.Handle), 'A Handle is required here'
806 if handle._cancelled:
807 return
808 if isinstance(handle, events.TimerHandle):
809 heapq.heappush(self._scheduled, handle)
810 else:
811 self._ready.append(handle)
812
813 def _add_callback_signalsafe(self, handle):
814 """Like _add_callback() but called from a signal handler."""
815 self._add_callback(handle)
816 self._write_to_self()
817
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            heapq.heappop(self._scheduled)

        # Poll timeout: None when there is nothing ready and nothing
        # scheduled, 0 when callbacks are already ready, otherwise the
        # time left until the earliest timer (never negative).
        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        # TODO: Instrumentation only in debug mode?
        if logger.isEnabledFor(logging.INFO):
            t0 = self.time()
            # NOTE(review): self._selector is not set anywhere in this
            # class; presumably provided by a subclass -- confirm.
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            # Polls of one second or more are logged at INFO level,
            # shorter ones at DEBUG level.
            if dt >= 1:
                level = logging.INFO
            else:
                level = logging.DEBUG
            if timeout is not None:
                logger.log(level, 'poll %.3f took %.3f seconds',
                           timeout, dt)
            else:
                logger.log(level, 'poll took %.3f seconds', dt)
        else:
            event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # Timers due within one clock resolution from now count as ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is threadsafe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # In debug mode, time the callback and warn when it takes
                # at least slow_callback_duration seconds.
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100885
886 def get_debug(self):
887 return self._debug
888
889 def set_debug(self, enabled):
890 self._debug = enabled