blob: 09eb440b0ef7af86f50171cf462e5f6cb0f3a0c7 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import collections
Serhiy Storchaka2e576f52017-04-24 09:05:00 +030017import collections.abc
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import concurrent.futures
19import heapq
Victor Stinner5e4a7d82015-09-21 18:33:43 +020020import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Yury Selivanoveb636452016-09-08 22:01:51 -070030import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Yury Selivanovf111b3d2017-12-30 00:35:36 -050032try:
33 import ssl
34except ImportError: # pragma: no cover
35 ssl = None
36
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -080037from . import constants
Victor Stinnerf951d282014-06-29 00:46:45 +020038from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039from . import events
40from . import futures
Andrew Svetlov7c684072018-01-27 21:22:47 +020041from . import protocols
Yury Selivanovf111b3d2017-12-30 00:35:36 -050042from . import sslproto
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043from . import tasks
Andrew Svetlov7c684072018-01-27 21:22:47 +020044from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070045from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046
47
Yury Selivanov6370f342017-12-10 18:36:12 -050048__all__ = 'BaseEventLoop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049
50
Yury Selivanov592ada92014-09-25 12:07:56 -040051# Minimum number of _scheduled timer handles before cleanup of
52# cancelled handles is performed.
53_MIN_SCHEDULED_TIMER_HANDLES = 100
54
55# Minimum fraction of _scheduled timer handles that are cancelled
56# before cleanup of cancelled handles is performed.
57_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058
Victor Stinnerc94a93a2016-04-01 21:43:39 +020059# Exceptions which must not call the exception handler in fatal error
60# methods (_fatal_error())
61_FATAL_ERROR_IGNORE = (BrokenPipeError,
62 ConnectionResetError, ConnectionAbortedError)
63
64
Victor Stinner0e6f52a2014-06-20 17:34:15 +020065def _format_handle(handle):
66 cb = handle._callback
Yury Selivanova0c1ba62016-10-28 12:52:37 -040067 if isinstance(getattr(cb, '__self__', None), tasks.Task):
Victor Stinner0e6f52a2014-06-20 17:34:15 +020068 # format the task
69 return repr(cb.__self__)
70 else:
71 return str(handle)
72
73
Victor Stinneracdb7822014-07-14 18:33:40 +020074def _format_pipe(fd):
75 if fd == subprocess.PIPE:
76 return '<pipe>'
77 elif fd == subprocess.STDOUT:
78 return '<stdout>'
79 else:
80 return repr(fd)
81
82
Yury Selivanov5587d7c2016-09-15 15:45:07 -040083def _set_reuseport(sock):
84 if not hasattr(socket, 'SO_REUSEPORT'):
85 raise ValueError('reuse_port not supported by socket module')
86 else:
87 try:
88 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
89 except OSError:
90 raise ValueError('reuse_port not supported by socket module, '
91 'SO_REUSEPORT defined but not implemented.')
92
93
Yury Selivanovd5c2a622015-12-16 19:31:17 -050094def _ipaddr_info(host, port, family, type, proto):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -040095 # Try to skip getaddrinfo if "host" is already an IP. Users might have
96 # handled name resolution in their own code and pass in resolved IPs.
97 if not hasattr(socket, 'inet_pton'):
98 return
99
100 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
101 host is None:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500102 return None
103
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500104 if type == socket.SOCK_STREAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500105 proto = socket.IPPROTO_TCP
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500106 elif type == socket.SOCK_DGRAM:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500107 proto = socket.IPPROTO_UDP
108 else:
109 return None
110
Yury Selivanova7146162016-06-02 16:51:07 -0400111 if port is None:
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400112 port = 0
Guido van Rossume3c65a72016-09-30 08:17:15 -0700113 elif isinstance(port, bytes) and port == b'':
114 port = 0
115 elif isinstance(port, str) and port == '':
116 port = 0
117 else:
118 # If port's a service name like "http", don't skip getaddrinfo.
119 try:
120 port = int(port)
121 except (TypeError, ValueError):
122 return None
Yury Selivanoveaaaee82016-05-20 17:44:19 -0400123
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400124 if family == socket.AF_UNSPEC:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500125 afs = [socket.AF_INET]
126 if hasattr(socket, 'AF_INET6'):
127 afs.append(socket.AF_INET6)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400128 else:
129 afs = [family]
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500130
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400131 if isinstance(host, bytes):
132 host = host.decode('idna')
133 if '%' in host:
134 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
135 # like '::1%lo0'.
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500136 return None
137
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400138 for af in afs:
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500139 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400140 socket.inet_pton(af, host)
141 # The host has already been resolved.
142 return af, type, proto, '', (host, port)
143 except OSError:
144 pass
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500145
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400146 # "host" is not an IP address.
147 return None
Yury Selivanovd5c2a622015-12-16 19:31:17 -0500148
149
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100150def _run_until_complete_cb(fut):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500151 if not fut.cancelled():
152 exc = fut.exception()
153 if isinstance(exc, BaseException) and not isinstance(exc, Exception):
154 # Issue #22429: run_forever() already finished, no need to
155 # stop it.
156 return
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500157 futures._get_loop(fut).stop()
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100158
159
Andrew Svetlov7c684072018-01-27 21:22:47 +0200160
161class _SendfileFallbackProtocol(protocols.Protocol):
162 def __init__(self, transp):
163 if not isinstance(transp, transports._FlowControlMixin):
164 raise TypeError("transport should be _FlowControlMixin instance")
165 self._transport = transp
166 self._proto = transp.get_protocol()
167 self._should_resume_reading = transp.is_reading()
168 self._should_resume_writing = transp._protocol_paused
169 transp.pause_reading()
170 transp.set_protocol(self)
171 if self._should_resume_writing:
172 self._write_ready_fut = self._transport._loop.create_future()
173 else:
174 self._write_ready_fut = None
175
176 async def drain(self):
177 if self._transport.is_closing():
178 raise ConnectionError("Connection closed by peer")
179 fut = self._write_ready_fut
180 if fut is None:
181 return
182 await fut
183
184 def connection_made(self, transport):
185 raise RuntimeError("Invalid state: "
186 "connection should have been established already.")
187
188 def connection_lost(self, exc):
189 if self._write_ready_fut is not None:
190 # Never happens if peer disconnects after sending the whole content
191 # Thus disconnection is always an exception from user perspective
192 if exc is None:
193 self._write_ready_fut.set_exception(
194 ConnectionError("Connection is closed by peer"))
195 else:
196 self._write_ready_fut.set_exception(exc)
197 self._proto.connection_lost(exc)
198
199 def pause_writing(self):
200 if self._write_ready_fut is not None:
201 return
202 self._write_ready_fut = self._transport._loop.create_future()
203
204 def resume_writing(self):
205 if self._write_ready_fut is None:
206 return
207 self._write_ready_fut.set_result(False)
208 self._write_ready_fut = None
209
210 def data_received(self, data):
211 raise RuntimeError("Invalid state: reading should be paused")
212
213 def eof_received(self):
214 raise RuntimeError("Invalid state: reading should be paused")
215
216 async def restore(self):
217 self._transport.set_protocol(self._proto)
218 if self._should_resume_reading:
219 self._transport.resume_reading()
220 if self._write_ready_fut is not None:
221 # Cancel the future.
222 # Basically it has no effect because protocol is switched back,
223 # no code should wait for it anymore.
224 self._write_ready_fut.cancel()
225 if self._should_resume_writing:
226 self._proto.resume_writing()
227
228
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229class Server(events.AbstractServer):
230
Yury Selivanovc9070d02018-01-25 18:08:09 -0500231 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
232 ssl_handshake_timeout):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200233 self._loop = loop
Yury Selivanovc9070d02018-01-25 18:08:09 -0500234 self._sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200235 self._active_count = 0
236 self._waiters = []
Yury Selivanovc9070d02018-01-25 18:08:09 -0500237 self._protocol_factory = protocol_factory
238 self._backlog = backlog
239 self._ssl_context = ssl_context
240 self._ssl_handshake_timeout = ssl_handshake_timeout
241 self._serving = False
242 self._serving_forever_fut = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243
Victor Stinnere912e652014-07-12 03:11:53 +0200244 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500245 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
Victor Stinnere912e652014-07-12 03:11:53 +0200246
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200247 def _attach(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500248 assert self._sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200249 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200251 def _detach(self):
252 assert self._active_count > 0
253 self._active_count -= 1
Yury Selivanovc9070d02018-01-25 18:08:09 -0500254 if self._active_count == 0 and self._sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 self._wakeup()
256
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200258 waiters = self._waiters
259 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 for waiter in waiters:
261 if not waiter.done():
262 waiter.set_result(waiter)
263
Yury Selivanovc9070d02018-01-25 18:08:09 -0500264 def _start_serving(self):
265 if self._serving:
266 return
267 self._serving = True
268 for sock in self._sockets:
269 sock.listen(self._backlog)
270 self._loop._start_serving(
271 self._protocol_factory, sock, self._ssl_context,
272 self, self._backlog, self._ssl_handshake_timeout)
273
274 def get_loop(self):
275 return self._loop
276
277 def is_serving(self):
278 return self._serving
279
280 @property
281 def sockets(self):
282 if self._sockets is None:
283 return []
284 return list(self._sockets)
285
286 def close(self):
287 sockets = self._sockets
288 if sockets is None:
289 return
290 self._sockets = None
291
292 for sock in sockets:
293 self._loop._stop_serving(sock)
294
295 self._serving = False
296
297 if (self._serving_forever_fut is not None and
298 not self._serving_forever_fut.done()):
299 self._serving_forever_fut.cancel()
300 self._serving_forever_fut = None
301
302 if self._active_count == 0:
303 self._wakeup()
304
305 async def start_serving(self):
306 self._start_serving()
307
308 async def serve_forever(self):
309 if self._serving_forever_fut is not None:
310 raise RuntimeError(
311 f'server {self!r} is already being awaited on serve_forever()')
312 if self._sockets is None:
313 raise RuntimeError(f'server {self!r} is closed')
314
315 self._start_serving()
316 self._serving_forever_fut = self._loop.create_future()
317
318 try:
319 await self._serving_forever_fut
320 except futures.CancelledError:
321 try:
322 self.close()
323 await self.wait_closed()
324 finally:
325 raise
326 finally:
327 self._serving_forever_fut = None
328
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200329 async def wait_closed(self):
Yury Selivanovc9070d02018-01-25 18:08:09 -0500330 if self._sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 return
Yury Selivanov7661db62016-05-16 15:38:39 -0400332 waiter = self._loop.create_future()
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200333 self._waiters.append(waiter)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200334 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335
336
337class BaseEventLoop(events.AbstractEventLoop):
338
339 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400340 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200341 self._closed = False
Guido van Rossum41f69f42015-11-19 13:28:47 -0800342 self._stopping = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 self._ready = collections.deque()
344 self._scheduled = []
345 self._default_executor = None
346 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100347 # Identifier of the thread running the event loop, or None if the
348 # event loop is not running
Victor Stinnera87501f2015-02-05 11:45:33 +0100349 self._thread_id = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100350 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500351 self._exception_handler = None
Victor Stinner44862df2017-11-20 07:14:07 -0800352 self.set_debug(coroutines._is_debug_mode())
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200353 # In debug mode, if the execution of a callback or a step of a task
354 # exceed this duration in seconds, the slow callback/task is logged.
355 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100356 self._current_handle = None
Yury Selivanov740169c2015-05-11 14:23:38 -0400357 self._task_factory = None
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800358 self._coroutine_origin_tracking_enabled = False
359 self._coroutine_origin_tracking_saved_depth = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500361 # A weak set of all asynchronous generators that are
362 # being iterated by the loop.
363 self._asyncgens = weakref.WeakSet()
Yury Selivanoveb636452016-09-08 22:01:51 -0700364 # Set to True when `loop.shutdown_asyncgens` is called.
365 self._asyncgens_shutdown_called = False
366
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200367 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500368 return (
369 f'<{self.__class__.__name__} running={self.is_running()} '
370 f'closed={self.is_closed()} debug={self.get_debug()}>'
371 )
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200372
Yury Selivanov7661db62016-05-16 15:38:39 -0400373 def create_future(self):
374 """Create a Future object attached to the loop."""
375 return futures.Future(loop=self)
376
Victor Stinner896a25a2014-07-08 11:29:25 +0200377 def create_task(self, coro):
378 """Schedule a coroutine object.
379
Victor Stinneracdb7822014-07-14 18:33:40 +0200380 Return a task object.
381 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100382 self._check_closed()
Yury Selivanov740169c2015-05-11 14:23:38 -0400383 if self._task_factory is None:
384 task = tasks.Task(coro, loop=self)
385 if task._source_traceback:
386 del task._source_traceback[-1]
387 else:
388 task = self._task_factory(self, coro)
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200389 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200390
Yury Selivanov740169c2015-05-11 14:23:38 -0400391 def set_task_factory(self, factory):
392 """Set a task factory that will be used by loop.create_task().
393
394 If factory is None the default task factory will be set.
395
396 If factory is a callable, it should have a signature matching
397 '(loop, coro)', where 'loop' will be a reference to the active
398 event loop, 'coro' will be a coroutine object. The callable
399 must return a Future.
400 """
401 if factory is not None and not callable(factory):
402 raise TypeError('task factory must be a callable or None')
403 self._task_factory = factory
404
405 def get_task_factory(self):
406 """Return a task factory, or None if the default one is in use."""
407 return self._task_factory
408
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 def _make_socket_transport(self, sock, protocol, waiter=None, *,
410 extra=None, server=None):
411 """Create socket transport."""
412 raise NotImplementedError
413
Neil Aspinallf7686c12017-12-19 19:45:42 +0000414 def _make_ssl_transport(
415 self, rawsock, protocol, sslcontext, waiter=None,
416 *, server_side=False, server_hostname=None,
417 extra=None, server=None,
Yury Selivanovf111b3d2017-12-30 00:35:36 -0500418 ssl_handshake_timeout=None,
419 call_connection_made=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 """Create SSL transport."""
421 raise NotImplementedError
422
423 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200424 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 """Create datagram transport."""
426 raise NotImplementedError
427
428 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
429 extra=None):
430 """Create read pipe transport."""
431 raise NotImplementedError
432
433 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
434 extra=None):
435 """Create write pipe transport."""
436 raise NotImplementedError
437
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200438 async def _make_subprocess_transport(self, protocol, args, shell,
439 stdin, stdout, stderr, bufsize,
440 extra=None, **kwargs):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 """Create subprocess transport."""
442 raise NotImplementedError
443
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200445 """Write a byte to self-pipe, to wake up the event loop.
446
447 This may be called from a different thread.
448
449 The subclass is responsible for implementing the self-pipe.
450 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 raise NotImplementedError
452
453 def _process_events(self, event_list):
454 """Process selector events."""
455 raise NotImplementedError
456
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200457 def _check_closed(self):
458 if self._closed:
459 raise RuntimeError('Event loop is closed')
460
Yury Selivanoveb636452016-09-08 22:01:51 -0700461 def _asyncgen_finalizer_hook(self, agen):
462 self._asyncgens.discard(agen)
463 if not self.is_closed():
464 self.create_task(agen.aclose())
Yury Selivanoved054062016-10-21 17:13:40 -0400465 # Wake up the loop if the finalizer was called from
466 # a different thread.
467 self._write_to_self()
Yury Selivanoveb636452016-09-08 22:01:51 -0700468
469 def _asyncgen_firstiter_hook(self, agen):
470 if self._asyncgens_shutdown_called:
471 warnings.warn(
Yury Selivanov6370f342017-12-10 18:36:12 -0500472 f"asynchronous generator {agen!r} was scheduled after "
473 f"loop.shutdown_asyncgens() call",
Yury Selivanoveb636452016-09-08 22:01:51 -0700474 ResourceWarning, source=self)
475
476 self._asyncgens.add(agen)
477
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200478 async def shutdown_asyncgens(self):
Yury Selivanoveb636452016-09-08 22:01:51 -0700479 """Shutdown all active asynchronous generators."""
480 self._asyncgens_shutdown_called = True
481
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500482 if not len(self._asyncgens):
Yury Selivanov0a91d482016-09-15 13:24:03 -0400483 # If Python version is <3.6 or we don't have any asynchronous
484 # generators alive.
Yury Selivanoveb636452016-09-08 22:01:51 -0700485 return
486
487 closing_agens = list(self._asyncgens)
488 self._asyncgens.clear()
489
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200490 results = await tasks.gather(
Yury Selivanoveb636452016-09-08 22:01:51 -0700491 *[ag.aclose() for ag in closing_agens],
492 return_exceptions=True,
493 loop=self)
494
Yury Selivanoveb636452016-09-08 22:01:51 -0700495 for result, agen in zip(results, closing_agens):
496 if isinstance(result, Exception):
497 self.call_exception_handler({
Yury Selivanov6370f342017-12-10 18:36:12 -0500498 'message': f'an error occurred during closing of '
499 f'asynchronous generator {agen!r}',
Yury Selivanoveb636452016-09-08 22:01:51 -0700500 'exception': result,
501 'asyncgen': agen
502 })
503
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 def run_forever(self):
505 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200506 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100507 if self.is_running():
Yury Selivanov600a3492016-11-04 14:29:28 -0400508 raise RuntimeError('This event loop is already running')
509 if events._get_running_loop() is not None:
510 raise RuntimeError(
511 'Cannot run the event loop while another loop is running')
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800512 self._set_coroutine_origin_tracking(self._debug)
Victor Stinnera87501f2015-02-05 11:45:33 +0100513 self._thread_id = threading.get_ident()
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500514
515 old_agen_hooks = sys.get_asyncgen_hooks()
516 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
517 finalizer=self._asyncgen_finalizer_hook)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 try:
Yury Selivanov600a3492016-11-04 14:29:28 -0400519 events._set_running_loop(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520 while True:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800521 self._run_once()
522 if self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 break
524 finally:
Guido van Rossum41f69f42015-11-19 13:28:47 -0800525 self._stopping = False
Victor Stinnera87501f2015-02-05 11:45:33 +0100526 self._thread_id = None
Yury Selivanov600a3492016-11-04 14:29:28 -0400527 events._set_running_loop(None)
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -0800528 self._set_coroutine_origin_tracking(False)
Yury Selivanova4afcdf2018-01-21 14:56:59 -0500529 sys.set_asyncgen_hooks(*old_agen_hooks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530
531 def run_until_complete(self, future):
532 """Run until the Future is done.
533
534 If the argument is a coroutine, it is wrapped in a Task.
535
Victor Stinneracdb7822014-07-14 18:33:40 +0200536 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 with the same coroutine twice -- it would wrap it in two
538 different Tasks and that can't be good.
539
540 Return the Future's result, or raise its exception.
541 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200542 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200543
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700544 new_task = not futures.isfuture(future)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400545 future = tasks.ensure_future(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200546 if new_task:
547 # An exception is raised if the future didn't complete, so there
548 # is no need to log the "destroy pending task" message
549 future._log_destroy_pending = False
550
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100551 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200552 try:
553 self.run_forever()
554 except:
555 if new_task and future.done() and not future.cancelled():
556 # The coroutine raised a BaseException. Consume the exception
557 # to not log a warning, the caller doesn't have access to the
558 # local task.
559 future.exception()
560 raise
jimmylai21b3e042017-05-22 22:32:46 -0700561 finally:
562 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 if not future.done():
564 raise RuntimeError('Event loop stopped before Future completed.')
565
566 return future.result()
567
568 def stop(self):
569 """Stop running the event loop.
570
Guido van Rossum41f69f42015-11-19 13:28:47 -0800571 Every callback already scheduled will still run. This simply informs
572 run_forever to stop looping after a complete iteration.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 """
Guido van Rossum41f69f42015-11-19 13:28:47 -0800574 self._stopping = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200576 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700577 """Close the event loop.
578
579 This clears the queues and shuts down the executor,
580 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200581
582 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700583 """
Victor Stinner956de692014-12-26 21:07:52 +0100584 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200585 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200586 if self._closed:
587 return
Victor Stinnere912e652014-07-12 03:11:53 +0200588 if self._debug:
589 logger.debug("Close %r", self)
Yury Selivanove8944cb2015-05-12 11:43:04 -0400590 self._closed = True
591 self._ready.clear()
592 self._scheduled.clear()
593 executor = self._default_executor
594 if executor is not None:
595 self._default_executor = None
596 executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200597
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200598 def is_closed(self):
599 """Returns True if the event loop was closed."""
600 return self._closed
601
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900602 def __del__(self):
603 if not self.is_closed():
Yury Selivanov6370f342017-12-10 18:36:12 -0500604 warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900605 source=self)
606 if not self.is_running():
607 self.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100608
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200610 """Returns True if the event loop is running."""
Victor Stinnera87501f2015-02-05 11:45:33 +0100611 return (self._thread_id is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612
613 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200614 """Return the time according to the event loop's clock.
615
616 This is a float expressed in seconds since an epoch, but the
617 epoch, precision, accuracy and drift are unspecified and may
618 differ per event loop.
619 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 return time.monotonic()
621
Yury Selivanovf23746a2018-01-22 19:11:18 -0500622 def call_later(self, delay, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623 """Arrange for a callback to be called at a given time.
624
625 Return a Handle: an opaque object with a cancel() method that
626 can be used to cancel the call.
627
628 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200629 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630
631 Each callback will be called exactly once. If two callbacks
632 are scheduled for exactly the same time, it undefined which
633 will be called first.
634
635 Any positional arguments after the callback will be passed to
636 the callback when it is called.
637 """
Yury Selivanovf23746a2018-01-22 19:11:18 -0500638 timer = self.call_at(self.time() + delay, callback, *args,
639 context=context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200640 if timer._source_traceback:
641 del timer._source_traceback[-1]
642 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
Yury Selivanovf23746a2018-01-22 19:11:18 -0500644 def call_at(self, when, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200645 """Like call_later(), but uses an absolute time.
646
647 Absolute time corresponds to the event loop's time() method.
648 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100649 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100650 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100651 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700652 self._check_callback(callback, 'call_at')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500653 timer = events.TimerHandle(when, callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200654 if timer._source_traceback:
655 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400657 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 return timer
659
Yury Selivanovf23746a2018-01-22 19:11:18 -0500660 def call_soon(self, callback, *args, context=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661 """Arrange for a callback to be called as soon as possible.
662
Victor Stinneracdb7822014-07-14 18:33:40 +0200663 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 order in which they are registered. Each callback will be
665 called exactly once.
666
667 Any positional arguments after the callback will be passed to
668 the callback when it is called.
669 """
Yury Selivanov491a9122016-11-03 15:09:24 -0700670 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100671 if self._debug:
672 self._check_thread()
Yury Selivanov491a9122016-11-03 15:09:24 -0700673 self._check_callback(callback, 'call_soon')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500674 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200675 if handle._source_traceback:
676 del handle._source_traceback[-1]
677 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100678
Yury Selivanov491a9122016-11-03 15:09:24 -0700679 def _check_callback(self, callback, method):
680 if (coroutines.iscoroutine(callback) or
681 coroutines.iscoroutinefunction(callback)):
682 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500683 f"coroutines cannot be used with {method}()")
Yury Selivanov491a9122016-11-03 15:09:24 -0700684 if not callable(callback):
685 raise TypeError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500686 f'a callable object was expected by {method}(), '
687 f'got {callback!r}')
Yury Selivanov491a9122016-11-03 15:09:24 -0700688
Yury Selivanovf23746a2018-01-22 19:11:18 -0500689 def _call_soon(self, callback, args, context):
690 handle = events.Handle(callback, args, self, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200691 if handle._source_traceback:
692 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700693 self._ready.append(handle)
694 return handle
695
Victor Stinner956de692014-12-26 21:07:52 +0100696 def _check_thread(self):
697 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100698
Victor Stinneracdb7822014-07-14 18:33:40 +0200699 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100700 likely behave incorrectly when the assumption is violated.
701
Victor Stinneracdb7822014-07-14 18:33:40 +0200702 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100703 responsible for checking this condition for performance reasons.
704 """
Victor Stinnera87501f2015-02-05 11:45:33 +0100705 if self._thread_id is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200706 return
Victor Stinner956de692014-12-26 21:07:52 +0100707 thread_id = threading.get_ident()
Victor Stinnera87501f2015-02-05 11:45:33 +0100708 if thread_id != self._thread_id:
Victor Stinner93569c22014-03-21 10:00:52 +0100709 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200710 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100711 "than the current one")
712
Yury Selivanovf23746a2018-01-22 19:11:18 -0500713 def call_soon_threadsafe(self, callback, *args, context=None):
Victor Stinneracdb7822014-07-14 18:33:40 +0200714 """Like call_soon(), but thread-safe."""
Yury Selivanov491a9122016-11-03 15:09:24 -0700715 self._check_closed()
716 if self._debug:
717 self._check_callback(callback, 'call_soon_threadsafe')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500718 handle = self._call_soon(callback, args, context)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200719 if handle._source_traceback:
720 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 self._write_to_self()
722 return handle
723
Yury Selivanovbec23722018-01-28 14:09:40 -0500724 def run_in_executor(self, executor, func, *args):
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100725 self._check_closed()
Yury Selivanov491a9122016-11-03 15:09:24 -0700726 if self._debug:
727 self._check_callback(func, 'run_in_executor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700728 if executor is None:
729 executor = self._default_executor
730 if executor is None:
Yury Selivanove8a60452016-10-21 17:40:42 -0400731 executor = concurrent.futures.ThreadPoolExecutor()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732 self._default_executor = executor
Yury Selivanovbec23722018-01-28 14:09:40 -0500733 return futures.wrap_future(
Yury Selivanov19a44f62017-12-14 20:53:26 -0500734 executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735
736 def set_default_executor(self, executor):
737 self._default_executor = executor
738
Victor Stinnere912e652014-07-12 03:11:53 +0200739 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
Yury Selivanov6370f342017-12-10 18:36:12 -0500740 msg = [f"{host}:{port!r}"]
Victor Stinnere912e652014-07-12 03:11:53 +0200741 if family:
Yury Selivanov19d0d542017-12-10 19:52:53 -0500742 msg.append(f'family={family!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200743 if type:
Yury Selivanov6370f342017-12-10 18:36:12 -0500744 msg.append(f'type={type!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200745 if proto:
Yury Selivanov6370f342017-12-10 18:36:12 -0500746 msg.append(f'proto={proto!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200747 if flags:
Yury Selivanov6370f342017-12-10 18:36:12 -0500748 msg.append(f'flags={flags!r}')
Victor Stinnere912e652014-07-12 03:11:53 +0200749 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200750 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200751
752 t0 = self.time()
753 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
754 dt = self.time() - t0
755
Yury Selivanov6370f342017-12-10 18:36:12 -0500756 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
Victor Stinnere912e652014-07-12 03:11:53 +0200757 if dt >= self.slow_callback_duration:
758 logger.info(msg)
759 else:
760 logger.debug(msg)
761 return addrinfo
762
Yury Selivanov19a44f62017-12-14 20:53:26 -0500763 async def getaddrinfo(self, host, port, *,
764 family=0, type=0, proto=0, flags=0):
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400765 if self._debug:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500766 getaddr_func = self._getaddrinfo_debug
Victor Stinnere912e652014-07-12 03:11:53 +0200767 else:
Yury Selivanov19a44f62017-12-14 20:53:26 -0500768 getaddr_func = socket.getaddrinfo
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700769
Yury Selivanov19a44f62017-12-14 20:53:26 -0500770 return await self.run_in_executor(
771 None, getaddr_func, host, port, family, type, proto, flags)
772
773 async def getnameinfo(self, sockaddr, flags=0):
774 return await self.run_in_executor(
775 None, socket.getnameinfo, sockaddr, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700776
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200777 async def sock_sendfile(self, sock, file, offset=0, count=None,
778 *, fallback=True):
779 if self._debug and sock.gettimeout() != 0:
780 raise ValueError("the socket must be non-blocking")
781 self._check_sendfile_params(sock, file, offset, count)
782 try:
783 return await self._sock_sendfile_native(sock, file,
784 offset, count)
Andrew Svetlov7464e872018-01-19 20:04:29 +0200785 except events.SendfileNotAvailableError as exc:
786 if not fallback:
787 raise
788 return await self._sock_sendfile_fallback(sock, file,
789 offset, count)
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200790
791 async def _sock_sendfile_native(self, sock, file, offset, count):
792 # NB: sendfile syscall is not supported for SSL sockets and
793 # non-mmap files even if sendfile is supported by OS
Andrew Svetlov7464e872018-01-19 20:04:29 +0200794 raise events.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200795 f"syscall sendfile is not available for socket {sock!r} "
796 "and file {file!r} combination")
797
798 async def _sock_sendfile_fallback(self, sock, file, offset, count):
799 if offset:
800 file.seek(offset)
801 blocksize = min(count, 16384) if count else 16384
802 buf = bytearray(blocksize)
803 total_sent = 0
804 try:
805 while True:
806 if count:
807 blocksize = min(count - total_sent, blocksize)
808 if blocksize <= 0:
809 break
810 view = memoryview(buf)[:blocksize]
811 read = file.readinto(view)
812 if not read:
813 break # EOF
814 await self.sock_sendall(sock, view)
815 total_sent += read
816 return total_sent
817 finally:
818 if total_sent > 0 and hasattr(file, 'seek'):
819 file.seek(offset + total_sent)
820
821 def _check_sendfile_params(self, sock, file, offset, count):
822 if 'b' not in getattr(file, 'mode', 'b'):
823 raise ValueError("file should be opened in binary mode")
824 if not sock.type == socket.SOCK_STREAM:
825 raise ValueError("only SOCK_STREAM type sockets are supported")
826 if count is not None:
827 if not isinstance(count, int):
828 raise TypeError(
829 "count must be a positive integer (got {!r})".format(count))
830 if count <= 0:
831 raise ValueError(
832 "count must be a positive integer (got {!r})".format(count))
833 if not isinstance(offset, int):
834 raise TypeError(
835 "offset must be a non-negative integer (got {!r})".format(
836 offset))
837 if offset < 0:
838 raise ValueError(
839 "offset must be a non-negative integer (got {!r})".format(
840 offset))
841
Neil Aspinallf7686c12017-12-19 19:45:42 +0000842 async def create_connection(
843 self, protocol_factory, host=None, port=None,
844 *, ssl=None, family=0,
845 proto=0, flags=0, sock=None,
846 local_addr=None, server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200847 ssl_handshake_timeout=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200848 """Connect to a TCP server.
849
850 Create a streaming transport connection to a given Internet host and
851 port: socket family AF_INET or socket.AF_INET6 depending on host (or
852 family if specified), socket type SOCK_STREAM. protocol_factory must be
853 a callable returning a protocol instance.
854
855 This method is a coroutine which will try to establish the connection
856 in the background. When successful, the coroutine returns a
857 (transport, protocol) pair.
858 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700859 if server_hostname is not None and not ssl:
860 raise ValueError('server_hostname is only meaningful with ssl')
861
862 if server_hostname is None and ssl:
863 # Use host as default for server_hostname. It is an error
864 # if host is empty or not set, e.g. when an
865 # already-connected socket was passed or when only a port
866 # is given. To avoid this error, you can pass
867 # server_hostname='' -- this will bypass the hostname
868 # check. (This also means that if host is a numeric
869 # IP/IPv6 address, we will attempt to verify that exact
870 # address; this will probably fail, but it is possible to
871 # create a certificate for a specific IP address, so we
872 # don't judge it here.)
873 if not host:
874 raise ValueError('You must set server_hostname '
875 'when using ssl without a host')
876 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700877
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200878 if ssl_handshake_timeout is not None and not ssl:
879 raise ValueError(
880 'ssl_handshake_timeout is only meaningful with ssl')
881
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700882 if host is not None or port is not None:
883 if sock is not None:
884 raise ValueError(
885 'host/port and sock can not be specified at the same time')
886
Yury Selivanov19a44f62017-12-14 20:53:26 -0500887 infos = await self._ensure_resolved(
888 (host, port), family=family,
889 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700890 if not infos:
891 raise OSError('getaddrinfo() returned empty list')
Yury Selivanov19a44f62017-12-14 20:53:26 -0500892
893 if local_addr is not None:
894 laddr_infos = await self._ensure_resolved(
895 local_addr, family=family,
896 type=socket.SOCK_STREAM, proto=proto,
897 flags=flags, loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700898 if not laddr_infos:
899 raise OSError('getaddrinfo() returned empty list')
900
901 exceptions = []
902 for family, type, proto, cname, address in infos:
903 try:
904 sock = socket.socket(family=family, type=type, proto=proto)
905 sock.setblocking(False)
Yury Selivanov19a44f62017-12-14 20:53:26 -0500906 if local_addr is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700907 for _, _, _, _, laddr in laddr_infos:
908 try:
909 sock.bind(laddr)
910 break
911 except OSError as exc:
Yury Selivanov6370f342017-12-10 18:36:12 -0500912 msg = (
913 f'error while attempting to bind on '
914 f'address {laddr!r}: '
915 f'{exc.strerror.lower()}'
916 )
917 exc = OSError(exc.errno, msg)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700918 exceptions.append(exc)
919 else:
920 sock.close()
921 sock = None
922 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200923 if self._debug:
924 logger.debug("connect %r to %r", sock, address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200925 await self.sock_connect(sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700926 except OSError as exc:
927 if sock is not None:
928 sock.close()
929 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200930 except:
931 if sock is not None:
932 sock.close()
933 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700934 else:
935 break
936 else:
937 if len(exceptions) == 1:
938 raise exceptions[0]
939 else:
940 # If they all have the same str(), raise one.
941 model = str(exceptions[0])
942 if all(str(exc) == model for exc in exceptions):
943 raise exceptions[0]
944 # Raise a combined exception so the user can see all
945 # the various error messages.
946 raise OSError('Multiple exceptions: {}'.format(
947 ', '.join(str(exc) for exc in exceptions)))
948
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500949 else:
950 if sock is None:
951 raise ValueError(
952 'host and port was not specified and no sock specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500953 if sock.type != socket.SOCK_STREAM:
Yury Selivanovdab05842016-11-21 17:47:27 -0500954 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
955 # are SOCK_STREAM.
956 # We support passing AF_UNIX sockets even though we have
957 # a dedicated API for that: create_unix_connection.
958 # Disallowing AF_UNIX in this method, breaks backwards
959 # compatibility.
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500960 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500961 f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700962
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200963 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000964 sock, protocol_factory, ssl, server_hostname,
965 ssl_handshake_timeout=ssl_handshake_timeout)
Victor Stinnere912e652014-07-12 03:11:53 +0200966 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200967 # Get the socket from the transport because SSL transport closes
968 # the old socket and creates a new SSL socket
969 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200970 logger.debug("%r connected to %s:%r: (%r, %r)",
971 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500972 return transport, protocol
973
Neil Aspinallf7686c12017-12-19 19:45:42 +0000974 async def _create_connection_transport(
975 self, sock, protocol_factory, ssl,
976 server_hostname, server_side=False,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200977 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -0400978
979 sock.setblocking(False)
980
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700981 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400982 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700983 if ssl:
984 sslcontext = None if isinstance(ssl, bool) else ssl
985 transport = self._make_ssl_transport(
986 sock, protocol, sslcontext, waiter,
Neil Aspinallf7686c12017-12-19 19:45:42 +0000987 server_side=server_side, server_hostname=server_hostname,
988 ssl_handshake_timeout=ssl_handshake_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700989 else:
990 transport = self._make_socket_transport(sock, protocol, waiter)
991
Victor Stinner29ad0112015-01-15 00:04:21 +0100992 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200993 await waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100994 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100995 transport.close()
996 raise
997
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700998 return transport, protocol
999
Andrew Svetlov7c684072018-01-27 21:22:47 +02001000 async def sendfile(self, transport, file, offset=0, count=None,
1001 *, fallback=True):
1002 """Send a file to transport.
1003
1004 Return the total number of bytes which were sent.
1005
1006 The method uses high-performance os.sendfile if available.
1007
1008 file must be a regular file object opened in binary mode.
1009
1010 offset tells from where to start reading the file. If specified,
1011 count is the total number of bytes to transmit as opposed to
1012 sending the file until EOF is reached. File position is updated on
1013 return or also in case of error in which case file.tell()
1014 can be used to figure out the number of bytes
1015 which were sent.
1016
1017 fallback set to True makes asyncio to manually read and send
1018 the file when the platform does not support the sendfile syscall
1019 (e.g. Windows or SSL socket on Unix).
1020
1021 Raise SendfileNotAvailableError if the system does not support
1022 sendfile syscall and fallback is False.
1023 """
1024 if transport.is_closing():
1025 raise RuntimeError("Transport is closing")
1026 mode = getattr(transport, '_sendfile_compatible',
1027 constants._SendfileMode.UNSUPPORTED)
1028 if mode is constants._SendfileMode.UNSUPPORTED:
1029 raise RuntimeError(
1030 f"sendfile is not supported for transport {transport!r}")
1031 if mode is constants._SendfileMode.TRY_NATIVE:
1032 try:
1033 return await self._sendfile_native(transport, file,
1034 offset, count)
1035 except events.SendfileNotAvailableError as exc:
1036 if not fallback:
1037 raise
Yury Selivanovb1a6ac42018-01-27 15:52:52 -05001038
1039 if not fallback:
1040 raise RuntimeError(
1041 f"fallback is disabled and native sendfile is not "
1042 f"supported for transport {transport!r}")
1043
Andrew Svetlov7c684072018-01-27 21:22:47 +02001044 return await self._sendfile_fallback(transport, file,
1045 offset, count)
1046
1047 async def _sendfile_native(self, transp, file, offset, count):
1048 raise events.SendfileNotAvailableError(
1049 "sendfile syscall is not supported")
1050
1051 async def _sendfile_fallback(self, transp, file, offset, count):
1052 if offset:
1053 file.seek(offset)
1054 blocksize = min(count, 16384) if count else 16384
1055 buf = bytearray(blocksize)
1056 total_sent = 0
1057 proto = _SendfileFallbackProtocol(transp)
1058 try:
1059 while True:
1060 if count:
1061 blocksize = min(count - total_sent, blocksize)
1062 if blocksize <= 0:
1063 return total_sent
1064 view = memoryview(buf)[:blocksize]
1065 read = file.readinto(view)
1066 if not read:
1067 return total_sent # EOF
1068 await proto.drain()
1069 transp.write(view)
1070 total_sent += read
1071 finally:
1072 if total_sent > 0 and hasattr(file, 'seek'):
1073 file.seek(offset + total_sent)
1074 await proto.restore()
1075
Yury Selivanovf111b3d2017-12-30 00:35:36 -05001076 async def start_tls(self, transport, protocol, sslcontext, *,
1077 server_side=False,
1078 server_hostname=None,
1079 ssl_handshake_timeout=None):
1080 """Upgrade transport to TLS.
1081
1082 Return a new transport that *protocol* should start using
1083 immediately.
1084 """
1085 if ssl is None:
1086 raise RuntimeError('Python ssl module is not available')
1087
1088 if not isinstance(sslcontext, ssl.SSLContext):
1089 raise TypeError(
1090 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1091 f'got {sslcontext!r}')
1092
1093 if not getattr(transport, '_start_tls_compatible', False):
1094 raise TypeError(
1095 f'transport {self!r} is not supported by start_tls()')
1096
1097 waiter = self.create_future()
1098 ssl_protocol = sslproto.SSLProtocol(
1099 self, protocol, sslcontext, waiter,
1100 server_side, server_hostname,
1101 ssl_handshake_timeout=ssl_handshake_timeout,
1102 call_connection_made=False)
1103
1104 transport.set_protocol(ssl_protocol)
1105 self.call_soon(ssl_protocol.connection_made, transport)
1106 if not transport.is_reading():
1107 self.call_soon(transport.resume_reading)
1108
1109 await waiter
1110 return ssl_protocol._app_transport
1111
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001112 async def create_datagram_endpoint(self, protocol_factory,
1113 local_addr=None, remote_addr=None, *,
1114 family=0, proto=0, flags=0,
1115 reuse_address=None, reuse_port=None,
1116 allow_broadcast=None, sock=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001117 """Create datagram connection."""
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001118 if sock is not None:
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001119 if sock.type != socket.SOCK_DGRAM:
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001120 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001121 f'A UDP Socket was expected, got {sock!r}')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001122 if (local_addr or remote_addr or
1123 family or proto or flags or
1124 reuse_address or reuse_port or allow_broadcast):
1125 # show the problematic kwargs in exception msg
1126 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1127 family=family, proto=proto, flags=flags,
1128 reuse_address=reuse_address, reuse_port=reuse_port,
1129 allow_broadcast=allow_broadcast)
Yury Selivanov6370f342017-12-10 18:36:12 -05001130 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001131 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -05001132 f'socket modifier keyword arguments can not be used '
1133 f'when sock is specified. ({problems})')
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001134 sock.setblocking(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001135 r_addr = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001136 else:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001137 if not (local_addr or remote_addr):
1138 if family == 0:
1139 raise ValueError('unexpected address family')
1140 addr_pairs_info = (((family, proto), (None, None)),)
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001141 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1142 for addr in (local_addr, remote_addr):
Victor Stinner28e61652017-11-28 00:34:08 +01001143 if addr is not None and not isinstance(addr, str):
Quentin Dawansfe4ea9c2017-10-30 14:43:02 +01001144 raise TypeError('string is expected')
1145 addr_pairs_info = (((family, proto),
1146 (local_addr, remote_addr)), )
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001147 else:
1148 # join address by (family, protocol)
1149 addr_infos = collections.OrderedDict()
1150 for idx, addr in ((0, local_addr), (1, remote_addr)):
1151 if addr is not None:
1152 assert isinstance(addr, tuple) and len(addr) == 2, (
1153 '2-tuple is expected')
1154
Yury Selivanov19a44f62017-12-14 20:53:26 -05001155 infos = await self._ensure_resolved(
Yury Selivanovf1c6fa92016-06-08 12:33:31 -04001156 addr, family=family, type=socket.SOCK_DGRAM,
1157 proto=proto, flags=flags, loop=self)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001158 if not infos:
1159 raise OSError('getaddrinfo() returned empty list')
1160
1161 for fam, _, pro, _, address in infos:
1162 key = (fam, pro)
1163 if key not in addr_infos:
1164 addr_infos[key] = [None, None]
1165 addr_infos[key][idx] = address
1166
1167 # each addr has to have info for each (family, proto) pair
1168 addr_pairs_info = [
1169 (key, addr_pair) for key, addr_pair in addr_infos.items()
1170 if not ((local_addr and addr_pair[0] is None) or
1171 (remote_addr and addr_pair[1] is None))]
1172
1173 if not addr_pairs_info:
1174 raise ValueError('can not get address information')
1175
1176 exceptions = []
1177
1178 if reuse_address is None:
1179 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1180
1181 for ((family, proto),
1182 (local_address, remote_address)) in addr_pairs_info:
1183 sock = None
1184 r_addr = None
1185 try:
1186 sock = socket.socket(
1187 family=family, type=socket.SOCK_DGRAM, proto=proto)
1188 if reuse_address:
1189 sock.setsockopt(
1190 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1191 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001192 _set_reuseport(sock)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001193 if allow_broadcast:
1194 sock.setsockopt(
1195 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1196 sock.setblocking(False)
1197
1198 if local_addr:
1199 sock.bind(local_address)
1200 if remote_addr:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001201 await self.sock_connect(sock, remote_address)
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001202 r_addr = remote_address
1203 except OSError as exc:
1204 if sock is not None:
1205 sock.close()
1206 exceptions.append(exc)
1207 except:
1208 if sock is not None:
1209 sock.close()
1210 raise
1211 else:
1212 break
1213 else:
1214 raise exceptions[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001215
1216 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001217 waiter = self.create_future()
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001218 transport = self._make_datagram_transport(
1219 sock, protocol, r_addr, waiter)
Victor Stinnere912e652014-07-12 03:11:53 +02001220 if self._debug:
1221 if local_addr:
1222 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1223 "created: (%r, %r)",
1224 local_addr, remote_addr, transport, protocol)
1225 else:
1226 logger.debug("Datagram endpoint remote_addr=%r created: "
1227 "(%r, %r)",
1228 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +01001229
1230 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001231 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001232 except:
1233 transport.close()
1234 raise
1235
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001236 return transport, protocol
1237
Yury Selivanov19a44f62017-12-14 20:53:26 -05001238 async def _ensure_resolved(self, address, *,
1239 family=0, type=socket.SOCK_STREAM,
1240 proto=0, flags=0, loop):
1241 host, port = address[:2]
1242 info = _ipaddr_info(host, port, family, type, proto)
1243 if info is not None:
1244 # "host" is already a resolved IP.
1245 return [info]
1246 else:
1247 return await loop.getaddrinfo(host, port, family=family, type=type,
1248 proto=proto, flags=flags)
1249
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001250 async def _create_server_getaddrinfo(self, host, port, family, flags):
Yury Selivanov19a44f62017-12-14 20:53:26 -05001251 infos = await self._ensure_resolved((host, port), family=family,
1252 type=socket.SOCK_STREAM,
1253 flags=flags, loop=self)
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001254 if not infos:
Yury Selivanov6370f342017-12-10 18:36:12 -05001255 raise OSError(f'getaddrinfo({host!r}) returned empty list')
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001256 return infos
1257
Neil Aspinallf7686c12017-12-19 19:45:42 +00001258 async def create_server(
1259 self, protocol_factory, host=None, port=None,
1260 *,
1261 family=socket.AF_UNSPEC,
1262 flags=socket.AI_PASSIVE,
1263 sock=None,
1264 backlog=100,
1265 ssl=None,
1266 reuse_address=None,
1267 reuse_port=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -05001268 ssl_handshake_timeout=None,
1269 start_serving=True):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001270 """Create a TCP server.
1271
Yury Selivanov6370f342017-12-10 18:36:12 -05001272 The host parameter can be a string, in that case the TCP server is
1273 bound to host and port.
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001274
1275 The host parameter can also be a sequence of strings and in that case
Yury Selivanove076ffb2016-03-02 11:17:01 -05001276 the TCP server is bound to all hosts of the sequence. If a host
1277 appears multiple times (possibly indirectly e.g. when hostnames
1278 resolve to the same IP address), the server is only bound once to that
1279 host.
Victor Stinnerd1432092014-06-19 17:11:49 +02001280
Victor Stinneracdb7822014-07-14 18:33:40 +02001281 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +02001282
1283 This method is a coroutine.
1284 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -07001285 if isinstance(ssl, bool):
1286 raise TypeError('ssl argument must be an SSLContext or None')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001287
1288 if ssl_handshake_timeout is not None and ssl is None:
1289 raise ValueError(
1290 'ssl_handshake_timeout is only meaningful with ssl')
1291
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001292 if host is not None or port is not None:
1293 if sock is not None:
1294 raise ValueError(
1295 'host/port and sock can not be specified at the same time')
1296
1297 AF_INET6 = getattr(socket, 'AF_INET6', 0)
1298 if reuse_address is None:
1299 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1300 sockets = []
1301 if host == '':
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001302 hosts = [None]
1303 elif (isinstance(host, str) or
Serhiy Storchaka2e576f52017-04-24 09:05:00 +03001304 not isinstance(host, collections.abc.Iterable)):
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001305 hosts = [host]
1306 else:
1307 hosts = host
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001308
Victor Stinner5e4a7d82015-09-21 18:33:43 +02001309 fs = [self._create_server_getaddrinfo(host, port, family=family,
1310 flags=flags)
1311 for host in hosts]
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001312 infos = await tasks.gather(*fs, loop=self)
Yury Selivanove076ffb2016-03-02 11:17:01 -05001313 infos = set(itertools.chain.from_iterable(infos))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001314
1315 completed = False
1316 try:
1317 for res in infos:
1318 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -07001319 try:
1320 sock = socket.socket(af, socktype, proto)
1321 except socket.error:
1322 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +02001323 if self._debug:
1324 logger.warning('create_server() failed to create '
1325 'socket.socket(%r, %r, %r)',
1326 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -07001327 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001328 sockets.append(sock)
1329 if reuse_address:
Guido van Rossumb9bf9132015-10-05 09:15:28 -07001330 sock.setsockopt(
1331 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1332 if reuse_port:
Yury Selivanov5587d7c2016-09-15 15:45:07 -04001333 _set_reuseport(sock)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001334 # Disable IPv4/IPv6 dual stack support (enabled by
1335 # default on Linux) which makes a single socket
1336 # listen on both address families.
1337 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
1338 sock.setsockopt(socket.IPPROTO_IPV6,
1339 socket.IPV6_V6ONLY,
1340 True)
1341 try:
1342 sock.bind(sa)
1343 except OSError as err:
1344 raise OSError(err.errno, 'error while attempting '
1345 'to bind on address %r: %s'
Serhiy Storchaka5affd232017-04-05 09:37:24 +03001346 % (sa, err.strerror.lower())) from None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001347 completed = True
1348 finally:
1349 if not completed:
1350 for sock in sockets:
1351 sock.close()
1352 else:
1353 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +02001354 raise ValueError('Neither host/port nor sock were specified')
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001355 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001356 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001357 sockets = [sock]
1358
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001359 for sock in sockets:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001360 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -05001361
1362 server = Server(self, sockets, protocol_factory,
1363 ssl, backlog, ssl_handshake_timeout)
1364 if start_serving:
1365 server._start_serving()
1366
Victor Stinnere912e652014-07-12 03:11:53 +02001367 if self._debug:
1368 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001369 return server
1370
Neil Aspinallf7686c12017-12-19 19:45:42 +00001371 async def connect_accepted_socket(
1372 self, protocol_factory, sock,
1373 *, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001374 ssl_handshake_timeout=None):
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001375 """Handle an accepted connection.
1376
1377 This is used by servers that accept connections outside of
1378 asyncio but that use asyncio to handle connections.
1379
1380 This method is a coroutine. When completed, the coroutine
1381 returns a (transport, protocol) pair.
1382 """
Yury Selivanova7bd64c2017-12-19 06:44:37 -05001383 if sock.type != socket.SOCK_STREAM:
Yury Selivanov6370f342017-12-10 18:36:12 -05001384 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
Yury Selivanova1a8b7d2016-11-09 15:47:00 -05001385
Andrew Svetlov51eb1c62017-12-20 20:24:43 +02001386 if ssl_handshake_timeout is not None and not ssl:
1387 raise ValueError(
1388 'ssl_handshake_timeout is only meaningful with ssl')
1389
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001390 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +00001391 sock, protocol_factory, ssl, '', server_side=True,
1392 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanov252e9ed2016-07-12 18:23:10 -04001393 if self._debug:
1394 # Get the socket from the transport because SSL transport closes
1395 # the old socket and creates a new SSL socket
1396 sock = transport.get_extra_info('socket')
1397 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1398 return transport, protocol
1399
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001400 async def connect_read_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001401 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001402 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001403 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001404
1405 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001406 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001407 except:
1408 transport.close()
1409 raise
1410
Victor Stinneracdb7822014-07-14 18:33:40 +02001411 if self._debug:
1412 logger.debug('Read pipe %r connected: (%r, %r)',
1413 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001414 return transport, protocol
1415
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001416 async def connect_write_pipe(self, protocol_factory, pipe):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001417 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -04001418 waiter = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001419 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +01001420
1421 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001422 await waiter
Victor Stinner2596dd02015-01-26 11:02:18 +01001423 except:
1424 transport.close()
1425 raise
1426
Victor Stinneracdb7822014-07-14 18:33:40 +02001427 if self._debug:
1428 logger.debug('Write pipe %r connected: (%r, %r)',
1429 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001430 return transport, protocol
1431
Victor Stinneracdb7822014-07-14 18:33:40 +02001432 def _log_subprocess(self, msg, stdin, stdout, stderr):
1433 info = [msg]
1434 if stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001435 info.append(f'stdin={_format_pipe(stdin)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001436 if stdout is not None and stderr == subprocess.STDOUT:
Yury Selivanov6370f342017-12-10 18:36:12 -05001437 info.append(f'stdout=stderr={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001438 else:
1439 if stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001440 info.append(f'stdout={_format_pipe(stdout)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001441 if stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -05001442 info.append(f'stderr={_format_pipe(stderr)}')
Victor Stinneracdb7822014-07-14 18:33:40 +02001443 logger.debug(' '.join(info))
1444
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001445 async def subprocess_shell(self, protocol_factory, cmd, *,
1446 stdin=subprocess.PIPE,
1447 stdout=subprocess.PIPE,
1448 stderr=subprocess.PIPE,
1449 universal_newlines=False,
1450 shell=True, bufsize=0,
1451 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +01001452 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -08001453 raise ValueError("cmd must be a string")
1454 if universal_newlines:
1455 raise ValueError("universal_newlines must be False")
1456 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +01001457 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -08001458 if bufsize != 0:
1459 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001460 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001461 if self._debug:
1462 # don't log parameters: they may contain sensitive information
1463 # (password) and may be too long
1464 debug_log = 'run shell command %r' % cmd
1465 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001466 transport = await self._make_subprocess_transport(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001467 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001468 if self._debug:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001469 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001470 return transport, protocol
1471
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001472 async def subprocess_exec(self, protocol_factory, program, *args,
1473 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1474 stderr=subprocess.PIPE, universal_newlines=False,
1475 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -08001476 if universal_newlines:
1477 raise ValueError("universal_newlines must be False")
1478 if shell:
1479 raise ValueError("shell must be False")
1480 if bufsize != 0:
1481 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +01001482 popen_args = (program,) + args
1483 for arg in popen_args:
1484 if not isinstance(arg, (str, bytes)):
Yury Selivanov6370f342017-12-10 18:36:12 -05001485 raise TypeError(
1486 f"program arguments must be a bytes or text string, "
1487 f"not {type(arg).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001488 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +02001489 if self._debug:
1490 # don't log parameters: they may contain sensitive information
1491 # (password) and may be too long
Yury Selivanov6370f342017-12-10 18:36:12 -05001492 debug_log = f'execute program {program!r}'
Victor Stinneracdb7822014-07-14 18:33:40 +02001493 self._log_subprocess(debug_log, stdin, stdout, stderr)
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001494 transport = await self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -05001495 protocol, popen_args, False, stdin, stdout, stderr,
1496 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +02001497 if self._debug:
Vinay Sajipdd917f82016-08-31 08:22:29 +01001498 logger.info('%s: %r', debug_log, transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001499 return transport, protocol
1500
Yury Selivanov7ed7ce62016-05-16 15:20:38 -04001501 def get_exception_handler(self):
1502 """Return an exception handler, or None if the default one is in use.
1503 """
1504 return self._exception_handler
1505
Yury Selivanov569efa22014-02-18 18:02:19 -05001506 def set_exception_handler(self, handler):
1507 """Set handler as the new event loop exception handler.
1508
1509 If handler is None, the default exception handler will
1510 be set.
1511
1512 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +02001513 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -05001514 will be a reference to the active event loop, 'context'
1515 will be a dict object (see `call_exception_handler()`
1516 documentation for details about context).
1517 """
1518 if handler is not None and not callable(handler):
Yury Selivanov6370f342017-12-10 18:36:12 -05001519 raise TypeError(f'A callable object or None is expected, '
1520 f'got {handler!r}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001521 self._exception_handler = handler
1522
1523 def default_exception_handler(self, context):
1524 """Default exception handler.
1525
1526 This is called when an exception occurs and no exception
1527 handler is set, and can be called by a custom exception
1528 handler that wants to defer to the default behavior.
1529
Antoine Pitrou921e9432017-11-07 17:23:29 +01001530 This default handler logs the error message and other
1531 context-dependent information. In debug mode, a truncated
1532 stack trace is also appended showing where the given object
1533 (e.g. a handle or future or task) was created, if any.
1534
Victor Stinneracdb7822014-07-14 18:33:40 +02001535 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -05001536 `call_exception_handler()`.
1537 """
1538 message = context.get('message')
1539 if not message:
1540 message = 'Unhandled exception in event loop'
1541
1542 exception = context.get('exception')
1543 if exception is not None:
1544 exc_info = (type(exception), exception, exception.__traceback__)
1545 else:
1546 exc_info = False
1547
Yury Selivanov6370f342017-12-10 18:36:12 -05001548 if ('source_traceback' not in context and
1549 self._current_handle is not None and
1550 self._current_handle._source_traceback):
1551 context['handle_traceback'] = \
1552 self._current_handle._source_traceback
Victor Stinner9b524d52015-01-26 11:05:12 +01001553
Yury Selivanov569efa22014-02-18 18:02:19 -05001554 log_lines = [message]
1555 for key in sorted(context):
1556 if key in {'message', 'exception'}:
1557 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +02001558 value = context[key]
1559 if key == 'source_traceback':
1560 tb = ''.join(traceback.format_list(value))
1561 value = 'Object created at (most recent call last):\n'
1562 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +01001563 elif key == 'handle_traceback':
1564 tb = ''.join(traceback.format_list(value))
1565 value = 'Handle created at (most recent call last):\n'
1566 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +02001567 else:
1568 value = repr(value)
Yury Selivanov6370f342017-12-10 18:36:12 -05001569 log_lines.append(f'{key}: {value}')
Yury Selivanov569efa22014-02-18 18:02:19 -05001570
1571 logger.error('\n'.join(log_lines), exc_info=exc_info)
1572
1573 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +02001574 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -05001575
Victor Stinneracdb7822014-07-14 18:33:40 +02001576 The context argument is a dict containing the following keys:
1577
Yury Selivanov569efa22014-02-18 18:02:19 -05001578 - 'message': Error message;
1579 - 'exception' (optional): Exception object;
1580 - 'future' (optional): Future instance;
Yury Selivanova4afcdf2018-01-21 14:56:59 -05001581 - 'task' (optional): Task instance;
Yury Selivanov569efa22014-02-18 18:02:19 -05001582 - 'handle' (optional): Handle instance;
1583 - 'protocol' (optional): Protocol instance;
1584 - 'transport' (optional): Transport instance;
Yury Selivanoveb636452016-09-08 22:01:51 -07001585 - 'socket' (optional): Socket instance;
1586 - 'asyncgen' (optional): Asynchronous generator that caused
1587 the exception.
Yury Selivanov569efa22014-02-18 18:02:19 -05001588
Victor Stinneracdb7822014-07-14 18:33:40 +02001589 New keys maybe introduced in the future.
1590
1591 Note: do not overload this method in an event loop subclass.
1592 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001593 `set_exception_handler()` method.
1594 """
1595 if self._exception_handler is None:
1596 try:
1597 self.default_exception_handler(context)
1598 except Exception:
1599 # Second protection layer for unexpected errors
1600 # in the default implementation, as well as for subclassed
1601 # event loops with overloaded "default_exception_handler".
1602 logger.error('Exception in default exception handler',
1603 exc_info=True)
1604 else:
1605 try:
1606 self._exception_handler(self, context)
1607 except Exception as exc:
1608 # Exception in the user set custom exception handler.
1609 try:
1610 # Let's try default handler.
1611 self.default_exception_handler({
1612 'message': 'Unhandled error in exception handler',
1613 'exception': exc,
1614 'context': context,
1615 })
1616 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001617 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001618 # overloaded.
1619 logger.error('Exception in default exception handler '
1620 'while handling an unexpected error '
1621 'in custom exception handler',
1622 exc_info=True)
1623
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001624 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001625 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001626 assert isinstance(handle, events.Handle), 'A Handle is required here'
1627 if handle._cancelled:
1628 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001629 assert not isinstance(handle, events.TimerHandle)
1630 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001631
1632 def _add_callback_signalsafe(self, handle):
1633 """Like _add_callback() but called from a signal handler."""
1634 self._add_callback(handle)
1635 self._write_to_self()
1636
Yury Selivanov592ada92014-09-25 12:07:56 -04001637 def _timer_handle_cancelled(self, handle):
1638 """Notification that a TimerHandle has been cancelled."""
1639 if handle._scheduled:
1640 self._timer_cancelled_count += 1
1641
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001642 def _run_once(self):
1643 """Run one full iteration of the event loop.
1644
1645 This calls all currently ready callbacks, polls for I/O,
1646 schedules the resulting callbacks, and finally schedules
1647 'call_later' callbacks.
1648 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001649
Yury Selivanov592ada92014-09-25 12:07:56 -04001650 sched_count = len(self._scheduled)
1651 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1652 self._timer_cancelled_count / sched_count >
1653 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001654 # Remove delayed calls that were cancelled if their number
1655 # is too high
1656 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001657 for handle in self._scheduled:
1658 if handle._cancelled:
1659 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001660 else:
1661 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001662
Victor Stinner68da8fc2014-09-30 18:08:36 +02001663 heapq.heapify(new_scheduled)
1664 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001665 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001666 else:
1667 # Remove delayed calls that were cancelled from head of queue.
1668 while self._scheduled and self._scheduled[0]._cancelled:
1669 self._timer_cancelled_count -= 1
1670 handle = heapq.heappop(self._scheduled)
1671 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001672
1673 timeout = None
Guido van Rossum41f69f42015-11-19 13:28:47 -08001674 if self._ready or self._stopping:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001675 timeout = 0
1676 elif self._scheduled:
1677 # Compute the desired timeout.
1678 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001679 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001680
Victor Stinner770e48d2014-07-11 11:58:33 +02001681 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001682 t0 = self.time()
1683 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001684 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001685 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001686 level = logging.INFO
1687 else:
1688 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001689 nevent = len(event_list)
1690 if timeout is None:
1691 logger.log(level, 'poll took %.3f ms: %s events',
1692 dt * 1e3, nevent)
1693 elif nevent:
1694 logger.log(level,
1695 'poll %.3f ms took %.3f ms: %s events',
1696 timeout * 1e3, dt * 1e3, nevent)
1697 elif dt >= 1.0:
1698 logger.log(level,
1699 'poll %.3f ms took %.3f ms: timeout',
1700 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001701 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001702 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001703 self._process_events(event_list)
1704
1705 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001706 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001707 while self._scheduled:
1708 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001709 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001710 break
1711 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001712 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001713 self._ready.append(handle)
1714
1715 # This is the only place where callbacks are actually *called*.
1716 # All other places just add them to ready.
1717 # Note: We run all currently scheduled callbacks, but not any
1718 # callbacks scheduled by callbacks run this time around --
1719 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001720 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001721 ntodo = len(self._ready)
1722 for i in range(ntodo):
1723 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001724 if handle._cancelled:
1725 continue
1726 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001727 try:
1728 self._current_handle = handle
1729 t0 = self.time()
1730 handle._run()
1731 dt = self.time() - t0
1732 if dt >= self.slow_callback_duration:
1733 logger.warning('Executing %s took %.3f seconds',
1734 _format_handle(handle), dt)
1735 finally:
1736 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001737 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001738 handle._run()
1739 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001740
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001741 def _set_coroutine_origin_tracking(self, enabled):
1742 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Yury Selivanove8944cb2015-05-12 11:43:04 -04001743 return
1744
Yury Selivanove8944cb2015-05-12 11:43:04 -04001745 if enabled:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001746 self._coroutine_origin_tracking_saved_depth = (
1747 sys.get_coroutine_origin_tracking_depth())
1748 sys.set_coroutine_origin_tracking_depth(
1749 constants.DEBUG_STACK_DEPTH)
Yury Selivanove8944cb2015-05-12 11:43:04 -04001750 else:
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001751 sys.set_coroutine_origin_tracking_depth(
1752 self._coroutine_origin_tracking_saved_depth)
1753
1754 self._coroutine_origin_tracking_enabled = enabled
Yury Selivanove8944cb2015-05-12 11:43:04 -04001755
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001756 def get_debug(self):
1757 return self._debug
1758
1759 def set_debug(self, enabled):
1760 self._debug = enabled
Yury Selivanov1af2bf72015-05-11 22:27:25 -04001761
Yury Selivanove8944cb2015-05-12 11:43:04 -04001762 if self.is_running():
Nathaniel J. Smithfc2f4072018-01-21 06:44:07 -08001763 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)