blob: a47253a699c64f618012f0d2db9d3fb722cdc2af [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
# Public names exported by a star-import of this module.
# NOTE(review): make_handle() is defined in this module but is not listed
# here -- presumably intentional; confirm before adding it.
__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           ]
9
10import subprocess
11import sys
12import threading
13import socket
14
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
class Handle:
    """Cancellable record of a callback together with its arguments.

    Callback registration methods hand back instances of this class.
    """

    def __init__(self, callback, args):
        # A freshly created handle is never cancelled.
        self._cancelled = False
        self._callback = callback
        self._args = args

    def __repr__(self):
        text = 'Handle({}, {})'.format(self._callback, self._args)
        # Cancelled handles are tagged with a suffix.
        return (text + '<cancelled>') if self._cancelled else text

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Call the stored callback with the stored arguments.

        Any Exception raised by the callback is logged through the
        module logger and is not propagated to the caller.
        """
        try:
            self._callback(*self._args)
        except Exception:
            logger.exception('Exception in callback %s %r',
                             self._callback, self._args)
        # Drop the local reference to break cycles when an exception occurs.
        self = None
42
43
def make_handle(callback, args):
    """Wrap *callback* and *args* in a new Handle and return it.

    Asserts that *callback* is not itself a Handle.
    """
    # TODO: Inline this?  Or make it a private EventLoop method?
    assert not isinstance(callback, Handle), 'A Handle is not a callback'
    handle = Handle(callback, args)
    return handle
48
49
class TimerHandle(Handle):
    """Object returned by timed callback registration methods.

    In addition to the callback and arguments kept by Handle, a
    TimerHandle stores ``when``, the time at which it is scheduled.

    Ordering (<, <=, >, >=) is based on ``when``.  Equality additionally
    requires the callback, the arguments and the cancelled flag to
    match.  The hash depends only on ``when``, so equal handles always
    hash alike.

    Every comparison against an object that is not a TimerHandle
    returns NotImplemented, letting Python try the reflected operation
    or raise TypeError, instead of failing with AttributeError while
    reading ``other._when``.
    """

    def __init__(self, when, callback, args):
        assert when is not None
        super().__init__(callback, args)

        self._when = when

    def __repr__(self):
        res = 'TimerHandle({}, {}, {})'.format(self._when,
                                               self._callback,
                                               self._args)
        if self._cancelled:
            res += '<cancelled>'

        return res

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when

    def __le__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        if self._when < other._when:
            return True
        # Same or later time: "<=" holds only if the handles are equal.
        return self.__eq__(other)

    def __gt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when

    def __ge__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        if self._when > other._when:
            return True
        # Same or earlier time: ">=" holds only if the handles are equal.
        return self.__eq__(other)

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal
98
99
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are abstract: they raise NotImplementedError here,
    like the abstract methods of the other base classes in this module,
    and must be overridden by concrete servers.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError
110
111
class AbstractEventLoop:
    """Interface definition for event loops.

    Apart from call_soon(), which delegates to call_later(), every
    method here raises NotImplementedError and is meant to be
    overridden by a concrete loop.
    """

    # ---- Running and stopping the event loop. ----

    def run_forever(self):
        """Keep the loop running until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the loop until the given Future is done.

        The Future's result is returned, or its exception is raised.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the loop as soon as reasonable.

        How soon that is may vary by implementation, but no further
        I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Tell whether the loop is currently running."""
        raise NotImplementedError

    # ---- Methods scheduling callbacks.  All these return Handles. ----

    def call_soon(self, callback, *args):
        """Schedule *callback* through call_later() with a delay of 0."""
        handle = self.call_later(0, callback, *args)
        return handle

    def call_later(self, delay, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def time(self):
        """Abstract; must be overridden."""
        raise NotImplementedError

    # ---- Methods for interacting with threads. ----

    def call_soon_threadsafe(self, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Abstract; must be overridden."""
        raise NotImplementedError

    # ---- Network I/O methods returning Futures. ----

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """Coroutine creating a TCP server bound to host and port.

        Returns a Server object which can be used to stop the service.

        host: when it is an empty string or None, all interfaces are
        assumed and a list of multiple sockets will be returned (most
        likely one for IPv4 and another one for IPv6).

        family: AF_INET or AF_INET6 forces the socket to use IPv4 or
        IPv6.  When not set it is determined from host (defaults to
        AF_UNSPEC).

        flags: bitmask for getaddrinfo().

        sock: optional preexisting socket object to use.

        backlog: maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl: an SSLContext enables SSL over the accepted connections.

        reuse_address: tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire.  When not specified it is automatically set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register a read pipe in the event loop.

        protocol_factory should instantiate an object with the Protocol
        interface.  pipe is a file-like object already switched to
        nonblocking mode.  Returns a (transport, protocol) pair, where
        the transport supports the ReadTransport ABC.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno() can
        # produce complicated errors: the pipe transport closes the fd
        # and then f is closed, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register a write pipe in the event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.  pipe is a file-like object already
        switched to nonblocking mode.  Returns a (transport, protocol)
        pair, where the transport supports the WriteTransport ABC.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno() can
        # produce complicated errors: the pipe transport closes the fd
        # and then f is closed, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        """Abstract; must be overridden."""
        raise NotImplementedError

    # ---- Ready-based callback registration methods. ----
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Abstract; must be overridden."""
        raise NotImplementedError

    # ---- Completion based I/O methods returning Futures. ----

    def sock_recv(self, sock, nbytes):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def sock_connect(self, sock, address):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def sock_accept(self, sock):
        """Abstract; must be overridden."""
        raise NotImplementedError

    # ---- Signal handling. ----

    def add_signal_handler(self, sig, callback, *args):
        """Abstract; must be overridden."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Abstract; must be overridden."""
        raise NotImplementedError
291
292
class AbstractEventLoopPolicy:
    """Interface of the policy objects used to access the event loop.

    All three methods raise NotImplementedError here and must be
    overridden by a concrete policy.
    """

    def get_event_loop(self):
        """Return the event loop selected by this policy (abstract)."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Install *loop* as the event loop for this policy (abstract)."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop (abstract)."""
        raise NotImplementedError
307
308
class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    Under this policy every thread has its own event loop.  An event
    loop is only created automatically for the main thread; other
    threads have no event loop by default.

    Other policies may use different rules (e.g. a single global event
    loop, automatically creating an event loop per thread, or tying
    the event loop to some other notion of context).
    """

    # Class-level defaults; assignments made through ``self`` shadow them.
    _loop = None
    _set_called = False

    def get_event_loop(self):
        """Return the current event loop.

        When no loop is stored, set_event_loop() has not been called
        and the caller runs in the main thread, a loop is first created
        with new_event_loop() and remembered.  An assertion fails when
        there is still no loop afterwards.
        """
        may_autocreate = (
            self._loop is None and
            not self._set_called and
            isinstance(threading.current_thread(), threading._MainThread))
        if may_autocreate:
            self._loop = self.new_event_loop()
        assert self._loop is not None, \
            ('There is no current event loop in thread %r.' %
             threading.current_thread().name)
        return self._loop

    def set_event_loop(self, loop):
        """Store *loop* (None or an AbstractEventLoop) as the event loop."""
        # Recorded before validation, so even a rejected call switches
        # off the automatic creation done by get_event_loop().
        self._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._loop = loop

    def new_event_loop(self):
        """Build and return a new, platform-specific SelectorEventLoop.

        The new loop does not become the current one; call
        set_event_loop() for that.
        """
        if sys.platform != 'win32':  # pragma: no cover
            from . import unix_events
            return unix_events.SelectorEventLoop()
        else:  # pragma: no cover
            from . import windows_events
            return windows_events.SelectorEventLoop()
357
358
# Event loop policy.  The policy object itself is always global, even
# when the policy's rules say that there is one event loop per thread
# (or per some other notion of context).  None means that no policy is
# installed at the moment; get_event_loop_policy() installs the default
# policy the first time it is called in that state.
_event_loop_policy = None
364
365
def get_event_loop_policy():
    """Return the global event loop policy.

    A DefaultEventLoopPolicy is created and installed first when no
    policy is currently set.
    """
    global _event_loop_policy
    policy = _event_loop_policy
    if policy is None:
        policy = _event_loop_policy = DefaultEventLoopPolicy()
    return policy
372
373
def set_event_loop_policy(policy):
    """Install *policy* as the global event loop policy.

    *policy* must be None or an AbstractEventLoopPolicy instance; this
    is checked with an assertion.
    """
    global _event_loop_policy
    # Validate first so a rejected value never replaces the current policy.
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
379
380
def get_event_loop():
    """Return the event loop provided by the current global policy."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
384
385
def set_event_loop(loop):
    """Hand *loop* to the current global policy's set_event_loop()."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
389
390
def new_event_loop():
    """Return a new event loop created by the current global policy."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()