blob: 7841ad9ba52a7228f82007e086645f1ea69e479a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
# Names exported by a star-import of this module.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
10
11import subprocess
12import sys
13import threading
14import socket
15
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
class Handle:
    """Pairing of a callback with its positional arguments.

    This is the object handed back by the callback registration
    methods.  It records whether cancel() has been called.
    """

    __slots__ = ['_callback', '_args', '_cancelled']

    def __init__(self, callback, args):
        # Passing a Handle where a callback is expected is a caller error.
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._cancelled = False
        self._args = args
        self._callback = callback

    def __repr__(self):
        text = 'Handle({}, {})'.format(self._callback, self._args)
        if not self._cancelled:
            return text
        return text + '<cancelled>'

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Call the callback with its arguments.

        An Exception raised by the callback is logged and not propagated.
        """
        try:
            self._callback(*self._args)
        except Exception:
            logger.exception('Exception in callback %s %r',
                             self._callback, self._args)
        self = None  # Needed to break cycles when an exception occurs.
46
47
class TimerHandle(Handle):
    """Handle that additionally carries a scheduled time.

    This is the object handed back by the timed callback registration
    methods.  The ordering operators compare the scheduled time; equality
    also takes the callback, its arguments and the cancelled flag into
    account.
    """

    __slots__ = ['_when']

    def __init__(self, when, callback, args):
        assert when is not None
        super().__init__(callback, args)
        self._when = when

    def __repr__(self):
        text = 'TimerHandle({}, {}, {})'.format(
            self._when, self._callback, self._args)
        if not self._cancelled:
            return text
        return text + '<cancelled>'

    def __hash__(self):
        # Only the scheduled time contributes to the hash.
        return hash(self._when)

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        # Strictly earlier wins outright; otherwise defer to full equality.
        return True if self._when < other._when else self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        # Strictly later wins outright; otherwise defer to full equality.
        return True if self._when > other._when else self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def __ne__(self, other):
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome
98
99
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are stubs that raise NotImplementedError; a concrete
    server is expected to override them.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises NotImplementedError unless overridden.
        """
        # NotImplemented is the sentinel for binary special methods; a stub
        # that is not overridden has to fail loudly instead of returning it.
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError
110
111
class AbstractEventLoop:
    """Interface that concrete event loops implement.

    Apart from call_soon(), which delegates to call_later(), every
    method here just raises NotImplementedError.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Keep the event loop running until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until the given Future is done.

        The Future's result is returned, or its exception is raised.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as is reasonable.

        How soon that is may vary between implementations, but no
        further I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Tell whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop, which should not be running.

        Closing is idempotent and irreversible; no other methods
        should be called on the loop afterwards.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule callback(*args) through call_later() with a delay of 0."""
        handle = self.call_later(0, callback, *args)
        return handle

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """Coroutine that creates a TCP server bound to host and port.

        The result is a Server object that can be used to stop the
        service.

        When host is an empty string or None, all interfaces are
        assumed and a list of multiple sockets will be returned (most
        likely one for IPv4 and another one for IPv6).

        family may be AF_INET or AF_INET6 to force IPv4 or IPv6; when
        not given it is determined from host (the default is
        AF_UNSPEC).

        flags is a bitmask passed to getaddrinfo().

        sock optionally supplies a preexisting socket object to use.

        backlog is the maximum number of queued connections handed to
        listen() (100 by default).

        ssl may be an SSLContext, enabling SSL over the accepted
        connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state without waiting for its natural timeout to
        expire.  When not specified it is automatically set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """Coroutine that creates a UNIX Domain Socket server.

        The result is a Server object that can be used to stop the
        service.

        path is a str naming the file system path the server socket
        is bound to.

        sock optionally supplies a preexisting socket object to use.

        backlog is the maximum number of queued connections handed to
        listen() (100 by default).

        ssl may be an SSLContext, enabling SSL over the accepted
        connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register a read pipe with the event loop.

        protocol_factory should instantiate an object with the Protocol
        interface.  pipe is a file-like object already switched to
        nonblocking mode.  The result is a (transport, protocol) pair
        whose transport supports the ReadTransport interface.
        """
        # A file-like object is accepted rather than a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() could lead to complicated errors:
        # the pipe transport closes the fd and then f is closed, or vice
        # versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register a write pipe with the event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.  pipe is a file-like object already
        switched to nonblocking mode.  The result is a
        (transport, protocol) pair whose transport supports the
        WriteTransport interface.
        """
        # A file-like object is accepted rather than a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() could lead to complicated errors:
        # the pipe transport closes the fd and then f is closed, or vice
        # versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError
330
331
class AbstractEventLoopPolicy:
    """Interface for objects that govern access to the event loop.

    Every method here raises NotImplementedError.
    """

    def get_event_loop(self):
        """Return the current event loop (abstract stub)."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Make loop the current event loop (abstract stub)."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop (abstract stub)."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Return the child watcher (abstract stub)."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Install watcher as the child watcher (abstract stub)."""
        raise NotImplementedError
356
357
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop() to build a loop.  It is None here,
    # so it has to be supplied before new_event_loop() can work.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's loop, and whether set_event_loop()
        # has been called successfully in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop of the current thread.

        In the main thread, a loop is created and installed on first
        use unless set_event_loop() has already been called there.

        If the thread has no loop, the assertion below fails with
        AssertionError.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        # NOTE: assert statements are removed under -O, in which case this
        # method returns None instead of failing.
        assert self._local._loop is not None, \
               ('There is no current event loop in thread %r.' %
                threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop of the current thread.

        loop must be None or an AbstractEventLoop instance.
        """
        # Validate before touching any state: a rejected argument must not
        # flip _set_called, which would switch off the automatic creation
        # of the main thread's loop in get_event_loop().
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
408
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().  None means no policy is installed.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
# It is held by _init_event_loop_policy() while it checks and assigns
# _event_loop_policy.
_lock = threading.Lock()
417
418
def _init_event_loop_policy():
    """Install a DefaultEventLoopPolicy if no policy is set.

    The check and the assignment both happen while holding _lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:  # pragma: no branch
            return
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
425
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
def get_event_loop_policy():
    """Return the global event loop policy.

    If none is installed yet, the default policy is installed first.
    """
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
432
433
def set_event_loop_policy(policy):
    """Install policy as the global event loop policy.

    policy must be None or an AbstractEventLoopPolicy instance; this is
    checked with an assert.  Passing None clears the current policy.
    """
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
439
440
def get_event_loop():
    """Return the result of get_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
444
445
def set_event_loop(loop):
    """Pass loop to set_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
449
450
def new_event_loop():
    """Return the result of new_event_loop() on the global policy."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800454
455
def get_child_watcher():
    """Return the result of get_child_watcher() on the global policy."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
459
460
def set_child_watcher(watcher):
    """Pass watcher to set_child_watcher() on the global policy.

    Whatever the policy's method returns is returned to the caller.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)