blob: 4c0cbb091768bccd62f13c33e65d97f6c8e7ea8f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
# Public names exported by a star-import of this module.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
10
11import subprocess
12import sys
13import threading
14import socket
15
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
class Handle:
    """Pairs a callback with its positional arguments.

    This is the object returned by callback registration methods.
    Cancelling a handle only records a flag on the handle itself.
    """

    def __init__(self, callback, args):
        # Wrapping one Handle in another is always a programming error.
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._cancelled = False
        self._args = args
        self._callback = callback

    def __repr__(self):
        text = 'Handle({}, {})'.format(self._callback, self._args)
        return (text + '<cancelled>') if self._cancelled else text

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Call the callback with its arguments.

        Any Exception raised by the callback is logged through the
        module logger instead of being propagated to the caller.
        """
        try:
            self._callback(*self._args)
        except Exception:
            logger.exception(
                'Exception in callback %s %r', self._callback, self._args)
        self = None  # Needed to break cycles when an exception occurs.
44
45
class TimerHandle(Handle):
    """Handle for a callback registered together with a time value.

    Ordering (<, >) compares the ``_when`` values only, and the hash is
    derived from ``_when`` alone.  Equality between two TimerHandles
    additionally requires the callback, the arguments and the cancelled
    flag to match; <= and >= fall back to that equality test when the
    times are not strictly ordered.
    """

    def __init__(self, when, callback, args):
        assert when is not None
        super().__init__(callback, args)
        self._when = when

    def __repr__(self):
        text = 'TimerHandle({}, {}, {})'.format(
            self._when, self._callback, self._args)
        return (text + '<cancelled>') if self._cancelled else text

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        # Strictly earlier wins outright; otherwise defer to __eq__.
        return True if self._when < other._when else self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        # Strictly later wins outright; otherwise defer to __eq__.
        return True if self._when > other._when else self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def __ne__(self, other):
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome
94
95
class AbstractServer:
    """Abstract server returned by create_server()."""

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises:
            NotImplementedError: always; concrete servers must override
                this method.
        """
        # NotImplemented is reserved for binary special methods; an
        # unimplemented abstract method must fail loudly, like the
        # abstract event loop methods in this module do.
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises:
            NotImplementedError: always; concrete servers must override
                this method.
        """
        raise NotImplementedError
106
107
class AbstractEventLoop:
    """Abstract event loop.

    Defines the event loop interface.  Apart from call_soon(), which
    delegates to call_later(), every method here raises
    NotImplementedError and must be provided by a concrete subclass.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule callback(*args) via call_later() with a delay of 0.

        Returns whatever call_later() returns.
        """
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        """Abstract: schedule callback(*args) for the given delay.

        Not implemented here; always raises NotImplementedError.
        """
        # NOTE(review): the unit of `delay` is not stated in this file --
        # confirm against the concrete event loop.
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract: schedule callback(*args) for the time `when`.

        Not implemented here; always raises NotImplementedError.
        """
        # NOTE(review): presumably `when` is on the same clock as time();
        # confirm against the concrete event loop.
        raise NotImplementedError

    def time(self):
        """Abstract: return the event loop's notion of the current time.

        Not implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        """Abstract counterpart of call_soon() for use with threads.

        Not implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        """Abstract: run callback(*args) using the given executor.

        Not implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Abstract: set the default executor.

        Not implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        """Abstract network I/O method; always raises NotImplementedError."""
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        """Abstract network I/O method; always raises NotImplementedError."""
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Abstract network I/O method; always raises NotImplementedError."""
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Abstract network I/O method; always raises NotImplementedError."""
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in eventloop.

        protocol_factory should instantiate object with Protocol interface.
        pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when
        # the transport finishes.  Passing f.fileno() can lead to
        # complicated errors: the pipe transport closes the fd and then
        # f gets closed, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in eventloop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when
        # the transport finishes.  Passing f.fileno() can lead to
        # complicated errors: the pipe transport closes the fd and then
        # f gets closed, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        """Abstract subprocess method taking a single `cmd` argument.

        stdin, stdout and stderr default to subprocess.PIPE.  Not
        implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        """Abstract subprocess method taking the command as *args.

        stdin, stdout and stderr default to subprocess.PIPE.  Not
        implemented here; always raises NotImplementedError.
        """
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Abstract: register callback(*args) as the reader for fd."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Abstract: remove the reader registered for fd."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Abstract: register callback(*args) as the writer for fd."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Abstract: remove the writer registered for fd."""
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        """Abstract completion-based I/O; always raises NotImplementedError."""
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        """Abstract completion-based I/O; always raises NotImplementedError."""
        raise NotImplementedError

    def sock_connect(self, sock, address):
        """Abstract completion-based I/O; always raises NotImplementedError."""
        raise NotImplementedError

    def sock_accept(self, sock):
        """Abstract completion-based I/O; always raises NotImplementedError."""
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        """Abstract: register callback(*args) as the handler for sig."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Abstract: remove the handler registered for sig."""
        raise NotImplementedError
300
301
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method raises NotImplementedError; concrete policies must
    override them.
    """

    def get_event_loop(self):
        """Abstract: return the event loop chosen by this policy."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Abstract: make `loop` the event loop handed out by this policy."""
        raise NotImplementedError

    def new_event_loop(self):
        """Abstract: create and return a new event loop."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Abstract: return the child watcher held by this policy."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Abstract: install `watcher` as this policy's child watcher."""
        raise NotImplementedError
326
327
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    Each thread gets its own event loop slot.  A loop is created
    automatically only for the main thread; other threads have no event
    loop until one is set explicitly.

    Other policies may follow different rules (e.g. a single global
    event loop, an automatically created loop per thread, or some other
    notion of context to which an event loop is associated).
    """

    # Callable used by new_event_loop(); None in this base class.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread slots: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Return the event loop of the current thread.

        In the main thread, a loop is created and installed on first use
        provided set_event_loop() has not been called yet.  An assertion
        fails if the current thread ends up without a loop.
        """
        wants_default_loop = (
            self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread))
        if wants_default_loop:
            self.set_event_loop(self.new_event_loop())
        assert self._local._loop is not None, \
            ('There is no current event loop in thread %r.' %
             threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop of the current thread.

        loop must be None or an AbstractEventLoop instance.
        """
        # Record the call first so the main thread stops auto-creating.
        self._local._set_called = True
        if loop is not None:
            assert isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create and return a new event loop using _loop_factory.

        The new loop is not installed; call set_event_loop() to make it
        the current event loop.
        """
        factory = self._loop_factory
        return factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377
378
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  None means no policy is installed yet; the
# default policy is installed by the first call to
# get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()
387
388
def _init_event_loop_policy():
    """Install a DefaultEventLoopPolicy if no policy is set.

    The check and the assignment both happen while holding _lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:  # pragma: no branch
            return
        # Imported here rather than at module level.
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
395
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
def get_event_loop_policy():
    """Return the global event loop policy.

    If no policy is installed yet, _init_event_loop_policy() is called
    first to install one.
    """
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
402
403
def set_event_loop_policy(policy):
    """Replace the global event loop policy with `policy`.

    policy must be None or an AbstractEventLoopPolicy instance.
    """
    global _event_loop_policy
    if policy is not None:
        assert isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
409
410
def get_event_loop():
    """Return the result of get_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
414
415
def set_event_loop(loop):
    """Pass `loop` to set_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
419
420
def new_event_loop():
    """Return the result of new_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800424
425
def get_child_watcher():
    """Return the result of get_child_watcher() on the global policy."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
429
430
def set_child_watcher(watcher):
    """Pass `watcher` to set_child_watcher() on the global policy.

    Returns whatever the policy's set_child_watcher() returns.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)