blob: 348de033b2e29440fe24273bfaf60614664c23a6 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
7import socket
8
9from . import base_events
10from . import constants
11from . import futures
12from . import transports
13from .log import asyncio_log
14
15
16class _ProactorBasePipeTransport(transports.BaseTransport):
17 """Base class for pipe and socket transports."""
18
19 def __init__(self, loop, sock, protocol, waiter=None,
20 extra=None, server=None):
21 super().__init__(extra)
22 self._set_extra(sock)
23 self._loop = loop
24 self._sock = sock
25 self._protocol = protocol
26 self._server = server
27 self._buffer = []
28 self._read_fut = None
29 self._write_fut = None
30 self._conn_lost = 0
31 self._closing = False # Set when close() called.
32 self._eof_written = False
33 if self._server is not None:
34 self._server.attach(self)
35 self._loop.call_soon(self._protocol.connection_made, self)
36 if waiter is not None:
37 self._loop.call_soon(waiter.set_result, None)
38
39 def _set_extra(self, sock):
40 self._extra['pipe'] = sock
41
42 def close(self):
43 if self._closing:
44 return
45 self._closing = True
46 self._conn_lost += 1
47 if not self._buffer and self._write_fut is None:
48 self._loop.call_soon(self._call_connection_lost, None)
49 if self._read_fut is not None:
50 self._read_fut.cancel()
51
52 def _fatal_error(self, exc):
53 asyncio_log.exception('Fatal error for %s', self)
54 self._force_close(exc)
55
56 def _force_close(self, exc):
57 if self._closing:
58 return
59 self._closing = True
60 self._conn_lost += 1
61 if self._write_fut:
62 self._write_fut.cancel()
63 if self._read_fut:
64 self._read_fut.cancel()
65 self._write_fut = self._read_fut = None
66 self._buffer = []
67 self._loop.call_soon(self._call_connection_lost, exc)
68
69 def _call_connection_lost(self, exc):
70 try:
71 self._protocol.connection_lost(exc)
72 finally:
73 # XXX If there is a pending overlapped read on the other
74 # end then it may fail with ERROR_NETNAME_DELETED if we
75 # just close our end. First calling shutdown() seems to
76 # cure it, but maybe using DisconnectEx() would be better.
77 if hasattr(self._sock, 'shutdown'):
78 self._sock.shutdown(socket.SHUT_RDWR)
79 self._sock.close()
80 server = self._server
81 if server is not None:
82 server.detach(self)
83 self._server = None
84
85
86class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
87 transports.ReadTransport):
88 """Transport for read pipes."""
89
90 def __init__(self, loop, sock, protocol, waiter=None,
91 extra=None, server=None):
92 super().__init__(loop, sock, protocol, waiter, extra, server)
93 self._read_fut = None
94 self._paused = False
95 self._loop.call_soon(self._loop_reading)
96
97 def pause(self):
98 assert not self._closing, 'Cannot pause() when closing'
99 assert not self._paused, 'Already paused'
100 self._paused = True
101
102 def resume(self):
103 assert self._paused, 'Not paused'
104 self._paused = False
105 if self._closing:
106 return
107 self._loop.call_soon(self._loop_reading, self._read_fut)
108
109 def _loop_reading(self, fut=None):
110 if self._paused:
111 return
112 data = None
113
114 try:
115 if fut is not None:
116 assert self._read_fut is fut or (self._read_fut is None and
117 self._closing)
118 self._read_fut = None
119 data = fut.result() # deliver data later in "finally" clause
120
121 if self._closing:
122 # since close() has been called we ignore any read data
123 data = None
124 return
125
126 if data == b'':
127 # we got end-of-file so no need to reschedule a new read
128 return
129
130 # reschedule a new read
131 self._read_fut = self._loop._proactor.recv(self._sock, 4096)
132 except ConnectionAbortedError as exc:
133 if not self._closing:
134 self._fatal_error(exc)
135 except ConnectionResetError as exc:
136 self._force_close(exc)
137 except OSError as exc:
138 self._fatal_error(exc)
139 except futures.CancelledError:
140 if not self._closing:
141 raise
142 else:
143 self._read_fut.add_done_callback(self._loop_reading)
144 finally:
145 if data:
146 self._protocol.data_received(data)
147 elif data is not None:
148 keep_open = self._protocol.eof_received()
149 if not keep_open:
150 self.close()
151
152
153class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
154 transports.WriteTransport):
155 """Transport for write pipes."""
156
157 def write(self, data):
158 assert isinstance(data, bytes), repr(data)
159 if self._eof_written:
160 raise IOError('write_eof() already called')
161
162 if not data:
163 return
164
165 if self._conn_lost:
166 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
167 asyncio_log.warning('socket.send() raised exception.')
168 self._conn_lost += 1
169 return
170 self._buffer.append(data)
171 if self._write_fut is None:
172 self._loop_writing()
173
174 def _loop_writing(self, f=None):
175 try:
176 assert f is self._write_fut
177 self._write_fut = None
178 if f:
179 f.result()
180 data = b''.join(self._buffer)
181 self._buffer = []
182 if not data:
183 if self._closing:
184 self._loop.call_soon(self._call_connection_lost, None)
185 if self._eof_written:
186 self._sock.shutdown(socket.SHUT_WR)
187 return
188 self._write_fut = self._loop._proactor.send(self._sock, data)
189 self._write_fut.add_done_callback(self._loop_writing)
190 except ConnectionResetError as exc:
191 self._force_close(exc)
192 except OSError as exc:
193 self._fatal_error(exc)
194
195 def can_write_eof(self):
196 return True
197
198 def write_eof(self):
199 self.close()
200
201 def abort(self):
202 self._force_close(None)
203
204
205class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
206 _ProactorWritePipeTransport,
207 transports.Transport):
208 """Transport for duplex pipes."""
209
210 def can_write_eof(self):
211 return False
212
213 def write_eof(self):
214 raise NotImplementedError
215
216
217class _ProactorSocketTransport(_ProactorReadPipeTransport,
218 _ProactorWritePipeTransport,
219 transports.Transport):
220 """Transport for connected sockets."""
221
222 def _set_extra(self, sock):
223 self._extra['socket'] = sock
224 try:
225 self._extra['sockname'] = sock.getsockname()
226 except (socket.error, AttributeError):
227 pass
228 if 'peername' not in self._extra:
229 try:
230 self._extra['peername'] = sock.getpeername()
231 except (socket.error, AttributeError):
232 pass
233
234 def can_write_eof(self):
235 return True
236
237 def write_eof(self):
238 if self._closing or self._eof_written:
239 return
240 self._eof_written = True
241 if self._write_fut is None:
242 self._sock.shutdown(socket.SHUT_WR)
243
244
245class BaseProactorEventLoop(base_events.BaseEventLoop):
246
247 def __init__(self, proactor):
248 super().__init__()
249 asyncio_log.debug('Using proactor: %s', proactor.__class__.__name__)
250 self._proactor = proactor
251 self._selector = proactor # convenient alias
252 proactor.set_loop(self)
253 self._make_self_pipe()
254
255 def _make_socket_transport(self, sock, protocol, waiter=None,
256 extra=None, server=None):
257 return _ProactorSocketTransport(self, sock, protocol, waiter,
258 extra, server)
259
260 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
261 extra=None):
262 return _ProactorDuplexPipeTransport(self,
263 sock, protocol, waiter, extra)
264
265 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
266 extra=None):
267 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
268
269 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
270 extra=None):
271 return _ProactorWritePipeTransport(self, sock, protocol, waiter, extra)
272
273 def close(self):
274 if self._proactor is not None:
275 self._close_self_pipe()
276 self._proactor.close()
277 self._proactor = None
278 self._selector = None
279
280 def sock_recv(self, sock, n):
281 return self._proactor.recv(sock, n)
282
283 def sock_sendall(self, sock, data):
284 return self._proactor.send(sock, data)
285
286 def sock_connect(self, sock, address):
287 return self._proactor.connect(sock, address)
288
289 def sock_accept(self, sock):
290 return self._proactor.accept(sock)
291
292 def _socketpair(self):
293 raise NotImplementedError
294
295 def _close_self_pipe(self):
296 self._ssock.close()
297 self._ssock = None
298 self._csock.close()
299 self._csock = None
300 self._internal_fds -= 1
301
302 def _make_self_pipe(self):
303 # A self-socket, really. :-)
304 self._ssock, self._csock = self._socketpair()
305 self._ssock.setblocking(False)
306 self._csock.setblocking(False)
307 self._internal_fds += 1
308 self.call_soon(self._loop_self_reading)
309
310 def _loop_self_reading(self, f=None):
311 try:
312 if f is not None:
313 f.result() # may raise
314 f = self._proactor.recv(self._ssock, 4096)
315 except:
316 self.close()
317 raise
318 else:
319 f.add_done_callback(self._loop_self_reading)
320
321 def _write_to_self(self):
322 self._csock.send(b'x')
323
324 def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
325 assert not ssl, 'IocpEventLoop is incompatible with SSL.'
326
327 def loop(f=None):
328 try:
329 if f is not None:
330 conn, addr = f.result()
331 protocol = protocol_factory()
332 self._make_socket_transport(
333 conn, protocol,
334 extra={'peername': addr}, server=server)
335 f = self._proactor.accept(sock)
336 except OSError:
337 if sock.fileno() != -1:
338 asyncio_log.exception('Accept failed')
339 sock.close()
340 except futures.CancelledError:
341 sock.close()
342 else:
343 f.add_done_callback(loop)
344
345 self.call_soon(loop)
346
347 def _process_events(self, event_list):
348 pass # XXX hard work currently done in poll
349
350 def _stop_serving(self, sock):
351 self._proactor._stop_serving(sock)
352 sock.close()