blob: dd9e3fb42992df2e5fdf2157cc5532abb94de74f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
# Names exported by ``from <this module> import *``.
__all__ = [
    'AbstractEventLoopPolicy',
    'AbstractEventLoop',
    'AbstractServer',
    'Handle',
    'TimerHandle',
    'get_event_loop_policy',
    'set_event_loop_policy',
    'get_event_loop',
    'set_event_loop',
    'new_event_loop',
    'get_child_watcher',
    'set_child_watcher',
]
10
11import subprocess
12import sys
13import threading
14import socket
15
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
class Handle:
    """Object returned by callback registration methods.

    Stores a callback together with its positional arguments and a
    cancellation flag.
    """

    __slots__ = ['_callback', '_args', '_cancelled']

    def __init__(self, callback, args):
        # A Handle wraps a callback; it must never itself be used as one.
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._cancelled = False
        self._args = args
        self._callback = callback

    def __repr__(self):
        text = 'Handle({}, {})'.format(self._callback, self._args)
        return (text + '<cancelled>') if self._cancelled else text

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Invoke the callback; log (rather than propagate) any Exception."""
        try:
            self._callback(*self._args)
        except Exception:
            logger.exception('Exception in callback %s %r',
                             self._callback, self._args)
        # Drop the local reference: needed to break cycles when an
        # exception occurs.
        self = None
46
47
class TimerHandle(Handle):
    """Object returned by timed callback registration methods.

    Extends Handle with a ``_when`` value.  Ordering comparisons use
    ``_when`` only; equality additionally compares the callback, its
    arguments and the cancellation flag.  All rich comparisons return
    NotImplemented when the other operand is not a TimerHandle, so Python
    can try the reflected operation or raise TypeError.
    """

    __slots__ = ['_when']

    def __init__(self, when, callback, args):
        assert when is not None
        super().__init__(callback, args)

        self._when = when

    def __repr__(self):
        res = 'TimerHandle({}, {}, {})'.format(self._when,
                                               self._callback,
                                               self._args)
        if self._cancelled:
            res += '<cancelled>'

        return res

    def __hash__(self):
        # Handles that compare equal have equal _when, so this stays
        # consistent with __eq__.
        return hash(self._when)

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            if self._when < other._when:
                return True
            return self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            if self._when > other._when:
                return True
            return self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal
98
99
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are abstract and raise NotImplementedError, matching the
    other abstract classes in this module.  (NotImplemented is a sentinel
    for binary-operator methods, not a way to signal a missing override.)
    """

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
110
111
class AbstractEventLoop:
    """Abstract event loop.

    Every method except call_soon() raises NotImplementedError here and
    must be provided by a concrete loop.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule callback(*args) by delegating to call_later() with delay 0."""
        delay = 0
        return self.call_later(delay, callback, *args)

    def call_later(self, delay, callback, *args):
        """Abstract: schedule callback(*args) after *delay*."""
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract: schedule callback(*args) at time *when*."""
        raise NotImplementedError

    def time(self):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *,
                    family=0, type=0, proto=0, flags=0):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def create_connection(self, protocol_factory,
                          host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0,
                          sock=None, local_addr=None,
                          server_hostname=None):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def create_server(self, protocol_factory,
                      host=None, port=None, *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None,
                      reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None, all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6.  If not set, it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire.  If not specified, it will automatically be set to True
        on UNIX.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register a read pipe in the event loop.

        protocol_factory should instantiate an object with the Protocol
        interface.  pipe is a file-like object already switched to
        nonblocking mode.  Return a (transport, protocol) pair, where
        transport supports the ReadTransport interface.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno() can
        # cause complicated errors: the transport closes the fd and then
        # f gets closed, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register a write pipe in the event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.  pipe is a file-like object already
        switched to nonblocking mode.  Return a (transport, protocol)
        pair, where transport supports the WriteTransport interface.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno() can
        # cause complicated errors: the transport closes the fd and then
        # f gets closed, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         **kwargs):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        **kwargs):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def sock_connect(self, sock, address):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def sock_accept(self, sock):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        """Abstract: concrete loops must override."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Abstract: concrete loops must override."""
        raise NotImplementedError
304
305
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method raises NotImplementedError; concrete policies override
    them.
    """

    def get_event_loop(self):
        """Abstract: get the event loop."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Abstract: set the event loop to *loop*."""
        raise NotImplementedError

    def new_event_loop(self):
        """Abstract: create a new event loop."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Abstract: get the child watcher."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Abstract: set the child watcher to *watcher*."""
        raise NotImplementedError
330
331
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable returning a new loop; None here, so new_event_loop() only
    # works once this attribute has been given a real factory.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called successfully in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current thread.

        In the main thread, a loop is created and installed on first use
        unless set_event_loop() has already been called there.

        Raises AssertionError if the current thread has no event loop.
        The check is an ``assert``, so under ``python -O`` it is skipped
        and None is returned instead.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())
        assert self._local._loop is not None, \
            ('There is no current event loop in thread %r.' %
             threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        loop must be None or an AbstractEventLoop instance.
        """
        # Validate first: a rejected argument must not mark the thread as
        # "set", which would disable auto-creation in get_event_loop().
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
382
# The event loop policy is a single process-wide object, even when the
# policy's own rules hand out one event loop per thread (or per some other
# notion of context).  It starts out unset; the default policy is
# installed by the first call to get_event_loop_policy().
_event_loop_policy = None

# Guards the on-the-fly creation of the default event loop policy.
_lock = threading.Lock()
391
392
def _init_event_loop_policy():
    """Install a DefaultEventLoopPolicy instance if no policy is set.

    The check and the assignment both happen while holding ``_lock``.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:  # pragma: no branch
            return
        # Imported here rather than at module level.
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
399
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
def get_event_loop_policy():
    """Return the global event loop policy.

    If no policy is currently set, _init_event_loop_policy() is called
    first to install the default one.
    """
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
406
407
def set_event_loop_policy(policy):
    """Replace the global event loop policy with *policy*.

    policy must be None or an AbstractEventLoopPolicy instance.  Storing
    None leaves the policy unset.
    """
    global _event_loop_policy
    if policy is not None:
        assert isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
413
414
def get_event_loop():
    """Return the result of get_event_loop() on the current policy."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
418
419
def set_event_loop(loop):
    """Call set_event_loop(loop) on the current policy; returns None."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
423
424
def new_event_loop():
    """Return the result of new_event_loop() on the current policy."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800428
429
def get_child_watcher():
    """Return the result of get_child_watcher() on the current policy."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
433
434
def set_child_watcher(watcher):
    """Call set_child_watcher(watcher) on the current policy.

    Whatever the policy's method returns is passed back to the caller.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)