blob: 1030c045bd8686deb446718b2b0e305ebe564728 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08003__all__ = ['AbstractEventLoopPolicy',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'AbstractEventLoop', 'AbstractServer',
5 'Handle', 'TimerHandle',
6 'get_event_loop_policy', 'set_event_loop_policy',
7 'get_event_loop', 'set_event_loop', 'new_event_loop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08008 'get_child_watcher', 'set_child_watcher',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009 ]
10
11import subprocess
12import sys
13import threading
14import socket
15
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
19class Handle:
20 """Object returned by callback registration methods."""
21
Yury Selivanov569efa22014-02-18 18:02:19 -050022 __slots__ = ['_callback', '_args', '_cancelled', '_loop']
Yury Selivanovb1317782014-02-12 17:01:52 -050023
Yury Selivanov569efa22014-02-18 18:02:19 -050024 def __init__(self, callback, args, loop):
Victor Stinnerdc62b7e2014-02-10 00:45:44 +010025 assert not isinstance(callback, Handle), 'A Handle is not a callback'
Yury Selivanov569efa22014-02-18 18:02:19 -050026 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027 self._callback = callback
28 self._args = args
29 self._cancelled = False
30
31 def __repr__(self):
32 res = 'Handle({}, {})'.format(self._callback, self._args)
33 if self._cancelled:
34 res += '<cancelled>'
35 return res
36
37 def cancel(self):
38 self._cancelled = True
39
40 def _run(self):
41 try:
42 self._callback(*self._args)
Yury Selivanov569efa22014-02-18 18:02:19 -050043 except Exception as exc:
44 msg = 'Exception in callback {}{!r}'.format(self._callback,
45 self._args)
46 self._loop.call_exception_handler({
47 'message': msg,
48 'exception': exc,
49 'handle': self,
50 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051 self = None # Needed to break cycles when an exception occurs.
52
53
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054class TimerHandle(Handle):
55 """Object returned by timed callback registration methods."""
56
Yury Selivanovb1317782014-02-12 17:01:52 -050057 __slots__ = ['_when']
58
Yury Selivanov569efa22014-02-18 18:02:19 -050059 def __init__(self, when, callback, args, loop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070060 assert when is not None
Yury Selivanov569efa22014-02-18 18:02:19 -050061 super().__init__(callback, args, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
63 self._when = when
64
65 def __repr__(self):
66 res = 'TimerHandle({}, {}, {})'.format(self._when,
67 self._callback,
68 self._args)
69 if self._cancelled:
70 res += '<cancelled>'
71
72 return res
73
74 def __hash__(self):
75 return hash(self._when)
76
77 def __lt__(self, other):
78 return self._when < other._when
79
80 def __le__(self, other):
81 if self._when < other._when:
82 return True
83 return self.__eq__(other)
84
85 def __gt__(self, other):
86 return self._when > other._when
87
88 def __ge__(self, other):
89 if self._when > other._when:
90 return True
91 return self.__eq__(other)
92
93 def __eq__(self, other):
94 if isinstance(other, TimerHandle):
95 return (self._when == other._when and
96 self._callback == other._callback and
97 self._args == other._args and
98 self._cancelled == other._cancelled)
99 return NotImplemented
100
101 def __ne__(self, other):
102 equal = self.__eq__(other)
103 return NotImplemented if equal is NotImplemented else not equal
104
105
106class AbstractServer:
Victor Stinnercf6f72e2013-12-03 18:23:52 +0100107 """Abstract server returned by create_server()."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108
109 def close(self):
110 """Stop serving. This leaves existing connections open."""
111 return NotImplemented
112
113 def wait_closed(self):
114 """Coroutine to wait until service is closed."""
115 return NotImplemented
116
117
118class AbstractEventLoop:
119 """Abstract event loop."""
120
121 # Running and stopping the event loop.
122
123 def run_forever(self):
124 """Run the event loop until stop() is called."""
125 raise NotImplementedError
126
127 def run_until_complete(self, future):
128 """Run the event loop until a Future is done.
129
130 Return the Future's result, or raise its exception.
131 """
132 raise NotImplementedError
133
134 def stop(self):
135 """Stop the event loop as soon as reasonable.
136
137 Exactly how soon that is may depend on the implementation, but
138 no more I/O callbacks should be scheduled.
139 """
140 raise NotImplementedError
141
142 def is_running(self):
143 """Return whether the event loop is currently running."""
144 raise NotImplementedError
145
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700146 def close(self):
147 """Close the loop.
148
149 The loop should not be running.
150
151 This is idempotent and irreversible.
152
153 No other methods should be called after this one.
154 """
155 raise NotImplementedError
156
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157 # Methods scheduling callbacks. All these return Handles.
158
159 def call_soon(self, callback, *args):
160 return self.call_later(0, callback, *args)
161
162 def call_later(self, delay, callback, *args):
163 raise NotImplementedError
164
165 def call_at(self, when, callback, *args):
166 raise NotImplementedError
167
168 def time(self):
169 raise NotImplementedError
170
171 # Methods for interacting with threads.
172
173 def call_soon_threadsafe(self, callback, *args):
174 raise NotImplementedError
175
176 def run_in_executor(self, executor, callback, *args):
177 raise NotImplementedError
178
179 def set_default_executor(self, executor):
180 raise NotImplementedError
181
182 # Network I/O methods returning Futures.
183
184 def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
185 raise NotImplementedError
186
187 def getnameinfo(self, sockaddr, flags=0):
188 raise NotImplementedError
189
190 def create_connection(self, protocol_factory, host=None, port=None, *,
191 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700192 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193 raise NotImplementedError
194
195 def create_server(self, protocol_factory, host=None, port=None, *,
196 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
197 sock=None, backlog=100, ssl=None, reuse_address=None):
198 """A coroutine which creates a TCP server bound to host and port.
199
200 The return value is a Server object which can be used to stop
201 the service.
202
203 If host is an empty string or None all interfaces are assumed
204 and a list of multiple sockets will be returned (most likely
205 one for IPv4 and another one for IPv6).
206
207 family can be set to either AF_INET or AF_INET6 to force the
208 socket to use IPv4 or IPv6. If not set it will be determined
209 from host (defaults to AF_UNSPEC).
210
211 flags is a bitmask for getaddrinfo().
212
213 sock can optionally be specified in order to use a preexisting
214 socket object.
215
216 backlog is the maximum number of queued connections passed to
217 listen() (defaults to 100).
218
219 ssl can be set to an SSLContext to enable SSL over the
220 accepted connections.
221
222 reuse_address tells the kernel to reuse a local socket in
223 TIME_WAIT state, without waiting for its natural timeout to
224 expire. If not specified will automatically be set to True on
225 UNIX.
226 """
227 raise NotImplementedError
228
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 def create_unix_connection(self, protocol_factory, path, *,
230 ssl=None, sock=None,
231 server_hostname=None):
232 raise NotImplementedError
233
234 def create_unix_server(self, protocol_factory, path, *,
235 sock=None, backlog=100, ssl=None):
236 """A coroutine which creates a UNIX Domain Socket server.
237
Yury Selivanovdec1a452014-02-18 22:27:48 -0500238 The return value is a Server object, which can be used to stop
Yury Selivanovb057c522014-02-18 12:15:06 -0500239 the service.
240
241 path is a str, representing a file systsem path to bind the
242 server socket to.
243
244 sock can optionally be specified in order to use a preexisting
245 socket object.
246
247 backlog is the maximum number of queued connections passed to
248 listen() (defaults to 100).
249
250 ssl can be set to an SSLContext to enable SSL over the
251 accepted connections.
252 """
253 raise NotImplementedError
254
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 def create_datagram_endpoint(self, protocol_factory,
256 local_addr=None, remote_addr=None, *,
257 family=0, proto=0, flags=0):
258 raise NotImplementedError
259
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700260 # Pipes and subprocesses.
261
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 def connect_read_pipe(self, protocol_factory, pipe):
Yury Selivanovdec1a452014-02-18 22:27:48 -0500263 """Register read pipe in event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264
265 protocol_factory should instantiate object with Protocol interface.
266 pipe is file-like object already switched to nonblocking.
267 Return pair (transport, protocol), where transport support
Guido van Rossum9204af42013-11-30 15:35:42 -0800268 ReadTransport interface."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700269 # The reason to accept file-like object instead of just file descriptor
270 # is: we need to own pipe and close it at transport finishing
271 # Can got complicated errors if pass f.fileno(),
272 # close fd in pipe transport then close f and vise versa.
273 raise NotImplementedError
274
275 def connect_write_pipe(self, protocol_factory, pipe):
Yury Selivanovdec1a452014-02-18 22:27:48 -0500276 """Register write pipe in event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277
278 protocol_factory should instantiate object with BaseProtocol interface.
279 Pipe is file-like object already switched to nonblocking.
280 Return pair (transport, protocol), where transport support
Guido van Rossum9204af42013-11-30 15:35:42 -0800281 WriteTransport interface."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 # The reason to accept file-like object instead of just file descriptor
283 # is: we need to own pipe and close it at transport finishing
284 # Can got complicated errors if pass f.fileno(),
285 # close fd in pipe transport then close f and vise versa.
286 raise NotImplementedError
287
288 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
289 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
290 **kwargs):
291 raise NotImplementedError
292
293 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
294 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
295 **kwargs):
296 raise NotImplementedError
297
298 # Ready-based callback registration methods.
299 # The add_*() methods return None.
300 # The remove_*() methods return True if something was removed,
301 # False if there was nothing to delete.
302
303 def add_reader(self, fd, callback, *args):
304 raise NotImplementedError
305
306 def remove_reader(self, fd):
307 raise NotImplementedError
308
309 def add_writer(self, fd, callback, *args):
310 raise NotImplementedError
311
312 def remove_writer(self, fd):
313 raise NotImplementedError
314
315 # Completion based I/O methods returning Futures.
316
317 def sock_recv(self, sock, nbytes):
318 raise NotImplementedError
319
320 def sock_sendall(self, sock, data):
321 raise NotImplementedError
322
323 def sock_connect(self, sock, address):
324 raise NotImplementedError
325
326 def sock_accept(self, sock):
327 raise NotImplementedError
328
329 # Signal handling.
330
331 def add_signal_handler(self, sig, callback, *args):
332 raise NotImplementedError
333
334 def remove_signal_handler(self, sig):
335 raise NotImplementedError
336
Yury Selivanov569efa22014-02-18 18:02:19 -0500337 # Error handlers.
338
339 def set_exception_handler(self, handler):
340 raise NotImplementedError
341
342 def default_exception_handler(self, context):
343 raise NotImplementedError
344
345 def call_exception_handler(self, context):
346 raise NotImplementedError
347
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
349class AbstractEventLoopPolicy:
350 """Abstract policy for accessing the event loop."""
351
352 def get_event_loop(self):
353 """XXX"""
354 raise NotImplementedError
355
356 def set_event_loop(self, loop):
357 """XXX"""
358 raise NotImplementedError
359
360 def new_event_loop(self):
361 """XXX"""
362 raise NotImplementedError
363
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800364 # Child processes handling (Unix only).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800366 def get_child_watcher(self):
367 """XXX"""
368 raise NotImplementedError
369
370 def set_child_watcher(self, watcher):
371 """XXX"""
372 raise NotImplementedError
373
374
375class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 """Default policy implementation for accessing the event loop.
377
378 In this policy, each thread has its own event loop. However, we
379 only automatically create an event loop by default for the main
380 thread; other threads by default have no event loop.
381
382 Other policies may have different rules (e.g. a single global
383 event loop, or automatically creating an event loop per thread, or
384 using some other notion of context to which an event loop is
385 associated).
386 """
387
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800388 _loop_factory = None
389
390 class _Local(threading.local):
391 _loop = None
392 _set_called = False
393
394 def __init__(self):
395 self._local = self._Local()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
397 def get_event_loop(self):
398 """Get the event loop.
399
400 This may be None or an instance of EventLoop.
401 """
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800402 if (self._local._loop is None and
403 not self._local._set_called and
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossumcced0762013-11-27 10:37:13 -0800405 self.set_event_loop(self.new_event_loop())
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800406 assert self._local._loop is not None, \
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 ('There is no current event loop in thread %r.' %
408 threading.current_thread().name)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800409 return self._local._loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
411 def set_event_loop(self, loop):
412 """Set the event loop."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800413 self._local._set_called = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 assert loop is None or isinstance(loop, AbstractEventLoop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800415 self._local._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416
417 def new_event_loop(self):
418 """Create a new event loop.
419
420 You must call set_event_loop() to make this the current event
421 loop.
422 """
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800423 return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424
425
426# Event loop policy. The policy itself is always global, even if the
427# policy's rules say that there is an event loop per thread (or other
428# notion of context). The default policy is installed by the first
429# call to get_event_loop_policy().
430_event_loop_policy = None
431
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800432# Lock for protecting the on-the-fly creation of the event loop policy.
433_lock = threading.Lock()
434
435
436def _init_event_loop_policy():
437 global _event_loop_policy
438 with _lock:
439 if _event_loop_policy is None: # pragma: no branch
440 from . import DefaultEventLoopPolicy
441 _event_loop_policy = DefaultEventLoopPolicy()
442
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443
444def get_event_loop_policy():
445 """XXX"""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 if _event_loop_policy is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800447 _init_event_loop_policy()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 return _event_loop_policy
449
450
451def set_event_loop_policy(policy):
452 """XXX"""
453 global _event_loop_policy
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
455 _event_loop_policy = policy
456
457
458def get_event_loop():
459 """XXX"""
460 return get_event_loop_policy().get_event_loop()
461
462
463def set_event_loop(loop):
464 """XXX"""
465 get_event_loop_policy().set_event_loop(loop)
466
467
468def new_event_loop():
469 """XXX"""
470 return get_event_loop_policy().new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800471
472
473def get_child_watcher():
474 """XXX"""
475 return get_event_loop_policy().get_child_watcher()
476
477
478def set_child_watcher(watcher):
479 """XXX"""
480 return get_event_loop_policy().set_child_watcher(watcher)