blob: e5683fd15bcbad88026949745519ebd5f97cb8bd [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
40# Argument for default thread pool executor creation.
41_MAX_WORKERS = 5
42
43
Victor Stinner0e6f52a2014-06-20 17:34:15 +020044def _format_handle(handle):
45 cb = handle._callback
46 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
47 # format the task
48 return repr(cb.__self__)
49 else:
50 return str(handle)
51
52
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053class _StopError(BaseException):
54 """Raised to stop the event loop."""
55
56
Victor Stinner1b0580b2014-02-13 09:24:37 +010057def _check_resolved_address(sock, address):
58 # Ensure that the address is already resolved to avoid the trap of hanging
59 # the entire event loop when the address requires doing a DNS lookup.
60 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010061 if family == socket.AF_INET:
62 host, port = address
63 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010064 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010065 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010066 return
67
Victor Stinner1b0580b2014-02-13 09:24:37 +010068 type_mask = 0
69 if hasattr(socket, 'SOCK_NONBLOCK'):
70 type_mask |= socket.SOCK_NONBLOCK
71 if hasattr(socket, 'SOCK_CLOEXEC'):
72 type_mask |= socket.SOCK_CLOEXEC
73 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
74 # already resolved.
75 try:
76 socket.getaddrinfo(host, port,
77 family=family,
78 type=(sock.type & ~type_mask),
79 proto=sock.proto,
80 flags=socket.AI_NUMERICHOST)
81 except socket.gaierror as err:
82 raise ValueError("address must be resolved (IP address), got %r: %s"
83 % (address, err))
84
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085def _raise_stop_error(*args):
86 raise _StopError
87
88
89class Server(events.AbstractServer):
90
91 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +020092 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070093 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +020094 self._active_count = 0
95 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096
Victor Stinnere912e652014-07-12 03:11:53 +020097 def __repr__(self):
98 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
99
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200100 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200102 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200104 def _detach(self):
105 assert self._active_count > 0
106 self._active_count -= 1
107 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108 self._wakeup()
109
110 def close(self):
111 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200112 if sockets is None:
113 return
114 self.sockets = None
115 for sock in sockets:
116 self._loop._stop_serving(sock)
117 if self._active_count == 0:
118 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
120 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200121 waiters = self._waiters
122 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 for waiter in waiters:
124 if not waiter.done():
125 waiter.set_result(waiter)
126
Victor Stinnerf951d282014-06-29 00:46:45 +0200127 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200129 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200131 waiter = futures.Future(loop=self._loop)
132 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 yield from waiter
134
135
136class BaseEventLoop(events.AbstractEventLoop):
137
138 def __init__(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200139 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700140 self._ready = collections.deque()
141 self._scheduled = []
142 self._default_executor = None
143 self._internal_fds = 0
144 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100145 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500146 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200147 self._debug = (not sys.flags.ignore_environment
148 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200149 # In debug mode, if the execution of a callback or a step of a task
150 # exceed this duration in seconds, the slow callback/task is logged.
151 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200153 def __repr__(self):
154 return ('<%s running=%s closed=%s debug=%s>'
155 % (self.__class__.__name__, self.is_running(),
156 self.is_closed(), self.get_debug()))
157
Victor Stinner896a25a2014-07-08 11:29:25 +0200158 def create_task(self, coro):
159 """Schedule a coroutine object.
160
161 Return a task object."""
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200162 task = tasks.Task(coro, loop=self)
163 if task._source_traceback:
164 del task._source_traceback[-1]
165 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 def _make_socket_transport(self, sock, protocol, waiter=None, *,
168 extra=None, server=None):
169 """Create socket transport."""
170 raise NotImplementedError
171
172 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
173 server_side=False, server_hostname=None,
174 extra=None, server=None):
175 """Create SSL transport."""
176 raise NotImplementedError
177
178 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200179 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 """Create datagram transport."""
181 raise NotImplementedError
182
183 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
184 extra=None):
185 """Create read pipe transport."""
186 raise NotImplementedError
187
188 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
189 extra=None):
190 """Create write pipe transport."""
191 raise NotImplementedError
192
Victor Stinnerf951d282014-06-29 00:46:45 +0200193 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 def _make_subprocess_transport(self, protocol, args, shell,
195 stdin, stdout, stderr, bufsize,
196 extra=None, **kwargs):
197 """Create subprocess transport."""
198 raise NotImplementedError
199
200 def _read_from_self(self):
201 """XXX"""
202 raise NotImplementedError
203
204 def _write_to_self(self):
205 """XXX"""
206 raise NotImplementedError
207
208 def _process_events(self, event_list):
209 """Process selector events."""
210 raise NotImplementedError
211
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200212 def _check_closed(self):
213 if self._closed:
214 raise RuntimeError('Event loop is closed')
215
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700216 def run_forever(self):
217 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200218 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 if self._running:
220 raise RuntimeError('Event loop is running.')
221 self._running = True
222 try:
223 while True:
224 try:
225 self._run_once()
226 except _StopError:
227 break
228 finally:
229 self._running = False
230
231 def run_until_complete(self, future):
232 """Run until the Future is done.
233
234 If the argument is a coroutine, it is wrapped in a Task.
235
236 XXX TBD: It would be disastrous to call run_until_complete()
237 with the same coroutine twice -- it would wrap it in two
238 different Tasks and that can't be good.
239
240 Return the Future's result, or raise its exception.
241 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200242 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200243
244 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200246 if new_task:
247 # An exception is raised if the future didn't complete, so there
248 # is no need to log the "destroy pending task" message
249 future._log_destroy_pending = False
250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 future.add_done_callback(_raise_stop_error)
252 self.run_forever()
253 future.remove_done_callback(_raise_stop_error)
254 if not future.done():
255 raise RuntimeError('Event loop stopped before Future completed.')
256
257 return future.result()
258
259 def stop(self):
260 """Stop running the event loop.
261
262 Every callback scheduled before stop() is called will run.
263 Callback scheduled after stop() is called won't. However,
264 those callbacks will run if run() is called again later.
265 """
266 self.call_soon(_raise_stop_error)
267
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200268 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700269 """Close the event loop.
270
271 This clears the queues and shuts down the executor,
272 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200273
274 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700275 """
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200276 if self._running:
277 raise RuntimeError("cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200278 if self._closed:
279 return
Victor Stinnere912e652014-07-12 03:11:53 +0200280 if self._debug:
281 logger.debug("Close %r", self)
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200282 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200283 self._ready.clear()
284 self._scheduled.clear()
285 executor = self._default_executor
286 if executor is not None:
287 self._default_executor = None
288 executor.shutdown(wait=False)
289
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200290 def is_closed(self):
291 """Returns True if the event loop was closed."""
292 return self._closed
293
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 def is_running(self):
295 """Returns running status of event loop."""
296 return self._running
297
298 def time(self):
299 """Return the time according to the event loop's clock."""
300 return time.monotonic()
301
302 def call_later(self, delay, callback, *args):
303 """Arrange for a callback to be called at a given time.
304
305 Return a Handle: an opaque object with a cancel() method that
306 can be used to cancel the call.
307
308 The delay can be an int or float, expressed in seconds. It is
309 always a relative time.
310
311 Each callback will be called exactly once. If two callbacks
312 are scheduled for exactly the same time, it undefined which
313 will be called first.
314
315 Any positional arguments after the callback will be passed to
316 the callback when it is called.
317 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200318 timer = self.call_at(self.time() + delay, callback, *args)
319 if timer._source_traceback:
320 del timer._source_traceback[-1]
321 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322
323 def call_at(self, when, callback, *args):
324 """Like call_later(), but uses an absolute time."""
Victor Stinnerf951d282014-06-29 00:46:45 +0200325 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100326 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100327 if self._debug:
328 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500329 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200330 if timer._source_traceback:
331 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 heapq.heappush(self._scheduled, timer)
333 return timer
334
335 def call_soon(self, callback, *args):
336 """Arrange for a callback to be called as soon as possible.
337
338 This operates as a FIFO queue, callbacks are called in the
339 order in which they are registered. Each callback will be
340 called exactly once.
341
342 Any positional arguments after the callback will be passed to
343 the callback when it is called.
344 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200345 handle = self._call_soon(callback, args, check_loop=True)
346 if handle._source_traceback:
347 del handle._source_traceback[-1]
348 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100349
350 def _call_soon(self, callback, args, check_loop):
Victor Stinnerf951d282014-06-29 00:46:45 +0200351 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100352 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100353 if self._debug and check_loop:
354 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500355 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200356 if handle._source_traceback:
357 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 self._ready.append(handle)
359 return handle
360
Victor Stinner93569c22014-03-21 10:00:52 +0100361 def _assert_is_current_event_loop(self):
362 """Asserts that this event loop is the current event loop.
363
364 Non-threadsafe methods of this class make this assumption and will
365 likely behave incorrectly when the assumption is violated.
366
367 Should only be called when (self._debug == True). The caller is
368 responsible for checking this condition for performance reasons.
369 """
Victor Stinner751c7c02014-06-23 15:14:13 +0200370 try:
371 current = events.get_event_loop()
372 except AssertionError:
373 return
374 if current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100375 raise RuntimeError(
376 "non-threadsafe operation invoked on an event loop other "
377 "than the current one")
378
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 def call_soon_threadsafe(self, callback, *args):
Victor Stinnerd1432092014-06-19 17:11:49 +0200380 """Like call_soon(), but thread safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100381 handle = self._call_soon(callback, args, check_loop=False)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200382 if handle._source_traceback:
383 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 self._write_to_self()
385 return handle
386
387 def run_in_executor(self, executor, callback, *args):
Victor Stinnerf951d282014-06-29 00:46:45 +0200388 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100389 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 if isinstance(callback, events.Handle):
391 assert not args
392 assert not isinstance(callback, events.TimerHandle)
393 if callback._cancelled:
394 f = futures.Future(loop=self)
395 f.set_result(None)
396 return f
397 callback, args = callback._callback, callback._args
398 if executor is None:
399 executor = self._default_executor
400 if executor is None:
401 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
402 self._default_executor = executor
403 return futures.wrap_future(executor.submit(callback, *args), loop=self)
404
405 def set_default_executor(self, executor):
406 self._default_executor = executor
407
Victor Stinnere912e652014-07-12 03:11:53 +0200408 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
409 msg = ["%s:%r" % (host, port)]
410 if family:
411 msg.append('family=%r' % family)
412 if type:
413 msg.append('type=%r' % type)
414 if proto:
415 msg.append('proto=%r' % proto)
416 if flags:
417 msg.append('flags=%r' % flags)
418 msg = ', '.join(msg)
419 logger.debug('Get addresss info %s', msg)
420
421 t0 = self.time()
422 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
423 dt = self.time() - t0
424
425 msg = ('Getting addresss info %s took %.3f ms: %r'
426 % (msg, dt * 1e3, addrinfo))
427 if dt >= self.slow_callback_duration:
428 logger.info(msg)
429 else:
430 logger.debug(msg)
431 return addrinfo
432
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 def getaddrinfo(self, host, port, *,
434 family=0, type=0, proto=0, flags=0):
Victor Stinnere912e652014-07-12 03:11:53 +0200435 if self._debug:
436 return self.run_in_executor(None, self._getaddrinfo_debug,
437 host, port, family, type, proto, flags)
438 else:
439 return self.run_in_executor(None, socket.getaddrinfo,
440 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441
442 def getnameinfo(self, sockaddr, flags=0):
443 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
444
Victor Stinnerf951d282014-06-29 00:46:45 +0200445 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 def create_connection(self, protocol_factory, host=None, port=None, *,
447 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700448 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200449 """Connect to a TCP server.
450
451 Create a streaming transport connection to a given Internet host and
452 port: socket family AF_INET or socket.AF_INET6 depending on host (or
453 family if specified), socket type SOCK_STREAM. protocol_factory must be
454 a callable returning a protocol instance.
455
456 This method is a coroutine which will try to establish the connection
457 in the background. When successful, the coroutine returns a
458 (transport, protocol) pair.
459 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700460 if server_hostname is not None and not ssl:
461 raise ValueError('server_hostname is only meaningful with ssl')
462
463 if server_hostname is None and ssl:
464 # Use host as default for server_hostname. It is an error
465 # if host is empty or not set, e.g. when an
466 # already-connected socket was passed or when only a port
467 # is given. To avoid this error, you can pass
468 # server_hostname='' -- this will bypass the hostname
469 # check. (This also means that if host is a numeric
470 # IP/IPv6 address, we will attempt to verify that exact
471 # address; this will probably fail, but it is possible to
472 # create a certificate for a specific IP address, so we
473 # don't judge it here.)
474 if not host:
475 raise ValueError('You must set server_hostname '
476 'when using ssl without a host')
477 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700478
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 if host is not None or port is not None:
480 if sock is not None:
481 raise ValueError(
482 'host/port and sock can not be specified at the same time')
483
484 f1 = self.getaddrinfo(
485 host, port, family=family,
486 type=socket.SOCK_STREAM, proto=proto, flags=flags)
487 fs = [f1]
488 if local_addr is not None:
489 f2 = self.getaddrinfo(
490 *local_addr, family=family,
491 type=socket.SOCK_STREAM, proto=proto, flags=flags)
492 fs.append(f2)
493 else:
494 f2 = None
495
496 yield from tasks.wait(fs, loop=self)
497
498 infos = f1.result()
499 if not infos:
500 raise OSError('getaddrinfo() returned empty list')
501 if f2 is not None:
502 laddr_infos = f2.result()
503 if not laddr_infos:
504 raise OSError('getaddrinfo() returned empty list')
505
506 exceptions = []
507 for family, type, proto, cname, address in infos:
508 try:
509 sock = socket.socket(family=family, type=type, proto=proto)
510 sock.setblocking(False)
511 if f2 is not None:
512 for _, _, _, _, laddr in laddr_infos:
513 try:
514 sock.bind(laddr)
515 break
516 except OSError as exc:
517 exc = OSError(
518 exc.errno, 'error while '
519 'attempting to bind on address '
520 '{!r}: {}'.format(
521 laddr, exc.strerror.lower()))
522 exceptions.append(exc)
523 else:
524 sock.close()
525 sock = None
526 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200527 if self._debug:
528 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 yield from self.sock_connect(sock, address)
530 except OSError as exc:
531 if sock is not None:
532 sock.close()
533 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200534 except:
535 if sock is not None:
536 sock.close()
537 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 else:
539 break
540 else:
541 if len(exceptions) == 1:
542 raise exceptions[0]
543 else:
544 # If they all have the same str(), raise one.
545 model = str(exceptions[0])
546 if all(str(exc) == model for exc in exceptions):
547 raise exceptions[0]
548 # Raise a combined exception so the user can see all
549 # the various error messages.
550 raise OSError('Multiple exceptions: {}'.format(
551 ', '.join(str(exc) for exc in exceptions)))
552
553 elif sock is None:
554 raise ValueError(
555 'host and port was not specified and no sock specified')
556
557 sock.setblocking(False)
558
Yury Selivanovb057c522014-02-18 12:15:06 -0500559 transport, protocol = yield from self._create_connection_transport(
560 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200561 if self._debug:
562 logger.debug("connected to %s:%r: (%r, %r)",
563 host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500564 return transport, protocol
565
Victor Stinnerf951d282014-06-29 00:46:45 +0200566 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500567 def _create_connection_transport(self, sock, protocol_factory, ssl,
568 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 protocol = protocol_factory()
570 waiter = futures.Future(loop=self)
571 if ssl:
572 sslcontext = None if isinstance(ssl, bool) else ssl
573 transport = self._make_ssl_transport(
574 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700575 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 else:
577 transport = self._make_socket_transport(sock, protocol, waiter)
578
579 yield from waiter
580 return transport, protocol
581
Victor Stinnerf951d282014-06-29 00:46:45 +0200582 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 def create_datagram_endpoint(self, protocol_factory,
584 local_addr=None, remote_addr=None, *,
585 family=0, proto=0, flags=0):
586 """Create datagram connection."""
587 if not (local_addr or remote_addr):
588 if family == 0:
589 raise ValueError('unexpected address family')
590 addr_pairs_info = (((family, proto), (None, None)),)
591 else:
592 # join addresss by (family, protocol)
593 addr_infos = collections.OrderedDict()
594 for idx, addr in ((0, local_addr), (1, remote_addr)):
595 if addr is not None:
596 assert isinstance(addr, tuple) and len(addr) == 2, (
597 '2-tuple is expected')
598
599 infos = yield from self.getaddrinfo(
600 *addr, family=family, type=socket.SOCK_DGRAM,
601 proto=proto, flags=flags)
602 if not infos:
603 raise OSError('getaddrinfo() returned empty list')
604
605 for fam, _, pro, _, address in infos:
606 key = (fam, pro)
607 if key not in addr_infos:
608 addr_infos[key] = [None, None]
609 addr_infos[key][idx] = address
610
611 # each addr has to have info for each (family, proto) pair
612 addr_pairs_info = [
613 (key, addr_pair) for key, addr_pair in addr_infos.items()
614 if not ((local_addr and addr_pair[0] is None) or
615 (remote_addr and addr_pair[1] is None))]
616
617 if not addr_pairs_info:
618 raise ValueError('can not get address information')
619
620 exceptions = []
621
622 for ((family, proto),
623 (local_address, remote_address)) in addr_pairs_info:
624 sock = None
625 r_addr = None
626 try:
627 sock = socket.socket(
628 family=family, type=socket.SOCK_DGRAM, proto=proto)
629 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
630 sock.setblocking(False)
631
632 if local_addr:
633 sock.bind(local_address)
634 if remote_addr:
635 yield from self.sock_connect(sock, remote_address)
636 r_addr = remote_address
637 except OSError as exc:
638 if sock is not None:
639 sock.close()
640 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200641 except:
642 if sock is not None:
643 sock.close()
644 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645 else:
646 break
647 else:
648 raise exceptions[0]
649
650 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200651 waiter = futures.Future(loop=self)
652 transport = self._make_datagram_transport(sock, protocol, r_addr,
653 waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200654 if self._debug:
655 if local_addr:
656 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
657 "created: (%r, %r)",
658 local_addr, remote_addr, transport, protocol)
659 else:
660 logger.debug("Datagram endpoint remote_addr=%r created: "
661 "(%r, %r)",
662 remote_addr, transport, protocol)
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200663 yield from waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 return transport, protocol
665
Victor Stinnerf951d282014-06-29 00:46:45 +0200666 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667 def create_server(self, protocol_factory, host=None, port=None,
668 *,
669 family=socket.AF_UNSPEC,
670 flags=socket.AI_PASSIVE,
671 sock=None,
672 backlog=100,
673 ssl=None,
674 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200675 """Create a TCP server bound to host and port.
676
Victor Stinner8ebeb032014-07-11 23:47:40 +0200677 Return an Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200678
679 This method is a coroutine.
680 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700681 if isinstance(ssl, bool):
682 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700683 if host is not None or port is not None:
684 if sock is not None:
685 raise ValueError(
686 'host/port and sock can not be specified at the same time')
687
688 AF_INET6 = getattr(socket, 'AF_INET6', 0)
689 if reuse_address is None:
690 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
691 sockets = []
692 if host == '':
693 host = None
694
695 infos = yield from self.getaddrinfo(
696 host, port, family=family,
697 type=socket.SOCK_STREAM, proto=0, flags=flags)
698 if not infos:
699 raise OSError('getaddrinfo() returned empty list')
700
701 completed = False
702 try:
703 for res in infos:
704 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700705 try:
706 sock = socket.socket(af, socktype, proto)
707 except socket.error:
708 # Assume it's a bad family/type/protocol combination.
709 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 sockets.append(sock)
711 if reuse_address:
712 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
713 True)
714 # Disable IPv4/IPv6 dual stack support (enabled by
715 # default on Linux) which makes a single socket
716 # listen on both address families.
717 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
718 sock.setsockopt(socket.IPPROTO_IPV6,
719 socket.IPV6_V6ONLY,
720 True)
721 try:
722 sock.bind(sa)
723 except OSError as err:
724 raise OSError(err.errno, 'error while attempting '
725 'to bind on address %r: %s'
726 % (sa, err.strerror.lower()))
727 completed = True
728 finally:
729 if not completed:
730 for sock in sockets:
731 sock.close()
732 else:
733 if sock is None:
734 raise ValueError(
735 'host and port was not specified and no sock specified')
736 sockets = [sock]
737
738 server = Server(self, sockets)
739 for sock in sockets:
740 sock.listen(backlog)
741 sock.setblocking(False)
742 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200743 if self._debug:
744 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745 return server
746
Victor Stinnerf951d282014-06-29 00:46:45 +0200747 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748 def connect_read_pipe(self, protocol_factory, pipe):
749 protocol = protocol_factory()
750 waiter = futures.Future(loop=self)
751 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
752 yield from waiter
753 return transport, protocol
754
Victor Stinnerf951d282014-06-29 00:46:45 +0200755 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756 def connect_write_pipe(self, protocol_factory, pipe):
757 protocol = protocol_factory()
758 waiter = futures.Future(loop=self)
759 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
760 yield from waiter
761 return transport, protocol
762
Victor Stinnerf951d282014-06-29 00:46:45 +0200763 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700764 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
765 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
766 universal_newlines=False, shell=True, bufsize=0,
767 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100768 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800769 raise ValueError("cmd must be a string")
770 if universal_newlines:
771 raise ValueError("universal_newlines must be False")
772 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100773 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800774 if bufsize != 0:
775 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700776 protocol = protocol_factory()
777 transport = yield from self._make_subprocess_transport(
778 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
779 return transport, protocol
780
Victor Stinnerf951d282014-06-29 00:46:45 +0200781 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500782 def subprocess_exec(self, protocol_factory, program, *args,
783 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
784 stderr=subprocess.PIPE, universal_newlines=False,
785 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800786 if universal_newlines:
787 raise ValueError("universal_newlines must be False")
788 if shell:
789 raise ValueError("shell must be False")
790 if bufsize != 0:
791 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100792 popen_args = (program,) + args
793 for arg in popen_args:
794 if not isinstance(arg, (str, bytes)):
795 raise TypeError("program arguments must be "
796 "a bytes or text string, not %s"
797 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798 protocol = protocol_factory()
799 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500800 protocol, popen_args, False, stdin, stdout, stderr,
801 bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700802 return transport, protocol
803
Yury Selivanov569efa22014-02-18 18:02:19 -0500804 def set_exception_handler(self, handler):
805 """Set handler as the new event loop exception handler.
806
807 If handler is None, the default exception handler will
808 be set.
809
810 If handler is a callable object, it should have a
811 matching signature to '(loop, context)', where 'loop'
812 will be a reference to the active event loop, 'context'
813 will be a dict object (see `call_exception_handler()`
814 documentation for details about context).
815 """
816 if handler is not None and not callable(handler):
817 raise TypeError('A callable object or None is expected, '
818 'got {!r}'.format(handler))
819 self._exception_handler = handler
820
821 def default_exception_handler(self, context):
822 """Default exception handler.
823
824 This is called when an exception occurs and no exception
825 handler is set, and can be called by a custom exception
826 handler that wants to defer to the default behavior.
827
828 context parameter has the same meaning as in
829 `call_exception_handler()`.
830 """
831 message = context.get('message')
832 if not message:
833 message = 'Unhandled exception in event loop'
834
835 exception = context.get('exception')
836 if exception is not None:
837 exc_info = (type(exception), exception, exception.__traceback__)
838 else:
839 exc_info = False
840
841 log_lines = [message]
842 for key in sorted(context):
843 if key in {'message', 'exception'}:
844 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200845 value = context[key]
846 if key == 'source_traceback':
847 tb = ''.join(traceback.format_list(value))
848 value = 'Object created at (most recent call last):\n'
849 value += tb.rstrip()
850 else:
851 value = repr(value)
852 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500853
854 logger.error('\n'.join(log_lines), exc_info=exc_info)
855
856 def call_exception_handler(self, context):
857 """Call the current event loop exception handler.
858
859 context is a dict object containing the following keys
860 (new keys maybe introduced later):
861 - 'message': Error message;
862 - 'exception' (optional): Exception object;
863 - 'future' (optional): Future instance;
864 - 'handle' (optional): Handle instance;
865 - 'protocol' (optional): Protocol instance;
866 - 'transport' (optional): Transport instance;
867 - 'socket' (optional): Socket instance.
868
869 Note: this method should not be overloaded in subclassed
870 event loops. For any custom exception handling, use
871 `set_exception_handler()` method.
872 """
873 if self._exception_handler is None:
874 try:
875 self.default_exception_handler(context)
876 except Exception:
877 # Second protection layer for unexpected errors
878 # in the default implementation, as well as for subclassed
879 # event loops with overloaded "default_exception_handler".
880 logger.error('Exception in default exception handler',
881 exc_info=True)
882 else:
883 try:
884 self._exception_handler(self, context)
885 except Exception as exc:
886 # Exception in the user set custom exception handler.
887 try:
888 # Let's try default handler.
889 self.default_exception_handler({
890 'message': 'Unhandled error in exception handler',
891 'exception': exc,
892 'context': context,
893 })
894 except Exception:
895 # Guard 'default_exception_handler' in case it's
896 # overloaded.
897 logger.error('Exception in default exception handler '
898 'while handling an unexpected error '
899 'in custom exception handler',
900 exc_info=True)
901
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700902 def _add_callback(self, handle):
903 """Add a Handle to ready or scheduled."""
904 assert isinstance(handle, events.Handle), 'A Handle is required here'
905 if handle._cancelled:
906 return
907 if isinstance(handle, events.TimerHandle):
908 heapq.heappush(self._scheduled, handle)
909 else:
910 self._ready.append(handle)
911
912 def _add_callback_signalsafe(self, handle):
913 """Like _add_callback() but called from a signal handler."""
914 self._add_callback(handle)
915 self._write_to_self()
916
917 def _run_once(self):
918 """Run one full iteration of the event loop.
919
920 This calls all currently ready callbacks, polls for I/O,
921 schedules the resulting callbacks, and finally schedules
922 'call_later' callbacks.
923 """
924 # Remove delayed calls that were cancelled from head of queue.
925 while self._scheduled and self._scheduled[0]._cancelled:
926 heapq.heappop(self._scheduled)
927
928 timeout = None
929 if self._ready:
930 timeout = 0
931 elif self._scheduled:
932 # Compute the desired timeout.
933 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700934 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700935
Victor Stinner770e48d2014-07-11 11:58:33 +0200936 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100937 t0 = self.time()
938 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200939 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +0200940 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100941 level = logging.INFO
942 else:
943 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +0200944 nevent = len(event_list)
945 if timeout is None:
946 logger.log(level, 'poll took %.3f ms: %s events',
947 dt * 1e3, nevent)
948 elif nevent:
949 logger.log(level,
950 'poll %.3f ms took %.3f ms: %s events',
951 timeout * 1e3, dt * 1e3, nevent)
952 elif dt >= 1.0:
953 logger.log(level,
954 'poll %.3f ms took %.3f ms: timeout',
955 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700956 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100957 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700958 self._process_events(event_list)
959
960 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100961 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700962 while self._scheduled:
963 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100964 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700965 break
966 handle = heapq.heappop(self._scheduled)
967 self._ready.append(handle)
968
969 # This is the only place where callbacks are actually *called*.
970 # All other places just add them to ready.
971 # Note: We run all currently scheduled callbacks, but not any
972 # callbacks scheduled by callbacks run this time around --
973 # they will be run the next time (after another I/O poll).
974 # Use an idiom that is threadsafe without using locks.
975 ntodo = len(self._ready)
976 for i in range(ntodo):
977 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200978 if handle._cancelled:
979 continue
980 if self._debug:
981 t0 = self.time()
982 handle._run()
983 dt = self.time() - t0
984 if dt >= self.slow_callback_duration:
985 logger.warning('Executing %s took %.3f seconds',
986 _format_handle(handle), dt)
987 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700988 handle._run()
989 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +0100990
991 def get_debug(self):
992 return self._debug
993
994 def set_debug(self, enabled):
995 self._debug = enabled