blob: 7ebc3cb4f3e86afe1cfd056d7d3297154232301b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
3__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy',
4 'AbstractEventLoop', 'AbstractServer',
5 'Handle', 'TimerHandle',
6 'get_event_loop_policy', 'set_event_loop_policy',
7 'get_event_loop', 'set_event_loop', 'new_event_loop',
8 ]
9
10import subprocess
11import sys
12import threading
13import socket
14
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
18class Handle:
19 """Object returned by callback registration methods."""
20
21 def __init__(self, callback, args):
22 self._callback = callback
23 self._args = args
24 self._cancelled = False
25
26 def __repr__(self):
27 res = 'Handle({}, {})'.format(self._callback, self._args)
28 if self._cancelled:
29 res += '<cancelled>'
30 return res
31
32 def cancel(self):
33 self._cancelled = True
34
35 def _run(self):
36 try:
37 self._callback(*self._args)
38 except Exception:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070039 logger.exception('Exception in callback %s %r',
40 self._callback, self._args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041 self = None # Needed to break cycles when an exception occurs.
42
43
44def make_handle(callback, args):
Guido van Rossumec7922c2013-10-30 14:38:05 -070045 # TODO: Inline this? Or make it a private EventLoop method?
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 assert not isinstance(callback, Handle), 'A Handle is not a callback'
47 return Handle(callback, args)
48
49
50class TimerHandle(Handle):
51 """Object returned by timed callback registration methods."""
52
53 def __init__(self, when, callback, args):
54 assert when is not None
55 super().__init__(callback, args)
56
57 self._when = when
58
59 def __repr__(self):
60 res = 'TimerHandle({}, {}, {})'.format(self._when,
61 self._callback,
62 self._args)
63 if self._cancelled:
64 res += '<cancelled>'
65
66 return res
67
68 def __hash__(self):
69 return hash(self._when)
70
71 def __lt__(self, other):
72 return self._when < other._when
73
74 def __le__(self, other):
75 if self._when < other._when:
76 return True
77 return self.__eq__(other)
78
79 def __gt__(self, other):
80 return self._when > other._when
81
82 def __ge__(self, other):
83 if self._when > other._when:
84 return True
85 return self.__eq__(other)
86
87 def __eq__(self, other):
88 if isinstance(other, TimerHandle):
89 return (self._when == other._when and
90 self._callback == other._callback and
91 self._args == other._args and
92 self._cancelled == other._cancelled)
93 return NotImplemented
94
95 def __ne__(self, other):
96 equal = self.__eq__(other)
97 return NotImplemented if equal is NotImplemented else not equal
98
99
100class AbstractServer:
101 """Abstract server returned by create_service()."""
102
103 def close(self):
104 """Stop serving. This leaves existing connections open."""
105 return NotImplemented
106
107 def wait_closed(self):
108 """Coroutine to wait until service is closed."""
109 return NotImplemented
110
111
112class AbstractEventLoop:
113 """Abstract event loop."""
114
115 # Running and stopping the event loop.
116
117 def run_forever(self):
118 """Run the event loop until stop() is called."""
119 raise NotImplementedError
120
121 def run_until_complete(self, future):
122 """Run the event loop until a Future is done.
123
124 Return the Future's result, or raise its exception.
125 """
126 raise NotImplementedError
127
128 def stop(self):
129 """Stop the event loop as soon as reasonable.
130
131 Exactly how soon that is may depend on the implementation, but
132 no more I/O callbacks should be scheduled.
133 """
134 raise NotImplementedError
135
136 def is_running(self):
137 """Return whether the event loop is currently running."""
138 raise NotImplementedError
139
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700140 def close(self):
141 """Close the loop.
142
143 The loop should not be running.
144
145 This is idempotent and irreversible.
146
147 No other methods should be called after this one.
148 """
149 raise NotImplementedError
150
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 # Methods scheduling callbacks. All these return Handles.
152
153 def call_soon(self, callback, *args):
154 return self.call_later(0, callback, *args)
155
156 def call_later(self, delay, callback, *args):
157 raise NotImplementedError
158
159 def call_at(self, when, callback, *args):
160 raise NotImplementedError
161
162 def time(self):
163 raise NotImplementedError
164
165 # Methods for interacting with threads.
166
167 def call_soon_threadsafe(self, callback, *args):
168 raise NotImplementedError
169
170 def run_in_executor(self, executor, callback, *args):
171 raise NotImplementedError
172
173 def set_default_executor(self, executor):
174 raise NotImplementedError
175
176 # Network I/O methods returning Futures.
177
178 def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
179 raise NotImplementedError
180
181 def getnameinfo(self, sockaddr, flags=0):
182 raise NotImplementedError
183
184 def create_connection(self, protocol_factory, host=None, port=None, *,
185 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700186 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 raise NotImplementedError
188
189 def create_server(self, protocol_factory, host=None, port=None, *,
190 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
191 sock=None, backlog=100, ssl=None, reuse_address=None):
192 """A coroutine which creates a TCP server bound to host and port.
193
194 The return value is a Server object which can be used to stop
195 the service.
196
197 If host is an empty string or None all interfaces are assumed
198 and a list of multiple sockets will be returned (most likely
199 one for IPv4 and another one for IPv6).
200
201 family can be set to either AF_INET or AF_INET6 to force the
202 socket to use IPv4 or IPv6. If not set it will be determined
203 from host (defaults to AF_UNSPEC).
204
205 flags is a bitmask for getaddrinfo().
206
207 sock can optionally be specified in order to use a preexisting
208 socket object.
209
210 backlog is the maximum number of queued connections passed to
211 listen() (defaults to 100).
212
213 ssl can be set to an SSLContext to enable SSL over the
214 accepted connections.
215
216 reuse_address tells the kernel to reuse a local socket in
217 TIME_WAIT state, without waiting for its natural timeout to
218 expire. If not specified will automatically be set to True on
219 UNIX.
220 """
221 raise NotImplementedError
222
223 def create_datagram_endpoint(self, protocol_factory,
224 local_addr=None, remote_addr=None, *,
225 family=0, proto=0, flags=0):
226 raise NotImplementedError
227
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700228 # Pipes and subprocesses.
229
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 def connect_read_pipe(self, protocol_factory, pipe):
231 """Register read pipe in eventloop.
232
233 protocol_factory should instantiate object with Protocol interface.
234 pipe is file-like object already switched to nonblocking.
235 Return pair (transport, protocol), where transport support
236 ReadTransport ABC"""
237 # The reason to accept file-like object instead of just file descriptor
238 # is: we need to own pipe and close it at transport finishing
239 # Can got complicated errors if pass f.fileno(),
240 # close fd in pipe transport then close f and vise versa.
241 raise NotImplementedError
242
243 def connect_write_pipe(self, protocol_factory, pipe):
244 """Register write pipe in eventloop.
245
246 protocol_factory should instantiate object with BaseProtocol interface.
247 Pipe is file-like object already switched to nonblocking.
248 Return pair (transport, protocol), where transport support
249 WriteTransport ABC"""
250 # The reason to accept file-like object instead of just file descriptor
251 # is: we need to own pipe and close it at transport finishing
252 # Can got complicated errors if pass f.fileno(),
253 # close fd in pipe transport then close f and vise versa.
254 raise NotImplementedError
255
256 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
257 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
258 **kwargs):
259 raise NotImplementedError
260
261 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
262 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
263 **kwargs):
264 raise NotImplementedError
265
266 # Ready-based callback registration methods.
267 # The add_*() methods return None.
268 # The remove_*() methods return True if something was removed,
269 # False if there was nothing to delete.
270
271 def add_reader(self, fd, callback, *args):
272 raise NotImplementedError
273
274 def remove_reader(self, fd):
275 raise NotImplementedError
276
277 def add_writer(self, fd, callback, *args):
278 raise NotImplementedError
279
280 def remove_writer(self, fd):
281 raise NotImplementedError
282
283 # Completion based I/O methods returning Futures.
284
285 def sock_recv(self, sock, nbytes):
286 raise NotImplementedError
287
288 def sock_sendall(self, sock, data):
289 raise NotImplementedError
290
291 def sock_connect(self, sock, address):
292 raise NotImplementedError
293
294 def sock_accept(self, sock):
295 raise NotImplementedError
296
297 # Signal handling.
298
299 def add_signal_handler(self, sig, callback, *args):
300 raise NotImplementedError
301
302 def remove_signal_handler(self, sig):
303 raise NotImplementedError
304
305
306class AbstractEventLoopPolicy:
307 """Abstract policy for accessing the event loop."""
308
309 def get_event_loop(self):
310 """XXX"""
311 raise NotImplementedError
312
313 def set_event_loop(self, loop):
314 """XXX"""
315 raise NotImplementedError
316
317 def new_event_loop(self):
318 """XXX"""
319 raise NotImplementedError
320
321
322class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy):
323 """Default policy implementation for accessing the event loop.
324
325 In this policy, each thread has its own event loop. However, we
326 only automatically create an event loop by default for the main
327 thread; other threads by default have no event loop.
328
329 Other policies may have different rules (e.g. a single global
330 event loop, or automatically creating an event loop per thread, or
331 using some other notion of context to which an event loop is
332 associated).
333 """
334
335 _loop = None
336 _set_called = False
337
338 def get_event_loop(self):
339 """Get the event loop.
340
341 This may be None or an instance of EventLoop.
342 """
343 if (self._loop is None and
344 not self._set_called and
345 isinstance(threading.current_thread(), threading._MainThread)):
346 self._loop = self.new_event_loop()
347 assert self._loop is not None, \
348 ('There is no current event loop in thread %r.' %
349 threading.current_thread().name)
350 return self._loop
351
352 def set_event_loop(self, loop):
353 """Set the event loop."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 self._set_called = True
355 assert loop is None or isinstance(loop, AbstractEventLoop)
356 self._loop = loop
357
358 def new_event_loop(self):
359 """Create a new event loop.
360
361 You must call set_event_loop() to make this the current event
362 loop.
363 """
364 if sys.platform == 'win32': # pragma: no cover
365 from . import windows_events
366 return windows_events.SelectorEventLoop()
367 else: # pragma: no cover
368 from . import unix_events
369 return unix_events.SelectorEventLoop()
370
371
372# Event loop policy. The policy itself is always global, even if the
373# policy's rules say that there is an event loop per thread (or other
374# notion of context). The default policy is installed by the first
375# call to get_event_loop_policy().
376_event_loop_policy = None
377
378
379def get_event_loop_policy():
380 """XXX"""
381 global _event_loop_policy
382 if _event_loop_policy is None:
383 _event_loop_policy = DefaultEventLoopPolicy()
384 return _event_loop_policy
385
386
387def set_event_loop_policy(policy):
388 """XXX"""
389 global _event_loop_policy
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
391 _event_loop_policy = policy
392
393
394def get_event_loop():
395 """XXX"""
396 return get_event_loop_policy().get_event_loop()
397
398
399def set_event_loop(loop):
400 """XXX"""
401 get_event_loop_policy().set_event_loop(loop)
402
403
404def new_event_loop():
405 """XXX"""
406 return get_event_loop_policy().new_event_loop()