blob: 3951fb75b47bc98cadbae9aa1f90c84b5371ff8f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
40# Argument for default thread pool executor creation.
41_MAX_WORKERS = 5
42
43
Victor Stinner0e6f52a2014-06-20 17:34:15 +020044def _format_handle(handle):
45 cb = handle._callback
46 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
47 # format the task
48 return repr(cb.__self__)
49 else:
50 return str(handle)
51
52
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053class _StopError(BaseException):
54 """Raised to stop the event loop."""
55
56
Victor Stinner1b0580b2014-02-13 09:24:37 +010057def _check_resolved_address(sock, address):
58 # Ensure that the address is already resolved to avoid the trap of hanging
59 # the entire event loop when the address requires doing a DNS lookup.
60 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010061 if family == socket.AF_INET:
62 host, port = address
63 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010064 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010065 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010066 return
67
Victor Stinner1b0580b2014-02-13 09:24:37 +010068 type_mask = 0
69 if hasattr(socket, 'SOCK_NONBLOCK'):
70 type_mask |= socket.SOCK_NONBLOCK
71 if hasattr(socket, 'SOCK_CLOEXEC'):
72 type_mask |= socket.SOCK_CLOEXEC
73 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
74 # already resolved.
75 try:
76 socket.getaddrinfo(host, port,
77 family=family,
78 type=(sock.type & ~type_mask),
79 proto=sock.proto,
80 flags=socket.AI_NUMERICHOST)
81 except socket.gaierror as err:
82 raise ValueError("address must be resolved (IP address), got %r: %s"
83 % (address, err))
84
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085def _raise_stop_error(*args):
86 raise _StopError
87
88
89class Server(events.AbstractServer):
90
91 def __init__(self, loop, sockets):
92 self.loop = loop
93 self.sockets = sockets
94 self.active_count = 0
95 self.waiters = []
96
97 def attach(self, transport):
98 assert self.sockets is not None
99 self.active_count += 1
100
101 def detach(self, transport):
102 assert self.active_count > 0
103 self.active_count -= 1
104 if self.active_count == 0 and self.sockets is None:
105 self._wakeup()
106
107 def close(self):
108 sockets = self.sockets
109 if sockets is not None:
110 self.sockets = None
111 for sock in sockets:
112 self.loop._stop_serving(sock)
113 if self.active_count == 0:
114 self._wakeup()
115
116 def _wakeup(self):
117 waiters = self.waiters
118 self.waiters = None
119 for waiter in waiters:
120 if not waiter.done():
121 waiter.set_result(waiter)
122
Victor Stinnerf951d282014-06-29 00:46:45 +0200123 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124 def wait_closed(self):
125 if self.sockets is None or self.waiters is None:
126 return
127 waiter = futures.Future(loop=self.loop)
128 self.waiters.append(waiter)
129 yield from waiter
130
131
132class BaseEventLoop(events.AbstractEventLoop):
133
134 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200135 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 self._ready = collections.deque()
137 self._scheduled = []
138 self._default_executor = None
139 self._internal_fds = 0
140 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100141 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500142 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200143 self._debug = (not sys.flags.ignore_environment
144 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200145 # In debug mode, if the execution of a callback or a step of a task
146 # exceed this duration in seconds, the slow callback/task is logged.
147 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200149 def __repr__(self):
150 return ('<%s running=%s closed=%s debug=%s>'
151 % (self.__class__.__name__, self.is_running(),
152 self.is_closed(), self.get_debug()))
153
Victor Stinner896a25a2014-07-08 11:29:25 +0200154 def create_task(self, coro):
155 """Schedule a coroutine object.
156
157 Return a task object."""
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200158 task = tasks.Task(coro, loop=self)
159 if task._source_traceback:
160 del task._source_traceback[-1]
161 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200162
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 def _make_socket_transport(self, sock, protocol, waiter=None, *,
164 extra=None, server=None):
165 """Create socket transport."""
166 raise NotImplementedError
167
168 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
169 server_side=False, server_hostname=None,
170 extra=None, server=None):
171 """Create SSL transport."""
172 raise NotImplementedError
173
174 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200175 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 """Create datagram transport."""
177 raise NotImplementedError
178
179 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
180 extra=None):
181 """Create read pipe transport."""
182 raise NotImplementedError
183
184 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
185 extra=None):
186 """Create write pipe transport."""
187 raise NotImplementedError
188
Victor Stinnerf951d282014-06-29 00:46:45 +0200189 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190 def _make_subprocess_transport(self, protocol, args, shell,
191 stdin, stdout, stderr, bufsize,
192 extra=None, **kwargs):
193 """Create subprocess transport."""
194 raise NotImplementedError
195
196 def _read_from_self(self):
197 """XXX"""
198 raise NotImplementedError
199
200 def _write_to_self(self):
201 """XXX"""
202 raise NotImplementedError
203
204 def _process_events(self, event_list):
205 """Process selector events."""
206 raise NotImplementedError
207
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200208 def _check_closed(self):
209 if self._closed:
210 raise RuntimeError('Event loop is closed')
211
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 def run_forever(self):
213 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200214 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215 if self._running:
216 raise RuntimeError('Event loop is running.')
217 self._running = True
218 try:
219 while True:
220 try:
221 self._run_once()
222 except _StopError:
223 break
224 finally:
225 self._running = False
226
227 def run_until_complete(self, future):
228 """Run until the Future is done.
229
230 If the argument is a coroutine, it is wrapped in a Task.
231
232 XXX TBD: It would be disastrous to call run_until_complete()
233 with the same coroutine twice -- it would wrap it in two
234 different Tasks and that can't be good.
235
236 Return the Future's result, or raise its exception.
237 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200238 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200239
240 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200242 if new_task:
243 # An exception is raised if the future didn't complete, so there
244 # is no need to log the "destroy pending task" message
245 future._log_destroy_pending = False
246
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247 future.add_done_callback(_raise_stop_error)
248 self.run_forever()
249 future.remove_done_callback(_raise_stop_error)
250 if not future.done():
251 raise RuntimeError('Event loop stopped before Future completed.')
252
253 return future.result()
254
255 def stop(self):
256 """Stop running the event loop.
257
258 Every callback scheduled before stop() is called will run.
259 Callback scheduled after stop() is called won't. However,
260 those callbacks will run if run() is called again later.
261 """
262 self.call_soon(_raise_stop_error)
263
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200264 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700265 """Close the event loop.
266
267 This clears the queues and shuts down the executor,
268 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200269
270 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700271 """
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200272 if self._running:
273 raise RuntimeError("cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200274 if self._closed:
275 return
276 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200277 self._ready.clear()
278 self._scheduled.clear()
279 executor = self._default_executor
280 if executor is not None:
281 self._default_executor = None
282 executor.shutdown(wait=False)
283
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200284 def is_closed(self):
285 """Returns True if the event loop was closed."""
286 return self._closed
287
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 def is_running(self):
289 """Returns running status of event loop."""
290 return self._running
291
292 def time(self):
293 """Return the time according to the event loop's clock."""
294 return time.monotonic()
295
296 def call_later(self, delay, callback, *args):
297 """Arrange for a callback to be called at a given time.
298
299 Return a Handle: an opaque object with a cancel() method that
300 can be used to cancel the call.
301
302 The delay can be an int or float, expressed in seconds. It is
303 always a relative time.
304
305 Each callback will be called exactly once. If two callbacks
306 are scheduled for exactly the same time, it undefined which
307 will be called first.
308
309 Any positional arguments after the callback will be passed to
310 the callback when it is called.
311 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200312 timer = self.call_at(self.time() + delay, callback, *args)
313 if timer._source_traceback:
314 del timer._source_traceback[-1]
315 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316
317 def call_at(self, when, callback, *args):
318 """Like call_later(), but uses an absolute time."""
Victor Stinnerf951d282014-06-29 00:46:45 +0200319 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100320 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100321 if self._debug:
322 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500323 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200324 if timer._source_traceback:
325 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 heapq.heappush(self._scheduled, timer)
327 return timer
328
329 def call_soon(self, callback, *args):
330 """Arrange for a callback to be called as soon as possible.
331
332 This operates as a FIFO queue, callbacks are called in the
333 order in which they are registered. Each callback will be
334 called exactly once.
335
336 Any positional arguments after the callback will be passed to
337 the callback when it is called.
338 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200339 handle = self._call_soon(callback, args, check_loop=True)
340 if handle._source_traceback:
341 del handle._source_traceback[-1]
342 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100343
344 def _call_soon(self, callback, args, check_loop):
Victor Stinnerf951d282014-06-29 00:46:45 +0200345 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100346 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100347 if self._debug and check_loop:
348 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500349 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200350 if handle._source_traceback:
351 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 self._ready.append(handle)
353 return handle
354
Victor Stinner93569c22014-03-21 10:00:52 +0100355 def _assert_is_current_event_loop(self):
356 """Asserts that this event loop is the current event loop.
357
358 Non-threadsafe methods of this class make this assumption and will
359 likely behave incorrectly when the assumption is violated.
360
361 Should only be called when (self._debug == True). The caller is
362 responsible for checking this condition for performance reasons.
363 """
Victor Stinner751c7c02014-06-23 15:14:13 +0200364 try:
365 current = events.get_event_loop()
366 except AssertionError:
367 return
368 if current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100369 raise RuntimeError(
370 "non-threadsafe operation invoked on an event loop other "
371 "than the current one")
372
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200374 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100375 handle = self._call_soon(callback, args, check_loop=False)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200376 if handle._source_traceback:
377 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 self._write_to_self()
379 return handle
380
381 def run_in_executor(self, executor, callback, *args):
Victor Stinnerf951d282014-06-29 00:46:45 +0200382 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100383 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 if isinstance(callback, events.Handle):
385 assert not args
386 assert not isinstance(callback, events.TimerHandle)
387 if callback._cancelled:
388 f = futures.Future(loop=self)
389 f.set_result(None)
390 return f
391 callback, args = callback._callback, callback._args
392 if executor is None:
393 executor = self._default_executor
394 if executor is None:
395 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
396 self._default_executor = executor
397 return futures.wrap_future(executor.submit(callback, *args), loop=self)
398
399 def set_default_executor(self, executor):
400 self._default_executor = executor
401
402 def getaddrinfo(self, host, port, *,
403 family=0, type=0, proto=0, flags=0):
404 return self.run_in_executor(None, socket.getaddrinfo,
405 host, port, family, type, proto, flags)
406
407 def getnameinfo(self, sockaddr, flags=0):
408 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
409
Victor Stinnerf951d282014-06-29 00:46:45 +0200410 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411 def create_connection(self, protocol_factory, host=None, port=None, *,
412 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700413 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200414 """Connect to a TCP server.
415
416 Create a streaming transport connection to a given Internet host and
417 port: socket family AF_INET or socket.AF_INET6 depending on host (or
418 family if specified), socket type SOCK_STREAM. protocol_factory must be
419 a callable returning a protocol instance.
420
421 This method is a coroutine which will try to establish the connection
422 in the background. When successful, the coroutine returns a
423 (transport, protocol) pair.
424 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700425 if server_hostname is not None and not ssl:
426 raise ValueError('server_hostname is only meaningful with ssl')
427
428 if server_hostname is None and ssl:
429 # Use host as default for server_hostname. It is an error
430 # if host is empty or not set, e.g. when an
431 # already-connected socket was passed or when only a port
432 # is given. To avoid this error, you can pass
433 # server_hostname='' -- this will bypass the hostname
434 # check. (This also means that if host is a numeric
435 # IP/IPv6 address, we will attempt to verify that exact
436 # address; this will probably fail, but it is possible to
437 # create a certificate for a specific IP address, so we
438 # don't judge it here.)
439 if not host:
440 raise ValueError('You must set server_hostname '
441 'when using ssl without a host')
442 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700443
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 if host is not None or port is not None:
445 if sock is not None:
446 raise ValueError(
447 'host/port and sock can not be specified at the same time')
448
449 f1 = self.getaddrinfo(
450 host, port, family=family,
451 type=socket.SOCK_STREAM, proto=proto, flags=flags)
452 fs = [f1]
453 if local_addr is not None:
454 f2 = self.getaddrinfo(
455 *local_addr, family=family,
456 type=socket.SOCK_STREAM, proto=proto, flags=flags)
457 fs.append(f2)
458 else:
459 f2 = None
460
461 yield from tasks.wait(fs, loop=self)
462
463 infos = f1.result()
464 if not infos:
465 raise OSError('getaddrinfo() returned empty list')
466 if f2 is not None:
467 laddr_infos = f2.result()
468 if not laddr_infos:
469 raise OSError('getaddrinfo() returned empty list')
470
471 exceptions = []
472 for family, type, proto, cname, address in infos:
473 try:
474 sock = socket.socket(family=family, type=type, proto=proto)
475 sock.setblocking(False)
476 if f2 is not None:
477 for _, _, _, _, laddr in laddr_infos:
478 try:
479 sock.bind(laddr)
480 break
481 except OSError as exc:
482 exc = OSError(
483 exc.errno, 'error while '
484 'attempting to bind on address '
485 '{!r}: {}'.format(
486 laddr, exc.strerror.lower()))
487 exceptions.append(exc)
488 else:
489 sock.close()
490 sock = None
491 continue
492 yield from self.sock_connect(sock, address)
493 except OSError as exc:
494 if sock is not None:
495 sock.close()
496 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200497 except:
498 if sock is not None:
499 sock.close()
500 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501 else:
502 break
503 else:
504 if len(exceptions) == 1:
505 raise exceptions[0]
506 else:
507 # If they all have the same str(), raise one.
508 model = str(exceptions[0])
509 if all(str(exc) == model for exc in exceptions):
510 raise exceptions[0]
511 # Raise a combined exception so the user can see all
512 # the various error messages.
513 raise OSError('Multiple exceptions: {}'.format(
514 ', '.join(str(exc) for exc in exceptions)))
515
516 elif sock is None:
517 raise ValueError(
518 'host and port was not specified and no sock specified')
519
520 sock.setblocking(False)
521
Yury Selivanovb057c522014-02-18 12:15:06 -0500522 transport, protocol = yield from self._create_connection_transport(
523 sock, protocol_factory, ssl, server_hostname)
524 return transport, protocol
525
Victor Stinnerf951d282014-06-29 00:46:45 +0200526 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500527 def _create_connection_transport(self, sock, protocol_factory, ssl,
528 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 protocol = protocol_factory()
530 waiter = futures.Future(loop=self)
531 if ssl:
532 sslcontext = None if isinstance(ssl, bool) else ssl
533 transport = self._make_ssl_transport(
534 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700535 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 else:
537 transport = self._make_socket_transport(sock, protocol, waiter)
538
539 yield from waiter
540 return transport, protocol
541
Victor Stinnerf951d282014-06-29 00:46:45 +0200542 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 def create_datagram_endpoint(self, protocol_factory,
544 local_addr=None, remote_addr=None, *,
545 family=0, proto=0, flags=0):
546 """Create datagram connection."""
547 if not (local_addr or remote_addr):
548 if family == 0:
549 raise ValueError('unexpected address family')
550 addr_pairs_info = (((family, proto), (None, None)),)
551 else:
552 # join addresss by (family, protocol)
553 addr_infos = collections.OrderedDict()
554 for idx, addr in ((0, local_addr), (1, remote_addr)):
555 if addr is not None:
556 assert isinstance(addr, tuple) and len(addr) == 2, (
557 '2-tuple is expected')
558
559 infos = yield from self.getaddrinfo(
560 *addr, family=family, type=socket.SOCK_DGRAM,
561 proto=proto, flags=flags)
562 if not infos:
563 raise OSError('getaddrinfo() returned empty list')
564
565 for fam, _, pro, _, address in infos:
566 key = (fam, pro)
567 if key not in addr_infos:
568 addr_infos[key] = [None, None]
569 addr_infos[key][idx] = address
570
571 # each addr has to have info for each (family, proto) pair
572 addr_pairs_info = [
573 (key, addr_pair) for key, addr_pair in addr_infos.items()
574 if not ((local_addr and addr_pair[0] is None) or
575 (remote_addr and addr_pair[1] is None))]
576
577 if not addr_pairs_info:
578 raise ValueError('can not get address information')
579
580 exceptions = []
581
582 for ((family, proto),
583 (local_address, remote_address)) in addr_pairs_info:
584 sock = None
585 r_addr = None
586 try:
587 sock = socket.socket(
588 family=family, type=socket.SOCK_DGRAM, proto=proto)
589 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
590 sock.setblocking(False)
591
592 if local_addr:
593 sock.bind(local_address)
594 if remote_addr:
595 yield from self.sock_connect(sock, remote_address)
596 r_addr = remote_address
597 except OSError as exc:
598 if sock is not None:
599 sock.close()
600 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200601 except:
602 if sock is not None:
603 sock.close()
604 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 else:
606 break
607 else:
608 raise exceptions[0]
609
610 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200611 waiter = futures.Future(loop=self)
612 transport = self._make_datagram_transport(sock, protocol, r_addr,
613 waiter)
614 yield from waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 return transport, protocol
616
Victor Stinnerf951d282014-06-29 00:46:45 +0200617 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700618 def create_server(self, protocol_factory, host=None, port=None,
619 *,
620 family=socket.AF_UNSPEC,
621 flags=socket.AI_PASSIVE,
622 sock=None,
623 backlog=100,
624 ssl=None,
625 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200626 """Create a TCP server bound to host and port.
627
628 Return an AbstractServer object which can be used to stop the service.
629
630 This method is a coroutine.
631 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700632 if isinstance(ssl, bool):
633 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634 if host is not None or port is not None:
635 if sock is not None:
636 raise ValueError(
637 'host/port and sock can not be specified at the same time')
638
639 AF_INET6 = getattr(socket, 'AF_INET6', 0)
640 if reuse_address is None:
641 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
642 sockets = []
643 if host == '':
644 host = None
645
646 infos = yield from self.getaddrinfo(
647 host, port, family=family,
648 type=socket.SOCK_STREAM, proto=0, flags=flags)
649 if not infos:
650 raise OSError('getaddrinfo() returned empty list')
651
652 completed = False
653 try:
654 for res in infos:
655 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700656 try:
657 sock = socket.socket(af, socktype, proto)
658 except socket.error:
659 # Assume it's a bad family/type/protocol combination.
660 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661 sockets.append(sock)
662 if reuse_address:
663 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
664 True)
665 # Disable IPv4/IPv6 dual stack support (enabled by
666 # default on Linux) which makes a single socket
667 # listen on both address families.
668 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
669 sock.setsockopt(socket.IPPROTO_IPV6,
670 socket.IPV6_V6ONLY,
671 True)
672 try:
673 sock.bind(sa)
674 except OSError as err:
675 raise OSError(err.errno, 'error while attempting '
676 'to bind on address %r: %s'
677 % (sa, err.strerror.lower()))
678 completed = True
679 finally:
680 if not completed:
681 for sock in sockets:
682 sock.close()
683 else:
684 if sock is None:
685 raise ValueError(
686 'host and port was not specified and no sock specified')
687 sockets = [sock]
688
689 server = Server(self, sockets)
690 for sock in sockets:
691 sock.listen(backlog)
692 sock.setblocking(False)
693 self._start_serving(protocol_factory, sock, ssl, server)
694 return server
695
Victor Stinnerf951d282014-06-29 00:46:45 +0200696 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700697 def connect_read_pipe(self, protocol_factory, pipe):
698 protocol = protocol_factory()
699 waiter = futures.Future(loop=self)
700 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
701 yield from waiter
702 return transport, protocol
703
Victor Stinnerf951d282014-06-29 00:46:45 +0200704 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700705 def connect_write_pipe(self, protocol_factory, pipe):
706 protocol = protocol_factory()
707 waiter = futures.Future(loop=self)
708 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
709 yield from waiter
710 return transport, protocol
711
Victor Stinnerf951d282014-06-29 00:46:45 +0200712 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
714 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
715 universal_newlines=False, shell=True, bufsize=0,
716 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100717 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800718 raise ValueError("cmd must be a string")
719 if universal_newlines:
720 raise ValueError("universal_newlines must be False")
721 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100722 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800723 if bufsize != 0:
724 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700725 protocol = protocol_factory()
726 transport = yield from self._make_subprocess_transport(
727 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
728 return transport, protocol
729
Victor Stinnerf951d282014-06-29 00:46:45 +0200730 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500731 def subprocess_exec(self, protocol_factory, program, *args,
732 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
733 stderr=subprocess.PIPE, universal_newlines=False,
734 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800735 if universal_newlines:
736 raise ValueError("universal_newlines must be False")
737 if shell:
738 raise ValueError("shell must be False")
739 if bufsize != 0:
740 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100741 popen_args = (program,) + args
742 for arg in popen_args:
743 if not isinstance(arg, (str, bytes)):
744 raise TypeError("program arguments must be "
745 "a bytes or text string, not %s"
746 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700747 protocol = protocol_factory()
748 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500749 protocol, popen_args, False, stdin, stdout, stderr,
750 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700751 return transport, protocol
752
Yury Selivanov569efa22014-02-18 18:02:19 -0500753 def set_exception_handler(self, handler):
754 """Set handler as the new event loop exception handler.
755
756 If handler is None, the default exception handler will
757 be set.
758
759 If handler is a callable object, it should have a
760 matching signature to '(loop, context)', where 'loop'
761 will be a reference to the active event loop, 'context'
762 will be a dict object (see `call_exception_handler()`
763 documentation for details about context).
764 """
765 if handler is not None and not callable(handler):
766 raise TypeError('A callable object or None is expected, '
767 'got {!r}'.format(handler))
768 self._exception_handler = handler
769
770 def default_exception_handler(self, context):
771 """Default exception handler.
772
773 This is called when an exception occurs and no exception
774 handler is set, and can be called by a custom exception
775 handler that wants to defer to the default behavior.
776
777 context parameter has the same meaning as in
778 `call_exception_handler()`.
779 """
780 message = context.get('message')
781 if not message:
782 message = 'Unhandled exception in event loop'
783
784 exception = context.get('exception')
785 if exception is not None:
786 exc_info = (type(exception), exception, exception.__traceback__)
787 else:
788 exc_info = False
789
790 log_lines = [message]
791 for key in sorted(context):
792 if key in {'message', 'exception'}:
793 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200794 value = context[key]
795 if key == 'source_traceback':
796 tb = ''.join(traceback.format_list(value))
797 value = 'Object created at (most recent call last):\n'
798 value += tb.rstrip()
799 else:
800 value = repr(value)
801 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500802
803 logger.error('\n'.join(log_lines), exc_info=exc_info)
804
805 def call_exception_handler(self, context):
806 """Call the current event loop exception handler.
807
808 context is a dict object containing the following keys
809 (new keys maybe introduced later):
810 - 'message': Error message;
811 - 'exception' (optional): Exception object;
812 - 'future' (optional): Future instance;
813 - 'handle' (optional): Handle instance;
814 - 'protocol' (optional): Protocol instance;
815 - 'transport' (optional): Transport instance;
816 - 'socket' (optional): Socket instance.
817
818 Note: this method should not be overloaded in subclassed
819 event loops. For any custom exception handling, use
820 `set_exception_handler()` method.
821 """
822 if self._exception_handler is None:
823 try:
824 self.default_exception_handler(context)
825 except Exception:
826 # Second protection layer for unexpected errors
827 # in the default implementation, as well as for subclassed
828 # event loops with overloaded "default_exception_handler".
829 logger.error('Exception in default exception handler',
830 exc_info=True)
831 else:
832 try:
833 self._exception_handler(self, context)
834 except Exception as exc:
835 # Exception in the user set custom exception handler.
836 try:
837 # Let's try default handler.
838 self.default_exception_handler({
839 'message': 'Unhandled error in exception handler',
840 'exception': exc,
841 'context': context,
842 })
843 except Exception:
844 # Guard 'default_exception_handler' in case it's
845 # overloaded.
846 logger.error('Exception in default exception handler '
847 'while handling an unexpected error '
848 'in custom exception handler',
849 exc_info=True)
850
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700851 def _add_callback(self, handle):
852 """Add a Handle to ready or scheduled."""
853 assert isinstance(handle, events.Handle), 'A Handle is required here'
854 if handle._cancelled:
855 return
856 if isinstance(handle, events.TimerHandle):
857 heapq.heappush(self._scheduled, handle)
858 else:
859 self._ready.append(handle)
860
861 def _add_callback_signalsafe(self, handle):
862 """Like _add_callback() but called from a signal handler."""
863 self._add_callback(handle)
864 self._write_to_self()
865
866 def _run_once(self):
867 """Run one full iteration of the event loop.
868
869 This calls all currently ready callbacks, polls for I/O,
870 schedules the resulting callbacks, and finally schedules
871 'call_later' callbacks.
872 """
873 # Remove delayed calls that were cancelled from head of queue.
874 while self._scheduled and self._scheduled[0]._cancelled:
875 heapq.heappop(self._scheduled)
876
877 timeout = None
878 if self._ready:
879 timeout = 0
880 elif self._scheduled:
881 # Compute the desired timeout.
882 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700883 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700884
Victor Stinner770e48d2014-07-11 11:58:33 +0200885 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100886 t0 = self.time()
887 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200888 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +0200889 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100890 level = logging.INFO
891 else:
892 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +0200893 nevent = len(event_list)
894 if timeout is None:
895 logger.log(level, 'poll took %.3f ms: %s events',
896 dt * 1e3, nevent)
897 elif nevent:
898 logger.log(level,
899 'poll %.3f ms took %.3f ms: %s events',
900 timeout * 1e3, dt * 1e3, nevent)
901 elif dt >= 1.0:
902 logger.log(level,
903 'poll %.3f ms took %.3f ms: timeout',
904 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700905 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100906 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700907 self._process_events(event_list)
908
909 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100910 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700911 while self._scheduled:
912 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100913 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700914 break
915 handle = heapq.heappop(self._scheduled)
916 self._ready.append(handle)
917
918 # This is the only place where callbacks are actually *called*.
919 # All other places just add them to ready.
920 # Note: We run all currently scheduled callbacks, but not any
921 # callbacks scheduled by callbacks run this time around --
922 # they will be run the next time (after another I/O poll).
923 # Use an idiom that is threadsafe without using locks.
924 ntodo = len(self._ready)
925 for i in range(ntodo):
926 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200927 if handle._cancelled:
928 continue
929 if self._debug:
930 t0 = self.time()
931 handle._run()
932 dt = self.time() - t0
933 if dt >= self.slow_callback_duration:
934 logger.warning('Executing %s took %.3f seconds',
935 _format_handle(handle), dt)
936 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700937 handle._run()
938 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100939
940 def get_debug(self):
941 return self._debug
942
943 def set_debug(self, enabled):
944 self._debug = enabled