blob: 57af68afb641b243b104ccc038597b073012d33f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
# Names exported by ``from <module> import *``.
__all__ = [
    'AbstractEventLoopPolicy',
    'AbstractEventLoop',
    'AbstractServer',
    'Handle',
    'TimerHandle',
    'get_event_loop_policy',
    'set_event_loop_policy',
    'get_event_loop',
    'set_event_loop',
    'new_event_loop',
    'get_child_watcher',
    'set_child_watcher',
]
10
11import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import threading
13import socket
14
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
class Handle:
    """Wrapper around a registered callback.

    Instances hold the callable, its positional arguments, a cancellation
    flag and the loop used to report exceptions raised by the callback.
    """

    __slots__ = ['_callback', '_args', '_cancelled', '_loop']

    def __init__(self, callback, args, loop):
        # Wrapping a Handle inside another Handle is a programming error.
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._cancelled = False
        self._args = args
        self._callback = callback
        self._loop = loop

    def __repr__(self):
        text = 'Handle({}, {})'.format(self._callback, self._args)
        return (text + '<cancelled>') if self._cancelled else text

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Invoke the callback, routing any Exception to the loop's handler."""
        try:
            self._callback(*self._args)
        except Exception as exc:
            text = 'Exception in callback {}{!r}'.format(self._callback,
                                                         self._args)
            # The context dict is deliberately built inline: binding it to
            # a local would keep the exception reachable from this frame.
            self._loop.call_exception_handler({
                'message': text,
                'exception': exc,
                'handle': self,
            })
        self = None  # Needed to break cycles when an exception occurs.
49
50
class TimerHandle(Handle):
    """Object returned by timed callback registration methods.

    Adds a scheduled time (``_when``) to Handle.  Ordering comparisons use
    ``_when`` only; equality also compares the callback, the arguments and
    the cancellation flag.  Comparing against anything that is not a
    TimerHandle yields NotImplemented, so Python can try the reflected
    operation or raise TypeError, rather than failing with an
    AttributeError on the missing ``_when`` attribute.
    """

    __slots__ = ['_when']

    def __init__(self, when, callback, args, loop):
        assert when is not None
        super().__init__(callback, args, loop)

        self._when = when

    def __repr__(self):
        res = 'TimerHandle({}, {}, {})'.format(self._when,
                                               self._callback,
                                               self._args)
        if self._cancelled:
            res += '<cancelled>'

        return res

    def __hash__(self):
        # Equal handles always share the same _when, so hashing on it alone
        # stays consistent with __eq__.
        return hash(self._when)

    def __lt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when

    def __le__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        if self._when < other._when:
            return True
        return self.__eq__(other)

    def __gt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when

    def __ge__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        if self._when > other._when:
            return True
        return self.__eq__(other)

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal
101
102
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are stubs that concrete servers must override.  They
    raise NotImplementedError, matching the other abstract stubs in this
    module; NotImplemented is the binary-operator sentinel and would be
    silently accepted by callers as a (truthy) return value.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError
113
114
class AbstractEventLoop:
    """Interface definition for event loops.

    Apart from call_soon(), every method here is a stub raising
    NotImplementedError; concrete loops provide the behaviour.
    """

    # ---- Running and stopping the event loop ----

    def run_forever(self):
        """Run the loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the loop until the given Future is done.

        Returns the Future's result, or raises its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the loop as soon as reasonable.

        How soon that is may depend on the implementation, but no more
        I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Tell whether the loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.  Closing is idempotent and
        irreversible, and no other methods should be called afterwards.
        """
        raise NotImplementedError

    # ---- Callback scheduling; all of these return Handles ----

    def call_soon(self, callback, *args):
        """Schedule callback through call_later() with a delay of zero."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    # ---- Interaction with threads ----

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # ---- Network I/O; these return Futures ----

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """Coroutine creating a TCP server bound to host and port.

        The result is a Server object that can be used to stop the
        service.

        host: when it is an empty string or None, all interfaces are
        assumed and a list of multiple sockets will be returned (most
        likely one for IPv4 and another one for IPv6).

        family: AF_INET or AF_INET6 forces IPv4 or IPv6 respectively.
        When not set it is determined from host (defaults to AF_UNSPEC).

        flags: a bitmask for getaddrinfo().

        sock: optional preexisting socket object to use.

        backlog: maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl: an SSLContext, to enable SSL over the accepted connections.

        reuse_address: tells the kernel to reuse a local socket in
        TIME_WAIT state without waiting for its natural timeout to
        expire.  When not specified it is automatically set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """Coroutine creating a UNIX Domain Socket server.

        The result is a Server object that can be used to stop the
        service.

        path: a str naming the file system path to bind the server
        socket to.

        sock: optional preexisting socket object to use.

        backlog: maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl: an SSLContext, to enable SSL over the accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        raise NotImplementedError

    # ---- Pipes and subprocesses ----

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register a read pipe in the event loop.

        protocol_factory should instantiate an object with the Protocol
        interface.  pipe is a file-like object already switched to
        nonblocking mode.  Returns a (transport, protocol) pair whose
        transport supports the ReadTransport interface.
        """
        # A file-like object is accepted rather than a bare file
        # descriptor because the transport must own the pipe and close it
        # when it finishes.  Passing f.fileno() invites complicated
        # errors: the fd gets closed by the pipe transport and then again
        # through f, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register a write pipe in the event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.  pipe is a file-like object already
        switched to nonblocking mode.  Returns a (transport, protocol)
        pair whose transport supports the WriteTransport interface.
        """
        # A file-like object is accepted rather than a bare file
        # descriptor because the transport must own the pipe and close it
        # when it finishes.  Passing f.fileno() invites complicated
        # errors: the fd gets closed by the pipe transport and then again
        # through f, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # ---- Ready-based callback registration ----
    # add_*() methods return None.
    # remove_*() methods return True if something was removed and False
    # if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # ---- Completion-based I/O; these return Futures ----

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # ---- Signal handling ----

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # ---- Error handlers ----

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # ---- Debug flag management ----

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
352
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
class AbstractEventLoopPolicy:
    """Interface definition for event loop policies.

    Every method is a stub raising NotImplementedError; concrete
    policies must override them.
    """

    def get_event_loop(self):
        """Get the event loop (to be implemented by subclasses)."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop (to be implemented by subclasses)."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create a new event loop (to be implemented by subclasses)."""
        raise NotImplementedError

    # ---- Child process handling (Unix only) ----

    def get_child_watcher(self):
        """Get the child watcher (to be implemented by subclasses)."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the child watcher (to be implemented by subclasses)."""
        raise NotImplementedError
378
379
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    Under this policy every thread has its own event loop.  A loop is
    only created automatically for the main thread; other threads have
    no event loop by default.

    Other policies may follow different rules (e.g. a single global
    event loop, automatically creating an event loop per thread, or
    tying the event loop to some other notion of context).
    """

    # Callable producing a new loop; None here, used by new_event_loop().
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's loop, and whether
        # set_event_loop() has been called in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Return the event loop of the calling thread.

        In the main thread a loop is created and installed first when
        none is set and set_event_loop() has not been called yet.  An
        assertion fails when the thread ends up without a loop.
        """
        if self._local._loop is None and not self._local._set_called:
            if isinstance(threading.current_thread(), threading._MainThread):
                self.set_event_loop(self.new_event_loop())
        loop = self._local._loop
        assert loop is not None, \
            ('There is no current event loop in thread %r.' %
             threading.current_thread().name)
        return loop

    def set_event_loop(self, loop):
        """Set the event loop of the calling thread (None clears it)."""
        # The flag is raised before validating, so even a rejected loop
        # disables automatic loop creation in get_event_loop().
        self._local._set_called = True
        if loop is not None:
            assert isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create and return a new event loop via _loop_factory.

        The new loop does not become the current one until it is passed
        to set_event_loop().
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
430
# The active event loop policy.  It is always a single global object,
# even when the policy's own rules keep one event loop per thread (or
# per some other notion of context).  The default policy gets installed
# by the first call to get_event_loop_policy().
_event_loop_policy = None

# Guards the on-the-fly creation of the event loop policy.
_lock = threading.Lock()
439
440
def _init_event_loop_policy():
    """Install the default event loop policy unless one is already set.

    The check runs while holding _lock, and DefaultEventLoopPolicy is
    imported from the package only when it is actually needed.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:
            return  # pragma: no cover
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
447
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
def get_event_loop_policy():
    """Return the global event loop policy, creating the default if unset."""
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
454
455
def set_event_loop_policy(policy):
    """Replace the global event loop policy.

    policy must be None or an AbstractEventLoopPolicy instance; this is
    checked with an assertion.
    """
    global _event_loop_policy
    if policy is not None:
        assert isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
461
462
def get_event_loop():
    """Return the event loop provided by the current policy."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
466
467
def set_event_loop(loop):
    """Hand loop to the current policy's set_event_loop(); returns None."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
471
472
def new_event_loop():
    """Return a new event loop created by the current policy."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800476
477
def get_child_watcher():
    """Return the child watcher provided by the current policy."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
481
482
def set_child_watcher(watcher):
    """Pass watcher to the current policy's set_child_watcher().

    Whatever the policy method returns is returned unchanged.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)