asyncio: sync with Tulip
* PipeServer.close() now cancels the "accept pipe" future which cancels the
overlapped operation.
* Fix _SelectorTransport.__repr__() if the transport was closed
* Fix debug log in BaseEventLoop.create_connection(): get the socket object
from the transport because SSL transport closes the old socket and creates a
new SSL socket object. Remove also the _SelectorSslTransport._rawsock
attribute: it contained the closed socket (not very useful) and it was not
used.
* Issue #22063: socket operations (sock_recv, sock_sendall, sock_connect,
sock_accept) of the proactor event loop don't raise an exception in debug
mode if the socket are in blocking mode. Overlapped operations also work on
blocking sockets.
* Fix unit tests in debug mode: mock a non-blocking socket for socket
operations which now raise an exception if the socket is blocking.
* _fatal_error() method of _UnixReadPipeTransport and _UnixWritePipeTransport
now log all exceptions in debug mode
* Don't log expected errors in unit tests
* Tulip issue 200: _WaitHandleFuture._unregister_wait() now catchs and logs
exceptions.
* Tulip issue 200: Log errors in debug mode instead of simply ignoring them.
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index ec427d5..6763f0b 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -111,10 +111,17 @@
return
try:
_overlapped.UnregisterWait(self._wait_handle)
- except OSError as e:
- if e.winerror != _overlapped.ERROR_IO_PENDING:
- raise
+ except OSError as exc:
# ERROR_IO_PENDING is not an error, the wait was unregistered
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
+ context = {
+ 'message': 'Failed to unregister the wait handle',
+ 'exception': exc,
+ 'future': self,
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
self._wait_handle = None
self._iocp = None
self._ov = None
@@ -145,6 +152,11 @@
def __init__(self, address):
self._address = address
self._free_instances = weakref.WeakSet()
+ # initialize the pipe attribute before calling _server_pipe_handle()
+ # because this function can raise an exception and the destructor calls
+ # the close() method
+ self._pipe = None
+ self._accept_pipe_future = None
self._pipe = self._server_pipe_handle(True)
def _get_unconnected_pipe(self):
@@ -174,6 +186,9 @@
return pipe
def close(self):
+ if self._accept_pipe_future is not None:
+ self._accept_pipe_future.cancel()
+ self._accept_pipe_future = None
# Close all instances which have not been connected to by a client.
if self._address is not None:
for pipe in self._free_instances:
@@ -216,7 +231,7 @@
def start_serving_pipe(self, protocol_factory, address):
server = PipeServer(address)
- def loop(f=None):
+ def loop_accept_pipe(f=None):
pipe = None
try:
if f:
@@ -237,13 +252,17 @@
'pipe': pipe,
})
pipe.close()
+ elif self._debug:
+ logger.warning("Accept pipe failed on pipe %r",
+ pipe, exc_info=True)
except futures.CancelledError:
if pipe:
pipe.close()
else:
- f.add_done_callback(loop)
+ server._accept_pipe_future = f
+ f.add_done_callback(loop_accept_pipe)
- self.call_soon(loop)
+ self.call_soon(loop_accept_pipe)
return [server]
@coroutine