blob: 62400195a2424ed3dd4ce0117215ed1f98a3891c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08003__all__ = ['AbstractEventLoopPolicy',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'AbstractEventLoop', 'AbstractServer',
5 'Handle', 'TimerHandle',
6 'get_event_loop_policy', 'set_event_loop_policy',
7 'get_event_loop', 'set_event_loop', 'new_event_loop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08008 'get_child_watcher', 'set_child_watcher',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009 ]
10
11import subprocess
12import sys
13import threading
14import socket
15
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
19class Handle:
20 """Object returned by callback registration methods."""
21
22 def __init__(self, callback, args):
23 self._callback = callback
24 self._args = args
25 self._cancelled = False
26
27 def __repr__(self):
28 res = 'Handle({}, {})'.format(self._callback, self._args)
29 if self._cancelled:
30 res += '<cancelled>'
31 return res
32
33 def cancel(self):
34 self._cancelled = True
35
36 def _run(self):
37 try:
38 self._callback(*self._args)
39 except Exception:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070040 logger.exception('Exception in callback %s %r',
41 self._callback, self._args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042 self = None # Needed to break cycles when an exception occurs.
43
44
45def make_handle(callback, args):
Guido van Rossumec7922c2013-10-30 14:38:05 -070046 # TODO: Inline this? Or make it a private EventLoop method?
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047 assert not isinstance(callback, Handle), 'A Handle is not a callback'
48 return Handle(callback, args)
49
50
51class TimerHandle(Handle):
52 """Object returned by timed callback registration methods."""
53
54 def __init__(self, when, callback, args):
55 assert when is not None
56 super().__init__(callback, args)
57
58 self._when = when
59
60 def __repr__(self):
61 res = 'TimerHandle({}, {}, {})'.format(self._when,
62 self._callback,
63 self._args)
64 if self._cancelled:
65 res += '<cancelled>'
66
67 return res
68
69 def __hash__(self):
70 return hash(self._when)
71
72 def __lt__(self, other):
73 return self._when < other._when
74
75 def __le__(self, other):
76 if self._when < other._when:
77 return True
78 return self.__eq__(other)
79
80 def __gt__(self, other):
81 return self._when > other._when
82
83 def __ge__(self, other):
84 if self._when > other._when:
85 return True
86 return self.__eq__(other)
87
88 def __eq__(self, other):
89 if isinstance(other, TimerHandle):
90 return (self._when == other._when and
91 self._callback == other._callback and
92 self._args == other._args and
93 self._cancelled == other._cancelled)
94 return NotImplemented
95
96 def __ne__(self, other):
97 equal = self.__eq__(other)
98 return NotImplemented if equal is NotImplemented else not equal
99
100
101class AbstractServer:
Victor Stinnercf6f72e2013-12-03 18:23:52 +0100102 """Abstract server returned by create_server()."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103
104 def close(self):
105 """Stop serving. This leaves existing connections open."""
106 return NotImplemented
107
108 def wait_closed(self):
109 """Coroutine to wait until service is closed."""
110 return NotImplemented
111
112
113class AbstractEventLoop:
114 """Abstract event loop."""
115
116 # Running and stopping the event loop.
117
118 def run_forever(self):
119 """Run the event loop until stop() is called."""
120 raise NotImplementedError
121
122 def run_until_complete(self, future):
123 """Run the event loop until a Future is done.
124
125 Return the Future's result, or raise its exception.
126 """
127 raise NotImplementedError
128
129 def stop(self):
130 """Stop the event loop as soon as reasonable.
131
132 Exactly how soon that is may depend on the implementation, but
133 no more I/O callbacks should be scheduled.
134 """
135 raise NotImplementedError
136
137 def is_running(self):
138 """Return whether the event loop is currently running."""
139 raise NotImplementedError
140
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700141 def close(self):
142 """Close the loop.
143
144 The loop should not be running.
145
146 This is idempotent and irreversible.
147
148 No other methods should be called after this one.
149 """
150 raise NotImplementedError
151
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152 # Methods scheduling callbacks. All these return Handles.
153
154 def call_soon(self, callback, *args):
155 return self.call_later(0, callback, *args)
156
157 def call_later(self, delay, callback, *args):
158 raise NotImplementedError
159
160 def call_at(self, when, callback, *args):
161 raise NotImplementedError
162
163 def time(self):
164 raise NotImplementedError
165
166 # Methods for interacting with threads.
167
168 def call_soon_threadsafe(self, callback, *args):
169 raise NotImplementedError
170
171 def run_in_executor(self, executor, callback, *args):
172 raise NotImplementedError
173
174 def set_default_executor(self, executor):
175 raise NotImplementedError
176
177 # Network I/O methods returning Futures.
178
179 def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
180 raise NotImplementedError
181
182 def getnameinfo(self, sockaddr, flags=0):
183 raise NotImplementedError
184
185 def create_connection(self, protocol_factory, host=None, port=None, *,
186 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700187 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700188 raise NotImplementedError
189
190 def create_server(self, protocol_factory, host=None, port=None, *,
191 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
192 sock=None, backlog=100, ssl=None, reuse_address=None):
193 """A coroutine which creates a TCP server bound to host and port.
194
195 The return value is a Server object which can be used to stop
196 the service.
197
198 If host is an empty string or None all interfaces are assumed
199 and a list of multiple sockets will be returned (most likely
200 one for IPv4 and another one for IPv6).
201
202 family can be set to either AF_INET or AF_INET6 to force the
203 socket to use IPv4 or IPv6. If not set it will be determined
204 from host (defaults to AF_UNSPEC).
205
206 flags is a bitmask for getaddrinfo().
207
208 sock can optionally be specified in order to use a preexisting
209 socket object.
210
211 backlog is the maximum number of queued connections passed to
212 listen() (defaults to 100).
213
214 ssl can be set to an SSLContext to enable SSL over the
215 accepted connections.
216
217 reuse_address tells the kernel to reuse a local socket in
218 TIME_WAIT state, without waiting for its natural timeout to
219 expire. If not specified will automatically be set to True on
220 UNIX.
221 """
222 raise NotImplementedError
223
224 def create_datagram_endpoint(self, protocol_factory,
225 local_addr=None, remote_addr=None, *,
226 family=0, proto=0, flags=0):
227 raise NotImplementedError
228
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700229 # Pipes and subprocesses.
230
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231 def connect_read_pipe(self, protocol_factory, pipe):
232 """Register read pipe in eventloop.
233
234 protocol_factory should instantiate object with Protocol interface.
235 pipe is file-like object already switched to nonblocking.
236 Return pair (transport, protocol), where transport support
Guido van Rossum9204af42013-11-30 15:35:42 -0800237 ReadTransport interface."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 # The reason to accept file-like object instead of just file descriptor
239 # is: we need to own pipe and close it at transport finishing
240 # Can got complicated errors if pass f.fileno(),
241 # close fd in pipe transport then close f and vise versa.
242 raise NotImplementedError
243
244 def connect_write_pipe(self, protocol_factory, pipe):
245 """Register write pipe in eventloop.
246
247 protocol_factory should instantiate object with BaseProtocol interface.
248 Pipe is file-like object already switched to nonblocking.
249 Return pair (transport, protocol), where transport support
Guido van Rossum9204af42013-11-30 15:35:42 -0800250 WriteTransport interface."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 # The reason to accept file-like object instead of just file descriptor
252 # is: we need to own pipe and close it at transport finishing
253 # Can got complicated errors if pass f.fileno(),
254 # close fd in pipe transport then close f and vise versa.
255 raise NotImplementedError
256
257 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
258 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
259 **kwargs):
260 raise NotImplementedError
261
262 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
263 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
264 **kwargs):
265 raise NotImplementedError
266
267 # Ready-based callback registration methods.
268 # The add_*() methods return None.
269 # The remove_*() methods return True if something was removed,
270 # False if there was nothing to delete.
271
272 def add_reader(self, fd, callback, *args):
273 raise NotImplementedError
274
275 def remove_reader(self, fd):
276 raise NotImplementedError
277
278 def add_writer(self, fd, callback, *args):
279 raise NotImplementedError
280
281 def remove_writer(self, fd):
282 raise NotImplementedError
283
284 # Completion based I/O methods returning Futures.
285
286 def sock_recv(self, sock, nbytes):
287 raise NotImplementedError
288
289 def sock_sendall(self, sock, data):
290 raise NotImplementedError
291
292 def sock_connect(self, sock, address):
293 raise NotImplementedError
294
295 def sock_accept(self, sock):
296 raise NotImplementedError
297
298 # Signal handling.
299
300 def add_signal_handler(self, sig, callback, *args):
301 raise NotImplementedError
302
303 def remove_signal_handler(self, sig):
304 raise NotImplementedError
305
306
307class AbstractEventLoopPolicy:
308 """Abstract policy for accessing the event loop."""
309
310 def get_event_loop(self):
311 """XXX"""
312 raise NotImplementedError
313
314 def set_event_loop(self, loop):
315 """XXX"""
316 raise NotImplementedError
317
318 def new_event_loop(self):
319 """XXX"""
320 raise NotImplementedError
321
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800322 # Child processes handling (Unix only).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800324 def get_child_watcher(self):
325 """XXX"""
326 raise NotImplementedError
327
328 def set_child_watcher(self, watcher):
329 """XXX"""
330 raise NotImplementedError
331
332
333class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 """Default policy implementation for accessing the event loop.
335
336 In this policy, each thread has its own event loop. However, we
337 only automatically create an event loop by default for the main
338 thread; other threads by default have no event loop.
339
340 Other policies may have different rules (e.g. a single global
341 event loop, or automatically creating an event loop per thread, or
342 using some other notion of context to which an event loop is
343 associated).
344 """
345
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800346 _loop_factory = None
347
348 class _Local(threading.local):
349 _loop = None
350 _set_called = False
351
352 def __init__(self):
353 self._local = self._Local()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
355 def get_event_loop(self):
356 """Get the event loop.
357
358 This may be None or an instance of EventLoop.
359 """
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800360 if (self._local._loop is None and
361 not self._local._set_called and
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossumcced0762013-11-27 10:37:13 -0800363 self.set_event_loop(self.new_event_loop())
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800364 assert self._local._loop is not None, \
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 ('There is no current event loop in thread %r.' %
366 threading.current_thread().name)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800367 return self._local._loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
369 def set_event_loop(self, loop):
370 """Set the event loop."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800371 self._local._set_called = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 assert loop is None or isinstance(loop, AbstractEventLoop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800373 self._local._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374
375 def new_event_loop(self):
376 """Create a new event loop.
377
378 You must call set_event_loop() to make this the current event
379 loop.
380 """
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800381 return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383
384# Event loop policy. The policy itself is always global, even if the
385# policy's rules say that there is an event loop per thread (or other
386# notion of context). The default policy is installed by the first
387# call to get_event_loop_policy().
388_event_loop_policy = None
389
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800390# Lock for protecting the on-the-fly creation of the event loop policy.
391_lock = threading.Lock()
392
393
394def _init_event_loop_policy():
395 global _event_loop_policy
396 with _lock:
397 if _event_loop_policy is None: # pragma: no branch
398 from . import DefaultEventLoopPolicy
399 _event_loop_policy = DefaultEventLoopPolicy()
400
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401
402def get_event_loop_policy():
403 """XXX"""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 if _event_loop_policy is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800405 _init_event_loop_policy()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 return _event_loop_policy
407
408
409def set_event_loop_policy(policy):
410 """XXX"""
411 global _event_loop_policy
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
413 _event_loop_policy = policy
414
415
416def get_event_loop():
417 """XXX"""
418 return get_event_loop_policy().get_event_loop()
419
420
421def set_event_loop(loop):
422 """XXX"""
423 get_event_loop_policy().set_event_loop(loop)
424
425
426def new_event_loop():
427 """XXX"""
428 return get_event_loop_policy().new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800429
430
431def get_child_watcher():
432 """XXX"""
433 return get_event_loop_policy().get_child_watcher()
434
435
436def set_child_watcher(watcher):
437 """XXX"""
438 return get_event_loop_policy().set_child_watcher(watcher)