blob: c42e7f9848ee48912b14f9d97a151e27bc88c542 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
22import socket
23import subprocess
Victor Stinner80f53aa2014-06-27 13:52:20 +020024import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025import time
26import os
27import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
40# Argument for default thread pool executor creation.
41_MAX_WORKERS = 5
42
43
Victor Stinner0e6f52a2014-06-20 17:34:15 +020044def _format_handle(handle):
45 cb = handle._callback
46 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
47 # format the task
48 return repr(cb.__self__)
49 else:
50 return str(handle)
51
52
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053class _StopError(BaseException):
54 """Raised to stop the event loop."""
55
56
Victor Stinner1b0580b2014-02-13 09:24:37 +010057def _check_resolved_address(sock, address):
58 # Ensure that the address is already resolved to avoid the trap of hanging
59 # the entire event loop when the address requires doing a DNS lookup.
60 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010061 if family == socket.AF_INET:
62 host, port = address
63 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010064 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010065 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010066 return
67
Victor Stinner1b0580b2014-02-13 09:24:37 +010068 type_mask = 0
69 if hasattr(socket, 'SOCK_NONBLOCK'):
70 type_mask |= socket.SOCK_NONBLOCK
71 if hasattr(socket, 'SOCK_CLOEXEC'):
72 type_mask |= socket.SOCK_CLOEXEC
73 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
74 # already resolved.
75 try:
76 socket.getaddrinfo(host, port,
77 family=family,
78 type=(sock.type & ~type_mask),
79 proto=sock.proto,
80 flags=socket.AI_NUMERICHOST)
81 except socket.gaierror as err:
82 raise ValueError("address must be resolved (IP address), got %r: %s"
83 % (address, err))
84
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085def _raise_stop_error(*args):
86 raise _StopError
87
88
89class Server(events.AbstractServer):
90
91 def __init__(self, loop, sockets):
92 self.loop = loop
93 self.sockets = sockets
94 self.active_count = 0
95 self.waiters = []
96
97 def attach(self, transport):
98 assert self.sockets is not None
99 self.active_count += 1
100
101 def detach(self, transport):
102 assert self.active_count > 0
103 self.active_count -= 1
104 if self.active_count == 0 and self.sockets is None:
105 self._wakeup()
106
107 def close(self):
108 sockets = self.sockets
109 if sockets is not None:
110 self.sockets = None
111 for sock in sockets:
112 self.loop._stop_serving(sock)
113 if self.active_count == 0:
114 self._wakeup()
115
116 def _wakeup(self):
117 waiters = self.waiters
118 self.waiters = None
119 for waiter in waiters:
120 if not waiter.done():
121 waiter.set_result(waiter)
122
Victor Stinnerf951d282014-06-29 00:46:45 +0200123 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124 def wait_closed(self):
125 if self.sockets is None or self.waiters is None:
126 return
127 waiter = futures.Future(loop=self.loop)
128 self.waiters.append(waiter)
129 yield from waiter
130
131
132class BaseEventLoop(events.AbstractEventLoop):
133
134 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200135 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 self._ready = collections.deque()
137 self._scheduled = []
138 self._default_executor = None
139 self._internal_fds = 0
140 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100141 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500142 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200143 self._debug = (not sys.flags.ignore_environment
144 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200145 # In debug mode, if the execution of a callback or a step of a task
146 # exceed this duration in seconds, the slow callback/task is logged.
147 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200149 def __repr__(self):
150 return ('<%s running=%s closed=%s debug=%s>'
151 % (self.__class__.__name__, self.is_running(),
152 self.is_closed(), self.get_debug()))
153
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 def _make_socket_transport(self, sock, protocol, waiter=None, *,
155 extra=None, server=None):
156 """Create socket transport."""
157 raise NotImplementedError
158
159 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
160 server_side=False, server_hostname=None,
161 extra=None, server=None):
162 """Create SSL transport."""
163 raise NotImplementedError
164
165 def _make_datagram_transport(self, sock, protocol,
166 address=None, extra=None):
167 """Create datagram transport."""
168 raise NotImplementedError
169
170 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
171 extra=None):
172 """Create read pipe transport."""
173 raise NotImplementedError
174
175 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
176 extra=None):
177 """Create write pipe transport."""
178 raise NotImplementedError
179
Victor Stinnerf951d282014-06-29 00:46:45 +0200180 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700181 def _make_subprocess_transport(self, protocol, args, shell,
182 stdin, stdout, stderr, bufsize,
183 extra=None, **kwargs):
184 """Create subprocess transport."""
185 raise NotImplementedError
186
187 def _read_from_self(self):
188 """XXX"""
189 raise NotImplementedError
190
191 def _write_to_self(self):
192 """XXX"""
193 raise NotImplementedError
194
195 def _process_events(self, event_list):
196 """Process selector events."""
197 raise NotImplementedError
198
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200199 def _check_closed(self):
200 if self._closed:
201 raise RuntimeError('Event loop is closed')
202
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203 def run_forever(self):
204 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200205 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206 if self._running:
207 raise RuntimeError('Event loop is running.')
208 self._running = True
209 try:
210 while True:
211 try:
212 self._run_once()
213 except _StopError:
214 break
215 finally:
216 self._running = False
217
218 def run_until_complete(self, future):
219 """Run until the Future is done.
220
221 If the argument is a coroutine, it is wrapped in a Task.
222
223 XXX TBD: It would be disastrous to call run_until_complete()
224 with the same coroutine twice -- it would wrap it in two
225 different Tasks and that can't be good.
226
227 Return the Future's result, or raise its exception.
228 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200229 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 future = tasks.async(future, loop=self)
231 future.add_done_callback(_raise_stop_error)
232 self.run_forever()
233 future.remove_done_callback(_raise_stop_error)
234 if not future.done():
235 raise RuntimeError('Event loop stopped before Future completed.')
236
237 return future.result()
238
239 def stop(self):
240 """Stop running the event loop.
241
242 Every callback scheduled before stop() is called will run.
243 Callback scheduled after stop() is called won't. However,
244 those callbacks will run if run() is called again later.
245 """
246 self.call_soon(_raise_stop_error)
247
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200248 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700249 """Close the event loop.
250
251 This clears the queues and shuts down the executor,
252 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200253
254 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700255 """
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200256 if self._running:
257 raise RuntimeError("cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200258 if self._closed:
259 return
260 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200261 self._ready.clear()
262 self._scheduled.clear()
263 executor = self._default_executor
264 if executor is not None:
265 self._default_executor = None
266 executor.shutdown(wait=False)
267
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200268 def is_closed(self):
269 """Returns True if the event loop was closed."""
270 return self._closed
271
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 def is_running(self):
273 """Returns running status of event loop."""
274 return self._running
275
276 def time(self):
277 """Return the time according to the event loop's clock."""
278 return time.monotonic()
279
280 def call_later(self, delay, callback, *args):
281 """Arrange for a callback to be called at a given time.
282
283 Return a Handle: an opaque object with a cancel() method that
284 can be used to cancel the call.
285
286 The delay can be an int or float, expressed in seconds. It is
287 always a relative time.
288
289 Each callback will be called exactly once. If two callbacks
290 are scheduled for exactly the same time, it undefined which
291 will be called first.
292
293 Any positional arguments after the callback will be passed to
294 the callback when it is called.
295 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200296 timer = self.call_at(self.time() + delay, callback, *args)
297 if timer._source_traceback:
298 del timer._source_traceback[-1]
299 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300
301 def call_at(self, when, callback, *args):
302 """Like call_later(), but uses an absolute time."""
Victor Stinnerf951d282014-06-29 00:46:45 +0200303 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100304 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100305 if self._debug:
306 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500307 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200308 if timer._source_traceback:
309 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310 heapq.heappush(self._scheduled, timer)
311 return timer
312
313 def call_soon(self, callback, *args):
314 """Arrange for a callback to be called as soon as possible.
315
316 This operates as a FIFO queue, callbacks are called in the
317 order in which they are registered. Each callback will be
318 called exactly once.
319
320 Any positional arguments after the callback will be passed to
321 the callback when it is called.
322 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200323 handle = self._call_soon(callback, args, check_loop=True)
324 if handle._source_traceback:
325 del handle._source_traceback[-1]
326 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100327
328 def _call_soon(self, callback, args, check_loop):
Victor Stinnerf951d282014-06-29 00:46:45 +0200329 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100330 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100331 if self._debug and check_loop:
332 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500333 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200334 if handle._source_traceback:
335 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 self._ready.append(handle)
337 return handle
338
Victor Stinner93569c22014-03-21 10:00:52 +0100339 def _assert_is_current_event_loop(self):
340 """Asserts that this event loop is the current event loop.
341
342 Non-threadsafe methods of this class make this assumption and will
343 likely behave incorrectly when the assumption is violated.
344
345 Should only be called when (self._debug == True). The caller is
346 responsible for checking this condition for performance reasons.
347 """
Victor Stinner751c7c02014-06-23 15:14:13 +0200348 try:
349 current = events.get_event_loop()
350 except AssertionError:
351 return
352 if current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100353 raise RuntimeError(
354 "non-threadsafe operation invoked on an event loop other "
355 "than the current one")
356
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200358 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100359 handle = self._call_soon(callback, args, check_loop=False)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200360 if handle._source_traceback:
361 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 self._write_to_self()
363 return handle
364
365 def run_in_executor(self, executor, callback, *args):
Victor Stinnerf951d282014-06-29 00:46:45 +0200366 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100367 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 if isinstance(callback, events.Handle):
369 assert not args
370 assert not isinstance(callback, events.TimerHandle)
371 if callback._cancelled:
372 f = futures.Future(loop=self)
373 f.set_result(None)
374 return f
375 callback, args = callback._callback, callback._args
376 if executor is None:
377 executor = self._default_executor
378 if executor is None:
379 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
380 self._default_executor = executor
381 return futures.wrap_future(executor.submit(callback, *args), loop=self)
382
383 def set_default_executor(self, executor):
384 self._default_executor = executor
385
386 def getaddrinfo(self, host, port, *,
387 family=0, type=0, proto=0, flags=0):
388 return self.run_in_executor(None, socket.getaddrinfo,
389 host, port, family, type, proto, flags)
390
391 def getnameinfo(self, sockaddr, flags=0):
392 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
393
Victor Stinnerf951d282014-06-29 00:46:45 +0200394 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 def create_connection(self, protocol_factory, host=None, port=None, *,
396 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700397 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200398 """Connect to a TCP server.
399
400 Create a streaming transport connection to a given Internet host and
401 port: socket family AF_INET or socket.AF_INET6 depending on host (or
402 family if specified), socket type SOCK_STREAM. protocol_factory must be
403 a callable returning a protocol instance.
404
405 This method is a coroutine which will try to establish the connection
406 in the background. When successful, the coroutine returns a
407 (transport, protocol) pair.
408 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700409 if server_hostname is not None and not ssl:
410 raise ValueError('server_hostname is only meaningful with ssl')
411
412 if server_hostname is None and ssl:
413 # Use host as default for server_hostname. It is an error
414 # if host is empty or not set, e.g. when an
415 # already-connected socket was passed or when only a port
416 # is given. To avoid this error, you can pass
417 # server_hostname='' -- this will bypass the hostname
418 # check. (This also means that if host is a numeric
419 # IP/IPv6 address, we will attempt to verify that exact
420 # address; this will probably fail, but it is possible to
421 # create a certificate for a specific IP address, so we
422 # don't judge it here.)
423 if not host:
424 raise ValueError('You must set server_hostname '
425 'when using ssl without a host')
426 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700427
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428 if host is not None or port is not None:
429 if sock is not None:
430 raise ValueError(
431 'host/port and sock can not be specified at the same time')
432
433 f1 = self.getaddrinfo(
434 host, port, family=family,
435 type=socket.SOCK_STREAM, proto=proto, flags=flags)
436 fs = [f1]
437 if local_addr is not None:
438 f2 = self.getaddrinfo(
439 *local_addr, family=family,
440 type=socket.SOCK_STREAM, proto=proto, flags=flags)
441 fs.append(f2)
442 else:
443 f2 = None
444
445 yield from tasks.wait(fs, loop=self)
446
447 infos = f1.result()
448 if not infos:
449 raise OSError('getaddrinfo() returned empty list')
450 if f2 is not None:
451 laddr_infos = f2.result()
452 if not laddr_infos:
453 raise OSError('getaddrinfo() returned empty list')
454
455 exceptions = []
456 for family, type, proto, cname, address in infos:
457 try:
458 sock = socket.socket(family=family, type=type, proto=proto)
459 sock.setblocking(False)
460 if f2 is not None:
461 for _, _, _, _, laddr in laddr_infos:
462 try:
463 sock.bind(laddr)
464 break
465 except OSError as exc:
466 exc = OSError(
467 exc.errno, 'error while '
468 'attempting to bind on address '
469 '{!r}: {}'.format(
470 laddr, exc.strerror.lower()))
471 exceptions.append(exc)
472 else:
473 sock.close()
474 sock = None
475 continue
476 yield from self.sock_connect(sock, address)
477 except OSError as exc:
478 if sock is not None:
479 sock.close()
480 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200481 except:
482 if sock is not None:
483 sock.close()
484 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 else:
486 break
487 else:
488 if len(exceptions) == 1:
489 raise exceptions[0]
490 else:
491 # If they all have the same str(), raise one.
492 model = str(exceptions[0])
493 if all(str(exc) == model for exc in exceptions):
494 raise exceptions[0]
495 # Raise a combined exception so the user can see all
496 # the various error messages.
497 raise OSError('Multiple exceptions: {}'.format(
498 ', '.join(str(exc) for exc in exceptions)))
499
500 elif sock is None:
501 raise ValueError(
502 'host and port was not specified and no sock specified')
503
504 sock.setblocking(False)
505
Yury Selivanovb057c522014-02-18 12:15:06 -0500506 transport, protocol = yield from self._create_connection_transport(
507 sock, protocol_factory, ssl, server_hostname)
508 return transport, protocol
509
Victor Stinnerf951d282014-06-29 00:46:45 +0200510 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500511 def _create_connection_transport(self, sock, protocol_factory, ssl,
512 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513 protocol = protocol_factory()
514 waiter = futures.Future(loop=self)
515 if ssl:
516 sslcontext = None if isinstance(ssl, bool) else ssl
517 transport = self._make_ssl_transport(
518 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700519 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520 else:
521 transport = self._make_socket_transport(sock, protocol, waiter)
522
523 yield from waiter
524 return transport, protocol
525
Victor Stinnerf951d282014-06-29 00:46:45 +0200526 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527 def create_datagram_endpoint(self, protocol_factory,
528 local_addr=None, remote_addr=None, *,
529 family=0, proto=0, flags=0):
530 """Create datagram connection."""
531 if not (local_addr or remote_addr):
532 if family == 0:
533 raise ValueError('unexpected address family')
534 addr_pairs_info = (((family, proto), (None, None)),)
535 else:
536 # join addresss by (family, protocol)
537 addr_infos = collections.OrderedDict()
538 for idx, addr in ((0, local_addr), (1, remote_addr)):
539 if addr is not None:
540 assert isinstance(addr, tuple) and len(addr) == 2, (
541 '2-tuple is expected')
542
543 infos = yield from self.getaddrinfo(
544 *addr, family=family, type=socket.SOCK_DGRAM,
545 proto=proto, flags=flags)
546 if not infos:
547 raise OSError('getaddrinfo() returned empty list')
548
549 for fam, _, pro, _, address in infos:
550 key = (fam, pro)
551 if key not in addr_infos:
552 addr_infos[key] = [None, None]
553 addr_infos[key][idx] = address
554
555 # each addr has to have info for each (family, proto) pair
556 addr_pairs_info = [
557 (key, addr_pair) for key, addr_pair in addr_infos.items()
558 if not ((local_addr and addr_pair[0] is None) or
559 (remote_addr and addr_pair[1] is None))]
560
561 if not addr_pairs_info:
562 raise ValueError('can not get address information')
563
564 exceptions = []
565
566 for ((family, proto),
567 (local_address, remote_address)) in addr_pairs_info:
568 sock = None
569 r_addr = None
570 try:
571 sock = socket.socket(
572 family=family, type=socket.SOCK_DGRAM, proto=proto)
573 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
574 sock.setblocking(False)
575
576 if local_addr:
577 sock.bind(local_address)
578 if remote_addr:
579 yield from self.sock_connect(sock, remote_address)
580 r_addr = remote_address
581 except OSError as exc:
582 if sock is not None:
583 sock.close()
584 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200585 except:
586 if sock is not None:
587 sock.close()
588 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589 else:
590 break
591 else:
592 raise exceptions[0]
593
594 protocol = protocol_factory()
595 transport = self._make_datagram_transport(sock, protocol, r_addr)
596 return transport, protocol
597
Victor Stinnerf951d282014-06-29 00:46:45 +0200598 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599 def create_server(self, protocol_factory, host=None, port=None,
600 *,
601 family=socket.AF_UNSPEC,
602 flags=socket.AI_PASSIVE,
603 sock=None,
604 backlog=100,
605 ssl=None,
606 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200607 """Create a TCP server bound to host and port.
608
609 Return an AbstractServer object which can be used to stop the service.
610
611 This method is a coroutine.
612 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700613 if isinstance(ssl, bool):
614 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 if host is not None or port is not None:
616 if sock is not None:
617 raise ValueError(
618 'host/port and sock can not be specified at the same time')
619
620 AF_INET6 = getattr(socket, 'AF_INET6', 0)
621 if reuse_address is None:
622 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
623 sockets = []
624 if host == '':
625 host = None
626
627 infos = yield from self.getaddrinfo(
628 host, port, family=family,
629 type=socket.SOCK_STREAM, proto=0, flags=flags)
630 if not infos:
631 raise OSError('getaddrinfo() returned empty list')
632
633 completed = False
634 try:
635 for res in infos:
636 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700637 try:
638 sock = socket.socket(af, socktype, proto)
639 except socket.error:
640 # Assume it's a bad family/type/protocol combination.
641 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642 sockets.append(sock)
643 if reuse_address:
644 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
645 True)
646 # Disable IPv4/IPv6 dual stack support (enabled by
647 # default on Linux) which makes a single socket
648 # listen on both address families.
649 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
650 sock.setsockopt(socket.IPPROTO_IPV6,
651 socket.IPV6_V6ONLY,
652 True)
653 try:
654 sock.bind(sa)
655 except OSError as err:
656 raise OSError(err.errno, 'error while attempting '
657 'to bind on address %r: %s'
658 % (sa, err.strerror.lower()))
659 completed = True
660 finally:
661 if not completed:
662 for sock in sockets:
663 sock.close()
664 else:
665 if sock is None:
666 raise ValueError(
667 'host and port was not specified and no sock specified')
668 sockets = [sock]
669
670 server = Server(self, sockets)
671 for sock in sockets:
672 sock.listen(backlog)
673 sock.setblocking(False)
674 self._start_serving(protocol_factory, sock, ssl, server)
675 return server
676
Victor Stinnerf951d282014-06-29 00:46:45 +0200677 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 def connect_read_pipe(self, protocol_factory, pipe):
679 protocol = protocol_factory()
680 waiter = futures.Future(loop=self)
681 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
682 yield from waiter
683 return transport, protocol
684
Victor Stinnerf951d282014-06-29 00:46:45 +0200685 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 def connect_write_pipe(self, protocol_factory, pipe):
687 protocol = protocol_factory()
688 waiter = futures.Future(loop=self)
689 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
690 yield from waiter
691 return transport, protocol
692
Victor Stinnerf951d282014-06-29 00:46:45 +0200693 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700694 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
695 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
696 universal_newlines=False, shell=True, bufsize=0,
697 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100698 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800699 raise ValueError("cmd must be a string")
700 if universal_newlines:
701 raise ValueError("universal_newlines must be False")
702 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100703 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800704 if bufsize != 0:
705 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 protocol = protocol_factory()
707 transport = yield from self._make_subprocess_transport(
708 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
709 return transport, protocol
710
Victor Stinnerf951d282014-06-29 00:46:45 +0200711 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500712 def subprocess_exec(self, protocol_factory, program, *args,
713 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
714 stderr=subprocess.PIPE, universal_newlines=False,
715 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800716 if universal_newlines:
717 raise ValueError("universal_newlines must be False")
718 if shell:
719 raise ValueError("shell must be False")
720 if bufsize != 0:
721 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100722 popen_args = (program,) + args
723 for arg in popen_args:
724 if not isinstance(arg, (str, bytes)):
725 raise TypeError("program arguments must be "
726 "a bytes or text string, not %s"
727 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700728 protocol = protocol_factory()
729 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500730 protocol, popen_args, False, stdin, stdout, stderr,
731 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732 return transport, protocol
733
Yury Selivanov569efa22014-02-18 18:02:19 -0500734 def set_exception_handler(self, handler):
735 """Set handler as the new event loop exception handler.
736
737 If handler is None, the default exception handler will
738 be set.
739
740 If handler is a callable object, it should have a
741 matching signature to '(loop, context)', where 'loop'
742 will be a reference to the active event loop, 'context'
743 will be a dict object (see `call_exception_handler()`
744 documentation for details about context).
745 """
746 if handler is not None and not callable(handler):
747 raise TypeError('A callable object or None is expected, '
748 'got {!r}'.format(handler))
749 self._exception_handler = handler
750
751 def default_exception_handler(self, context):
752 """Default exception handler.
753
754 This is called when an exception occurs and no exception
755 handler is set, and can be called by a custom exception
756 handler that wants to defer to the default behavior.
757
758 context parameter has the same meaning as in
759 `call_exception_handler()`.
760 """
761 message = context.get('message')
762 if not message:
763 message = 'Unhandled exception in event loop'
764
765 exception = context.get('exception')
766 if exception is not None:
767 exc_info = (type(exception), exception, exception.__traceback__)
768 else:
769 exc_info = False
770
771 log_lines = [message]
772 for key in sorted(context):
773 if key in {'message', 'exception'}:
774 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200775 value = context[key]
776 if key == 'source_traceback':
777 tb = ''.join(traceback.format_list(value))
778 value = 'Object created at (most recent call last):\n'
779 value += tb.rstrip()
780 else:
781 value = repr(value)
782 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500783
784 logger.error('\n'.join(log_lines), exc_info=exc_info)
785
786 def call_exception_handler(self, context):
787 """Call the current event loop exception handler.
788
789 context is a dict object containing the following keys
790 (new keys maybe introduced later):
791 - 'message': Error message;
792 - 'exception' (optional): Exception object;
793 - 'future' (optional): Future instance;
794 - 'handle' (optional): Handle instance;
795 - 'protocol' (optional): Protocol instance;
796 - 'transport' (optional): Transport instance;
797 - 'socket' (optional): Socket instance.
798
799 Note: this method should not be overloaded in subclassed
800 event loops. For any custom exception handling, use
801 `set_exception_handler()` method.
802 """
803 if self._exception_handler is None:
804 try:
805 self.default_exception_handler(context)
806 except Exception:
807 # Second protection layer for unexpected errors
808 # in the default implementation, as well as for subclassed
809 # event loops with overloaded "default_exception_handler".
810 logger.error('Exception in default exception handler',
811 exc_info=True)
812 else:
813 try:
814 self._exception_handler(self, context)
815 except Exception as exc:
816 # Exception in the user set custom exception handler.
817 try:
818 # Let's try default handler.
819 self.default_exception_handler({
820 'message': 'Unhandled error in exception handler',
821 'exception': exc,
822 'context': context,
823 })
824 except Exception:
825 # Guard 'default_exception_handler' in case it's
826 # overloaded.
827 logger.error('Exception in default exception handler '
828 'while handling an unexpected error '
829 'in custom exception handler',
830 exc_info=True)
831
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700832 def _add_callback(self, handle):
833 """Add a Handle to ready or scheduled."""
834 assert isinstance(handle, events.Handle), 'A Handle is required here'
835 if handle._cancelled:
836 return
837 if isinstance(handle, events.TimerHandle):
838 heapq.heappush(self._scheduled, handle)
839 else:
840 self._ready.append(handle)
841
842 def _add_callback_signalsafe(self, handle):
843 """Like _add_callback() but called from a signal handler."""
844 self._add_callback(handle)
845 self._write_to_self()
846
847 def _run_once(self):
848 """Run one full iteration of the event loop.
849
850 This calls all currently ready callbacks, polls for I/O,
851 schedules the resulting callbacks, and finally schedules
852 'call_later' callbacks.
853 """
854 # Remove delayed calls that were cancelled from head of queue.
855 while self._scheduled and self._scheduled[0]._cancelled:
856 heapq.heappop(self._scheduled)
857
858 timeout = None
859 if self._ready:
860 timeout = 0
861 elif self._scheduled:
862 # Compute the desired timeout.
863 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700864 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700865
Victor Stinner1580fe32014-06-23 00:31:08 +0200866 if self._debug:
Victor Stinner22463aa2014-01-20 23:56:40 +0100867 t0 = self.time()
868 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200869 dt = self.time() - t0
870 if dt >= 1:
Victor Stinner22463aa2014-01-20 23:56:40 +0100871 level = logging.INFO
872 else:
873 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100874 if timeout is not None:
875 logger.log(level, 'poll %.3f took %.3f seconds',
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200876 timeout, dt)
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100877 else:
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200878 logger.log(level, 'poll took %.3f seconds', dt)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700879 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100880 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700881 self._process_events(event_list)
882
883 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100884 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700885 while self._scheduled:
886 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100887 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700888 break
889 handle = heapq.heappop(self._scheduled)
890 self._ready.append(handle)
891
892 # This is the only place where callbacks are actually *called*.
893 # All other places just add them to ready.
894 # Note: We run all currently scheduled callbacks, but not any
895 # callbacks scheduled by callbacks run this time around --
896 # they will be run the next time (after another I/O poll).
897 # Use an idiom that is threadsafe without using locks.
898 ntodo = len(self._ready)
899 for i in range(ntodo):
900 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200901 if handle._cancelled:
902 continue
903 if self._debug:
904 t0 = self.time()
905 handle._run()
906 dt = self.time() - t0
907 if dt >= self.slow_callback_duration:
908 logger.warning('Executing %s took %.3f seconds',
909 _format_handle(handle), dt)
910 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700911 handle._run()
912 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100913
914 def get_debug(self):
915 return self._debug
916
917 def set_debug(self, enabled):
918 self._debug = enabled