blob: 833f81d4a9086937fa2b9658993c3723e8efdb4a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
40# Argument for default thread pool executor creation.
41_MAX_WORKERS = 5
42
43
Victor Stinner0e6f52a2014-06-20 17:34:15 +020044def _format_handle(handle):
45 cb = handle._callback
46 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
47 # format the task
48 return repr(cb.__self__)
49 else:
50 return str(handle)
51
52
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053class _StopError(BaseException):
54 """Raised to stop the event loop."""
55
56
Victor Stinner1b0580b2014-02-13 09:24:37 +010057def _check_resolved_address(sock, address):
58 # Ensure that the address is already resolved to avoid the trap of hanging
59 # the entire event loop when the address requires doing a DNS lookup.
60 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010061 if family == socket.AF_INET:
62 host, port = address
63 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010064 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010065 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010066 return
67
Victor Stinner1b0580b2014-02-13 09:24:37 +010068 type_mask = 0
69 if hasattr(socket, 'SOCK_NONBLOCK'):
70 type_mask |= socket.SOCK_NONBLOCK
71 if hasattr(socket, 'SOCK_CLOEXEC'):
72 type_mask |= socket.SOCK_CLOEXEC
73 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
74 # already resolved.
75 try:
76 socket.getaddrinfo(host, port,
77 family=family,
78 type=(sock.type & ~type_mask),
79 proto=sock.proto,
80 flags=socket.AI_NUMERICHOST)
81 except socket.gaierror as err:
82 raise ValueError("address must be resolved (IP address), got %r: %s"
83 % (address, err))
84
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085def _raise_stop_error(*args):
86 raise _StopError
87
88
89class Server(events.AbstractServer):
90
91 def __init__(self, loop, sockets):
92 self.loop = loop
93 self.sockets = sockets
94 self.active_count = 0
95 self.waiters = []
96
97 def attach(self, transport):
98 assert self.sockets is not None
99 self.active_count += 1
100
101 def detach(self, transport):
102 assert self.active_count > 0
103 self.active_count -= 1
104 if self.active_count == 0 and self.sockets is None:
105 self._wakeup()
106
107 def close(self):
108 sockets = self.sockets
109 if sockets is not None:
110 self.sockets = None
111 for sock in sockets:
112 self.loop._stop_serving(sock)
113 if self.active_count == 0:
114 self._wakeup()
115
116 def _wakeup(self):
117 waiters = self.waiters
118 self.waiters = None
119 for waiter in waiters:
120 if not waiter.done():
121 waiter.set_result(waiter)
122
Victor Stinnerf951d282014-06-29 00:46:45 +0200123 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124 def wait_closed(self):
125 if self.sockets is None or self.waiters is None:
126 return
127 waiter = futures.Future(loop=self.loop)
128 self.waiters.append(waiter)
129 yield from waiter
130
131
132class BaseEventLoop(events.AbstractEventLoop):
133
134 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200135 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 self._ready = collections.deque()
137 self._scheduled = []
138 self._default_executor = None
139 self._internal_fds = 0
140 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100141 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500142 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200143 self._debug = (not sys.flags.ignore_environment
144 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200145 # In debug mode, if the execution of a callback or a step of a task
146 # exceed this duration in seconds, the slow callback/task is logged.
147 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200149 def __repr__(self):
150 return ('<%s running=%s closed=%s debug=%s>'
151 % (self.__class__.__name__, self.is_running(),
152 self.is_closed(), self.get_debug()))
153
Victor Stinner896a25a2014-07-08 11:29:25 +0200154 def create_task(self, coro):
155 """Schedule a coroutine object.
156
157 Return a task object."""
158 return tasks.Task(coro, loop=self)
159
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 def _make_socket_transport(self, sock, protocol, waiter=None, *,
161 extra=None, server=None):
162 """Create socket transport."""
163 raise NotImplementedError
164
165 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
166 server_side=False, server_hostname=None,
167 extra=None, server=None):
168 """Create SSL transport."""
169 raise NotImplementedError
170
171 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200172 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700173 """Create datagram transport."""
174 raise NotImplementedError
175
176 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
177 extra=None):
178 """Create read pipe transport."""
179 raise NotImplementedError
180
181 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
182 extra=None):
183 """Create write pipe transport."""
184 raise NotImplementedError
185
Victor Stinnerf951d282014-06-29 00:46:45 +0200186 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 def _make_subprocess_transport(self, protocol, args, shell,
188 stdin, stdout, stderr, bufsize,
189 extra=None, **kwargs):
190 """Create subprocess transport."""
191 raise NotImplementedError
192
193 def _read_from_self(self):
194 """XXX"""
195 raise NotImplementedError
196
197 def _write_to_self(self):
198 """XXX"""
199 raise NotImplementedError
200
201 def _process_events(self, event_list):
202 """Process selector events."""
203 raise NotImplementedError
204
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200205 def _check_closed(self):
206 if self._closed:
207 raise RuntimeError('Event loop is closed')
208
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700209 def run_forever(self):
210 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200211 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 if self._running:
213 raise RuntimeError('Event loop is running.')
214 self._running = True
215 try:
216 while True:
217 try:
218 self._run_once()
219 except _StopError:
220 break
221 finally:
222 self._running = False
223
224 def run_until_complete(self, future):
225 """Run until the Future is done.
226
227 If the argument is a coroutine, it is wrapped in a Task.
228
229 XXX TBD: It would be disastrous to call run_until_complete()
230 with the same coroutine twice -- it would wrap it in two
231 different Tasks and that can't be good.
232
233 Return the Future's result, or raise its exception.
234 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200235 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200236
237 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200239 if new_task:
240 # An exception is raised if the future didn't complete, so there
241 # is no need to log the "destroy pending task" message
242 future._log_destroy_pending = False
243
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700244 future.add_done_callback(_raise_stop_error)
245 self.run_forever()
246 future.remove_done_callback(_raise_stop_error)
247 if not future.done():
248 raise RuntimeError('Event loop stopped before Future completed.')
249
250 return future.result()
251
252 def stop(self):
253 """Stop running the event loop.
254
255 Every callback scheduled before stop() is called will run.
256 Callback scheduled after stop() is called won't. However,
257 those callbacks will run if run() is called again later.
258 """
259 self.call_soon(_raise_stop_error)
260
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200261 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700262 """Close the event loop.
263
264 This clears the queues and shuts down the executor,
265 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200266
267 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700268 """
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200269 if self._running:
270 raise RuntimeError("cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200271 if self._closed:
272 return
273 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200274 self._ready.clear()
275 self._scheduled.clear()
276 executor = self._default_executor
277 if executor is not None:
278 self._default_executor = None
279 executor.shutdown(wait=False)
280
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200281 def is_closed(self):
282 """Returns True if the event loop was closed."""
283 return self._closed
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 def is_running(self):
286 """Returns running status of event loop."""
287 return self._running
288
289 def time(self):
290 """Return the time according to the event loop's clock."""
291 return time.monotonic()
292
293 def call_later(self, delay, callback, *args):
294 """Arrange for a callback to be called at a given time.
295
296 Return a Handle: an opaque object with a cancel() method that
297 can be used to cancel the call.
298
299 The delay can be an int or float, expressed in seconds. It is
300 always a relative time.
301
302 Each callback will be called exactly once. If two callbacks
303 are scheduled for exactly the same time, it undefined which
304 will be called first.
305
306 Any positional arguments after the callback will be passed to
307 the callback when it is called.
308 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200309 timer = self.call_at(self.time() + delay, callback, *args)
310 if timer._source_traceback:
311 del timer._source_traceback[-1]
312 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313
314 def call_at(self, when, callback, *args):
315 """Like call_later(), but uses an absolute time."""
Victor Stinnerf951d282014-06-29 00:46:45 +0200316 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100317 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100318 if self._debug:
319 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500320 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200321 if timer._source_traceback:
322 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 heapq.heappush(self._scheduled, timer)
324 return timer
325
326 def call_soon(self, callback, *args):
327 """Arrange for a callback to be called as soon as possible.
328
329 This operates as a FIFO queue, callbacks are called in the
330 order in which they are registered. Each callback will be
331 called exactly once.
332
333 Any positional arguments after the callback will be passed to
334 the callback when it is called.
335 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200336 handle = self._call_soon(callback, args, check_loop=True)
337 if handle._source_traceback:
338 del handle._source_traceback[-1]
339 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100340
341 def _call_soon(self, callback, args, check_loop):
Victor Stinnerf951d282014-06-29 00:46:45 +0200342 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100343 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100344 if self._debug and check_loop:
345 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500346 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200347 if handle._source_traceback:
348 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 self._ready.append(handle)
350 return handle
351
Victor Stinner93569c22014-03-21 10:00:52 +0100352 def _assert_is_current_event_loop(self):
353 """Asserts that this event loop is the current event loop.
354
355 Non-threadsafe methods of this class make this assumption and will
356 likely behave incorrectly when the assumption is violated.
357
358 Should only be called when (self._debug == True). The caller is
359 responsible for checking this condition for performance reasons.
360 """
Victor Stinner751c7c02014-06-23 15:14:13 +0200361 try:
362 current = events.get_event_loop()
363 except AssertionError:
364 return
365 if current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100366 raise RuntimeError(
367 "non-threadsafe operation invoked on an event loop other "
368 "than the current one")
369
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200371 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100372 handle = self._call_soon(callback, args, check_loop=False)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200373 if handle._source_traceback:
374 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 self._write_to_self()
376 return handle
377
378 def run_in_executor(self, executor, callback, *args):
Victor Stinnerf951d282014-06-29 00:46:45 +0200379 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100380 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 if isinstance(callback, events.Handle):
382 assert not args
383 assert not isinstance(callback, events.TimerHandle)
384 if callback._cancelled:
385 f = futures.Future(loop=self)
386 f.set_result(None)
387 return f
388 callback, args = callback._callback, callback._args
389 if executor is None:
390 executor = self._default_executor
391 if executor is None:
392 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
393 self._default_executor = executor
394 return futures.wrap_future(executor.submit(callback, *args), loop=self)
395
396 def set_default_executor(self, executor):
397 self._default_executor = executor
398
399 def getaddrinfo(self, host, port, *,
400 family=0, type=0, proto=0, flags=0):
401 return self.run_in_executor(None, socket.getaddrinfo,
402 host, port, family, type, proto, flags)
403
404 def getnameinfo(self, sockaddr, flags=0):
405 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
406
Victor Stinnerf951d282014-06-29 00:46:45 +0200407 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700408 def create_connection(self, protocol_factory, host=None, port=None, *,
409 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700410 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200411 """Connect to a TCP server.
412
413 Create a streaming transport connection to a given Internet host and
414 port: socket family AF_INET or socket.AF_INET6 depending on host (or
415 family if specified), socket type SOCK_STREAM. protocol_factory must be
416 a callable returning a protocol instance.
417
418 This method is a coroutine which will try to establish the connection
419 in the background. When successful, the coroutine returns a
420 (transport, protocol) pair.
421 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700422 if server_hostname is not None and not ssl:
423 raise ValueError('server_hostname is only meaningful with ssl')
424
425 if server_hostname is None and ssl:
426 # Use host as default for server_hostname. It is an error
427 # if host is empty or not set, e.g. when an
428 # already-connected socket was passed or when only a port
429 # is given. To avoid this error, you can pass
430 # server_hostname='' -- this will bypass the hostname
431 # check. (This also means that if host is a numeric
432 # IP/IPv6 address, we will attempt to verify that exact
433 # address; this will probably fail, but it is possible to
434 # create a certificate for a specific IP address, so we
435 # don't judge it here.)
436 if not host:
437 raise ValueError('You must set server_hostname '
438 'when using ssl without a host')
439 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700440
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 if host is not None or port is not None:
442 if sock is not None:
443 raise ValueError(
444 'host/port and sock can not be specified at the same time')
445
446 f1 = self.getaddrinfo(
447 host, port, family=family,
448 type=socket.SOCK_STREAM, proto=proto, flags=flags)
449 fs = [f1]
450 if local_addr is not None:
451 f2 = self.getaddrinfo(
452 *local_addr, family=family,
453 type=socket.SOCK_STREAM, proto=proto, flags=flags)
454 fs.append(f2)
455 else:
456 f2 = None
457
458 yield from tasks.wait(fs, loop=self)
459
460 infos = f1.result()
461 if not infos:
462 raise OSError('getaddrinfo() returned empty list')
463 if f2 is not None:
464 laddr_infos = f2.result()
465 if not laddr_infos:
466 raise OSError('getaddrinfo() returned empty list')
467
468 exceptions = []
469 for family, type, proto, cname, address in infos:
470 try:
471 sock = socket.socket(family=family, type=type, proto=proto)
472 sock.setblocking(False)
473 if f2 is not None:
474 for _, _, _, _, laddr in laddr_infos:
475 try:
476 sock.bind(laddr)
477 break
478 except OSError as exc:
479 exc = OSError(
480 exc.errno, 'error while '
481 'attempting to bind on address '
482 '{!r}: {}'.format(
483 laddr, exc.strerror.lower()))
484 exceptions.append(exc)
485 else:
486 sock.close()
487 sock = None
488 continue
489 yield from self.sock_connect(sock, address)
490 except OSError as exc:
491 if sock is not None:
492 sock.close()
493 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200494 except:
495 if sock is not None:
496 sock.close()
497 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 else:
499 break
500 else:
501 if len(exceptions) == 1:
502 raise exceptions[0]
503 else:
504 # If they all have the same str(), raise one.
505 model = str(exceptions[0])
506 if all(str(exc) == model for exc in exceptions):
507 raise exceptions[0]
508 # Raise a combined exception so the user can see all
509 # the various error messages.
510 raise OSError('Multiple exceptions: {}'.format(
511 ', '.join(str(exc) for exc in exceptions)))
512
513 elif sock is None:
514 raise ValueError(
515 'host and port was not specified and no sock specified')
516
517 sock.setblocking(False)
518
Yury Selivanovb057c522014-02-18 12:15:06 -0500519 transport, protocol = yield from self._create_connection_transport(
520 sock, protocol_factory, ssl, server_hostname)
521 return transport, protocol
522
Victor Stinnerf951d282014-06-29 00:46:45 +0200523 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500524 def _create_connection_transport(self, sock, protocol_factory, ssl,
525 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 protocol = protocol_factory()
527 waiter = futures.Future(loop=self)
528 if ssl:
529 sslcontext = None if isinstance(ssl, bool) else ssl
530 transport = self._make_ssl_transport(
531 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700532 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 else:
534 transport = self._make_socket_transport(sock, protocol, waiter)
535
536 yield from waiter
537 return transport, protocol
538
Victor Stinnerf951d282014-06-29 00:46:45 +0200539 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540 def create_datagram_endpoint(self, protocol_factory,
541 local_addr=None, remote_addr=None, *,
542 family=0, proto=0, flags=0):
543 """Create datagram connection."""
544 if not (local_addr or remote_addr):
545 if family == 0:
546 raise ValueError('unexpected address family')
547 addr_pairs_info = (((family, proto), (None, None)),)
548 else:
549 # join addresss by (family, protocol)
550 addr_infos = collections.OrderedDict()
551 for idx, addr in ((0, local_addr), (1, remote_addr)):
552 if addr is not None:
553 assert isinstance(addr, tuple) and len(addr) == 2, (
554 '2-tuple is expected')
555
556 infos = yield from self.getaddrinfo(
557 *addr, family=family, type=socket.SOCK_DGRAM,
558 proto=proto, flags=flags)
559 if not infos:
560 raise OSError('getaddrinfo() returned empty list')
561
562 for fam, _, pro, _, address in infos:
563 key = (fam, pro)
564 if key not in addr_infos:
565 addr_infos[key] = [None, None]
566 addr_infos[key][idx] = address
567
568 # each addr has to have info for each (family, proto) pair
569 addr_pairs_info = [
570 (key, addr_pair) for key, addr_pair in addr_infos.items()
571 if not ((local_addr and addr_pair[0] is None) or
572 (remote_addr and addr_pair[1] is None))]
573
574 if not addr_pairs_info:
575 raise ValueError('can not get address information')
576
577 exceptions = []
578
579 for ((family, proto),
580 (local_address, remote_address)) in addr_pairs_info:
581 sock = None
582 r_addr = None
583 try:
584 sock = socket.socket(
585 family=family, type=socket.SOCK_DGRAM, proto=proto)
586 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
587 sock.setblocking(False)
588
589 if local_addr:
590 sock.bind(local_address)
591 if remote_addr:
592 yield from self.sock_connect(sock, remote_address)
593 r_addr = remote_address
594 except OSError as exc:
595 if sock is not None:
596 sock.close()
597 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200598 except:
599 if sock is not None:
600 sock.close()
601 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 else:
603 break
604 else:
605 raise exceptions[0]
606
607 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200608 waiter = futures.Future(loop=self)
609 transport = self._make_datagram_transport(sock, protocol, r_addr,
610 waiter)
611 yield from waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612 return transport, protocol
613
Victor Stinnerf951d282014-06-29 00:46:45 +0200614 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 def create_server(self, protocol_factory, host=None, port=None,
616 *,
617 family=socket.AF_UNSPEC,
618 flags=socket.AI_PASSIVE,
619 sock=None,
620 backlog=100,
621 ssl=None,
622 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200623 """Create a TCP server bound to host and port.
624
625 Return an AbstractServer object which can be used to stop the service.
626
627 This method is a coroutine.
628 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700629 if isinstance(ssl, bool):
630 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 if host is not None or port is not None:
632 if sock is not None:
633 raise ValueError(
634 'host/port and sock can not be specified at the same time')
635
636 AF_INET6 = getattr(socket, 'AF_INET6', 0)
637 if reuse_address is None:
638 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
639 sockets = []
640 if host == '':
641 host = None
642
643 infos = yield from self.getaddrinfo(
644 host, port, family=family,
645 type=socket.SOCK_STREAM, proto=0, flags=flags)
646 if not infos:
647 raise OSError('getaddrinfo() returned empty list')
648
649 completed = False
650 try:
651 for res in infos:
652 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700653 try:
654 sock = socket.socket(af, socktype, proto)
655 except socket.error:
656 # Assume it's a bad family/type/protocol combination.
657 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 sockets.append(sock)
659 if reuse_address:
660 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
661 True)
662 # Disable IPv4/IPv6 dual stack support (enabled by
663 # default on Linux) which makes a single socket
664 # listen on both address families.
665 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
666 sock.setsockopt(socket.IPPROTO_IPV6,
667 socket.IPV6_V6ONLY,
668 True)
669 try:
670 sock.bind(sa)
671 except OSError as err:
672 raise OSError(err.errno, 'error while attempting '
673 'to bind on address %r: %s'
674 % (sa, err.strerror.lower()))
675 completed = True
676 finally:
677 if not completed:
678 for sock in sockets:
679 sock.close()
680 else:
681 if sock is None:
682 raise ValueError(
683 'host and port was not specified and no sock specified')
684 sockets = [sock]
685
686 server = Server(self, sockets)
687 for sock in sockets:
688 sock.listen(backlog)
689 sock.setblocking(False)
690 self._start_serving(protocol_factory, sock, ssl, server)
691 return server
692
Victor Stinnerf951d282014-06-29 00:46:45 +0200693 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700694 def connect_read_pipe(self, protocol_factory, pipe):
695 protocol = protocol_factory()
696 waiter = futures.Future(loop=self)
697 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
698 yield from waiter
699 return transport, protocol
700
Victor Stinnerf951d282014-06-29 00:46:45 +0200701 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702 def connect_write_pipe(self, protocol_factory, pipe):
703 protocol = protocol_factory()
704 waiter = futures.Future(loop=self)
705 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
706 yield from waiter
707 return transport, protocol
708
Victor Stinnerf951d282014-06-29 00:46:45 +0200709 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
711 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
712 universal_newlines=False, shell=True, bufsize=0,
713 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100714 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800715 raise ValueError("cmd must be a string")
716 if universal_newlines:
717 raise ValueError("universal_newlines must be False")
718 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100719 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800720 if bufsize != 0:
721 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 protocol = protocol_factory()
723 transport = yield from self._make_subprocess_transport(
724 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
725 return transport, protocol
726
Victor Stinnerf951d282014-06-29 00:46:45 +0200727 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500728 def subprocess_exec(self, protocol_factory, program, *args,
729 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
730 stderr=subprocess.PIPE, universal_newlines=False,
731 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800732 if universal_newlines:
733 raise ValueError("universal_newlines must be False")
734 if shell:
735 raise ValueError("shell must be False")
736 if bufsize != 0:
737 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100738 popen_args = (program,) + args
739 for arg in popen_args:
740 if not isinstance(arg, (str, bytes)):
741 raise TypeError("program arguments must be "
742 "a bytes or text string, not %s"
743 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700744 protocol = protocol_factory()
745 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500746 protocol, popen_args, False, stdin, stdout, stderr,
747 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748 return transport, protocol
749
Yury Selivanov569efa22014-02-18 18:02:19 -0500750 def set_exception_handler(self, handler):
751 """Set handler as the new event loop exception handler.
752
753 If handler is None, the default exception handler will
754 be set.
755
756 If handler is a callable object, it should have a
757 matching signature to '(loop, context)', where 'loop'
758 will be a reference to the active event loop, 'context'
759 will be a dict object (see `call_exception_handler()`
760 documentation for details about context).
761 """
762 if handler is not None and not callable(handler):
763 raise TypeError('A callable object or None is expected, '
764 'got {!r}'.format(handler))
765 self._exception_handler = handler
766
767 def default_exception_handler(self, context):
768 """Default exception handler.
769
770 This is called when an exception occurs and no exception
771 handler is set, and can be called by a custom exception
772 handler that wants to defer to the default behavior.
773
774 context parameter has the same meaning as in
775 `call_exception_handler()`.
776 """
777 message = context.get('message')
778 if not message:
779 message = 'Unhandled exception in event loop'
780
781 exception = context.get('exception')
782 if exception is not None:
783 exc_info = (type(exception), exception, exception.__traceback__)
784 else:
785 exc_info = False
786
787 log_lines = [message]
788 for key in sorted(context):
789 if key in {'message', 'exception'}:
790 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200791 value = context[key]
792 if key == 'source_traceback':
793 tb = ''.join(traceback.format_list(value))
794 value = 'Object created at (most recent call last):\n'
795 value += tb.rstrip()
796 else:
797 value = repr(value)
798 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500799
800 logger.error('\n'.join(log_lines), exc_info=exc_info)
801
802 def call_exception_handler(self, context):
803 """Call the current event loop exception handler.
804
805 context is a dict object containing the following keys
806 (new keys maybe introduced later):
807 - 'message': Error message;
808 - 'exception' (optional): Exception object;
809 - 'future' (optional): Future instance;
810 - 'handle' (optional): Handle instance;
811 - 'protocol' (optional): Protocol instance;
812 - 'transport' (optional): Transport instance;
813 - 'socket' (optional): Socket instance.
814
815 Note: this method should not be overloaded in subclassed
816 event loops. For any custom exception handling, use
817 `set_exception_handler()` method.
818 """
819 if self._exception_handler is None:
820 try:
821 self.default_exception_handler(context)
822 except Exception:
823 # Second protection layer for unexpected errors
824 # in the default implementation, as well as for subclassed
825 # event loops with overloaded "default_exception_handler".
826 logger.error('Exception in default exception handler',
827 exc_info=True)
828 else:
829 try:
830 self._exception_handler(self, context)
831 except Exception as exc:
832 # Exception in the user set custom exception handler.
833 try:
834 # Let's try default handler.
835 self.default_exception_handler({
836 'message': 'Unhandled error in exception handler',
837 'exception': exc,
838 'context': context,
839 })
840 except Exception:
841 # Guard 'default_exception_handler' in case it's
842 # overloaded.
843 logger.error('Exception in default exception handler '
844 'while handling an unexpected error '
845 'in custom exception handler',
846 exc_info=True)
847
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700848 def _add_callback(self, handle):
849 """Add a Handle to ready or scheduled."""
850 assert isinstance(handle, events.Handle), 'A Handle is required here'
851 if handle._cancelled:
852 return
853 if isinstance(handle, events.TimerHandle):
854 heapq.heappush(self._scheduled, handle)
855 else:
856 self._ready.append(handle)
857
858 def _add_callback_signalsafe(self, handle):
859 """Like _add_callback() but called from a signal handler."""
860 self._add_callback(handle)
861 self._write_to_self()
862
863 def _run_once(self):
864 """Run one full iteration of the event loop.
865
866 This calls all currently ready callbacks, polls for I/O,
867 schedules the resulting callbacks, and finally schedules
868 'call_later' callbacks.
869 """
870 # Remove delayed calls that were cancelled from head of queue.
871 while self._scheduled and self._scheduled[0]._cancelled:
872 heapq.heappop(self._scheduled)
873
874 timeout = None
875 if self._ready:
876 timeout = 0
877 elif self._scheduled:
878 # Compute the desired timeout.
879 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700880 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700881
Victor Stinner1580fe32014-06-23 00:31:08 +0200882 if self._debug:
Victor Stinner22463aa2014-01-20 23:56:40 +0100883 t0 = self.time()
884 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200885 dt = self.time() - t0
886 if dt >= 1:
Victor Stinner22463aa2014-01-20 23:56:40 +0100887 level = logging.INFO
888 else:
889 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100890 if timeout is not None:
891 logger.log(level, 'poll %.3f took %.3f seconds',
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200892 timeout, dt)
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100893 else:
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200894 logger.log(level, 'poll took %.3f seconds', dt)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700895 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100896 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700897 self._process_events(event_list)
898
899 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100900 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700901 while self._scheduled:
902 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100903 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700904 break
905 handle = heapq.heappop(self._scheduled)
906 self._ready.append(handle)
907
908 # This is the only place where callbacks are actually *called*.
909 # All other places just add them to ready.
910 # Note: We run all currently scheduled callbacks, but not any
911 # callbacks scheduled by callbacks run this time around --
912 # they will be run the next time (after another I/O poll).
913 # Use an idiom that is threadsafe without using locks.
914 ntodo = len(self._ready)
915 for i in range(ntodo):
916 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200917 if handle._cancelled:
918 continue
919 if self._debug:
920 t0 = self.time()
921 handle._run()
922 dt = self.time() - t0
923 if dt >= self.slow_callback_duration:
924 logger.warning('Executing %s took %.3f seconds',
925 _format_handle(handle), dt)
926 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700927 handle._run()
928 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100929
930 def get_debug(self):
931 return self._debug
932
933 def set_debug(self, enabled):
934 self._debug = enabled